在 FutureTask 上等待 cancel()
2022-09-04 04:20:15
我想取消从 ThreadPoolExecutor 获得的 FutureTask,但我想确保线程池上的 Callable 已停止工作。
如果我调用 FutureTask#cancel(false),然后调用 get()(阻塞直到完成),我会得到一个 CancellationException。此异常是立即抛出,还是在任务停止执行之后才抛出?
我想取消从 ThreadPoolExecutor 获得的 FutureTask,但我想确保线程池上的 Callable 已停止工作。
如果我调用 FutureTask#cancel(false),然后调用 get()(阻塞直到完成),我会得到一个 CancellationException。此异常是立即抛出,还是在任务停止执行之后才抛出?
这个答案通过在 Callable 内部检查任务是否已被取消,修复了 Aleksey 和 FooJBar 代码中的竞争条件。(在 FutureTask.run 检查状态与实际运行 Callable 之间存在一个时间窗口,在此期间 cancel 和 getWithJoin 都可以成功完成,但 Callable 仍然会运行。)
我还决定不覆盖原始的 cancel,因为新的 cancel 需要声明 InterruptedException。新的 cancel 去掉了无用的返回值(因为 true 可能意味着“任务尚未启动”、“任务已启动并且已经造成大部分损害”、“任务已启动并最终会完成”中的任何一种)。对 super.cancel 返回值的检查也去掉了,这样如果从不同的线程多次调用新的 cancel,它们都会等待任务完成。
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
 * A {@link FutureTask} that lets callers wait until a cancelled task has really stopped.
 *
 * <p>The wrapped callable runs while holding a one-permit semaphore. After a cancellation,
 * {@link #get()}, {@link #get(long, TimeUnit)} and {@link #cancelAndWait(boolean)} acquire and
 * release that permit before returning or rethrowing, so they cannot complete while the callable
 * is still executing. The callable wrapper also re-checks {@link #isCancelled()} after taking the
 * permit, so a task cancelled just before it starts does not run the user's code.
 *
 * <p>Based on: http://stackoverflow.com/questions/6040962/wait-for-cancel-on-futuretask
 *
 * @param <T> the result type returned by {@code get}
 * @author Aleksandr Dubinsky
 */
public class FixedFutureTask<T> extends FutureTask<T> {

    /** Single permit, held by the executing thread for the whole duration of the user callable. */
    private final Semaphore semaphore = new Semaphore(1);

    /**
     * Creates a {@code FutureTask} that will, upon running, execute the given {@code Runnable},
     * and arrange that {@code get} will return the given result on successful completion.
     *
     * @param runnable the runnable task
     * @param result the result to return on successful completion.
     *     If you don't need a particular result, consider using constructions of the form:
     *     {@code Future<?> f = new FutureTask<Void>(runnable, null)}
     * @throws NullPointerException if the runnable is null
     */
    public FixedFutureTask(Runnable runnable, T result) {
        this(Executors.callable(runnable, result));
    }

    /**
     * Creates a {@code FutureTask} that will, upon running, execute the given {@code Callable}.
     *
     * @param callable the callable task
     * @throws NullPointerException if the callable is null
     */
    public FixedFutureTask(Callable<T> callable) {
        this(new MyCallable<T>(callable));
    }

    /**
     * Works around the rule that {@code this} cannot be referenced before {@code super(...)}
     * returns: the wrapper is created first and is given its back-reference afterwards.
     */
    private FixedFutureTask(MyCallable<T> myCallable) {
        super(myCallable);
        myCallable.task = this;
    }

    /** Wraps the user's callable so that it only runs while holding the owning task's permit. */
    private static class MyCallable<T> implements Callable<T> {

        final Callable<T> callable;

        /** Assigned by the owning task's private constructor right after {@code super(...)}. */
        FixedFutureTask<T> task;

        MyCallable(Callable<T> callable) {
            // Fail at construction time, as documented, rather than when the task is run.
            if (callable == null) {
                throw new NullPointerException("callable");
            }
            this.callable = callable;
        }

        @Override
        public T call() throws Exception {
            task.semaphore.acquire();
            try {
                // The task may have been cancelled between FutureTask.run's state check and
                // this point; in that case the user's callable must not be started.
                if (task.isCancelled()) {
                    return null;
                }
                return callable.call();
            } finally {
                task.semaphore.release();
            }
        }
    }

    /**
     * Blocks until the permit is free, i.e. until no thread is inside the wrapped callable,
     * and gives the permit straight back.
     */
    private void awaitCallableExit() throws InterruptedException {
        semaphore.acquire();
        semaphore.release();
    }

    /**
     * Waits if necessary for the computation to complete or finish cancelling, and then
     * retrieves its result, if available.
     *
     * @return the computed result
     * @throws CancellationException if the computation was cancelled; thrown only once the
     *     wrapped callable is no longer running
     * @throws ExecutionException if the computation threw an exception
     * @throws InterruptedException if the current thread was interrupted while waiting
     */
    @Override
    public T get() throws InterruptedException, ExecutionException, CancellationException {
        try {
            return super.get();
        } catch (CancellationException e) {
            awaitCallableExit();
            throw e;
        }
    }

    /**
     * Waits if necessary for at most the given time for the computation to complete or finish
     * cancelling, and then retrieves its result, if available.
     *
     * <p>The timeout covers the whole call, including the wait for a cancelled callable to stop.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return the computed result
     * @throws CancellationException if the computation was cancelled and the wrapped callable
     *     stopped within the timeout
     * @throws ExecutionException if the computation threw an exception
     * @throws InterruptedException if the current thread was interrupted while waiting
     * @throws TimeoutException if the wait timed out, including the case where the task was
     *     cancelled but its callable was still running when the timeout elapsed (the
     *     {@code CancellationException} is then attached as the cause)
     */
    @Override
    public T get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, CancellationException, TimeoutException {
        final long deadline = System.nanoTime() + unit.toNanos(timeout);
        try {
            return super.get(timeout, unit);
        } catch (CancellationException e) {
            // Spend only what is left of the caller's budget waiting for the callable to exit.
            final long remainingNanos = deadline - System.nanoTime();
            if (!semaphore.tryAcquire(remainingNanos, TimeUnit.NANOSECONDS)) {
                TimeoutException timedOut =
                        new TimeoutException("cancelled task did not stop within the timeout");
                timedOut.initCause(e);
                throw timedOut;
            }
            semaphore.release();
            throw e;
        }
    }

    /**
     * Attempts to cancel execution of this task and waits for the task to complete if it has
     * been started. If the task has not started when {@code cancelAndWait} is called, this task
     * should never run. If the task has already started, then the {@code mayInterruptIfRunning}
     * parameter determines whether the thread executing this task should be interrupted in an
     * attempt to stop the task.
     *
     * <p>The result of the underlying {@code cancel} call is deliberately ignored, so every
     * caller, from any thread, waits until the wrapped callable is no longer running.
     * After this method returns, subsequent calls to {@link #isDone} will always return
     * {@code true}; {@link #isCancelled} reports whether the cancellation actually took effect.
     *
     * @param mayInterruptIfRunning {@code true} if the thread executing this task should be
     *     interrupted; otherwise, in-progress tasks are allowed to complete
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void cancelAndWait(boolean mayInterruptIfRunning) throws InterruptedException {
        super.cancel(mayInterruptIfRunning);
        awaitCallableExit();
    }
}
是的,CancellationException 会立即抛出。您可以扩展 FutureTask,添加一个 get() 方法的变体,该变体会一直等到执行 Callable 的线程完成。
/**
 * A {@link FutureTask} whose wrapped callable runs while holding a one-permit semaphore, and
 * which offers {@link #getWithJoin()} to wait for that permit after a cancellation.
 *
 * @param <T> the result type
 */
public class ThreadWaitingFutureTask<T> extends FutureTask<T> {

    /** The permit shared with the wrapping callable. */
    private final Semaphore semaphore;

    /**
     * Creates a task for the given callable, guarded by a fresh single-permit semaphore.
     *
     * @param callable the work to execute
     */
    public ThreadWaitingFutureTask(Callable<T> callable) {
        this(callable, new Semaphore(1));
    }

    /** Wires the same semaphore into both the wrapping callable and this task. */
    private ThreadWaitingFutureTask(final Callable<T> callable,
                                    final Semaphore semaphore) {
        super(new PermitHoldingCallable<T>(callable, semaphore));
        this.semaphore = semaphore;
    }

    /**
     * Like {@code get()}, but when the task was cancelled it first acquires and releases the
     * semaphore before rethrowing the {@link CancellationException}.
     *
     * @return the computed result
     * @throws InterruptedException if the current thread is interrupted while waiting
     * @throws ExecutionException if the computation threw an exception
     */
    public T getWithJoin() throws InterruptedException, ExecutionException {
        final T outcome;
        try {
            outcome = super.get();
        } catch (CancellationException cancellation) {
            semaphore.acquire();
            semaphore.release();
            throw cancellation;
        }
        return outcome;
    }

    /** Runs the delegate callable between acquiring and releasing the given semaphore. */
    private static final class PermitHoldingCallable<V> implements Callable<V> {

        private final Callable<V> delegate;
        private final Semaphore permit;

        PermitHoldingCallable(Callable<V> delegate, Semaphore permit) {
            this.delegate = delegate;
            this.permit = permit;
        }

        @Override
        public V call() throws Exception {
            permit.acquire();
            try {
                return delegate.call();
            } finally {
                permit.release();
            }
        }
    }
}