KafkaConsumer 0.10 Java API 错误消息:没有分区的当前分配

2022-09-02 00:24:18

我正在使用KafkaConsumer 0.10 Java api。我想从特定分区和特定偏移量消耗。我查了一下,发现有一个寻求方法,但它抛出了一个异常。有人有类似的用例或解决方案吗?

法典:

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps);
consumer.seek(new TopicPartition("mytopic", 1), 4);

例外

java.lang.IllegalStateException: No current assignment for partition mytopic-1
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135)
    at xx.xxx.xxx.Test.main(Test.java:182)

答案 1

在你可以之前,你首先需要把一个主题一个分区的主题给消费者。还要记住,并且很懒惰 - 因此,在使用之前,您还需要执行“虚拟调用”。seek()subscribe()assign()subscribe()assign()poll()seek()

注意:从 Kafka 2.0 开始,新的是异步的,不能保证您在返回时有一个完整的分配。因此,您可能需要在使用之前检查您的分配,并再次刷新分配。(详情请参阅KIP-266poll(Duration timeout)pollseek()poll

如果您使用 ,则使用组管理:因此,您可以使用相同的使用者启动多个使用者,并且该主题的所有分区将自动均匀地分配给组内的所有使用者(每个分区将分配给组中的单个使用者)。subscribe()group.id

如果要读取特定分区,则需要通过 使用手动分配。这允许您执行所需的任何任务。assign()

顺便说一句:有一个非常长的详细的类JavaDoc,包括示例。值得一读。KafkaConsumer


答案 2

如果您不想使用 poll() 和检索映射记录,并更改偏移量本身。卡夫卡版本 0.11 试试这个:

...
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");    
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);    
consumer.subscribe(Arrays.asList("Test_topic1", "Test_topic2"));
List<TopicPartition> partitions =consumer.partitionsFor("Test_topic1").stream().map(part->{TopicPartition tp = new TopicPartition(part.topic(),part.partition()); return tp;}).collect(Collectors.toList());
Field coordinatorField = consumer.getClass().getDeclaredField("coordinator"); 
coordinatorField.setAccessible(true);    

ConsumerCoordinator coordinator = (ConsumerCoordinator)coordinatorField.get(consumer);
coordinator.poll(new Date().getTime(), 1000);//Watch out for your local date and time settings
consumer.seekToBeginning(partitions); //or other seek

投票协调员事件。这可确保协调器是已知的,并且使用者已加入组(如果它使用组管理)。如果启用了定期偏移量提交,则此操作还会处理它们。


推荐