Hadoop DistributedCache已被弃用 - 什么是首选的API?

2022-09-01 04:56:04

我的地图任务需要一些配置数据,我想通过分布式缓存分发这些数据。

Hadoop MapReduce Tutorial 显示了 DistributedCache 类的用法,大致如下:

// In the driver
JobConf conf = new JobConf(getConf(), WordCount.class);
...
DistributedCache.addCacheFile(new Path(filename).toUri(), conf); 

// In the mapper
Path[] myCacheFiles = DistributedCache.getLocalCacheFiles(job);
...

但是,在 Hadoop 2.2.0 中被标记为已弃用DistributedCache

实现这一目标的新首选方法是什么?是否有涵盖此 API 的最新示例或教程?


答案 1

分布式缓存的 API 可以在作业类本身中找到。在此处查看文档:http://hadoop.apache.org/docs/stable2/api/org/apache/hadoop/mapreduce/Job.html 代码应类似于

Job job = new Job();
...
job.addCacheFile(new Path(filename).toUri());

在映射器代码中:

Path[] localPaths = context.getLocalCacheFiles();
...

答案 2

为了扩展@jtravaglini,YARN/MapReduce 2的首选使用方式如下:DistributedCache

在驱动程序中,使用Job.addCacheFile()

public int run(String[] args) throws Exception {
    Configuration conf = getConf();

    Job job = Job.getInstance(conf, "MyJob");

    job.setMapperClass(MyMapper.class);

    // ...

    // Mind the # sign after the absolute file location.
    // You will be using the name after the # sign as your
    // file name in your Mapper/Reducer
    job.addCacheFile(new URI("/user/yourname/cache/some_file.json#some"));
    job.addCacheFile(new URI("/user/yourname/cache/other_file.json#other"));

    return job.waitForCompletion(true) ? 0 : 1;
}

在映射器/化简器中,覆盖该方法:setup(Context context)

@Override
protected void setup(
        Mapper<LongWritable, Text, Text, Text>.Context context)
        throws IOException, InterruptedException {
    if (context.getCacheFiles() != null
            && context.getCacheFiles().length > 0) {

        File some_file = new File("./some");
        File other_file = new File("./other");

        // Do things to these two files, like read them
        // or parse as JSON or whatever.
    }
    super.setup(context);
}

推荐