Apache Spark:在Java中有效地使用mapPartitions
在目前早期发布的名为《高性能火花》的教科书中,Spark的开发人员指出:
为了让 Spark 能够灵活地将一些记录溢出到磁盘,重要的是以这样一种方式表示内部的函数,即你的函数不会强制在内存中加载整个分区(例如,隐式转换为列表)。迭代器有许多方法,我们可以在上面编写函数式样式转换,或者您可以构建自己的自定义迭代器。当转换直接获取并返回迭代器而不强制它通过另一个集合时,我们称之为迭代器到迭代器转换。
mapPartitions
然而,教科书缺乏使用或类似方法变体的良好示例。而且网上很少有好的代码示例 - 其中大多数是Scala。例如,我们看到这个 Scala 代码使用 zero323 编写的 如何在 mapPartitions 内部的 org.apache.spark.sql.Row 中添加列。mapPartitions
mapPartitions
def transformRows(iter: Iterator[Row]): Iterator[Row] = iter.map(transformRow)
sqlContext.createDataFrame(df.rdd.mapPartitions(transformRows), newSchema).show
不幸的是,Java没有提供任何像迭代器那样好的东西。因此,它引出了一个问题,如何有效地使用迭代器到迭代器的转换,而不会完全将磁盘溢出为列表?iter.map(...)
mapPartitions
RDD
JavaRDD<OutObj> collection = prevCollection.mapPartitions((Iterator<InObj> iter) -> {
ArrayList<OutObj> out = new ArrayList<>();
while(iter.hasNext()) {
InObj current = iter.next();
out.add(someChange(current));
}
return out.iterator();
});
这似乎是在Java示例中使用的一般语法,但我不明白这将如何最有效,假设您拥有数万条记录(甚至更多...因为,Spark是用于大数据的)。你最终会得到一个迭代器中所有对象的列表,只是为了把它变成一个迭代器(这就意味着某种类型的映射函数在这里会更有效)。mapPartitions
JavaRDD
注意:虽然这8行代码的使用可以写成1行或,但我故意使用这个事实,它在每个分区上运行,而不是在每个元素上运行。mapPartitions
map
flatMap
mapPartitions
RDD
有什么想法吗?