CompletableFuture,Future和RxJava的Observable之间的区别

我想知道 、 和 之间的区别。CompletableFutureFutureObservableRxJava

我所知道的都是异步的,但是

Future.get()阻塞线程

CompletableFuture给出回调方法

RxJava Observable---与其他福利相似(不确定)CompletableFuture

例如:如果客户端需要进行多个服务调用,并且当我们使用(Java)时将按顺序执行...想知道它在RxJava中如何更好。FuturesFuture.get()

文档 http://reactivex.io/intro.html

很难使用 Futures 以最佳方式组合条件异步执行流(或者不可能,因为每个请求的延迟在运行时会有所不同)。当然,这可以完成,但它很快就会变得复杂(因此容易出错),或者过早地阻止Future.get(),这消除了异步执行的好处。

真的很想知道如何解决这个问题。我发现很难从文档中理解。RxJava


答案 1

期货

期货是在Java 5(2004)中引入的。它们基本上是尚未完成的操作结果的占位符。操作完成后,将包含该结果。例如,操作可以是提交到执行器服务的可运行可调用实例。操作的提交者可以使用该对象来检查操作是否为Done(),或者使用阻塞 get() 方法等待它完成。FutureFuture

例:

/**
* A task that sleeps for a second, then returns 1
**/
public static class MyCallable implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        Thread.sleep(1000);
        return 1;
    }

}

public static void main(String[] args) throws Exception{
    ExecutorService exec = Executors.newSingleThreadExecutor();
    Future<Integer> f = exec.submit(new MyCallable());

    System.out.println(f.isDone()); //False

    System.out.println(f.get()); //Waits until the task is done, then prints 1
}

可操作未来

CompletableFutures在Java 8(2014)中引入。它们实际上是常规期货的演变,灵感来自谷歌的Listenable Futures,这是番石榴图书馆的一部分。它们是期货,还允许您将任务串成一个链。你可以用它们来告诉一些工作线程“去做一些任务X,当你完成之后,去用X的结果去做其他事情”。使用ComppletableFutures,您可以对操作结果执行某些操作,而无需实际阻塞线程以等待结果。下面是一个简单的示例:

/**
* A supplier that sleeps for a second, and then returns one
**/
public static class MySupplier implements Supplier<Integer> {

    @Override
    public Integer get() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            //Do nothing
        }
        return 1;
    }
}

/**
* A (pure) function that adds one to a given Integer
**/
public static class PlusOne implements Function<Integer, Integer> {

    @Override
    public Integer apply(Integer x) {
        return x + 1;
    }
}

public static void main(String[] args) throws Exception {
    ExecutorService exec = Executors.newSingleThreadExecutor();
    CompletableFuture<Integer> f = CompletableFuture.supplyAsync(new MySupplier(), exec);
    System.out.println(f.isDone()); // False
    CompletableFuture<Integer> f2 = f.thenApply(new PlusOne());
    System.out.println(f2.get()); // Waits until the "calculation" is done, then prints 2
}

RxJava

RxJava是在Netflix创建的用于反应式编程的整个库。乍一看,它似乎类似于Java 8的流。它是,除了它更强大。

与 Futures 类似,RxJava 可用于将一堆同步或异步操作串在一起,以创建处理管道。与一次性期货不同,RxJava适用于零个或多个项目的。包括具有无限数量项目的永无止境的流。由于一组令人难以置信的丰富运算符,它也更加灵活和强大。

与Java 8的流不同,RxJava还具有背压机制,它允许它处理处理管道的不同部分以不同的速率在不同线程中运行的情况。

RxJava的缺点是,尽管有可靠的文档,但由于涉及范式转变,它是一个具有挑战性的学习库。Rx代码也可能是调试的噩梦,特别是如果涉及多个线程,更糟糕的是 - 如果需要背压。

如果你想进入它,官方网站上有一整的各种教程,加上官方文档Javadoc。您还可以查看一些视频,例如这个视频,它简要介绍了Rx,并讨论了Rx和Futures之间的差异。

奖励:Java 9 Reactive Streams

Java 9的Reactive Streams(又名Flow API)是由各种反应式流库(如RxJava 2Akka StreamsVertx)实现的一组接口。它们允许这些反应式库互连,同时保持所有重要的背压。


答案 2

我从0.9开始就一直在使用Rx Java,现在是1.3.2,很快就迁移到2.x,我在一个私人项目中使用它,我已经工作了8年。

如果没有这个库,我就不会再编程了。一开始我很怀疑,但这是你需要创造的完全不同的心态。一开始很难。我有时看着弹珠几个小时。哈哈

这只是一个实践问题,真正了解流程(又名可观察量和观察者的契约),一旦你到达那里,你就会讨厌这样做。

对我来说,这个图书馆并没有真正的缺点。

用例:我有一个包含9个仪表(cpu,mem,网络等)的监视器视图。启动视图时,视图将自身订阅一个系统监视器类,该类返回一个可观察的(间隔),其中包含 9 米的所有数据。它会每秒将一个新结果推送到视图(因此不会轮询!!!)。该可观察性使用平面图同时(异步!)从9个不同的源获取数据,并将结果压缩到您的视图将在onNext()上获得的新模型中。

你怎么能用期货,可兑换物等来做到这一点...祝你好运!:)

Rx Java为我解决了编程中的许多问题,并且在某种程度上使我变得更加容易......

优势:

  • 斯泰!!!(重要的事情要提一下,也许最重要的是)
  • 开箱即用的线程管理
  • 构建具有自己生命周期的序列
  • 一切都是可观察的,所以链接很容易
  • 需要编写的代码更少
  • 类路径上的单个 jar(非常轻量级)
  • 高并发
  • 不再有回调地狱
  • 基于订户(消费者和生产者之间的紧密契约)
  • 背压策略(断路器等)
  • 出色的错误处理和恢复
  • 非常好的文档(弹珠<3)
  • 完全控制
  • 还有更多...

缺点: - 难以测试


推荐