What is the fastest way to bulk load data into HBase programmatically?
I have a plain text file, possibly with millions of lines, that needs custom parsing, and I want to load it into an HBase table as fast as possible (using Hadoop or the HBase Java client).
My current solution is based on a MapReduce job with no Reduce part. I use FileInputFormat to read the text file so that each line is passed to the map method of my Mapper class. At that point, the line is parsed to form a Put object, which is written to the context. Then, TableOutputFormat takes the Put and inserts it into the table.
This solution yields an average insert rate of 1,000 rows per second, which is lower than I expected. My HBase setup runs in pseudo-distributed mode on a single server.
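For context, one write-path trade-off I have seen suggested, but have not benchmarked myself, is turning off the write-ahead log on each Put, trading durability for insert speed. A minimal sketch against the same 0.90-era client API as my code below:

// Hypothetical tweak inside map(): the same Put my mapper builds (full code
// below), but with the write-ahead log disabled. Faster inserts, at the cost
// of losing these rows if a region server crashes before a memstore flush.
Put row = new Put(Bytes.toBytes(parsedLine.get(keys[1])));
row.setWriteToWAL(false);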
One interesting thing is that during the insertion of 1,000,000 rows, 25 Mappers (tasks) were spawned, but they ran serially (one after another); is this normal?
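One thing I still need to verify (a guess on my part) is whether the job is actually running under LocalJobRunner, which executes map tasks one at a time in a single JVM; Hadoop 0.20.x falls back to it whenever mapred.job.tracker is left at its default:

// Diagnostic before job submission: "local" means LocalJobRunner,
// which would explain the 25 map tasks running serially.
System.out.println("mapred.job.tracker = " + conf.get("mapred.job.tracker", "local"));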
Here is the code of my current solution:
import java.io.IOException;
import java.util.Calendar;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class BulkLoadMapReduce extends Configured implements Tool {

    public static class CustomMap extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException {
            // parseLine() and keys[] are my own helpers, defined elsewhere in this class.
            Map<String, String> parsedLine = parseLine(value.toString());

            // The row key comes from one of the parsed fields.
            Put row = new Put(Bytes.toBytes(parsedLine.get(keys[1])));
            for (String currentKey : parsedLine.keySet()) {
                // Column family and qualifier both reuse the field name.
                row.add(Bytes.toBytes(currentKey), Bytes.toBytes(currentKey), Bytes.toBytes(parsedLine.get(currentKey)));
            }
            try {
                context.write(new ImmutableBytesWritable(Bytes.toBytes(parsedLine.get(keys[1]))), row);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            return -1;
        }

        // Configuration handed in by ToolRunner (see main()).
        Configuration conf = getConf();
        conf.set("hbase.mapred.outputtable", args[1]);

        // I got these conf parameters from a presentation about Bulk Load.
        conf.set("hbase.hstore.blockingStoreFiles", "25");
        conf.set("hbase.hregion.memstore.block.multiplier", "8");
        conf.set("hbase.regionserver.handler.count", "30");
        conf.set("hbase.regions.percheckin", "30");
        conf.set("hbase.regionserver.globalMemcache.upperLimit", "0.3");
        conf.set("hbase.regionserver.globalMemcache.lowerLimit", "0.15");

        Job job = new Job(conf);
        job.setJarByClass(BulkLoadMapReduce.class);
        job.setJobName(NAME); // NAME is a String constant defined elsewhere in this class
        TextInputFormat.setInputPaths(job, new Path(args[0]));
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(CustomMap.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Put.class);
        job.setNumReduceTasks(0); // map-only job: Puts go straight to TableOutputFormat
        job.setOutputFormatClass(TableOutputFormat.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }
    public static void main(String[] args) throws Exception {
        long startTime = Calendar.getInstance().getTimeInMillis();
        System.out.println("Start time : " + startTime);

        int errCode = ToolRunner.run(HBaseConfiguration.create(), new BulkLoadMapReduce(), args);

        long endTime = Calendar.getInstance().getTimeInMillis();
        System.out.println("End time : " + endTime);
        System.out.println("Duration milliseconds: " + (endTime - startTime));
        System.exit(errCode);
    }
}