编辑 - 在较新版本的Kafka中不需要Zookeeper。请参阅@Neeleshkumar Srinivasan Mannur对API版本0.11.0+的回答。
原始答案
我修复了它..经过长时间的研究..
ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());
从上面的代码中,ZkClient将创建一个主题,但这个主题信息不会意识到kafka。因此,我们要做的是,我们需要通过以下方式为ZkClient创建对象,
首先导入以下语句,
import kafka.utils.ZKStringSerializer$;
并通过以下方式为ZkClient创建对象,
ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000, ZKStringSerializer$.MODULE$);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());
编辑1:(用于@ajkret评论)
由于 api 已更改,因此上述代码不适用于 kafka > 0.9,因此将以下代码用于 kafka > 0.9
import java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
public class KafkaTopicCreationInJava
{
public static void main(String[] args) throws Exception {
ZkClient zkClient = null;
ZkUtils zkUtils = null;
try {
String zookeeperHosts = "192.168.20.1:2181"; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181";
int sessionTimeOutInMs = 15 * 1000; // 15 secs
int connectionTimeOutInMs = 10 * 1000; // 10 secs
zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);
zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);
String topicName = "testTopic";
int noOfPartitions = 2;
int noOfReplication = 3;
Properties topicConfiguration = new Properties();
AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration);
} catch (Exception ex) {
ex.printStackTrace();
} finally {
if (zkClient != null) {
zkClient.close();
}
}
}
}