如何检查Kafka服务器是否正在运行?

2022-08-31 15:26:03

我想在开始生产和消费作业之前确保kafka服务器是否正在运行。它是在Windows环境中,这是我的kafka服务器在eclipse中的代码...

Properties properties = new Properties();
properties.setProperty("broker.id", "1");
properties.setProperty("port", "9092");
properties.setProperty("log.dirs", "D://workspace//");
properties.setProperty("zookeeper.connect", "localhost:2181"); 

Option<String> option = Option.empty();
KafkaConfig config = new KafkaConfig(properties);       
KafkaServer kafka = new KafkaServer(config, new CurrentTime(), option);
kafka.startup();

在这种情况下是不够的,因为它总是正确的。那么有没有办法知道我的kafka服务器正在运行并准备好制作者。我有必要对此进行检查,因为它会导致一些起始数据包的丢失。if (kafka != null)


答案 1

必须为所有 Kafka 代理分配一个 .启动时,代理将在 Zookeeper 中创建一个路径为 的临时节点。由于节点是短暂的,因此一旦代理断开连接,它就会被删除,例如通过关闭。broker.id/broker/ids/$id

您可以查看临时代理节点的列表,如下所示:

echo dump | nc localhost 2181 | grep brokers

ZooKeeper客户端界面公开了许多命令; 列出群集的所有会话和临时节点。dump

请注意,上述假设是:

  • 您正在 上的默认端口 () 上运行 ZooKeeper,这是群集的领导者2181localhostlocalhost
  • 您的 Kafka 配置不会为您的 Kafka 集群指定 chroot env,即它只是而不是zookeeper.connecthost:porthost:port/path

答案 2

我使用了 AdminClient api。

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("connections.max.idle.ms", 10000);
properties.put("request.timeout.ms", 5000);
try (AdminClient client = KafkaAdminClient.create(properties))
{
    ListTopicsResult topics = client.listTopics();
    Set<String> names = topics.names().get();
    if (names.isEmpty())
    {
        // case: if no topic found.
    }
    return true;
}
catch (InterruptedException | ExecutionException e)
{
    // Kafka is not available
}

推荐