Apache Spark - foreach vs foreachPartition 何时使用什么?

2022-08-31 22:26:40

我想知道,由于并行度更高,与考虑我为了在累加器变量中执行一些求和而流经的情况的方法相比,will是否会产生更好的性能。foreachPartitionforeachRDD


答案 1

foreach并且是操作。foreachPartitions

前(功能):单位

用于调用具有副作用的操作的通用函数。对于RDD中的每个元素,它调用传递的函数。这通常用于操作累加器或写入外部存储。

注意:在 外部修改累加器以外的变量可能会导致未定义的行为。有关更多详细信息,请参阅了解闭包foreach()

例如

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Long = 10

前每个分区(函数): 单位

与 类似,但不是为每个元素调用函数,而是为每个分区调用函数。该函数应该能够接受迭代器。这比因为它减少了函数调用的数量(就像())更有效。foreach()foreach()mapPartitions

前段分区示例的用法:


  • 示例1 :对于每个分区,要使用一个数据库连接(每个分区块的内部),那么这是如何使用scala完成的示例用法。
/**
    * Insert in to database using foreach partition.
    *
    * @param sqlDatabaseConnectionString
    * @param sqlTableName
    */
  def insertToTable(sqlDatabaseConnectionString: String, sqlTableName: String): Unit = {

    //numPartitions = number of simultaneous DB connections you can planning to give

datframe.repartition(numofpartitionsyouwant)

    val tableHeader: String = dataFrame.columns.mkString(",")
    dataFrame.foreachPartition { partition =>
      // Note : Each partition one connection (more better way is to use connection pools)
      val sqlExecutorConnection: Connection = DriverManager.getConnection(sqlDatabaseConnectionString)
      //Batch size of 1000 is used since some databases cant use batch size more than 1000 for ex : Azure sql
      partition.grouped(1000).foreach {
        group =>
          val insertString: scala.collection.mutable.StringBuilder = new scala.collection.mutable.StringBuilder()
          group.foreach {
            record => insertString.append("('" + record.mkString(",") + "'),")
          }

          sqlExecutorConnection.createStatement()
            .executeUpdate(f"INSERT INTO [$sqlTableName] ($tableHeader) VALUES "
              + insertString.stripSuffix(","))
      }


      sqlExecutorConnection.close() // close the connection so that connections wont exhaust.
    }
  }
  • 示例 2 :

使用 foreachPartition 与 sparkstreaming (dstreams) 和 kafka producer

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
// only once per partition You can safely share a thread-safe Kafka //producer instance.
    val producer = createKafkaProducer()
    partitionOfRecords.foreach { message =>
      producer.send(message)
    }
    producer.close()
  }
}

注意:如果您想避免这种在每个分区创建一次创建创建器的方式,更好的方法是使用广播生产者,因为Kafka生产者是异步的,并且在发送之前大量缓冲数据。sparkContext.broadcast


累加器样本片段来玩它...通过它,您可以测试性能

     test("Foreach - Spark") {
        import spark.implicits._
        var accum = sc.longAccumulator
        sc.parallelize(Seq(1,2,3)).foreach(x => accum.add(x))
        assert(accum.value == 6L)
      }

      test("Foreach partition - Spark") {
        import spark.implicits._
        var accum = sc.longAccumulator
        sc.parallelize(Seq(1,2,3)).foreachPartition(x => x.foreach(accum.add(_)))
        assert(accum.value == 6L)
      }

结论:

foreachPartition在分区上的操作,所以很明显它会比边缘更好foreach

经验法则:

foreachPartition当您访问昂贵的资源(例如数据库连接或kafka生产者等)时,应使用。这将为每个分区初始化一个,而不是每个元素()初始化一个。当涉及到累加器时,您可以通过上述测试方法来测量性能,在累加器的情况下,这些测试方法也应该工作得更快。foreach

也。。。请参阅具有类似概念的地图与地图分区,但它们是转换。


答案 2

foreach在许多节点上自动运行循环。

但是,有时您希望在每个节点上执行一些操作。例如,建立与数据库的连接。您不能只是建立连接并将其传递到函数中:连接仅在一个节点上进行。foreach

因此,使用 ,您可以在运行循环之前连接到每个节点上的数据库。foreachPartition