在流 API 中使用自动关闭接口

今天,我试图重构这段代码,从目录中的文件读取ID,

Set<Long> ids = new HashSet<>();
for (String fileName : fileSystem.list("my-directory")) {
    InputStream stream = fileSystem.openInputStream(fileName);
    BufferedReader br = new BufferedReader(new InputStreamReader(stream));
    String line;
    while ((line = br.readLine()) != null) {
        ids.add(Long.valueOf(line.trim()));
    }
    br.close();
}

使用流 API

Set<Long> ids = fileSystem.list("my-directory").stream()
    .map(fileName -> fileSystem::openInputStream)
    .map(is -> new BufferedReader(new InputStreamReader(is)))
    .flatMap(BufferedReader::lines)
    .map(String::trim)
    .map(Long::valueOf)
    .collect(Collectors.toSet());

然后我发现IO流不会被关闭,我没有看到关闭它们的简单方法,因为它们是在管道内创建的。

有什么想法吗?

更新:FileSystem的例子是HDFS,类似的方法不能使用。Files#lines


答案 1

一旦流的所有元素都被消耗殆尽,就可以挂接到流中以“关闭”资源。因此,在阅读完所有行后,可以通过以下修改关闭读取器:

.flatMap(reader -> reader.lines().onClose(() -> close(reader)))

其中处理 IOException。close(AutoClosable)

作为概念证明,以下代码和输出已经过测试:

import java.util.stream.Stream;

class Test {
    public static void main(String[] args) {
        Stream.of(1, 2, 3).flatMap(i ->
                Stream.of(i, i * 2).onClose(() ->
                        System.out.println("Closed!")
                )
        ).forEach(System.out::println);
    }
}

1
2
Closed!
2
4
Closed!
3
6
Closed!

答案 2

为什么不更简单一点,通过:Files.lines

try (Stream<String> s = Files.lines(Paths.get("yourpath" + fileName))) {
    s.map(String::trim)
      .map(Long::valueOf)
      .collect(Collectors.toSet());
}

推荐