我想将 1GB(1000 万条记录)的 CSV 文件加载到 Hbase 中。我为它编写了 Map-Reduce 程序。我的代码运行良好,但需要 1 小时才能完成。最后一个Reducer 花费了半个多小时的时间。有人可以帮我吗?
我的代码如下:
驱动程序.Java
package com.cloudera.examples.hbase.bulkimport;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* HBase bulk import example
* Data preparation MapReduce job driver
*
* - args[0]:HDFS输入路径
*
- args[1]:HDFS输出路径
*
- args[2]:HBase表名
*
*/
public class Driver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
/*
* NBA Final 2010 game 1 tip-off time (seconds from epoch)
* Thu, 03 Jun 2010 18:00:00 PDT
*/
// conf.setInt("epoch.seconds.tipoff", 1275613200);
conf.set("hbase.table.name", args[2]);
// Load hbase-site.xml
HBaseConfiguration.addHbaseResources(conf);
Job job = new Job(conf, "HBase Bulk Import Example");
job.setJarByClass(HBaseKVMapper.class);
job.setMapperClass(HBaseKVMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
job.setInputFormatClass(TextInputFormat.class);
HTable hTable = new HTable(conf, args[2]);
// Auto configure partitioner and reducer
HFileOutputFormat.configureIncrementalLoad(job, hTable);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
// Load generated HFiles into table
// LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
// loader.doBulkLoad(new Path(args[1]), hTable);
}
}
HColumnEnum.java
package com.cloudera.examples.hbase.bulkimport;
/**
* HBase table columns for the 'srv' column family
*/
public enum HColumnEnum {
SRV_COL_employeeid ("employeeid".getBytes()),
SRV_COL_eventdesc ("eventdesc".getBytes()),
SRV_COL_eventdate ("eventdate".getBytes()),
SRV_COL_objectname ("objectname".getBytes()),
SRV_COL_objectfolder ("objectfolder".getBytes()),
SRV_COL_ipaddress ("ipaddress".getBytes());
private final byte[] columnName;
HColumnEnum (byte[] column) {
this.columnName = column;
}
public byte[] getColumnName() {
return this.columnName;
}
}
HBaseKVMMapper.java
package com.cloudera.examples.hbase.bulkimport;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import au.com.bytecode.opencsv.CSVParser;
/**
* HBase bulk import example
* <p>
* Parses Facebook and Twitter messages from CSV files and outputs
* <ImmutableBytesWritable, KeyValue>.
* <p>
* The ImmutableBytesWritable key is used by the TotalOrderPartitioner to map it
* into the correct HBase table region.
* <p>
* The KeyValue value holds the HBase mutation information (column family,
* column, and value)
*/
public class HBaseKVMapper extends
Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
final static byte[] SRV_COL_FAM = "srv".getBytes();
final static int NUM_FIELDS = 6;
CSVParser csvParser = new CSVParser();
int tipOffSeconds = 0;
String tableName = "";
// DateTimeFormatter p = DateTimeFormat.forPattern("MMM dd, yyyy HH:mm:ss")
// .withLocale(Locale.US).withZone(DateTimeZone.forID("PST8PDT"));
ImmutableBytesWritable hKey = new ImmutableBytesWritable();
KeyValue kv;
/** {@inheritDoc} */
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
Configuration c = context.getConfiguration();
// tipOffSeconds = c.getInt("epoch.seconds.tipoff", 0);
tableName = c.get("hbase.table.name");
}
/** {@inheritDoc} */
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
/*if (value.find("Service,Term,") > -1) {
// Skip header
return;
}*/
String[] fields = null;
try {
fields = value.toString().split(",");
//csvParser.parseLine(value.toString());
} catch (Exception ex) {
context.getCounter("HBaseKVMapper", "PARSE_ERRORS").increment(1);
return;
}
if (fields.length != NUM_FIELDS) {
context.getCounter("HBaseKVMapper", "INVALID_FIELD_LEN").increment(1);
return;
}
// Get game offset in seconds from tip-off
/* DateTime dt = null;
try {
dt = p.parseDateTime(fields[9]);
} catch (Exception ex) {
context.getCounter("HBaseKVMapper", "INVALID_DATE").increment(1);
return;
}
int gameOffset = (int) ((dt.getMillis() / 1000) - tipOffSeconds);
String offsetForKey = String.format("%04d", gameOffset);
String username = fields[2];
if (username.equals("")) {
username = fields[3];
}*/
// Key: e.g. "1200:twitter:jrkinley"
hKey.set(String.format("%s|%s|%s|%s|%s|%s", fields[0], fields[1], fields[2],fields[3],fields[4],fields[5])
.getBytes());
// Service columns
if (!fields[0].equals("")) {
kv = new KeyValue(hKey.get(), SRV_COL_FAM,
HColumnEnum.SRV_COL_employeeid.getColumnName(), fields[0].getBytes());
context.write(hKey, kv);
}
if (!fields[1].equals("")) {
kv = new KeyValue(hKey.get(), SRV_COL_FAM,
HColumnEnum.SRV_COL_eventdesc.getColumnName(), fields[1].getBytes());
context.write(hKey, kv);
}
if (!fields[2].equals("")) {
kv = new KeyValue(hKey.get(), SRV_COL_FAM,
HColumnEnum.SRV_COL_eventdate.getColumnName(), fields[2].getBytes());
context.write(hKey, kv);
}
if (!fields[3].equals("")) {
kv = new KeyValue(hKey.get(), SRV_COL_FAM,
HColumnEnum.SRV_COL_objectname.getColumnName(), fields[3].getBytes());
context.write(hKey, kv);
}
if (!fields[4].equals("")) {
kv = new KeyValue(hKey.get(), SRV_COL_FAM,
HColumnEnum.SRV_COL_objectfolder.getColumnName(), fields[4].getBytes());
context.write(hKey, kv);
}
if (!fields[5].equals("")) {
kv = new KeyValue(hKey.get(), SRV_COL_FAM,
HColumnEnum.SRV_COL_ipaddress.getColumnName(), fields[5].getBytes());
context.write(hKey, kv);
}
context.getCounter("HBaseKVMapper", "NUM_MSGS").increment(1);
/*
* Output number of messages per quarter and before/after game. This should
* correspond to the number of messages per region in HBase
*/
/* if (gameOffset < 0) {
context.getCounter("QStats", "BEFORE_GAME").increment(1);
} else if (gameOffset < 900) {
context.getCounter("QStats", "Q1").increment(1);
} else if (gameOffset < 1800) {
context.getCounter("QStats", "Q2").increment(1);
} else if (gameOffset < 2700) {
context.getCounter("QStats", "Q3").increment(1);
} else if (gameOffset < 3600) {
context.getCounter("QStats", "Q4").increment(1);
} else {
context.getCounter("QStats", "AFTER_GAME").increment(1);
}*/
}
}