如何获取 kafka 主题分区的最后/结束偏移量?

2022-09-01 02:21:00

我正在使用Java编写一个消费者。我想保持消息的实时性,因此,如果有太多消息等待消耗,例如1000或更多,我应该放弃未消耗的消息,并从最后一个偏移量开始消耗。kafka

对于这个问题,我尝试比较一个主题的上次提交偏移量和结束偏移量(只有1个分区),如果这两个偏移量之间的差值大于一定数量,我会将主题的最后一个提交偏移量设置为下一个偏移量,以便我可以放弃那些冗余消息。

现在我的问题是,一个话题的结尾如何得到偏移,有人说我可以使用老消费者,但太复杂了,新消费者有这个功能吗?


答案 1

新的消费者也很复杂。

//assign the topic consumer.assign();

//seek to end of the topic consumer.seekToEnd();

//the position is the latest offset consumer.position();


答案 2

您还可以使用 kafka 服务器命令行工具:

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic topic-name

输出的形式是,例如,参见 https://jaceklaskowski.gitbooks.io/apache-kafka/kafka-tools-GetOffsetShell.html 了解更多细节。<topicName>:<partitionID>:<offset>t1:0:0


推荐