Kafka 如何存储每个主题的偏移量?
2022-09-01 06:02:35
在轮询Kafka时,我使用该函数订阅了多个主题。现在,我想设置要从每个主题中阅读的偏移量,而不是在每个主题之后重新订阅。在轮询数据之前,对每个主题名称进行迭代调用是否能获得结果?偏移量是如何精确存储在 Kafka 中的?subscribe()
seek()
poll()
seek()
我每个主题都有一个分区,只有一个消费者可以从所有主题中读取。
在轮询Kafka时,我使用该函数订阅了多个主题。现在,我想设置要从每个主题中阅读的偏移量,而不是在每个主题之后重新订阅。在轮询数据之前,对每个主题名称进行迭代调用是否能获得结果?偏移量是如何精确存储在 Kafka 中的?subscribe()
seek()
poll()
seek()
我每个主题都有一个分区,只有一个消费者可以从所有主题中读取。
Kafka 如何存储每个主题的偏移量?
Kafka已将偏移存储从zookeeper转移到kafka经纪人。原因如下:
Zookeeper 不是为高写入负载(如偏移量更新)提供服务的好方法,因为 Zookeeper 通过每个节点路由每个写入,因此无法对写入进行分区或以其他方式扩展写入。我们一直都知道这一点,但选择这种实现作为一种“方便的婚姻”,因为我们已经依赖于zk。
Kafka将偏移量提交存储在主题中,当消费者提交偏移量时,kafka将提交偏移量消息发布到“commit-log”主题,并保留一个内存中结构,该结构将组/主题/分区映射到最新的偏移量以进行快速检索。有关偏移管理的此页面,可以找到更多设计信息。
现在,我想设置我想从每个主题中读取的偏移量,而不是在每次从主题中搜索()和 poll() 后重新订阅。
有一个关于kafka管理工具的新功能来重置偏移量。
kafka-consumer-group.sh --bootstrap-server 127.0.0.1:9092 --group
your-consumer-group **--reset-offsets** --to-offset 1 --all-topics --execute
您可以使用更多选项。