编写自定义 Kafka 序列化程序

2022-09-01 21:54:18

我在Kafka消息中使用我自己的类,该消息具有一堆字符串数据类型。

因此,我无法使用默认序列化程序类或 Kafka 库附带的序列化程序类。StringSerializer

我想我需要编写自己的序列化程序并将其馈送到生产者属性?


答案 1

编辑

在较新的 Kafka 客户端中,实现而不是 .SerializerEncoder


编写自定义序列化程序所需的内容包括:

  1. 使用为泛型指定的对象实现Encoder
    • 需要提供构造函数VerifiableProperties
  2. 重写确保返回字节数组的方法toBytes(...)
  3. 将序列化程序类注入到ProducerConfig

为创建器声明自定义序列化程序

正如您在问题中所指出的,Kafka 提供了一种为生产者声明特定序列化程序的方法。序列化程序类在实例中设置,该实例用于构造所需的类。ProducerConfigProducer

如果你遵循 Kafka 的 Producer 示例,你将通过一个对象进行构造。构建属性文件时,请确保包含:ProducerConfigProperties

props.put("serializer.class", "path.to.your.CustomSerializer");

使用类的路径,您希望 Kafka 在将消息追加到日志之前使用该类序列化消息。

创建 Kafka 理解的自定义序列化程序

编写 Kafka 可以正确解释的自定义序列化程序需要实现 Kafka 提供的 scala 类。在java中实现特征很奇怪,但以下方法适用于在我的项目中序列化JSON:Encoder[T]

public class JsonEncoder implements Encoder<Object> {
    private static final Logger logger = Logger.getLogger(JsonEncoder.class);
    // instantiating ObjectMapper is expensive. In real life, prefer injecting the value.
    private static final ObjectMapper objectMapper = new ObjectMapper();

    public JsonEncoder(VerifiableProperties verifiableProperties) {
        /* This constructor must be present for successful compile. */
    }

    @Override
    public byte[] toBytes(Object object) {
        try {
            return objectMapper.writeValueAsString(object).getBytes();
        } catch (JsonProcessingException e) {
            logger.error(String.format("Json processing failed for object: %s", object.getClass().getName()), e);
        }
        return "".getBytes();
    }
}

您的问题听起来像是您在为附加到日志中的所有消息使用一个对象(让我们称之为)。如果是这种情况,您的序列化程序可能看起来更像这样:CustomMessage

package com.project.serializer;
    
public class CustomMessageEncoder implements Encoder<CustomMessage> {
    public CustomMessageEncoder(VerifiableProperties verifiableProperties) {
        /* This constructor must be present for successful compile. */
    }

    @Override
    public byte[] toBytes(CustomMessage customMessage) {
        return customMessage.toBytes();
    }
}

这将使您的属性配置看起来像这样:

props.put("serializer.class", "path.to.your.CustomSerializer");

答案 2

您需要同时实现编码和解码器

public class JsonEncoder implements Encoder<Object> {
        private static final Logger LOGGER = Logger.getLogger(JsonEncoder.class);

        public JsonEncoder(VerifiableProperties verifiableProperties) {
            /* This constructor must be present for successful compile. */
        }

        @Override
        public byte[] toBytes(Object object) {
            ObjectMapper objectMapper = new ObjectMapper();
            try {
                return objectMapper.writeValueAsString(object).getBytes();
            } catch (JsonProcessingException e) {
                LOGGER.error(String.format("Json processing failed for object: %s", object.getClass().getName()), e);
            }
            return "".getBytes();
        }
    }

解码器代码

public class JsonDecoder  implements Decoder<Object> {
    private static final Logger LOGGER = Logger.getLogger(JsonEncoder.class);
    public JsonDecoder(VerifiableProperties verifiableProperties) {
        /* This constructor must be present for successful compile. */
    }

    @Override
    public Object fromBytes(byte[] bytes) {
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            return objectMapper.readValue(bytes, Map.class);
        } catch (IOException e) {
            LOGGER.error(String.format("Json processing failed for object: %s", bytes.toString()), e);
        }
        return null;
    }
}

pom 条目

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.4.1.3</version>
</dependency>

在 Kafka 属性中设置默认编码器

properties.put("serializer.class","kafka.serializer.DefaultEncoder");

编写器和读取器代码如下所示

byte[] bytes = encoder.toBytes(map);
        KeyedMessage<String, byte[]> message =new KeyedMessage<String, byte[]>(this.topic, bytes);

JsonDecoder decoder = new JsonDecoder(null);
Map map = (Map) decoder.fromBytes(it.next().message());