你必须理解的第一件事是,像这样的代码
try (Connection connection = dataSource.getConnection()) {
…
try (PreparedStatement pSt = connection.prepareStatement(sql)) {
…
return stream;
}
}
不起作用,因为当您离开块时,资源已关闭,而处理的甚至尚未开始。try
Stream
资源管理构造“尝试使用资源”适用于方法内块范围内使用的资源,但您正在创建返回资源的工厂方法。因此,您必须确保关闭返回的流将关闭资源,并且调用方负责关闭 .Stream
此外,您需要一个函数,该函数从 中的单行生成一个项目。假设,你有一个像这样的方法ResultSet
Record createRecord(ResultSet rs) {
…
}
你可以创建一个基本上喜欢Stream<Record>
Stream<Record> stream = StreamSupport.stream(new Spliterators.AbstractSpliterator<Record>(
Long.MAX_VALUE,Spliterator.ORDERED) {
@Override
public boolean tryAdvance(Consumer<? super Record> action) {
if(!resultSet.next()) return false;
action.accept(createRecord(resultSet));
return true;
}
}, false);
但是要正确地做到这一点,你必须合并异常处理和关闭资源。您可以使用 来注册将在关闭时执行的操作,但它必须是一个不能引发已检查的异常。同样,该方法不允许引发选中的异常。而且由于我们不能简单地在这里嵌套块,因此当已经存在挂起的异常时,在 中抛出的抑制异常的程序逻辑不会免费出现。Stream.onClose
Stream
Runnable
tryAdvance
try(…)
close
为了帮助我们,我们引入了一个新类型,它可以包装关闭操作,该操作可能会引发已检查的异常,并将它们包装在未经检查的异常中。通过实现自身,它可以利用该结构安全地链接关闭操作:AutoCloseable
try(…)
interface UncheckedCloseable extends Runnable, AutoCloseable {
default void run() {
try { close(); } catch(Exception ex) { throw new RuntimeException(ex); }
}
static UncheckedCloseable wrap(AutoCloseable c) {
return c::close;
}
default UncheckedCloseable nest(AutoCloseable c) {
return ()->{ try(UncheckedCloseable c1=this) { c.close(); } };
}
}
这样,整个操作就变成了:
private Stream<Record> tableAsStream(DataSource dataSource, String table)
throws SQLException {
UncheckedCloseable close=null;
try {
Connection connection = dataSource.getConnection();
close=UncheckedCloseable.wrap(connection);
String sql = "select * from " + table;
PreparedStatement pSt = connection.prepareStatement(sql);
close=close.nest(pSt);
connection.setAutoCommit(false);
pSt.setFetchSize(5000);
ResultSet resultSet = pSt.executeQuery();
close=close.nest(resultSet);
return StreamSupport.stream(new Spliterators.AbstractSpliterator<Record>(
Long.MAX_VALUE,Spliterator.ORDERED) {
@Override
public boolean tryAdvance(Consumer<? super Record> action) {
try {
if(!resultSet.next()) return false;
action.accept(createRecord(resultSet));
return true;
} catch(SQLException ex) {
throw new RuntimeException(ex);
}
}
}, false).onClose(close);
} catch(SQLException sqlEx) {
if(close!=null)
try { close.close(); } catch(Exception ex) { sqlEx.addSuppressed(ex); }
throw sqlEx;
}
}
此方法包装所有资源所需的关闭操作,并在上述实用程序类的一个实例中包装。如果在初始化期间发生异常,则会立即执行关闭操作,并将异常传递给调用方。如果流构造成功,则通过 注册关闭操作。Connection
Statement
ResultSet
onClose
因此,调用方必须确保正确关闭,例如
try(Stream<Record> s=tableAsStream(dataSource, table)) {
// stream operation
}
请注意,该方法中还添加了过孔的传递。因此,您现在可以毫无问题地添加到该方法中。SQLException
RuntimeException
tryAdvance
throws SQLException
createRecord