等待异步任务的安全有效方式

2022-09-03 14:43:51

在系统中,我有一个对象 - 让我们称之为 。它保存任务队列,这些任务由某些线程池执行( + )每个任务的结果都保存在数据库中的某个唯一标识符下。TaskProcessorExecutorServicePriorityBlockingQueue

知道此唯一标识符的用户可以检查此任务的结果。结果可能位于数据库中,但任务仍可能在队列中等待执行。在这种情况下,应等到任务完成。UserThread

此外,以下假设是有效的:

  • 其他人可以将任务排队,如果某个随机者知道唯一标识符,则可以访问结果。TaskProcessorUserThread

  • UserThread并在同一应用中。 包含一个线程池,并且只是 servlet 线程。TaskProcessTaskProcessorUserThread

  • UserThread在询问结果时应被阻止,并且结果尚未完成。 应在完成按唯一标识符分组的任务(或多个任务)后立即取消阻止UserThreadTaskProcessor

我的第一次尝试(天真的尝试)是在循环中检查结果并睡一段时间:

// UserThread
while(!checkResultIsInDatabase(uniqueIdentifier))
  sleep(someTime)

但我不喜欢它。首先,我正在浪费数据库连接。此外,如果任务将在睡眠后立即完成,则即使结果刚刚出现,用户也将等待。

下一次尝试基于等待/通知:

//UserThread 
while (!checkResultIsInDatabase())
  taskProcessor.wait()

//TaskProcessor
... some complicated calculations
this.notifyAll()

但我也不喜欢它。如果有更多的人使用,那么每次完成某些任务时,它们都会被不必要地唤醒,而且 - 它们将进行不必要的数据库调用。UserThreadsTaskProcessor

最后一次尝试是基于我称之为:waitingRoom

//UserThread
Object mutex = new Object();
taskProcessor.addToWaitingRoom(uniqueIdentifier, mutex)
while (!checkResultIsInDatabase())
  mutex.wait()

//TaskProcessor
... Some complicated calculations
if (uniqueIdentifierExistInWaitingRoom(taskUniqueIdentifier))
  getMutexFromWaitingRoom(taskUniqueIdentifier).notify()

但它似乎并不安全。在数据库检查 和 之间,任务可以完成(不会有效,因为尚未调用),这可能会以死锁结束。wait()notify()UserThreadwait()

看来,我应该在某个地方同步它。但我担心它不会有效。有没有办法纠正我的任何尝试,使它们安全有效?或者也许还有其他更好的方法来做到这一点?


答案 1

你似乎在寻找某种未来/承诺抽象。看看ComppletableFuture,自Java 8以来可用。

CompletableFuture<Void> future = CompletableFuture.runAsync(db::yourExpensiveOperation, executor);

// best approach: attach some callback to run when the future is complete, and handle any errors
future.thenRun(this::onSuccess)
        .exceptionally(ex -> logger.error("err", ex));

// if you really need the current thread to block, waiting for the async result:
future.join(); // blocking! returns the result when complete or throws a CompletionException on error

还可以从异步操作返回(有意义的)值,并将结果传递给回调。要利用这一点,请查看 、 、 等。supplyAsync()thenAccept()thenApply()whenComplete()

您还可以将多个期货合并为一个甚至更多。


答案 2

我相信用 in 方法替换可以防止僵局。mutexCountDownLatchwaitingRoom

CountDownLatch latch = new CountDownLatch(1)
taskProcessor.addToWaitingRoom(uniqueIdentifier, latch)
while (!checkResultIsInDatabase())
  // consider timed version
  latch.await()

//TaskProcessor
... Some complicated calculations
if (uniqueIdentifierExistInWaitingRoom(taskUniqueIdentifier))
  getLatchFromWaitingRoom(taskUniqueIdentifier).countDown()

推荐