运行 apache spark 作业时任务不可序列化异常

2022-09-02 03:12:18

下面的java程序是为尝试apache spark而编写的。

该程序尝试从相应的文件中读取正词和负字的列表,将其与主文件进行比较并相应地过滤结果。

import java.io.Serializable;
import java.io.FileNotFoundException;
import java.io.File;
import java.util.*;
import java.util.Iterator;
import java.util.List;
import java.util.List;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;

public class SimpleApp implements Serializable{
  public static void main(String[] args) {
    String logFile = "/tmp/master.txt"; // Should be some file on your system
    String positive = "/tmp/positive.txt"; // Should be some file on your system
    String negative = "/tmp/negative.txt"; // Should be some file on your system

    JavaSparkContext sc = new JavaSparkContext("local[4]", "Twitter Analyzer", "/home/welcome/Downloads/spark-1.1.0/", new String[]{"target/scala-2.10/Simple-assembly-0.1.0.jar"});

    JavaRDD<String> positiveComments = sc.textFile(logFile).cache();

    List<String> positiveList = GetSentiments(positive);
    List<String> negativeList= GetSentiments(negative);

    final Iterator<String> iterator = positiveList.iterator();
    int i = 0;
    while (iterator.hasNext())
    {
      JavaRDD<String> numAs = positiveComments.filter(new Function<String, Boolean>()
      {
        public Boolean call(String s)
        {
          return s.contains(iterator.next());
        }
      });

     numAs.saveAsTextFile("/tmp/output/"+ i);
     i++;
     }

  }

public static List<String> GetSentiments(String fileName) {
  List<String> input = new ArrayList<String>();
try
{
  Scanner sc = new Scanner(new File(fileName));

  while (sc.hasNextLine()) {
      input.add(sc.nextLine());
  }
}
catch (FileNotFoundException e){
    // do stuff here..
}
  return input;
}

}

执行 spark 作业时引发以下错误,

 Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
    at org.apache.spark.rdd.RDD.filter(RDD.scala:282)
    at org.apache.spark.api.java.JavaRDD.filter(JavaRDD.scala:78)
    at SimpleApp.main(SimpleApp.java:37)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: java.util.ArrayList$Itr
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 12 more

任何指针??


答案 1

一些Java事实

  1. 在外部类内部定义的任何匿名类都具有对外部类的引用。
  2. 如果需要序列化匿名类,它将强制您使外部类序列化。
  3. 在 lambda 函数内部,如果使用封闭类的方法,则该类需要被序列化,如果 lambda 函数正在被序列化。

关于火花的一些事实。

  1. 在同一执行器上,多个任务可以在同一 JVM 中同时运行,因为任务在 spark 中作为线程生成。
  2. 与 spark 转换函数一起使用的任何 lambda、匿名类(map、mapPartitions、keyBy、redudeByKey ...)都将在驱动程序上实例化、序列化并发送给执行器。
  3. 序列化对象意味着将其状态转换为字节流,以便可以将字节流还原回对象的副本。
  4. 如果 Java 对象的类或其任何超类实现了 java.io.Serializable 接口或其子接口 java.io.Externalizable,并且其所有非瞬态、非静态字段都是可序列化的,则该对象是可序列化的。

避免序列化问题的经验法则:

  1. 避免使用匿名类,而是使用静态类,因为匿名类将强制你序列化外部类。
  2. 避免使用静态变量作为序列化问题的解决方法,因为多任务可以在同一个JVM内运行,而静态实例可能不是线程安全的。
  3. 使用瞬态变量来避免序列化问题,您必须在函数调用而不是构造函数中初始化它们。与在驱动程序上一样,构造函数将被调用,在执行器上它将反序列化并为对象。唯一的初始化方法是在函数调用中。
  4. 使用静态类代替匿名类。
  5. 虔诚地遵循“附加实现可序列化”,仅用于只需要序列化的类
  6. 在“lambda 函数”中从不直接引用 outclass 方法,因为这会导致 outer 类的序列化。
  7. 如果方法需要直接在 Lambda 函数中使用,则使方法保持静态,否则使用 Class::func() 概念,但不直接使用 func()
  8. Java Map<>没有实现Serializable,但HashMap实现了。
  9. 在决定使用Braodcast与Raw DataStructures时要明智。如果您看到真正的好处,那么只使用广播。

要深入了解,请遵循 http://bytepadding.com/big-data/spark/understanding-spark-serialization/


答案 2

创建匿名类时,编译器会执行一些操作:

JavaRDD<String> numAs = positiveComments.filter(new Function<String, Boolean>()
      {
        public Boolean call(String s)
        {
          return s.contains(iterator.next());
        }
      });

它将被重写为:

JavaRDD<String> numAs = positiveComments.filter(new Function<String, Boolean>()
      {
        private Iterator<...> $iterator;
        public Boolean call(String s)
        {
          return s.contains($iterator.next());
        }
      });

这就是为什么你可以有,因为迭代器是不可序列化的。NotSerializableException

为避免这种情况,只需提取下一步的结果:

String value = iterator.next();
JavaRDD<String> numAs = positiveComments.filter(new Function<String, Boolean>()
      {
        public Boolean call(String s)
        {
          return s.contains(value);
        }
      });

推荐