如何在将数据发送到其他应用程序时实施重试策略?
2022-09-04 21:07:27
我正在处理将数据发送到 的应用程序。以下是我的应用程序的作用:zeromq
- 我有一个类
SendToZeroMQ
,它将数据发送到zeromq。 - 将相同的数据添加到同一类中的
retryQueue
中,以便在以后未收到确认时可以重试。它使用具有最大大小限制的番石榴缓存。 - 有一个单独的线程,该线程从 zeromq 接收先前发送的数据的确认,如果未收到确认,则
SendToZeroMQ
将重试发送同一段数据。如果收到确认,我们将从重试队列
中删除它,以便无法再次重试。
想法非常简单,我必须确保我的重试策略工作正常,这样我就不会丢失数据。这是非常罕见的,但以防万一我们没有收到确认。
我正在考虑构建两种类型的重试策略,
但我无法理解如何在这里构建与我的程序相对应的重试策略:
-
RetryNTimes:
在这种情况下,它将在每次重试之间以特定的睡眠重试N次,之后,它将删除记录。 -
ExponentialBackoffRetry:
在这种情况下,它将呈指数级继续重试。我们可以设置一些最大重试限制,之后它不会重试,而是会删除记录。
以下是我的 SendToZeroMQ
类,它将数据发送到 zeromq,也从后台线程每 30 秒重试一次,并启动永远保持运行的 ResponsePoller
runnable:
public class SendToZeroMQ {
private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
private final Cache<Long, byte[]> retryQueue =
CacheBuilder
.newBuilder()
.maximumSize(10000000)
.concurrencyLevel(200)
.removalListener(
RemovalListeners.asynchronous(new CustomListener(), executorService)).build();
private static class Holder {
private static final SendToZeroMQ INSTANCE = new SendToZeroMQ();
}
public static SendToZeroMQ getInstance() {
return Holder.INSTANCE;
}
private SendToZeroMQ() {
executorService.submit(new ResponsePoller());
// retry every 30 seconds for now
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
for (Entry<Long, byte[]> entry : retryQueue.asMap().entrySet()) {
sendTo(entry.getKey(), entry.getValue());
}
}
}, 0, 30, TimeUnit.SECONDS);
}
public boolean sendTo(final long address, final byte[] encodedRecords) {
Optional<ZMQSocketInfo> liveSockets = PoolManager.getInstance().getNextSocket();
if (!liveSockets.isPresent()) {
return false;
}
return sendTo(address, encodedRecords, liveSockets.get().getSocket());
}
public boolean sendTo(final long address, final byte[] encodedByteArray, final Socket socket) {
ZMsg msg = new ZMsg();
msg.add(encodedByteArray);
boolean sent = msg.send(socket);
msg.destroy();
// adding to retry queue
retryQueue.put(address, encodedByteArray);
return sent;
}
public void removeFromRetryQueue(final long address) {
retryQueue.invalidate(address);
}
}
下面是我的 ResponsePoller
类,它轮询来自 zeromq 的所有确认。如果我们从 zeromq 收到确认,那么我们将从重试队列中删除该记录,以便它不会重试,否则它将被重试。
public class ResponsePoller implements Runnable {
private static final Random random = new Random();
@Override
public void run() {
ZContext ctx = new ZContext();
Socket client = ctx.createSocket(ZMQ.PULL);
String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
client.setIdentity(identity.getBytes(ZMQ.CHARSET));
client.bind("tcp://" + TestUtils.getIpaddress() + ":8076");
PollItem[] items = new PollItem[] {new PollItem(client, Poller.POLLIN)};
while (!Thread.currentThread().isInterrupted()) {
// Tick once per second, pulling in arriving messages
for (int centitick = 0; centitick < 100; centitick++) {
ZMQ.poll(items, 10);
if (items[0].isReadable()) {
ZMsg msg = ZMsg.recvMsg(client);
Iterator<ZFrame> it = msg.iterator();
while (it.hasNext()) {
ZFrame frame = it.next();
try {
long address = TestUtils.getAddress(frame.getData());
// remove from retry queue since we got the acknowledgment for this record
SendToZeroMQ.getInstance().removeFromRetryQueue(address);
} catch (Exception ex) {
// log error
} finally {
frame.destroy();
}
}
msg.destroy();
}
}
}
ctx.destroy();
}
}
问题:
正如你在上面看到的,我使用SendToZeroMQ
类发送到zeromq,然后每30秒重试一次,这取决于我们是否从类中恢复了一个acknolwedgement。encodedRecords
ResponsePoller
对于每个都有一个唯一的密钥,这就是我们将从zeromq作为确认返回的密钥。encodedRecords
address
如何继续扩展此示例以构建我上面提到的两个重试策略,然后我可以选择要在发送数据时使用的重试策略。我想出了下面的接口,但后来我无法理解我应该如何继续实现这些重试策略并在上面的代码中使用它。
public interface RetryPolicy {
/**
* Called when an operation has failed for some reason. This method should return
* true to make another attempt.
*/
public boolean allowRetry(int retryCount, long elapsedTimeMs);
}