Simple Kafka consumer example not working
2022-09-03 05:45:57
I have a simple class that consumes messages from a Kafka server. Most of the code is copied from the comments in org.apache.kafka.clients.consumer.KafkaConsumer.java.
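import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class Demo {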
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "192.168.144.10:29092");
        props.put("group.id", "test");
        props.put("session.timeout.ms", "1000");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "10000");
        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
        consumer.subscribe("voltdbexportAUDIT", "voltdbexportTEST");
        boolean isRunning = true;
        while (isRunning) {
            Map<String, ConsumerRecords<byte[], byte[]>> records = consumer.poll(100);
            process(records);
        }
        consumer.close();
    }
    private static Map<TopicPartition, Long> process(Map<String, ConsumerRecords<byte[], byte[]>> records) {
        Map<TopicPartition, Long> processedOffsets = new HashMap<>();
        for (Map.Entry<String, ConsumerRecords<byte[], byte[]>> recordMetadata : records.entrySet()) {
            List<ConsumerRecord<byte[], byte[]>> recordsPerTopic = recordMetadata.getValue().records();
            for (int i = 0; i < recordsPerTopic.size(); i++) {
                ConsumerRecord<byte[], byte[]> record = recordsPerTopic.get(i);
                // process record
                try {
                    processedOffsets.put(record.topicAndPartition(), record.offset());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        return processedOffsets;
    }
}
I am using 'org.apache.kafka:kafka-clients:0.8.2.0'. It throws this exception:
Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration "key.deserializer" which has no default value.
    at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:48)
    at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:194)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:430)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:413)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:400)
    at kafka.integration.Demo.main(Demo.java:26)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
How should I configure key.deserializer?
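
One possible way to supply the missing setting, as a minimal sketch rather than a confirmed fix: since the consumer is declared as KafkaConsumer<byte[], byte[]>, the byte-array deserializer that ships with kafka-clients (org.apache.kafka.common.serialization.ByteArrayDeserializer) looks like the natural choice. The class name ConsumerPropsSketch and the method buildProps are hypothetical names made up for illustration. The sketch also assumes that the value side needs a matching "value.deserializer" entry, and that the new consumer reads the broker address from "bootstrap.servers" instead of "metadata.broker.list". It only builds the Properties object, so it runs without Kafka on the classpath.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of the original post.
class ConsumerPropsSketch {
    // Fully qualified name of the byte-array deserializer in kafka-clients.
    static final String BYTE_ARRAY_DESERIALIZER =
            "org.apache.kafka.common.serialization.ByteArrayDeserializer";

    static Properties buildProps(String brokers, String groupId) {
        Properties props = new Properties();
        // Assumption: the new consumer expects "bootstrap.servers"
        // rather than "metadata.broker.list".
        props.put("bootstrap.servers", brokers);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "10000");
        // The setting the ConfigException reports as missing.
        props.put("key.deserializer", BYTE_ARRAY_DESERIALIZER);
        // Assumption: the value side needs a deserializer as well.
        props.put("value.deserializer", BYTE_ARRAY_DESERIALIZER);
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildProps("192.168.144.10:29092", "test");
        // Print the two deserializer settings to confirm they are present.
        System.out.println("key.deserializer=" + props.getProperty("key.deserializer"));
        System.out.println("value.deserializer=" + props.getProperty("value.deserializer"));
    }
}
```

The resulting Properties would then be passed to new KafkaConsumer<byte[], byte[]>(props) in place of the hand-built props in Demo. As far as I know, the KafkaConsumer in 0.8.2.0 was still a preview of the new API, so this may only get past the ConfigException rather than make the consumer fully functional on that version.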