vert.x 等待回复多封邮件

2022-09-03 13:43:22

在vert.x中,我可以向另一个顶点发送消息并“异步等待”回复。

问题是:我想将消息发送到多个顶点,并在所有顶点回复时调用异步处理程序。

这是否可能,或者是否有更好的设计来实现此功能?

编辑:

假设我有一个顶点A,它将消息发送到顶点B,C和D。每个顶点(B,C,D)对消息执行某些操作并返回A一些数据。然后,顶点A接收来自B,C,D的响应,并对所有数据执行某些操作。问题是我发送的每条消息都有一个处理程序(一个用于A,一个用于B,一个用于C),我希望在所有回复到达时调用一个处理程序。


答案 1

从 Vert.x 3.2 开始,文档解释了如何使用 和 进行异步协调FutureCompositeFuture

因此,假设您希望通过事件总线进行两次调用,并在两者都成功时执行某些操作:send

Future<Message> f1 = Future.future();
eventBus.send("first.address", "first message", f1.completer());

Future<Message> f2 = Future.future();
eventBus.send("second.address", "second message", f2.completer());

CompositeFuture.all(f1, f2).setHandler(result -> {
  // business as usual
});

最多可以将 6 个期货作为参数传递,也可以将它们作为列表传递。


答案 2

最好的方法是使用Reactive Extensions,由Netflix的Rx.Java实现,由RxVertx Module提供。

各种各样的运算符允许您执行诸如将多个异步调用的结果“压缩”到新结果中之类的操作,并对其进行任何操作。

我在GitHub上有一个简单的演示,其中包含:

final Observable<JsonObject> meters = observeMetricsSource(metricsAddress, METERS_BUS_REQUEST, "meters", rx);
final Observable<JsonObject> histograms = observeMetricsSource(metricsAddress, HISTOGRAMS_BUS_REQUEST, "histograms", rx);
subscribeAndRespondJson(zip(meters, histograms, (jo1, jo2) -> jo1.mergeIn(jo2)), req);

此代码段显示了来自两个事件总线异步交互的两个可观察量如何被“压缩”(即合并)到一个最终的 HTTP 响应中。


推荐