卡夫卡消费者。commitSync vs commitAsync
缺点是,虽然 commitSync() 将重试提交,直到它成功或遇到不可重放的故障,但 commitAsync() 不会重试。
这句话我不清楚。我假设消费者向代理发送提交请求,如果代理在某个超时内没有响应,则意味着提交失败。我错了吗?
您能详细澄清一下它们的区别吗?
另外,请提供用例,说明我应该更喜欢哪种提交类型。commitSync
commitAsync
缺点是,虽然 commitSync() 将重试提交,直到它成功或遇到不可重放的故障,但 commitAsync() 不会重试。
这句话我不清楚。我假设消费者向代理发送提交请求,如果代理在某个超时内没有响应,则意味着提交失败。我错了吗?
您能详细澄清一下它们的区别吗?
另外,请提供用例,说明我应该更喜欢哪种提交类型。commitSync
commitAsync
正如API文档中所说:
这是一个同步提交,将阻塞,直到提交成功或遇到不可恢复的错误(在这种情况下,它将被抛给调用方)。
这意味着,这是一种阻塞方法。调用它将阻塞您的线程,直到它成功或失败。commitSync
例如
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
consumer.commitSync();
}
}
对于 for 循环中的每个迭代,只有在成功返回或中断并引发异常后,您的代码才会移动到下一个迭代。consumer.commitSync()
这是一个异步调用,不会阻塞。遇到的任何错误要么传递给回调(如果提供),要么被丢弃。
这意味着,这是一种非阻塞方法。调用它不会阻塞您的线程。相反,它将继续处理以下指令,无论它最终是成功还是失败。commitAsync
例如,与前面的示例类似,但这里我们使用:commitAsync
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
consumer.commitAsync(callback);
}
}
对于 for 循环中的每个迭代,无论最终发生什么情况,您的代码都将移动到下一个迭代。并且,提交的结果将由您定义的回调函数处理。consumer.commitAsync()
权衡:延迟与数据一致性
commitSync()
commitAsync()
一般来说,这一切都取决于实际行为和调用方法的位置。
在“Kafka - The Definitive Guide”一书中,有一个关于如何缓解由于异步提交而提交较低偏移量的潜在问题的提示:
重试异步提交:为异步重试获取正确的提交顺序的一种简单模式是使用单调递增的序列号。每次提交时增加序列号,并将提交时的序列号添加到 commit 异步回调中。当您准备发送重试时,请检查回调获得的提交序列号是否等于实例变量;如果是,则没有较新的提交,可以安全地重试。如果实例序列号较高,请不要重试,因为已发送较新的提交。
下面的代码描述了一个可能的解决方案:
import java.util._
import java.time.Duration
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer, OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import collection.JavaConverters._
object AsyncCommitWithCallback extends App {
// define topic
val topic = "myOutputTopic"
// set properties
val props = new Properties()
props.put(ConsumerConfig.GROUP_ID_CONFIG, "AsyncCommitter")
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
// [set more properties...]
// create KafkaConsumer and subscribe
val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(List(topic).asJavaCollection)
// initialize global counter
val atomicLong = new AtomicLong(0)
// consume message
try {
while(true) {
val records = consumer.poll(Duration.ofMillis(1)).asScala
if(records.nonEmpty) {
for (data <- records) {
// do something with the records
}
consumer.commitAsync(new KeepOrderAsyncCommit)
}
}
} catch {
case ex: KafkaException => ex.printStackTrace()
} finally {
consumer.commitSync()
consumer.close()
}
class KeepOrderAsyncCommit extends OffsetCommitCallback {
// keeping position of this callback instance
val position = atomicLong.incrementAndGet()
override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
// retrying only if no other commit incremented the global counter
if(exception != null){
if(position == atomicLong.get) {
consumer.commitAsync(this)
}
}
}
}
}