无限期等待可能永远不会到达的消息

我有一个Java类型的actor,负责对可能暂时不可用的外部资源进行过滤/重试逻辑。参与者的领域和常用的方法是:

public class MyActorImpl implements MyActor {
    private static final long MINWAIT = 50;
    private static final long MAXWAIT = 1000;
    private static final long DEFAULTWAIT = 0;
    private static final double BACKOFFMULTIPLIER = 1.5;

    private long updateWait(long currentWait) {
        return Math.min(Math.max((long) (currentWait * BACKOFFMULTIPLIER), MINWAIT), MAXWAIT);
    }

    // mutable
    private long opWait = DEFAULTWAIT;
    private final Queue<OpInput> opBuffer = new ArrayDeque<>();

    // called from external actor
    public void operation(OpInput opInput) {
        operation(opInput, DEFAULTWAIT);
    }

    // called internally
    public void operation(OpInput opInput, long currentWait);
}

执行组件具有多个操作,这些操作都或多或少具有相同的重试/缓冲逻辑;每个操作都有自己的字段。[op]Wait[op]Buffer

  1. 父角色调用void operation(OpInput opInput)
  2. 前面的方法调用 using for 第二个参数void operation(OpInput opInput, long currentWait)DEFAULTWAIT
  3. 如果参数不相等,则输入存储在 中,否则输入被发送到外部资源。currentWaitopWaitopBuffer
  4. 如果外部资源返回成功,则 设置为 ,并且 通过该方法发回 的内容。如果外部资源(或更可能是网络)返回错误,则我使用ms的延迟在actor系统调度程序上进行更新和调度。opWaitDEFAULTWAITopBufferoperation(opInput)opWait = updateWait(opWait)operation(opInput, opWait)opWait

也就是说,我正在使用actor系统调度器来实现指数退避;我正在使用该参数来标识我正在重试的消息,并正在缓冲其他消息,直到外部资源成功处理主消息。currentWait

问题是,如果计划的消息丢失,那么我将永远缓冲消息,因为所有其他消息的防护都将失败。我可以使用类似spring-retry的东西来实现指数退避,但我没有看到合并操作的重试循环的方法,这意味着我可以为每个重试循环使用一个线程(而使用actor系统的调度程序不会给系统带来更大的压力)。operation(opInput, currentWait)currentWait == opWait

我正在寻找一种更容错的方法,在参与者和外部资源之间的接口上实现缓冲和指数退避,而不必为任务分配太多资源。


答案 1

如果我对你的理解是正确的,如果唯一的问题是丢失计划的消息,你为什么不对该特定消息使用类似可靠代理模式的东西,然后如果它失败opWait = DEFAULTWAIT;

关于你的代码,我得到了一些东西,我不明白你说的外部调用是什么意思。您的意思是此方法正在与网络进行交互,该网络使用有时不可用的资源?public void operation(OpInput opInput)

如果可以的话,我建议一个替代方案。据我所知,您的主要问题是您有时有一个不可用的资源,因此您有某种使用某种等待逻辑实现的que/buffer,以便消息在再次可用时将被处理,不幸的是,这涉及一些消息可能会丢失并导致无限等待。我认为你可以使用超时期货来实现你想要的。如果将来未在一定时间内完成,则重试,最多说 3 次重试。您甚至可以根据服务器负载以及完成消息所需的时间来调整此时间。希望有所帮助。


答案 2

推荐