如何正确处理更新数据库中同一行的两个线程

我有一个线程需要读取平面文件并解析它。我需要创建一个用于解析此文件某些部分的新线程,稍后此线程将需要更新原始实体的状态,该实体也正在由原始线程解析和更新。我该如何处理这种情况?T1T2T2T1

我收到一个包含以下示例记录的平面文件:

AAAA
BBBB
AACC
BBCC
AADD
BBDD

首先,此文件以状态保存在数据库中。现在,所有以 开头或以 开头的记录都需要在单独的线程中处理。成功解析后,两个线程都将尝试将数据库中此文件对象的状态更新为 。在某些情况下,我会得到.编辑:以及任何线程在异常丢失之前完成的工作。我们正在使用乐观锁定。避免此问题的最佳方法是什么?ReceivedBBAAParsedstaleObjectException

当两个线程更新同一对象时,可能的休眠异常?

上面的帖子有助于理解其中的某些部分,但它无助于解决我的问题。


答案 1

第 1 部分 - 您的问题

您收到此异常的主要原因是您正在使用具有乐观锁定功能的Hibernate。这基本上告诉您,线程 T1 或线程 T2 已将状态更新为 PARSED,现在另一个线程正在保存旧版本的行,其版本比数据库中保存的版本更小,并尝试将状态更新为 PARSED

这里的问题是“两个线程是否试图保留相同的数据?如果答案是肯定的,那么即使上次更新成功,也不应该有任何问题,因为最终它们会将行更新到相同的状态。在这种情况下,您不需要乐观锁定,因为无论如何,您的数据都将保持同步。

如果将状态设置为 RECIEVED 后,如果两个线程 T1 和 T2 在重置为下一个状态时实际上相互依赖,则主要问题就来了。在这种情况下,您需要确保如果 T1 已先执行(反之亦然),则 T2 需要刷新已更新行的数据,并根据 T1 已推送的更改重新应用其更改。在这种情况下,解决方案如下。如果遇到过时的ObjectException,您基本上需要从数据库中刷新数据并重新启动操作。

第 2 部分 分析了链接上发布 当两个线程更新同一对象时可能存在休眠异常? 方法 1,这或多或少是最后一次更新 Wins 的情况。它或多或少地避免了乐观锁定(版本计数)。如果您没有从 T1 到 T2 的依赖关系,或者为了设置状态解析而反向操作。这应该很好。

Aproach 2 乐观锁定这就是你现在拥有的。解决方案是刷新数据并重新启动操作。

Aproach 3 Row Level DB Lock这里的解决方案或多或少与方法2的解决方案相同,具有悲观锁定过程中的小修正。主要区别在于,在这种情况下,它可能是一个 READ 锁,如果它是悲观的 READ,您甚至可能无法从数据库中读取数据以刷新它。

Aproach 4 应用程序级同步有许多不同的方法可以执行同步。一个例子是将所有更新实际安排在BlockingQueue或JMS队列中(如果您希望它是持久的),并从单个线程推送所有更新。为了可视化它,T1和T2将元素放在队列上,并且将有一个T3线程读取操作并将它们推送到数据库服务器。

如果使用应用程序级同步,则应注意,在多服务器部署中不能分发所有结构。

好吧,我现在想不出别的了,:)


答案 2

我不确定我是否理解这个问题,但似乎它会构成线程T1的逻辑错误,该线程T1仅处理以AA开头的记录以将整个文件标记为“已解析”?例如,如果您的应用程序在 T1 更新后崩溃,但 T2 仍在处理 BB 记录,会发生什么情况?有些BB记录很可能会丢失,对吗?

无论如何,问题的症结在于您有一个争用条件,其中两个线程更新同一对象。过时的对象异常仅表示您的一个线程输掉了比赛。更好的解决方案可以完全避免比赛。

(我在这里假设单个记录处理是幂等的,如果不是这种情况,我认为你有更大的问题,因为某些故障模式会导致记录的重新处理。如果记录处理必须进行一次且只进行一次,那么您会遇到一个更难的问题,对于该问题,消息队列可能是更好的解决方案。

我将利用java.util.concurrent的功能将记录分派给线程工作线程,并让线程与休眠块进行交互,直到所有记录都被处理完毕,此时该线程可以将文件标记为“已解析”。

例如

// do something like this during initialization, or use a Guava LoadingCache...
Map<RecordType, Executor> executors = new HashMap<>();
// note I'm assuming RecordType looks like an enum
executors.put(RecordType.AA_RECORD, Executors.newSingleThreadExecutor());

然后在处理文件时,按如下方式分派每条记录,建立与排队任务状态相对应的未来列表。假设成功处理记录返回布尔值“true”:

List<Future<Boolean>> tasks = new ArrayList<>();
for (Record record: file.getRecords()) {
    Executor executorForRecord = executors.get(record.getRecordType());
    tasks.add(executor.submit(new RecordProcessor(record)));
}

现在等待所有任务成功完成 - 有更优雅的方法来做到这一点,特别是番石榴。请注意,如果你的任务失败并出现异常,你还需要在这里处理执行异常,我在这里掩盖了这一点。

boolean allSuccess = true;
for (Future<Boolean> task: tasks) {
    allSuccess = allSuccess && task.get();
    if (!allSuccess) break;
}

// if all your tasks completed successfully, update the file record
if (allSuccess) {
    file.setStatus("Parsed");
}

推荐