解决 Apache Spark 中的依赖问题

构建和部署 Spark 应用程序时的常见问题包括:

  • java.lang.ClassNotFoundException.
  • object x is not a member of package y编译错误。
  • java.lang.NoSuchMethodError

如何解决这些问题?


答案 1

Apache Spark的类路径是动态构建的(以适应每个应用程序的用户代码),这使得它容易受到此类问题的影响。@user7337271的答案是正确的,但还有更多问题,具体取决于您使用的集群管理器(“master”)。

首先,Spark 应用程序由这些组件组成(每个组件都是一个单独的 JVM,因此其类路径中可能包含不同的类):

  1. 驱动程序:这是您的应用程序创建(或)并连接到集群管理器以执行实际工作SparkSessionSparkContext
  2. 集群管理器:作为集群的“入口点”,负责为每个应用程序分配执行器。Spark中支持几种不同的类型:独立,YARN和Mesos,我们将在下面描述它们。
  3. 执行器:这些是群集节点上的进程,执行实际工作(运行 Spark 任务))

这些之间的关系在Apache Spark的集群模式概述中如下图所示:

Cluster Mode Overview

现在 - 哪些类应该驻留在这些组件中的每个组件中?

这可以通过下图来回答:

Class placement overview

让我们慢慢解析一下:

  1. Spark Code是Spark的库。它们应该存在于所有三个组件中,因为它们包括让Spark执行它们之间通信的胶水。顺便说一句 - Spark作者做出了一个设计决定,在所有组件中包含所有组件的代码(例如,在驱动程序中也包含只能在Executor中运行的代码)以简化这一点 - 因此Spark的“胖罐”(在1.6版本中)或“存档”(在2.0中,细节如下)包含所有组件的必要代码,并且应该在所有组件中可用。

  2. 仅驱动程序代码 这是用户代码,不包括应在执行器上使用的任何内容,即未在RDD / DataFrame / Dataset上的任何转换中使用的代码。这不一定必须与分布式用户代码分开,但它可以。

  3. 分布式代码 这是使用驱动程序代码编译的用户代码,但也必须在执行器上执行 - 实际转换使用的所有内容都必须包含在此jar中。

现在我们已经弄清楚了,我们如何在每个组件中正确加载类,它们应该遵循什么规则?

  1. Spark Code:正如前面的答案所述,您必须在所有组件中使用相同的ScalaSpark版本。

    1.1 在独立模式下,有一个“预先存在”的 Spark 安装,应用程序(驱动程序)可以连接到该安装。这意味着所有驱动程序都必须使用在主程序和执行程序上运行的相同 Spark 版本。

    1.2 在YARN /Mesos中,每个应用程序可以使用不同的Spark版本,但同一应用程序的所有组件必须使用相同的版本。这意味着,如果您使用版本 X 来编译和打包驱动程序应用程序,则应在启动 SparkSession 时提供相同的版本(例如,使用 YARN 时的 via 或 参数)。您提供的 jar/归档文件应包含所有 Spark 依赖项(包括传递依赖项),并且当应用程序启动时,集群管理器会将其传送给每个执行程序。spark.yarn.archivespark.yarn.jars

  2. 驱动程序代码:这完全取决于 - 驱动程序代码可以作为一堆jar或“胖jar”提供,只要它包含所有Spark依赖项+所有用户代码

  3. 分布式代码:除了存在于驱动程序上之外,此代码还必须交付给执行程序(同样,连同其所有可传递依赖项)。这是使用参数完成的。spark.jars

总而言之,以下是构建和部署 Spark 应用程序的建议方法(在本例中为使用 YARN):

  • 使用分布式代码创建一个库,将其打包为“常规”jar(带有描述其依赖项的.pom文件)和“胖jar”(包括其所有传递依赖项)。
  • 创建一个驱动程序应用程序,在分布式代码库和 Apache Spark(具有特定版本)上具有编译依赖项
  • 将驱动程序应用程序打包到要部署到驱动程序的胖罐中
  • 在启动时传递正确版本的分布式代码作为参数的值spark.jarsSparkSession
  • 将包含已下载的 Spark 二进制文件文件夹下所有 jar 的存档文件(例如 gzip)的位置作为lib/spark.yarn.archive

答案 2

在构建和部署 Spark 应用程序时,所有依赖项都需要兼容的版本。

  • Scala版本。所有软件包必须使用相同的主要(2.10,2.11,2.12)Scala版本。

    考虑以下(不正确):build.sbt

    name := "Simple Project"
    
    version := "1.0"
    
    libraryDependencies ++= Seq(
       "org.apache.spark" % "spark-core_2.11" % "2.0.1",
       "org.apache.spark" % "spark-streaming_2.10" % "2.0.1",
       "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1"
    )
    

    我们用于 Scala 2.10,而其余的软件包用于 Scala 2.11。有效文件可以是spark-streaming

    name := "Simple Project"
    
    version := "1.0"
    
    libraryDependencies ++= Seq(
       "org.apache.spark" % "spark-core_2.11" % "2.0.1",
       "org.apache.spark" % "spark-streaming_2.11" % "2.0.1",
       "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1"
    )
    

    但最好全局指定版本并使用(这会为您附加 scala 版本):%%

    name := "Simple Project"
    
    version := "1.0"
    
    scalaVersion := "2.11.7"
    
    libraryDependencies ++= Seq(
       "org.apache.spark" %% "spark-core" % "2.0.1",
       "org.apache.spark" %% "spark-streaming" % "2.0.1",
       "org.apache.bahir" %% "spark-streaming-twitter" % "2.0.1"
    )
    

同样在Maven中:

    <project>
      <groupId>com.example</groupId>
      <artifactId>simple-project</artifactId>
      <modelVersion>4.0.0</modelVersion>
      <name>Simple Project</name>
      <packaging>jar</packaging>
      <version>1.0</version>
      <properties>
        <spark.version>2.0.1</spark.version>
      </properties> 
      <dependencies>
        <dependency> <!-- Spark dependency -->
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency> 
        <dependency>
          <groupId>org.apache.bahir</groupId>
          <artifactId>spark-streaming-twitter_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
      </dependencies>
    </project>
  • 火花版所有软件包必须使用相同的主要 Spark 版本(1.6、2.0、2.1、...)。

    请考虑以下(不正确的)build.sbt:

    name := "Simple Project"
    
    version := "1.0"
    
    libraryDependencies ++= Seq(
       "org.apache.spark" % "spark-core_2.11" % "1.6.1",
       "org.apache.spark" % "spark-streaming_2.10" % "2.0.1",
       "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1"
    )
    

    我们使用 1.6,而其余组件在 Spark 2.0 中。有效文件可以是spark-core

    name := "Simple Project"
    
    version := "1.0"
    
    libraryDependencies ++= Seq(
       "org.apache.spark" % "spark-core_2.11" % "2.0.1",
       "org.apache.spark" % "spark-streaming_2.10" % "2.0.1",
       "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1"
    )
    

    但最好使用变量(仍然不正确):

    name := "Simple Project"
    
    version := "1.0"
    
    val sparkVersion = "2.0.1"
    
    libraryDependencies ++= Seq(
       "org.apache.spark" % "spark-core_2.11" % sparkVersion,
       "org.apache.spark" % "spark-streaming_2.10" % sparkVersion,
       "org.apache.bahir" % "spark-streaming-twitter_2.11" % sparkVersion
    )
    

同样在Maven中:

    <project>
      <groupId>com.example</groupId>
      <artifactId>simple-project</artifactId>
      <modelVersion>4.0.0</modelVersion>
      <name>Simple Project</name>
      <packaging>jar</packaging>
      <version>1.0</version>
      <properties>
        <spark.version>2.0.1</spark.version>
        <scala.version>2.11</scala.version>
      </properties> 
      <dependencies>
        <dependency> <!-- Spark dependency -->
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_${scala.version}</artifactId>
          <version>${spark.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_${scala.version}</artifactId>
          <version>${spark.version}</version>
        </dependency> 
        <dependency>
          <groupId>org.apache.bahir</groupId>
          <artifactId>spark-streaming-twitter_${scala.version}</artifactId>
          <version>${spark.version}</version>
        </dependency>
      </dependencies>
    </project>
  • Spark 依赖项中使用的 Spark 版本必须与 Spark 安装的 Spark 版本匹配。例如,如果在集群上使用 1.6.1,则必须使用 1.6.1 来构建 jar。次要版本不匹配并不总是被接受。

  • 用于构建jar的Scala版本必须与用于构建部署的Spark的Scala版本相匹配。默认情况下(可下载的二进制文件和默认版本):

    • 火花 1.x -> 斯卡拉 2.10
    • Spark 2.x -> Scala 2.11
  • 如果包含在胖罐中,则应在工作节点上访问其他包。有许多选项,包括:

    • --jars参数 - 分发本地文件。spark-submitjar
    • --packages参数 - 从 Maven 存储库获取依赖项。spark-submit

    在群集节点中提交时,应在 中包含应用程序。jar--jars


推荐