处理序列化框架的不兼容版本更改问题描述问题简述相关事实概述可能(和不可能)的方法

2022-09-03 07:45:53

问题描述

我们有一个Hadoop集群,我们在其上存储数据,这些数据使用Kryo(一个序列化框架)序列化为字节。我们用来执行此操作的Kryo版本已从官方版本2.21中分叉出来,以将我们自己的补丁应用于我们使用Kryo遇到的问题。当前的Kryo版本2.22也修复了这些问题,但解决方案不同。因此,我们不能仅仅更改我们使用的Kryo版本,因为这意味着我们将不再能够读取已经存储在Hadoop集群上的数据。为了解决这个问题,我们想要运行一个Hadoop作业,它

  1. 读取存储的数据
  2. 反序列化与旧版本的 Kryo 一起存储的数据
  3. 使用新版本的 Kryo 序列化还原的对象
  4. 将新的序列化表示形式写回我们的数据存储

问题在于,在一个Java程序中使用同一类的两个不同版本并非易事(更确切地说,在Hadoop作业的映射器类中)。

问题简述

如何在一个 Hadoop 作业中反序列化和序列化具有同一序列化框架的两个不同版本的对象?

相关事实概述

  • 我们已将数据存储在Hadoop CDH4集群上,并使用Kryo版本2.21.2-ourpatch分支进行序列化
  • 我们希望使用 Kryo 版本 2.22 序列化数据,这与我们的版本不兼容
  • 我们使用Apache Maven构建Hadoop作业JAR

可能(和不可能)的方法

(1) 重命名软件包

我们想到的第一种方法是使用Maven Shade插件的重定位功能重命名我们自己的Kryo分支中的软件包,并使用不同的工件ID发布它,以便我们可以在转换作业项目中依赖这两个工件。然后,我们将实例化旧版本和新版本的一个 Kryo 对象,并使用旧对象进行反序列化,并使用新对象再次序列化该对象。

问题
我们没有在Hadoop作业中明确使用Kryo,而是通过我们自己的库的多层访问它。对于这些库中的每一个,有必要

  1. 重命名涉及的包和
  2. 创建具有不同组或项目 ID 的发布

为了使事情变得更加混乱,我们还使用其他第三方库提供的Kryo序列化程序,为此我们必须做同样的事情。


(2) 使用多个类装入器

我们提出的第二种方法是在包含转换作业的Maven项目中完全不依赖Kryo,而是从JAR加载每个版本的所需类,这些类存储在Hadoop的分布式缓存中。然后,序列化对象将如下所示:

public byte[] serialize(Object foo, JarClassLoader cl) {
    final Class<?> kryoClass = cl.loadClass("com.esotericsoftware.kryo.Kryo");
    Object k = kryoClass.getConstructor().newInstance();
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    final Class<?> outputClass = cl.loadClass("com.esotericsoftware.kryo.io.Output");

    Object output = outputClass.getConstructor(OutputStream.class).newInstance(baos);
    Method writeObject = kryoClass.getMethod("writeObject", outputClass, Object.class);
    writeObject.invoke(k, output, foo);
    outputClass.getMethod("close").invoke(output);
    baos.close();
    byte[] bytes = baos.toByteArray();
    return bytes;
}

问题
虽然这种方法可能适用于实例化未配置的 Kryo 对象并序列化/还原某些对象,但我们使用的 Kryo 配置要复杂得多。这包括多个自定义序列化程序、注册的类 ID 等。例如,我们无法找到一种方法来设置类的自定义序列化程序而不获取NoClassDefFoundError - 以下代码不起作用:

Class<?> kryoClass = this.loadClass("com.esotericsoftware.kryo.Kryo");
Object kryo = kryoClass.getConstructor().newInstance();
Method addDefaultSerializer = kryoClass.getMethod("addDefaultSerializer", Class.class, Class.class);
addDefaultSerializer.invoke(kryo, URI.class, URISerializer.class); // throws NoClassDefFoundError

最后一行抛出一个

java.lang.NoClassDefFoundError: com/esotericsoftware/kryo/Serializer

因为该类引用了 Kryo 的类,并尝试使用自己的类装入器(即 System 类装入器)装入它,而该类不知道该类。URISerializerSerializerSerializer


(3) 使用中间序列化

目前最有前途的方法似乎是使用独立的中间序列化,例如使用Gson或类似方式的JSON,然后运行两个单独的作业:

  1. kryo:2.21.2-我们的常规商店中的我们的补丁分支 ->临时商店中的JSON
  2. JSON 在临时商店 -> kryo:2-22 在我们的常规商店


问题 此解决方案的最大问题是,它大约使所处理数据的空间消耗增加了一倍。此外,我们需要另一种序列化方法,该方法可以在所有数据上没有问题,我们需要首先对其进行调查。


答案 1

我会使用多类装入器方法。

(包重命名也将起作用。它看起来确实很丑陋,但这是一次性的黑客攻击,所以美丽和正确可以退居二线。中间序列化似乎有风险 - 使用Kryo是有原因的,并且使用不同的中间形式可以否定该原因)。

整体设计将是:

child classloaders:      Old Kryo     New Kryo   <-- both with simple wrappers
                                \       /
                                 \     /
                                  \   /
                                   \ /
                                    |
default classloader:    domain model; controller for the re-serialization
  1. 在缺省类装入器中装入域对象类
  2. 加载一个包含修改后的 Kryo 版本和包装器代码的 Jar。包装器有一个静态的“main”方法,其中包含一个参数:要反序列化的文件的名称。通过从默认类装入器的反射调用 main 方法:

        Class deserializer = deserializerClassLoader.loadClass("com.example.deserializer.Main");
        Method mainIn = deserializer.getMethod("main", String.class);
        Object graph = mainIn.invoke(null, "/path/to/input/file");
    
    1. 此方法:
      1. 将文件反序列化为一个对象图
      2. 将对象置于共享空间中。ThreadLocal 是一种简单的方法,或者将其返回到包装器脚本。
  3. 当调用返回时,使用简单的包装器加载具有新序列化框架的第二个 Jar。包装器具有一个静态的“main”方法和一个参数,用于传递要序列化的文件的名称。通过从默认类装入器的反射调用 main 方法:

        Class serializer = deserializerClassLoader.loadClass("com.example.serializer.Main");
        Method mainOut = deserializer.getMethod("main", Object.class, String.class);
        mainOut.invoke(null, graph, "/path/to/output/file");
    
    1. 此方法
      1. 从 ThreadLocal 中检索对象
      2. 序列化对象并将其写入文件

考虑

在代码片段中,为每个对象序列化和反序列化创建一个类装入器。您可能只想加载类加载器一次,发现主要方法并循环访问文件,如下所示:

for (String file: files) {
    Object graph = mainIn.invoke(null, file + ".in");
    mainOut.invoke(null, graph, file + ".out");
}

域对象是否对任何 Kryo 类有任何引用?如果是这样,您有困难:

  1. 如果引用只是一个类引用,例如调用一个方法,那么第一次使用该类会将两个Kryo版本中的一个加载到默认的类加载器中。这可能会导致问题,因为序列化或反序列化的一部分可能由错误的Kryo版本执行
  2. 如果引用用于实例化任何 Kryo 对象并将引用存储在域模型(类或实例成员)中,则 Kryo 实际上将在模型中序列化自身的一部分。这可能是这种方法的交易破坏者。

在任何一种情况下,您的第一种方法都应该是检查这些引用并消除它们。确保已执行此操作的一种方法是确保默认类装入器无法访问任何 Kryo 版本。如果域对象以任何方式引用 Kryo,则引用将失败(如果直接引用类,则使用 ClassNotFoundError,如果使用反射,则使用 ClassNotFoundException)。


答案 2

对于 2,可以创建两个 jar 文件,其中包含序列化程序以及序列化程序的新版本和旧版本的所有依赖项,如下所示。然后创建一个 map reduce 作业,该作业将代码的每个版本加载到单独的类加载器中,并在中间添加一些胶水代码,这些代码使用旧代码进行反序列化,然后使用新代码进行序列化。

您必须小心,您的域对象与胶水代码加载到相同的类装入器中,并且序列化/反序列化的代码依赖于与胶水代码相同的类装入器,以便它们都看到相同的域对象类。