Apache Spark - foreach vs foreachPartition 何时使用什么?
我想知道,由于并行度更高,与考虑我为了在累加器变量中执行一些求和而流经的情况的方法相比,will是否会产生更好的性能。foreachPartition
foreach
RDD
我想知道,由于并行度更高,与考虑我为了在累加器变量中执行一些求和而流经的情况的方法相比,will是否会产生更好的性能。foreachPartition
foreach
RDD
foreach
并且是操作。foreachPartitions
用于调用具有副作用的操作的通用函数。对于RDD中的每个元素,它调用传递的函数。这通常用于操作累加器或写入外部存储。
注意:在 外部修改累加器以外的变量可能会导致未定义的行为。有关更多详细信息,请参阅了解闭包。foreach()
例如:
scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Long = 10
与 类似,但不是为每个元素调用函数,而是为每个分区调用函数。该函数应该能够接受迭代器。这比因为它减少了函数调用的数量(就像())更有效。
foreach()
foreach()
mapPartitions
前段分区示例的
用法:
/** * Insert in to database using foreach partition. * * @param sqlDatabaseConnectionString * @param sqlTableName */ def insertToTable(sqlDatabaseConnectionString: String, sqlTableName: String): Unit = { //numPartitions = number of simultaneous DB connections you can planning to give datframe.repartition(numofpartitionsyouwant) val tableHeader: String = dataFrame.columns.mkString(",") dataFrame.foreachPartition { partition => // Note : Each partition one connection (more better way is to use connection pools) val sqlExecutorConnection: Connection = DriverManager.getConnection(sqlDatabaseConnectionString) //Batch size of 1000 is used since some databases cant use batch size more than 1000 for ex : Azure sql partition.grouped(1000).foreach { group => val insertString: scala.collection.mutable.StringBuilder = new scala.collection.mutable.StringBuilder() group.foreach { record => insertString.append("('" + record.mkString(",") + "'),") } sqlExecutorConnection.createStatement() .executeUpdate(f"INSERT INTO [$sqlTableName] ($tableHeader) VALUES " + insertString.stripSuffix(",")) } sqlExecutorConnection.close() // close the connection so that connections wont exhaust. } }
使用 foreachPartition
与 sparkstreaming (dstreams) 和 kafka producer
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// only once per partition You can safely share a thread-safe Kafka //producer instance.
val producer = createKafkaProducer()
partitionOfRecords.foreach { message =>
producer.send(message)
}
producer.close()
}
}
注意:如果您想避免这种在每个分区创建一次创建创建器的方式,更好的方法是使用广播生产者,因为Kafka生产者是异步的,并且在发送之前大量缓冲数据。
sparkContext.broadcast
累加器样本片段来玩它...通过它,您可以测试性能
test("Foreach - Spark") { import spark.implicits._ var accum = sc.longAccumulator sc.parallelize(Seq(1,2,3)).foreach(x => accum.add(x)) assert(accum.value == 6L) } test("Foreach partition - Spark") { import spark.implicits._ var accum = sc.longAccumulator sc.parallelize(Seq(1,2,3)).foreachPartition(x => x.foreach(accum.add(_))) assert(accum.value == 6L) }
foreachPartition
在分区上的操作,所以很明显它会比边缘更好foreach
foreachPartition
当您访问昂贵的资源(例如数据库连接或kafka生产者等)时,应使用。这将为每个分区初始化一个,而不是每个元素()初始化一个。当涉及到累加器时,您可以通过上述测试方法来测量性能,在累加器的情况下,这些测试方法也应该工作得更快。foreach
也。。。请参阅具有类似概念的地图与地图分区,但它们是转换。
foreach
在许多节点上自动运行循环。
但是,有时您希望在每个节点上执行一些操作。例如,建立与数据库的连接。您不能只是建立连接并将其传递到函数中:连接仅在一个节点上进行。foreach
因此,使用 ,您可以在运行循环之前连接到每个节点上的数据库。foreachPartition