了解 Spark 的闭包及其序列化
免责声明:刚刚开始玩Spark。
我在理解著名的“任务不可序列化”异常时遇到了麻烦,但我的问题与我在SO上看到的问题略有不同(或者我认为是这样)。
我有一个很小的自定义RDD()。它有一个字段,用于存储其类未实现可序列化 () 的对象。我已将“spark.serializer”配置选项设置为使用 Kryo。但是,当我尝试RDD时,我得到以下结果:TestRDDNonSerializablecount()
Caused by: java.io.NotSerializableException: com.complexible.spark.NonSerializable
Serialization stack:
- object not serializable (class: com.test.spark.NonSerializable, value: com.test.spark.NonSerializable@2901e052)
- field (class: com.test.spark.TestRDD, name: mNS, type: class com.test.spark.NonSerializable)
- object (class com.test.spark.TestRDD, TestRDD[1] at RDD at TestRDD.java:28)
- field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
- object (class scala.Tuple2, (TestRDD[1] at RDD at TestRDD.java:28,<function2>))
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1009)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:933)
当我向内看时,我看到它在我的RDD上使用它的闭包序列化程序,这是Java序列化程序,而不是我所期望的Kryo序列化程序。我读过Kryo在序列化闭包方面有问题,Spark总是使用Java序列化程序进行闭包,但我完全不明白闭包是如何在这里发挥作用的。我在这里所做的就是这个:DAGScheduler.submitMissingTasks
SparkConf conf = new SparkConf()
                         .setAppName("ScanTest")
                         .setMaster("local")
                         .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
JavaSparkContext sc = new JavaSparkContext(conf);
TestRDD rdd = new TestRDD(sc.sc());
System.err.println(rdd.count());
也就是说,没有映射器或任何需要序列化闭包的东西。OTOH这个工作:
sc.parallelize(Arrays.asList(new NonSerializable(), new NonSerializable())).count()
Kryo 序列化程序按预期使用,不涉及闭包序列化程序。如果我没有将序列化程序属性设置为 Kryo,我也会在这里得到一个例外。
我很欣赏任何解释闭包来自哪里以及如何确保我可以使用Kryo序列化自定义RDD的指针。
更新:这是其不可序列化的字段:TestRDDmNS
class TestRDD extends RDD<String> {
    private static final ClassTag<String> STRING_TAG = ClassManifestFactory$.MODULE$.fromClass(String.class);
    NonSerializable mNS = new NonSerializable();
    public TestRDD(final SparkContext _sc) {
        super(_sc,
              JavaConversions.asScalaBuffer(Collections.<Dependency<?>>emptyList()),
              STRING_TAG);
    }
    @Override
    public Iterator<String> compute(final Partition thePartition, final TaskContext theTaskContext) {
        return JavaConverters.asScalaIteratorConverter(Arrays.asList("test_" + thePartition.index(),
                                                                     "test_" + thePartition.index(),
                                                                     "test_" + thePartition.index()).iterator()).asScala();
    }
    @Override
    public Partition[] getPartitions() {
        return new Partition[] {new TestPartition(0), new TestPartition(1), new TestPartition(2)};
    }
    static class TestPartition implements Partition {
        final int mIndex;
        public TestPartition(final int theIndex) {
            mIndex = theIndex;
        }
        public int index() {
            return mIndex;
        }
    }
}
 
					 
				 
				    		 
				    		 
				    		 
				    		