Java 中 MySQL Insert 语句的性能:批处理模式预准备语句与具有多个值的单插入语句

2022-09-01 10:36:07

我正在设计一个数据库,该数据库需要每秒处理各种InnoDB表的约600行插入。我当前的实现使用非批处理预准备语句。但是,写入数据库瓶颈和我的队列大小会随着时间的推移而增加。MySQLMySQL

实现是用Java编写的,我不知道手头的版本。它使用 的 Java 连接器。我需要考虑切换到明天。我假设这是两个不同的连接器包。MySQLJDBC

我已经阅读了以下关于这个问题的帖子:

并从 mysql 站点:

我的问题是:

  • 是否有人对在批处理模式下使用 INSERT 和预准备语句与使用具有多个 VALUE 的单个语句的性能差异有建议或经验。INSERT

  • Java 连接器与 .我应该使用一个还是另一个?MySQLJDBC

  • 这些表用于存档目的,并且将看到~90%的写入到~10%的读取(甚至可能更少)。我正在使用InnoDB。这是MyISAM的正确选择吗?

提前感谢您的帮助。


答案 1

JDBC 只是一个 Java SE 数据库访问标准,提供标准接口,因此您不会真正绑定到特定的 JDBC 实现。MySQL Java connector (Connector/J) 是仅用于 MySQL 数据库的 JDBC 接口的实现。根据经验,我参与了一个使用MySQL使用大量数据的项目,我们更喜欢MyISAM来生成数据:它允许实现更高性能的损失事务,但一般来说,MyISAM更快,但InnoDB更可靠。

大约一年前,我也想知道INSERT语句的性能,并在我的代码架中发现了以下旧的测试代码(抱歉,它有点复杂,有点超出你的问题范围)。下面的代码包含插入测试数据的 4 种方法的示例:

  • s;INSERT
  • 批量 s;INSERT
  • 手动批量(切勿使用它 - 这很危险);INSERT
  • 最后准备散装)。INSERT

它使用TestNG作为运行器,并使用一些自定义代码遗产,例如:

  • 方法 - 确保在执行回调后关闭连接或放回连接池(但下面的代码使用不可靠的语句关闭策略 - 即使没有/来减少代码);runWithConnection()tryfinally
  • IUnsafeIn<T, E extends Throwable>- 一个自定义回调接口,用于接受单个参数但可能引发类型 E 异常的方法,例如:.void handle(T argument) throws E;
package test;

import test.IUnsafeIn;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import static java.lang.String.format;
import static java.lang.String.valueOf;
import static java.lang.System.currentTimeMillis;

import core.SqlBaseTest;
import org.testng.annotations.AfterSuite;
import org.testng.annotations.BeforeSuite;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;

public final class InsertVsBatchInsertTest extends SqlBaseTest {

    private static final int ITERATION_COUNT = 3000;

    private static final String CREATE_TABLE_QUERY = "CREATE TABLE IF NOT EXISTS ttt1 (c1 INTEGER, c2 FLOAT, c3 VARCHAR(5)) ENGINE = InnoDB";
    private static final String DROP_TABLE_QUERY = "DROP TABLE ttt1";
    private static final String CLEAR_TABLE_QUERY = "DELETE FROM ttt1";

    private static void withinTimer(String name, Runnable runnable) {
        final long start = currentTimeMillis();
        runnable.run();
        logStdOutF("%20s: %d ms", name, currentTimeMillis() - start);
    }

    @BeforeSuite
    public void createTable() {
        runWithConnection(new IUnsafeIn<Connection, SQLException>() {
            @Override
            public void handle(Connection connection) throws SQLException {
                final PreparedStatement statement = connection.prepareStatement(CREATE_TABLE_QUERY);
                statement.execute();
                statement.close();
            }
        });
    }

    @AfterSuite
    public void dropTable() {
        runWithConnection(new IUnsafeIn<Connection, SQLException>() {
            @Override
            public void handle(Connection connection) throws SQLException {
                final PreparedStatement statement = connection.prepareStatement(DROP_TABLE_QUERY);
                statement.execute();
                statement.close();
            }
        });
    }

    @BeforeTest
    public void clearTestTable() {
        runWithConnection(new IUnsafeIn<Connection, SQLException>() {
            @Override
            public void handle(Connection connection) throws SQLException {
                final PreparedStatement statement = connection.prepareStatement(CLEAR_TABLE_QUERY);
                statement.execute();
                statement.close();
            }
        });
    }

    @Test
    public void run1SingleInserts() {
        withinTimer("Single inserts", new Runnable() {
            @Override
            public void run() {
                runWithConnection(new IUnsafeIn<Connection, SQLException>() {
                    @Override
                    public void handle(Connection connection) throws SQLException {
                        for ( int i = 0; i < ITERATION_COUNT; i++ ) {
                            final PreparedStatement statement = connection.prepareStatement("INSERT INTO ttt1 (c1, c2, c3) VALUES (?, ?, ?)");
                            statement.setInt(1, i);
                            statement.setFloat(2, i);
                            statement.setString(3, valueOf(i));
                            statement.execute();
                            statement.close();
                        }
                    }
                });
            }
        });
    }

    @Test
    public void run2BatchInsert() {
        withinTimer("Batch insert", new Runnable() {
            @Override
            public void run() {
                runWithConnection(new IUnsafeIn<Connection, SQLException>() {
                    @Override
                    public void handle(Connection connection) throws SQLException {
                        final PreparedStatement statement = connection.prepareStatement("INSERT INTO ttt1 (c1, c2, c3) VALUES (?, ?, ?)");
                        for ( int i = 0; i < ITERATION_COUNT; i++ ) {
                            statement.setInt(1, i);
                            statement.setFloat(2, i);
                            statement.setString(3, valueOf(i));
                            statement.addBatch();
                        }
                        statement.executeBatch();
                        statement.close();
                    }
                });
            }
        });
    }

    @Test
    public void run3DirtyBulkInsert() {
        withinTimer("Dirty bulk insert", new Runnable() {
            @Override
            public void run() {
                runWithConnection(new IUnsafeIn<Connection, SQLException>() {
                    @Override
                    public void handle(Connection connection) throws SQLException {
                        final StringBuilder builder = new StringBuilder("INSERT INTO ttt1 (c1, c2, c3) VALUES ");
                        for ( int i = 0; i < ITERATION_COUNT; i++ ) {
                            if ( i != 0 ) {
                                builder.append(",");
                            }
                            builder.append(format("(%s, %s, '%s')", i, i, i));
                        }
                        final String query = builder.toString();
                        final PreparedStatement statement = connection.prepareStatement(query);
                        statement.execute();
                        statement.close();
                    }
                });
            }
        });
    }

    @Test
    public void run4SafeBulkInsert() {
        withinTimer("Safe bulk insert", new Runnable() {
            @Override
            public void run() {
                runWithConnection(new IUnsafeIn<Connection, SQLException>() {
                    private String getInsertPlaceholders(int placeholderCount) {
                        final StringBuilder builder = new StringBuilder("(");
                        for ( int i = 0; i < placeholderCount; i++ ) {
                            if ( i != 0 ) {
                                builder.append(",");
                            }
                            builder.append("?");
                        }
                        return builder.append(")").toString();
                    }

                    @SuppressWarnings("AssignmentToForLoopParameter")
                    @Override
                    public void handle(Connection connection) throws SQLException {
                        final int columnCount = 3;
                        final StringBuilder builder = new StringBuilder("INSERT INTO ttt1 (c1, c2, c3) VALUES ");
                        final String placeholders = getInsertPlaceholders(columnCount);
                        for ( int i = 0; i < ITERATION_COUNT; i++ ) {
                            if ( i != 0 ) {
                                builder.append(",");
                            }
                            builder.append(placeholders);
                        }
                        final int maxParameterIndex = ITERATION_COUNT * columnCount;
                        final String query = builder.toString();
                        final PreparedStatement statement = connection.prepareStatement(query);
                        int valueIndex = 0;
                        for ( int parameterIndex = 1; parameterIndex <= maxParameterIndex; valueIndex++ ) {
                            statement.setObject(parameterIndex++, valueIndex);
                            statement.setObject(parameterIndex++, valueIndex);
                            statement.setObject(parameterIndex++, valueIndex);
                        }
                        statement.execute();
                        statement.close();
                    }
                });
            }
        });
    }

}

看看用@Test注释注释的方法:它们实际上执行语句。另外,请看一下这个常量:在源代码中,它使用InnoDB在我的计算机上安装了MySQL 5.5(MySQL Connector/J 5.1.12)生成以下结果:INSERTCREATE_TABLE_QUERY

InnoDB
Single inserts: 74148 ms
Batch insert: 84370 ms
Dirty bulk insert: 178 ms
Safe bulk insert: 118 ms

如果将 InnoDB 更改为 MyISAM,则会看到性能显著提高:CREATE_TABLE_QUERY

MyISAM
Single inserts: 604 ms
Batch insert: 447 ms
Dirty bulk insert: 63 ms
Safe bulk insert: 26 ms

希望这有帮助。

UPD:

对于第四种方式,您必须正确自定义in(该部分)以足够大以支持非常大的数据包。max_allowed_packetmysql.ini[mysqld]


答案 2

我知道这个线程已经很老了,但我只是想提一下,如果你在使用mysql时将“rewriteBatchedStatements=true”添加到jdbc url中,那么在使用批处理语句时,它可以带来巨大的性能提升。


推荐