分拆TableSplit 让多个mapper同时读取

2023-11-01

默认情况下,一个region是一个tableSplit,对应一个mapper进行读取,但单mapper读取速度较慢,因此想着把默认一个table split分拆成多个split,这样hadoop就能通过多个mapper读取。 

由于HBase不能像hadoop一样通过以下参数调整split大小,而实现多个mapper读取
Java代码   收藏代码
  1. mapred.min.split.size  
  2. mapred.max.split.size  


所以目前想到的方法有两种,一是修改TableInputFormatBase,把默认的一个TableSplit分拆成多个,另外一种方法是,通过Coprocessor处理。这里选择修改TableInputFormatBase类。 

HBase权威指南里面有介绍怎么把HBase与MR结合,通过需要用到一下的辅助类实现把HBase表作为数据来源,读取数据: 
Java代码   收藏代码
  1. TableMapReduceUtil.initTableMapperJob(table[0].getBytes(), scan,  
  2.                     UserViewHisMapper2.class, Text.class, Text.class,  
  3.                     genRecommendations);  

而这个方法,最终是调用以下方法进行初始化设置的: 
Java代码   收藏代码
  1. public static void initTableMapperJob(byte[] table, Scan scan,  
  2.      Class<? extends TableMapper> mapper,  
  3.      Class<? extends WritableComparable> outputKeyClass,  
  4.      Class<? extends Writable> outputValueClass, Job job,  
  5.      boolean addDependencyJars)  
  6.  throws IOException {  
  7.      initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass,  
  8.              outputValueClass, job, addDependencyJars, TableInputFormat.class);  
  9.  }  


所以,思路就应该修改TableInputFormat这个类。而这个类的核心方法是继承了TableInputFormatBase: 

Java代码   收藏代码
  1. public class TableInputFormat extends TableInputFormatBase  
  2. implements Configurable   


最终要修改的则是TableInputFormatBase这个类,修改其以下方法: 

Java代码   收藏代码
  1. public List<InputSplit> getSplits(JobContext context) throws IOException {}  


这个方法的核心是,获得table对应所有region的起始row,把每个region作为一个tableSplit: 
Java代码   收藏代码
  1. public List<InputSplit> getSplits(JobContext context) throws IOException {  
  2. f (table == null) {  
  3.    throw new IOException("No table was provided.");  
  4.   
  5.   Pair<byte[][], byte[][]> keys = table.getStartEndKeys();  
  6.   if (keys == null || keys.getFirst() == null ||  
  7.       keys.getFirst().length == 0) {  
  8.     throw new IOException("Expecting at least one region.");  
  9.   }  
  10.   int count = 0;  
  11.   List<InputSplit> splits = new ArrayList<InputSplit>(keys.getFirst().length);  
  12.   for (int i = 0; i < keys.getFirst().length; i++) {  
  13.     if ( !includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {  
  14.       continue;  
  15.     }  
  16.     String regionLocation = table.getRegionLocation(keys.getFirst()[i]).  
  17.       getHostname();  
  18.     byte[] startRow = scan.getStartRow();  
  19.     byte[] stopRow = scan.getStopRow();  
  20.     // determine if the given start an stop key fall into the region  
  21.     if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||  
  22.          Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&  
  23.         (stopRow.length == 0 ||  
  24.          Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {  
  25.       byte[] splitStart = startRow.length == 0 ||  
  26.         Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?  
  27.           keys.getFirst()[i] : startRow;  
  28.       byte[] splitStop = (stopRow.length == 0 ||  
  29.         Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&  
  30.         keys.getSecond()[i].length > 0 ?  
  31.           keys.getSecond()[i] : stopRow;  
  32.       InputSplit split = new TableSplit(table.getTableName(),  
  33.         splitStart, splitStop, regionLocation);  
  34.       splits.add(split);  
  35.       if (LOG.isDebugEnabled())  
  36.         LOG.debug("getSplits: split -> " + (count++) + " -> " + split);  
  37.     }  
  38.   }  
  39.   return splits;  
  40. }  


这里要做的就是,把本来属于一个tableSplit的row在细分,分成自己希望的多个小split。但没有找到轻巧的实现,唯有不断迭代,把一个tableSplit的row全部取出,再拆分了,有点蛮力。 
以下是我的实现方法: 

Java代码   收藏代码
  1. public List<InputSplit> getSplits(JobContext context) throws IOException {  
  2.     if (table == null) {  
  3.         throw new IOException("No table was provided.");  
  4.     }  
  5.     Pair<byte[][], byte[][]> keys = table.getStartEndKeys();  
  6.     if (keys == null || keys.getFirst() == null  
  7.             || keys.getFirst().length == 0) {  
  8.         throw new IOException("Expecting at least one region.");  
  9.     }  
  10.     int count = 0;  
  11.     List<InputSplit> splits = new ArrayList<InputSplit>(  
  12.             keys.getFirst().length);  
  13.     for (int i = 0; i < keys.getFirst().length; i++) {  
  14.         if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {  
  15.             continue;  
  16.         }  
  17.         String regionLocation = table.getRegionLocation(keys.getFirst()[i],true)  
  18.                 .getHostname();  
  19.         byte[] startRow = scan.getStartRow();  
  20.         byte[] stopRow = scan.getStopRow();  
  21.         // determine if the given start an stop key fall into the region  
  22.         if ((startRow.length == 0 || keys.getSecond()[i].length == 0 || Bytes  
  23.                 .compareTo(startRow, keys.getSecond()[i]) < 0)  
  24.                 && (stopRow.length == 0 || Bytes.compareTo(stopRow,  
  25.                         keys.getFirst()[i]) > 0)) {  
  26.             byte[] splitStart = startRow.length == 0  
  27.                     || Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ? keys  
  28.                     .getFirst()[i] : startRow;  
  29.             byte[] splitStop = (stopRow.length == 0 || Bytes.compareTo(  
  30.                     keys.getSecond()[i], stopRow) <= 0)  
  31.                     && keys.getSecond()[i].length > 0 ? keys.getSecond()[i]  
  32.                     : stopRow;  
  33.   
  34.             Scan scan1 = new Scan();  
  35.             scan1.setStartRow(splitStart);  
  36.             scan1.setStopRow(splitStop);  
  37.             scan1.setFilter(new KeyOnlyFilter());  
  38.             scan1.setBatch(500);  
  39.               
  40.             ResultScanner resultscanner = table.getScanner(scan1);  
  41.               
  42.             //用来保存该region的所有key  
  43.             List<String> rows = new ArrayList<String>();  
  44.             //Iterator<Result>  it = resultscanner.iterator();  
  45.               
  46.             for(Result rs : resultscanner)  
  47.             {  
  48.                 if(rs.isEmpty())  
  49.                     continue;  
  50.                 rows.add(new String(rs.getRow()));  
  51.             }  
  52.               
  53.             int splitSize = rows.size() / mappersPerSplit;  
  54.               
  55.             for (int j = 0; j < mappersPerSplit; j++) {  
  56.                 TableSplit tablesplit = null;  
  57.                 if (j == mappersPerSplit - 1)  
  58.                     tablesplit = new TableSplit(table.getTableName(),  
  59.                             rows.get(j * splitSize).getBytes(),  
  60.                             rows.get(rows.size() - 1).getBytes(),  
  61.                             regionLocation);  
  62.                 else  
  63.                     tablesplit = new TableSplit(table.getTableName(),  
  64.                             rows.get(j * splitSize).getBytes(),  
  65.                             rows.get(j * splitSize + splitSize).getBytes(), regionLocation);  
  66.                 splits.add(tablesplit);  
  67.                 if (LOG.isDebugEnabled())  
  68.                     LOG.debug((new StringBuilder())  
  69.                             .append("getSplits: split -> ").append(i++)  
  70.                             .append(" -> ").append(tablesplit).toString());  
  71.             }  
  72.             resultscanner.close();                
  73.         }  
  74.     }  
  75.     return splits;  
  76. }  


通过配置设置需要拆分的split数。 
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

分拆TableSplit 让多个mapper同时读取 的相关文章

  • Spark 2.0 弃用了“DirectParquetOutputCommitter”,没有它如何生活?

    最近 我们从 HDFS 上的 EMR gt S3 上的 EMR 启用了一致视图的 EMRFS 迁移 我们意识到 Spark SaveAsTable 镶木地板格式 写入 S3 的速度比 HDFS 慢约 4 倍 但我们发现使用 DirectPa
  • java.lang.IllegalArgumentException:错误的 FS:,预期:hdfs://localhost:9000

    我正在尝试实现reduce side join 并使用mapfile reader来查找分布式缓存 但在stderr中检查时它没有查找值 它显示以下错误 lookupfile文件已经存在于hdfs中 并且似乎已正确加载进入缓存 如标准输出中
  • Hadoop setInputPathFilter错误

    我正在使用 Hadoop 0 20 2 无法更改 并且我想向我的输入路径添加一个过滤器 数据如下 path1 test a1 path1 test a2 path1 train a1 path1 train a2 我只想处理所有文件trai
  • Cat 文件与 HDFS 中的模式不匹配?

    我正在尝试 cat 与 hadoop HDFS 中的以下模式不匹配的文件 hdfs dfs cat gz 如何捕获所有不以 gz 结尾的文件 编辑 抱歉 但我需要在 Hadoop 中管理文件 显然 hdfs 附带的命令非常少 编辑2 所有文
  • Hive(查找连续 n 列中的最小值)

    我在 Hive 中有一个表 有 5 列 即电子邮件 a first date b first date c first date d first date a b c d 是用户可以执行的 4 个不同操作 上表中的 4 列表示用户执行第一个
  • 一个目录下可以有两个oozieworkflow.xml文件吗?

    一个目录下可以有两个oozieworkflow xml文件吗 如果是这样 我如何指示 oozie runner 运行哪一个 您可以有两个工作流程文件 只需为它们指定唯一的名称 然后您可以通过设置oozie wf application pa
  • 将 hadoop fs 路径转换为 ​​EMR 上的 hdfs:// 路径

    我想知道如何将数据从 EMR 集群的 HDFS 文件系统移动到 S3 存储桶 我认识到我可以直接在 Spark 中写入 S3 但原则上 之后执行它也应该很简单 到目前为止 我还没有发现在实践中这是正确的 AWS 文档建议s3 dist cp
  • Hadoop 安全模式恢复 - 花费太长时间!

    我有一个包含 18 个数据节点的 Hadoop 集群 我在两个多小时前重新启动了名称节点 并且名称节点仍处于安全模式 我一直在寻找为什么这可能花费太长时间 但找不到好的答案 发帖在这里 Hadoop 安全模式恢复 花费大量时间 https
  • 2017 年在 OS X 上从源代码构建 Apache Hadoop

    到目前为止 我已经分叉了 Git 存储库 https github com apache hadoop 我一直在寻找有关如何从源代码构建的信息 我尝试过以下命令 mvn package Pdist Dtar DskipTests 这导致了以
  • 在 Hadoop 中按文件中的值排序

    我有一个文件 其中每行包含一个字符串 然后是一个空格 然后是一个数字 例子 Line1 Word 2 Line2 Word1 8 Line3 Word2 1 我需要按降序对数字进行排序 然后将结果放入文件中 为数字分配排名 所以我的输出应该
  • 在 Hadoop 中处理带标头的文件

    我想在 Hadoop 中处理很多文件 每个文件都有一些头信息 后面跟着很多记录 每个记录都存储在固定数量的字节中 对此有何建议 我认为最好的解决方案是编写一个自定义的InputFormat http hadoop apache org co
  • 是否值得购买 Mahout in Action 以跟上 Mahout 的速度,或者还有其他更好的来源吗?

    我目前是一个非常随意的用户阿帕奇马胡特 http mahout apache org 我正在考虑购买这本书象夫在行动 http www manning com owen 不幸的是 我很难理解这本书的价值 并且认为它是一本曼宁早期访问计划 h
  • Oozie SSH 操作

    Oozie SSH 操作问题 Issue 我们正在尝试在集群的特定主机上运行一些命令 我们为此选择了 SSH Action 我们面对这个 SSH 问题已经有一段时间了 这里真正的问题可能是什么 请指出解决方案 logs AUTH FAILE
  • 异常:java.lang.Exception:使用 master 'yarn' 运行时,必须在环境中设置 HADOOP_CONF_DIR 或 YARN_CONF_DIR。在火花中

    我是新的阿帕奇火花 我已经在spark独立模式下测试了一些应用程序 但我想运行应用程序yarn模式 我正在windows中运行apache spark 2 1 0 这是我的代码 c spark gt spark submit2 master
  • 将日期字符串转换为“MM/DD/YY”格式

    我刚刚看到这个例子 我该如何解决这个问题 Hive 元存储包含一个名为 Problem1 的数据库 其中包含一个名为 customer 的表 customer 表包含 9000 万条客户记录 90 000 000 每条记录都有一个生日字段
  • hive创建表的多个转义字符

    我正在尝试将带有管道分隔符的 csv 加载到配置单元外部表 数据值包含单引号 双引号 括号等 使用 Open CSV 版本 2 3 测试文件 csv id name phone 1 Rahul 123 2 Kumar s 456 3 Nee
  • hadoop中reducer的数量

    我正在学习hadoop 我发现减速器的数量非常令人困惑 1 reducer的数量与partition的数量相同 2 reducer 的数量是 0 95 或 1 75 乘以 节点数 每个节点的最大容器数 3 减速机数量设定为mapred re
  • 如何使用 Amazon 的 EMR 在 CLI 中使用自定义 jar 指定 mapred 配置和 java 选项?

    我想知道如何指定mapreduce配置 例如mapred task timeout mapred min split size等等 当使用自定义 jar 运行流作业时 当我们使用 ruby 或 python 等外部脚本语言运行时 我们可以使
  • Hive“添加分区”并发

    我们有一个外部 Hive 表 用于处理原始日志文件数据 这些文件每小时一次 并按日期和源主机名分区 目前 我们正在使用简单的 python 脚本导入文件 这些脚本每小时触发几次 该脚本根据需要在 HDFS 上创建子文件夹 从临时本地存储复制
  • 伪分布式模式下的 Hadoop。连接被拒绝

    P S 请不要将此标记为重复 Hi 我一直在尝试以伪分布式模式设置和运行 Hadoop 当我运行 start all sh 脚本时 我得到以下输出 starting namenode logging to home raveesh Hado

随机推荐

  • styled-components 基本用法

    styled components 基本用法 安装 npm install save styled components 或 yarn add styled components 注 如使用tsx语法请同时安装相应的 types声明文件 n
  • qt 程序中执行额外程序和脚本

    1 最简单的 我们可以通过system直接启动一个应用程序或者脚本 system helloworld system hello sh 操作简单 但是我们可以很清晰的看到弊端 虽然很顺利的匹出一个进程去执行另外一个应用 但是我们拿不到这个新
  • 新冠造成的经济崩溃对女性影响最大

    Yui Koizumi 化名 曾经过的挺不错的 大学毕业后她进入了一家广告公司 人生逐渐走上正轨 今年3月的时候 她收到了公司发来的邮件 公司暂时要关闭 不过她无须担心 因为收到了一些补偿金 一旦COVID 19疫情缓解了 公司就又会开张营
  • 23 KVM管理虚拟机-使用VNC密码登录虚拟机

    文章目录 23 KVM管理虚拟机 使用VNC密码登录虚拟机 23 1 概述 23 2 前提条件 23 3 操作步骤 23 KVM管理虚拟机 使用VNC密码登录虚拟机 本章介绍使用VNC密码登录虚拟机的方法 23 1 概述 当虚拟机操作系统安
  • IDEA安装MybatisX插件及使用

    打开idea File gt Setting gt Plugins gt Marketplace gt 搜索 mybatis 出现MybatisX选择点击Install gt Apply gt OK 提示重启即可 图示如下 在IDEA中使用
  • 机械硬盘无法弹出的问题:进程 ID 为 4 的应用程序 System 已停止删除或弹出设备

    一般的解决方法 此电脑单机右键选管理 1 计算机管理 gt 系统工具 gt 事件查看器 gt 自定义视图 gt 管理事件 2 在日期与事件进行排序找到最新的事件 3 合理的关掉这个程序 直接结束进程 保存相关文档后关闭 Word 等程序 另
  • android sdk 64bit,Android SDK不安裝在win 7 64位上。

    I am trying to install Android SDK on windows 7 64 bit but it doesn t work I keep getting this screen 我正在嘗試在windows 7 64
  • android中卡号输入框控件(每四位用空格分隔)(解决输入法跳转的问题)

    由于项目的需求 需要在卡号输入时 每四位用空间分隔 于是就写了个控件 该控件支持中间删除 中间增加 粘贴 末尾输入等 光标的位置显示正确 主要的思想就是 对于添加TextWatcher监听Text的改变 text改变后 拿到该text 将t
  • python爬取51job简历查看信息

    python 爬虫 51job简历 存储历史 效果展示 脚本实现 linux 定时任务 查看定时任务是否添加成功 查看定时任务日志 运行常见问题 1 No module named requests 解决方法 2 No module nam
  • kafka的简单实例

    关于kafka的安装 我主要是在windows下部署的 大家可以看这一篇 https blog csdn net woshixiazaizhe article details 80610432 然后后台启动这个kafka 进入到kafka的
  • 朝圣Java(问题集锦)之:The Apache Tomcat installation at this directory is version 8.5.32. A Tomcat 8.0 inst...

    最近开始学Java了 有C 底子 但是学起来Java还是很吃力 感觉别人架好了各种包 自己只要调用就行了 结果还有各种bug出现 掩面中 启动Tomcat的时候 报错The Apache Tomcat installation at thi
  • Dubbo通信模型

    Dubbo和通信结合 通信实现 服务的发布过程使用通信功能 Protocol export 时会为每个服务创建一个Server 服务的引用过程使用通信功能 Protocol refer 时会创建一个Client 整个类结构及调用关系如下 从
  • (HAL库学习4)STM32CubeMX HAL FreeRTOS 任务创建与删除(也会教直接用代码实现方式)

    这次教的是使用STM32CubeMX使用FreeRTOS来进行任务的创建与任务的删除 其实还有FreeRTOS还有一些需要注意的地方 但是任务的创建与删除就是最重要的了 其他的会在后面讲到 首先说说对FreeRTOS的看法吧 这是公认的大面
  • gitbook 入门教程之 gitbook 简介

    gitBook 是一个基于node js的命令行工具 使用 github git 和 markdown asciiDoc 构建精美的电子书 gitbook 支持输出静态网页和电子书等多种格式 其中默认输出静态网页格式 gitbook 不仅支
  • 除了安苏哪个服务器稳定,同样是魔兽世界玩家,为什么安苏服务器那么不受玩家待见?...

    原标题 同样是魔兽世界玩家 为什么安苏服务器那么不受玩家待见 在魔兽世界中只要一提到安苏服务器 相信很多玩家想到的第一个标签就是贵族服务器 然而同样作为魔兽世界国服服务器中的一员 为什么安苏服务器就那么不受玩家的待见呢 其实原因就很多 今天
  • HTTP 协议详解

    目录 前言 1 HTTP 介绍 2 URL介绍 1 了解 URL 和 URI 2 URL 格式 3 URL encode 3 HTTP 协议格式 1 请求报文格式 2 响应报文格式 3 协议格式总结 4 HTTP 请求 Request 1
  • 红宝书--第一章总结分享

    红宝书 第一章总结分享 作为一名前端开发者 我想很有必要认真阅读业界大佬的著作 这不仅能拓展认知 更能发现曾经的遗漏点和误区 为了激励自己能坚持阅读完 特在此分享自己的品读总结 菜鸟也会变成老鸟 为了我的全栈梦 前端是少不了的 1 Java
  • 如何在Windows 11上安装pycocotools(实操记录)

    参考 https blog csdn net m0 45971439 article details 118332681 https blog csdn net en Wency article details 124767742 目录 一
  • 轻松拿结果-第一部分-第二章 管理者要做“定海神针”

    第二章 管理者要做 定海神针 管理者的三张面孔 做一个严厉的爸爸 在整个团队的管理过程中坚持执行制度 提高团队的人效 做一个温暖的妈妈 让所有员工感受到团队带来的安全感 让大家有所依靠 做一个优秀的教练 有方法 成系统 精细化 过程化的对员
  • 分拆TableSplit 让多个mapper同时读取

    分拆TableSplit 让多个mapper同时读取 默认情况下 一个region是一个tableSplit 对应一个mapper进行读取 但单mapper读取速度较慢 因此想着把默认一个table split分拆成多个split 这样ha