使用 RxJava 和 Okhttp

2022-09-01 19:58:36

我想在另一个线程(如IO线程)中使用okhttp请求url并进入Android主线程,但我不知道如何创建.ResponseObservable


答案 1

首先添加到依赖项中,然后创建如下所示的依赖项:RxAndroidObservable

 Subscription subscription =   Observable.create(new Observable.OnSubscribe<Response>() {
        OkHttpClient client = new OkHttpClient();
          @Override
          public void call(Subscriber<? super Response> subscriber) {
            try {
              Response response = client.newCall(new Request.Builder().url("your url").build()).execute();
              if (response.isSuccessful()) {
                  if(!subscriber.isUnsubscribed()){
                     subscriber.onNext(response);
                  }
                  subscriber.onCompleted();
              } else if (!response.isSuccessful() && !subscriber.isUnsubscribed()) {
                  subscriber.onError(new Exception("error"));
                }
            } catch (IOException e) {
              if (!subscriber.isUnsubscribed()) {
                  subscriber.onError(e);
              }
            }
          }
        })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<Response>() {
              @Override
              public void onCompleted() {

              }

              @Override
              public void onError(Throwable e) {

              }

              @Override
              public void onNext(Response response) {

              }
            });

它将在另一个线程(io线程)中请求您的URL,并在Android主线程上观察它。

最后,当您离开屏幕时,请使用以避免内存泄漏。subsribtion.unsubscribe()

当你使用时,你应该写很多样板代码,你也必须自己处理订阅。更好的替代方法是使用延迟。形成文档:Observable.create

在观察者订阅之前不要创建可观察量,并为每个观察者创建一个新的可观察量

Defer 运算符等待观察器订阅它,然后生成一个可观察量,通常具有可观察的工厂函数。它为每个订阅者重新执行此操作,因此尽管每个订阅者可能认为它正在订阅相同的可观察量,但实际上每个订阅者都有自己的单独序列。

因此,正如Marcin Koziński所提到的,你只需要这样做:

final OkHttpClient client = new OkHttpClient();
Observable.defer(new Func0<Observable<Response>>() {
    @Override public Observable<Response> call() {
        try {
            Response response = client.newCall(new Request.Builder().url("your url").build()).execute();
            return Observable.just(response);
        } catch (IOException e) {
            return Observable.error(e);
        }
    }
});

答案 2

它比以下方法更易于使用,更安全:Observable.defer()Observable.create()

final OkHttpClient client = new OkHttpClient();
Observable.defer(new Func0<Observable<Response>>() {
    @Override public Observable<Response> call() {
        try {
            Response response = client.newCall(new Request.Builder().url("your url").build()).execute();
            return Observable.just(response);
        } catch (IOException e) {
            return Observable.error(e);
        }
    }
});

这样可以为您处理取消订阅和背压。这是Dan Lew关于和的一篇很棒的文章create()defer()

如果你想走这条路,那么它看起来应该更像这个库中,到处都是电话。我相信这仍然不能处理背压。Observable.create()isUnsubscribed()


推荐