使用 MultipleOutputs 写入 MapReduce 中的 HBase

2024-04-06

我目前有一个 MapReduce 作业,它使用 MultipleOutputs 将数据发送到多个 HDFS 位置。完成后,我使用 HBase 客户端调用(在 MR 之外)将一些相同的元素添加到一些 HBase 表中。使用 TableOutputFormat 将 HBase 输出添加为附加的 MultipleOutputs 会很好。通过这种方式,我可以分发我的 HBase 处理。

问题是,我无法让它发挥作用。有没有人在 MultipleOutputs 中使用过 TableOutputFormat...?具有多个 HBase 输出?

基本上,我正在设置我的收藏家,就像这样......

Outputcollector<ImmutableBytesWritable, Writable> hbaseCollector1 = multipleOutputs.getCollector("hbase1", reporter); 
Outputcollector<ImmutableBytesWritable, Writable> hbaseCollector2 = multipleOutputs.getCollector("hbase2", reporter); 
Put put = new Put(mykey.getBytes());
put.add("family".getBytes(), "column".getBytes(), somedata1);
hbaseCollector1.collect(NullWritable.get(), put);

put = new Put(mykey.getBytes());
put.add("family".getBytes(), "column".getBytes(), somedata2);
hbaseCollector2.collect(newImmutableBytesWritable(mykey.getBytes()), put);

我觉得这似乎符合hbase编写的总体思路。

当我输入此内容时,部分问题可能更多地在于工作定义中。看起来 MR(和 Hbase)想要一个全局参数集,就像这样......

conf.set(TableOutputFormat.OUTPUT_TABLE, "articles");

提供表名称。问题是,我有两张桌子......

有任何想法吗?

Thanks


我已经用 3 种不同的方式将数据放入 HBase 中。最有效的(和分布式的)是使用HFileOutputFormat class.

我按如下方式设置作业...(请注意,这是根据实际代码编辑的,但核心内容仍然存在)

cubeBuilderETLJob.setJobName(jobName);
cubeBuilderETLJob.setMapOutputKeyClass(ImmutableBytesWritable.class);
cubeBuilderETLJob.setMapOutputValueClass(Put.class);
cubeBuilderETLJob.setMapperClass(HiveToHBaseMapper.class);      
cubeBuilderETLJob.setJarByClass(CubeBuilderDriver.class);       
cubeBuilderETLJob.setInputFormatClass(TextInputFormat.class);
cubeBuilderETLJob.setOutputFormatClass(HFileOutputFormat.class);
HFileOutputFormat.setOutputPath(cubeBuilderETLJob, cubeOutputPath);
HTable hTable = null;
Configuration hConf = HBaseConfiguration.create(conf);
hConf.set("ZOOKEEPER_QUORUM", hbaseZookeeperQuorum);
hConf.set("ZOOKEEPER_CLIENTPORT", hbaseZookeeperClientPort);
hTable = new HTable(hConf, tableName);
HFileOutputFormat.configureIncrementalLoad(cubeBuilderETLJob, hTable);

正如我们所看到的,我的 Mapper 类被称为HiveToHBaseMapper- 漂亮又原创。 :) 这是它的(再次,粗略的)定义

public class HiveToHBaseMapper extends
    Mapper<WritableComparable, Writable, ImmutableBytesWritable, Put> {
@Override
public void map(WritableComparable key, Writable val, Context context)
    throws IOException, InterruptedException {
    Configuration config = context.getConfiguration();
    String family = config.get("FAMILY");
    Double value = Double.parseDouble(sValue);
    String sKey = generateKey(config);
    byte[] bKey = Bytes.toBytes(sKey);
    Put put = new Put(bKey);
    put.add(Bytes.toBytes(family), Bytes.toBytes(column), (value <= 0) 
        ? Bytes.toBytes(Double.MIN_VALUE)
        : Bytes.toBytes(value));        
    ImmutableBytesWritable ibKey = new ImmutableBytesWritable(bKey);
    context.write(ibKey, put);
}

我不知道你是否可以用它来适应MultipleOutputs或者需要创建一个新的 MR 作业。这是我遇到的将数据导入 HBase 的最佳方法。 :)

这有望帮助您找到解决方案的正确方向。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 MultipleOutputs 写入 MapReduce 中的 HBase 的相关文章

随机推荐