Webflux websocketclient, 如何在同一会话中发送多个请求[设计客户端库]TL;DR;长版请!使用 RSocket!好吧,它还没有说服我,我还想有一个自定义的WebSocket服务器要点

TL;DR;

我们正试图使用spring webflux WebSocket实现来设计WebSocket服务器。服务器具有通常的HTTP服务器操作,例如.使用WebSockets,我们试图公开一个端点,以便客户端可以利用单个连接进行各种操作,因为WebSockets就是为此目的而设计的。使用webflux和WebSockets是正确的设计吗?create/fetch/update/fetchall

长版

我们正在启动一个项目,该项目将使用 来自 的反应式 Web 套接字。我们需要构建一个反应式客户端库,消费者可以使用它来连接到服务器。spring-webflux

在服务器上,我们收到一个请求,读取一条消息,保存它并返回一个静态响应:

public Mono<Void> handle(WebSocketSession webSocketSession) {
    Flux<WebSocketMessage> response = webSocketSession.receive()
            .map(WebSocketMessage::retain)
            .concatMap(webSocketMessage -> Mono.just(webSocketMessage)
                    .map(parseBinaryToEvent) //logic to get domain object
                    .flatMap(e -> service.save(e))
                    .thenReturn(webSocketSession.textMessage(SAVE_SUCCESSFUL))
            );

    return webSocketSession.send(response);
}

在客户端上,我们希望在有人调用方法时进行调用,并从 返回响应。saveserver

public Mono<String> save(Event message) {
    new ReactorNettyWebSocketClient().execute(uri, session -> {
      session
              .send(Mono.just(session.binaryMessage(formatEventToMessage)))
              .then(session.receive()
                      .map(WebSocketMessage::getPayloadAsText)
                      .doOnNext(System.out::println).then()); //how to return this to client
    });
    return null;
}

我们不确定,如何去设计这个。理想情况下,我们认为应该有

1)应该只调用一次,并以某种方式持有.应使用相同的会话在后续调用中发送数据。client.executesession

2)如何从我们进入的服务器返回响应?session.receive

3)当响应很大(不仅仅是静态字符串,而是事件列表)的情况下,怎么样?fetchsession.receive

我们正在做一些研究,但我们无法在线找到webflux-websocket-client文档/实现的适当资源。关于如何继续前进的任何指示。


答案 1

请!使用 RSocket

这是绝对正确的设计,值得节省资源,并且每个客户端仅使用一个连接进行所有可能的操作。

但是,不要实现轮子并使用协议,它为您提供了所有这些类型的通信。

  • RSocket 具有请求-响应模型,允许您执行当今最常见的客户端-服务器交互。
  • RSocket 具有请求流通信模型,因此您可以满足所有需求,并异步重用同一连接返回事件流。RSocket 会将逻辑流映射到物理连接并返回,因此您不会感到自己执行此操作的痛苦。
  • RSocket具有更多的交互模型,例如即发即忘流流,这在以两种方式发送数据流的情况下可能很有用。

如何在春季使用RSocket

其中一个选项是使用RSocket协议的RSocket-Java实现。RSocket-Java建立在Project Reactor之上,因此它自然适合Spring WebFlux生态系统。

不幸的是,没有与Spring生态系统的特色集成。幸运的是,我花了几个小时来提供一个简单的RSocket Spring Boot Starter,它将Spring WebFlux与RSocket集成在一起,并公开了WebSocket RSocket服务器以及WebFlux Http服务器。

为什么 RSocket 是更好的方法?

基本上,RSocket隐藏了自己实现相同方法的复杂性。使用RSocket,我们不必关心交互模型定义作为自定义协议和Java中的实现。RSocket 为我们将数据传递到特定的逻辑通道。它提供了一个内置客户端,可以将消息发送到相同的WS连接,因此我们不必为此发明自定义实现。

使用 RSocket-RPC 使其变得更好

由于 RSocket 只是一个协议,因此它不提供任何消息格式,因此此挑战适用于业务逻辑。但是,有一个 RSocket-RPC 项目,它提供协议缓冲区作为消息格式,并重用与 GRPC 相同的代码生成技术。因此,使用RSocket-RPC,我们可以很容易地为客户端和服务器构建一个API,而不必关心传输和协议抽象。

相同的 RSocket Spring Boot 集成也提供了 RSocket-RPC 用法的示例。

好吧,它还没有说服我,我还想有一个自定义的WebSocket服务器

所以,为了这个目的,你必须自己实现这个地狱。我以前已经做过一次了,但我不能指出那个项目,因为它是一个企业项目。不过,我可以分享一些代码示例,这些示例可以帮助您构建正确的客户端和服务器。

服务器端

处理程序和开放逻辑订阅服务器映射

必须考虑的第一点是,一个物理连接中的所有逻辑流都应存储在某个位置:

class MyWebSocketRouter implements WebSocketHandler {

  final Map<String, EnumMap<ActionMessage.Type, ChannelHandler>> channelsMapping;


  @Override
  public Mono<Void> handle(WebSocketSession session) {
    final Map<String, Disposable> channelsIdsToDisposableMap = new HashMap<>();
    ...
  }
}

上面的示例中有两个映射。第一个是路由映射,它允许您根据传入的消息参数等来识别路由。第二个是为请求流用例创建的(在我的情况下,它是活动订阅的映射),因此您可以发送创建订阅的消息框架,或者为您订阅特定操作并保留该订阅,因此一旦执行取消订阅操作,如果订阅存在,您将被取消订阅。

使用处理器进行消息多路复用

为了从所有逻辑流发回消息,必须将消息多路复用到一个流。例如,使用 Reactor,您可以使用以下命令执行此操作:UnicastProcessor

@Override
public Mono<Void> handle(WebSocketSession session) {
  final UnicastProcessor<ResponseMessage<?>> funIn = UnicastProcessor.create(Queues.<ResponseMessage<?>>unboundedMultiproducer().get());
  ...

  return Mono
    .subscriberContext()
    .flatMap(context -> Flux.merge(
      session
        .receive()
        ...
        .cast(ActionMessage.class)
        .publishOn(Schedulers.parallel())
        .doOnNext(am -> {
          switch (am.type) {
            case CREATE:
            case UPDATE:
            case CANCEL: {
              ...
            }
            case SUBSCRIBE: {
              Flux<ResponseMessage<?>> flux = Flux
                .from(
                  channelsMapping.get(am.getChannelId())
                                 .get(ActionMessage.Type.SUBSCRIBE)
                                 .handle(am) // returns Publisher<>
                );

              if (flux != null) {
                channelsIdsToDisposableMap.compute(
                  am.getChannelId() + am.getSymbol(), // you can generate a uniq uuid on the client side if needed
                  (cid, disposable) -> {
                    ...

                    return flux
                      .subscriberContext(context)
                      .subscribe(
                        funIn::onNext, // send message to a Processor manually
                        e -> {
                          funIn.onNext(
                            new ResponseMessage<>( // send errors as a messages to Processor here
                              0,
                              e.getMessage(),
                              ...
                              ResponseMessage.Type.ERROR
                            )
                          );
                        }
                      );
                  }
                );
              }

              return;
            }
            case UNSABSCRIBE: {
              Disposable disposable = channelsIdsToDisposableMap.get(am.getChannelId() + am.getSymbol());

              if (disposable != null) {
                disposable.dispose();
              }
            }
          }
        })
        .then(Mono.empty()),

        funIn
            ...
            .map(p -> new WebSocketMessage(WebSocketMessage.Type.TEXT, p))
            .as(session::send)
      ).then()
    );
}

从上面的示例中可以看出,那里有很多东西:

  1. 该消息应包含路线信息
  2. 消息应包含与其相关的唯一流 ID。
  3. 用于消息多路复用的单独处理器,其中错误也应该是消息
  4. 每个通道都应该存储在某个地方,在这种情况下,我们都有一个简单的用例,其中每个消息都可以提供一个消息或只是一个(在单声道的情况下,它可以在服务器端实现得更简单,所以你不必保持唯一的流ID)。FluxMono
  5. 此示例不包括消息编码-解码,因此此质询留给您。

客户端

客户端也不是那么简单:

句柄会话

为了处理连接,我们必须分配两个处理器,以便我们可以进一步使用它们来多路复用和解复用消息:

UnicastProcessor<> outgoing = ...
UnicastPorcessor<> incoming = ...
(session) -> {
  return Flux.merge(
     session.receive()
            .subscribeWith(incoming)
            .then(Mono.empty()),
     session.send(outgoing)
  ).then();
}

将所有逻辑流保留在某个位置

所有创建的流,无论是还是应该存储在某个地方,因此我们将能够区分与哪个流消息相关:MonoFlux

Map<String, MonoSink> monoSinksMap = ...;
Map<String, FluxSink> fluxSinksMap = ...;

由于MonoSink,我们必须保留两个映射,而FluxSink没有相同的父接口。

消息路由

在上面的示例中,我们只考虑了客户端的初始部分。现在我们必须构建一个消息路由机制:

...
.subscribeWith(incoming)
.doOnNext(message -> {
    if (monoSinkMap.containsKey(message.getStreamId())) {
        MonoSink sink = monoSinkMap.get(message.getStreamId());
        monoSinkMap.remove(message.getStreamId());
        if (message.getType() == SUCCESS) {
            sink.success(message.getData());
        }
        else {
            sink.error(message.getCause());
        }
    } else if (fluxSinkMap.containsKey(message.getStreamId())) {
        FluxSink sink = fluxSinkMap.get(message.getStreamId());
        if (message.getType() == NEXT) {
            sink.next(message.getData());
        }
        else if (message.getType() == COMPLETE) {
            fluxSinkMap.remove(message.getStreamId());
            sink.next(message.getData());
            sink.complete();
        }
        else {
            fluxSinkMap.remove(message.getStreamId());
            sink.error(message.getCause());
        }
    }
})

上面的代码示例演示了如何路由传入消息。

多路复用请求

最后一部分是消息多路复用。为此,我们将介绍可能的发送方类 impl:

class Sender {
    UnicastProcessor<> outgoing = ...
    UnicastPorcessor<> incoming = ...

    Map<String, MonoSink> monoSinksMap = ...;
    Map<String, FluxSink> fluxSinksMap = ...;

    public Sender () {

在此处创建 websocket 连接,并放置前面提到的代码 }

    Mono<R> sendForMono(T data) {
        //generate message with unique 
        return Mono.<R>create(sink -> {
            monoSinksMap.put(streamId, sink);
            outgoing.onNext(message); // send message to server only when subscribed to Mono
        });
    }

     Flux<R> sendForFlux(T data) {
         return Flux.<R>create(sink -> {
            fluxSinksMap.put(streamId, sink);
            outgoing.onNext(message); // send message to server only when subscribed to Flux
        });
     }
}

自定义实现的总结

  1. 硬核
  2. 没有实施背压支持,因此这可能是另一个挑战
  3. 容易射中自己的脚

要点

  1. 请使用RSocket,不要自己发明协议,这很难!!!
  2. 要了解有关RSocket的更多信息,请从Pivotal guys - https://www.youtube.com/watch?v=WVnAbv65uCU
  3. 要从我的一次演讲中了解有关RSocket的更多信息 - https://www.youtube.com/watch?v=XKMyj6arY2A
  4. 有一个建立在RSocket之上的特色框架,称为Proteus - 你可能对此感兴趣 - https://www.netifi.com/
  5. 要从RSocket协议的核心开发人员那里了解有关Proteus的更多信息 - https://www.google.com/url?sa=t&source=web&rct=j&url=https://m.youtube.com/watch%3Fv%3D_rqQtkIeNIQ&ved=2ahUKEwjpyLTpsLzfAhXDDiwKHUUUA8gQt9IBMAR6BAgNEB8&usg=AOvVaw0B_VdOj42gjr0YrzLLUX1E

答案 2

不确定这种情况是否是您的问题??我看到您正在发送静态通量响应(这是一个可关闭的流),您需要一个打开的流来向该会话发送消息,例如,您可以创建一个处理器

public class SocketMessageComponent {
private DirectProcessor<String> emitterProcessor;
private Flux<String> subscriber;

public SocketMessageComponent() {
    emitterProcessor = DirectProcessor.create();
    subscriber = emitterProcessor.share();
}

public Flux<String> getSubscriber() {
    return subscriber;
}

public void sendMessage(String mesage) {
    emitterProcessor.onNext(mesage);
}

}

然后您可以发送

 public Mono<Void> handle(WebSocketSession webSocketSession) {
    this.webSocketSession = webSocketSession;
    return webSocketSession.send(socketMessageComponent.getSubscriber()
            .map(webSocketSession::textMessage))
            .and(webSocketSession.receive()
                    .map(WebSocketMessage::getPayloadAsText).log());
}

推荐