如何使用Kafka发送大型消息(超过15MB)?

2022-08-31 07:19:15

我使用Java Producer API向Kafka V. 0.8发送String消息。如果邮件大小约为15 MB,我得到一个.我已尝试设置为 40 MB,但我仍然遇到异常。小消息没有问题。MessageSizeTooLargeExceptionmessage.max.bytes

(异常出现在生产者中,我在此应用程序中没有使用者。

我能做些什么来摆脱这个例外?

我的示例生产者配置

private ProducerConfig kafkaConfig() {
    Properties props = new Properties();
    props.put("metadata.broker.list", BROKERS);
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");
    props.put("message.max.bytes", "" + 1024 * 1024 * 40);
    return new ProducerConfig(props);
}

错误日志:

4709 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with    correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with   correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to send requests for topics datasift with correlation ids in [213,224]

kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)

答案 1

您需要调整三个(或四个)属性:

  • 使用者端: - 这将确定使用者可以获取的消息的最大大小。fetch.message.max.bytes
  • 代理端: - 这将允许代理中的副本在群集内发送消息,并确保正确复制消息。如果这太小,则永远不会复制消息,因此,使用者将永远不会看到该消息,因为永远不会提交(完全复制)消息。replica.fetch.max.bytes
  • 代理端: - 这是代理可以从生产者接收的最大消息大小。message.max.bytes
  • 代理端(每个主题): - 这是代理将允许附加到主题的消息的最大大小。此大小在预压缩时经过验证。(默认为经纪人的 .)max.message.bytesmessage.max.bytes

我发现了关于数字2的艰难方法 - 你不会从Kafka那里得到任何异常,消息或警告,所以当你发送大消息时,一定要考虑这一点。


答案 2

laughing_man的答案相比,Kafka 0.10和新消费者需要进行微小的更改:

  • 经纪人:没有变化,你仍然需要增加属性和. 必须等于或小于 (*)。message.max.bytesreplica.fetch.max.bytesmessage.max.bytesreplica.fetch.max.bytes
  • 创建者:增加以发送更大的消息。max.request.size
  • 使用者:增加以接收更大的消息。max.partition.fetch.bytes

(*)阅读评论以了解更多信息message.max.bytes<=replica.fetch.max.bytes


推荐