使用ComppleableFuture重试逻辑

我需要在我正在处理的异步框架中提交任务,但我需要捕获异常,并在“中止”之前多次重试同一任务。

我正在使用的代码是:

int retries = 0;
public CompletableFuture<Result> executeActionAsync() {

    // Execute the action async and get the future
    CompletableFuture<Result> f = executeMycustomActionHere();

    // If the future completes with exception:
    f.exceptionally(ex -> {
        retries++; // Increment the retry count
        if (retries < MAX_RETRIES)
            return executeActionAsync();  // <--- Submit one more time

        // Abort with a null value
        return null;
    });

    // Return the future    
    return f;
}

这当前无法编译,因为 lambda 的返回类型错误:它期望一个 ,但返回一个 。ResultexecuteActionAsyncCompletableFuture<Result>

如何实现此完全异步重试逻辑?


答案 1

链接后续重试可以很简单:

public CompletableFuture<Result> executeActionAsync() {
    CompletableFuture<Result> f=executeMycustomActionHere();
    for(int i=0; i<MAX_RETRIES; i++) {
        f=f.exceptionally(t -> executeMycustomActionHere().join());
    }
    return f;
}

阅读下面的
缺点 这只会按预期链接尽可能多的重试,因为这些后续阶段在非特殊情况下不会执行任何操作。

一个缺点是,如果第一次尝试立即失败,因此在第一个处理程序链接时已经异常完成,则调用线程将调用该操作,从而完全删除请求的异步性质。通常,可能会阻塞线程(默认执行器将启动新的补偿线程,但仍然不鼓励这样做)。不幸的是,两者都没有,也没有方法。fexceptionallyjoin()exceptionallyAsyncexceptionallyCompose

不调用的解决方案是join()

public CompletableFuture<Result> executeActionAsync() {
    CompletableFuture<Result> f=executeMycustomActionHere();
    for(int i=0; i<MAX_RETRIES; i++) {
        f=f.thenApply(CompletableFuture::completedFuture)
           .exceptionally(t -> executeMycustomActionHere())
           .thenCompose(Function.identity());
    }
    return f;
}

演示如何组合“compose”和“异常”处理程序。

此外,如果所有重试都失败,则仅报告最后一个异常。更好的解决方案应报告第一个异常,并将重试的后续异常添加为抑制异常。这样的解决方案可以通过链接递归调用来构建,正如Gili的答案所暗示的那样,但是,为了将这个想法用于异常处理,我们必须使用以下步骤来组合“compose”和“exceptionally”,如上所示:

public CompletableFuture<Result> executeActionAsync() {
    return executeMycustomActionHere()
        .thenApply(CompletableFuture::completedFuture)
        .exceptionally(t -> retry(t, 0))
        .thenCompose(Function.identity());
}
private CompletableFuture<Result> retry(Throwable first, int retry) {
    if(retry >= MAX_RETRIES) return CompletableFuture.failedFuture(first);
    return executeMycustomActionHere()
        .thenApply(CompletableFuture::completedFuture)
        .exceptionally(t -> { first.addSuppressed(t); return retry(first, retry+1); })
        .thenCompose(Function.identity());
}

CompletableFuture.failedFuture是一个 Java 9 方法,但如果需要,将兼容 Java 8 的向后移植添加到代码中是微不足道的:

public static <T> CompletableFuture<T> failedFuture(Throwable t) {
    final CompletableFuture<T> cf = new CompletableFuture<>();
    cf.completeExceptionally(t);
    return cf;
}

答案 2

我建议不要实现自己的重试逻辑,而是使用经过验证的库,如faillessafe,它具有对期货的内置支持(并且似乎比番石榴重试更受欢迎)。对于您的示例,它看起来像这样:

private static RetryPolicy retryPolicy = new RetryPolicy()
    .withMaxRetries(MAX_RETRIES);

public CompletableFuture<Result> executeActionAsync() {
    return Failsafe.with(retryPolicy)
        .with(executor)
        .withFallback(null)
        .future(this::executeMycustomActionHere);
}

也许你应该避免并且只是让返回的未来方法抛出结果异常,以便你的方法的调用者可以专门处理它,但这是你必须做出的设计决策。.withFallback(null).get()

需要考虑的其他事项包括是立即重试还是在两次尝试之间等待一段时间,任何类型的递归回退(在调用可能已关闭的 Web 服务时很有用),以及是否存在不值得重试的特定异常(例如,如果方法的参数无效)。


推荐