使用 sc.textFile 从子目录中递归获取文件内容

2022-09-02 11:53:59

似乎SparkContext textFile只期望文件存在于给定的目录位置 - 它也没有

  • (a) 递归或
  • (b)甚至支持目录(尝试将目录读取为文件)

任何关于如何构建递归的建议 - 可能比手动创建递归文件列表/下降逻辑更简单?

下面是用例:文件

/数据/表/my_table

我希望能够通过hdfs调用该父目录下所有目录级别的所有文件进行读取。

更新

sc.textFile() 通过(子类)TextInputFormat 调用 Hadoop FileInputFormat。在逻辑内部确实存在执行递归目录读取 - 即首先检测条目是否是目录,如果是,则降序:

<!-- language: java -->
     for (FileStatus globStat: matches) {
218          if (globStat.isDir()) {
219            for(FileStatus stat: fs.listStatus(globStat.getPath(),
220                inputFilter)) {
221              result.add(stat);
222            }          
223          } else {
224            result.add(globStat);
225          }
226        }

但是,在调用 sc.textFile 时,目录条目上会出现错误:“不是文件”。此行为令人困惑 - 鉴于似乎对处理目录提供了适当的支持。


答案 1

我正在看一个旧版本的FileInputFormat。

设置递归配置 mapreduce.input.fileinputformat.input.dir.recursive 之前

scala> sc.textFile("dev/*").count
     java.io.IOException: Not a file: file:/shared/sparkup/dev/audit-release/blank_maven_build

默认值为 null/未设置,其计算结果为“false”:

scala> sc.hadoopConfiguration.get("mapreduce.input.fileinputformat.input.dir.recursive")
res1: String = null

后:

现在设置值:

sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.input.dir.recursive","true")

现在重试递归操作:

scala>sc.textFile("dev/*/*").count

..
res5: Long = 3481

So it works.

添加更新/用于每个注释的完整递归,@Ben


答案 2

我发现必须通过以下方式设置这些参数:

.set("spark.hive.mapred.supports.subdirectories","true")
.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive","true")

推荐