最简单的想法可能是这样的:
有一个固定的 s 映射。使用哈希机制根据任务 ID 选择队列。哈希算法应为相同的 ID 选择相同的队列。为每个队列启动一个线程。每个线程将从自己的专用队列中选择一个任务并执行它。BlockingQueue
p.s. 适当的解决方案很大程度上取决于您分配给线程的工作类型
更新
好吧,这个疯狂的想法怎么样,请忍受我:)
比如说,我们有一个保存参考文献的ConcurrentHashMap
id -> OrderQueue
ID1->Q1, ID2->Q2, ID3->Q3, ...
这意味着现在每个都与自己的队列相关联。 是带有附加布尔标志的自定义阻塞队列 - 。id
OrderQueue
isAssociatedWithWorkingThread
还有一个常规的,我们现在将调用,你稍后会看到它的使用。BlockingQueue
amortizationQueue
接下来,我们有工作线程。每个工作线程都有自己的工作队列,该队列是与此线程关联的包含 ID。N
BlockingQueue
当新 ID 出现时,我们会执行以下操作:
create a new OrderQueue(isAssociatedWithWorkingThread=false)
put the task to the queue
put id->OrderQueue to the map
put this OrderQueue to amortizationQueue
当现有ID的更新到来时,我们执行以下操作:
pick OrderQueue from the map
put the task to the queue
if isAssociatedWithWorkingThread == false
put this OrderQueue to amortizationQueue
每个工作线程执行以下操作:
take next id from the working queue
take the OrderQueue associated with this id from the map
take all tasks from this queue
execute them
mark isAssociatedWithWorkingThread=false for this OrderQueue
put this OrderQueue to amortizationQueue
非常简单。现在进入有趣的部分 - 工作偷窃:)
如果在某个时间点,某个工作线程发现自己有空的工作队列,那么它会执行以下操作:
go to the pool of all working threads
pick one (say, one with the longest working queue)
steal id from *the tail* of that thread's working queue
put this id to it's own working queue
continue with regular execution
还有+1个额外的线程,提供摊销工作:
while (true)
take next OrderQueue from amortizationQueue
if queue is not empty and isAssociatedWithWorkingThread == false
set isAssociatedWithWorkingThread=true
pick any working thread and add the id to it's working queue
将不得不花更多的时间思考你是否可以逃脱标志,或者有必要让它阻塞操作来检查/更改这个标志。AtomicBoolean
isAssociatedWithWorkingThread