无阻塞插座
在Java中实现非阻塞套接字的最佳方法是什么?
还是有这样的事情?我有一个通过套接字与服务器通信的程序,但我不希望套接字调用阻止/导致延迟,如果数据/连接出现问题。
在Java中实现非阻塞套接字的最佳方法是什么?
还是有这样的事情?我有一个通过套接字与服务器通信的程序,但我不希望套接字调用阻止/导致延迟,如果数据/连接出现问题。
Java 2 标准版 1.4 中引入的 Java 非阻塞套接字允许应用程序之间的网络通信,而不会阻塞使用套接字的进程。但是,什么是非阻塞套接字,它在哪些上下文中可能有用,以及它是如何工作的?
非阻塞套接字允许在通道上进行 I/O 操作,而不会阻塞使用它的进程。这意味着,我们可以使用单个线程来处理多个并发连接,并获得“异步高性能”读/写操作(有些人可能不同意这一点)
好吧,在什么情况下它可能是有用的?
假设您要实现一个接受各种客户端连接的服务器。假设您还希望服务器能够同时处理多个请求。使用传统方式,您有两种选择来开发这样的服务器:
这两种解决方案都有效,但是采用第一个解决方案时,您必须开发整个线程管理解决方案,并伴有相关的并发和冲突问题。第二种解决方案使应用程序依赖于非JDK外部模块,并且可能您必须使库适应您的需要。通过非阻塞套接字,您可以实现非阻塞服务器,而无需直接管理线程或求助于外部模块。
在详细介绍之前,您需要了解几个术语:
Java NIO有一个名为类,允许单个线程检查多个通道上的I / O事件。这怎么可能?好吧,可以检查通道的“就绪情况”,例如客户端尝试连接或读/写操作等事件。也就是说,每个实例都可以监视更多的套接字通道,从而监视更多的连接。现在,当通道上发生某些事情(发生事件)时,会通知应用程序处理请求。通过创建事件键(或选择键)来实现此目的,这些事件键是类的实例。每个都包含有关谁在发出请求以及请求类型的信息,如图 1 所示。Selector
selector
Selector
selector
selector
SelectionKey
key
服务器实现由无限循环组成,在该循环中等待事件并创建事件键。密钥有四种可能的类型:selector
通常在服务器端创建密钥。实际上,这种键只是通知服务器客户端需要连接,然后服务器将套接字通道个性化,并将其与选择器相关联以进行读/写操作。在此之后,当接受的客户端读取或写入某些内容时,选择器将为该客户端创建或密钥。acceptable
readable
writeable
现在,您已准备好按照建议的算法用 Java 编写服务器。套接字通道、和套接字选择器注册的创建可以通过以下方式进行:selector
final String HOSTNAME = "127.0.0.1";
final int PORT = 8511;
// This is how you open a ServerSocketChannel
serverChannel = ServerSocketChannel.open();
// You MUST configure as non-blocking or else you cannot register the serverChannel to the Selector.
serverChannel.configureBlocking(false);
// bind to the address that you will use to Serve.
serverChannel.socket().bind(new InetSocketAddress(HOSTNAME, PORT));
// This is how you open a Selector
selector = Selector.open();
/*
* Here you are registering the serverSocketChannel to accept connection, thus the OP_ACCEPT.
* This means that you just told your selector that this channel will be used to accept connections.
* We can change this operation later to read/write, more on this later.
*/
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
首先,我们创建一个 with 方法的实例。接下来,调用将其设置为非阻塞。通过方法与服务器建立连接。表示服务器的 IP 地址,并且是通信端口。最后,调用方法来创建实例并将其注册到 和 注册类型。在此示例中,注册类型为 ,这意味着选择器仅报告客户端尝试连接到服务器。其他可能的选项包括: ,客户端将使用; ;和。SocketChannel
ServerSocketChannel.open()
configureBlocking(false)
channel
serverChannel.socket().bind()
HOSTNAME
PORT
Selector.open()
selector
channel
OP_ACCEPT
OP_CONNECT
OP_READ
OP_WRITE
现在我们需要使用无限循环来处理此请求。一个简单的方法是:
// Run the server as long as the thread is not interrupted.
while (!Thread.currentThread().isInterrupted()) {
/*
* selector.select(TIMEOUT) is waiting for an OPERATION to be ready and is a blocking call.
* For example, if a client connects right this second, then it will break from the select()
* call and run the code below it. The TIMEOUT is not needed, but its just so it doesn't
* block undefinable.
*/
selector.select(TIMEOUT);
/*
* If we are here, it is because an operation happened (or the TIMEOUT expired).
* We need to get the SelectionKeys from the selector to see what operations are available.
* We use an iterator for this.
*/
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while (keys.hasNext()) {
SelectionKey key = keys.next();
// remove the key so that we don't process this OPERATION again.
keys.remove();
// key could be invalid if for example, the client closed the connection.
if (!key.isValid()) {
continue;
}
/*
* In the server, we start by listening to the OP_ACCEPT when we register with the Selector.
* If the key from the keyset is Acceptable, then we must get ready to accept the client
* connection and do something with it. Go read the comments in the accept method.
*/
if (key.isAcceptable()) {
System.out.println("Accepting connection");
accept(key);
}
/*
* If you already read the comments in the accept() method, then you know we changed
* the OPERATION to OP_WRITE. This means that one of these keys in the iterator will return
* a channel that is writable (key.isWritable()). The write() method will explain further.
*/
if (key.isWritable()) {
System.out.println("Writing...");
write(key);
}
/*
* If you already read the comments in the write method then you understand that we registered
* the OPERATION OP_READ. That means that on the next Selector.select(), there is probably a key
* that is ready to read (key.isReadable()). The read() method will explain further.
*/
if (key.isReadable()) {
System.out.println("Reading connection");
read(key);
}
}
}
您可以在此处找到实现源
作为非阻塞实现的替代方法,我们可以部署异步服务器。例如,您可以使用该类,它为面向流的侦听套接字提供异步通道。AsynchronousServerSocketChannel
若要使用它,请首先执行其静态方法,然后将其执行到特定端口。接下来,您将执行其方法,向其传递一个实现接口的类。大多数情况下,您会发现该处理程序是作为匿名内部类创建的。open()
bind()
accept()
CompletionHandler
从此对象中,您可以调用以告诉它开始侦听连接,并向其传递一个自定义实例。当我们调用时,它会立即返回。请注意,这与传统的阻止方法不同;而该方法在客户端连接到它之前被阻止,该方法将为您处理它。AsynchronousServerSocketChannel
accept()
CompletionHandler
accept()
accept()
AsynchronousServerSocketChannel
accept()
这里有一个例子:
public class NioSocketServer
{
public NioSocketServer()
{
try {
// Create an AsynchronousServerSocketChannel that will listen on port 5000
final AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel
.open()
.bind(new InetSocketAddress(5000));
// Listen for a new request
listener.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>()
{
@Override
public void completed(AsynchronousSocketChannel ch, Void att)
{
// Accept the next connection
listener.accept(null, this);
// Greet the client
ch.write(ByteBuffer.wrap("Hello, I am Echo Server 2020, let's have an engaging conversation!\n".getBytes()));
// Allocate a byte buffer (4K) to read from the client
ByteBuffer byteBuffer = ByteBuffer.allocate(4096);
try {
// Read the first line
int bytesRead = ch.read(byteBuffer).get(20, TimeUnit.SECONDS);
boolean running = true;
while (bytesRead != -1 && running) {
System.out.println("bytes read: " + bytesRead);
// Make sure that we have data to read
if (byteBuffer.position() > 2) {
// Make the buffer ready to read
byteBuffer.flip();
// Convert the buffer into a line
byte[] lineBytes = new byte[bytesRead];
byteBuffer.get(lineBytes, 0, bytesRead);
String line = new String(lineBytes);
// Debug
System.out.println("Message: " + line);
// Echo back to the caller
ch.write(ByteBuffer.wrap(line.getBytes()));
// Make the buffer ready to write
byteBuffer.clear();
// Read the next line
bytesRead = ch.read(byteBuffer).get(20, TimeUnit.SECONDS);
} else {
// An empty line signifies the end of the conversation in our protocol
running = false;
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
// The user exceeded the 20 second timeout, so close the connection
ch.write(ByteBuffer.wrap("Good Bye\n".getBytes()));
System.out.println("Connection timed out, closing connection");
}
System.out.println("End of conversation");
try {
// Close the connection if we need to
if (ch.isOpen()) {
ch.close();
}
} catch (I/OException e1)
{
e1.printStackTrace();
}
}
@Override
public void failed(Throwable exc, Void att)
{
///...
}
});
} catch (I/OException e) {
e.printStackTrace();
}
}
public static void main(String[] args)
{
NioSocketServer server = new NioSocketServer();
try {
Thread.sleep(60000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
您可以在此处找到完整代码
在Java中实现非阻塞套接字的最佳方法是什么?
只有一种方法。.SocketChannel.configureBlocking(false)
请注意,其中一些答案不正确。SocketChannel.configureBlocking(false) 将其置于非阻塞模式。你不需要这样做。您只需要 实现 超时或具有非阻塞套接字的多路复用 I/O。Selector
Selector