卡夫卡消费者多主题
2022-09-01 05:26:47
我有一个主题列表(现在是10个),其大小将来可能会增加。我知道我们可以生成多个线程(每个主题)来从每个主题消费,但是在我的情况下,如果主题的数量增加,那么从主题消费的线程数量就会增加,这是我不想要的,因为主题不会太频繁地获取数据,所以线程将是理想的。
有没有办法让单个消费者从所有主题中消费?如果是,那么我们如何实现它?另外,Kafka将如何保持偏移量?请建议答案。
我有一个主题列表(现在是10个),其大小将来可能会增加。我知道我们可以生成多个线程(每个主题)来从每个主题消费,但是在我的情况下,如果主题的数量增加,那么从主题消费的线程数量就会增加,这是我不想要的,因为主题不会太频繁地获取数据,所以线程将是理想的。
有没有办法让单个消费者从所有主题中消费?如果是,那么我们如何实现它?另外,Kafka将如何保持偏移量?请建议答案。
我们可以使用以下 API 订阅多个主题:consumer.subscribe(Arrays.asList(topic1,topic2), ConsumerRebalanceListener obj)
消费者有主题信息,我们可以通过创建OffsetAndMetadata对象来使用consumer.commitAsync或consumer.commitSync(),如下所示。
ConsumerRecords<String, String> records = consumer.poll(long value);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println(record.offset() + ": " + record.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
不需要多个线程,你可以有一个消费者,从多个主题消费。偏移量由 zookeeper 维护,因为 kafka 服务器本身是无状态的。每当消费者使用消息时,其偏移量就会与 zookeeper 一起提交,以保持未来的跟踪,以便仅处理每条消息一次。因此,即使在 kafka 失败的情况下,消费者也会从最后一次提交的偏移量开始消耗。