MapReduce中的二次排序

2023-05-16

在MapReduce操作时,我们知道传递的<key,value>会按照key的大小进行排序,最后输出的结果是按照key排过序的。有的时候我们在key排序的基础上,对value也进行排序。这种需求就是二次排序。

我们先看一下Mapper任务的数据处理过程吧,见下图。

<a href="http://www.superwu.cn/wp-content/uploads/2013/08/image11.png" class="cboxElement" rel="example4" 492"="" style="text-decoration: none; color: rgb(1, 150, 227);">image

在图中,数据处理分为四个阶段:

(1)Mapper任务会接收输入分片,然后不断的调用map函数,对记录进行处理。处理完毕后,转换为新的<key,value>输出。

(2)对map函数输出的<key, value>调用分区函数,对数据进行分区。不同分区的数据会被送到不同的Reducer任务中。

(3)对于不同分区的数据,会按照key进行排序,这里的key必须实现WritableComparable接口。该接口实现了Comparable接口,因此可以进行比较排序。

(4)对于排序后的<key,value>,会按照key进行分组。如果key相同,那么相同key的<key,value>就被分到一个组中。最终,每个分组会调用一次reduce函数。

(5)排序、分组后的数据会被送到Reducer节点。

在MapReduce的体系结构中,我们没有看到对value的排序操作。怎么实现对value的排序哪?这就需要我们变通的去实现这个需求。

变通手段:我们可以把key和value联合起来作为新的key,记作newkey。这时,newkey含有两个字段,假设分别是k,v。这里的k和v是原来的key和value。原来的value还是不变。这样,value就同时在newkey和value的位置。我们再实现newkey的比较规则,先按照key排序,在key相同的基础上再按照value排序。在分组时,再按照原来的key进行分组,就不会影响原有的分组逻辑了。最后在输出的时候,只把原有的key、value输出,就可以变通的实现了二次排序的需求。

下面看个例子,结合着理解。

假设有以下输入数据,这是两列整数,要求先按照第一列整数大小排序,如果第一列相同,按照第二列整数大小排序。


20    21
50    51
50    52
50    53
50    54
60    51
60    53
60    52
60    56
60    57
70    58
60    61
70    54
70    55
70    56
70    57
70    58  

分析一下, 这是一个典型的二次排序问题。

我们先对现在第一列和第二列整数创建一个新的类,作为newkey,代码如下



/**
 * 把第一列整数和第二列作为类的属性,并且实现WritableComparable接口
 */
public static class IntPair implements WritableComparable<IntPair> {
  private int first = 0;
  private int second = 0;

  public void set(int left, int right) {
    first = left;
    second = right;
  }
  public int getFirst() {
    return first;
  }
  public int getSecond() {
    return second;
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    first = in.readInt();
    second = in.readInt();
  }
  @Override
  public void write(DataOutput out) throws IOException {
    out.writeInt(first);
    out.writeInt(second);
  }
  @Override
  public int hashCode() {
    return first+"".hashCode() + second+"".hashCode();
  }
  @Override
  public boolean equals(Object right) {
    if (right instanceof IntPair) {
      IntPair r = (IntPair) right;
      return r.first == first && r.second == second;
    } else {
      return false;
    }
  }
  //这里的代码是关键,因为对key排序时,调用的就是这个compareTo方法
  @Override
  public int compareTo(IntPair o) {
    if (first != o.first) {
      return first - o.first;
    } else if (second != o.second) {
      return second - o.second;
    } else {
      return 0;
    }
  }
}  

一定要注意上面的compareTo方法,先按照first比较,再按照second比较。在以后调用的时候,key就是first,value就是second。

下面看一下分组比较函数,代码如下



/**
 * 在分组比较的时候,只比较原来的key,而不是组合key。
 */
public static class GroupingComparator implements RawComparator<IntPair> {
  @Override
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    return WritableComparator.compareBytes(b1, s1, Integer.SIZE/8, b2, s2, Integer.SIZE/8);
  }

  @Override
  public int compare(IntPair o1, IntPair o2) {
    int first1 = o1.getFirst();
    int first2 = o2.getFirst();
    return first1 - first2;
  }
}  

一定要注意上面代码中,虽然泛型是IntPair,但是比较的始终是第一个字段,而不是所有的字段。因为要按照原有的key进行分组啊。

如果以上的代码明白,再看一下自定义的Mapper类和Reducer类吧



public static class MapClass extends Mapper<LongWritable, Text, IntPair, IntWritable> {

  private final IntPair key = new IntPair();
  private final IntWritable value = new IntWritable();

  @Override
  public void map(LongWritable inKey, Text inValue, 
                  Context context) throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(inValue.toString());
    int left = 0;
    int right = 0;
    if (itr.hasMoreTokens()) {
      left = Integer.parseInt(itr.nextToken());
      if (itr.hasMoreTokens()) {
        right = Integer.parseInt(itr.nextToken());
      }
      key.set(left, right);
      value.set(right);
      context.write(key, value);
    }
  }
}

public static class Reduce extends Reducer<IntPair, IntWritable, Text, IntWritable> {
  private static final Text SEPARATOR = new Text("------------------------------------------------");
  private final Text first = new Text();

  @Override
  public void reduce(IntPair key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    context.write(SEPARATOR, null);
    first.set(Integer.toString(key.getFirst()));
    for(IntWritable value: values) {
      context.write(first, value);
    }
  }
}  

在map函数中,要注意k2是由哪几个字段组成的;在reduce函数中,要注意输出的k3是IntPair中的第一个字段,而不是所有字段。

好了,看一下驱动代码吧,如下



public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();

  final FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop2:9000"), conf);
  fileSystem.delete(new Path(OUTPUT_PATH), true);

  Job job = new Job(conf, "secondary sort");
  job.setJarByClass(SecondarySortApp.class);
  job.setMapperClass(MapClass.class);
  job.setReducerClass(Reduce.class);

  job.setGroupingComparatorClass(GroupingComparator.class);

  job.setMapOutputKeyClass(IntPair.class);
  job.setMapOutputValueClass(IntWritable.class);

  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);

  FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
  FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
  System.exit(job.waitForCompletion(true) ? 0 : 1);
}  

以上驱动代码中,重大变化是设置了分组比较函数。好了,看看执行结果吧


------------------------------------------------
20    21
------------------------------------------------
50    51
50    52
50    53
50    54
------------------------------------------------
60    51
60    52
60    53
60    56
60    57
60    61
------------------------------------------------
70    54
70    55
70    56
70    57
70    58
70    58  

看看,是不是我们想要的结果啊!!

如果读者能够看明白,那么我出个思考题:在以上例子中,按照第一列升序,第二列倒序输出

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

MapReduce中的二次排序 的相关文章

  • MapReduce 上的Reduce 函数显示不正确的结果——为什么?

    我有一个数据结构来跟踪不同城市的人们 in db persons name John city Seattle name Bill city Portland 我想运行一个地图缩减来获取每个城市有多少人的列表 所以结果将如下所示 id Se
  • Hadoop中数据是如何分割的

    Hadoop是否根据程序中设置的mapper数量来分割数据 也就是说 有一个大小为 500MB 的数据集 如果 Mapper 的数量为 200 个 假设 Hadoop 集群允许同时存在 200 个 Mapper 那么每个 Mapper 是否
  • Hadoop 中的分割大小与块大小

    Hadoop 中的分割大小和块大小之间有什么关系 当我读到this http willddy github io 2012 08 25 Hadoop Split and Block html 分割大小必须是块大小的n倍 n是整数并且n gt
  • Hadoop 减少多种输入格式

    我在 HDFS 中有两个数据格式不同的文件 如果我需要减少两个数据文件 那么作业设置会是什么样子 例如想象一下常见的字数统计问题 在一个文件中使用空格作为世界分隔符 在另一个文件中使用下划线 在我的方法中 我需要针对各种文件格式使用不同的映
  • 我的 cdh5.2 集群在运行 hbase MR 作业时出现 FileNotFoundException

    我的 cdh5 2 集群运行 hbase MR 作业时出现问题 例如 我将 hbase 类路径添加到 hadoop 类路径中 vi etc hadoop conf hadoop env sh 添加行 export HADOOP CLASSP
  • security.UserGroupInformation:MR 的 PrivilegedgedActionException 错误

    每当我尝试执行映射缩减作业以写入 Hbase 表时 我都会在控制台中收到以下错误 我正在从用户帐户运行 MR 作业 错误 security UserGroupInformation PriviledgedActionException 为
  • couchdb 视图使用另一个视图?

    我对 couchdb 中的视图有疑问 目前 我有许多视图 例如 view A view B view Z 对于每个视图 它们包含相同范围的键但具有不同的值 IE view A key key 1 value 10 key key 2 val
  • 在mongo中执行优先级查询

    样本文件 name John age 35 address join month 3 的员工优先级为 1 地址包含字符串 Avenue 的员工优先级为 2 地址包含字符串 Street 的员工优先级为 3 地址包含字符串 Road 的员工优
  • 线程“主”java.lang.VerifyError 中出现异常:操作数堆栈上的类型错误

    在给定 input txt 文件中查找最大温度的 Map Reduce 程序中发生了此错误 我写了两栏 分别是年份和温度 Exception in thread main java lang VerifyError Bad type on
  • MongoDB 从两个数组计算值、排序和限制

    我有一个存储浮点数组的 MongoDB 数据库 假设以下格式的文档集合 id 0 vals 0 8 0 2 0 5 有一个查询数组 例如 带有值 0 1 0 3 0 4 我想计算集合中所有元素的距离 例如 差异之和 对于给定的文档和查询 它
  • JA017:无法查找已启动的 hadoop 作业 ID

    当我在Hue的Oozie编辑器中提交mapreduce作业时 如何解决这个问题 JA017 无法查找与操作 0000009 150711083342968 oozie root W mapreduce f660 关联的已启动 hadoop
  • Hadoop:Reducer 将 Mapper 输出写入输出文件

    我遇到了一个非常非常奇怪的问题 减速器确实可以工作 但是如果我检查输出文件 我只找到了映射器的输出 当我尝试调试时 在将映射器的输出值类型从 Longwritable 更改为 Text 后 我 发现字数示例存在相同的问题 package o
  • 使用 CouchDB 视图替换 SQL 中的多个联接

    我正在为我的应用程序实现过滤功能 但在 CouchDB 上编写视图时遇到问题 在 SQL 中 这将是一个具有多个连接的语句 如何替换 CouchDB 中的多重连接 本文涵盖单连接 http www cmlenz net archives 2
  • Spark 无法再执行作业。执行器创建目录失败

    我们已经有一个小型 Spark 集群运行了一个月 它已经成功执行了作业 或者让我为该集群启动一个 Spark shell 无论我向集群提交作业还是使用 shell 连接到集群 错误总是相同的 root SPARK HOME bin spar
  • CouchDB“加入”两个文档

    我有两个看起来有点像这样的文档 Doc id AAA creator id data DataKey id credits left 500 times used 0 data id AAA 我想要做的是创建一个视图 它允许我传递 Data
  • Hadoop YARN 作业陷入映射 0% 并减少 0%

    我正在尝试运行一个非常简单的作业来测试我的 hadoop 设置 所以我尝试使用 Word Count Example 它陷入了 0 所以我尝试了一些其他简单的作业 并且每个作业都陷入了困境 52191 0003 14 07 14 23 55
  • java.io.IOException:无法获取 LocationBlock 的块长度

    我正在使用 HDP 2 1 对于集群 我遇到了以下异常 并且 MapReduce 作业因此失败 实际上 我们定期使用 Flume 版本的数据创建表 1 4 我检查了映射器尝试读取的数据文件 但我找不到任何内容 2014 11 28 00 0
  • 使用 Hadoop 映射两个数据集

    假设我有两个键值数据集 数据集A和B 我们称它们为数据集A和B 我想用 B 组的数据更新 A 组中的所有数据 其中两者在键上匹配 因为我要处理如此大量的数据 所以我使用 Hadoop 进行 MapReduce 我担心的是 为了在 A 和 B
  • mongodb - 检索数组子集

    看似简单的任务对我来说是一个挑战 我有以下 mongodb 结构 services TCP80 data status 1 delay 3 87 ts 1308056460 status 1 delay 2 83 ts 1308058080
  • RavenDB:为什么我会在此多重映射/归约索引中获得字段空值?

    受到 Ayende 文章的启发https ayende com blog 89089 ravendb multi maps reduce indexes https ayende com blog 89089 ravendb multi m

随机推荐

  • sftp文件上传功能实现

    参考博客 https blog csdn net u011937566 article details 81666347 方式一 使用jsch 0 1 53 jar 0 gt 添加jsch 0 1 52 jar依赖 1 gt 创建JSch对
  • 安装IDEA出现Missing essential plugins: com.intellij (platform prefix: null)如何解决

    这里写自定义目录标题 这是一个重装IDEA新版本引发的悲剧 这是一个重装IDEA新版本引发的悲剧 如果你在重装IDEA后打不开出现以下报错 com intellij ide plugins PluginManagerCore Essenti
  • Android在getString()中添加参数

    转载 xff1a http blog chinaunix net uid 20771867 id 2990700 html 转载只是给自己留一个笔记 xff0c 向原作者致敬 Android中String一般都是定义在res string
  • ftp上传,下载,删除文件

    ftp上传 xff0c 下载 xff0c 删除文件 直接看最下面的main 方法中的代码 xff0c 复制全部代码 xff0c 输入自己的ftp路径和用户信息 package com sinosoft lis ybt bl import i
  • powershell 压缩和解压zip

    项目场景 xff1a 前端项目发布到windows环境需要需要先压缩传输后再解压 问题描述 简单的压缩和解压zip在windows下 xff0c 视窗情况下 xff0c 右键就可以实现 xff0c 但是如果是在命令下 xff0c windo
  • vscode 搜索插件报 提取扩展时出错。XHR failed

    项目场景 xff1a 有一段时间没有打开vscode的插件市场了 问题描述 今天打开vscode插件管理 xff0c 搜索插件 xff0c 报了一个错误 提取扩展时出错 XHR failed xff0c 一时看不出错误原因 原因分析 xff
  • mybatis报“Invalid value for getInt()“

    使用mybatis遇到一个非常奇葩的问题 xff0c 错误如下 xff1a Cause org apache ibatis executor result ResultMapException Error attempting to get
  • 5 essential skills every Web Developer should have?

    The idea here is that most of us should already know most of what is on this list But there just might be one or two ite
  • slf4j下log.info()无法输出到控制台&重复打印

    在logback xml中添加如下 lt logger name 61 34 你要在哪个类或者包下使用log的全限定名 34 level 61 34 日志输出等级 这里要用log info 所以级别是INFO 34 additivity 6
  • 在php中使用redis cluster 集群

    目前我们用到的 php 的 redis 扩展 主要有2个 xff0c 第一个是最常用的 phpredis 它是用c写的php的高效扩展 xff1a https github com phpredis phpredis xff0c 还有1个是
  • csdn markdown帮助文档

    欢迎使用Markdown编辑器写博客 本Markdown编辑器使用 StackEdit 6 修改而来 xff0c 用它写博客 xff0c 将会带来全新的体验哦 xff1a Markdown和扩展Markdown简洁的语法 代码块高亮 图片链
  • Springboot+Thymeleaf配置与使用

    Springboot 43 Thymeleaf配置与使用 前言 Springboot默认是不支持JSP的 xff0c 默认使用thymeleaf模板引擎 所以这里介绍一下springboot使用Thymeleaf的实例以及遇到的问题 配置与
  • git 解决pull origin 错误 error: The following untracked working tree files would be overwritten by merge

    error The following untracked working tree files would be overwritten by merge bin AndroidManifest xml Please move or re
  • SpringBootTest单元测试组件

    SpringBootTest单元测试组件 一 SpringbootTest 使用Junit4开发 1 添加依赖 span class token tag span class token tag span class token punct
  • ICE C++ Hello World

    ICE C 43 43 Hello World实例教程 1 概述 本文演示了如何编写一个最简单的C 43 43 ICE Internet Communications Engine 应用程序 xff0c 包括必要环境的安装 该应用程序包含客
  • 华为工作的感悟

    参考 xff1a http www openlab net cn forums thread 1002986 1 p10035795 北邮北 xff0c 清华硕 xff0c 一年两个月的华为生活总结 xff0c 算了 xff0c 贴出来了
  • MRCP 媒体资源控制协议

    媒体资源控制协议 xff08 Media Resource Control Protocol MRCP xff09 是一种通讯协议 xff0c 用于语音服务器向客户端提供各种语音服务 如语音识别和语音合成 MRCP并不定义会话连接 xff0
  • Hadoop中VIntWritable编码方式解析

    最近因为实验室的云计算项目 xff0c 开始学习Hadoop xff0c 有时间就记录一下自己在学习过程中的一些小收获吧 Hadoop权威指南 在序列化这一节有个例子程序 xff0c 叫做TextPair xff0c 代码略长 xff0c
  • 测试分析报告

    测试分析报告 1 引言 1 1 1 编写目的 1 1 2 背景 1 1 3 定义 2 1 4 参考资料 2 2 测试概要 2 3 测试结果及发现 3 3 1 测试 1 xff08 normal xff09 3 3 2 测试 2 xff08
  • MapReduce中的二次排序

    在MapReduce操作时 xff0c 我们知道传递的 lt key value gt 会按照key的大小进行排序 xff0c 最后输出的结果是按照key排过序的 有的时候我们在key排序的基础上 xff0c 对value也进行排序 这种需