java.util.stream with ResultSet乔奥克

2022-08-31 14:19:28

我有几个包含大量数据的表(大约1亿条记录)。因此,我无法将此数据存储在内存中,但我想使用类流式传输此结果集并将此流传递给另一个类。我读到了和运算符,但它们是内存中的缓冲流。那么有没有办法解决这个问题呢?提前致谢。java.util.streamStream.ofStream.Builder

更新 #1

好吧,我用谷歌搜索并找到了jooq图书馆。我不确定,但看起来它可能适用于我的测试用例。总而言之,我有几个包含大量数据的表。我想流式传输我的结果集并将此流传输到另一个方法。像这样:

// why return Stream<String>? Because my result set has String type
private Stream<Record> writeTableToStream(DataSource dataSource, String table) {

    Stream<Record> record = null;
    try (Connection connection = dataSource.getConnection()) {
        String sql = "select * from " + table;

        try (PreparedStatement pSt = connection.prepareStatement(sql)) {
            connection.setAutoCommit(false);
            pSt.setFetchSize(5000);
            ResultSet resultSet = pSt.executeQuery();
            //
            record = DSL.using(connection)
                    .fetch(resultSet).stream();
        }
    } catch (SQLException sqlEx) {
        logger.error(sqlEx);
    }

    return record;
}

可以请某人建议,我走在正确的路上吗?谢谢。

更新 #2

我在jooq上做了一些实验,现在可以说上面的决定不适合我。此代码花费太多时间record = DSL.using(connection).fetch(resultSet).stream();


答案 1

你必须理解的第一件事是,像这样的代码

try (Connection connection = dataSource.getConnection()) {
    …
    try (PreparedStatement pSt = connection.prepareStatement(sql)) {
        …
        return stream;
    }
}

不起作用,因为当您离开块时,资源已关闭,而处理的甚至尚未开始。tryStream

资源管理构造“尝试使用资源”适用于方法内块范围内使用的资源,但您正在创建返回资源的工厂方法。因此,您必须确保关闭返回的流将关闭资源,并且调用方负责关闭 .Stream


此外,您需要一个函数,该函数从 中的单行生成一个项目。假设,你有一个像这样的方法ResultSet

Record createRecord(ResultSet rs) {
    …
}

你可以创建一个基本上喜欢Stream<Record>

Stream<Record> stream = StreamSupport.stream(new Spliterators.AbstractSpliterator<Record>(
    Long.MAX_VALUE,Spliterator.ORDERED) {
        @Override
        public boolean tryAdvance(Consumer<? super Record> action) {
            if(!resultSet.next()) return false;
            action.accept(createRecord(resultSet));
            return true;
        }
    }, false);

但是要正确地做到这一点,你必须合并异常处理和关闭资源。您可以使用 来注册将在关闭时执行的操作,但它必须是一个不能引发已检查的异常。同样,该方法不允许引发选中的异常。而且由于我们不能简单地在这里嵌套块,因此当已经存在挂起的异常时,在 中抛出的抑制异常的程序逻辑不会免费出现。Stream.onCloseStreamRunnabletryAdvancetry(…)close

为了帮助我们,我们引入了一个新类型,它可以包装关闭操作,该操作可能会引发已检查的异常,并将它们包装在未经检查的异常中。通过实现自身,它可以利用该结构安全地链接关闭操作:AutoCloseabletry(…)

interface UncheckedCloseable extends Runnable, AutoCloseable {
    default void run() {
        try { close(); } catch(Exception ex) { throw new RuntimeException(ex); }
    }
    static UncheckedCloseable wrap(AutoCloseable c) {
        return c::close;
    }
    default UncheckedCloseable nest(AutoCloseable c) {
        return ()->{ try(UncheckedCloseable c1=this) { c.close(); } };
    }
}

这样,整个操作就变成了:

private Stream<Record> tableAsStream(DataSource dataSource, String table)
    throws SQLException {

    UncheckedCloseable close=null;
    try {
        Connection connection = dataSource.getConnection();
        close=UncheckedCloseable.wrap(connection);
        String sql = "select * from " + table;
        PreparedStatement pSt = connection.prepareStatement(sql);
        close=close.nest(pSt);
        connection.setAutoCommit(false);
        pSt.setFetchSize(5000);
        ResultSet resultSet = pSt.executeQuery();
        close=close.nest(resultSet);
        return StreamSupport.stream(new Spliterators.AbstractSpliterator<Record>(
            Long.MAX_VALUE,Spliterator.ORDERED) {
            @Override
            public boolean tryAdvance(Consumer<? super Record> action) {
                try {
                    if(!resultSet.next()) return false;
                    action.accept(createRecord(resultSet));
                    return true;
                } catch(SQLException ex) {
                    throw new RuntimeException(ex);
                }
            }
        }, false).onClose(close);
    } catch(SQLException sqlEx) {
        if(close!=null)
            try { close.close(); } catch(Exception ex) { sqlEx.addSuppressed(ex); }
        throw sqlEx;
    }
}

此方法包装所有资源所需的关闭操作,并在上述实用程序类的一个实例中包装。如果在初始化期间发生异常,则会立即执行关闭操作,并将异常传递给调用方。如果流构造成功,则通过 注册关闭操作。ConnectionStatementResultSetonClose

因此,调用方必须确保正确关闭,例如

try(Stream<Record> s=tableAsStream(dataSource, table)) {
    // stream operation
}

请注意,该方法中还添加了过孔的传递。因此,您现在可以毫无问题地添加到该方法中。SQLExceptionRuntimeExceptiontryAdvancethrows SQLExceptioncreateRecord


答案 2

乔奥克

我将回答您问题的jOOQ部分。从 jOOQ 3.8 开始,现在有相当多的附加功能与 jOOQ 与 Stream 相结合。其他用法也记录在此 jOOQ 页面上

您的建议用法:

您尝试了以下操作:

Stream<Record> stream = DSL.using(connection).fetch(resultSet).stream();

实际上,这不适用于大型结果集,因为 fetch(ResultSet) 将整个结果集提取到内存中,然后对其调用 Collection.stream()。

更好的(懒惰)用法:

相反,你可以这样写:

try (Stream<Record> stream = DSL.using(connection).fetchStream(resultSet)) {
    ...
}

...这基本上是方便的:

try (Cursor<Record> cursor = DSL.using(connection).fetchLazy(resultSet)) {
    Stream<Record> stream = cursor.stream();
    ...
}

另请参阅 DSLContext.fetchStream(ResultSet)

当然,你也可以让 jOOQ 执行你的 SQL 字符串,而不是与 JDBC 摔跤:

try (Stream<Record> stream = 
     DSL.using(dataSource)
        .resultQuery("select * from {0}", DSL.name(table)) // Prevent SQL injection
        .fetchSize(5000)
        .fetchStream()) {
    ...
}

可怕的SELECT *

正如评论中批评的那样,它们的jOOQ使用似乎很慢,因为尽管使用了.b,但jOOQ仍然急切地将LOB数据提取到内存中。“懒惰”一词对应于懒惰地(逐个)获取记录,而不是懒惰地获取列数据。一次完全提取记录,假设您确实想要投影整行。fetchLazy()

如果您不需要一些沉重的行,请不要投影它们! 在SQL中几乎总是一个坏主意。缺点:SELECT *

  • 它会导致数据库服务器、网络和客户端中出现更多的 I/O 和内存开销。
  • 它防止覆盖索引的使用
  • 它可防止联接消除转换

此博客文章中的详细信息 请点击此处。.

关于使用资源试用

请注意,由 jOOQ 生成的 a 是“足智多谋的”,即它包含对 open (and ) 的引用。因此,如果您真的想在方法之外返回该流,请确保它已正确关闭!StreamResultSetPreparedStatement


推荐