如何使用 netty 客户端获取服务器响应

2022-09-03 06:30:33

我想写一个基于netty的客户端。它应该有方法公共字符串发送(字符串msg);它应该从服务器或将来返回响应 - 没关系。此外,它应该是多线程的。喜欢这个:

public class Client {
public static void main(String[] args) throws InterruptedException {
    Client client = new Client();

}

private Channel channel;

public Client() throws InterruptedException {
    EventLoopGroup loopGroup = new NioEventLoopGroup();

    Bootstrap b = new Bootstrap();
    b.group(loopGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new StringDecoder()).
                    addLast(new StringEncoder()).
                    addLast(new ClientHandler());
        }
    });
    channel = b.connect("localhost", 9091).sync().channel();
}

public String sendMessage(String msg) {
    channel.writeAndFlush(msg);
    return ??????????;
}

}

而且我不明白如何在调用 writeAndFlush() 后从服务器检索响应;我该怎么办?

我也使用Netty 4.0.18.Final


答案 1

返回方法很简单,我们将实现以下方法签名:Future<String>

public Futute<String> sendMessage(String msg) {

当您知道异步编程结构时,这是相对容易做到的。为了解决设计问题,我们将执行以下步骤:

  1. 写入消息时,将 添加到数组阻止队列<希望>Promise<String>

    这将作为最近发送的消息的列表,并允许我们更改对象返回结果。Future<String>

  2. 当消息返回到处理程序时,请将其解析为Queue

    这使我们能够获得正确的未来来改变。

  3. 更新 的状态Promise<String>

    我们调用以最终设置对象的状态,这将传播回未来的对象。promise.setSuccess()

示例代码

public class ClientHandler extends SimpleChannelInboundHandler<String> {
    private ChannelHandlerContext ctx;
    private BlockingQueue<Promise<String>> messageList = new ArrayBlockingQueue<>(16);

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        super.channelActive(ctx);
        this.ctx = ctx;
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        super.channelInactive(ctx);
        synchronized(this){
            Promise<String> prom;
            while((prom = messageList.poll()) != null) 
                prom.setFailure(new IOException("Connection lost"));
            messageList = null;
        }
    }

    public Future<String> sendMessage(String message) {
        if(ctx == null) 
            throw new IllegalStateException();
        return sendMessage(message, ctx.executor().newPromise());
    }

    public Future<String> sendMessage(String message, Promise<String> prom) {
        synchronized(this){
            if(messageList == null) {
                // Connection closed
                prom.setFailure(new IllegalStateException());
            } else if(messageList.offer(prom)) { 
                // Connection open and message accepted
                ctx.writeAndFlush(message).addListener();
            } else { 
                // Connection open and message rejected
                prom.setFailure(new BufferOverflowException());
            }
            return prom;
        }
    }
    @Override
    protected void messageReceived(ChannelHandlerContext ctx, String msg) {
        synchronized(this){
            if(messageList != null) {
                 messageList.poll().setSuccess(msg);
            }
        }
    }
}

文档细分

  • private ChannelHandlerContext ctx;

    用于存储我们对ChannelHandlerContext的引用,我们使用它,以便我们可以创建承诺

  • private BlockingQueue<Promise<String>> messageList = new ArrayBlockingQueue<>();

    我们将过去的消息保留在此列表中,以便我们可以更改将来的结果

  • public void channelActive(ChannelHandlerContext ctx)

    当连接变为活动状态时由 netty 调用。在这里初始化我们的变量。

  • public void channelInactive(ChannelHandlerContext ctx)

    当连接由于错误或正常连接关闭而变为非活动状态时,由 netty 调用。

  • protected void messageReceived(ChannelHandlerContext ctx, String msg)

    当有新消息到达时,netty 调用,这里挑出队列的头部,然后我们调用 setsuccess。

警告建议

使用期货时,您需要注意1件事,如果将来尚未完成,请不要从1个netty线程调用get(),如果不遵循这个简单的规则将导致死锁或阻塞操作异常


答案 2

您可以在 netty 项目中找到该示例。我们可以将结果保存到最后一个处理程序的自定义字段中。在下面的代码中,我们想要的是 handler.getFactorial()。

请参阅 http://www.lookatsrc.com/source/io/netty/example/factorial/FactorialClient.java?a=io.netty:netty-all

因子客户端.java

public final class FactorialClient {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final String HOST = System.getProperty("host", "127.0.0.1");
    static final int PORT = Integer.parseInt(System.getProperty("port", "8322"));
    static final int COUNT = Integer.parseInt(System.getProperty("count", "1000"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            sslCtx = SslContextBuilder.forClient()
                .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
        } else {
            sslCtx = null;
        }

        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .handler(new FactorialClientInitializer(sslCtx));

            // Make a new connection.
            ChannelFuture f = b.connect(HOST, PORT).sync();

            // Get the handler instance to retrieve the answer.
            FactorialClientHandler handler =
                (FactorialClientHandler) f.channel().pipeline().last();

            // Print out the answer.
            System.err.format("Factorial of %,d is: %,d", COUNT, handler.getFactorial());
        } finally {
            group.shutdownGracefully();
        }
    }
}

public class FactorialClientHandler extends SimpleChannelInboundHandler<BigInteger> {

    private ChannelHandlerContext ctx;
    private int receivedMessages;
    private int next = 1;
    final BlockingQueue<BigInteger> answer = new LinkedBlockingQueue<BigInteger>();

    public BigInteger getFactorial() {
        boolean interrupted = false;
        try {
            for (;;) {
                try {
                    return answer.take();
                } catch (InterruptedException ignore) {
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        this.ctx = ctx;
        sendNumbers();
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, final BigInteger msg) {
        receivedMessages ++;
        if (receivedMessages == FactorialClient.COUNT) {
            // Offer the answer after closing the connection.
            ctx.channel().close().addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) {
                    boolean offered = answer.offer(msg);
                    assert offered;
                }
            });
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

    private void sendNumbers() {
        // Do not send more than 4096 numbers.
        ChannelFuture future = null;
        for (int i = 0; i < 4096 && next <= FactorialClient.COUNT; i++) {
            future = ctx.write(Integer.valueOf(next));
            next++;
        }
        if (next <= FactorialClient.COUNT) {
            assert future != null;
            future.addListener(numberSender);
        }
        ctx.flush();
    }

    private final ChannelFutureListener numberSender = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
                sendNumbers();
            } else {
                future.cause().printStackTrace();
                future.channel().close();
            }
        }
    };
}

推荐