如何使用单个 spark 上下文在 Apache Spark 中运行并发作业(操作)

2022-09-02 04:11:06


它在Apache Spark文档中说:“在每个Spark应用程序中,如果它们由不同的线程提交,则多个”作业“(Spark操作)可能会同时运行”。有人可以解释如何为以下示例代码实现此并发性吗?

    SparkConf conf = new SparkConf().setAppName("Simple_App");
    JavaSparkContext sc = new JavaSparkContext(conf);

    JavaRDD<String> file1 = sc.textFile("/path/to/test_doc1");
    JavaRDD<String> file2 = sc.textFile("/path/to/test_doc2");

    System.out.println(file1.count());
    System.out.println(file2.count());

这两个作业是独立的,必须同时运行。
谢谢。


答案 1

试试下面这样:

    final JavaSparkContext sc = new JavaSparkContext("local[2]","Simple_App");
    ExecutorService executorService = Executors.newFixedThreadPool(2);
    // Start thread 1
    Future<Long> future1 = executorService.submit(new Callable<Long>() {
        @Override
        public Long call() throws Exception {
            JavaRDD<String> file1 = sc.textFile("/path/to/test_doc1");
            return file1.count();
        }
    });
    // Start thread 2
    Future<Long> future2 = executorService.submit(new Callable<Long>() {
        @Override
        public Long call() throws Exception {
            JavaRDD<String> file2 = sc.textFile("/path/to/test_doc2");
            return file2.count();
        }
    });
    // Wait thread 1
    System.out.println("File1:"+future1.get());
    // Wait thread 2
    System.out.println("File2:"+future2.get());

答案 2

使用 scala 并行集合功能

Range(0,10).par.foreach {
  project_id => 
      {
        spark.table("store_sales").selectExpr(project_id+" as project_id", "count(*) as cnt")
        .write
        .saveAsTable(s"counts_$project_id")
    }
}

PS. 上面将启动多达 10 个并行 Spark 作业,但根据 Spark 驱动程序上可用内核的数量,它可能会更少。GQ使用期货的上述方法在这方面更加灵活。


推荐