利用MapReduce进行二次排序--附例子

2023-05-16

首先先来明确几个概念:
 1.分区-partition
                1)分区(partition):
                             默认采取散列值进行分区,但此方法容易造成 “ 数据倾斜 ” (大部分数据分到同一个reducer中,影响运行效率);
                      所以需要自定义partition;
               2)分区概念:***  指定key/value被分配到哪个reducer上
                            哪个key到哪个Reducer的分配过程,是由Partitioner规定的;
                          (重写:getPartition(Text key, Text value, int numPartitions))
               3)如何自定义partition??
                         只要自定义一个类,并且继承Partitioner类,重写其getPartition方法就好了,在使用的时候通过调用Job的                                         setPartitionerClass  指 定一下即可。

                4)系统默认的分区partition
                              系统缺省的Partitioner是HashPartitioner,它以key的Hash值对Reducer的数目取模,得到对应的Reducer。这样就保证                       如果有相同的key值,肯定被分配到同一个reducre上 
                5)执行过程
                                 Map的结果,会通过partition分发到Reducer上。如果设置了Combiner,Map的结果会先送到Combiner进行合并,再                    partition,再将合并后数据发送给Reducer。

  2.分组grouping
                 1)概念:
                               主要定义哪些key可以放置在一组;
                 2)自定义分组排序
                                  定义实现一个WritableComparator,重写compare(),  设置比较策略;
                      还需要声明:自定义分组的类
                                  job.setGroupingComparatorClass(SencondarySortGroupComparator.class);//自定义分组
                 3)分组之后的组内排序--(实现优化)
                             也就是自定义RawComparator类,系统默认;
                 4)  如何自定义组内的排序呢?如下:
                                    继承WritableComparator,重写compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)方法;
                      还需要声明:
                                    job.setSortComparatorClass(SencondarySortComparator.class);//自定义组内排序

  先编写一个案例,加深二次排序的映像:
                 所谓二次排序,对第1个字段相同的数据,使用第2个字段进行排序。
    举个例子,电商平台记录了每一用户的每一笔订单的订单金额,现在要求属于同一个用户的所有订单金额作排序,
 并且输出的用户名也 要排序。
                账户(account)        订单金额(Cost)
                  hadoop@apache         200
                  hive@apache              550
                  yarn@apache              580
                  hive@apache              159
                 hadoop@apache          300
                 hive@apache               258
                 hadoop@apache          300
          
    二次排序后的结果如下:
            账户(account)        订单金额(Cost)
            hadoop@apache          200
            hadoop@apache          300
            hadoop@apache          300
            hive@apache               159
            hive@apache               258
            hive@apache               550
            yarn@apache               580
代码部分:
   a.实现自定义Writable类
 

public class AccountBean  implements WritableComparable<AccountBean>{
	private Text accout;
	private IntWritable cost;
	public AccountBean() {
		setAccout(new Text());
		setCost(new IntWritable());
	}
	public AccountBean(Text accout, IntWritable cost) {
		this.accout = accout;
		this.cost = cost;
	}
	@Override
	public void write(DataOutput out) throws IOException {
		accout.write(out);
		cost.write(out);
	}
	@Override
	public void readFields(DataInput in) throws IOException {
		accout.readFields(in);
		cost.readFields(in);
	}
	@Override
	public int compareTo(AccountBean o) {
		int tmp = accout.compareTo(o.accout);
		if(tmp ==0){
			return cost.compareTo(o.cost);
		}
		return tmp;
	}
	public Text getAccout() {
		return accout;
	}

	public void setAccout(Text accout) {
		this.accout = accout;
	}

	public IntWritable getCost() {
		return cost;
	}

	public void setCost(IntWritable cost) {
		this.cost = cost;
	} 
    @Override
    public String toString() {
	return accout + "\t" + cost;
    }
}

b.自定义partition:按account进行分区:--根据key或value及reduce的数量来决定当前的
                                                                    这对输出数据最终应该交由哪个reduce task处理

  public class SencondarySortPartition extends Partitioner<AccountBean, NullWritable> {
            @Override
            public int getPartition(AccountBean key, NullWritable value,int numPartitions) {
                return (key.getAccout().hashCode() & Integer.MAX_VALUE) % numPartitions;
            }
  }          

c.自定义分组比较器:按account进行分组:--key相同的在一个组内;最后执行是组的并行性

public class SencondarySortGroupComparator extends WritableComparator {
			public SencondarySortGroupComparator() {
				super(AccountBean.class,true);
			}
			
			@Override
			public int compare(WritableComparable a, WritableComparable b) {
				AccountBean acc1 = (AccountBean)a;
				AccountBean acc2 = (AccountBean)b;
				return acc1.getAccout().compareTo(acc2.getAccout());//账号相同的在一个组
			}
	}

d.自定义RawComparator类:--主要是实现在组内的排序(有利于优化),可省略!!!

public class SencondarySortComparator extends WritableComparator {
			private static final IntWritable.Comparator INTWRITABLE_COMPARATOR = new IntWritable.Comparator();

			public SencondarySortComparator() {
				super(AccountBean.class);
			}
			@Override
			public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
				try {
					int firstL1 = WritableUtils.decodeVIntSize(b1[s1])+ readVInt(b1, s1);
					int firstL2 = WritableUtils.decodeVIntSize(b2[s2])+ readVInt(b2, s2);
					int cmp = INTWRITABLE_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
					if (cmp != 0) {
						return cmp;
					}
					return INTWRITABLE_COMPARATOR.compare(b1, s1 + firstL1, l1 - firstL1, b2,s2 + firstL2, l2 - firstL2);
				} catch (IOException e) {
					throw new IllegalArgumentException(e);
				}
			}

		//	static {
		//		WritableComparator.define(AccountBean.class,new SencondarySortComparator());
		//	}
		}

e.编写Mapper

public class SencondarySortMapper extends Mapper<LongWritable, Text, AccountBean, NullWritable> {
			private AccountBean acc = new AccountBean();
			@Override
			protected void map(LongWritable key, Text value,Context context)
					throws IOException, InterruptedException {
				StringTokenizer st = new StringTokenizer(value.toString());
				while (st.hasMoreTokens()) {
					acc.setAccout(new Text(st.nextToken()));
					acc.setCost(new IntWritable(Integer.parseInt(st.nextToken())));
				}
				context.write(acc ,NullWritable.get());
			}
		}

f.编写Reducer

 public class SencondarySortReducer extends Reducer<AccountBean, NullWritable, AccountBean, NullWritable>{
            @Override
            protected void reduce(AccountBean key, Iterable<NullWritable> values,Context context)
                throws IOException, InterruptedException {
            for (NullWritable nullWritable : values) {
                context.write(key, NullWritable.get());
                }
            }
        }

g.编写主类Driver

public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Path outfile = new Path("file:///D:/outtwo1");
		FileSystem fs = outfile.getFileSystem(conf);
		if(fs.exists(outfile)){
			fs.delete(outfile,true);
		}
		Job job = Job.getInstance(conf);
		job.setJarByClass(SencondarySortDriver.class);
		job.setJobName("Sencondary Sort");
		job.setMapperClass(SencondarySortMapper.class);  
		job.setReducerClass(SencondarySortReducer.class);
		
		job.setOutputKeyClass(AccountBean.class);
		job.setOutputValueClass(NullWritable.class);
		//声明自定义分区和分组
		job.setPartitionerClass(SencondarySortPartition.class);
		job.setGroupingComparatorClass(SencondarySortGroupComparator.class);
     //job.setSortComparatorClass(SencondarySortComparator.class);//组内排序需要声明的类

		FileInputFormat.addInputPath(job, new Path("file:///D:/测试数据/二次排序/"));
		FileOutputFormat.setOutputPath(job,outfile);
		System.exit(job.waitForCompletion(true)?0:1);
	}

I.  运行结果

            hadoop@apache          200
            hadoop@apache          300
            hadoop@apache          300
            hive@apache               159
            hive@apache               258
            hive@apache               550
            yarn@apache               580


总结:
        理解分区和分组的概念;
        分区:指定key/value到哪个Reducer中;
        分组:相同的key在一个组group中,执行Reducer Task它会并行处理组,
                   提高运行效率;要是没有组,它会处理很多个reducer任务;
     
 一个小案例:
分别对map task和reducer task数分别计数,看它们分别执行多少次;
          1)无组
                  runmap
                            map运行次数=17
                  runreducer
                            reducer运行次数=10
          2)有组
                runmap
                         map运行次数=17
                runreducer
                         reducer运行次数=3
          3)可以看出,分组之后,reducer task数明显减少,有利于提高效率!!


 

 

 

 

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

利用MapReduce进行二次排序--附例子 的相关文章

随机推荐

  • volatile c语言关键字 / cache / 内存一致性

    Notion The all in one workspace for your notes tasks wikis and databases
  • Qt中的QWidget::move函数

    QWidget move函数 原型 xff1a void move int x int y void move const QPoint amp 其中move的原点是父窗口的左上角 xff0c 如果没有父窗口 xff0c 则桌面即为父窗口
  • 欧拉角和万向节死锁

    一 什么是欧拉角 欧拉角就是物体绕坐标系三个坐标轴 xff08 x xff0c y xff0c z轴 xff09 的旋转角度 xff0c 在这里坐标系可以是世界坐标系 xff0c 也可以是物体坐标系 xff0c 旋转顺序也是任意的 xff0
  • 【freeRTOS内存管理策略详解】

    内存管理对应用程序和操作系统来说都非常重要 现在很多的程序漏洞和运行崩溃都和内存分配使用错误有关 FreeRTOS操作系统将内核与内存管理分开实现 xff0c 操作系统内核仅规定了必要的内存管理函数原型 xff0c 而不关心这些内存管理函数
  • NGFF、M.2、PCIe、NVMe概念区分以及PCIEx1 x4 x8 x16区别

    对于NGFF M 2 PCIe NVMe等概念的说明 解决方案 NGFF Next Generation Form Factor xff0c 顾名思义 xff0c 是物理外形 Form Factor 的标准 与 NGFF 并列的是 2 5
  • 二重积分和雅可比行列式

    我们以二重积分为例进行说明 xff0c 首先说结论 xff1a 一 结论 若x 61 x u v y 61 y u v 存在偏导数 xff0c 则二阶雅可比行列式为 61 61 dxdy 61 J2 dudv J2的绝对值 且 其中积分区域
  • 雅可比行列式和雅可比矩阵

    接触雅可比行列式是在二重积分的变量变换中 xff0c 参见我的另一篇文章https blog csdn net xiaoyink article details 88432372 下面我们来详细说明一下雅可比行列式和雅可比矩阵 雅可比矩阵
  • jlink-v8 固件修复

    一 先说 jlink v8 v9 v10区别 v8基本价格在40左右 xff0c 芯片是atml的 xff0c 但是很多反应是掉固件和提示盗版问题 v9现在主流 xff0c 盗版价100左右 xff0c 主控芯片stm32 做的比较成熟 x
  • kubernetes学习-快速上手速查手册

    目录 使用k3s快速搭建k8s安装k8s dashboard使用Helm部署K8S资源k8s核心命令一切推倒重来资源创建方式NamespacePodDeploymentServiceIngress解决官网Ingress安装不了问题使用方式
  • 作为一个4年程序员至少需要掌握的专业技能

    一名3年工作经验的程序员应该具备的技能 xff0c 在机缘巧合之中 xff0c 看了这篇博客 感觉自己真的是很差 xff0c 一直想着会写if else 就已经是一名程序员了 xff0c 在工作之余也很少学习 于是 xff0c 自己的cod
  • C语言与C++的区别

    一 C 43 43 简介 本贾尼 斯特劳斯特鲁普 于1979年4月在贝尔实验室负责分析UNIX系统的内核的流量情况 于1979年10月开始着手开发一种新的编程语言 在C语言的基础上增加了面向对象机制 这就是C 43 43 的来历 在1983
  • 我的2011-当梦想照进现实

    我的2011年 xff0c 之所以是现在的样子 xff0c 始缘于我三年前的一个决定 离职考研 对于工作了两年的我来说 xff0c 离职考研是人生的一场博弈 我的2011年 xff0c 结束了研究生期间对三维骨骼动画渲染的相关研究 xff0
  • Dockerfile RUN 同时执行多条命令

    Dockerfile RUN 同时执行多条命令 Dokcerfile中的命令每执行一条即产生一个新的镜像 xff0c 当前命令总是在最新的镜像上执行 如下Dockerfile xff1a RUN span class hljs built
  • HC-SR04超声波模块使用记录

    文章目录 HC SR04超声波模块使用记录轮询测量方式一 模块使用中的问题二 应对方法三 注意 分时测量利用输入捕获测量利用输入捕获测量 HC SR04超声波模块使用记录 具体使用方法见HC SR04使用手册 xff0c 本文重点记录该模块
  • 【C语言冒泡排序、选择排序和快速排序】

    文章目录 前言一 冒泡排序二 选择排序三 快速排序四 代码设计与实现代码设计代码实现 调试结果冒泡排序改良 延伸思考总结 前言 本文简单介绍了C语言的冒泡排序 选择排序 快速排序 xff0c 结合本人的理解与使用做一下记录 一 冒泡排序 思
  • 平衡车制作---原理篇

    平衡车制作 原理篇 文章目录 平衡车制作 原理篇前言直立控制直观感受内部机理 速度控制方向控制总结 前言 本篇教程内容主要来自于 直立平衡车模参考设计方案 xff0c 且这里是从概念层面讲述的并没有具体的控制理论方面的内容 有了这些概念方面
  • FreeRTOS使用注意

    FreeRTOS使用注意 xff1a 中断中必须使用带FromISR结尾的API函数只有中断优先级处于FreeRTOS可管理的范围内时 xff0c 才能使用FreeRTOS提供的API函数中断中不要使用FreeRTOS提供的内存申请和释放函
  • 现代控制理论基础总结

    现代控制理论基础总结 xff08 线性部分 xff09 学习现代控制理论也有两个月的时间了 xff0c 里面涉及的基础内容和公式十分之多 xff0c 所以现在对各部分基础知识作一个总结 1 控制系统的状态表达式 在现代控制理论中 xff0c
  • 题库(关于c++的网站都盘了)大盘点(好多没盘到)

    1 keda ac 2 hydro ac 3 luogu com cn 4 cplusplus com 5 leetcode cn 6 https loj ac 7 noi cn 8 ybt ssoier cn 8088 9 learncp
  • 利用MapReduce进行二次排序--附例子

    首先先来明确几个概念 xff1a 1 分区 partition 1 xff09 分区 xff08 partition xff09 xff1a 默认采取散列值进行分区 xff0c 但此方法容易造成 数据倾斜 xff08 大部分数据分到同一个r