如何在将数据发送到其他应用程序时实施重试策略?

我正在处理将数据发送到 的应用程序。以下是我的应用程序的作用:zeromq

  • 我有一个类SendToZeroMQ,它将数据发送到zeromq。
  • 将相同的数据添加到同一类中的 retryQueue 中,以便在以后未收到确认时可以重试。它使用具有最大大小限制的番石榴缓存。
  • 有一个单独的线程,该线程从 zeromq 接收先前发送的数据的确认,如果未收到确认,则 SendToZeroMQ 将重试发送同一段数据。如果收到确认,我们将从重试队列中删除它,以便无法再次重试。

想法非常简单,我必须确保我的重试策略工作正常,这样我就不会丢失数据。这是非常罕见的,但以防万一我们没有收到确认。

我正在考虑构建两种类型的重试策略,但我无法理解如何在这里构建与我的程序相对应的重试策略:

  • RetryNTimes:在这种情况下,它将在每次重试之间以特定的睡眠重试N次,之后,它将删除记录。
  • ExponentialBackoffRetry:在这种情况下,它将呈指数级继续重试。我们可以设置一些最大重试限制,之后它不会重试,而是会删除记录。

以下是我的 SendToZeroMQ 类,它将数据发送到 zeromq,也从后台线程每 30 秒重试一次,并启动永远保持运行的 ResponsePoller runnable:

public class SendToZeroMQ {
  private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
  private final Cache<Long, byte[]> retryQueue =
      CacheBuilder
          .newBuilder()
          .maximumSize(10000000)
          .concurrencyLevel(200)
          .removalListener(
              RemovalListeners.asynchronous(new CustomListener(), executorService)).build();

  private static class Holder {
    private static final SendToZeroMQ INSTANCE = new SendToZeroMQ();
  }

  public static SendToZeroMQ getInstance() {
    return Holder.INSTANCE;
  }

  private SendToZeroMQ() {
    executorService.submit(new ResponsePoller());
    // retry every 30 seconds for now
    executorService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        for (Entry<Long, byte[]> entry : retryQueue.asMap().entrySet()) {
          sendTo(entry.getKey(), entry.getValue());
        }
      }
    }, 0, 30, TimeUnit.SECONDS);
  }

  public boolean sendTo(final long address, final byte[] encodedRecords) {
    Optional<ZMQSocketInfo> liveSockets = PoolManager.getInstance().getNextSocket();
    if (!liveSockets.isPresent()) {
      return false;
    }
    return sendTo(address, encodedRecords, liveSockets.get().getSocket());
  }

  public boolean sendTo(final long address, final byte[] encodedByteArray, final Socket socket) {
    ZMsg msg = new ZMsg();
    msg.add(encodedByteArray);
    boolean sent = msg.send(socket);
    msg.destroy();
    // adding to retry queue
    retryQueue.put(address, encodedByteArray);
    return sent;
  }

  public void removeFromRetryQueue(final long address) {
    retryQueue.invalidate(address);
  }
}

下面是我的 ResponsePoller 类,它轮询来自 zeromq 的所有确认。如果我们从 zeromq 收到确认,那么我们将从重试队列中删除该记录,以便它不会重试,否则它将被重试。

public class ResponsePoller implements Runnable {
  private static final Random random = new Random();

  @Override
  public void run() {
    ZContext ctx = new ZContext();
    Socket client = ctx.createSocket(ZMQ.PULL);
    String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
    client.setIdentity(identity.getBytes(ZMQ.CHARSET));
    client.bind("tcp://" + TestUtils.getIpaddress() + ":8076");

    PollItem[] items = new PollItem[] {new PollItem(client, Poller.POLLIN)};

    while (!Thread.currentThread().isInterrupted()) {
      // Tick once per second, pulling in arriving messages
      for (int centitick = 0; centitick < 100; centitick++) {
        ZMQ.poll(items, 10);
        if (items[0].isReadable()) {
          ZMsg msg = ZMsg.recvMsg(client);
          Iterator<ZFrame> it = msg.iterator();
          while (it.hasNext()) {
            ZFrame frame = it.next();
            try {
                long address = TestUtils.getAddress(frame.getData());
                // remove from retry queue since we got the acknowledgment for this record
                SendToZeroMQ.getInstance().removeFromRetryQueue(address);               
            } catch (Exception ex) {
                // log error
            } finally {
              frame.destroy();
            }
          }
          msg.destroy();
        }
      }
    }
    ctx.destroy();
  }
}

问题:

正如你在上面看到的,我使用SendToZeroMQ类发送到zeromq,然后每30秒重试一次,这取决于我们是否从类中恢复了一个acknolwedgement。encodedRecordsResponsePoller

对于每个都有一个唯一的密钥,这就是我们将从zeromq作为确认返回的密钥。encodedRecordsaddress

如何继续扩展此示例以构建我上面提到的两个重试策略,然后我可以选择要在发送数据时使用的重试策略。我想出了下面的接口,但后来我无法理解我应该如何继续实现这些重试策略并在上面的代码中使用它。

public interface RetryPolicy {
    /**
     * Called when an operation has failed for some reason. This method should return
     * true to make another attempt.
     */
    public boolean allowRetry(int retryCount, long elapsedTimeMs);
}

我可以在这里使用番石榴重试故障安全,因为这些库已经有许多我可以使用的重试策略吗?


答案 1

我无法计算出有关如何使用相关API-s的所有细节,但至于算法,您可以尝试:

  • 重试策略需要将某种状态附加到每条消息(至少是当前消息重试的次数,可能是当前延迟)。您需要决定重试策略是否应保留它本身,或者是否要将其存储在消息中。
  • 而不是 allowRetry,您可以有一个方法来计算何时应该发生下一次重试(在绝对时间内或将来以毫秒为单位),这将是上述状态的函数
  • 重试队列应包含有关何时应重试每条消息的信息。
  • 而不是使用 ,在重试队列中找到具有最低值的消息(可能通过按绝对重试时间戳排序并选择第一个),并让执行器服务重新调度自身使用和scheduleAtFixedRatewhen_is_next_retryscheduletime_to_next_retry
  • 对于每次重试,将其从重试队列中拉出,发送消息,使用重试策略计算下一次重试的时间(如果要重试),并使用新值插入回重试队列(如果重试策略返回 -1,则可能意味着不再重试该消息)when_is_next_retry

答案 2

这不是一个完美的方法,但也可以通过以下方式实现。

public interface RetryPolicy {
public boolean allowRetry();
public void decreaseRetryCount();

}

创建两个实现。对于重试 NTimes

public class RetryNTimes implements RetryPolicy {

private int maxRetryCount;
public RetryNTimes(int maxRetryCount) {
    this.maxRetryCount = maxRetryCount;
}

public boolean allowRetry() {
    return maxRetryCount > 0;
}

public void decreaseRetryCount()
{
    maxRetryCount = maxRetryCount-1;
}}

对于指数回退重试

public class ExponentialBackoffRetry implements RetryPolicy {

private int maxRetryCount;
private final Date retryUpto;

public ExponentialBackoffRetry(int maxRetryCount, Date retryUpto) {
    this.maxRetryCount = maxRetryCount;
    this.retryUpto = retryUpto;
}

public boolean allowRetry() {
    Date date = new Date();
    if(maxRetryCount <= 0 || date.compareTo(retryUpto)>=0)
    {
        return false;
    }
    return true;
}

public void decreaseRetryCount() {
    maxRetryCount = maxRetryCount-1;
}}

您需要在 SendToZeroMQ 类中进行一些更改

public class SendToZeroMQ {

private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
private final Cache<Long,RetryMessage> retryQueue =
        CacheBuilder
                .newBuilder()
                .maximumSize(10000000)
                .concurrencyLevel(200)
                .removalListener(
                        RemovalListeners.asynchronous(new CustomListener(), executorService)).build();

private static class Holder {
    private static final SendToZeroMQ INSTANCE = new SendToZeroMQ();
}

public static SendToZeroMQ getInstance() {
    return Holder.INSTANCE;
}

private SendToZeroMQ() {
    executorService.submit(new ResponsePoller());
    // retry every 30 seconds for now
    executorService.scheduleAtFixedRate(new Runnable() {
        public void run() {
            for (Map.Entry<Long, RetryMessage> entry : retryQueue.asMap().entrySet()) {
                RetryMessage retryMessage = entry.getValue();
                if(retryMessage.getRetryPolicy().allowRetry())
                {
                    retryMessage.getRetryPolicy().decreaseRetryCount();
                    entry.setValue(retryMessage);
                    sendTo(entry.getKey(), retryMessage.getMessage(),retryMessage);

                }else
                {
                    retryQueue.asMap().remove(entry.getKey());
                }
            }
        }
    }, 0, 30, TimeUnit.SECONDS);
}



public boolean sendTo(final long address, final byte[] encodedRecords, RetryMessage retryMessage) {
    Optional<ZMQSocketInfo> liveSockets = PoolManager.getInstance().getNextSocket();
    if (!liveSockets.isPresent()) {
        return false;
    }
    if(null==retryMessage)
    {
        RetryPolicy retryPolicy = new RetryNTimes(10);
        retryMessage = new RetryMessage(retryPolicy,encodedRecords);
        retryQueue.asMap().put(address,retryMessage);
    }
    return sendTo(address, encodedRecords, liveSockets.get().getSocket());
}

public boolean sendTo(final long address, final byte[] encodedByteArray, final ZMQ.Socket socket) {
    ZMsg msg = new ZMsg();
    msg.add(encodedByteArray);
    boolean sent = msg.send(socket);
    msg.destroy();
    return sent;
}

public void removeFromRetryQueue(final long address) {
    retryQueue.invalidate(address);
}}

推荐