火花 - 地图中的过滤器
2022-09-04 20:53:36
我正在尝试在地图功能内部进行过滤。基本上,我在经典的map-reduce中这样做的方法是,当筛选条件满足时,mapper不会向上下文写入任何内容。如何实现与火花的类似效果?我似乎无法从map函数返回null,因为它在随机步骤中失败。我可以使用过滤器功能,但似乎没有必要迭代数据集,而我可以在映射期间执行相同的任务。我也可以尝试使用虚拟密钥输出null,但这是一个糟糕的解决方法。
我正在尝试在地图功能内部进行过滤。基本上,我在经典的map-reduce中这样做的方法是,当筛选条件满足时,mapper不会向上下文写入任何内容。如何实现与火花的类似效果?我似乎无法从map函数返回null,因为它在随机步骤中失败。我可以使用过滤器功能,但似乎没有必要迭代数据集,而我可以在映射期间执行相同的任务。我也可以尝试使用虚拟密钥输出null,但这是一个糟糕的解决方法。
有几个选项:
rdd.flatMap
:将集合拼合到 RDD 中。若要选取元素,通常将返回作为转换结果的 。rdd.flatMap
Traversable
Option
rdd.flatMap(elem => if (filter(elem)) Some(f(elem)) else None)
rdd.collect(pf: PartialFunction)
允许您提供一个部分函数,该函数可以过滤和转换原始RDD中的元素。您可以使用此方法的所有模式匹配功能。
rdd.collect{case t if (cond(t)) => f(t)}
rdd.collect{case t:GivenType => f(t)}
正如Dean Wampler在评论中提到的那样,可能比上面提到的其他更“简洁”的选项一样好,甚至更快。rdd.map(f(_)).filter(cond(_))
其中 是转换(或映射)函数。f
也许试试 这里是文档: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.map_filter.html#pyspark.sql.functions.map_filtermap_filter(col, lambda-function)