MapReduce之二次排序

2023-05-16

目录

应用场景

什么是二次排序

怎样实现二次排序       

示例代码


应用场景

假如输入文件内容如下:

a,1
z,3
b,2
a,100
a,3
b,1

要求经过MapReduce处理后,key升序排列,相同key的vaule也升序排列,如下:

a,1
a,3,
a,100
b,1
b,2
z,3

什么是二次排序

二次排序是指我们对key进行排序后,同时也需要对value中的某个字段进行排序。实现二次排序的关键在于将初始的key与待排序字段组合成自定义类型的数据类型,将其作为新的key,利用mapreduce自动对key进行排序的原理,完成二次排序。在上面的示例输入中,初始key为字母列,待排序字段为数字列。

怎样实现二次排序       

由初始key字段与待排序字段组成的自定义的数据类型需要实现WritableComparable接口,WritableComparable接口继承自Writable接口和Comparable接口。Writable接口主要是用来实现序列化和反序列化,Comparable是Java中用于比较的接口。

在自定义的数据类型中,需要将初始key和待排序字段分别定义为变量,添加构造方法和get()/set()方法,同时重写序列化和反序列的方法,注意方法中数据类型要一致,最后重写比较方法。

         将上面自定义好的数据类型作为map输出的key,value还是初始的value,并输出。示例内容中,经过此步处理后,Map输出内容为

((a,1),1)
((z,3),3)
((b,2),2)
((a,100),100)
((a,3),3)
((b,1),1)


​

Map输出之后在reduce之前,还需要对map端输出的值进行处理,如下:

1,分区(partition)。因为使用了组合key作为新的key,如果还用之前的默认分区方法,在存在多个reduce  时,会将数据分散开,不符合要求,所以要 需要使用自定义的分区,然后按照初始的key进行区分,这样才能使结果符合要求。

     自定义分区类需要继承Partitioner类,重写getPartition方法;在Job中通过setPartitionerClass设置使用自定义的分区类。

2,分组(group),需要使用自定义group来处理我们需要的key,按照组合key中第一个字段,即初始key进行分组,这样得到的数据就是有序而且全部的.

       自定义分组类需要实现原生的RawComparator接口,RawComparator是一个原生的优化接口类,它只是简单的提供了数据流中的简单数据比较方法,此接口并没有被多数的衍生类所实现,最常用的实现类为WritableComparator,多数情况下是作为实现Writable接口的类的内部类,提供序列化字节的比较。RawComparator有两个比较方法,一个是对象间的比较,一个是字节数组的比较。

示例代码

(1)     创建map类,使用自定义数据类型作为输出结果的key,并实现map方法 ,设置组合key的值写入到context中


public static class SecondarySortMapper extends

          Mapper<LongWritable,Text,PairWritable,IntWritable>{}

/**......省略....**/
String lineValue = value.toString();

String[] strs = lineValue.split(",") ;

PairWritable mapOutputKey = new PairWritable ();

mapOutputKey.set(strs[0], Integer.valueOf(strs[1]));

mapOutputValue.set(Integer.valueOf(strs[1]));
/**.....省略.....**/

context.write(mapOutputKey, mapOutputValue);

(2)     创建reduce类,使用自定义数据类型作为输入的key,在reduce方法中,将输入key的第一个字段值,即初始key作为输出结果的key,循环输入的列表,将完成排序的key/value输出到上下文中。


/**部分示例代码**/
public static class SecondarySortReducer extends

          Reducer<PairWritable,IntWritable,Text,IntWritable>{}

PairWritable  outputKey = new outputKey();

outputKey.set(key.getFirst());

     for(IntWritable value : values){

          context.write(outputKey, value);

     }

(3)     设置job类,注意map输出类型和reduce类型均为自定义数据类型

/**部分示例代码**/
Job job = Job.getInstance(configuration, this.getClass().getSimpleName());

job.setJarByClass(this.getClass());

job.setMapperClass(SecondarySortMapper.class);

job.setMapOutputKeyClass(PairWritable.class);

job.setMapOutputValueClass(IntWritable.class);

job.setReducerClass(SecondarySortReducer.class);

job.setOutputKeyClass(IntWritable.class);

job.setOutputValueClass(IntWritable.class);

/**设置分区类**/
job.setPartitionerClass(FirstPartitioner.class);

/**设置分组类**/
job.setGroupingComparatorClass(FirstGroupingComparator.class);

(4)     实现自定义数据类型PairWritable,实现WritableComparable接口,

public class PairWritable implements WritableComparable<PairWritable> {

     private String first;
     private int second;

     public PairWritable() { }

     public PairWritable(String first, int second) {
          this.set(first, second);
     }

     public void set(String first, int second) {
          this.setFirst(first);
          this.setSecond(second);
     }

     public String getFirst() {
          return first;
     }

     public void setFirst(String first) {
          this.first = first;
     }

/**Get和set方法都用到了Integer的最大值,这是一种保持数据同为正数或同为负数的常用方法,避免出现正数和负数进行比较的情况**/
     public int getSecond() {
          return second - Integer.MAX_VALUE;

     }

     public void setSecond(int second) {
          this.second = second + Integer.MAX_VALUE;
     }

/**序列化和反序列化的方法,注意数据类型要前后对应**/
     public void write(DataOutput out) throws IOException {
          out.writeUTF(first);
          out.writeInt(second);

     }

     public void readFields(DataInput in) throws IOException {
          this.first = in.readUTF();
          this.second = in.readInt();
     }

/**比较对象的大小,返回结果为int类型,先对第一个字段进行比较,如果相同,继续比较第二个字段**/
     public int compareTo(PairWritable o) {
          int comp =this.first.compareTo(o.getFirst()) ;
        /**如果不相等**/
          if(0 != comp){
               return comp ;
          }

       /**相等**/
          return Integer.valueOf(this.getSecond()).compareTo(Integer.valueOf(o.getSecond())) ;

     }
}

(5)     自定义分组类FirstPartitioner,如果第一个字段相同则分为一组

/**部分示例代码**/    
public class FirstPartitioner extends Partitioner<PairWritable,IntWritable> {

     @Override
     public int getPartition(PairWritable key, IntWritable value,
               int numPartitions) {

   /**使用哈希码进行分组**/
          return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
     }
}

(6)     自定义分组类FirstGroupingComparator,如果第一个字段相同则为一组
  

/**部分示例代码**/
 public class FirstGroupingComparator implements RawComparator<PairWritable> {

           /**比较对象值,我们需要的是对组合key中的第一个字段进行比较**/

     public int compare(PairWritable o1, PairWritable o2) {
          return o1.getFirst().compareTo(o2.getFirst());
     }

     /**比较字节数组,因为我们输出类型为int,占4个字节,所以用数组总长度l减去4,即为需要的字节长度**/
     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
          return WritableComparator.compareBytes(b1, 0, l1 - 4, b2, 0, l2 - 4);
     }

}

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

MapReduce之二次排序 的相关文章

  • 将json数据保存在hadoop的hdfs中

    我有以下减速器类 public static class TokenCounterReducer extends Reducer
  • 如何在有或没有 Pig 的情况下使用 Cassandra 的 Map Reduce?

    有人可以解释 MapReduce 如何与 Cassandra 6 配合使用吗 我已经阅读了字数统计示例 但我不太明白 Cassandra 端与 客户端 端发生的情况 https svn apache org repos asf cassan
  • CouchDB 视图:MapReduce 中可以接受多少处理?

    我一直在尝试使用 CouchDB 进行 MapReduce 一些示例显示了映射归约函数中可能存在的一些繁重逻辑 在一种特殊情况下 他们在映射内执行 for 循环 在发出您选择的文档之前 MapReduce 是否会在每个可能的文档上运行 如果
  • Cloudera 5.1下作业在LocalJobRunner中保持运行

    需要一些快速帮助 我们的作业在 MapR 下运行良好 但是当我们在 Cloudera 5 1 上启动相同的作业时 它继续以本地模式运行 我确信这是某种配置问题 它是哪个配置设置 14 08 22 12 16 58 INFO mapreduc
  • Hadoop 中的分割大小与块大小

    Hadoop 中的分割大小和块大小之间有什么关系 当我读到this http willddy github io 2012 08 25 Hadoop Split and Block html 分割大小必须是块大小的n倍 n是整数并且n gt
  • MapReduce:ChainMapper 和 ChainReducer

    我需要将 MapReduce jar 文件拆分为两个作业 以获得两个不同的输出文件 每个文件来自两个作业的每个减速器 我的意思是第一个作业必须生成一个输出文件 该文件将作为链中第二个作业的输入 我在 hadoop 版本 0 20 中读到了一
  • 我如何调试 Hadoop MapReduce [重复]

    这个问题在这里已经有答案了 我正在尝试构建一个地图缩减作业 它运行完成 但最后呈现奇怪的数据 当我尝试使用 system out println debug data 调试它时 它没有显示在屏幕上 使用 java API 生成外部日志文件
  • 线程“主”java.lang.VerifyError 中出现异常:操作数堆栈上的类型错误

    在给定 input txt 文件中查找最大温度的 Map Reduce 程序中发生了此错误 我写了两栏 分别是年份和温度 Exception in thread main java lang VerifyError Bad type on
  • JA017:无法查找已启动的 hadoop 作业 ID

    当我在Hue的Oozie编辑器中提交mapreduce作业时 如何解决这个问题 JA017 无法查找与操作 0000009 150711083342968 oozie root W mapreduce f660 关联的已启动 hadoop
  • 使用 Google AppEngine MapReduce 处理所有记录后,如何从计数器获取值?

    使用 Google AppEngine MapReduce 处理所有记录后 如何从计数器获取值 或者我在这里错过了计数器的用例 示例代码来自http code google com p appengine mapreduce wiki Us
  • array_reduce() 不能用作 PHP 的关联数组“reducer”?

    我有一个关联数组 assoc 并且需要将其简化为字符串 在这种情况下 OUT
  • 在 Hadoop MapReduce 中解析 PDF 文件

    我必须在 Hadoop 的 MapReduce 程序中解析 HDFS 中的 PDF 文件 所以我从 HDFS 获取 PDF 文件为输入分割它必须被解析并发送到 Mapper 类 为了实现这个输入格式我已经经历过这个link http cod
  • Spark 无法再执行作业。执行器创建目录失败

    我们已经有一个小型 Spark 集群运行了一个月 它已经成功执行了作业 或者让我为该集群启动一个 Spark shell 无论我向集群提交作业还是使用 shell 连接到集群 错误总是相同的 root SPARK HOME bin spar
  • 两个相等的组合键不会到达同一个减速器

    我正在使用 MapReduce 框架用 Java 制作 Hadoop 应用程序 我仅使用文本键和值进行输入和输出 在减少最终输出之前 我使用组合器进行额外的计算步骤 但我有一个问题 钥匙没有进入同一个减速器 我在组合器中创建并添加键 值对
  • Hadoop 性能

    我安装了hadoop 1 0 0并尝试了字数统计示例 单节点集群 完成时间为 2 分 48 秒 然后我尝试了标准的 Linux 字数统计程序 该程序在同一组 180 kB 数据 上运行只需 10 毫秒 是我做错了什么 还是 Hadoop 非
  • 如何处理 YARN MapReduce 作业的容器故障?

    YARN 中如何处理软件 硬件故障 具体来说 如果容器发生故障 崩溃 会发生什么 容器和任务失败由节点管理器处理 当容器失败或死亡时 节点管理器会检测到失败事件并启动一个新容器来替换失败的容器并在新容器中重新启动任务执行 如果应用程序主机发
  • hadoop map reduce 中的错误处理

    根据文档 有几种方法可以在 MapReduce 中执行错误处理 以下是一些 A 使用枚举的自定义计数器 每个失败记录的增量 b 记录错误并稍后分析 计数器给出失败记录的数量 然而 为了获取失败记录的标识符 可能是其唯一键 以及发生异常的详细
  • MapReduce 排序和洗牌如何工作?

    我正在使用 yelps MRJob 库来实现映射缩减功能 我知道 MapReduce 有一个内部排序和洗牌算法 它根据键对值进行排序 所以如果我在地图阶段后得到以下结果 1 24 4 25 3 26 我知道排序和洗牌阶段将产生以下输出 1
  • 如何使用新的 Hadoop API 来使用 MultipleTextOutputFormat?

    我想编写多个输出文件 如何使用 Job 而不是 JobConf 来执行此操作 创建基于密钥的输出文件名的简单方法 input data type key value cupertino apple sunnyvale banana cupe
  • 遍历 ArrayWritable - NoSuchMethodException

    我刚刚开始使用 MapReduce 并且遇到了一个奇怪的错误 我无法通过 Google 回答该错误 我正在使用 ArrayWritable 制作一个基本程序 但是当我运行它时 在Reduce过程中出现以下错误 java lang Runti

随机推荐

  • 结构程序设计的经典定义

    结构程序设计的经典定义如下所述 xff1a 如果一个程序的 代码块仅仅通过顺序 选择和循环这3种基本控制结构 进行连接 xff0c 并且每个代码块只有一个入口和一个出口 xff0c 则称这个程序是结构化的 如果只允许使用顺序 IF THEN
  • 总体设计启发性规则7条

    nbsp 启发性规则 7条 1 改进软件结构提高模块独立性 通过模块分解或合并 降低耦 合提高内聚 2 模块规模应该适中 过大的模块往往是由于分解不充分 过小 的模块将导致模块数目过多将使系统接口复杂 3 深度 宽度 扇出和扇入都应适当 深
  • SqlServer调用webapi和webService接口

    1 通过http协议post调用webapi接口 xff08 json数据格式 xff09 declare 64 ServiceUrl as varchar 1000 set 64 ServiceUrl 61 39 http 127 0 0
  • C语言和C++的区别是什么?8个点通俗易懂的告诉你

    有些初学的同学傻傻分不清其中的区别 xff0c 下面我将详细的讲解C语言和C 43 43 的区别点 帮助大家尽快的理解 1 关键字 蓝色标注为C语言关键字 xff0c C 43 43 继承了C语言的所有关键字 xff0c 以下红色标注为C
  • money 最小花费(spfa)

    问题描述 在n个人中 xff0c 某些人的银行账号之间可以互相转账 这些人之间转账的手续费各不相同 给定这些人之间转账时需要从转账金额里扣除百分之几的手续费 xff0c 请问A最少需要多少钱使得转账后B收到100元 输入格式 第一行输入两个
  • cpu优化-cpu亲和性

    cpu亲和性 taskset命令可以将进程绑核 格式为taskset p c cpu list pid xff0c 其中cpu list是数字化的cpu列表 xff0c 从0开始 多个不连续的cpu可用逗号连接 xff0c 连续的可用 连接
  • Homebrew安装慢,总是失败如何解决

    前言 如果使用Homebrew xff08 https brew sh xff09 官网提供的命令来进行下载的 xff0c 是从github上进行下载 xff0c 没有翻墙工具的话会比较慢 xff0c 甚至可能会下载失败 xff0c 所以我
  • 开源飞控APM与PIXHAWK

    一 APM 官网地址 xff1a http ardupilot org APM xff08 ArduPilotMega xff09 是在2007年由DIY无人机社区 xff08 DIY Drones xff09 推出的飞控产品 xff0c
  • Pixhawk解锁常见错误

    第一次解锁 xff0c 接上MP看着HUD的提示 xff0c 即飞行数据的界面 xff1a 一般的不成功解锁有以下的原因 xff08 网络整理 xff09 xff08 1 xff09 HUD显示 RC not calibrated xff1
  • 学习MySQL——单表查询

    文章目录 一 SQL语言规范二 基本的SELECT语句1 列的别名2 去除重复行3 空值参与运算4 着重号 96 96 5 显示表结构 三 运算符 比较运算符1 等号运算符 xff08 61 xff09 2 不等于运算符 xff08 lt
  • FreeRTOS-启动第一个任务

    FreeRTOS开始第一个任务源码分析 vTaskStartScheduler xff1a 1 创建一个空任务 xff1a 优先级为0 2 是否使用软件定时器 是的话 创建软件定时器 3 关闭中断 xff08 关中断操作的寄存器是BASEP
  • 我的2014个人总结——学习篇、工作篇、生活篇

    2013的个人总结在我印象当中是写过的 xff0c 2014已成为过去 xff0c 当我想回过头来看看我2013年的总结时 xff0c 奈何我已不知它的踪迹了 xff0c 所以决定以后的个人总结还是以博客的形式记录吧 xff01 平静下来
  • ubuntu vnc 已经配置好,一键开启,节省大家时间

    1 粘贴复制 自己找个目录复制过去 2 修改权限 sudo chmod 777 x0vncserver 3 开启 x0vncserver rfbport 61 5900 SecurityTypes 61 None 下载地址 xff1a ht
  • ArcGIS Engine许可突然用不了了或者localhost没有有效的许可管理器

    ArcGIS Engine许可突然用不了了或者localhost没有有效的许可管理器 在Arc Engine安装包中再重新安装一次许可管理
  • ZeroMQ学习 (五)发布-订阅模式

    7 发布 订阅模式 发布者不用管是否有订阅者 xff0c 它只管不停的发布 xff0c 也不用接受客户端的请求 多订阅者可以尝试链接发布者 xff0c 来接受信息 xff0c 但是不能往发布者发送请求 发布者源码 xff1a span st
  • 树莓派3B+安装Ubuntu20.04

    ros2已经出到F版本了 xff0c 本来想下载一个尝鲜一下 xff0c 怒肝了两天终于把Ubuntu20 04装到树莓派里面了 但是 xff0c 忽然发现F版本的还未发行 xff0c 只是在网站上更新了安装方法 xff0c 安装包和公钥都
  • 【SpringBoot】SpringBoot+SpringSecurity+CAS实现单点登录

    文章目录 一 CAS的概述1 SSO2 CAS3 概念 二 CAS的流程三 CAS服务端部署1 下载地址2 源码打包3 部署运行4 java io FileNotFoundException etc cas thekeystore 系统找不
  • Google Scholar 谷歌学术文献检索技巧总结

    原文链接 xff1a https zhuanlan zhihu com p 24369927 身边有朋友想学如何使用谷歌学术 xff0c 为了更广泛的传播和重复查阅 xff0c 故将个人了解到的谷歌学术检索文献的技巧总结在此 当然 xff0
  • kubernetes flannel pod CrashLoopBackoff解决

    背景 某环境客户部署了一个kubernetes集群 xff0c 发现flannel的pod一直重启 xff0c 始终处于CrashLoopBackOff状态 排查 对于始终CrashLoopBackOff的pod xff0c 一般是应用本身
  • MapReduce之二次排序

    目录 应用场景 什么是二次排序 怎样实现二次排序 示例代码 应用场景 假如输入文件内容如下 xff1a a 1 z 3 b 2 a 100 a 3 b 1 要求经过MapReduce处理后 xff0c key升序排列 xff0c 相同key