多线程执行,其中保留已完成工作项的顺序

我有一个工作单元流,让我们称它们为按顺序处理的“工作项”(现在)。我想通过多线程工作来加快处理速度。

约束:这些工作项按特定顺序出现,在处理过程中顺序不相关 - 但处理完成后,必须还原订单。

像这样:

   |.|
   |.|
   |4|
   |3|
   |2|    <- incoming queue
   |1|
  / | \
 2  1  3  <- worker threads
  \ | /
   |3|
   |2|    <- outgoing queue
   |1|

我想在Java中解决这个问题,最好没有执行器服务,期货等,但使用基本的并发方法,如wait(),notify()等。

原因是:我的工作项非常小且粒度细,每个工作项的处理时间约为 0.2 毫秒。所以我担心使用java.util.concurrent.*中的东西可能会带来很多开销并减慢我的代码速度。

到目前为止,我发现的示例都在处理过程中保留了顺序(在我的情况下无关紧要),并且不关心处理后的顺序(这在我的情况下至关重要)。


答案 1

这就是我在以前的项目中解决您的问题的方式(但使用java.util.concurrent):

(1) 工作项类执行实际工作/处理:

public class WorkItem implements Callable<WorkItem> {
    Object content;
    public WorkItem(Object content) {
        super();
        this.content = content;
    }

    public WorkItem call() throws Exception {
        // getContent() + do your processing
        return this;
    }
}

(2) 此类将工作项放入队列中并启动处理:

public class Producer {
    ...
    public Producer() {
        super();
        workerQueue = new ArrayBlockingQueue<Future<WorkItem>>(THREADS_TO_USE);
        completionService = new ExecutorCompletionService<WorkItem>(Executors.newFixedThreadPool(THREADS_TO_USE));
        workerThread = new Thread(new Worker(workerQueue));
        workerThread.start();
    }

    public void send(Object o) throws Exception {
        WorkItem workItem = new WorkItem(o);
        Future<WorkItem> future = completionService.submit(workItem);
        workerQueue.put(future);
    }
}

(3) 处理完成后,工作项将在此处取消排队:

public class Worker implements Runnable {
    private ArrayBlockingQueue<Future<WorkItem>> workerQueue = null;

    public Worker(ArrayBlockingQueue<Future<WorkItem>> workerQueue) {
        super();
        this.workerQueue = workerQueue;
    }

    public void run() {
        while (true) {
            Future<WorkItem> fwi = workerQueue.take(); // deqeueue it
            fwi.get(); // wait for it till it has finished processing
        }
    }
}

(4)这是你在代码中使用这些东西并提交新作品的方式:

public class MainApp {
    public static void main(String[] args) throws Exception {
        Producer p = new Producer();
        for (int i = 0; i < 10000; i++)
            p.send(i);
    }
}

答案 2

如果允许,为什么会忽略 java 中其余的并发实用程序?您可以使用例如 (如果你有java 1.8)用于上述内容:BlockingQueueStream

List<Type> data = ...;
List<Other> out = data.parallelStream()
    .map(t -> doSomeWork(t))
    .collect(Collectors.toList());

由于您是从有序 () 开始的,并且也收集到 一个 ,因此结果的顺序将与输入的顺序相同。CollectionListList


推荐