Spark:以编程方式获取集群核心数
2022-09-04 00:52:45
我在纱线簇中运行我的火花应用。在我的代码中,我使用队列的可用核心数在我的数据集上创建分区:
Dataset ds = ...
ds.coalesce(config.getNumberOfCores());
我的问题:如何通过编程方式而不是通过配置来获取队列的可用核心数?
我在纱线簇中运行我的火花应用。在我的代码中,我使用队列的可用核心数在我的数据集上创建分区:
Dataset ds = ...
ds.coalesce(config.getNumberOfCores());
我的问题:如何通过编程方式而不是通过配置来获取队列的可用核心数?
有一些方法可以从 Spark 获取集群中的执行器数量和内核数量。这是我过去用过的一些Scala实用程序代码。您应该能够轻松地将其适应Java。有两个关键思想:
工作线程数是执行程序数减去 1 或 。sc.getExecutorStorageStatus.length - 1
每个工作线程的核心数可以通过在工作线程上执行来获取。java.lang.Runtime.getRuntime.availableProcessors
代码的其余部分是样板,用于为使用 Scala 隐式添加方便的方法。我在1.x年前写了代码,这就是为什么它没有使用.SparkContext
SparkSession
最后一点:合并到多个内核通常是一个好主意,因为这可以在数据偏斜的情况下提高性能。在实践中,我使用 1.5x 到 4x 之间的任何值,具体取决于数据的大小以及作业是否在共享群集上运行。
import org.apache.spark.SparkContext
import scala.language.implicitConversions
class RichSparkContext(val sc: SparkContext) {
def executorCount: Int =
sc.getExecutorStorageStatus.length - 1 // one is the driver
def coresPerExecutor: Int =
RichSparkContext.coresPerExecutor(sc)
def coreCount: Int =
executorCount * coresPerExecutor
def coreCount(coresPerExecutor: Int): Int =
executorCount * coresPerExecutor
}
object RichSparkContext {
trait Enrichment {
implicit def enrichMetadata(sc: SparkContext): RichSparkContext =
new RichSparkContext(sc)
}
object implicits extends Enrichment
private var _coresPerExecutor: Int = 0
def coresPerExecutor(sc: SparkContext): Int =
synchronized {
if (_coresPerExecutor == 0)
sc.range(0, 1).map(_ => java.lang.Runtime.getRuntime.availableProcessors).collect.head
else _coresPerExecutor
}
}
更新
最近,已被删除。我们已经切换到使用 's(减号再次用于驱动程序)。通过 的正常方式在包外无法访问。因此,我们使用封装违规模式:getExecutorStorageStatus
SparkEnv
blockManager.master.getStorageStatus.length - 1
env
SparkContext
org.apache.spark
package org.apache.spark
object EncapsulationViolator {
def sparkEnv(sc: SparkContext): SparkEnv = sc.env
}
在寻找几乎相同问题的答案时发现了这一点。
我发现:
Dataset ds = ...
ds.coalesce(sc.defaultParallelism());
完全符合OP的需求。
例如,我的 5 节点 x 8 核心群集为 .defaultParallelism