使用执行器服务控制任务执行顺序

2022-09-01 05:21:49

我有一个进程,它将异步任务委托给线程池。我需要确保某些任务按顺序执行。例如

任务按顺序到达

任务 a1, b1, c1, d1 , e1, a2, a3, b2, f1

任务可以按任何顺序执行,除非存在自然依赖性,因此必须通过分配到同一线程或阻止这些线程来按该顺序处理a1,a2,a3,直到我知道上一个a#任务已完成。

目前它不使用Java并发包,但我正在考虑进行更改以充分利用线程管理。

有没有人有类似的解决方案或关于如何实现这一目标的建议


答案 1

我编写自己的执行器,保证对具有相同键的任务进行排序。它对具有相同键的订单任务使用队列映射。每个键控任务使用相同的键执行下一个任务。

此解决方案不处理 RejectedExecutionException 或来自委托执行器的其他异常!因此,委托执行器应该是“无限的”。

import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Executor;

/**
* This Executor warrants task ordering for tasks with same key (key have to implement hashCode and equal methods correctly).
*/
public class OrderingExecutor implements Executor{

    private final Executor delegate;
    private final Map<Object, Queue<Runnable>> keyedTasks = new HashMap<Object, Queue<Runnable>>();

    public OrderingExecutor(Executor delegate){
        this.delegate = delegate;
    }

    @Override
    public void execute(Runnable task) {
        // task without key can be executed immediately
        delegate.execute(task);
    }

    public void execute(Runnable task, Object key) {
        if (key == null){ // if key is null, execute without ordering
            execute(task);
            return;
        }

        boolean first;
        Runnable wrappedTask;
        synchronized (keyedTasks){
            Queue<Runnable> dependencyQueue = keyedTasks.get(key);
            first = (dependencyQueue == null);
            if (dependencyQueue == null){
                dependencyQueue = new LinkedList<Runnable>();
                keyedTasks.put(key, dependencyQueue);
            }

            wrappedTask = wrap(task, dependencyQueue, key);
            if (!first)
                dependencyQueue.add(wrappedTask);
        }

        // execute method can block, call it outside synchronize block
        if (first)
            delegate.execute(wrappedTask);

    }

    private Runnable wrap(Runnable task, Queue<Runnable> dependencyQueue, Object key) {
        return new OrderedTask(task, dependencyQueue, key);
    }

    class OrderedTask implements Runnable{

        private final Queue<Runnable> dependencyQueue;
        private final Runnable task;
        private final Object key;

        public OrderedTask(Runnable task, Queue<Runnable> dependencyQueue, Object key) {
            this.task = task;
            this.dependencyQueue = dependencyQueue;
            this.key = key;
        }

        @Override
        public void run() {
            try{
                task.run();
            } finally {
                Runnable nextTask = null;
                synchronized (keyedTasks){
                    if (dependencyQueue.isEmpty()){
                        keyedTasks.remove(key);
                    }else{
                        nextTask = dependencyQueue.poll();
                    }
                }
                if (nextTask!=null)
                    delegate.execute(nextTask);
            }
        }
    }
}

答案 2

当我过去这样做时,我通常由一个组件处理排序,然后向执行器提交可调用/可运行项。

类似的东西。

  • 有一个要运行的任务列表,其中一些具有依赖项
  • 创建执行程序并使用执行程序包装完成服务
  • 搜索所有任务,任何没有依赖关系的任务,通过完成服务安排它们
  • 轮询完成服务
  • 随着每个任务的完成
    • 将其添加到“已完成”列表
    • 重新评估任何等待任务,直到“已完成的列表”,以查看它们是否“依赖关系完成”。如果是这样,请安排它们
    • 重复冲洗,直到提交/完成所有任务

完成服务是一种很好的方式,可以在任务完成时获得任务,而不是试图轮询一堆期货。但是,您可能希望在通过完成服务计划任务时保留一个填充的,以便当完成服务为您提供完整的未来时,您可以弄清楚它是哪个。Map<Future, TaskIdentifier>TaskIdentifier

如果您发现自己处于任务仍在等待运行的状态,但没有任何东西正在运行,也没有可以安排任何内容,那么您就有一个循环依赖关系问题。


推荐