Apache Spark:在Java中有效地使用mapPartitions

2022-09-04 21:37:01

在目前早期发布的名为《高性能火花》的教科书中,Spark的开发人员指出:

为了让 Spark 能够灵活地将一些记录溢出到磁盘,重要的是以这样一种方式表示内部的函数,即你的函数不会强制在内存中加载整个分区(例如,隐式转换为列表)。迭代器有许多方法,我们可以在上面编写函数式样式转换,或者您可以构建自己的自定义迭代器。当转换直接获取并返回迭代器而不强制它通过另一个集合时,我们称之为迭代器到迭代器转换。mapPartitions

然而,教科书缺乏使用或类似方法变体的良好示例。而且网上很少有好的代码示例 - 其中大多数是Scala。例如,我们看到这个 Scala 代码使用 zero323 编写的 如何在 mapPartitions 内部的 org.apache.spark.sql.Row 中添加列mapPartitionsmapPartitions

def transformRows(iter: Iterator[Row]): Iterator[Row] = iter.map(transformRow)
sqlContext.createDataFrame(df.rdd.mapPartitions(transformRows), newSchema).show

不幸的是,Java没有提供任何像迭代器那样好的东西。因此,它引出了一个问题,如何有效地使用迭代器到迭代器的转换,而不会完全将磁盘溢出为列表?iter.map(...)mapPartitionsRDD

JavaRDD<OutObj> collection = prevCollection.mapPartitions((Iterator<InObj> iter) -> {
    ArrayList<OutObj> out = new ArrayList<>();
    while(iter.hasNext()) {
        InObj current = iter.next();
        out.add(someChange(current));
    }
    return out.iterator();
});

这似乎是在Java示例中使用的一般语法,但我不明白这将如何最有效,假设您拥有数万条记录(甚至更多...因为,Spark是用于大数据的)。你最终会得到一个迭代器中所有对象的列表,只是为了把它变成一个迭代器(这就意味着某种类型的映射函数在这里会更有效)。mapPartitionsJavaRDD

注意:虽然这8行代码的使用可以写成1行或,但我故意使用这个事实,它在每个分区上运行,而不是在每个元素上运行。mapPartitionsmapflatMapmapPartitionsRDD

有什么想法吗?


答案 1

防止强制整个分区“具体化”的一种方法是将 转换为流,然后使用 的功能 API(例如 函数)。IteratorStreammap

如何将迭代器转换为流?建议了一些将 a 转换为 a 的好方法,因此采用那里建议的选项之一,我们最终可以得到:IteratorStream

rdd.mapPartitions((Iterator<InObj> iter) -> {
    Iterable<InObj> iterable = () -> iter;
    return StreamSupport.stream(iterable.spliterator(), false)
            .map(s -> transformRow(s)) // or whatever transformation
            .iterator();
});

这应该是“迭代器到迭代器”的转换,因为所有使用的中间 API (, ) 都经过了懒惰的评估。IterableStream

编辑:我自己没有测试过,但OP评论说,我引用,“在列表上使用Stream不会提高效率”。我不知道为什么会这样,我不知道这在一般情况下是否是正确的,但值得一提。


答案 2

推荐