如何使用执行器服务进行轮询,直到结果到达

我有一个场景,我必须轮询远程服务器检查任务是否已完成。一旦它完成了,我就进行不同的调用来检索结果。

我最初认为我应该使用一个 with 进行轮询:SingleThreadScheduledExecutorscheduleWithFixedDelay

ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
ScheduledFuture future = executor.scheduleWithFixedDelay(() -> poll(jobId), 0, 10, TimeUnit.SECONDS);

public void poll(String jobId) {
   boolean jobDone = remoteServer.isJobDone(jobId);
   if (jobDone) {
       retrieveJobResult(jobId);
   }
}

但是,由于我只能提供一个不能返回任何东西的东西,我不明白什么时候会完成,如果有的话。打电话到底是什么意思?我等什么结果?RunnablescheduleWithFixedDelayfuturefuture.get()

第一次检测到远程任务已完成时,我想执行不同的远程调用并将其结果设置为 .我想我可以使用CompletableFuture来实现这一点,我会转发到我的方法,这将反过来将其转发到我的方法,最终完成它:futurepollretrieveTask

CompletableFuture<Object> result = new CompletableFuture<Object>();
ScheduledFuture future = executor.scheduleWithFixedDelay(() -> poll(jobId, result), 0, 10, TimeUnit.SECONDS);

public void poll(String jobId, CompletableFuture<Object> result) {
   boolean jobDone = remoteServer.isJobDone(jobId);
   if (jobDone) {
       retrieveJobResult(jobId, result);
   }
}

public void retrieveJobResult(String jobId, CompletableFuture<Object> result) {
    Object remoteResult = remoteServer.getJobResult(jobId);
    result.complete(remoteResult);
}

但这有很多问题。首先,似乎甚至不适合这种用途。相反,我应该做我想,但是当我被取消/完成时,我将如何正确关闭并取消它返回?感觉轮询应该以某种完全不同的方式实现。CompletableFutureCompletableFuture.supplyAsync(() -> poll(jobId))executorfutureCompletableFuture


答案 1

我认为CompletableFutures是做到这一点的好方法:

ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

private void run() {
    final Object jobResult = pollForCompletion("jobId1")
            .thenApply(jobId -> remoteServer.getJobResult(jobId))
            .get();

}

private CompletableFuture<String> pollForCompletion(final String jobId) {
    CompletableFuture<String> completionFuture = new CompletableFuture<>();
    final ScheduledFuture<Void> checkFuture = executor.scheduleAtFixedRate(() -> {
        if (remoteServer.isJobDone(jobId)) {
            completionFuture.complete(jobId);
        }
    }, 0, 10, TimeUnit.SECONDS);
    completionFuture.whenComplete((result, thrown) -> {
        checkFuture.cancel(true);
    });
    return completionFuture;
}

答案 2

在我看来,你对一些文体问题比其他任何问题都更担心。在java 8中,有2个角色:一个是传统的未来,它为任务执行和状态查询提供了异步源;另一个是我们通常所说的承诺。一个承诺,如果你还不知道,可以被认为是未来的建设者及其完成来源。因此,在这种情况下,直观地需要一个承诺,这就是您在此处使用的确切情况。您担心的示例是向您介绍第一次用法的东西,而不是承诺方式。CompletableFuture

接受这一点,你应该更容易开始处理你的实际问题。我认为承诺应该有2个角色,一个是通知你的任务完成轮询,另一个是取消你的计划任务完成。这应该是最终的解决方案:

public CompletableFuture<Object> pollTask(int jobId) {
    CompletableFuture<Object> fut = new CompletableFuture<>();
    ScheduledFuture<?> sfuture = executor.scheduleWithFixedDelay(() -> _poll(jobId, fut), 0, 10, TimeUnit.SECONDS);
    fut.thenAccept(ignore -> sfuture.cancel(false));
    return fut;
}

private void _poll(int jobId, CompletableFuture<Object> fut) {
    // whatever polls
    if (isDone) {
        fut.complete(yourResult);
    }
}

推荐