KafkaAvroSerializer 用于序列化 Avro 而不使用 schema.registry.url

我是卡夫卡和阿夫罗的菜鸟。所以我一直在努力让生产者/消费者运行。到目前为止,我已经能够生成和使用简单的字节和字符串,使用以下:

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

    Schema.Parser parser = new Schema.Parser();
    Schema schema = parser.parse(USER_SCHEMA);
    Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

    KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);

    for (int i = 0; i < 1000; i++) {
        GenericData.Record avroRecord = new GenericData.Record(schema);
        avroRecord.put("str1", "Str 1-" + i);
        avroRecord.put("str2", "Str 2-" + i);
        avroRecord.put("int1", i);

        byte[] bytes = recordInjection.apply(avroRecord);

        ProducerRecord<String, byte[]> record = new ProducerRecord<>("mytopic", bytes);
        producer.send(record);
        Thread.sleep(250);
    }
    producer.close();
}

现在这一切都很好,当我试图序列化POJO时,问题就来了。因此,我能够使用Avro提供的实用程序从POJO获得AvroSchema。硬编码了架构,然后尝试创建一个通用记录,以通过KafkaProducer发送生产者现在设置为:

    Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.KafkaAvroSerializer");

Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA); // this is the Generated AvroSchema
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);

这就是问题所在:当我使用KafkaAvroSerializer的那一刻,生产者没有出现,因为:缺少强制性参数:schema.registry.url

我仔细阅读了为什么需要这样做,以便我的消费者能够破译生产者发送给我的任何内容。但是,该架构不是已经嵌入到AvroMessage中了吗?如果有人可以分享一个使用KafkaProducer与KafkaAvroSerializer的工作示例,而不必指定schema.registry.url,那就太好了。

也非常感谢有关架构注册表实用程序的任何见解/资源。

谢谢!


答案 1

首先注意:在vanilla apache kafka中不提供 - 它由Confluent Platform提供。(https://www.confluent.io/),作为其开源组件的一部分(http://docs.confluent.io/current/platform.html#confluent-schema-registryKafkaAvroSerializer)

快速回答:否,如果使用 ,您将需要一个架构注册表。在此处查看一些示例:http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.htmlKafkaAvroSerializer

架构注册表的基本思想是每个主题将引用一个avro架构(即,您只能发送彼此一致的数据。但是一个架构可以有多个版本,因此您仍然需要标识每条记录的架构)

我们不想像您暗示的那样为每个数据编写架构 - 通常,架构比您的数据大!这将是浪费时间,每次读取时解析它,并浪费资源(网络,磁盘,CPU)

相反,架构注册表实例将执行绑定,然后序列化程序在从注册表中获取数据(并缓存它以供以后使用)之后,在数据之前仅写入此 ID。avro schema <-> int schemaId

因此,在kafka内部,您的记录将是(并且由于技术原因而具有魔术字节),这仅是5个字节的开销(与架构的大小进行比较),并且在读取时,您的消费者将找到与id相关的相应架构,以及有关它的解串器avro字节。您可以在汇合文档中找到更多方式[<id> <bytesavro>]

如果您确实要为每个记录编写架构,则需要另一个序列化程序(我认为编写自己的序列化程序,但这很容易,只需重用 https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroSerializer.java 并删除架构注册表部分以将其替换为架构,与读取相同)。但是如果你使用avro,我真的会不鼓励这样做 - 一天后,你将需要实现像avro注册表这样的东西来管理版本控制


答案 2

虽然选中的答案都是正确的,但还应该提到可以禁用架构注册

只需设置为 。auto.register.schemasfalse


推荐