Jedis 和 Lettuce 异步能力

2022-09-01 09:34:15

我正在将redis与Akka一起使用,因此我不需要阻塞呼叫。生菜内置了异步未来调用。但Jedis是Redis推荐的客户。有人可以告诉我我是否以正确的方式使用它们。如果是这样,哪一个更好。

杰迪斯我正在使用静态Jedis连接池来获取con,并使用Akka未来回调来处理结果。我在这里担心的是,当我使用另一个线程(可调用)来获得结果时,线程最终会阻止结果。虽然生菜可能有一些更有效的方法来做到这一点。

 private final class OnSuccessExtension extends OnSuccess<String> {
            private final ActorRef senderActorRef;
            private final Object message;
            @Override
            public void onSuccess(String valueRedis) throws Throwable {
                log.info(getContext().dispatcher().toString());
                senderActorRef.tell((String) message, ActorRef.noSender());
            }
             public OnSuccessExtension(ActorRef senderActorRef,Object message) {
                    this.senderActorRef = senderActorRef;
                    this.message=message;
                }
        }
        ActorRef senderActorRef = getSender(); //never close over a future
            if (message instanceof String) {
        Future<String> f =akka.dispatch.Futures.future(new Callable<String>() {
                    public String call() {
                        String result;
                        try(Jedis jedis=JedisWrapper.redisPool.getResource()) {
                            result = jedis.get("name");
                        }
                        return result;
                    }
                }, ex);
                f.onSuccess(new OnSuccessExtension(senderActorRef,message), ex);
    }

生菜

ExecutorService executorService = Executors.newFixedThreadPool(10);
public void onReceive(Object message) throws Exception {
        ActorRef senderActorRef = getSender(); //never close over a future
        if (message instanceof String) {

            final RedisFuture<String> future = lettuce.connection.get("name");
            future.addListener(new Runnable() {
                final ActorRef sender = senderActorRef;
                final String msg =(String) message;
                @Override
                public void run() {
                    try {
                        String value = future.get();
                        log.info(value);
                        sender.tell(message, ActorRef.noSender());
                    } catch (Exception e) {
                    }
                }
            }, executorService);

如果生菜是异步调用的更好选择。那么我应该在生产环境中使用哪种类型的执行器。如果可能的话,我可以使用 Akka 调度程序作为 Letture 未来调用的执行上下文。


答案 1

您的问题没有一个答案,因为它取决于。

Jedis和生菜都是成熟的客户。为了完成Java客户端的列表,还有Redisson,它增加了另一个抽象层(Collection/Queue/Lock/...接口,而不是原始的 Redis 命令)。

这在很大程度上取决于您如何与客户合作。通常,Jedis(基于java的客户端连接到redis)在数据访问方面是单线程的,因此通过并发获得的唯一好处是将协议和I / O工作卸载到不同的线程。对于生菜和Redisson来说,这并不完全正确,因为它们在引擎盖下使用netty(netty将一个套接字通道绑定到特定的事件循环线程)。

使用 Jedis,您一次只能使用一个线程的一个连接。这与 Akka actor 模型密切相关,因为一个 actor 实例一次只被一个线程占用。

另一方面,你需要的Jedis连接与处理特定演员的线程一样多。如果您开始在不同的 Actor 之间共享 Jedis 连接,则要么使用连接池,要么需要为每个 Actor 实例建立专用的 Jedis 连接。请记住,您需要自己处理重新连接(一旦Redis连接断开)。

使用Redisson和生菜,如果您愿意,您可以获得透明的重新连接(这是生菜的默认值,不确定Redisson)。

通过使用生菜和 Redisson,您可以在所有参与者之间共享一个连接,因为它们是线程安全的。在两种情况下,您不能共享一个生菜连接:

  1. 阻止操作(因为您将阻止连接的所有其他用户)
  2. 事务(/,因为您将不同的操作与事务混合在一起,这肯定是您不想这样做的事情)MULTIEXEC

Jedis 没有异步接口,因此您需要自己执行此操作。这是可行的,我对MongoDB做了类似的事情,将I / O部分卸载/解耦给其他参与者。您可以在代码中使用该方法,但不需要提供自己的执行器服务,因为您可以在可运行的侦听器中执行非阻塞操作。

使用生菜4.0,您将获得Java 8支持(由于CompendationStage接口,这在异步API方面要好得多),您甚至可以使用RxJava(反应式编程)来实现并发性。

生菜在您的并发模型上并不固执己见。它允许您根据需要使用它,除了Java 6 / 7和Guava的普通/ API不是很好用。FutureListenableFuture

HTH, 马克


答案 2

试试 Redisson 框架。它通过与 Project Reactor 和 RxJava3 库集成,提供异步 API 以及 Reactive Streams API。

异步 API 使用示例:

RedissonClient client = Redisson.create(config);
RMap<String, String> map = client.getMap("myMap");

// implements CompletionStage interface
RFuture<String> future = map.get("myKey");

future.whenComplete((res, exception) -> {
  // ...
});

具有 Project Reactor 的 Reactive Streams API 使用示例:

RedissonReactiveClient client = Redisson.createReactive(config);
RMapReactive<String, String> map = client.getMap("myMap");

Mono<String> resp = map.get("myKey");

Reactive Streams API with RxJava2 lib 使用示例:

RedissonRxClient client = Redisson.createRx(config);
RMapRx<String, String> map = client.getMap("myMap");

Flowable<String> resp = map.get("myKey");

推荐