解决 Apache Spark 中的依赖问题
构建和部署 Spark 应用程序时的常见问题包括:
-
java.lang.ClassNotFoundException
. -
object x is not a member of package y
编译错误。 java.lang.NoSuchMethodError
如何解决这些问题?
构建和部署 Spark 应用程序时的常见问题包括:
java.lang.ClassNotFoundException
.object x is not a member of package y
编译错误。java.lang.NoSuchMethodError
如何解决这些问题?
Apache Spark的类路径是动态构建的(以适应每个应用程序的用户代码),这使得它容易受到此类问题的影响。@user7337271的答案是正确的,但还有更多问题,具体取决于您使用的集群管理器(“master”)。
首先,Spark 应用程序由这些组件组成(每个组件都是一个单独的 JVM,因此其类路径中可能包含不同的类):
SparkSession
SparkContext
这些之间的关系在Apache Spark的集群模式概述中如下图所示:
现在 - 哪些类应该驻留在这些组件中的每个组件中?
这可以通过下图来回答:
让我们慢慢解析一下:
Spark Code是Spark的库。它们应该存在于所有三个组件中,因为它们包括让Spark执行它们之间通信的胶水。顺便说一句 - Spark作者做出了一个设计决定,在所有组件中包含所有组件的代码(例如,在驱动程序中也包含只能在Executor中运行的代码)以简化这一点 - 因此Spark的“胖罐”(在1.6版本中)或“存档”(在2.0中,细节如下)包含所有组件的必要代码,并且应该在所有组件中可用。
仅驱动程序代码 这是用户代码,不包括应在执行器上使用的任何内容,即未在RDD / DataFrame / Dataset上的任何转换中使用的代码。这不一定必须与分布式用户代码分开,但它可以。
分布式代码 这是使用驱动程序代码编译的用户代码,但也必须在执行器上执行 - 实际转换使用的所有内容都必须包含在此jar中。
现在我们已经弄清楚了,我们如何在每个组件中正确加载类,它们应该遵循什么规则?
Spark Code:正如前面的答案所述,您必须在所有组件中使用相同的Scala和Spark版本。
1.1 在独立模式下,有一个“预先存在”的 Spark 安装,应用程序(驱动程序)可以连接到该安装。这意味着所有驱动程序都必须使用在主程序和执行程序上运行的相同 Spark 版本。
1.2 在YARN /Mesos中,每个应用程序可以使用不同的Spark版本,但同一应用程序的所有组件必须使用相同的版本。这意味着,如果您使用版本 X 来编译和打包驱动程序应用程序,则应在启动 SparkSession 时提供相同的版本(例如,使用 YARN 时的 via 或 参数)。您提供的 jar/归档文件应包含所有 Spark 依赖项(包括传递依赖项),并且当应用程序启动时,集群管理器会将其传送给每个执行程序。spark.yarn.archive
spark.yarn.jars
驱动程序代码:这完全取决于 - 驱动程序代码可以作为一堆jar或“胖jar”提供,只要它包含所有Spark依赖项+所有用户代码
分布式代码:除了存在于驱动程序上之外,此代码还必须交付给执行程序(同样,连同其所有可传递依赖项)。这是使用参数完成的。spark.jars
总而言之,以下是构建和部署 Spark 应用程序的建议方法(在本例中为使用 YARN):
spark.jars
SparkSession
lib/
spark.yarn.archive
在构建和部署 Spark 应用程序时,所有依赖项都需要兼容的版本。
Scala版本。所有软件包必须使用相同的主要(2.10,2.11,2.12)Scala版本。
考虑以下(不正确):build.sbt
name := "Simple Project"
version := "1.0"
libraryDependencies ++= Seq(
"org.apache.spark" % "spark-core_2.11" % "2.0.1",
"org.apache.spark" % "spark-streaming_2.10" % "2.0.1",
"org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1"
)
我们用于 Scala 2.10,而其余的软件包用于 Scala 2.11。有效文件可以是spark-streaming
name := "Simple Project"
version := "1.0"
libraryDependencies ++= Seq(
"org.apache.spark" % "spark-core_2.11" % "2.0.1",
"org.apache.spark" % "spark-streaming_2.11" % "2.0.1",
"org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1"
)
但最好全局指定版本并使用(这会为您附加 scala 版本):%%
name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.7"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "2.0.1",
"org.apache.spark" %% "spark-streaming" % "2.0.1",
"org.apache.bahir" %% "spark-streaming-twitter" % "2.0.1"
)
<project>
<groupId>com.example</groupId>
<artifactId>simple-project</artifactId>
<modelVersion>4.0.0</modelVersion>
<name>Simple Project</name>
<packaging>jar</packaging>
<version>1.0</version>
<properties>
<spark.version>2.0.1</spark.version>
</properties>
<dependencies>
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>spark-streaming-twitter_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
</project>
火花版所有软件包必须使用相同的主要 Spark 版本(1.6、2.0、2.1、...)。
请考虑以下(不正确的)build.sbt:
name := "Simple Project"
version := "1.0"
libraryDependencies ++= Seq(
"org.apache.spark" % "spark-core_2.11" % "1.6.1",
"org.apache.spark" % "spark-streaming_2.10" % "2.0.1",
"org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1"
)
我们使用 1.6,而其余组件在 Spark 2.0 中。有效文件可以是spark-core
name := "Simple Project"
version := "1.0"
libraryDependencies ++= Seq(
"org.apache.spark" % "spark-core_2.11" % "2.0.1",
"org.apache.spark" % "spark-streaming_2.10" % "2.0.1",
"org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1"
)
但最好使用变量(仍然不正确):
name := "Simple Project"
version := "1.0"
val sparkVersion = "2.0.1"
libraryDependencies ++= Seq(
"org.apache.spark" % "spark-core_2.11" % sparkVersion,
"org.apache.spark" % "spark-streaming_2.10" % sparkVersion,
"org.apache.bahir" % "spark-streaming-twitter_2.11" % sparkVersion
)
<project>
<groupId>com.example</groupId>
<artifactId>simple-project</artifactId>
<modelVersion>4.0.0</modelVersion>
<name>Simple Project</name>
<packaging>jar</packaging>
<version>1.0</version>
<properties>
<spark.version>2.0.1</spark.version>
<scala.version>2.11</scala.version>
</properties>
<dependencies>
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>spark-streaming-twitter_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
</project>
Spark 依赖项中使用的 Spark 版本必须与 Spark 安装的 Spark 版本匹配。例如,如果在集群上使用 1.6.1,则必须使用 1.6.1 来构建 jar。次要版本不匹配并不总是被接受。
用于构建jar的Scala版本必须与用于构建部署的Spark的Scala版本相匹配。默认情况下(可下载的二进制文件和默认版本):
如果包含在胖罐中,则应在工作节点上访问其他包。有许多选项,包括:
--jars
参数 - 分发本地文件。spark-submit
jar
--packages
参数 - 从 Maven 存储库获取依赖项。spark-submit
在群集节点中提交时,应在 中包含应用程序。jar
--jars