如何运行 Spark Java 程序

2022-09-01 00:02:14

我为Spark编写了一个Java程序。但是如何从Unix命令行运行和编译它。我是否必须在编译以运行时包含任何jar


答案 1

结合官方快速入门指南中的步骤和在YARN上启动Spark,我们得到:

我们将创建一个非常简单的 Spark 应用程序 SimpleApp.java:

/*** SimpleApp.java ***/
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;

public class SimpleApp {
  public static void main(String[] args) {
    String logFile = "$YOUR_SPARK_HOME/README.md"; // Should be some file on your system
    JavaSparkContext sc = new JavaSparkContext("local", "Simple App",
      "$YOUR_SPARK_HOME", new String[]{"target/simple-project-1.0.jar"});
    JavaRDD<String> logData = sc.textFile(logFile).cache();

    long numAs = logData.filter(new Function<String, Boolean>() {
      public Boolean call(String s) { return s.contains("a"); }
    }).count();

    long numBs = logData.filter(new Function<String, Boolean>() {
      public Boolean call(String s) { return s.contains("b"); }
    }).count();

    System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
  }
}

该程序仅计算文本文件中包含“a”的行数和包含“b”的行数。请注意,您需要将$YOUR_SPARK_HOME 替换为 Spark 的安装位置。与Scala示例一样,我们初始化了一个SparkContext,尽管我们使用特殊的JavaSparkContext类来获得Java友好的类。我们还创建RDD(由JavaRDD表示)并对其运行转换。最后,我们通过创建扩展 spark.api.function 的类将函数传递给 spark.java。Java 编程指南更详细地描述了这些差异。

为了构建程序,我们还编写了一个 Maven pom.xml 文件,该文件将 Spark 列为依赖项。请注意,Spark 工件使用 Scala 版本进行标记。

<project>
  <groupId>edu.berkeley</groupId>
  <artifactId>simple-project</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>Simple Project</name>
  <packaging>jar</packaging>
  <version>1.0</version>
  <repositories>
    <repository>
      <id>Akka repository</id>
      <url>http://repo.akka.io/releases</url>
    </repository>
  </repositories>
  <dependencies>
    <dependency> <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>0.9.0-incubating</version>
    </dependency>
  </dependencies>
</project>

如果您还希望从Hadoop的HDFS中读取数据,则还需要为您的HDFS版本添加对hadoop客户端的依赖关系:

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>...</version>
</dependency>

我们根据规范的 Maven 目录结构对这些文件进行布局:

$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java

现在,我们可以使用 Maven 执行应用程序:

$ mvn package
$ mvn exec:java -Dexec.mainClass="SimpleApp"
...
Lines with a: 46, Lines with b: 23

然后按照在YARN上启动Spark中的步骤操作

构建支持 YARN 的装配 JAR

我们需要一个整合的 Spark JAR(它捆绑了所有必需的依赖项)来在 YARN 集群上运行 Spark 作业。这可以通过设置Hadoop版本和SPARK_YARN环境变量来构建,如下所示:

SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_YARN=true sbt/sbt assembly

组装好的 JAR 将如下所示:./assembly/target/scala-2.10/spark-assembly_0.9.0-incubating-hadoop2.0.5.jar。

生成过程现在还支持新的 YARN 版本 (2.2.x)。见下文。

准备

  • 构建支持 YARN 的程序集(见上文)。
  • 组装好的罐子可以安装到HDFS中或在本地使用。
  • 必须将应用程序代码打包到单独的 JAR 文件中。

如果要测试 YARN 部署模式,可以使用当前的 Spark 示例。通过运行以下命令,可以生成火花examples_2.10-0.9.0 孵化文件:

sbt/sbt assembly 

注意:由于您正在阅读的文档是针对 Spark 版本 0.9.0-孵化的,因此我们在这里假设您已经下载了 Spark 0.9.0-孵化或将其从源代码管理中签出。如果您使用的是不同版本的 Spark,则 sbt 包命令生成的 jar 中的版本号显然会有所不同。

配置

YARN上的Spark的大多数配置与其他部署相同。有关这些内容的详细信息,请参阅配置页面。这些是特定于 YARN 上的 SPARK 的配置。

环境变量:

  • SPARK_YARN_USER_ENV,将环境变量添加到 YARN 上启动的 Spark 进程中。这可以是以逗号分隔的环境变量列表,例如
SPARK_YARN_USER_ENV="JAVA_HOME=/jdk64,FOO=bar"

系统属性:

  • spark.yarn.applicationMaster.waitTries,该属性用于设置 ApplicationMaster 等待 spark master 的次数,然后设置它等待 Spark Context 初始化的尝试次数。默认值为 10。
  • spark.yarn.submit.file.replication,上传到应用程序 HDFS 的文件的 HDFS 复制级别。这些包括火花罐,应用程序罐和任何分布式缓存文件/档案之类的东西。
  • spark.yarn.preserve.staging.files,设置为 true 以在作业结束时保留暂存文件(spark jar、app jar、分布式缓存文件),而不是删除它们。
  • spark.yarn.scheduler.heartbeat.interval-ms,Spark 应用程序主检测信号进入 YARN ResourceManager 的间隔(以毫秒为单位)。默认值为 5 秒。
  • spark.yarn.max.worker.failures,应用程序失败之前的最大工作线程失败数。默认值为请求的工作人员数乘以 2,最少为 3。

在 YARN 上推出火花

确保HADOOP_CONF_DIRYARN_CONF_DIR指向包含 hadoop 群集的(客户端)配置文件的目录。这将用于连接到群集、写入 dfs 以及将作业提交到资源管理器。

有两种调度程序模式可用于在 YARN 上启动 spark 应用程序。

通过 YARN 客户端以纱线独立模式启动火花应用程序。

启动 YARN 客户端的命令如下:

SPARK_JAR=<SPARK_ASSEMBLY_JAR_FILE> ./bin/spark-class org.apache.spark.deploy.yarn.Client \
  --jar <YOUR_APP_JAR_FILE> \
  --class <APP_MAIN_CLASS> \
  --args <APP_MAIN_ARGUMENTS> \
  --num-workers <NUMBER_OF_WORKER_MACHINES> \
  --master-class <ApplicationMaster_CLASS>
  --master-memory <MEMORY_FOR_MASTER> \
  --worker-memory <MEMORY_PER_WORKER> \
  --worker-cores <CORES_PER_WORKER> \
  --name <application_name> \
  --queue <queue_name> \
  --addJars <any_local_files_used_in_SparkContext.addJar> \
  --files <files_for_distributed_cache> \
  --archives <archives_for_distributed_cache>

例如:

# Build the Spark assembly JAR and the Spark examples JAR
$ SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_YARN=true sbt/sbt assembly

# Configure logging
$ cp conf/log4j.properties.template conf/log4j.properties

# Submit Spark's ApplicationMaster to YARN's ResourceManager, and instruct Spark to run the SparkPi example
$ SPARK_JAR=./assembly/target/scala-2.10/spark-assembly-0.9.0-incubating-hadoop2.0.5-alpha.jar \
    ./bin/spark-class org.apache.spark.deploy.yarn.Client \
      --jar examples/target/scala-2.10/spark-examples-assembly-0.9.0-incubating.jar \
      --class org.apache.spark.examples.SparkPi \
      --args yarn-standalone \
      --num-workers 3 \
      --master-memory 4g \
      --worker-memory 2g \
      --worker-cores 1

# Examine the output (replace $YARN_APP_ID in the following with the "application identifier" output by the previous command)
# (Note: YARN_APP_LOGS_DIR is usually /tmp/logs or $HADOOP_HOME/logs/userlogs depending on the Hadoop version.)
$ cat $YARN_APP_LOGS_DIR/$YARN_APP_ID/container*_000001/stdout
Pi is roughly 3.13794

上面启动一个 YARN 客户端程序,该程序启动默认的应用程序主机。然后,SparkPi 将作为应用程序主机的子线程运行,YARN 客户端将定期轮询应用程序主机以获取状态更新,并在控制台中显示它们。应用程序运行完成后,客户端将退出。

使用此模式时,应用程序实际上在运行应用程序主机的远程计算机上运行。因此,涉及局部交互的应用程序将无法正常工作,例如火花壳。


答案 2

几天前我有同样的问题,昨天设法解决了它。
这就是我所做的:

  1. 下载 sbt 并解压缩并解压缩它 :http://www.scala-sbt.org/download.html
  2. 我已经下载了Hadoop 2的Spark Prebuild软件包,解压缩并解压缩了它:http://www.apache.org/dyn/closer.cgi/spark/spark-1.0.2/spark-1.0.2-bin-hadoop2.tgz
  3. 我创建了独立的应用程序SimpleApp.scala,如中所述:使用适当的simple.sbt文件(刚刚从描述中复制)和正确的目录布局 http://spark.apache.org/docs/latest/quick-start.html#standalone-applications
  4. 确保你的 PATH 中有 sbt。转到应用程序所在的目录,并使用sbt package
  5. 启动 Spark Server 使用SPARK_HOME_DIR/sbin/spark_master.sh
  6. 转到并确保您的服务器正在运行。从 URL 复制链接(从服务器描述,而不是从本地主机。它应该是端口7077或类似的东西)localhost:8080
  7. 使用 IP:PORT 是 6 中复制的 URL 启动工作线程SPARK_HOME_DIR/bin/spark-class org.apache.spark.deploy.worker.Worker spark://IP:PORT
  8. 将应用程序部署到服务器:SPARK_HOME_DIR/bin/spark-submit --class "SimpleApp" --master URL target/scala-2.10/simple-project_2.10-1.0.jar

这对我有用,希望能帮助你。
帕维尔


推荐