计算 RDD 中的行数

2022-09-01 19:41:56

我正在将spark与java一起使用,并且我有一个500万行的RDD。有没有一个解决方案可以让我计算RDD的行数。我试过,但这需要很多时间。我已经看到我可以使用该功能。但是我没有找到这个函数的java文档。你能告诉我如何使用它,或者给我看另一个解决方案来获取我的RDD的行数。RDD.count()fold

这是我的代码:

JavaPairRDD<String, String> lines = getAllCustomers(sc).cache();
JavaPairRDD<String,String> CFIDNotNull = lines.filter(notNull()).cache();
JavaPairRDD<String, Tuple2<String, String>> join =lines.join(CFIDNotNull).cache();

double count_ctid = (double)join.count(); // i want to get the count of these three RDD
double all = (double)lines.count();
double count_cfid = all - CFIDNotNull.count();
System.out.println("********** :"+count_cfid*100/all +"% and now : "+ count_ctid*100/all+"%");

谢谢。


答案 1

你有正确的想法:用来计算行数。没有比这更快的方法了。rdd.count()

我认为你应该问的问题是为什么rdd.count()这么慢?

答案是,这是一个“操作”——它是一个急切的操作,因为它必须返回一个实际的数字。您之前执行的RDD操作是“转换” - 它们懒惰地将RDD转换为另一个RDD。实际上,转换实际上并没有执行,只是排队。调用 时,强制执行前面的所有惰性操作。输入文件需要立即加载,执行s和s,执行洗牌等,直到最后我们有数据并可以说它有多少行。rdd.count()count()count()map()filter()

请注意,如果您呼叫两次,则所有这一切都将发生两次。返回计数后,将丢弃所有数据!如果要避免这种情况,请调用RDD。然后,第二次调用将很快,并且派生的RDD的计算速度也更快。但是,在这种情况下,RDD必须存储在内存(或磁盘)中。count()cache()count()


答案 2

丹尼尔的解释是对的。但是,如果您愿意接受近似值,则可以尝试RDD方法。(但请注意,这被标记为“实验性”)。countcountApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble]


推荐