将 1GB 数据加载到 hbase 需要 1 小时

2024-02-09

我想将 1GB(1000 万条记录)的 CSV 文件加载到 Hbase 中。我为它编写了 Map-Reduce 程序。我的代码运行良好,但需要 1 小时才能完成。最后一个Reducer 花费了半个多小时的时间。有人可以帮我吗?

我的代码如下:

驱动程序.Java




    package com.cloudera.examples.hbase.bulkimport;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    /**
     * HBase bulk import example
* Data preparation MapReduce job driver *
    *
  1. args[0]:HDFS输入路径 *
  2. args[1]:HDFS输出路径 *
  3. args[2]:HBase表名 *
*/ public class Driver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); /* * NBA Final 2010 game 1 tip-off time (seconds from epoch) * Thu, 03 Jun 2010 18:00:00 PDT */ // conf.setInt("epoch.seconds.tipoff", 1275613200); conf.set("hbase.table.name", args[2]); // Load hbase-site.xml HBaseConfiguration.addHbaseResources(conf); Job job = new Job(conf, "HBase Bulk Import Example"); job.setJarByClass(HBaseKVMapper.class); job.setMapperClass(HBaseKVMapper.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(KeyValue.class); job.setInputFormatClass(TextInputFormat.class); HTable hTable = new HTable(conf, args[2]); // Auto configure partitioner and reducer HFileOutputFormat.configureIncrementalLoad(job, hTable); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); // Load generated HFiles into table // LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); // loader.doBulkLoad(new Path(args[1]), hTable); } }

HColumnEnum.java




        package com.cloudera.examples.hbase.bulkimport;

    /**
     * HBase table columns for the 'srv' column family
     */
    public enum HColumnEnum {
      SRV_COL_employeeid ("employeeid".getBytes()),
      SRV_COL_eventdesc ("eventdesc".getBytes()),
      SRV_COL_eventdate ("eventdate".getBytes()),
      SRV_COL_objectname ("objectname".getBytes()),
      SRV_COL_objectfolder ("objectfolder".getBytes()),
      SRV_COL_ipaddress ("ipaddress".getBytes());

      private final byte[] columnName;

      HColumnEnum (byte[] column) {
        this.columnName = column;
      }

      public byte[] getColumnName() {
        return this.columnName;
      }
    }
  

HBaseKVMMapper.java

package com.cloudera.examples.hbase.bulkimport;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import au.com.bytecode.opencsv.CSVParser;

/**
 * HBase bulk import example
 * <p>
 * Parses Facebook and Twitter messages from CSV files and outputs
 * <ImmutableBytesWritable, KeyValue>.
 * <p>
 * The ImmutableBytesWritable key is used by the TotalOrderPartitioner to map it
 * into the correct HBase table region.
 * <p>
 * The KeyValue value holds the HBase mutation information (column family,
 * column, and value)
 */
public class HBaseKVMapper extends
    Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

  final static byte[] SRV_COL_FAM = "srv".getBytes();
  final static int NUM_FIELDS = 6;

  CSVParser csvParser = new CSVParser();
  int tipOffSeconds = 0;
  String tableName = "";

  // DateTimeFormatter p = DateTimeFormat.forPattern("MMM dd, yyyy HH:mm:ss")
  //    .withLocale(Locale.US).withZone(DateTimeZone.forID("PST8PDT"));

  ImmutableBytesWritable hKey = new ImmutableBytesWritable();
  KeyValue kv;

  /** {@inheritDoc} */
  @Override
  protected void setup(Context context) throws IOException,
      InterruptedException {
    Configuration c = context.getConfiguration();

  //  tipOffSeconds = c.getInt("epoch.seconds.tipoff", 0);
    tableName = c.get("hbase.table.name");
  }

  /** {@inheritDoc} */
  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {

    /*if (value.find("Service,Term,") > -1) {
      // Skip header
      return;
    }*/

    String[] fields = null;

    try {
      fields = value.toString().split(",");
      //csvParser.parseLine(value.toString());
    } catch (Exception ex) {
      context.getCounter("HBaseKVMapper", "PARSE_ERRORS").increment(1);
      return;
    }

    if (fields.length != NUM_FIELDS) {
      context.getCounter("HBaseKVMapper", "INVALID_FIELD_LEN").increment(1);
      return;
    }

    // Get game offset in seconds from tip-off
  /*  DateTime dt = null;

    try {
      dt = p.parseDateTime(fields[9]);
    } catch (Exception ex) {
      context.getCounter("HBaseKVMapper", "INVALID_DATE").increment(1);
      return;
    }

    int gameOffset = (int) ((dt.getMillis() / 1000) - tipOffSeconds);
    String offsetForKey = String.format("%04d", gameOffset);

    String username = fields[2];
    if (username.equals("")) {
      username = fields[3];
    }*/

    // Key: e.g. "1200:twitter:jrkinley"
    hKey.set(String.format("%s|%s|%s|%s|%s|%s", fields[0], fields[1], fields[2],fields[3],fields[4],fields[5])
        .getBytes());

    // Service columns
    if (!fields[0].equals("")) {
      kv = new KeyValue(hKey.get(), SRV_COL_FAM,
          HColumnEnum.SRV_COL_employeeid.getColumnName(), fields[0].getBytes());
      context.write(hKey, kv);
    }

    if (!fields[1].equals("")) {
      kv = new KeyValue(hKey.get(), SRV_COL_FAM,
          HColumnEnum.SRV_COL_eventdesc.getColumnName(), fields[1].getBytes());
      context.write(hKey, kv);
    }

    if (!fields[2].equals("")) {
      kv = new KeyValue(hKey.get(), SRV_COL_FAM,
          HColumnEnum.SRV_COL_eventdate.getColumnName(), fields[2].getBytes());
      context.write(hKey, kv);
    }

    if (!fields[3].equals("")) {
      kv = new KeyValue(hKey.get(), SRV_COL_FAM,
          HColumnEnum.SRV_COL_objectname.getColumnName(), fields[3].getBytes());
      context.write(hKey, kv);
    }

    if (!fields[4].equals("")) {
      kv = new KeyValue(hKey.get(), SRV_COL_FAM,
          HColumnEnum.SRV_COL_objectfolder.getColumnName(), fields[4].getBytes());
      context.write(hKey, kv);
    }

    if (!fields[5].equals("")) {
      kv = new KeyValue(hKey.get(), SRV_COL_FAM,
          HColumnEnum.SRV_COL_ipaddress.getColumnName(), fields[5].getBytes());
      context.write(hKey, kv);
    }


    context.getCounter("HBaseKVMapper", "NUM_MSGS").increment(1);

    /*
     * Output number of messages per quarter and before/after game. This should
     * correspond to the number of messages per region in HBase
     */
  /*  if (gameOffset < 0) {
      context.getCounter("QStats", "BEFORE_GAME").increment(1);
    } else if (gameOffset < 900) {
      context.getCounter("QStats", "Q1").increment(1);
    } else if (gameOffset < 1800) {
      context.getCounter("QStats", "Q2").increment(1);
    } else if (gameOffset < 2700) {
      context.getCounter("QStats", "Q3").increment(1);
    } else if (gameOffset < 3600) {
      context.getCounter("QStats", "Q4").increment(1);
    } else {
      context.getCounter("QStats", "AFTER_GAME").increment(1);
    }*/
  }
}

首先,为什么我们需要 Mapreduce 程序将这么小的文件(1GB)的数据加载到 Hbase 中。

根据我的经验,我使用 Jackson 流处理了 5GB Json(我不想将所有 json 放入内存中),并通过使用批处理技术在 8 分钟内保留在 Hbase 中。

我使用 hbase 批量列出 100000 条记录的对象。

下面是我实现这一目标的代码片段。解析其他格式时也可以做同样的事情)

可能你需要在两个地方调用这个方法

1)批量为100000条记录。

2) 对于您的批次记录少于100000条的处理提醒

  public void addRecord(final ArrayList<Put> puts, final String tableName) throws Exception {
        try {
            final HTable table = new HTable(HBaseConnection.getHBaseConfiguration(), getTable(tableName));
            table.put(puts);
            LOG.info("INSERT record[s] " + puts.size() + " to table " + tableName + " OK.");
        } catch (final Throwable e) {
            e.printStackTrace();
        } finally {
            LOG.info("Processed ---> " + puts.size());
            if (puts != null) {
                puts.clear();
            }
        }
    }
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

将 1GB 数据加载到 hbase 需要 1 小时 的相关文章

  • 带有来自 Selenium 2 / WebDriver 的 Id 的 jQuery 元素选择器

    我可以在 Selenium 中获取元素的 ID RemoteWebElement webElement getId 它返回一个像这样的字符串 e9b6a1cc bb6f 4740 b9cb b83c1569d96d 我想知道这个ID的来源
  • hive 添加分区语句忽略前导零

    我在 hdfs 上有文件夹 user test year 2016 month 04 dt 25 000000 0 需要将上面的分区路径添加到test table 命令 ALTER TABLE test ADD IF NOT EXISTS
  • Java,将 null 分配给对象和仅声明之间有什么区别

    之间有什么区别 Object o null and Object o 仅声明 有人可以回答我吗 这取决于您声明变量的范围 例如 局部变量没有default values在这种情况下你将不得不分配null手动 在这种情况下实例变量分配 nul
  • Eclipse 自动完成更改变量名称

    只是一个愚蠢的问题 但很难搜索 因为有很多关于 Eclipse 自动完成的主题 而且很难找到与我的问题匹配的内容 所以问题是 如果我写 MyClass MyVarName 然后按空格键 添加 new MyClass Eclipse 自动添加
  • 警告:跳过条目,因为它不是绝对 URI。 NetBeans 中的 GlassFish

    我成功安装了 GlassFish 但是 当我启动服务器时 我收到两条警告消息 警告 跳过条目 因为它不是绝对 URI 那是关于什么的 Launching GlassFish on Felix platform Aug 09 2014 10
  • 将 JSON Map 传递到 Spring MVC 控制器

    我正在尝试将 Map 的 JSON 表示形式作为 POST 参数发送到我的控制器中 RequestMapping value search do method RequestMethod GET consumes application j
  • 隐藏类的 System.out.print 调用

    我正在使用 java 库 jar 文件 该文件的作者放入了一堆System out print and System out printlns 有没有办法隐藏特定对象的这些消息 编辑 看起来jar文件似乎正在创建一堆线程 并且每个线程都有它
  • 根据哈希值确认文件内容

    我需要 检查完整性 content文件数量 文件将写入 CD DVD 可能会被复制多次 这个想法是识别正确复制的副本 在从 Nero 等中删除它们之后 我对此很陌生 但快速搜索表明Arrays hashCode byte http down
  • 在Java中使用BufferedWriter写入文件时监视文件大小?

    我正在将一个可能很长的项目列表写入文件 我正在写的项目的长度是可变的 如果生成的文件大小大于10M 则应将其分成多个文件 为了提高性能 我目前使用 BufferedWriter 如下所示 final FileOutputStream fos
  • Java 套接字:可以从一个线程发送并在另一个线程上接收吗?

    这可能是一个非常基本的问题 但我很难找到答案 让一个线程写入 Socket 的输出流 而另一个线程从 Socket 的输入流读取数据 这样可以吗 编辑 这是一个与外部服务器通信的客户端应用程序 我并不是想让两个线程互相交谈 很抱歉含糊不清
  • Java中的OR运算(BitSet.class)

    如何编写一个程序 该程序需要001010101110000100100 011100010001000011000 000000000010000000000100 作为输入 位 输出将是OR其中 3 个 OR 0 0 0 0 1 1 1
  • Akka 和 spring 配置

    我正在尝试将 akka 与 spring 结合起来 但没有成功 基本上 我的应用程序似乎不习惯读取 akka 模式 具有架构的 service context xml 的一部分
  • 多对多不检索映射数据

    Spring boot 2 5 6 我无法安装版本 概要文件 java Getter Setter NoArgsConstructor AllArgsConstructor EqualsAndHashCode FieldDefaults l
  • java 1.8下无法启动eclipse

    java 1 8 升级后我无法启动 eclipse 附上错误截图 这是我的 eclipse 配置设置 我该如何解决 startup plugins org eclipse equinox launcher 1 3 0 v20120522 1
  • 链表中的虚拟节点

    问 什么时候使用它们 作业问题 列表中的第一个和最后一个节点 有时用作列表中的第一个和最后一个节点 从未用作列表中的第一个和最后一个节点 维基百科说 哨兵节点是与链接一起使用的专门指定的节点 列表和树作为遍历路径终止符 哨兵节点的作用是 不
  • Microsoft JDBC 中的 JTDS 属性相当于什么?

    我正在将 JTDS 连接更改为 Microsoft JDBC 并且我看到存在于http jtds sourceforge net faq html http jtds sourceforge net faq htmlMicrosoft JD
  • Checkstyle - 方法按修饰符排序

    是否可以添加到 checkstyle 规则以按修饰符对类中的方法进行排序 我的意思是开头的公共方法和最后的私有方法 MethodsOrderCheck做这个工作 检查文档 https www qulice com qulice checks
  • Drools:为什么是无状态会话?

    Drools 使用会话来存储运行时数据 为此 有两种会话 无状态和有状态 与无状态会话相比 有状态会话允许迭代调用 并且似乎比无状态会话具有所有优势 那么为什么会有无状态会话呢 他们服务的目的是什么 与有状态会话相比 它们的优势是什么 谢谢
  • spring data jpa 过滤 @OneToMany 中的子项

    我有一个员工测试实体是父实体并且FunGroup信息子实体 这两个实体都是通过employeeId映射 我需要一种方法来过滤掉与搜索条件匹配的子实体 以便结果仅包含父实体和子实体 满足要求 员工测试类 Entity name Employe
  • 无法从 HBase 导出表

    我无法将表从 HBase 导出到 HDFS 下面是错误跟踪 它的尺寸相当大 还有其他方法可以导出吗 我使用下面的命令来导出 我增加了 rpc 超时但作业仍然失败 sudo u hdfs hbase Dhbase rpc timeout 10

随机推荐

  • 在 npm 构建期间找不到模块 @restart/context/forwardRef

    我最近开始遇到问题npm build升级到较新版本后react bootstrap 1 0 0 beta 6 Creating an optimized production build Failed to compile Cannot f
  • 从 Cordova 2.5 升级到 Cordova 3.0,在使用 CordovaInterface 时遇到问题

    我正在将我的项目从 Cordova 2 5 迁移到 Cordova 3 遵循中提到的迁移过程 http cordova apache org docs en 3 0 0 guide cli index md html http cordov
  • 从电子邮件中删除无效字符

    我想帮助用户在电子邮件输入中错误地输入无效字符 服务器端验证 清理前 注意 我不在前端验证电子邮件 只是清理 Coffeescript Element find input type email on change keyup event
  • Frederickson堆选择算法简单解释

    Frederickson 的堆选择算法是否有任何简单的解释 可以在 O k 时间内找到在线任何地方可用的最小堆中的第 k 个排序元素 如果没有 任何人都可以解释该算法的内部原理吗 尝试谷歌搜索 frederickson heap selec
  • 如何将自定义默认生成操作关联到 Visual Studio 中的自定义文件类型?

    我有一个为自定义文件类型构建的语言服务 此外 我还在 MSBuild 项目文件中创建了一个自定义目标 构建操作 但是 我无法找到任何方法将该构建操作默认关联到我的自定义文件扩展名 例如 如果添加 cs 文件 则构建操作默认为 编译 我想为我
  • php strip_tags 删除所有内容

    我在用户输入上使用 strip 标签来删除所有可能的标签 但 strip tags php 函数也会删除 例如 某些用户可能会使用表情符号 gt 或者这甚至可以在算法等时使用 是否有任何解决方案允许带状标签上的 问题是在这种情况下 foo
  • MySQL 工作台插入

    我正在使用 MySQL Workbench 5 2 28 来设计我的数据库架构 我需要将默认数据插入到一些表中 这可以使用 插入 选项卡来完成 然而 它似乎只允许手动输入数据 一次一行 我有一个包含数百行的 OUTFILE 我想插入这些行
  • React SetState 不调用 render

    我将我的函数发送到子组件callBack 在父级中 我有一个函数setState method onInputUpdated id var array let char id slice 1 console log this state s
  • ASP.NET MVC;一次只能为一名用户编辑选项

    我有一个表 其中包含三个字段和一些记录 如果用户要编辑表中的记录 则不允许其他用户同时编辑该记录 我可以采取哪些步骤来实现这一目标 许多具有桌面应用程序背景的人会想知道这是如何在 Web 应用程序中完成的 锁定记录标志 桌面世界中的一种方法
  • 如果与 ClientHttpRequestInterceptor 一起使用,Spring Resttemplate postforobject 将返回 null 作为对象响应

    我正在尝试使用休息服务 并且正在使用 Spring 发布一些数据RestTemplate postForObjectMethod但我收到空响应 即使我可以在有效负载中看到请求和响应 更新 我正在使用拦截器实现ClientHttpReques
  • CI::报告没有为 Ruby Test::Units 生成 xml?

    我正在尝试使用 CI reporter 生成 ruby 单元测试报告 我的耙文件 require rake require rake testtask require rake packagetask require rake requir
  • 两列并排可滚动

    我的页面看起来像这样 我有两个单独的 div 一个是产品过滤器 另一个是产品 div 产品内容可以显示 40 个产品或 100 个产品或无 即内容可以稍后更改 同样 我的过滤器的长度也可以变化 我希望以某种方式使过滤器 div 可滚动 并使
  • 如何将 AWS S3 url 转换为 boto 的存储桶名称?

    我正在尝试访问http s3 amazonaws com commoncrawl parse output segment http s3 amazonaws com commoncrawl parse output segment 桶与
  • OpenCL 动态并行/GPU 生成的线程?

    CUDA 5 刚刚被释放 http nvidianews nvidia com Releases NVIDIA Releases CUDA 5 Making Programming With World s Most Pervasive P
  • Stream 和 Spring Data 的优点

    有些人重写 CrudRepository 的方法 findAll 以返回 Stream java 8 但我看到他们最终将 Stream 转换为 List 以便通过其余控制器发送它 他们为什么使用 Stream 在这里使用 Stream 有什
  • Grails 集成测试不会回滚

    我正在从这本书中学习grails Grails 的实际应用 http my safaribooksonline com book web development ruby 9781933988931 并且我正在尝试从示例中运行集成测试 在书
  • 使用 VLC 托管无限视频循环流

    我想通过 WIFI 网络从带有 VLC 播放器的电脑向智能手机提供视频流以进行回归测试 视频在智能手机上播放完毕后应自动重新开始 我目前使用 rtsp 作为协议和循环选项 但这不是强制性的 问题是 每次视频重新启动时 都需要进行新的 rts
  • 如何检查 Azure 中应用程序网关的运行状况

    如何使用java sdk检查应用程序网关的健康状况 我需要使用 java sdk 执行类似的操作 如下面的 azure cli 命令 天蓝色网络应用程序网关后端运行状况显示 1 2 json jq r backendAddressPools
  • Redis 中的绝对缓存和滑动缓存

    我想在Redis中实现绝对缓存和滑动缓存 有没有人有任何资源链接 这会有帮助 Redis 已经有很多用于此目的的命令 EXPIRE http redis io commands expire 设置按键超时时间 EXPIREAT http r
  • 将 1GB 数据加载到 hbase 需要 1 小时

    我想将 1GB 1000 万条记录 的 CSV 文件加载到 Hbase 中 我为它编写了 Map Reduce 程序 我的代码运行良好 但需要 1 小时才能完成 最后一个Reducer 花费了半个多小时的时间 有人可以帮我吗 我的代码如下