非常感谢您的所有建议!
最后,我选择了我认为相当简单的东西。我发现CountDownLatch几乎是我需要的。它会阻塞,直到计数器达到 0。唯一的问题是它只能倒计时,不能向上倒计时,因此在我拥有的动态设置中不起作用,任务可以提交新任务。因此,我实现了一个提供附加功能的新类。(见下文)然后,我按如下方式使用此类。CountLatch
主线程调用,阻塞直到闩锁达到 0。latch.awaitZero()
任何线程,在调用 调用 之前。executor.execute(..)
latch.increment()
任何任务,在完成之前,都会调用 。latch.decrement()
当最后一个任务终止时,计数器将达到 0,从而释放主线程。
我们非常欢迎您提出进一步的建议和反馈!
public class CountLatch {
@SuppressWarnings("serial")
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected int acquireNonBlocking(int acquires) {
// increment count
for (;;) {
int c = getState();
int nextc = c + 1;
if (compareAndSetState(c, nextc))
return 1;
}
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
public CountLatch(int count) {
this.sync = new Sync(count);
}
public void awaitZero() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public boolean awaitZero(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public void increment() {
sync.acquireNonBlocking(1);
}
public void decrement() {
sync.releaseShared(1);
}
public String toString() {
return super.toString() + "[Count = " + sync.getCount() + "]";
}
}
请注意,/调用可以封装到一个自定义的子类中,例如,由Sami Korhonen建议,或者与impl建议的那样。请参阅此处:increment()
decrement()
Executor
beforeExecute
afterExecute
public class CountingThreadPoolExecutor extends ThreadPoolExecutor {
protected final CountLatch numRunningTasks = new CountLatch(0);
public CountingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
public void execute(Runnable command) {
numRunningTasks.increment();
super.execute(command);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
numRunningTasks.decrement();
super.afterExecute(r, t);
}
/**
* Awaits the completion of all spawned tasks.
*/
public void awaitCompletion() throws InterruptedException {
numRunningTasks.awaitZero();
}
/**
* Awaits the completion of all spawned tasks.
*/
public void awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException {
numRunningTasks.awaitZero(timeout, unit);
}
}