在 Hadoop Map Reduce 中重命名零件文件

2022-09-02 03:41:23

我尝试按照第 http://hadoop.apache.org/docs/mapreduce/r0.21.0/api/index.html?org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.html 页中的示例使用该类MultipleOutputs

驱动程序代码

    Configuration conf = new Configuration();
    Job job = new Job(conf, "Wordcount");
    job.setJarByClass(WordCount.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setMapperClass(WordCountMapper.class);
    job.setReducerClass(WordCountReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,
            Text.class, IntWritable.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);

减速器代码

public class WordCountReducer extends
        Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();
    private MultipleOutputs<Text, IntWritable> mos;
    public void setup(Context context){
        mos = new MultipleOutputs<Text, IntWritable>(context);
    }
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        //context.write(key, result);
        mos.write("text", key,result);
    }
    public void cleanup(Context context)  {
         try {
            mos.close();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
         }
}

发现化简器的输出重命名为 text-r-00000

但这里的问题是,我也得到了一个空的part-r-00000文件。这是多重输出的预期行为方式,还是我的代码存在问题?请指教。

我尝试过的另一种方法是使用FileSystem类循环访问我的输出文件夹,并手动重命名以part开头的所有文件。

最好的方法是什么?

FileSystem hdfs = FileSystem.get(configuration);
FileStatus fs[] = hdfs.listStatus(new Path(outputPath));
for (FileStatus aFile : fs) {
if (aFile.isDir()) {
hdfs.delete(aFile.getPath(), true);
// delete all directories and sub-directories (if any) in the output directory
} 
else {
if (aFile.getPath().getName().contains("_"))
hdfs.delete(aFile.getPath(), true);
// delete all log files and the _SUCCESS file in the output directory
else {
hdfs.rename(aFile.getPath(), new Path(myCustomName));
}
}

答案 1

即使您正在使用 ,默认值(我相信是)仍在使用,因此它将初始化并创建您看到的这些文件。MultipleOutputsOutputFormatTextOutputFormatpart-r-xxxxx

它们是空的事实是,您没有执行任何操作,因为您正在使用 。但这并不妨碍它们在初始化期间创建。context.writeMultipleOutputs

要摆脱它们,您需要定义您的说您不期望任何输出。你可以这样做:OutputFormat

job.setOutputFormat(NullOutputFormat.class);

设置该属性后,这应该可以确保零件文件永远不会被初始化,但您仍然可以在 中获得输出。MultipleOutputs

您也可以使用,以确保仅在存在某些数据时才创建输出文件,而不会初始化空文件。你可以这样做:LazyOutputFormat

import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; 
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

请注意,您正在原型中使用,它仅使用默认输出路径,该路径将根据您的结果生成,如下所示:。如果你想更好地控制你的输出文件名,你应该使用原型,它可以让你根据你的键/值在运行时生成文件名。ReducerMultipleOutputs.write(String namedOutput, K key, V value)namedOutput{namedOutput}-(m|r)-{part-number}MultipleOutputs.write(String namedOutput, K key, V value, String baseOutputPath)


答案 2

这是您在 Driver 类中更改输出文件的基名所需的全部操作:因此,这将导致您的文件被称为“text-r-00000”。job.getConfiguration().set("mapreduce.output.basename", "text");


推荐