Kafka - 使用高级使用者的延迟队列实现

想要使用高级使用者 API 实现延迟的使用者

主要思想:

  • 按键生成消息(每个消息都包含创建时间戳),这可确保每个分区都按生成时间对消息进行排序。
  • auto.commit.enable=false (将在每个消息进程后显式提交)
  • 使用消息
  • 检查消息时间戳并检查是否经过足够的时间
  • 进程消息(此操作永远不会失败)
  • 提交 1 偏移量

    while (it.hasNext()) {
      val msg = it.next().message()
      //checks timestamp in msg to see delay period exceeded
      while (!delayedPeriodPassed(msg)) { 
         waitSomeTime() //Thread.sleep or something....
      }
      //certain that the msg was delayed and can now be handled
      Try { process(msg) } //the msg process will never fail the consumer
      consumer.commitOffsets //commit each msg
    }
    

关于此实现的一些担忧:

  1. 提交每个偏移量可能会降低 ZK 的速度
  2. consumer.commitOffsets 可以抛出一个异常吗?如果是,我将使用相同的消息两次(可以用幂等消息解决)
  3. 问题等待很长时间而不提交偏移量,例如延迟期为24小时,将从迭代器获得下一个,睡眠24小时,处理和提交(ZK会话超时?
  4. ZK会话如何在不提交新偏移量的情况下保持活动状态?(设置一个蜂巢 zookeeper.session.timeout.ms 可以在死亡消费者中解决,而无需识别它)
  5. 缺少任何其他问题吗?

谢谢!


答案 1

一种方法是使用不同的主题,在其中推送所有要延迟的消息。如果所有延迟的消息都应在同一时间延迟后进行处理,这将相当简单:

while(it.hasNext()) {
    val message = it.next().message()
    
    if(shouldBeDelayed(message)) {
        val delay = 24 hours
        val delayTo = getCurrentTime() + delay
        putMessageOnDelayedQueue(message, delay, delayTo)
    }
    else {
       process(message)
    }

    consumer.commitOffset()
}

现在,所有常规消息都将尽快处理,而那些需要延迟的消息将被放在另一个主题上。

好消息是,我们知道延迟主题开头的消息是应该首先处理的消息,因为它的 delayTo 值将是最小的。因此,我们可以设置另一个读取 head 消息的使用者,检查时间戳是否为过去,如果是,则处理消息并提交偏移量。如果不是,它不会提交偏移量,而只是休眠,直到那个时间:

while(it.hasNext()) {
    val delayedMessage = it.peek().message()
    if(delayedMessage.delayTo < getCurrentTime()) {
        val readMessage = it.next().message
        process(readMessage.originalMessage)
        consumer.commitOffset()
    } else {
        delayProcessingUntil(delayedMessage.delayTo)
    }
}

如果有不同的延迟时间,您可以对延迟主题进行分区(例如,24小时,12小时,6小时)。如果延迟时间比这更动态,它就会变得更加复杂。您可以通过引入两个延迟主题来解决它。读取延迟主题之外的所有消息,并处理其值为过去的所有消息。在其他人中,您只需找到最接近的那个,然后将它们放在主题上。休眠,直到最接近的一个应该被处理,并以相反的方式完成所有操作,即处理来自主题的消息,并将一次不应该被处理回到主题上。AdelayTodelayToBBA

回答您的具体问题(有些问题已在对您问题的评论中得到解决)

  1. 提交每个偏移量可能会降低 ZK 的速度

您可以考虑切换到在 Kafka 中存储偏移量(从 0.8.2 开始提供此功能,在使用者配置中签出属性)offsets.storage

  1. consumer.commitOffsets 会引发异常吗?如果是,我将使用相同的消息两次(可以用幂等消息解决)

我相信它可以,例如,如果它无法与偏移存储进行通信。但是,正如您所说,使用幂等消息可以解决此问题。

  1. 长时间等待而不提交偏移量的问题,例如延迟期为24小时,将从迭代器获得下一个,睡眠24小时,处理和提交(ZK会话超时?

这不会是上述解决方案的问题,除非消息本身的处理时间超过会话超时。

  1. ZK 会话如何在不提交新偏移量的情况下保持活动状态?(设置一个蜂巢 zookeeper.session.timeout.ms 可以在死亡的消费者中解析,而无需识别它)

同样,使用上述内容,您不需要设置较长的会话超时。

  1. 我错过了任何其他问题吗?

总有;)


答案 2

使用Tibco EMS或其他JMS Queue。它们内置了重试延迟。Kafka可能不是您正在做的事情的正确设计选择


推荐