如何以健壮的方式处理kafka发布失败

我正在使用Kafka,我们有一个用例来构建一个容错系统,甚至不应该错过任何一条消息。所以问题是:如果发布到Kafka由于任何原因(ZooKeeper关闭,Kafka代理关闭等)而失败,我们如何可靠地处理这些消息,并在事情再次恢复后重放它们。同样,正如我所说,我们甚至不能承受一个信息失败。另一个用例是,我们还需要知道在任何给定的时间点,由于任何原因(例如计数器功能之类的东西)而未能发布到Kafka的消息数量,现在这些消息需要再次重新发布。

其中一个解决方案是将这些消息推送到某个数据库(如Cassandra,其中写入速度非常快,但我们还需要计数器功能,我想Cassandra计数器功能不是很好,我们不想使用它)。它可以处理这种负载,并为我们提供非常准确的计数器工具。

这个问题更多的是从架构的角度来看的,然后是使用哪种技术来实现这一目标。

PS:我们处理一些像3000TPS的地方。因此,当系统开始失败时,这些失败的消息可以在很短的时间内变得非常快。我们使用的是基于Java的框架。

感谢您的帮助!


答案 1

我去派对超级晚了。但是我看到上面的答案中缺少一些东西:)

选择像Cassandra这样的分布式系统的策略是一个不错的主意。一旦Kafka启动并正常,您可以重试写入此中的所有消息。

我想回答“知道在给定时间有多少条消息未能发布”

从标签中,我看到您正在使用和。您可以为创建者编写自定义回调,此回调可以告诉您消息是否失败或成功发布。失败时,记录消息的元数据。apache-kafkakafka-consumer-api

现在,您可以使用日志分析工具来分析故障。一个这样的体面工具是Splunk。

下面是一个小代码片段,可以更好地解释我正在谈论的回调:

public class ProduceToKafka {

  private ProducerRecord<String, String> message = null;

 // TracerBulletProducer class has producer properties
  private KafkaProducer<String, String> myProducer = TracerBulletProducer
      .createProducer();

  public void publishMessage(String string) {

    ProducerRecord<String, String> message = new ProducerRecord<>(
        "topicName", string);

    myProducer.send(message, new MyCallback(message.key(), message.value()));
  }

  class MyCallback implements Callback {

    private final String key;
    private final String value;

    public MyCallback(String key, String value) {
      this.key = key;
      this.value = value;
    }


    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
      if (exception == null) {
        log.info("--------> All good !!");
      } else {
        log.info("--------> not so good  !!");
        log.info(metadata.toString());
        log.info("" + metadata.serializedValueSize());
        log.info(exception.getMessage());

      }
    }
  }

}

如果分析每个时间单位的日志数,则可以获得所需的见解。"--------> not so good !!"

神速!


答案 2

Kafka以分布式,容错方式构建的原因是处理与您的问题完全相同的问题,核心组件的多次故障应该避免服务中断。要避免 Zookeeper 停机,请至少部署 3 个 Zookeeper 实例 (如果这是在 AWS 中,请跨可用区部署它们)。要避免代理失败,请部署多个代理,并确保在生产者属性中指定多个代理。若要确保 Kafka 群集已将消息写入持久庄园,请确保在生产者中设置了该属性。当所有同步副本都确认接收消息时,这将确认客户端写入(以牺牲吞吐量为代价)。您还可以设置排队限制,以确保在写入代理时开始备份,您可以捕获异常并进行处理,并可能重试。bootstrap.serversacks=all

使用Cassandra(另一个经过深思熟虑的分布式容错系统)来“暂存”您的写入似乎并没有为您的架构增加任何可靠性,但确实增加了复杂性,加上Cassandra没有被编写为消息队列的消息队列,我会避免这种情况。

配置正确后,Kafka应该可以处理所有消息写入并提供适当的保证。


推荐