如何在Avro中从GenericRecord转换为DepecificRecord以获得兼容的模式

2022-09-04 21:08:18

Avro SpecificRecord(即生成的java类)是否与模式演进兼容?也就是说,如果我有Avro消息的来源(在我的情况下,kafka),并且我想将这些消息反序列化为特定的记录,是否可以安全地进行?

我看到的:

  • 将字段添加到架构的末尾工作正常 - 可以将 ok 反序列化为特定记录
  • 在中间添加字段不会 - 即破坏现有客户端

即使消息兼容,这也是一个问题。

如果我能找到新的模式(例如使用融合模式注册表),我可以反序列化为GenericRecord,但似乎没有办法从generalrecord映射到不同模式的特定记录。

MySpecificType message = (T SpecificData.get().deepCopy(MySpecificType.SCHEMA$, genericMessage);

Deepcopy在很多地方都提到过,但它使用索引,所以不起作用。

当您同时拥有两个架构并且它们兼容时,是否有任何安全的方法可以在两个 avro 对象之间进行映射?即使我可以从genercrecord映射到generalrecord,这也将像我那样做deepcopy技巧来完成这项工作。


答案 1

此处提供了针对特定数据类型转换的示例测试。它全部在配置“特定串行器Props”中

https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/test/java/io/confluent/kafka/serializers/KafkaAvroSerializerTest.java

我添加了以下配置,并根据需要获得了特定类型。

HashMap<String, String> specificDeserializerProps = new HashMap<String, String>();
specificDeserializerProps.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "bogus");
specificDeserializerProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
specificAvroDeserializer = new KafkaAvroDeserializer(schemaRegistry, specificDeserializerProps);

希望有所帮助


答案 2

默认情况下,KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG设置为 false,因此 KafkaAvroDeserializer 将默认生成 GenericData$Record,而不是所需的 Object(avro 生成的类)。

正如@JARC所说,您可以通过编程方式启用它。

如果您在Spring引导项目中使用它,请按以下方式设置:

spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.properties.specific.avro.reader=true

推荐