KafkaAvroSerializer 用于序列化 Avro 而不使用 schema.registry.url
我是卡夫卡和阿夫罗的菜鸟。所以我一直在努力让生产者/消费者运行。到目前为止,我已经能够生成和使用简单的字节和字符串,使用以下:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA);
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
for (int i = 0; i < 1000; i++) {
GenericData.Record avroRecord = new GenericData.Record(schema);
avroRecord.put("str1", "Str 1-" + i);
avroRecord.put("str2", "Str 2-" + i);
avroRecord.put("int1", i);
byte[] bytes = recordInjection.apply(avroRecord);
ProducerRecord<String, byte[]> record = new ProducerRecord<>("mytopic", bytes);
producer.send(record);
Thread.sleep(250);
}
producer.close();
}
现在这一切都很好,当我试图序列化POJO时,问题就来了。因此,我能够使用Avro提供的实用程序从POJO获得AvroSchema。硬编码了架构,然后尝试创建一个通用记录,以通过KafkaProducer发送生产者现在设置为:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.KafkaAvroSerializer");
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA); // this is the Generated AvroSchema
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
这就是问题所在:当我使用KafkaAvroSerializer的那一刻,生产者没有出现,因为:缺少强制性参数:schema.registry.url
我仔细阅读了为什么需要这样做,以便我的消费者能够破译生产者发送给我的任何内容。但是,该架构不是已经嵌入到AvroMessage中了吗?如果有人可以分享一个使用KafkaProducer与KafkaAvroSerializer的工作示例,而不必指定schema.registry.url,那就太好了。
也非常感谢有关架构注册表实用程序的任何见解/资源。
谢谢!