卡夫卡消费者。commitSync vs commitAsync

https://www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html#callout_kafka_consumers__reading_data_from_kafka_CO2-1 语录

缺点是,虽然 commitSync() 将重试提交,直到它成功或遇到不可重放的故障,但 commitAsync() 不会重试。

这句话我不清楚。我假设消费者向代理发送提交请求,如果代理在某个超时内没有响应,则意味着提交失败。我错了吗?

您能详细澄清一下它们的区别吗?
另外,请提供用例,说明我应该更喜欢哪种提交类型。commitSynccommitAsync


答案 1

正如API文档中所说:


这是一个同步提交,将阻塞,直到提交成功或遇到不可恢复的错误(在这种情况下,它将被抛给调用方)。

这意味着,这是一种阻塞方法。调用它将阻塞您的线程,直到它成功或失败。commitSync

例如

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
        consumer.commitSync();
    }
}

对于 for 循环中的每个迭代,只有在成功返回或中断并引发异常后,您的代码才会移动到下一个迭代。consumer.commitSync()


这是一个异步调用,不会阻塞。遇到的任何错误要么传递给回调(如果提供),要么被丢弃。

这意味着,这是一种非阻塞方法。调用它不会阻塞您的线程。相反,它将继续处理以下指令,无论它最终是成功还是失败。commitAsync

例如,与前面的示例类似,但这里我们使用:commitAsync

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
        consumer.commitAsync(callback);
    }
}

对于 for 循环中的每个迭代,无论最终发生什么情况,您的代码都将移动到下一个迭代。并且,提交的结果将由您定义的回调函数处理。consumer.commitAsync()


权衡:延迟与数据一致性

  • 如果必须确保数据一致性,请选择,因为它将确保在执行任何进一步操作之前,您将知道偏移量提交是成功还是失败。但是由于它是同步和阻塞的,因此您将花费更多时间等待提交完成,从而导致高延迟。commitSync()
  • 如果您确定某些数据不一致并希望具有低延迟,请选择因为它不会等待完成。相反,它只会发送提交请求,并在以后处理来自Kafka的响应(成功或失败),同时,您的代码将继续执行。commitAsync()

一般来说,这一切都取决于实际行为和调用方法的位置。


答案 2

使用 commitAsync() 进行可靠的重试处理

在“Kafka - The Definitive Guide”一书中,有一个关于如何缓解由于异步提交而提交较低偏移量的潜在问题的提示:

重试异步提交:为异步重试获取正确的提交顺序的一种简单模式是使用单调递增的序列号。每次提交时增加序列号,并将提交时的序列号添加到 commit 异步回调中。当您准备发送重试时,请检查回调获得的提交序列号是否等于实例变量;如果是,则没有较新的提交,可以安全地重试。如果实例序列号较高,请不要重试,因为已发送较新的提交。

下面的代码描述了一个可能的解决方案:

import java.util._
import java.time.Duration
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer, OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import collection.JavaConverters._

object AsyncCommitWithCallback extends App {

  // define topic
  val topic = "myOutputTopic"

  // set properties
  val props = new Properties()
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "AsyncCommitter")
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  // [set more properties...]
  

  // create KafkaConsumer and subscribe
  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(List(topic).asJavaCollection)

  // initialize global counter
  val atomicLong = new AtomicLong(0)

  // consume message
  try {
    while(true) {
      val records = consumer.poll(Duration.ofMillis(1)).asScala

      if(records.nonEmpty) {
        for (data <- records) {
          // do something with the records
        }
        consumer.commitAsync(new KeepOrderAsyncCommit)
      }

    }
  } catch {
    case ex: KafkaException => ex.printStackTrace()
  } finally {
    consumer.commitSync()
    consumer.close()
  }


  class KeepOrderAsyncCommit extends OffsetCommitCallback {
    // keeping position of this callback instance
    val position = atomicLong.incrementAndGet()

    override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
      // retrying only if no other commit incremented the global counter
      if(exception != null){
        if(position == atomicLong.get) {
          consumer.commitAsync(this)
        }
      }
    }
  }

}