Java High-load NIO TCP Server

2022-09-02 01:17:47

作为我研究的一部分,我正在用Java编写一个高负载的TCP / IP回显服务器。我想为大约3-4k的客户端提供服务,并查看每秒可以从中挤出的最大可能消息。消息大小非常小 - 最多 100 个字节。这项工作没有任何实际目的 - 只是一项研究。

根据我看过的许多演讲(HornetQ基准测试,LMAX Disruptor讲座等),现实世界的高负载系统倾向于每秒处理数百万个事务(我相信Disruptor提到了大约6 mils和Hornet - 8.5)。例如,这篇文章指出,可以实现高达40M MPS。因此,我将其作为对现代硬件应该具备哪些能力的粗略估计。

我编写了最简单的单线程NIO服务器并启动了负载测试。我有点惊讶,我只能在localhost上获得大约100k MPS,而在实际网络上只能获得25k。数字看起来很小。我正在Win7 x64,core i7上进行测试。查看 CPU 负载 - 只有一个内核处于繁忙状态(这在单线程应用上是预期的),而其余内核处于空闲状态。但是,即使我加载所有8个内核(包括虚拟),我也不会有超过800k MPS - 甚至没有接近4000万:)

我的问题是:向客户端提供大量消息的典型模式是什么?我是否应该在单个 JVM 内的多个不同套接字上分配网络负载,并使用某种负载均衡器(如 HAProxy)将负载分配到多个内核?或者我应该考虑在我的NIO代码中使用多个选择器?或者甚至可以在多个JVM之间分配负载,并使用Chronicle在它们之间建立进程间通信?在像CentOS这样的适当的服务器端操作系统上进行测试会产生很大的不同(也许是Windows减慢了速度)?

以下是我的服务器的示例代码。它总是对任何传入的数据回答“ok”。我知道在现实世界中,我需要跟踪消息的大小,并准备好一条消息可能会在多次读取之间拆分,但是我现在想让事情变得超级简单。

public class EchoServer {

private static final int BUFFER_SIZE = 1024;
private final static int DEFAULT_PORT = 9090;

// The buffer into which we'll read data when it's available
private ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER_SIZE);

private InetAddress hostAddress = null;

private int port;
private Selector selector;

private long loopTime;
private long numMessages = 0;

public EchoServer() throws IOException {
    this(DEFAULT_PORT);
}

public EchoServer(int port) throws IOException {
    this.port = port;
    selector = initSelector();
    loop();
}

private void loop() {
    while (true) {
        try{
            selector.select();
            Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
            while (selectedKeys.hasNext()) {
                SelectionKey key = selectedKeys.next();
                selectedKeys.remove();

                if (!key.isValid()) {
                    continue;
                }

                // Check what event is available and deal with it
                if (key.isAcceptable()) {
                    accept(key);
                } else if (key.isReadable()) {
                    read(key);
                } else if (key.isWritable()) {
                    write(key);
                }
            }

        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
}

private void accept(SelectionKey key) throws IOException {
    ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();

    SocketChannel socketChannel = serverSocketChannel.accept();
    socketChannel.configureBlocking(false);
    socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
    socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
    socketChannel.register(selector, SelectionKey.OP_READ);

    System.out.println("Client is connected");
}

private void read(SelectionKey key) throws IOException {
    SocketChannel socketChannel = (SocketChannel) key.channel();

    // Clear out our read buffer so it's ready for new data
    readBuffer.clear();

    // Attempt to read off the channel
    int numRead;
    try {
        numRead = socketChannel.read(readBuffer);
    } catch (IOException e) {
        key.cancel();
        socketChannel.close();

        System.out.println("Forceful shutdown");
        return;
    }

    if (numRead == -1) {
        System.out.println("Graceful shutdown");
        key.channel().close();
        key.cancel();

        return;
    }

    socketChannel.register(selector, SelectionKey.OP_WRITE);

    numMessages++;
    if (numMessages%100000 == 0) {
        long elapsed = System.currentTimeMillis() - loopTime;
        loopTime = System.currentTimeMillis();
        System.out.println(elapsed);
    }
}

private void write(SelectionKey key) throws IOException {
    SocketChannel socketChannel = (SocketChannel) key.channel();
    ByteBuffer dummyResponse = ByteBuffer.wrap("ok".getBytes("UTF-8"));

    socketChannel.write(dummyResponse);
    if (dummyResponse.remaining() > 0) {
        System.err.print("Filled UP");
    }

    key.interestOps(SelectionKey.OP_READ);
}

private Selector initSelector() throws IOException {
    Selector socketSelector = SelectorProvider.provider().openSelector();

    ServerSocketChannel serverChannel = ServerSocketChannel.open();
    serverChannel.configureBlocking(false);

    InetSocketAddress isa = new InetSocketAddress(hostAddress, port);
    serverChannel.socket().bind(isa);
    serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT);
    return socketSelector;
}

public static void main(String[] args) throws IOException {
    System.out.println("Starting echo server");
    new EchoServer();
}
}

答案 1
what is a typical pattern for serving massive amounts of messages to clients?

有许多可能的模式:在不经历多个jvm的情况下利用所有内核的一种简单方法是:

  1. 让单个线程接受连接并使用选择器读取。
  2. 一旦您有足够的字节来构成单个消息,就可以使用环形缓冲区等构造将其传递到另一个内核。Disruptor Java框架与此非常匹配。如果知道什么是完整消息所需的处理是轻量级的,这是一个很好的模式。例如,如果您有一个长度前缀协议,则可以等到获得预期的字节数,然后将其发送到另一个线程。如果协议的解析非常繁重,那么您可能会淹没这个单个线程,阻止它接受连接或读取网络的字节。
  3. 在从环形缓冲区接收数据的工作线程上,执行实际处理。
  4. 您可以在工作线程上或通过其他聚合器线程写出响应。

这就是它的要点。这里还有更多的可能性,答案实际上取决于您正在编写的应用程序类型。以下是一些示例:

  1. CPU 密集型无状态应用程序表示图像处理应用程序。每个请求完成的 CPU/GPU 工作量可能会明显高于非常幼稚的线程间通信解决方案产生的开销。在这种情况下,一个简单的解决方案是一堆工作线程从单个队列中提取工作。请注意,这是单个队列,而不是每个工作线程一个队列。优点是这本质上是负载平衡的。每个工作线程完成其工作,然后只轮询单生产者多使用者队列。尽管这是争用的来源,但图像处理工作(秒?)应该比任何同步替代方案都要昂贵得多。
  2. 一个纯粹的IO应用程序,例如一个统计服务器,它只是为请求增加一些计数器:在这里,你几乎不做任何CPU繁重的工作。大部分工作只是读取字节和写入字节。多线程应用程序在这里可能不会给您带来显著的好处。事实上,如果将项目排队所需的时间超过处理项目所需的时间,它甚至可能会减慢速度。单线程Java服务器应该能够轻松地使1G链路饱和。
  3. 需要适度处理量的有状态应用程序,例如典型的业务应用程序:在这里,每个客户端都有一些状态来确定如何处理每个请求。假设我们采用多线程,因为处理不是微不足道的,我们可以将客户端关联到某些线程。这是 actor 体系结构的变体:

    i) 当客户端首次将其哈希连接到工作线程时。您可能希望使用某个客户端 ID 执行此操作,以便在断开连接并重新连接时仍将其分配给同一工作线程/执行组件。

    ii)当读取器线程读取完整的请求时,将其放在正确的工作线程/执行组件的环形缓冲区上。由于同一工作线程始终处理特定客户端,因此所有状态都应为线程本地,从而使所有处理逻辑变得简单且单线程。

    iii) 工作线程可以将请求写出。总是尝试只做一个写()。如果您的所有数据都无法写出,那么您是否注册OP_WRITE。工作线程仅在确实存在未完成的内容时才需要进行选择调用。大多数写作应该只是成功,使这变得不必要。这里的诀窍是在选择调用和轮询环形缓冲区以获得更多请求之间取得平衡。您还可以使用单个编写器线程,其唯一职责是将请求写出。每个工作线程都可以将其响应放在将其连接到此单个写入器线程的环形缓冲区上。单写入器线程轮循机制轮询每个传入的环形缓冲区,并将数据写出到客户端。同样,关于在选择之前尝试写入的警告也适用于在多个环形缓冲区和选择调用之间平衡的技巧。

正如您所指出的,还有许多其他选择:

Should I distribute networking load over several different sockets inside a single JVM and use some sort of load balancer like HAProxy to distribute load to multiple cores?

您可以执行此操作,但恕我直言,这不是负载均衡器的最佳用途。这确实为您购买了独立的JVM,它们可以自行失败,但可能比编写单个多线程JVM应用程序慢。应用程序本身可能更容易编写,因为它将是单线程的。

Or I should look towards using multiple Selectors in my NIO code?

您也可以这样做。查看Ngnix架构以获取有关如何执行此操作的一些提示。

Or maybe even distribute the load between multiple JVMs and use Chronicle to build an inter-process communication between them?这也是一种选择。Chronicle 为您提供了一个优势,即内存映射文件对中间退出的进程更具弹性。您仍然可以获得足够的性能,因为所有通信都是通过共享内存完成的。

Will testing on a proper serverside OS like CentOS make a big difference (maybe it is Windows that slows things down)?

我不知道这个。不可能。如果Java充分利用了原生的Windows API,那应该就不那么重要了。我对每秒4000万笔交易的数字(没有用户空间网络堆栈+ UDP)非常怀疑,但我列出的架构应该做得很好。

这些体系结构往往做得很好,因为它们是单写入器体系结构,它们使用基于有界数组的数据结构进行线程间通信。确定多线程是否是答案。在许多情况下,它不是必需的,并且可能导致速度变慢。

另一个需要研究的领域是内存分配方案。具体而言,分配和重用缓冲区的策略可以带来显著的好处。正确的缓冲区重用策略取决于应用程序。看看像好友记忆分配,竞技场分配等方案,看看它们是否能使你受益。JVM GC对于大多数工作负载来说做得很好,所以在你走这条路之前总是要测量。

协议设计对性能也有很大的影响。我倾向于使用长度前缀协议,因为它们允许您分配大小合适的缓冲区,从而避免缓冲区列表和/或缓冲区合并。长度前缀协议还可以轻松决定何时移交请求 - 只需检查。实际的解析可以由工作线程完成。序列化和反序列化超出了长度前缀协议的范围。像缓冲区上的蝇量级模式而不是分配这样的模式在这里有所帮助。查看 SBE 以了解其中的一些原则。num bytes == expected

你可以想象,一整篇论文都可以在这里写。这应该让你朝着正确的方向前进。警告:始终进行测量,并确保您需要比最简单的选项更高的性能。很容易陷入一个永无止境的性能改进黑洞。


答案 2

你关于写作的逻辑是错误的。您应该立即尝试写入,以便写入数据。如果返回零,则是时候注册OP_WRITE,当通道变为可写时重试写入,并在写入成功时取消注册。您在此处添加了大量延迟。在执行所有这些操作时,通过取消注册来增加更多的延迟。write()OP_WRITEOP_READ


推荐