MapReduce中的二次排序

2023-05-16

在MapReduce操作时,我们知道传递的<key,value>会按照key的大小进行排序,最后输出的结果是按照key排过序的。有的时候我们在key排序的基础上,对value也进行排序。这种需求就是二次排序。

我们先看一下Mapper任务的数据处理过程吧,见下图。

<a href="http://www.superwu.cn/wp-content/uploads/2013/08/image11.png" class="cboxElement" rel="example4" 492"="" style="text-decoration: none; color: rgb(1, 150, 227);">image

在图中,数据处理分为四个阶段:

(1)Mapper任务会接收输入分片,然后不断的调用map函数,对记录进行处理。处理完毕后,转换为新的<key,value>输出。

(2)对map函数输出的<key, value>调用分区函数,对数据进行分区。不同分区的数据会被送到不同的Reducer任务中。

(3)对于不同分区的数据,会按照key进行排序,这里的key必须实现WritableComparable接口。该接口实现了Comparable接口,因此可以进行比较排序。

(4)对于排序后的<key,value>,会按照key进行分组。如果key相同,那么相同key的<key,value>就被分到一个组中。最终,每个分组会调用一次reduce函数。

(5)排序、分组后的数据会被送到Reducer节点。

在MapReduce的体系结构中,我们没有看到对value的排序操作。怎么实现对value的排序哪?这就需要我们变通的去实现这个需求。

变通手段:我们可以把key和value联合起来作为新的key,记作newkey。这时,newkey含有两个字段,假设分别是k,v。这里的k和v是原来的key和value。原来的value还是不变。这样,value就同时在newkey和value的位置。我们再实现newkey的比较规则,先按照key排序,在key相同的基础上再按照value排序。在分组时,再按照原来的key进行分组,就不会影响原有的分组逻辑了。最后在输出的时候,只把原有的key、value输出,就可以变通的实现了二次排序的需求。

下面看个例子,结合着理解。

假设有以下输入数据,这是两列整数,要求先按照第一列整数大小排序,如果第一列相同,按照第二列整数大小排序。


20    21
50    51
50    52
50    53
50    54
60    51
60    53
60    52
60    56
60    57
70    58
60    61
70    54
70    55
70    56
70    57
70    58  

分析一下, 这是一个典型的二次排序问题。

我们先对现在第一列和第二列整数创建一个新的类,作为newkey,代码如下



/**
 * 把第一列整数和第二列作为类的属性,并且实现WritableComparable接口
 */
public static class IntPair implements WritableComparable<IntPair> {
  private int first = 0;
  private int second = 0;

  public void set(int left, int right) {
    first = left;
    second = right;
  }
  public int getFirst() {
    return first;
  }
  public int getSecond() {
    return second;
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    first = in.readInt();
    second = in.readInt();
  }
  @Override
  public void write(DataOutput out) throws IOException {
    out.writeInt(first);
    out.writeInt(second);
  }
  @Override
  public int hashCode() {
    return first+"".hashCode() + second+"".hashCode();
  }
  @Override
  public boolean equals(Object right) {
    if (right instanceof IntPair) {
      IntPair r = (IntPair) right;
      return r.first == first && r.second == second;
    } else {
      return false;
    }
  }
  //这里的代码是关键,因为对key排序时,调用的就是这个compareTo方法
  @Override
  public int compareTo(IntPair o) {
    if (first != o.first) {
      return first - o.first;
    } else if (second != o.second) {
      return second - o.second;
    } else {
      return 0;
    }
  }
}  

一定要注意上面的compareTo方法,先按照first比较,再按照second比较。在以后调用的时候,key就是first,value就是second。

下面看一下分组比较函数,代码如下



/**
 * 在分组比较的时候,只比较原来的key,而不是组合key。
 */
public static class GroupingComparator implements RawComparator<IntPair> {
  @Override
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    return WritableComparator.compareBytes(b1, s1, Integer.SIZE/8, b2, s2, Integer.SIZE/8);
  }

  @Override
  public int compare(IntPair o1, IntPair o2) {
    int first1 = o1.getFirst();
    int first2 = o2.getFirst();
    return first1 - first2;
  }
}  

一定要注意上面代码中,虽然泛型是IntPair,但是比较的始终是第一个字段,而不是所有的字段。因为要按照原有的key进行分组啊。

如果以上的代码明白,再看一下自定义的Mapper类和Reducer类吧



public static class MapClass extends Mapper<LongWritable, Text, IntPair, IntWritable> {

  private final IntPair key = new IntPair();
  private final IntWritable value = new IntWritable();

  @Override
  public void map(LongWritable inKey, Text inValue, 
                  Context context) throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(inValue.toString());
    int left = 0;
    int right = 0;
    if (itr.hasMoreTokens()) {
      left = Integer.parseInt(itr.nextToken());
      if (itr.hasMoreTokens()) {
        right = Integer.parseInt(itr.nextToken());
      }
      key.set(left, right);
      value.set(right);
      context.write(key, value);
    }
  }
}

public static class Reduce extends Reducer<IntPair, IntWritable, Text, IntWritable> {
  private static final Text SEPARATOR = new Text("------------------------------------------------");
  private final Text first = new Text();

  @Override
  public void reduce(IntPair key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    context.write(SEPARATOR, null);
    first.set(Integer.toString(key.getFirst()));
    for(IntWritable value: values) {
      context.write(first, value);
    }
  }
}  

在map函数中,要注意k2是由哪几个字段组成的;在reduce函数中,要注意输出的k3是IntPair中的第一个字段,而不是所有字段。

好了,看一下驱动代码吧,如下



public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();

  final FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop2:9000"), conf);
  fileSystem.delete(new Path(OUTPUT_PATH), true);

  Job job = new Job(conf, "secondary sort");
  job.setJarByClass(SecondarySortApp.class);
  job.setMapperClass(MapClass.class);
  job.setReducerClass(Reduce.class);

  job.setGroupingComparatorClass(GroupingComparator.class);

  job.setMapOutputKeyClass(IntPair.class);
  job.setMapOutputValueClass(IntWritable.class);

  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);

  FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
  FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
  System.exit(job.waitForCompletion(true) ? 0 : 1);
}  

以上驱动代码中,重大变化是设置了分组比较函数。好了,看看执行结果吧


------------------------------------------------
20    21
------------------------------------------------
50    51
50    52
50    53
50    54
------------------------------------------------
60    51
60    52
60    53
60    56
60    57
60    61
------------------------------------------------
70    54
70    55
70    56
70    57
70    58
70    58  

看看,是不是我们想要的结果啊!!

如果读者能够看明白,那么我出个思考题:在以上例子中,按照第一列升序,第二列倒序输出

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

MapReduce中的二次排序 的相关文章

随机推荐

  • int 类型究竟多少字节?

    今天发现NEON技术中 int类型的字节数是2 xff0c 感觉很奇怪 xff0c 最早写51单片机时也是2 xff0c 后来到了观念转变成了4 xff0c 现在有遇到了2 一 转自 http www tuicool com article
  • python实现K均值聚类算法

    之前做大作业的时候本来想用聚类法给点集分类的 xff0c 但是太复杂了 xff0c 于是最后没有采用这个方案 现在把之前做的一些工作整理出来写个小博客 K means聚类法原理 xff1a 聚类是一个将数据集中在某些方面相似的数据成员进行分
  • 复合型自适应步长的Gauss型求积(附代码)

    复合型自适应步长的Gauss型求积 先前在做数值分析实验时 xff0c 把高斯型求积公式和复合型 自适应步长的求积融合到了一起 xff0c 但是后来发现题目没有这个要求 现在就把这个思路分享一下 上题目 xff1a 实验目的 xff1a 学
  • pid摄像头循迹(opencv和openmv)

    pid摄像头循迹 xff08 opencv和openmv xff09 用摄像头进行循迹的方法参考硬件选型方面软件思路一 图像预处理 xff1a 代码部分二 线性拟合opencv线性拟合 xff1a 实际在树莓派上运行时 xff0c 帧率也比
  • 通过云端自动生成openmv的神经网络模型,进行目标检测

    通过云端自动生成openmv的神经网络模型 xff0c 进行目标检测 OpenMV训练神经网络模型 xff08 目标识别 xff09 一 准备材料 xff1a 二 软件下载三 准备数据集 xff1a 四 数据集的上传与训练 OpenMV训练
  • opencv学习(9):cv::Scalar、cv::Mat::zeros

    1 cv Scalar cv Scalar是opencv的一个结构体 xff0c 其定义如下 xff1a xff08 c 43 43 中的结构体如下 xff0c 可以存放1 4个数值 xff09 various constructors S
  • 德国大陆ARS408系列毫米波雷达数据解析

    本人已完成对该型号系列毫米波雷达的解析工作 xff0c 有需求请私信联系
  • nmap使用详解

    nmap介绍 nmap xff08 Network Mapper xff09 是一款开源免费的针对大型网络的端口扫描工具 xff0c nmap可以检测目标主机是否在线 主机端口开放情况 检测主机运行的服务类型及版本信息 检测操作系统与设备类
  • OLED屏幕花屏的原因(I2C+DMA)

    OLED屏幕在通电后花屏 xff0c 呈雪花状 在网上查询原因 xff0c 开始了尝试 xff1a 1 可能是由于杜邦线的问题 xff0c 可能接触不良导致 xff0c 但更换了杜邦线依然花屏 2 可能是OLED屏幕问题 因为经常在工作 x
  • 竞赛保研(自动化专业)

    一 感谢 从大三的五月份一直到9 28号推免结束 xff0c 最终也是保研到了梦校 xff0c 还好我坚持到了最后一刻 感谢父母 xff0c 感谢远方的她 xff0c 感谢老师 xff0c 感谢实验室的平台 xff0c 也要感谢每一位一起拼
  • 随机森林的简单学习记录

    随机森林小记 这里采用的随机森林的库选择sklearn库 1 首先是导入数据 xff1a path span class token operator 61 span span class token string 34 D Epilept
  • Linux的c++环境配置与cmake的使用

    Ubuntu18 04安装 虚拟机安装 虚拟机软件版本 xff1a VMware Workstation 16 Pro 版本号 xff1a Ubuntu18 04 安装参考 xff1a http t csdn cn P71XR 虚拟机分辨率
  • 用KDevelop来编辑与编译ROS文件

    新建一个ROS工程 xff1a 首先在工作目录下打开终端 xff0c 创建一个src目录 xff0c 放置源代码 xff08 系统要求 xff09 xff0c 并将当前目录切换到src目录中 xff1a mkdir src cd src s
  • 自制三维激光扫描建模

    看图片就是我做的东西 xff0c 很炫酷是不是 好吧 xff0c 开玩笑 xff0c 这是电影普罗米修斯的截图 当初看这个电影的时候就感觉这东西好眩酷 xff0c 我能不能做出来 最近借着帮做毕业设计的机会我也做了一个 就是这个丑丑的东西啦
  • ICE C++ Hello World

    ICE C 43 43 Hello World实例教程 1 概述 本文演示了如何编写一个最简单的C 43 43 ICE Internet Communications Engine 应用程序 xff0c 包括必要环境的安装 该应用程序包含客
  • 华为工作的感悟

    参考 xff1a http www openlab net cn forums thread 1002986 1 p10035795 北邮北 xff0c 清华硕 xff0c 一年两个月的华为生活总结 xff0c 算了 xff0c 贴出来了
  • MRCP 媒体资源控制协议

    媒体资源控制协议 xff08 Media Resource Control Protocol MRCP xff09 是一种通讯协议 xff0c 用于语音服务器向客户端提供各种语音服务 如语音识别和语音合成 MRCP并不定义会话连接 xff0
  • matlab(1):使用matlab处理excel数据进行画图

    目录 0 说明 1 直接使用xlsread读取出错 2 解决办法 3 绘图 0 说明 Excel数据示例 xff08 number filter radius 0 8 3 csv xff09 xff0c 一共99行数据 xff08 4列 x
  • Hadoop中VIntWritable编码方式解析

    最近因为实验室的云计算项目 xff0c 开始学习Hadoop xff0c 有时间就记录一下自己在学习过程中的一些小收获吧 Hadoop权威指南 在序列化这一节有个例子程序 xff0c 叫做TextPair xff0c 代码略长 xff0c
  • MapReduce中的二次排序

    在MapReduce操作时 xff0c 我们知道传递的 lt key value gt 会按照key的大小进行排序 xff0c 最后输出的结果是按照key排过序的 有的时候我们在key排序的基础上 xff0c 对value也进行排序 这种需