JOOQ & Transactions

2022-09-03 00:39:44

我一直在阅读有关事务和jooq的信息,但我很难看到如何在实践中实现它。

假设我向 JOOQ 提供了一个自定义,该自定义恰好使用自动提交设置为 false 的连接池。ConnectionProvider

实现大致如下:

@Override public Connection acquire() throws DataAccessException {
    return pool.getConnection();
}

@Override public void release(Connection connection) throws DataAccessException {
    connection.commit();
    connection.close();
}

如何将两个 jooq 查询包装到单个事务中?

使用DefaultConnectionProvider很容易,因为只有一个连接 - 但是对于池,我不知道如何去做。


答案 1

jOOQ 3.4 事务 API

在 jOOQ 3.4 中,在 JDBC、Spring 或 JTA 事务管理器上添加了一个事务 API 来抽象。此 API 可以与 Java 8 一起使用,如下所示:

DSL.using(configuration)
   .transaction(ctx -> {
       DSL.using(ctx)
          .update(TABLE)
          .set(TABLE.COL, newValue)
          .where(...)
          .execute();
   });

或者使用 Java 8 之前的语法

DSL.using(configuration)
   .transaction(new TransactionRunnable() {
       @Override
       public void run(Configuration ctx) {
           DSL.using(ctx)
              .update(TABLE)
              .set(TABLE.COL, newValue)
              .where(...)
              .execute();
       }
   });

这个想法是lambda表达式(或匿名类)形成事务代码,其:

  • 正常完成时提交
  • 异常时回滚

org.jooq.TransactionProvider SPI 可用于覆盖默认行为,该行为使用 Savepoints 通过 JDBC 实现可嵌套事务。

弹簧示例

当前文档显示了使用 Spring 进行事务处理时的示例:

这个例子基本上可以归结为使用弹簧。TransactionAwareDataSourceProxy

<!-- Using Apache DBCP as a connection pooling library.
     Replace this with your preferred DataSource implementation -->
<bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource"
    init-method="createDataSource" destroy-method="close">
    <property name="driverClassName" value="org.h2.Driver" />
    <property name="url" value="jdbc:h2:~/maven-test" />
    <property name="username" value="sa" />
    <property name="password" value="" />
</bean>

<!-- Using Spring JDBC for transaction management -->
<bean id="transactionManager"
    class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    <property name="dataSource" ref="dataSource" />
</bean>

<bean id="transactionAwareDataSource"
    class="org.springframework.jdbc.datasource.TransactionAwareDataSourceProxy">
    <constructor-arg ref="dataSource" />
</bean>

<!-- Bridging Spring JDBC data sources to jOOQ's ConnectionProvider -->
<bean class="org.jooq.impl.DataSourceConnectionProvider" 
      name="connectionProvider">
    <constructor-arg ref="transactionAwareDataSource" />
</bean>

GitHub 提供了一个运行示例:如下所示:

Spring and Guice 示例

虽然我个人不会推荐它,但一些用户已经成功地用Guice替换了Spring的DI的一部分,并与Guice一起处理交易。GitHub 上还有一个经过集成测试的运行示例,用于此用例:


答案 2

这可能不是最好的方法,但它似乎有效。需要注意的是,关闭连接并将其返回到池的方法不是但是方法,这非常令人困惑,如果某些代码“忘记”提交,则可能导致问题...releasecommit

所以客户端代码看起来像这样:

final PostgresConnectionProvider postgres =
            new PostgresConnectionProvider("localhost", 5432, params.getDbName(), params.getUser(), params.getPass())

private static DSLContext sql = DSL.using(postgres, SQLDialect.POSTGRES, settings);

//execute some statements here
sql.execute(...);

//and don't forget to commit or the connection will not be returned to the pool
PostgresConnectionProvider p = (PostgresConnectionProvider) sql.configuration().connectionProvider();
p.commit();

和连接提供者:

public class PostgresConnectionProvider implements ConnectionProvider {
    private static final Logger LOG = LoggerFactory.getLogger(PostgresConnectionProvider.class);

    private final ThreadLocal<Connection> connections = new ThreadLocal<>();
    private final BoneCP pool;

    public PostgresConnectionProvider(String serverName, int port, String schema, String user, String password) throws SQLException {
        this.pool = new ConnectionPool(getConnectionString(serverName, port, schema), user, password).pool;
    }

    private String getConnectionString(String serverName, int port, String schema) {
        return "jdbc:postgresql://" + serverName + ":" + port + "/" + schema;
    }

    public void close() {
        pool.shutdown();
    }

    public void commit() {
        LOG.debug("Committing transaction in {}", Thread.currentThread());
        try {
            Connection connection = connections.get();
            if (connection != null) {
                connection.commit();
                connection.close();
                connections.set(null);
            }
        } catch (SQLException ex) {
            throw new DataAccessException("Could not commit transaction in postgres pool", ex);
        }
    }

    @Override
    public Connection acquire() throws DataAccessException {
        LOG.debug("Acquiring connection in {}", Thread.currentThread());
        try {
            Connection connection = connections.get();
            if (connection == null) {
                connection = pool.getConnection();
                connection.setAutoCommit(false);
                connections.set(connection);
            }
            return connection;
        } catch (SQLException ex) {
            throw new DataAccessException("Can't acquire connection from postgres pool", ex);
        }
    }

    @Override
    //no-op => the connection won't be released until it is commited
    public void release(Connection connection) throws DataAccessException {
        LOG.debug("Releasing connection in {}", Thread.currentThread());
    }
}

推荐