根据套接字在自己的池中是活的还是死的,向后释放套接字?
我正在使用下面的类,通过使用套接字同步或异步方式将数据发送到我们的消息传递队列,如下所示。这取决于我是否要调用同步或异步方法来在套接字上发送数据的要求。大多数时候,我们将异步发送数据,但有时我可能需要同步发送数据。
-
sendAsync
- 它异步发送数据,我们不会阻止发送数据的线程。如果未收到确认,则它将从仅在构造函数中启动的后台线程重试。SendToQueue
-
send
- 它在套接字上同步发送数据。它在内部调用方法,然后在特定的超时时间段内休眠,如果未收到确认,则会从存储桶中删除,以便我们不会再次重试。doSendAsync
cache
因此,上述两种方法之间的唯一区别是 - 对于异步情况,如果未收到确认,我需要不惜一切代价重试,但对于同步,我根本不需要重试,这就是为什么我在类中存储更多状态的原因。PendingMessage
ResponsePoller
是一个类,它接收发送到特定套接字上的消息传递队列的数据的确认,然后调用下面的方法来删除地址,以便我们在收到确认后不会重试。如果收到确认,则套接字处于活动状态,否则它是死的。handleAckReceived
public class SendToQueue {
private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
private final Cache<Long, PendingMessage> cache = CacheBuilder.newBuilder()
.maximumSize(1000000)
.concurrencyLevel(100)
.build();
private static class PendingMessage {
private final long _address;
private final byte[] _encodedRecords;
private final boolean _retryEnabled;
private final Object _monitor = new Object();
private long _sendTimeMillis;
private volatile boolean _acknowledged;
public PendingMessage(long address, byte[] encodedRecords, boolean retryEnabled) {
_address = address;
_sendTimeMillis = System.currentTimeMillis();
_encodedRecords = encodedRecords;
_retryEnabled = retryEnabled;
}
public synchronized boolean hasExpired() {
return System.currentTimeMillis() - _sendTimeMillis > 500L;
}
public synchronized void markResent() {
_sendTimeMillis = System.currentTimeMillis();
}
public boolean shouldRetry() {
return _retryEnabled && !_acknowledged;
}
public boolean waitForAck() {
try {
synchronized (_monitor) {
_monitor.wait(500L);
}
return _acknowledged;
} catch (InterruptedException ie) {
return false;
}
}
public void ackReceived() {
_acknowledged = true;
synchronized (_monitor) {
_monitor.notifyAll();
}
}
public long getAddress() {
return _address;
}
public byte[] getEncodedRecords() {
return _encodedRecords;
}
}
private static class Holder {
private static final SendToQueue INSTANCE = new SendToQueue();
}
public static SendToQueue getInstance() {
return Holder.INSTANCE;
}
private void handleRetries() {
List<PendingMessage> messages = new ArrayList<>(cache.asMap().values());
for (PendingMessage m : messages) {
if (m.hasExpired()) {
if (m.shouldRetry()) {
m.markResent();
doSendAsync(m, Optional.<Socket>absent());
} else {
cache.invalidate(m.getAddress());
}
}
}
}
private SendToQueue() {
executorService.submit(new ResponsePoller()); // another thread which receives acknowledgment
// and then delete entry from the cache
// accordingly.
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
handleRetries();
}
}, 0, 1, TimeUnit.SECONDS);
}
public boolean sendAsync(final long address, final byte[] encodedRecords) {
PendingMessage m = new PendingMessage(address, encodedRecords, true);
cache.put(address, m);
return doSendAsync(m, Optional.<Socket>absent());
}
private boolean doSendAsync(final PendingMessage pendingMessage, final Optional<Socket> socket) {
Optional<Socket> actualSocket = socket;
if (!actualSocket.isPresent()) {
SocketHolder liveSocket = SocketManager.getInstance().getSocket();
actualSocket = Optional.of(liveSocket.getSocket());
}
ZMsg msg = new ZMsg();
msg.add(pendingMessage.getEncodedRecords());
try {
return msg.send(actualSocket.get());
} finally {
msg.destroy();
}
}
public boolean send(final long address, final byte[] encodedRecords) {
return send(address, encodedRecords, Optional.<Socket>absent());
}
public boolean send(final long address, final byte[] encodedRecords,
final Optional<Socket> socket) {
PendingMessage m = new PendingMessage(address, encodedRecords, false);
cache.put(address, m);
try {
if (doSendAsync(m, socket)) {
return m.waitForAck();
}
return false;
} finally {
cache.invalidate(address);
}
}
// called by acknowledgment thread which is in "ResponsePoller" class
public void handleAckReceived(final long address) {
PendingMessage m = cache.getIfPresent(address);
if (m != null) {
m.ackReceived();
cache.invalidate(address);
}
}
}
由于我在套接字上发送数据,如果我得到相同数据的确认,则意味着套接字处于活动状态,但如果数据未确认,则意味着套接字已死(但我会继续重试发送数据)。
因此,使用我上面的设计(或者如果有更好的方法),我如何确定任何套接字是死的还是活的,因为要么没有收到确认,要么是从该套接字接收的,并且基于我需要通过调用下面的方法来将套接字释放回其池中(无论它是活的还是死的),这取决于是否收到同步或异步情况的确认。
我还需要配置计数,如果在x的特定套接字上未收到确认(其中x是数字>0,默认值应为2)次,则仅标记套接字死亡。做这件事的最好和有效的方法是什么?
SocketManager.getInstance().releaseSocket(socket, SocketState.LIVE);
SocketManager.getInstance().releaseSocket(socket, SocketState.DEAD);