HBase Java 编程

2023-11-10

一、环境配置

1、引入Maven 库

   <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.2.3</version>
   </dependency>

2、将hbase-site.xml 和hdfs-site.xml 文件copy进本地项目的classpath路径中.

两个文件的个别参数配置内容视你搭建的HDFS 集群和HBASE集群而定.

hbase-site.xml 

<?xml version="1.0"?>
<configuration>
    <!-- 使用完全分布式 -->
    <property>
        <name>hbase.cluster.distributed</name>
        <value>true</value>
    </property>

    <!-- 指定hbase数据在hdfs上的存放路径 -->
    <property>
        <name>hbase.rootdir</name>
        <value>hdfs://mycluster/hbase</value>
    </property>
    <!-- 配置zk地址 -->
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>s201:2181,s202:2181,s203:2181</value>
    </property>
    <!-- zk的本地目录 -->
    <property>
        <name>hbase.zookeeper.property.dataDir</name>
        <value>/home/hadoop/zookeeper</value>
    </property>
</configuration>

hdfs-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>dfs.nameservices</name>
        <value>mycluster</value>
    </property>
    <property>
        <name>dfs.ha.namenodes.mycluster</name>
        <value>nn1,nn2</value>
    </property>
    <property>
        <name>dfs.namenode.rpc-address.mycluster.nn1</name>
        <value>s201:8020</value>
    </property>
    <property>
        <name>dfs.namenode.rpc-address.mycluster.nn2</name>
        <value>s206:8020</value>
    </property>
    <property>
        <name>dfs.namenode.http-address.mycluster.nn1</name>
        <value>s201:50070</value>
    </property>
    <property>
        <name>dfs.namenode.http-address.mycluster.nn2</name>
        <value>s206:50070</value>
    </property>
    <property>
        <name>dfs.namenode.shared.edits.dir</name>
        <value>qjournal://s202:8485;s203:8485;s204:8485/mycluster</value>
    </property>
    <property>
        <name>dfs.client.failover.proxy.provider.mycluster</name>
        <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
    </property>
    <property>
        <name>dfs.ha.fencing.methods</name>
        <value>
            sshfence
            shell(/bin/true)
        </value>
    </property>
    <property>
        <name>dfs.ha.fencing.ssh.private-key-files</name>
        <value>/home/centos/.ssh/id_rsa</value>
    </property>
    <property>
        <name>dfs.journalnode.edits.dir</name>
        <value>/home/hadoop/hadoop/journal</value>
    </property>
    <property>
        <name>dfs.replication</name>
        <value>3</value>
    </property>
    <property>
        <name>dfs.namenode.secondary.http-address</name>
        <value>s206:50090</value>
    </property>
    <property>
        <name>dfs.ha.automatic-failover.enabled</name>
        <value>true</value>
    </property>
</configuration>

二、Java 编程

1、创建名称空间

 /**
     * 创建名称空间
     * @throws Exception
     */
    @Test
    public void createNameSpace() throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        //创建名字空间描述符
        NamespaceDescriptor nsd = NamespaceDescriptor.create("ns1").build();
        admin.createNamespace(nsd);

        NamespaceDescriptor[] ns = admin.listNamespaceDescriptors();
        for(NamespaceDescriptor n : ns){
            System.out.println(n.getName());
        }
    }

2、创建表

 /**
     * 创建表
     * @throws Exception
     */
    @Test
    public void createTable() throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        //创建表名对象
        TableName tableName = TableName.valueOf("ns1:t1");
        //创建表描述符对象
        HTableDescriptor tbl = new HTableDescriptor(tableName);
        //创建列族描述符
        HColumnDescriptor col = new HColumnDescriptor("f1");
        tbl.addFamily(col);

        admin.createTable(tbl);
        System.out.println("over");
    }

创建表完成之后,我们可以在hbase shell 命令行,通过list 可以查看表.

hbase(main):007:0> list
TABLE                                                                                                                                    
ns1:t1                                                                                                                                   
1 row(s) in 0.0460 seconds

=> ["ns1:t1"]

3、添加数据到表中

    public void put() throws Exception {
        //创建conf对象
        Configuration conf = HBaseConfiguration.create();
        //通过连接工厂创建连接对象
        Connection conn = ConnectionFactory.createConnection(conf);
        //通过连接查询tableName对象
        TableName tname = TableName.valueOf("ns1:t1");
        //获得table
        Table table = conn.getTable(tname);

        //通过bytes工具类创建字节数组(将字符串)
        byte[] rowid = Bytes.toBytes("row1");

        //创建put对象
        Put put = new Put(rowid);

        byte[] f1 = Bytes.toBytes("f1");
        byte[] id = Bytes.toBytes("id") ;
        byte[] value = Bytes.toBytes(102);
        put.addColumn(f1,id,value);

        //执行插入
        table.put(put);
    }

 用scan 命令查看

hbase(main):011:0> scan 'ns1:t1'
ROW                                 COLUMN+CELL                                                                                          
 row1                               column=f1:id, timestamp=1535676987854, value=\x00\x00\x00f                                           
1 row(s) in 0.0760 seconds

每一个put操作实际上都是一个RPC操作,它将客户端数据传送到服务器然后返回。它只适合小数据量的操作,如果有个应用程序需要每秒存储上千行数据到HBase表中,这样处理就不合适了。

4、客户端的写缓冲区 (大表操作)

HBase 的API配备了一个客户端的写缓冲区(write  buffer),缓冲区负责收集put操作,然后调用PRC操作一次性将put送往服务器。全局交换机控制着该缓冲区是否在用。以下是其方法:

void setAutoFlust(boolean autoFlush)

boolean isAutoFlush()

默认情况下,客户端缓冲区是禁用的。可以通过将autoflush 设置为false来激活缓冲区。table.setAutoFlush(false).

当需要强制把数据写到服务器时,可以调用另一个API函数:

void  flushCommits()  throw IOException

用户可以在hbase-site.xml 配置文件中添加一个属性配置缓冲区大小,  一旦超出缓冲区指定的大小限制,客户端就会隐士的调用刷写命令。

默认大小是2M

<property>
    <name>hbase.client.write.buffer</name>
    <value>20971520</value>
    <source>hbase-default.xml</source>
</property>

这会将缓冲区大小增加到20M.

    @Test
    public void bigInsert() throws Exception {

        DecimalFormat format = new DecimalFormat();
        format.applyPattern("0000");

        long start = System.currentTimeMillis() ;
        Configuration conf = HBaseConfiguration.create();
        Connection conn = ConnectionFactory.createConnection(conf);
        TableName tname = TableName.valueOf("ns1:t1");
        HTable table = (HTable)conn.getTable(tname);
        //不要自动清理缓冲区
        table.setAutoFlush(false);

        for(int i = 1 ; i < 1000 ; i ++){
            Put put = new Put(Bytes.toBytes("row" + format.format(i))) ;
            //关闭写前日志
            put.setWriteToWAL(false);
            put.addColumn(Bytes.toBytes("f1"),Bytes.toBytes("id"),Bytes.toBytes(i));
            put.addColumn(Bytes.toBytes("f1"),Bytes.toBytes("name"),Bytes.toBytes("tom" + i));
            put.addColumn(Bytes.toBytes("f1"),Bytes.toBytes("age"),Bytes.toBytes(i % 100));
            table.put(put);

            if(i % 200 == 0){
                table.flushCommits();
            }
        }
        //
        table.flushCommits();
        System.out.println(System.currentTimeMillis() - start );
    }

5、原子性put操作

HBase 还有一个特别的put调用,其能保证自身操作的原子性:检查写(check and put )。方法签名如下:

boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
  byte[] value, Put put) throws IOException;
    /**
     * 原子性操作,先检查,在插入
     * @throws IOException
     */
    @Test
    public void testCompareAndSet() throws IOException {

        Configuration conf = HBaseConfiguration.create();
        Connection conn =ConnectionFactory.createConnection(conf);
        TableName tname = TableName.valueOf("ns1:t1");
        Table table =conn.getTable(tname);

        Put put = new Put(Bytes.toBytes("row1"));

        put.addColumn(Bytes.toBytes("f1"),Bytes.toBytes("id"),Bytes.toBytes(1));

        boolean res =table.checkAndPut(Bytes.toBytes("row1"),Bytes.toBytes("f1"),Bytes.toBytes("id"),Bytes.toBytes(1),put);
        System.out.println(res);
    }

6 、Get 操作 

a、单行get

    public void get() throws Exception {
        //创建conf对象
        Configuration conf = HBaseConfiguration.create();
        //通过连接工厂创建连接对象
        Connection conn = ConnectionFactory.createConnection(conf);
        //        //通过连接查询tableName对象
        TableName tname = TableName.valueOf("ns1:t1");
        //获得table
        Table table = conn.getTable(tname);

        //通过bytes工具类创建字节数组(将字符串)
        byte[] rowid = Bytes.toBytes("row0001");
        Get get = new Get(rowid);
        Result r = table.get(get);
        byte[] idvalue = r.getValue(Bytes.toBytes("f1"),Bytes.toBytes("id"));
        System.out.println(Bytes.toInt(idvalue));
    }

b. get 列表

即传递多个get ,返回Result 数组.

Result[] get(List<Get> gets) throws IOException;

7、delete(删除操作)

a、单行删除

void delete(Delete delete) throws IOException;
    @Test
    public void deleteData() throws IOException {
        Configuration conf = HBaseConfiguration.create();
        Connection conn = ConnectionFactory.createConnection(conf);
        TableName tname = TableName.valueOf("ns1:t1");

        Table table = conn.getTable(tname);
        Delete del = new Delete(Bytes.toBytes("row0001"));
        del.addColumn(Bytes.toBytes("f1"),Bytes.toBytes("id"));
        del.addColumn(Bytes.toBytes("f1"),Bytes.toBytes("name"));
        table.delete(del);
        System.out.println("over");
    }

b、列表删除

void delete(List<Delete> deletes) throws IOException;

8、扫描器

    public void scan() throws IOException {
        Configuration conf = HBaseConfiguration.create();
        Connection conn = ConnectionFactory.createConnection(conf);
        TableName tname = TableName.valueOf("ns1:t1");
        Table table = conn.getTable(tname);
        Scan scan = new Scan();
        scan.setStartRow(Bytes.toBytes("row0001"));
        scan.setStopRow(Bytes.toBytes("row0003"));

        ResultScanner rs = table.getScanner(scan);
        Iterator<Result> it = rs.iterator();
        while (it.hasNext()) {
            Result r = it.next();
            byte[] name = r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("name"));
            System.out.println(Bytes.toString(name));
        }
        rs.close();
    }

如下是“HBase权威指南”的扫描器超时案例:

 public void testScanTimeOut() throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Connection conn =ConnectionFactory.createConnection(conf);
        TableName tname = TableName.valueOf("ns1:t1");
        Table table =conn.getTable(tname);
        Scan scan = new Scan();
        ResultScanner scanner =table.getScanner(scan);
        int scannerTimeOut= (int) conf.getLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,-1);
        System.out.println(scannerTimeOut);
        Thread.sleep(scannerTimeOut+5000);
        while (true){
            try {
            Result result = scanner.next();
            if(result==null) break;
            System.out.println(result);
            }
            catch (Exception e){
                e.printStackTrace();
                break;
            }
        }
        scanner.close();
    }

不之为啥,并没有出现预期的ScannerTimeoutException ,发现问题所在的请留言,非常感谢

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

HBase Java 编程 的相关文章

  • 输入格式是否负责在 Hadoop 的 MapReduce 中实现数据局部性?

    我试图了解数据局部性 因为它与 Hadoop 的 Map Reduce 框架相关 特别是我想了解哪个组件处理数据局部性 即它是输入格式吗 雅虎的开发者网络页面 http developer yahoo com hadoop tutorial
  • 如何在Python中导入hbase?

    我正在尝试在 python 中使用 hbase 并且使用 cloudera 存储库来安装 hadoop hbase 包 它似乎可以工作 因为我可以使用 shell 访问和处理数据库 但它在 python 中不能完全工作 我知道要与 hbas
  • 实时查询/聚合数百万条记录 - hadoop?数据库?卡桑德拉?

    我有一个可以并行化的解决方案 但我 还 没有 hadoop nosql 的经验 并且我不确定哪种解决方案最适合我的需求 理论上 如果我有无限的 CPU 我的结果应该立即返回 因此 任何帮助将不胜感激 谢谢 这是我所拥有的 数千个数据集 da
  • HBASE 行前缀在 hbase 中按相反顺序扫描

    我有一个以下形式的行键
  • 从 HBase shell 导出数据

    我正在尝试将数据从 HBase Shell 导出到我可以解析的文本文件 然后添加到 msysql 数据库 我目前正在使用以下命令 echo scan registration COLUMNS gt registration status h
  • SparkSQL+Hive+Hbase+Hbase集成不起作用

    当我尝试连接配置单元表 正在使用 时出现错误 通过 Hbase 集成创建 在 Spark 中 我遵循的步骤 Hive表创建代码 CREATE TABLE test sample id string name string STORED BY
  • 增加 Hadoop 2 中 Hive 映射器的数量

    我从 Hive 创建了一个 HBase 表 并尝试对其进行简单的聚合 这是我的 Hive 查询 from my hbase table select col1 count 1 group by col1 地图缩减作业仅产生 2 个映射器 我
  • 如何实现hbase安全批量加载

    我已经在 kerberos 集群中的 hbase 中创建了一个批量加载 其驱动程序类与此类似 工作 public static void main String args try int response ToolRunner run HB
  • 在 HBase 中获取一组行的最网络有效的方法是什么?

    假设我有一组行键 作为一个集合 为这组行获取特定列族的最网络有效方法是什么 Using HTable get List 获取 http hbase apache org apidocs org apache hadoop hbase cli
  • 重新部署后 HBase 协处理器未更新

    我正在使用 HBase 1 1 2 并尝试重新部署自定义端点协处理器来修复 Java 代码中的错误 我对协处理器代码进行了一些更改 并通过以下步骤重新部署它 重建协处理器 jar 将其复制到 HDFS 上的某个位置 删除现有的协处理器 al
  • 使用Java连接到远程HBase服务

    我有一个小示例代码 我在其中尝试建立与远程 HBase 实体的连接 该代码在未安装 HBase 的 Windows 计算机上运行 我尝试连接到已安装并运行该代码的远程 Ubuntu 服务器 下面代码片段中的 IP 当然只是一个占位符 代码如
  • HBase 上的 Thrift 有性能基准吗?

    我有一个可以将大量数据写入 hbase 的系统 系统是用c 编写的 发现hbase有其他语言的thrift接口 我的问题是 HBase 上的 Thrift 有性能基准吗 与java原生api相比 最劣势是什么 我推荐最近关于这个主题的两篇博
  • Spark 在 Hbase 的 InputSplit 期间给出空指针异常

    我正在使用 Spark 1 2 1 Hbase 0 98 10 和 Hadoop 2 6 0 从 hbase 检索数据时出现空点异常 找到下面的堆栈跟踪 sparkDriver akka actor default dispatcher 2
  • 我的 cdh5.2 集群在运行 hbase MR 作业时出现 FileNotFoundException

    我的 cdh5 2 集群运行 hbase MR 作业时出现问题 例如 我将 hbase 类路径添加到 hadoop 类路径中 vi etc hadoop conf hadoop env sh 添加行 export HADOOP CLASSP
  • 如何将多个 QualifierFilter 应用于 HBase 中的一行

    我们想使用两个 QualifierFilters 过滤 HBase 表上的扫描 意味着我们只想获取表中确实具有特定列 col A 的行AND 某个其他列 col B 我们当前的方法如下所示 FilterList filterList new
  • 错误:org.apache.hadoop.hbase.MasterNotRunningException:null+hbase+hadoop

    我最近用两台机器 在ubuntu上 配置了hadoop集群 到目前为止效果很好 但是当我尝试在上面的 hadoop 集群上配置 hbase 时 它 显示错误 这就是我所做的 我有两台机器 192 168 1 110 Hadoop主站 192
  • Hbase 列族

    Hbase 文档表示 避免创建超过 2 3 个列族 因为 Hbase 不能很好地处理超过 2 3 个列族 其原因在于压缩和刷新 以及 IO 但是 如果我的所有列总是填充 对于每一行 那么我认为这个推理并不那么重要 因此 考虑到我对列的访问是
  • HBase Java 客户端 - 未知主机:localhost.localdomain

    版本 Hadoop 2 0 0 cdh4 3 1 HBase 0 94 6 cdh4 3 1 我正在运行cloudera快速启动vm 一切都在172 16 144 150上运行 这是我的小HBase Java客户端 HbaseClient
  • 在hbase中创建表

    我是 hbase 和 hadoop 的新手 无论如何 我已经成功建立了一个由3台机器组成的hadoop集群 现在我需要一些帮助来建立数据库 我有一个表 评论 包含字段 user id comments 对评论的评论 可以多个 和状态字段相同
  • 2n + 1 法定人数是什么意思?

    我在描述 HBase 的 Zookeeper 配置时遇到过这个问题 但我对这个术语并不熟悉 N 与我的 HBase 集群中的节点数量有关系吗 或者我应该在 Zookeeper 集群中使用的节点数量 2f 1是指你所需要的可靠性 可用性水平

随机推荐

  • 校园网络系统服务器配置摘要,校园网网络应用服务器配置

    浅谈校园网网络应用服务器配置 摘要 以某校园网为例 在网络应用服务器设计这个环节中 我们分别用到了web服务器 ftp服务器 dns服务器 dhcp服务器 mail服务器 并且在一台已经安装了windows 2003 server的计算机上
  • 前端框架——React 学习总结,这篇7000字全解决

    React组件复用 React组件复用 把多个组件中部分功能相似或者相同的状态或者逻辑进行复用 复用 state和操作state的方法 复用的方式 render props模式 高阶组件 HOC render props模式 用childr
  • Mysql 时间转换 && 时间函数

    1 时间转换 涉及的函数 DATE FORMAT date format MySQL日期格式化函数 STR TO DATE str format MySQL字符串格式化为日期 UNIX TIMESTAMP MySQL其他数据转换为时间戳 F
  • Vue的antd多选下拉框增加全选操作

    因为antd的多选下拉框没有提供全选操作 我做了一个简易的全选操作 data return categoryList 存放获取到的分选数据 category 已选分类数据
  • QT信号槽的5种连接方式

    在面试中 这是一个经常被问到的问题点 也是刚刚上qt的工程师不会去注意的一个点 qt源代码定义的连接方式如下 1 Qt AutoConnection 一般信号槽不会写第五个参数 其实使用的默认值 使用这个值则连接类型会在信号发送时决定 如果
  • markdown编辑数学公式

    在输入数学公式的时候 需要在数学公式的前后加入 符号 将需要输入的公式加入到 中间 上下标 上标 下标 名称 数学表达式 markdown公式 上标 ab a b a b 下标 ab a b a b 分数 frac 第一个 写分子 第二个
  • React Native(RN)-组件生命周期

    生命周期简介 像 Android 开发一样 React Native RN 中的组件也有生命周期 Lifecycle 借用大神流程图 这张图很简洁直观地告诉我们 生命周期的整个流程以及阶段划分 第一阶段 getDefaultProps gt
  • 目标检测入门

    目录 R CNN 1 1提取候选区域 1 1 1合并规则 1 1 2多样化与后处理 1 2特征提取 1 2 1预处理 2 Fast RCNN 2 1RoI Pooling Layer Faster RCNN 结构 RPN anchor 目标
  • Junit中使用线程池不执行任务代码

    1 在test中使用线程池发送MQ 没有报错 没有执行线程池中的代码 2 查资料 junit框架只要主线程结束完成 单元测试就会关闭 导致线程池中的线程没有执行代码就被销毁关闭了 可以在主线程中sleep一段时间 或者用main方法
  • 稳定ORACLE的执行计划

    很多时候可能我们都希望CBO能够帮我们生成正确 高效的执行计划 但是很多时候事实并非如此 可能因为各种各样的原因 如 统计信息不正确或者CBO天生的缺陷等 都会导致生成的执行计划特别的低效 之前的一家公司有一台专门用于批量做数据校验清洗的数
  • 大学四年自学走来,这些私藏的实用工具/学习网站我贡献出来了

    大学四年 看课本是不可能一直看课本的了 对于学习 特别是自学 善于搜索网上的一些资源来辅助 还是非常有必要的 下面我就把这几年私藏的各种资源 网站贡献出来给你们 主要有 电子书搜索 实用工具 在线视频学习网站 非视频学习网站 软件下载 面试
  • 'vue-cli-service' 不是内部或外部命令,也不是可运行的程序 或批处理文件。

    vue时 报 vue cli service 不是内部或外部命令 也不是可运行的程序 或批处理文件 罪该万死 怎么能忘记 npm install 如果你下载的淘宝镜像 也可以cnpm install 转载于 https www cnblog
  • Java设计模式-状态模式

    1 概述 定义 对有状态的对象 把复杂的 判断逻辑 提取到不同的状态对象中 允许状态对象在其内部状态发生改变时改变其行为 例 通过按钮来控制一个电梯的状态 一个电梯有开门状态 关门状态 停止状态 运行状态 每一种状态改变 都有可能要根据其他
  • STM32F031串口(RS485)中断+DMA发送(预备知识)

    STM32F031串口 RS485 中断 DMA发送 前言 GPIO移植过程 与F1系列的一些区别 串口 DMA 前言 最近在搞STM32F031的项目 F0系列与常用的F1系列有一定区别 在开发过程中遇到一些问题 而且花了好长花间在搜寻解
  • js操作剪贴板讲解

    文章目录 复制 剪切 到剪贴板 Document execCommand Clipboard复制 Clipboard writeText Clipboard write copy cut事件 从剪贴板进行粘贴 document execCo
  • 【E2EL5】A Year in Computer Vision中关于图像增强系列部分

    http www themtank org a year in computer vision 部分中文翻译汇总 https blog csdn net chengyq116 article details 78660521 The M T
  • eclipse修改文字显示大小及html乱码修改编码格式

    1 修改字体大小 2 修改编码格式 html文件出现乱码时需要修改编码格式 备注 有时候修改后还会是乱码 重启eclipse即可
  • 2022年7月3日leetcode每日一题打卡——112.路径总和

    一 题目描述与要求 112 路径总和 力扣 LeetCode 题目描述 给你二叉树的根节点 root 和一个表示目标和的整数 targetSum 判断该树中是否存在 根节点到叶子节点 的路径 这条路径上所有节点值相加等于目标和 target
  • 基于YOLO-V5的结核杆菌目标检测系统【毕业设计,AI+医疗】

    项目背景 结核病 Tuberculosis TB 是由结核分枝杆菌 Mycobacterium tuberculosis 引起的一种慢性人畜共患病 它不受年龄 性别 种族 职业 地区的影响 人体许多器官 系统均可患结核病 其中以肺结核最为常
  • HBase Java 编程

    一 环境配置 1 引入Maven 库