我已经用 3 种不同的方式将数据放入 HBase 中。最有效的(和分布式的)是使用HFileOutputFormat
class.
我按如下方式设置作业...(请注意,这是根据实际代码编辑的,但核心内容仍然存在)
cubeBuilderETLJob.setJobName(jobName);
cubeBuilderETLJob.setMapOutputKeyClass(ImmutableBytesWritable.class);
cubeBuilderETLJob.setMapOutputValueClass(Put.class);
cubeBuilderETLJob.setMapperClass(HiveToHBaseMapper.class);
cubeBuilderETLJob.setJarByClass(CubeBuilderDriver.class);
cubeBuilderETLJob.setInputFormatClass(TextInputFormat.class);
cubeBuilderETLJob.setOutputFormatClass(HFileOutputFormat.class);
HFileOutputFormat.setOutputPath(cubeBuilderETLJob, cubeOutputPath);
HTable hTable = null;
Configuration hConf = HBaseConfiguration.create(conf);
hConf.set("ZOOKEEPER_QUORUM", hbaseZookeeperQuorum);
hConf.set("ZOOKEEPER_CLIENTPORT", hbaseZookeeperClientPort);
hTable = new HTable(hConf, tableName);
HFileOutputFormat.configureIncrementalLoad(cubeBuilderETLJob, hTable);
正如我们所看到的,我的 Mapper 类被称为HiveToHBaseMapper
- 漂亮又原创。 :) 这是它的(再次,粗略的)定义
public class HiveToHBaseMapper extends
Mapper<WritableComparable, Writable, ImmutableBytesWritable, Put> {
@Override
public void map(WritableComparable key, Writable val, Context context)
throws IOException, InterruptedException {
Configuration config = context.getConfiguration();
String family = config.get("FAMILY");
Double value = Double.parseDouble(sValue);
String sKey = generateKey(config);
byte[] bKey = Bytes.toBytes(sKey);
Put put = new Put(bKey);
put.add(Bytes.toBytes(family), Bytes.toBytes(column), (value <= 0)
? Bytes.toBytes(Double.MIN_VALUE)
: Bytes.toBytes(value));
ImmutableBytesWritable ibKey = new ImmutableBytesWritable(bKey);
context.write(ibKey, put);
}
我不知道你是否可以用它来适应MultipleOutputs
或者需要创建一个新的 MR 作业。这是我遇到的将数据导入 HBase 的最佳方法。 :)
这有望帮助您找到解决方案的正确方向。