小组协调员不可用- 卡夫卡
当我给kafka写一个主题时,有一个错误::Offset commit failed
2016-10-29 14:52:56.387 INFO [nioEventLoopGroup-3-1][org.apache.kafka.common.utils.AppInfoParser$AppInfo:82] - Kafka version : 0.9.0.1
2016-10-29 14:52:56.387 INFO [nioEventLoopGroup-3-1][org.apache.kafka.common.utils.AppInfoParser$AppInfo:83] - Kafka commitId : 23c69d62a0cabf06
2016-10-29 14:52:56.409 ERROR [nioEventLoopGroup-3-1][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$DefaultOffsetCommitCallback:489] - Offset commit failed.
org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException: The group coordinator is not available.
2016-10-29 14:52:56.519 WARN [kafka-producer-network-thread | producer-1][org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater:582] - Error while fetching metadata with correlation id 0 : {0085000=LEADER_NOT_AVAILABLE}
2016-10-29 14:52:56.612 WARN [pool-6-thread-1][org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater:582] - Error while fetching metadata with correlation id 1 : {0085000=LEADER_NOT_AVAILABLE}
使用命令创建新主题时,没关系。
./kafka-topics.sh --zookeeper localhost:2181 --create --topic test1 --partitions 1 --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
这是使用Java的生产者代码:
public void create() {
Properties props = new Properties();
props.clear();
String producerServer = PropertyReadHelper.properties.getProperty("kafka.producer.bootstrap.servers");
String zookeeperConnect = PropertyReadHelper.properties.getProperty("kafka.producer.zookeeper.connect");
String metaBrokerList = PropertyReadHelper.properties.getProperty("kafka.metadata.broker.list");
props.put("bootstrap.servers", producerServer);
props.put("zookeeper.connect", zookeeperConnect);//声明ZooKeeper
props.put("metadata.broker.list", metaBrokerList);//声明kafka broker
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 1000);
props.put("linger.ms", 10000);
props.put("buffer.memory", 10000);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<String, String>(props);
}
哪里错了?