Apache Kafka 客户端何时引发“批量过期”异常?

2022-09-01 00:27:12

使用Apache Kafka Java客户端(0.9),我正在尝试使用Kafka Producer类向代理发送一长串记录。

异步发送方法会立即返回一段时间,然后在短时间内开始阻止每个调用。大约三十秒后,客户端开始引发异常(TimeoutException),并显示消息“批处理已过期”。

什么情况会导致引发此异常?


答案 1

此异常表示您正在以比发送记录更快的速度排队记录。

调用 send 方法时,ProducerRecord 将存储在内部缓冲区中,以便发送到代理。一旦 ProducerRecord 被缓冲,该方法将立即返回,无论它是否已被发送。

记录被分组为多个批次以发送到代理,以减少每条消息的传输积压并提高吞吐量。

添加批记录后,发送该批记录有时间限制,以确保在指定的持续时间内发送该批记录。这由创建器配置参数 request.timeout.ms 控制,该参数默认为 30 秒。

如果批处理的排队时间超过超时限制,则将引发异常。该批中的记录将从发送队列中删除。

使用配置参数增加超时限制将允许客户端在过期之前将批处理排队更长时间。


答案 2

我在一个完全不同的环境中得到了这个例外。

我已经设置了一个由动物园管理员vm,经纪人vm和生产者/消费者vm组成的迷你集群。我在服务器 (9092) 和 zookeeper (2181) 上打开了所有必要的端口,然后尝试将消息从使用者/发布者 vm 发布到代理。我得到了OP提到的异常,但由于到目前为止我只发布了一条消息(或者至少我尝试过),解决方案不能是增加超时或批量大小。因此,我搜索并发现了这个邮件列表,该列表描述了我在尝试从消费者/生产者vm(ClosedChannelException)内部使用消息时遇到的类似问题:http://grokbase.com/t/kafka/users/152jsjekrm/having-trouble-with-the-simplest-remote-kafka-config 此邮件列表中的最后一篇文章实际上描述了如何解决问题。

长话短说,如果您同时遇到异常,则可能必须将此行更改为文件中的以下内容并重新启动代理:ChannelClosedExceptionBatch Expiredserver.config

advertised.host.name=<broker public IP address>

如果未设置,它将回退到属性(可能也没有设置),然后回退到 Java 类的规范主机名,这当然是不正确的,因此混淆了远程节点。host.nameInetAddress


推荐