卡夫卡再平衡。重复处理问题

2022-09-04 04:49:45

我有一个消费者工作者应用程序,它在内部启动了许多线程,每个线程都在生成它的KafkaCosnumer。Cosnumers具有相同的主题,并订阅了相同的主题。因此,每个消费者都能获得公平的分区份额。XgroupId

处理的性质是我不能丢失消息,也不能允许重复。我正在运行的 kafka 版本是 0.10.2.1。

这就是我面临的问题:使用者线程 1 开始使用消息,on 获取一批消息。我还实现了 ,以便每次成功处理消息时,它都会被添加到映射中。(请参阅下面的代码。因此,一旦发生重新平衡,我就可以在将分区重新分配给其他使用者之前提交偏移量。有时,为了处理该批处理,它需要更长的时间,这是重新平衡发生的地方,分区从使用者 1 中提取并分配给使用者 2。使用者 1 不知道分区已被吊销,并继续处理消息,同时使用者 2 从最后一个偏移量(由 RebalanceListener 提交)中选取并处理相同的消息。poll()ConsumerRebalanceListeneroffsetsmax.poll.interval.ms

有没有办法通知使用者他已经撤销了分区,以便他可以停止处理循环中的消息,这些消息已分配给另一个使用者?

public class RebalanceListener<K, V> implements ConsumerRebalanceListener {

    private final KafkaConsumer<K, V> consumer;

    private static final ConcurrentMap<TopicPartition, OffsetAndMetadata> CURRENT_OFFSETS =
            Maps.newConcurrentMap();

    private static final Logger LOGGER = LoggerFactory.getLogger(RebalanceListener.class);

    public RebalanceListener(KafkaConsumer<K, V> consumer) {
        this.consumer = consumer;
    }

    public void addOffset(String topic, int partition, long offset) {
        LOGGER.debug("message=Adding offset to offsets map, topic={}, partition={}, offset={}",
                topic, partition, offset);
        CURRENT_OFFSETS.put(new TopicPartition(topic, partition),
                new OffsetAndMetadata(offset, "commit"));
    }

    public Map<TopicPartition, OffsetAndMetadata> getCurrentOffsets() {
        return CURRENT_OFFSETS;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        LOGGER.debug("message=following partitions have been revoked from consumer: [{}]",
                partitions.stream().map(
                        topicPartition -> topicPartition.topic() + ":" + topicPartition.partition())
                        .collect(joining(",")));
        LOGGER.debug("message=Comitting offsets for partititions [{}]",
                CURRENT_OFFSETS.keySet().stream().map(
                        topicPartition -> topicPartition.topic() + ":" + topicPartition.partition())
                        .collect(joining(",")));
        consumer.commitSync(CURRENT_OFFSETS);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        LOGGER.debug("message=following partitions have been assigned to consumer: [{}]",
                partitions.stream().map(
                        topicPartition -> topicPartition.topic() + ":" + topicPartition.partition())
                        .collect(joining(",")));
    }

}

我想我可以在内部创建一个并发映射,然后在处理每个消息之前检查当前消费者是否仍然与记录相关联(每个都有和字段)。如果没有 - 打破循环并制定下一个.consumerId -- TopicPartitionRebalanceListenerConsumerRecordtopicpartitionpoll()

如果我的工作应用在单个实例中运行,即使有多个 KafkaConsumer 线程旋转,这将是一个可行的解决方案。但是,一旦我将其放大,我将无法在静态映射中隐藏偏移量和消费者主题分区映射。那必须是某种集中式存储,数据库,或者说,Redis。

但是,在我每次处理一个项目之前,我都必须询问我的记录是否可以由当前的消费者线程合法处理。在扩展的工作应用程序的情况下,它将是对外部存储的网络调用,这将破坏使用kafka的目的,因为它会减慢处理速度。我可能只是选择在处理单个项目后执行偏移量提交。


答案 1

你需要实现 onPartitionsRevoked()

https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html#onPartitionsRevoked(java.util.Collection)

可以保证,在调用分区分配的任何进程之前,所有使用者进程都将调用 onPartitionsRevoked。因此,如果偏移量或其他状态保存在 onPartitionsRevoked 调用中,则保证在接管该分区的进程调用其 onPartitionsAssigned 回调以加载状态时保存该偏移量或其他状态。


答案 2

消费者平衡Listener的javadoc说

每当分区分配发生更改时,此回调将仅在用户线程中作为 poll(long) 调用的一部分执行。

因此,您不必担心在处理 poll() 返回的最后一批消息的过程中发生分区重新分配。在你完成处理所有这些消息并再次调用 poll() 之前,它不会发生。

javadoc还说:

可以保证,在调用分区分配的任何进程之前,所有使用者进程都将调用 onPartitionsRevoked。因此,如果偏移量或其他状态保存在 onPartitionsRevoked 调用中,则保证在接管该分区的进程调用其 onPartitionsAssigned 回调以加载状态时保存该偏移量或其他状态。


推荐