rxjava:我可以使用retry()但有延迟吗?

2022-08-31 10:31:01

我在我的Android应用程序中使用rxjava来异步处理网络请求。现在,我想仅在经过一段时间后重试失败的网络请求。

有没有办法在可观察对象上使用retry(),但只在一定延迟后重试?

有没有办法让可观察量知道目前正在重试(而不是第一次尝试)?

我看了一下 debounce()/throttleWithTimeout(),但他们似乎正在做一些不同的事情。

编辑:

我想我找到了一种方法来做到这一点,但我感兴趣的是确认这是正确的方法,或者其他更好的方法。

我正在做的是这样的:在我的Observable.OnSubscribe的call()方法中,在我调用 Subscribers onError() 方法之前,我只是让 Thread 休眠所需的时间。因此,要每隔 1000 毫秒重试一次,我执行如下操作:

@Override
public void call(Subscriber<? super List<ProductNode>> subscriber) {
    try {
        Log.d(TAG, "trying to load all products with pid: " + pid);
        subscriber.onNext(productClient.getProductNodesForParentId(pid));
        subscriber.onCompleted();
    } catch (Exception e) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e.printStackTrace();
        }
        subscriber.onError(e);
    }
}

由于此方法无论如何都在 IO 线程上运行,因此它不会阻止 UI。我能看到的唯一问题是,即使第一个错误也会报告延迟,因此即使没有重试(),延迟也会存在。如果延迟不是在错误之后应用,而是在重试之前(但不是在第一次尝试之前,显然)应用,我会更好。


答案 1

可以使用该运算符将重试逻辑添加到任何可观察量。retryWhen()

以下类包含重试逻辑:

RxJava 2.x

public class RetryWithDelay implements Function<Observable<? extends Throwable>, Observable<?>> {
    private final int maxRetries;
    private final int retryDelayMillis;
    private int retryCount;

    public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
        this.maxRetries = maxRetries;
        this.retryDelayMillis = retryDelayMillis;
        this.retryCount = 0;
    }

    @Override
    public Observable<?> apply(final Observable<? extends Throwable> attempts) {
        return attempts
                .flatMap(new Function<Throwable, Observable<?>>() {
                    @Override
                    public Observable<?> apply(final Throwable throwable) {
                        if (++retryCount < maxRetries) {
                            // When this Observable calls onNext, the original
                            // Observable will be retried (i.e. re-subscribed).
                            return Observable.timer(retryDelayMillis,
                                    TimeUnit.MILLISECONDS);
                        }

                        // Max retries hit. Just pass the error along.
                        return Observable.error(throwable);
                    }
                });
    }
}

RxJava 1.x

public class RetryWithDelay implements
        Func1<Observable<? extends Throwable>, Observable<?>> {

    private final int maxRetries;
    private final int retryDelayMillis;
    private int retryCount;

    public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
        this.maxRetries = maxRetries;
        this.retryDelayMillis = retryDelayMillis;
        this.retryCount = 0;
    }

    @Override
    public Observable<?> call(Observable<? extends Throwable> attempts) {
        return attempts
                .flatMap(new Func1<Throwable, Observable<?>>() {
                    @Override
                    public Observable<?> call(Throwable throwable) {
                        if (++retryCount < maxRetries) {
                            // When this Observable calls onNext, the original
                            // Observable will be retried (i.e. re-subscribed).
                            return Observable.timer(retryDelayMillis,
                                    TimeUnit.MILLISECONDS);
                        }

                        // Max retries hit. Just pass the error along.
                        return Observable.error(throwable);
                    }
                });
    }
}

用法:

// Add retry logic to existing observable.
// Retry max of 3 times with a delay of 2 seconds.
observable
    .retryWhen(new RetryWithDelay(3, 2000));

答案 2

受到Paul的回答的启发,如果您不关心Abhijit Sarkar提出的问题,那么不条件地延迟使用rxJava2重新订阅的最简单方法是:retryWhen

source.retryWhen(throwables -> throwables.delay(1, TimeUnit.SECONDS))

您可能希望查看有关重试时和重复时的更多示例和说明。


推荐