[Binospace] 深入分析HBase RPC(Protobuf)实现机制

2023-11-13

背景

在HMaster、RegionServer内部,创建了RpcServer实例,并与Client三者之间实现了Rpc调用,HBase0.95内部引入了Google-Protobuf作为中间数据组织方式,并在Protobuf提供的Rpc接口之上,实现了基于服务的Rpc实现,本文详细阐述了HBase-Rpc实现细节。

HBase的RPC Protocol

 在HMaster、RegionServer内部,实现了rpc 多个protocol来完成管理和应用逻辑,具体如下protocol如下:

 
HMaster支持的Rpc协议:
MasterMonitorProtocol,Client与Master之间的通信,Master是RpcServer端,主要实现HBase集群监控的目的。

 

MasterAdminProtocol,Client与Master之间的通信,Master是RpcServer端,主要实现HBase表格的管理。例如TableSchema的更改,Table-Region的迁移、合并、下线(Offline)、上线(Online)以及负载平衡,以及Table的删除、快照等相关功能。
RegionServerStatusProtoco,RegionServer与Master之间的通信,Master是RpcServer端,负责提供RegionServer向HMaster状态汇报的服务。
 
RegionServer支持的Rpc协议:
ClientProtocol,Client与RegionServer之间的通信,RegionServer是RpcServer端,主要实现用户的读写请求。例如get、multiGet、mutate、scan、bulkLoadHFile、执行Coprocessor等。
AdminProtocols,Client与RegionServer之间的通信,RegionServer是RpcServer端,主要实现Region、服务、文件的管理。例如storefile信息、Region的操作、WAL操作、Server的开关等。
 
(备注:以上提到的Client可以是用户Api、也可以是RegionServer或者HMaster)

hbase-protorpc_1

 

 HBase-RPC实现机制分析

RpcServer配置三个队列:

1)普通队列callQueue,绝大部分Call请求存在该队列中:callQueue上maxQueueLength为${ipc.server.max.callqueue.length},默认是${hbase.master.handler.count}*DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER,目前0.95.1中,每个Handler上CallQueue的最大个数默认值(DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER)为10。

2)优先级队列: PriorityQueue。如果设置priorityHandlerCount的个数,会创建与callQueue相当容量的queue存储Call,该优先级队列对应的Handler的个数由rpcServer实例化时传入。

3)拷贝队列:replicationQueue。由于RpcServer由HMaster和RegionServer共用,该功能仅为RegionServer提供,queue的大小为${ipc.server.max.callqueue.size}指定,默认为1024*1024*1024,handler的个数为hbase.regionserver.replication.handler.count。

RpcServer由三个模块组成:

Listener ===Queue=== Responder

 hbase_rpc_95

这里以HBaseAdmin.listTables为例,分析一个Rpc请求的函数调用过程:

1) RpcClient创建一个BlockingRpcChannel。

2)以channel为参数创建执行RPC请求需要的stub,此时的stub已经被封装在具体Service下,stub下定义了可执行的rpc接口。

3)stub调用对应的接口,实际内部channel调用callBlockingMethod方法。

RpcClient内实现了protobuf提供的BlockingRpcChannel接口方法callBlockingMethod,

? View Code JAVA
 
  @Override
public Message callBlockingMethod(MethodDescriptor md, RpcController controller,
Message param, Message returnType)
throws ServiceException {
return this.rpcClient.callBlockingMethod(md, controller, param, returnType, this.ticket,
this.isa, this.rpcTimeout);
}

通过以上的实现细节,最终转换成rpcClient的调用,使用MethodDescriptor封装了不同rpc函数,使用Message基类可以接收基于Message的不同的Request和Response对象。

4)RpcClient创建Call对象,查找或者创建合适的Connection,并唤醒Connection。

5)Connection等待Call的Response,同时rpcClient调用函数中,会使用connection.writeRequest(Call call)将请求写入到RpcServer网络流中。

6)等待Call的Response,然后层层返回给更上层接口,从而完成此次RPC调用。

RPCServer收到的Rpc报文的内部组织如下:

Magic

(4Byte)

Version

(1Byte)

AuthMethod

(1Byte)

Connection

HeaderLength

(4Byte)

ConnectionHeader

Request

“HBas”

         
 

验证RpcServer的CURRENT_VERSION

与RPC报文一致

目前支持三类:

AuthMethod.SIMPLE

AuthMethod.KERBEROS

AuthMethod.DIGEST

 

RPC.proto定义
RPCProtos.ConnectionHeader
message ConnectionHeader {
optional UserInformation

userInfo = 1;
optional string

serviceName = 2;
// Cell block codec we will use

sending over optional cell

blocks.  Server throws

exception
// if cannot deal.
optional string

cellBlockCodecClass = 3

[default = "org.apache.

hadoop.hbase.codec.

KeyValueCodec"];
// Compressor we will use

if cell block is compressed. 

Server will throw exception

if not supported.
// Class must implement

hadoop’s

CompressionCodec

Interface
optional string

cellBlockCompressorClass = 4;
}
序列化之后的数据

 

整个Request存储是经过编码之后的byte数组,包括如下几个部分:

RequestHeaderLength(RawVarint32)

RequestHeader

ParamSize(RawVarint32)

Param

CellScanner

 

RPC.proto定义:
message RequestHeader {
// Monotonically increasing callId to keep track of RPC requests and their response
optional uint32 callId = 1;
optional RPCTInfo traceInfo = 2;
optional string methodName = 3;
// If true, then a pb Message param follows.
optional bool requestParam = 4;
// If present, then an encoded data block follows.
optional CellBlockMeta cellBlockMeta = 5;
// TODO: Have client specify priority
}
序列化之后的数据
并从Header中确认是否存在Param和CellScanner,如果确认存在的情况下,会继续访问。

 

Protobuf的基本类型Message,
Request的Param继承了Message,
这个需要获取的Method类型决定。

 

从功能上讲,RpcServer上包含了三个模块,

1)Listener。包含了多个Reader线程,通过Selector获取ServerSocketChannel接收来自RpcClient发送来的Connection,并从中重构Call实例,添加到CallQueue队列中。

 ”IPC Server listener on 60021″ daemon prio=10 tid=0x00007f7210a97800 nid=0x14c6 runnable [0x00007f720e8d0000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:210)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:65)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:69)
- locked <0x00000000c43cae68> (a sun.nio.ch.Util$2)
- locked <0x00000000c43cae50> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000000c4322ca8> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:80)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:84)
at org.apache.hadoop.hbase.ipc.RpcServer$Listener.run(RpcServer.java:646)

2)Handler。负责执行Call,调用Service的方法,然后返回Pair<Message,CellScanner>

“IPC Server handler 0 on 60021″ daemon prio=10 tid=0x00007f7210eab000 nid=0x14c7 waiting on condition [0x00007f720e7cf000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00000000c43cad90> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
at org.apache.hadoop.hbase.ipc.RpcServer$Handler.run(RpcServer.java:1804)

3) Responder。负责把Call的结果返回给RpcClient。

 ”IPC Server Responder” daemon prio=10 tid=0x00007f7210a97000 nid=0x14c5 runnable [0x00007f720e9d1000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:210)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:65)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:69)
- locked <0x00000000c4407078> (a sun.nio.ch.Util$2)
- locked <0x00000000c4407060> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000000c4345b68> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:80)
at org.apache.hadoop.hbase.ipc.RpcServer$Responder.doRunLoop(RpcServer.java:833)
at org.apache.hadoop.hbase.ipc.RpcServer$Responder.run(RpcServer.java:816)

RpcClient为Rpc请求建立Connection,通过Connection将Call发送RpcServer,然后RpcClient等待结果的返回。

 思考

1)为什么HBase新版本使用了Protobuf,并实现RPC接口?

HBase是Hadoop生态系统内重要的分布式数据库,Hadoop2.0广泛采用Protobuf作为中间数据组织方式,整个系统内Wire-Compatible的统一需求。

2)HBase内部实现的Rpc框架对于服务性能的影响?

目前使用Protobuf作为用户请求和内部数据交换的数据格式,采用更为紧缩编码格式,能够提高传输数据的效率。但是,有些优化仍然可以在该框架内探索:

实现多个Request复用Connection(把多个短连接合并成一个长连接);

在RpcServer内创建多个CallQueue,分别处理不同的Service,分离管理逻辑与应用逻辑的队列,保证互不干扰;

Responder单线程的模式,是否高并发应用的瓶颈所在?

是否可以分离Read/Write请求占用的队列,以及处理的handler,从而使得读写性能能够更加平衡?

针对读写应用的特点,在RpcServer层次内对应用进行分级,建立不同优先级的CallQueue,按照Hadoop-FairScheduler的模式,然后配置中心调度(类似OMega或者Sparrow轻量化调度方案),保证实时应用的低延迟和非实时应用的高吞吐。优先级更好的Call会优先被调度给Handler,而非实时应用可以实现多个Call的合并操作,从而提高吞吐。

3)Protobuf内置编码与传统压缩技术是否可以配合使用?

使用tcpdump获取了一段HMaster得到的RegionServer上报来的信息:

tcpdump_rpc_0

 

以上的信息几乎是明文出现在tcp-ip连接中,因此,是否在Protobuf-RPC数据格式采取一定的压缩策略,会给scan、multiGet等数据交互较为密集的应用提供一种优化的思路。

参考文献:

[1] HBase Rpc Protocols:  http://blog.zahoor.in/2012/08/protocol-buffers-in-hbase/

[2] HBase project 0.95.1
本系列文章属于Binos_ICTBinospace个人技术博客原创,原文链接为http://www.binospace.com/index.php/in-depth-analysis-hbase-rpc-0-95-version-implementation-mechanism/,未经允许,不得转载。

文章的脚注信息由WordPress的wp-posturl插件自动生成

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

[Binospace] 深入分析HBase RPC(Protobuf)实现机制 的相关文章

  • 恢复在 HBase 中的工作原理

    我想实际观察 HBase 中的恢复是如何工作的 我使用了以下代码片段 Put p new Put Bytes toBytes name10 p setWriteAheadLog true p add Bytes toBytes cf Byt
  • 如何在 Hive 中访问 HBase 表,反之亦然?

    作为一名开发人员 我通过使用以下命令从现有 MySQL 表导入数据 为我们的项目创建了 HBase 表 sqoop job 问题是我们的数据分析师团队熟悉MySQL语法 意味着他们可以查询HIVE轻松上桌 对于他们 我需要在 HIVE 中公
  • 在 HBase 中获取一组行的最网络有效的方法是什么?

    假设我有一组行键 作为一个集合 为这组行获取特定列族的最网络有效方法是什么 Using HTable get List 获取 http hbase apache org apidocs org apache hadoop hbase cli
  • Spark 2 的 hbase-spark

    我想要进行全面扫描hbase from Spark 2 using Scala 我没有固定的目录定义 因此库为SHC https github com hortonworks spark shc不是一个选择 我的逻辑选择是使用 hbase
  • 如何解决grpc Deadline Exceeded错误?

    我有用go和python客户端编写的grpc服务器 有时会出现如下错误 eggs grpcio 1 0 0 py2 7 linux x86 64 egg grpc channel py line 432 in end unary respo
  • 如何在 Java 中对 Hive 进行异步调用?

    我想以异步方式在服务器上执行 Hive 查询 Hive 查询可能需要很长时间才能完成 因此我不想阻止调用 我目前正在使用 Thirft 进行阻塞调用 在 client execute 上阻塞 但我还没有看到如何进行非阻塞调用的示例 这是阻止
  • Java RMI 和 RPC 有什么区别?

    Java RMI 和 RPC 之间的实际区别是什么 我在一些地方读到 RMI 使用对象 RPC是基于C的 因此它具有结构化编程语义 另一方面 RMI是基于Java的技术 并且是面向对象的 通过 RPC 您可以调用导出到服务器中的远程函数 在
  • security.UserGroupInformation:MR 的 PrivilegedgedActionException 错误

    每当我尝试执行映射缩减作业以写入 Hbase 表时 我都会在控制台中收到以下错误 我正在从用户帐户运行 MR 作业 错误 security UserGroupInformation PriviledgedActionException 为
  • 如何在spark中配置hbase?

    Spark连接hbase的步骤是什么 我有两者的主地址 我是否只需将 hbase 地址添加到 Spark 类路径中 这篇关于 Spark 与 HBase 连接的文章应该会有所帮助 http www vidyasource com blog
  • 如何将多个 QualifierFilter 应用于 HBase 中的一行

    我们想使用两个 QualifierFilters 过滤 HBase 表上的扫描 意味着我们只想获取表中确实具有特定列 col A 的行AND 某个其他列 col B 我们当前的方法如下所示 FilterList filterList new
  • HRegionServer 显示“错误告诉主机我们已经启动”。显示套接字异常:参数无效

    我正在尝试在 3 台 centos 机器上创建一个 hbase 集群 Hadoop v 2 8 0 已启动并在我配置的 HBase v 1 2 5 上运行 Hbase 启动正常 它启动了 HMaster 和区域服务器 但仍然在区域服务器和
  • 使用 MultipleOutputs 写入 MapReduce 中的 HBase

    我目前有一个 MapReduce 作业 它使用 MultipleOutputs 将数据发送到多个 HDFS 位置 完成后 我使用 HBase 客户端调用 在 MR 之外 将一些相同的元素添加到一些 HBase 表中 使用 TableOutp
  • HBase Java 客户端 - 未知主机:localhost.localdomain

    版本 Hadoop 2 0 0 cdh4 3 1 HBase 0 94 6 cdh4 3 1 我正在运行cloudera快速启动vm 一切都在172 16 144 150上运行 这是我的小HBase Java客户端 HbaseClient
  • hbase api - 通过行ID列表获取数据行信息

    是否可以通过hbase java API通过行id列表获取hbase数据记录 例如 我有一个已知的 hbase 行 ID 列表 mykey1 myhash1 mykey1 myhash2 mykey1 myhash3 mykey2 myha
  • 将多个前缀行过滤器设置为扫描仪 hbase java

    我想创建一台扫描仪 它可以为我提供带有 2 个前缀过滤器的结果例如 我想要其键以字符串 x 开头或以字符串 y 开头的所有行 目前我知道只能使用一个前缀 方法如下 scan setRowPrefixFilter prefixFiltet 在
  • 从 Storm Bolt 将行插入 HBase

    我希望能够从分布式 非本地 Storm 拓扑将新条目写入 HBase 有一些 GitHub 项目提供以下任一功能 HBase 映射器 https github com ptgoetz storm hbase or 预制风暴螺栓 https
  • 2n + 1 法定人数是什么意思?

    我在描述 HBase 的 Zookeeper 配置时遇到过这个问题 但我对这个术语并不熟悉 N 与我的 HBase 集群中的节点数量有关系吗 或者我应该在 Zookeeper 集群中使用的节点数量 2f 1是指你所需要的可靠性 可用性水平
  • Janusgraph 0.3.2 + HBase 1.4.9 - 无法设置 graph.timestamps

    我在 Docker 容器中运行 Janusgraph 0 3 2 并尝试使用运行 HBase 1 4 9 的 AWS EMR 集群作为存储后端 我可以运行 gremlin server sh 但如果我尝试保存某些内容 我会得到粘贴在下面的堆
  • HBase Shell 日志记录

    使用 HBase shell 时 我收到大量日志记录 包括 INFO 和 DEBUG 消息 虽然这对于学习 HBase 内部结构来说很有趣 但它非常冗长并且可能会掩盖输出 我尝试过以多种不同的方式更改日志记录级别 包括所描述的here ht
  • HBase:返回不存在字段的记录

    我在 HBase 中有一个人员表 如下所示 ROW KEY COLUMN CELL dinesh column details code value dr 01 dinesh column status is error value fal

随机推荐

  • 基于TMF SID的高可扩展性数据模型

    基于TMF SID的高可扩展性数据模型 前言 此文根据TMF SID规范撰写 欢迎大家提出建议和意见 TMF文档版权信息 Copyright TeleManagement Forum 2013 All Rights Reserved Thi
  • Flutter Windows应用开发环境配置

    为什么要入Flutter开发的坑 首先在当今Windows开发已经逐渐成为一个偏小众的领域 不仅要涉及的知识面广 还对开发人员的要求不低 界面的精美也成为一个重要因素 目前已知的Windows 客户端主要分成以下几种 开发语言 Qt C C
  • Android登录 之 Twitter登录

    作为Android登录 之 GooglePlay登录的姊妹篇 这俩篇主要是对接国外平台登录的文章 作者文笔并不好 但是 管他呢 实现功能不就得了嘛 Twitter官网 兄弟们自带梯子啊 然后按照流程 创建申请什么的 也就不多说了 接下来就是
  • Google C++风格指南 阅读笔记

    这个Google C 风格指南出得太好了 有很多C 的问题 其实通过阅读这份文档就可以了 相信读完后 可以在简历上加上一句 具有良好的编码风格 哈哈 下面记录一下我的读书笔记吧 整份文档的中文版本我已经上传到了资源里面 1 头文件 1 1头
  • 在vue使用jsx来解决template中复杂的逻辑处理

    1 首先安装依赖 npm install postcss loader autoprefixer babel loader babel core 2 在 babelrc文件中修改 把 presets env stage 2 plugins
  • 【Python】Windows如何在cmd中切换python版本

    相信很多小伙伴都会有像我一样经历 在windows中装了很多python版本 那么如果我们正式使用的时候应该如何切换呢 方法一 从环境变量中切换python 第一步 打开环境变量 第二步 打开系统变量中Path变量 第三步 将你想使用的Py
  • spring 多个切面的执行顺序及原理

    最近和同事聊起来了springAOP的话题 说了到多个切面的时候程序是怎么执行的 我们常用的spring事务本身也是一个切面 使用的AOP原理 本人从网上找了一些资料 然后根据这些资料进行一下总结 资料地址 1 https blog csd
  • CodeLlama本地部署的实战方案

    大家好 我是herosunly 985院校硕士毕业 现担任算法研究员一职 热衷于机器学习算法研究与应用 曾获得阿里云天池比赛第一名 CCF比赛第二名 科大讯飞比赛第三名 拥有多项发明专利 对机器学习和深度学习拥有自己独到的见解 曾经辅导过若
  • C++:没有与参数列表匹配的构造函数

    报错 E0289 没有与参数列表匹配的构造函数 sales data sales data 实例 初始化一个实例对象 类内定义的构造函数 报错原因 构造函数中第二个参数的类型为 unsigned 而引用只能是引用一个对象 实例化对象时 括号
  • 神经网络(十四)Pytorch完整模型训练和调用GPU加速

    一 模型的训练 Step1 准备数据集 import torchvision train data torchvision dataset CIFAR10 data train True transform torchvision ToTe
  • 微信投屏服务器出错,微信发布7.0.21版本,修复异常问题,增加超实用新功能

    原标题 微信发布7 0 21版本 修复异常问题 增加超实用新功能 iOS微信又更新了 今天发布v7 0 21版本 距离上次更新才两个礼拜的时间 以往微信的更新频率是比较慢的 这次之所以如此之快地发布新版本 感觉是与问题修复有关 因为之前有一
  • UNIX环境高级编程 学习笔记 第六章 系统数据文件和信息

    UNIX系统口令文件 POSIX 1称其为用户数据库 包含以下字段 这些字段也包含在头文件pwd h中定义的passwd结构中 由于历史原因 口令文件 etc passwd是一个ASCII文件 其中的每行都包含以上各字段 字段之间用冒号分隔
  • 无法使用@RequestBody或无法直接使用对象类型获取前端的传递的对象数据

    一 问题概述 当前端发送请求时 传递的参数是一个对象类型 例如 searchForm name 1 age 18 这种格式时 会习惯性使用 RequestBody在后端进行接收 但会发现无法接收到数据 如果你使用的请求方式是get 用的还是
  • 学好ES6/ES2015-核心部分(上)

    ECMAScript 6 以下简称ES6 是JavaScript语言的下一代标准 因为当前版本的ES6是在2015年发布的 所以又称ECMAScript 2015 也就是说 ES6就是ES2015 虽然目前并不是所有浏览器都能兼容ES6全部
  • 贝叶斯优化python包_贝叶斯优化

    万壑松风知客来 摇扇抚琴待留声 1 文起 本篇文章记录通过 Python 调用第三方库 从而调用使用了贝叶斯优化原理的 Hyperopt 方法来进行超参数的优化选择 具体贝叶斯优化原理与相关介绍将在下一次文章中做较为详细的描述 可以参考这里
  • React:判断是否为true有可能会出现的问题

    今天遇到个小问题改了好久 因为state值是要改成url地址 以为是自己哪里逻辑出问题了 搞了好久才发现 是自己判断出现了错误 记录一下 写个小例子 防止二次发生 菜鸡一枚 还希望得到大佬的详解 以下是数字和字符串隐试转换规则 任何非零的数
  • sqli-labs:less-11/12 简单SQL注入和身份验证漏洞综合

    这两个靶场是一样的题 我就拿less 12说事了吧 首先 尝试胡乱输入密码进行测试 发现存在报错 这时用admin和admin这个正确的账号密码进行测试 1 10前面的题目告诉了 发现有着正确的提示 但是还不够 我们尝试在username后
  • vue中axios的二次封装

    1 如果对axios不了解的可以先移步中文axios网 axios中文文档 axios中文网 axioshttp www axios js com zh cn docs 2 这是稀土掘金上的关于axios的封装个人感觉比较细节易懂 vue中
  • UE4 Slate 柱状图

    Fill out your copyright notice in the Description page of Project Settings include ZZTWidget h include Windows AllowWind
  • [Binospace] 深入分析HBase RPC(Protobuf)实现机制

    背景 在HMaster RegionServer内部 创建了RpcServer实例 并与Client三者之间实现了Rpc调用 HBase0 95内部引入了Google Protobuf作为中间数据组织方式 并在Protobuf提供的Rpc接