Java High-load NIO TCP Server
作为我研究的一部分,我正在用Java编写一个高负载的TCP / IP回显服务器。我想为大约3-4k的客户端提供服务,并查看每秒可以从中挤出的最大可能消息。消息大小非常小 - 最多 100 个字节。这项工作没有任何实际目的 - 只是一项研究。
根据我看过的许多演讲(HornetQ基准测试,LMAX Disruptor讲座等),现实世界的高负载系统倾向于每秒处理数百万个事务(我相信Disruptor提到了大约6 mils和Hornet - 8.5)。例如,这篇文章指出,可以实现高达40M MPS。因此,我将其作为对现代硬件应该具备哪些能力的粗略估计。
我编写了最简单的单线程NIO服务器并启动了负载测试。我有点惊讶,我只能在localhost上获得大约100k MPS,而在实际网络上只能获得25k。数字看起来很小。我正在Win7 x64,core i7上进行测试。查看 CPU 负载 - 只有一个内核处于繁忙状态(这在单线程应用上是预期的),而其余内核处于空闲状态。但是,即使我加载所有8个内核(包括虚拟),我也不会有超过800k MPS - 甚至没有接近4000万:)
我的问题是:向客户端提供大量消息的典型模式是什么?我是否应该在单个 JVM 内的多个不同套接字上分配网络负载,并使用某种负载均衡器(如 HAProxy)将负载分配到多个内核?或者我应该考虑在我的NIO代码中使用多个选择器?或者甚至可以在多个JVM之间分配负载,并使用Chronicle在它们之间建立进程间通信?在像CentOS这样的适当的服务器端操作系统上进行测试会产生很大的不同(也许是Windows减慢了速度)?
以下是我的服务器的示例代码。它总是对任何传入的数据回答“ok”。我知道在现实世界中,我需要跟踪消息的大小,并准备好一条消息可能会在多次读取之间拆分,但是我现在想让事情变得超级简单。
public class EchoServer {
private static final int BUFFER_SIZE = 1024;
private final static int DEFAULT_PORT = 9090;
// The buffer into which we'll read data when it's available
private ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER_SIZE);
private InetAddress hostAddress = null;
private int port;
private Selector selector;
private long loopTime;
private long numMessages = 0;
public EchoServer() throws IOException {
this(DEFAULT_PORT);
}
public EchoServer(int port) throws IOException {
this.port = port;
selector = initSelector();
loop();
}
private void loop() {
while (true) {
try{
selector.select();
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
while (selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selectedKeys.remove();
if (!key.isValid()) {
continue;
}
// Check what event is available and deal with it
if (key.isAcceptable()) {
accept(key);
} else if (key.isReadable()) {
read(key);
} else if (key.isWritable()) {
write(key);
}
}
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
}
private void accept(SelectionKey key) throws IOException {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
socketChannel.register(selector, SelectionKey.OP_READ);
System.out.println("Client is connected");
}
private void read(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
// Clear out our read buffer so it's ready for new data
readBuffer.clear();
// Attempt to read off the channel
int numRead;
try {
numRead = socketChannel.read(readBuffer);
} catch (IOException e) {
key.cancel();
socketChannel.close();
System.out.println("Forceful shutdown");
return;
}
if (numRead == -1) {
System.out.println("Graceful shutdown");
key.channel().close();
key.cancel();
return;
}
socketChannel.register(selector, SelectionKey.OP_WRITE);
numMessages++;
if (numMessages%100000 == 0) {
long elapsed = System.currentTimeMillis() - loopTime;
loopTime = System.currentTimeMillis();
System.out.println(elapsed);
}
}
private void write(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer dummyResponse = ByteBuffer.wrap("ok".getBytes("UTF-8"));
socketChannel.write(dummyResponse);
if (dummyResponse.remaining() > 0) {
System.err.print("Filled UP");
}
key.interestOps(SelectionKey.OP_READ);
}
private Selector initSelector() throws IOException {
Selector socketSelector = SelectorProvider.provider().openSelector();
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
InetSocketAddress isa = new InetSocketAddress(hostAddress, port);
serverChannel.socket().bind(isa);
serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT);
return socketSelector;
}
public static void main(String[] args) throws IOException {
System.out.println("Starting echo server");
new EchoServer();
}
}