火花启动器无限等待作业完成

我正在尝试从Java代码将带有Spark作业的JAR提交到YARN集群中。我正在使用SparkLauncher提交SparkPi示例:

Process spark = new SparkLauncher()
    .setAppResource("C:\\spark-1.4.1-bin-hadoop2.6\\lib\\spark-examples-1.4.1-hadoop2.6.0.jar")
    .setMainClass("org.apache.spark.examples.SparkPi")
    .setMaster("yarn-cluster")
    .launch();
System.out.println("Waiting for finish...");
int exitCode = spark.waitFor();
System.out.println("Finished! Exit code:" + exitCode);

有两个问题:

  1. 在“yarn-cluster”模式下提交时,应用程序成功提交到 YARN 并成功执行(它在 YARN UI 中可见,报告为 SUCCESS,pi 打印在输出中)。但是,提交申请的申请永远不会收到处理已完成的通知 - 在打印“等待完成...”后,它会无限期挂起。容器的日志可以在这里找到
  2. 在“yarn-client”模式下提交时,应用程序不会出现在 YARN UI 中,并且提交的应用程序挂起在“正在等待完成...”当挂起代码被终止时,应用程序将显示在 YARN UI 中,并报告为 SUCCESS,但输出为空(pi 未打印出来)。容器的日志可以在这里找到

我试图使用Oracle Java 7和8执行提交应用程序。


答案 1

我在Spark邮件列表中得到了帮助。关键是读取/清除 getInputStream 并在进程上获取ErrorStream()。子进程可能会填满缓冲区并导致死锁 - 请参阅有关进程的 Oracle 文档。应在单独的线程中读取流:

Process spark = new SparkLauncher()
    .setSparkHome("C:\\spark-1.4.1-bin-hadoop2.6")
    .setAppResource("C:\\spark-1.4.1-bin-hadoop2.6\\lib\\spark-examples-1.4.1-hadoop2.6.0.jar")
    .setMainClass("org.apache.spark.examples.SparkPi").setMaster("yarn-cluster").launch();

InputStreamReaderRunnable inputStreamReaderRunnable = new InputStreamReaderRunnable(spark.getInputStream(), "input");
Thread inputThread = new Thread(inputStreamReaderRunnable, "LogStreamReader input");
inputThread.start();

InputStreamReaderRunnable errorStreamReaderRunnable = new InputStreamReaderRunnable(spark.getErrorStream(), "error");
Thread errorThread = new Thread(errorStreamReaderRunnable, "LogStreamReader error");
errorThread.start();

System.out.println("Waiting for finish...");
int exitCode = spark.waitFor();
System.out.println("Finished! Exit code:" + exitCode);

其中 InputStreamReaderRunnable 类是:

public class InputStreamReaderRunnable implements Runnable {

    private BufferedReader reader;

    private String name;

    public InputStreamReaderRunnable(InputStream is, String name) {
        this.reader = new BufferedReader(new InputStreamReader(is));
        this.name = name;
    }

    public void run() {
        System.out.println("InputStream " + name + ":");
        try {
            String line = reader.readLine();
            while (line != null) {
                System.out.println(line);
                line = reader.readLine();
            }
            reader.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

答案 2

由于这是一篇旧帖子,我想添加一个更新,这可能有助于任何阅读这篇文章的人。在 spark 1.6.0 中,SparkLauncher 类中添加了一些函数。即:

def startApplication(listeners: <repeated...>[Listener]): SparkAppHandle

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.launcher.SparkLauncher

您可以运行应用程序,而无需为stdout和stderr处理添加其他线程,并且有一个很好的应用程序运行状态报告。使用以下代码:

  val env = Map(
      "HADOOP_CONF_DIR" -> hadoopConfDir,
      "YARN_CONF_DIR" -> yarnConfDir
    )
  val handler = new SparkLauncher(env.asJava)
      .setSparkHome(sparkHome)
      .setAppResource("Jar/location/.jar")
      .setMainClass("path.to.the.main.class")
      .setMaster("yarn-client")
      .setConf("spark.app.id", "AppID if you have one")
      .setConf("spark.driver.memory", "8g")
      .setConf("spark.akka.frameSize", "200")
      .setConf("spark.executor.memory", "2g")
      .setConf("spark.executor.instances", "32")
      .setConf("spark.executor.cores", "32")
      .setConf("spark.default.parallelism", "100")
      .setConf("spark.driver.allowMultipleContexts","true")
      .setVerbose(true)
      .startApplication()
println(handle.getAppId)
println(handle.getState)

如果 spark 应用程序成功,您可以继续查询状态。有关 Spark 启动器服务器在 1.6.0 中如何工作的信息。看到这个链接: https://github.com/apache/spark/blob/v1.6.0/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java


推荐