我去派对超级晚了。但是我看到上面的答案中缺少一些东西:)
选择像Cassandra这样的分布式系统的策略是一个不错的主意。一旦Kafka启动并正常,您可以重试写入此中的所有消息。
我想回答“知道在给定时间有多少条消息未能发布”
从标签中,我看到您正在使用和。您可以为创建者编写自定义回调,此回调可以告诉您消息是否失败或成功发布。失败时,记录消息的元数据。apache-kafka
kafka-consumer-api
现在,您可以使用日志分析工具来分析故障。一个这样的体面工具是Splunk。
下面是一个小代码片段,可以更好地解释我正在谈论的回调:
public class ProduceToKafka {
private ProducerRecord<String, String> message = null;
// TracerBulletProducer class has producer properties
private KafkaProducer<String, String> myProducer = TracerBulletProducer
.createProducer();
public void publishMessage(String string) {
ProducerRecord<String, String> message = new ProducerRecord<>(
"topicName", string);
myProducer.send(message, new MyCallback(message.key(), message.value()));
}
class MyCallback implements Callback {
private final String key;
private final String value;
public MyCallback(String key, String value) {
this.key = key;
this.value = value;
}
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
log.info("--------> All good !!");
} else {
log.info("--------> not so good !!");
log.info(metadata.toString());
log.info("" + metadata.serializedValueSize());
log.info(exception.getMessage());
}
}
}
}
如果分析每个时间单位的日志数,则可以获得所需的见解。"--------> not so good !!"
神速!