无论如何,我无法在Hibernate中批处理MySQL INSERT语句

2022-09-01 12:12:29

我目前正面临着众所周知的常见休眠插入批处理问题。

我需要保存 500 万行长的批次。我首先尝试使用更轻的有效载荷。由于我必须插入只有 2 种类型的实体(首先是 A 类型的所有记录,然后是 B 类型的所有记录,都指向通用类型 C 父项),因此我想充分利用 JDBC 批量插入的优势。ManyToOne

我已经阅读了很多文档,但是我尝试过的任何文档都没有工作。

  • 我知道为了使用批处理插入,我不得使用实体生成器。所以我删除了ID,我用一个技巧设置ID:每次都递增。AUTO_INCREMENTSELECT MAX(ID) FROM ENTITIES
  • 我知道我必须定期刷新会话。我会提前发布代码,但无论如何,我每500个元素执行一次交易。
  • 我知道我必须设置与应用程序的批量大小一致,所以我将其设置为(Spring ORM集成)hibernate.jdbc.batch_sizeLocalSessionFactoryBean
  • 我知道我必须启用在连接URL中重写批处理语句。

这是我的实体

公共父实体。这将首先插入到单个事务中。我在这里不关心自动增量列。每个批处理作业只有条记录

@Entity
@Table(...)
@SequenceGenerator(...)
public class Deal
{

    @Id
    @Column(
            name = "DEAL_ID",
            nullable = false)
    @GeneratedValue(
            strategy = GenerationType.AUTO)
    protected Long id;

    ................
}

其中一个子级(假设每批 250 万条记录)

@Entity
@Table(
        name = "TA_LOANS")
public class Loan
{

    @Id
    @Column(
            name = "LOAN_ID",
            nullable = false)
    protected Long id;

    @ManyToOne(
            optional = false,
            targetEntity = Deal.class,
            fetch = FetchType.LAZY)
    @JoinColumn(
            name = "DEAL_ID",
            nullable = false)
    protected Deal deal;


    .............
}

其他子类型。假设其他250万条记录

@Entity
@Table(
        name = "TA_BONDS")
public class Bond
{

    @Id
    @Column(
            name = "BOND_ID")

    @ManyToOne(
            fetch = FetchType.LAZY,
            optional = false,
            targetEntity = Deal.class)
    @JoinColumn(
            name = "DEAL_ID",
            nullable = false,
            updatable = false)
    protected Deal deal;

}

插入记录的简化代码

    long loanIdCounter = loanDao.getMaxId(), bondIdCounter = bondDao.getMaxId(); //Perform SELECT MAX(ID)

    Deal deal = null;

    List<Bond> bondList = new ArrayList<Bond>(COMMIT_BATCH_SIZE); //500 constant value
    List<Loan> loanList = new ArrayList<Loan>(COMMIT_BATCH_SIZE);

    for (String msg: inputStreamReader)
    {
        log.debug(msg.toString());

        if (this is a deal)
        {
            Deal deal = parseDeal(msg.getMessage());

            deal = dealManager.persist(holder.deal); //Called in a separate transaction using Spring annotation @Transaction(REQUIRES_NEW)

        }
        else if (this is a loan)
        {

            Loan loan = parseLoan(msg.getMessage());
            loan.setId(++loanIdCounter);
            loan.setDeal(deal);

            loanList.add(loan);

            if (loanList.size() == COMMIT_BATCH_SIZE)
            {
                loanManager.bulkInsert(loanList); //Perform a bulk insert in a single transaction, not annotated but handled manually this time
                loanList.clear();
            }
        }
        else if (this is a bond)
        {
            Bond bond = parseBond(msg.getMessage());
            bond.setId(++bondIdCounter);
            bond.setDeal(deal);

            bondList.add(bond);



            if (bondList.size() == COMMIT_BATCH_SIZE) //As above
            {
                bondManager.bulkInsert(bondList);
                bondList.clear();

            }
        }
    }

    if (!bondList.isEmpty())
        bondManager.bulkInsert(bondList);
    if (!loanList.isEmpty())
        loanManager.bulkInsert(loanList);
    //Flush remaining items, not important

实现:bulkInsert

@Override
public void bulkInsert(Collection<Bond> bonds)
{
    // StatelessSession session = sessionFactory.openStatelessSession();
    Session session = sessionFactory.openSession();
    try
    {
        Transaction t = session.beginTransaction();
        try
        {
            for (Bond bond : bonds)
                // session.persist(bond);
                // session.insert(bond);
                session.save(bond);
        }
        catch (RuntimeException ex)
        {
            t.rollback();
        }
        finally
        {
            t.commit();
        }
    }
    finally
    {
        session.close();
    }

}

正如您从评论中看到的,我已经尝试了有状态/无状态的几种组合。没有一个奏效。session

My 是具有以下 URL 的组合池数据源dataSource

<b:property name="jdbcUrl" value="jdbc:mysql://server:3306/db?autoReconnect=true&amp;rewriteBatchedStatements=true" />

SessionFactory

<b:bean id="sessionFactory" class="class.that.extends.org.springframework.orm.hibernate3.LocalSessionFactoryBean" lazy-init="false" depends-on="dataSource">
        <b:property name="dataSource" ref="phoenixDataSource" />
        <b:property name="hibernateProperties">
            <b:props>
                <b:prop key="hibernate.dialect">${hibernate.dialect}</b:prop> <!-- MySQL5InnoDb-->
                <b:prop key="hibernate.show_sql">${hibernate.showSQL}</b:prop>
                <b:prop key="hibernate.jdbc.batch_size">500</b:prop>
                <b:prop key="hibernate.jdbc.use_scrollable_resultset">false</b:prop>
                <b:prop key="hibernate.cache.use_second_level_cache">false</b:prop>
                <b:prop key="hibernate.cache.provider_class">org.hibernate.cache.EhCacheProvider</b:prop>
                <b:prop key="hibernate.cache.use_query_cache">false</b:prop>
                <b:prop key="hibernate.validator.apply_to_ddl">false</b:prop>
                <b:prop key="hibernate.validator.autoregister_listeners">false</b:prop>
                <b:prop key="hibernate.order_inserts">true</b:prop>
                <b:prop key="hibernate.order_updates">true</b:prop>
            </b:props>
        </b:property>
</b:bean>

即使我的项目范围类扩展,它也不会覆盖其方法(只添加一些项目范围的方法)LocalSessionFactoryBean

几天后我就生气了。我读了几篇文章,但没有一篇帮助我启用批量插入。我从使用Spring上下文检测的JUnit测试中运行我的所有代码(所以我可以我的类)。我所有的尝试都只产生了很多单独的陈述。@AutowireINSERT

我错过了什么?


答案 1

您的查询可能正在重写,但通过查看Hibernate SQL日志,您不知道是否。休眠不会重写插入语句 - MySQL驱动程序会重写它们。换句话说,Hibernate 将向驱动程序发送多个插入语句,然后驱动程序将重写它们。因此,休眠日志仅显示驱动程序的 SQL 休眠发送到驱动程序,而不显示驱动程序发送到数据库的 SQL。

您可以通过在连接url中启用MySQL的profileSQL参数来验证这一点:

<b:property name="jdbcUrl" value="jdbc:mysql://server:3306/db?autoReconnect=true&amp;rewriteBatchedStatements=true&amp;profileSQL=true" />

使用一个类似于您的示例,这是我的输出的样子:

insert into Person (firstName, lastName, id) values (?, ?, ?)
insert into Person (firstName, lastName, id) values (?, ?, ?)
insert into Person (firstName, lastName, id) values (?, ?, ?)
insert into Person (firstName, lastName, id) values (?, ?, ?)
insert into Person (firstName, lastName, id) values (?, ?, ?)
insert into Person (firstName, lastName, id) values (?, ?, ?)
insert into Person (firstName, lastName, id) values (?, ?, ?)
insert into Person (firstName, lastName, id) values (?, ?, ?)
insert into Person (firstName, lastName, id) values (?, ?, ?)
insert into Person (firstName, lastName, id) values (?, ?, ?)
Wed Feb 05 13:29:52 MST 2014 INFO: Profiler Event: [QUERY]  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) duration: 1 ms, connection-id: 81, statement-id: 33, resultset-id: 0, message: insert into Person (firstName, lastName, id) values ('person1', 'Name', 1),('person2', 'Name', 2),('person3', 'Name', 3),('person4', 'Name', 4),('person5', 'Name', 5),('person6', 'Name', 6),('person7', 'Name', 7),('person8', 'Name', 8),('person9', 'Name', 9),('person10', 'Name', 10)

前10行由Hibernate记录,尽管这不是实际发送到MySQL数据库的内容。最后一行来自MySQL驱动程序,它清楚地显示了具有多个值的单个批处理插入,这就是实际发送到MySQL数据库的内容。


答案 2

推荐