根据套接字在自己的池中是活的还是死的,向后释放套接字?

我正在使用下面的类,通过使用套接字同步或异步方式将数据发送到我们的消息传递队列,如下所示。这取决于我是否要调用同步或异步方法来在套接字上发送数据的要求。大多数时候,我们将异步发送数据,但有时我可能需要同步发送数据。

  • sendAsync- 它异步发送数据,我们不会阻止发送数据的线程。如果未收到确认,则它将从仅在构造函数中启动的后台线程重试。SendToQueue
  • send- 它在套接字上同步发送数据。它在内部调用方法,然后在特定的超时时间段内休眠,如果未收到确认,则会从存储桶中删除,以便我们不会再次重试。doSendAsynccache

因此,上述两种方法之间的唯一区别是 - 对于异步情况,如果未收到确认,我需要不惜一切代价重试,但对于同步,我根本不需要重试,这就是为什么我在类中存储更多状态的原因。PendingMessage

ResponsePoller是一个类,它接收发送到特定套接字上的消息传递队列的数据的确认,然后调用下面的方法来删除地址,以便我们在收到确认后不会重试。如果收到确认,则套接字处于活动状态,否则它是死的。handleAckReceived

public class SendToQueue {
  private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
  private final Cache<Long, PendingMessage> cache = CacheBuilder.newBuilder()
          .maximumSize(1000000)
          .concurrencyLevel(100)
          .build();

  private static class PendingMessage {
    private final long _address;
    private final byte[] _encodedRecords;
    private final boolean _retryEnabled;
    private final Object _monitor = new Object();
    private long _sendTimeMillis;
    private volatile boolean _acknowledged;

    public PendingMessage(long address, byte[] encodedRecords, boolean retryEnabled) {
      _address = address;
      _sendTimeMillis = System.currentTimeMillis();
      _encodedRecords = encodedRecords;
      _retryEnabled = retryEnabled;
    }

    public synchronized boolean hasExpired() {
      return System.currentTimeMillis() - _sendTimeMillis > 500L;
    }

    public synchronized void markResent() {
      _sendTimeMillis = System.currentTimeMillis();
    }

    public boolean shouldRetry() {
      return _retryEnabled && !_acknowledged;
    }

    public boolean waitForAck() {
      try {
        synchronized (_monitor) {
          _monitor.wait(500L);
        }
        return _acknowledged;
      } catch (InterruptedException ie) {
        return false;
      }
    }

    public void ackReceived() {
      _acknowledged = true;
      synchronized (_monitor) {
        _monitor.notifyAll();
      }
    }

    public long getAddress() {
      return _address;
    }

    public byte[] getEncodedRecords() {
      return _encodedRecords;
    }
  }

  private static class Holder {
    private static final SendToQueue INSTANCE = new SendToQueue();
  }

  public static SendToQueue getInstance() {
    return Holder.INSTANCE;
  }

  private void handleRetries() {
    List<PendingMessage> messages = new ArrayList<>(cache.asMap().values());
    for (PendingMessage m : messages) {
      if (m.hasExpired()) {
        if (m.shouldRetry()) {
          m.markResent();
          doSendAsync(m, Optional.<Socket>absent());
        } else {
          cache.invalidate(m.getAddress());
        }
      }
    }
  }

  private SendToQueue() {
    executorService.submit(new ResponsePoller()); // another thread which receives acknowledgment
                                                  // and then delete entry from the cache
                                                  // accordingly.
    executorService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        handleRetries();
      }
    }, 0, 1, TimeUnit.SECONDS);
  }

  public boolean sendAsync(final long address, final byte[] encodedRecords) {
    PendingMessage m = new PendingMessage(address, encodedRecords, true);
    cache.put(address, m);
    return doSendAsync(m, Optional.<Socket>absent());
  }

  private boolean doSendAsync(final PendingMessage pendingMessage, final Optional<Socket> socket) {
    Optional<Socket> actualSocket = socket;
    if (!actualSocket.isPresent()) {
      SocketHolder liveSocket = SocketManager.getInstance().getSocket();
      actualSocket = Optional.of(liveSocket.getSocket());
    }

    ZMsg msg = new ZMsg();
    msg.add(pendingMessage.getEncodedRecords());
    try {
      return msg.send(actualSocket.get());
    } finally {
      msg.destroy();
    }
  }

  public boolean send(final long address, final byte[] encodedRecords) {
    return send(address, encodedRecords, Optional.<Socket>absent());
  }

  public boolean send(final long address, final byte[] encodedRecords,
      final Optional<Socket> socket) {
    PendingMessage m = new PendingMessage(address, encodedRecords, false);
    cache.put(address, m);
    try {
      if (doSendAsync(m, socket)) {
        return m.waitForAck();
      }
      return false;
    } finally {
      cache.invalidate(address);
    }
  }   

  // called by acknowledgment thread which is in "ResponsePoller" class
  public void handleAckReceived(final long address) {
    PendingMessage m = cache.getIfPresent(address);
    if (m != null) {
      m.ackReceived();
      cache.invalidate(address);
    }
  }
}

由于我在套接字上发送数据,如果我得到相同数据的确认,则意味着套接字处于活动状态,但如果数据未确认,则意味着套接字已死(但我会继续重试发送数据)。

因此,使用我上面的设计(或者如果有更好的方法),我如何确定任何套接字是死的还是活的,因为要么没有收到确认,要么是从该套接字接收的,并且基于我需要通过调用下面的方法来将套接字释放回其池中(无论它是活的还是死的),这取决于是否收到同步或异步情况的确认。

我还需要配置计数,如果在x的特定套接字上未收到确认(其中x是数字>0,默认值应为2)次,则仅标记套接字死亡。做这件事的最好和有效的方法是什么?

SocketManager.getInstance().releaseSocket(socket, SocketState.LIVE);
SocketManager.getInstance().releaseSocket(socket, SocketState.DEAD);

答案 1

假设您在家中,您已将电缆插入笔记本电脑和路由器,路由器后面有电缆调制解调器。如果您关闭路由器 - 您的笔记本电脑将知道 - 没有电压。如果关闭调制解调器...这很棘手。你根本无法知道这一点。一个潜在的问题是没有到主机的路由。但是,即使您已连接,也可能是任何其他问题。一些协议 - 如ssh内置ping - 因此它们具有保持活动状态以进行连接。如果你的应用在每个时间间隔都不执行任何操作,则客户端和服务器之间会有 pingpong,因此你知道它是否处于活动状态。

如果您对协议有完全控制权 - 保持活动状态是选项之一。

您的客户处于一端,但总的来说,很难让双方确保他们达成协议。拜占庭将军问题描述了一些通用网络模型,其中每个节点都不知道任何其他节点,并且只能信任知道的东西。

一般来说,我不会自己编写分布式系统。我正在使用Hystrix来实现这一目标。链接是他们的配置,所以你可以看到它有多大。您可以跟踪服务器是否正常工作。此外,当它再次出现时,您可以准备策略来弄清楚它,而不是淹没它,取消过时的消息等等 - 图形,统计数据,与其他解决方案的集成。有大社区使用它,并解决问题。这是比自己做更好的选择。

不确定您是否只有这一项服务可以与之交谈,或者Hystrix是否适合您。在Java中,人们倾向于使用层,框架,如果他们处理问题......希望它有帮助。


答案 2

推荐