Kafka - 使用高级使用者的延迟队列实现
2022-09-01 14:29:51
想要使用高级使用者 API 实现延迟的使用者
主要思想:
- 按键生成消息(每个消息都包含创建时间戳),这可确保每个分区都按生成时间对消息进行排序。
- auto.commit.enable=false (将在每个消息进程后显式提交)
- 使用消息
- 检查消息时间戳并检查是否经过足够的时间
- 进程消息(此操作永远不会失败)
-
提交 1 偏移量
while (it.hasNext()) { val msg = it.next().message() //checks timestamp in msg to see delay period exceeded while (!delayedPeriodPassed(msg)) { waitSomeTime() //Thread.sleep or something.... } //certain that the msg was delayed and can now be handled Try { process(msg) } //the msg process will never fail the consumer consumer.commitOffsets //commit each msg }
关于此实现的一些担忧:
- 提交每个偏移量可能会降低 ZK 的速度
- consumer.commitOffsets 可以抛出一个异常吗?如果是,我将使用相同的消息两次(可以用幂等消息解决)
- 问题等待很长时间而不提交偏移量,例如延迟期为24小时,将从迭代器获得下一个,睡眠24小时,处理和提交(ZK会话超时?
- ZK会话如何在不提交新偏移量的情况下保持活动状态?(设置一个蜂巢 zookeeper.session.timeout.ms 可以在死亡消费者中解决,而无需识别它)
- 缺少任何其他问题吗?
谢谢!