将给定 ID 的任务绑定到同一线程的线程池

2022-09-02 14:04:58

是否有任何线程池(在 Java 中)的实现可以确保在同一逻辑 ID 上执行同一逻辑 ID 的所有任务?

我所追求的逻辑是,如果给定逻辑ID的特定线程上已经执行了一个任务,那么具有相同ID的新任务将调度在同一线程上。如果没有线程为同一 ID 执行任务,则可以使用任何线程。

这将允许并行执行不相关 ID 的任务,但以串行方式和提交的顺序执行相同 ID 的任务。

如果没有,是否有任何关于如何扩展以获得这种行为的建议(如果可能的话)?ThreadPoolExecutor

更新

在花了更长的时间思考这个问题之后,我实际上并不要求在同一线程上执行同一逻辑ID的任务,只是它们不会同时执行。

这方面的一个例子是处理客户订单的系统,其中可以同时处理多个订单,但不能处理同一客户(并且同一客户的所有订单都必须按顺序处理)。

我目前采用的方法是使用标准的ThreadPoolExecutor,带有自定义的,并且还用自定义包装器包装。包装器逻辑为:BlockingQueueRunnableRunnable

  1. 以原子方式尝试将 ID 添加到并发“正在运行”集 (),以查看当前是否正在运行同一 ID 的任务ConcurrentHashMap
    • 如果添加失败,请将任务推回队列的前面并立即返回
    • 如果成功,请继续
  2. 运行任务
  3. 从“正在运行”集中删除任务的关联 ID

然后,队列的方法仅返回 ID 当前不在“正在运行”集中的任务。poll()

这样做的问题是,我确信会有很多我没有想到的角落案例,所以这将需要大量的测试


答案 1

创建一个执行程序服务数组,每个执行器服务运行一个线程,并通过项目 ID 的哈希代码将队列条目分配给它们。数组可以是任何大小,具体取决于您最多要使用的线程数。

这将限制我们可以从执行器服务中使用的线程,但仍然允许使用其功能在不再需要时关闭唯一的线程(使用),并根据需要重新启动它。此外,所有排队内容都将在不重写的情况下工作。allowCoreThreadTimeOut(true)


答案 2

最简单的想法可能是这样的:

有一个固定的 s 映射。使用哈希机制根据任务 ID 选择队列。哈希算法应为相同的 ID 选择相同的队列。为每个队列启动一个线程。每个线程将从自己的专用队列中选择一个任务并执行它。BlockingQueue

p.s. 适当的解决方案很大程度上取决于您分配给线程的工作类型

更新

好吧,这个疯狂的想法怎么样,请忍受我:)

比如说,我们有一个保存参考文献的ConcurrentHashMapid -> OrderQueue

ID1->Q1, ID2->Q2, ID3->Q3, ...

这意味着现在每个都与自己的队列相关联。 是带有附加布尔标志的自定义阻塞队列 - 。idOrderQueueisAssociatedWithWorkingThread

还有一个常规的,我们现在将调用,你稍后会看到它的使用。BlockingQueueamortizationQueue

接下来,我们有工作线程。每个工作线程都有自己的工作队列,该队列是与此线程关联的包含 ID。NBlockingQueue

当新 ID 出现时,我们会执行以下操作:

create a new OrderQueue(isAssociatedWithWorkingThread=false)
put the task to the queue
put id->OrderQueue to the map
put this OrderQueue to amortizationQueue

当现有ID的更新到来时,我们执行以下操作:

pick OrderQueue from the map
put the task to the queue
if isAssociatedWithWorkingThread == false
    put this OrderQueue to amortizationQueue

每个工作线程执行以下操作:

take next id from the working queue
take the OrderQueue associated with this id from the map
take all tasks from this queue
execute them
mark isAssociatedWithWorkingThread=false for this OrderQueue
put this OrderQueue to amortizationQueue

非常简单。现在进入有趣的部分 - 工作偷窃:)

如果在某个时间点,某个工作线程发现自己有空的工作队列,那么它会执行以下操作:

go to the pool of all working threads
pick one (say, one with the longest working queue)
steal id from *the tail* of that thread's working queue
put this id to it's own working queue
continue with regular execution

还有+1个额外的线程,提供摊销工作:

while (true)
    take next OrderQueue from amortizationQueue
    if queue is not empty and isAssociatedWithWorkingThread == false
         set isAssociatedWithWorkingThread=true
         pick any working thread and add the id to it's working queue

将不得不花更多的时间思考你是否可以逃脱标志,或者有必要让它阻塞操作来检查/更改这个标志。AtomicBooleanisAssociatedWithWorkingThread