KafkaProducer:“回调”和返回的“Future”之间的区别?

2022-09-01 11:37:35

KafkaProducer 发送方法既返回一个 Future,又接受一个回调。

在发送完成后使用一种机制与另一种机制执行操作之间是否存在任何根本区别?


答案 1

查看您链接到的文档,看起来 Future 和回调之间的主要区别在于谁发起了“请求已完成,现在该怎么办?”问题。

假设我们有一个客户和一个面包师。并要求给他做一块漂亮的饼干。现在有2种可能的方法,面包师可以将美味的饼干退还给顾客。CBCB

前途

面包师接受了这个要求,并告诉顾客:好吧,等我吃完了,我就把你的饼干放在柜台上。(此协议是 .)Future

在这种情况下,客户负责检查柜台(),看看面包师是否完成了他的饼干。Future

阻塞顾客呆在柜台附近,看着它,直到饼干放在那里(Future.get())或面包师在那里道歉(错误:饼干面团出来)。

非阻塞客户做一些其他工作,偶尔检查cookie是否在柜台上等着他(Future.isDone())。如果 Cookie 准备就绪,客户将接受它(Future.get())。

回调

在这种情况下,客户在订购饼干后告诉面包师:当我的饼干准备好时,请把它交给我的宠物机器狗,他会知道该怎么做(这个机器人是回调)。

现在,当饼干准备好时,面包师把饼干交给狗,并告诉他跑回它的主人那里。面包师可以继续为另一位顾客烘烤下一块饼干。

狗跑回顾客身边,开始摇晃它的人造尾巴,让顾客知道他的饼干已经准备好了。

请注意,客户不知道什么时候会把饼干送给他,也没有主动轮询面包师,看看它是否准备好了。

这是两种方案之间的主要区别。谁负责发起“您的 Cookie 已准备就绪,您想用它做什么?”问题。对于未来,客户负责检查准备就绪的时间,无论是通过主动等待,还是不时地轮询。在回调的情况下,面包师将回调到提供的函数。


我希望这个答案能让你更好地了解未来和Calback到底是什么。一旦你有了大致的想法,你可以试着找出每个特定事情在哪个线程上被处理。当线程被阻塞时,或者以什么顺序完成一切。编写一些简单的程序来打印诸如“主客户端线程:cookie已接收”之类的语句可能是一种有趣的实验方式。


答案 2

异步方法

producer.send(record, new Callback(){
    @Override
    onComplete(RecordMetadata rm, Exception ex){...}
})

与同步相比,为您提供更好的吞吐量

RecordMetadata rm = producer.send(record).get();

因为在第一种情况下,您不会等待确认。

此外,在异步方式中,不能保证排序,而在同步方式中,它是 - 消息仅在收到确认后发送。

另一个区别可能是,在异常情况下的同步调用中,您可以在异常发生后立即停止发送消息,而在第二种情况下,在您发现某些错误并执行某些操作之前,将发送一些消息。

另请注意,在异步方法中,“处于 fligh”状态的消息数由参数控制。max.in.flight.requests.per.connection

除了同步和异步方法之外,您还可以使用Fire and Forget方法,这与同步方法几乎相同,但不处理返回的元数据 - 只需发送消息并希望它到达代理(知道它很可能会发生,并且生产者将在可恢复错误的情况下重试),但有些消息可能会丢失:

RecordMetadata rm = producer.send(record);

总结一下:

  • 开火和忘记 - 最快的一个,但有些消息可能会丢失;
  • 同步 - 最慢,如果您无法承受丢失消息,请使用它;
  • 异步 - 介于两者之间。

推荐