Spark - 任务不可序列化:如何使用调用外部类/对象的复杂映射闭包?

2022-09-02 10:43:19

看看这个问题:Scala + Spark - Task not serializeable: java.io.NotSerializableExceptionon。当调用函数外部闭包时,仅在类上而不是对象上

问题:

假设我的映射器可以是函数(def),它们在内部调用其他类并创建对象并在内部执行不同的操作。(或者它们甚至可以是扩展(Foo)=> Bar并在其appliance方法中进行处理的类 - 但现在让我们忽略这种情况)

Spark 仅支持 Java 序列化以进行闭包。有什么办法可以解决这个问题吗?我们可以用一些东西而不是闭包来做我想做的事情吗?我们可以轻松地用Hadoop做这种事情。这一件事使Spark对我来说几乎无法使用。不能指望所有第三方库都让所有类都扩展可序列化!

可能的解决方案:

这样的事情似乎有什么用处:https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala

包装器当然似乎是答案,但我看不出具体是如何做到的。


答案 1

我想出了如何自己做到这一点!

您只需在通过闭包之前序列化对象,并在之后反序列化。即使您的类不可序列化,此方法也有效,因为它在幕后使用 Kryo。你所需要的只是一些咖喱。;)

以下是我如何做到这一点的示例:

def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)])
               (foo: Foo) : Bar = {
    kryoWrapper.value.apply(foo)
}
val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _
rdd.flatMap(mapper).collectAsMap()

object Blah(abc: ABC) extends (Foo => Bar) {
    def apply(foo: Foo) : Bar = { //This is the real function }
}

随意使Blah尽可能复杂,类,伴随对象,嵌套类,对多个第三方库的引用。

KryoSerializationWrapper指的是:https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala


答案 2

在使用 Java API 的情况下,在传递到映射函数闭包时应避免使用匿名类。而不是做map(新函数),你需要一个类来扩展你的函数并将其传递给map(..)请参见: https://yanago.wordpress.com/2015/03/21/apache-spark/