Java High-load NIO TCP Server
作为我研究的一部分,我正在用Java编写一个高负载的TCP / IP回显服务器。我想为大约3-4k的客户端提供服务,并查看每秒可以从中挤出的最大可能消息。消息大小非常小 - 最多 100 个字节。这项工作没有任何实际目的 - 只是一项研究。
根据我看过的许多演讲(HornetQ基准测试,LMAX Disruptor讲座等),现实世界的高负载系统倾向于每秒处理数百万个事务(我相信Disruptor提到了大约6 mils和Hornet - 8.5)。例如,这篇文章指出,可以实现高达40M MPS。因此,我将其作为对现代硬件应该具备哪些能力的粗略估计。
我编写了最简单的单线程NIO服务器并启动了负载测试。我有点惊讶,我只能在localhost上获得大约100k MPS,而在实际网络上只能获得25k。数字看起来很小。我正在Win7 x64,core i7上进行测试。查看 CPU 负载 - 只有一个内核处于繁忙状态(这在单线程应用上是预期的),而其余内核处于空闲状态。但是,即使我加载所有8个内核(包括虚拟),我也不会有超过800k MPS - 甚至没有接近4000万:)
我的问题是:向客户端提供大量消息的典型模式是什么?我是否应该在单个 JVM 内的多个不同套接字上分配网络负载,并使用某种负载均衡器(如 HAProxy)将负载分配到多个内核?或者我应该考虑在我的NIO代码中使用多个选择器?或者甚至可以在多个JVM之间分配负载,并使用Chronicle在它们之间建立进程间通信?在像CentOS这样的适当的服务器端操作系统上进行测试会产生很大的不同(也许是Windows减慢了速度)?
以下是我的服务器的示例代码。它总是对任何传入的数据回答“ok”。我知道在现实世界中,我需要跟踪消息的大小,并准备好一条消息可能会在多次读取之间拆分,但是我现在想让事情变得超级简单。
public class EchoServer {
private static final int BUFFER_SIZE = 1024;
private final static int DEFAULT_PORT = 9090;
// The buffer into which we'll read data when it's available
private ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER_SIZE);
private InetAddress hostAddress = null;
private int port;
private Selector selector;
private long loopTime;
private long numMessages = 0;
public EchoServer() throws IOException {
    this(DEFAULT_PORT);
}
public EchoServer(int port) throws IOException {
    this.port = port;
    selector = initSelector();
    loop();
}
private void loop() {
    while (true) {
        try{
            selector.select();
            Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
            while (selectedKeys.hasNext()) {
                SelectionKey key = selectedKeys.next();
                selectedKeys.remove();
                if (!key.isValid()) {
                    continue;
                }
                // Check what event is available and deal with it
                if (key.isAcceptable()) {
                    accept(key);
                } else if (key.isReadable()) {
                    read(key);
                } else if (key.isWritable()) {
                    write(key);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
}
private void accept(SelectionKey key) throws IOException {
    ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
    SocketChannel socketChannel = serverSocketChannel.accept();
    socketChannel.configureBlocking(false);
    socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
    socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
    socketChannel.register(selector, SelectionKey.OP_READ);
    System.out.println("Client is connected");
}
private void read(SelectionKey key) throws IOException {
    SocketChannel socketChannel = (SocketChannel) key.channel();
    // Clear out our read buffer so it's ready for new data
    readBuffer.clear();
    // Attempt to read off the channel
    int numRead;
    try {
        numRead = socketChannel.read(readBuffer);
    } catch (IOException e) {
        key.cancel();
        socketChannel.close();
        System.out.println("Forceful shutdown");
        return;
    }
    if (numRead == -1) {
        System.out.println("Graceful shutdown");
        key.channel().close();
        key.cancel();
        return;
    }
    socketChannel.register(selector, SelectionKey.OP_WRITE);
    numMessages++;
    if (numMessages%100000 == 0) {
        long elapsed = System.currentTimeMillis() - loopTime;
        loopTime = System.currentTimeMillis();
        System.out.println(elapsed);
    }
}
private void write(SelectionKey key) throws IOException {
    SocketChannel socketChannel = (SocketChannel) key.channel();
    ByteBuffer dummyResponse = ByteBuffer.wrap("ok".getBytes("UTF-8"));
    socketChannel.write(dummyResponse);
    if (dummyResponse.remaining() > 0) {
        System.err.print("Filled UP");
    }
    key.interestOps(SelectionKey.OP_READ);
}
private Selector initSelector() throws IOException {
    Selector socketSelector = SelectorProvider.provider().openSelector();
    ServerSocketChannel serverChannel = ServerSocketChannel.open();
    serverChannel.configureBlocking(false);
    InetSocketAddress isa = new InetSocketAddress(hostAddress, port);
    serverChannel.socket().bind(isa);
    serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT);
    return socketSelector;
}
public static void main(String[] args) throws IOException {
    System.out.println("Starting echo server");
    new EchoServer();
}
}
 
					 
				 
				    		 
				    		 
				    		 
				    		