如何知道作业的哪个阶段当前在 Apache Spark 中运行?

2022-09-04 23:01:32

考虑一下我在Spark中有一份工作;

CSV 文件 ==> 按列筛选 ==> 取样本 ==> 另存为 JSON

现在我的要求是我如何知道作业的哪个步骤(提取文件过滤采样)当前正在以编程方式执行(最好使用Java API)?有什么办法吗?

我可以使用SparkListener类跟踪作业,舞台和任务。它可以像跟踪阶段ID一样完成。但是如何知道哪个阶段 Id 是作业链中的哪个步骤。

当考虑按列筛选完成时,我想向用户发送通知的内容。为此,我创建了一个扩展 SparkListener 类的类。但是我无法从哪里找到当前正在执行的转换名称的名称。有没有可能跟踪?

public class ProgressListener extends SparkListener{

  @Override
  public void onJobStart(SparkListenerJobStart jobStart)
  {

  }

  @Override
  public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted)
  {
      //System.out.println("Stage Name : "+stageSubmitted.stageInfo().getStatusString()); giving action name only
  }

  @Override
  public void onTaskStart(SparkListenerTaskStart taskStart)
  {
      //no such method like taskStart.name()
  }
}

答案 1

例如,您无法确切知道过滤器操作何时开始或结束。

这是因为您有转换(,,...)和操作(,,...)。Spark将在一个阶段中投入尽可能多的操作。然后,在输入的不同分区上并行执行该阶段。问题来了。filtermapcountforeach

假设您有多个 worker 和以下程序

负载 ==>地图 ==>过滤器 ==>分组依据 + 聚合

该程序可能有两个阶段:第一阶段将加载文件并应用和。然后,输出将被洗牌以创建组。在第二阶段,将执行聚合。mapfilter

现在,问题是,您有几个工作线程,每个工作线程将并行处理部分输入数据。也就是说,集群中的每个执行器都将收到程序(当前阶段)的副本,并在分配的分区上执行该副本。

您会看到,您将有多个并行执行的和运算符的实例,但不一定同时执行。在极端情况下,工作线程 1 将在工作线程 20 开始之前完成阶段 1(因此在工作线程 20 之前完成其操作)。mapfilterfilter

对于RDD,Spark在阶段内使用迭代器模型。但是,对于最新 Spark 版本中的数据集,它们会在分区上创建单个循环并执行转换。这意味着在这种情况下,Spark本身并不真正知道转换操作员何时完成单个任务!

长话短说:

  1. 您无法知道阶段内的操作何时完成
  2. 即使可以,也有多个实例将在不同的时间完成。

所以,现在我已经遇到了同样的问题:

在我们的 Piglet 项目中(请允许一些逆向 ;-)),我们从 Pig Latin 脚本生成 Spark 代码,并希望分析脚本。我最终在所有用户运算符之间插入了运算符,这些运算符将分区ID和当前时间发送到将评估消息的服务器。但是,此解决方案也有其局限性...我还没有完全满意。mapPartition

但是,除非您能够修改程序,否则恐怕您无法实现所需的目标。


答案 2

您是否考虑过此选项:http://spark.apache.org/docs/latest/monitoring.html
似乎可以使用以下 rest api 来获取某个作业状态 /applications/[app-id]/jobs/[job-id]

您可以设置 JobGroupId 和 JobGroupDescription,以便跟踪正在处理的作业组。即 setJobGroup

假设您将 JobGroupId 称为“测试”

sc.setJobGroup("1", "Test job")

何时调用 http://localhost:4040/api/v1/applications/[app-id]/jobs/[job-id]

你将得到一个 json,其中包含该作业的描述性名称:

{
  "jobId" : 3,
  "name" : "count at <console>:25",
  "description" : "Test Job",
  "submissionTime" : "2017-02-22T05:52:03.145GMT",
  "completionTime" : "2017-02-22T05:52:13.429GMT",
  "stageIds" : [ 3 ],
  "jobGroup" : "1",
  "status" : "SUCCEEDED",
  "numTasks" : 4,
  "numActiveTasks" : 0,
  "numCompletedTasks" : 4,
  "numSkippedTasks" : 0,
  "numFailedTasks" : 0,
  "numActiveStages" : 0,
  "numCompletedStages" : 1,
  "numSkippedStages" : 0,
  "numFailedStages" : 0
}

推荐