一起学Hadoop——二次排序算法的实现

2023-05-16

二次排序,从字面上可以理解为在对key排序的基础上对key所对应的值value排序,也叫辅助排序。一般情况下,MapReduce框架只对key排序,而不对key所对应的值排序,因此value的排序经常是不固定的。但是我们经常会遇到同时对key和value排序的需求,例如Hadoop权威指南中的求一年的高高气温,key为年份,value为最高气温,年份按照降序排列,气温按照降序排列。还有水果电商网站经常会有按天统计水果销售排行榜的需求等等,这些都是需要对key和value同时进行排序。如下图所示:

如何设计一个MapReduce程序解决对key和value同时排序的需求呢?这就需要用到组合键、分区、分组的概念。在这里又看到分区的影子,可知分区在MapReduce是多么的重要,一定要好好掌握,是优化的重点。

按照上图中数据流转的方向,我们首先设计一个Fruit类,有三个字段,分别是日期、水果名和销量,将日期、水果名和销量作为一个复合键;接着设计一个自定义Partition类,根据Fruit的日期字段分区,让相同日期的数据流向同一个partition分区中;最后定义一个分组类,实现同一个分区内的数据分组,然后按照销量字段进行二次排序。

具体实现思路:
1、定义Fruit类,实现WritableComparable接口,并且重写compareTo、equal和hashcode方法以及序列化和反序列化方法readFields和write方法。Java类要在网络上传输必须序列化和反序列化。在Map端的map函数中将Fruit对象当做key。compareTo方法用于比较两个key的大小,在本文中就是比较两个Fruit对象的排列顺序。

2、自定义第一次排序类,继承WritableComparable或者WritableComparator接口,重写compareTo或者compare方法,。就是在Map端对Fruit对象的第一个字段进行排序

3、自定义Partition类,实现Partitioner接口,并且重写getPartition方法,将日期相同的Fruit对象分发到同一个partition中。

4、定义分组类,继承WritableComparator接口,并且重写compare方法。用于比较同一分组内两个Fruit对象的排列顺序,根据销量字段比较。日期相同的Fruit对象会划分到同一个分组。通过setGroupingComparatorClass方法设置分组类。如果不设置分组类,则按照key默认的compare方法来对key进行排序。

代码如下:


  1 import org.apache.hadoop.conf.Configured;
  2 import org.apache.hadoop.io.WritableComparable;
  3 import java.io.DataInput;
  4 import java.io.DataOutput;
  5 import java.io.IOException;
  6 import org.apache.hadoop.io.*;
  7 import org.apache.hadoop.mapreduce.Partitioner;
  8 import org.apache.hadoop.mapreduce.Mapper;
  9 import org.apache.hadoop.mapreduce.Reducer;
 10 import org.apache.hadoop.conf.Configuration;
 11 import org.apache.hadoop.fs.FileSystem;
 12 import org.apache.hadoop.fs.Path;
 13 import org.apache.hadoop.mapreduce.Job;
 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 15 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 16 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 17 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 18 import org.apache.hadoop.util.Tool;
 19 import org.apache.hadoop.util.ToolRunner;
 20 import org.slf4j.Logger;
 21 import org.slf4j.LoggerFactory;
 22 
 23 public class SecondrySort extends Configured implements Tool {
 24 
 25     static class Fruit implements WritableComparable<Fruit>{
 26         private static final Logger logger = LoggerFactory.getLogger(Fruit.class);
 27         private String date;
 28         private String name;
 29         private Integer sales;
 30         public Fruit(){
 31         }
 32         public Fruit(String date,String name,Integer sales){
 33             this.date = date;
 34             this.name = name;
 35             this.sales = sales;
 36         }
 37 
 38         public String getDate(){
 39             return this.date;
 40         }
 41 
 42         public String getName(){
 43             return this.name;
 44         }
 45 
 46         public Integer getSales(){
 47             return this.sales;
 48         }
 49 
 50         @Override
 51         public void readFields(DataInput in) throws IOException{
 52             this.date = in.readUTF();
 53             this.name = in.readUTF();
 54             this.sales = in.readInt();
 55         }
 56 
 57         @Override
 58         public void write(DataOutput out) throws IOException{
 59             out.writeUTF(this.date);
 60             out.writeUTF(this.name);
 61             out.writeInt(sales);
 62         }
 63 
 64         @Override
 65         public int compareTo(Fruit other) {
 66             int result1 = this.date.compareTo(other.getDate());
 67             if(result1 == 0) {
 68                 int result2 = this.sales - other.getSales();
 69                 if (result2 == 0) {
 70                     double result3 = this.name.compareTo(other.getName());
 71                     if(result3 > 0) return -1;
 72                     else if(result3 < 0) return 1;
 73                     else return 0;
 74                 }else if(result2 >0){
 75                     return -1;
 76                 }else if(result2 < 0){
 77                     return 1;
 78                 }
 79             }else if(result1 > 0){
 80                 return -1;
 81             }else{
 82                 return 1;
 83             }
 84             return 0;
 85         }
 86 
 87         @Override
 88         public int hashCode(){
 89             return this.date.hashCode() * 157 + this.sales + this.name.hashCode();
 90         }
 91 
 92         @Override
 93         public boolean equals(Object object){
 94             if (object == null)
 95                 return false;
 96             if (this == object)
 97                 return true;
 98             if (object instanceof Fruit){
 99                 Fruit r = (Fruit) object;
100 //                if(r.getDate().toString().equals(this.getDate().toString())){
101                 return r.getDate().equals(this.getDate()) && r.getName().equals(this.getName())
102                         && this.getSales() == r.getSales();
103             }else{
104                 return false;
105             }
106         }
107 
108         public String toString() {
109             return this.date + " " + this.name + " " + this.sales;
110         }
111 
112     }
113 
114     static class FruitPartition extends Partitioner<Fruit, NullWritable>{
115         @Override
116         public int getPartition(Fruit key, NullWritable value,int numPartitions){
117             return Math.abs(Integer.parseInt(key.getDate()) * 127) % numPartitions;
118         }
119     }
120 
121     public static class GroupingComparator extends WritableComparator{
122         protected GroupingComparator(){
123             super(Fruit.class, true);
124         }
125 
126         @Override
127         public int compare(WritableComparable w1, WritableComparable w2){
128             Fruit f1 = (Fruit) w1;
129             Fruit f2 = (Fruit) w2;
130 
131             if(!f1.getDate().equals(f2.getDate())){
132                 return f1.getDate().compareTo(f2.getDate());
133             }else{
134                 return f1.getSales().compareTo(f2.getSales());
135             }
136         }
137     }
138 
139     public static class Map extends Mapper<LongWritable, Text, Fruit, NullWritable> {
140 
141         public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
142             String line = value.toString();
143             String str[] = line.split(" ");
144             Fruit fruit = new Fruit(str[0],str[1],new Integer(str[2]));
145             //Fruit fruit = new Fruit();
146             //fruit.set(str[0],str[1],new Integer(str[2]));
147             context.write(fruit, NullWritable.get());
148         }
149     }
150 
151     public static class Reduce extends Reducer<Fruit, NullWritable, Text, NullWritable> {
152 
153         public void reduce(Fruit key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
154             String str = key.getDate() + " " + key.getName() + " " + key.getSales();
155             context.write(new Text(str), NullWritable.get());
156         }
157     }
158 
159     @Override
160     public int run(String[] args) throws Exception {
161         Configuration conf = new Configuration();
162         // 判断路径是否存在,如果存在,则删除
163         Path mypath = new Path(args[1]);
164         FileSystem hdfs = mypath.getFileSystem(conf);
165         if (hdfs.isDirectory(mypath)) {
166             hdfs.delete(mypath, true);
167         }
168 
169         Job job = Job.getInstance(conf, "Secondry Sort app");
170         // 设置主类
171         job.setJarByClass(SecondrySort.class);
172 
173         // 输入路径
174         FileInputFormat.setInputPaths(job, new Path(args[0]));
175         // 输出路径
176         FileOutputFormat.setOutputPath(job, new Path(args[1]));
177 
178         // Mapper
179         job.setMapperClass(Map.class);
180         // Reducer
181         job.setReducerClass(Reduce.class);
182 
183         // 分区函数
184         job.setPartitionerClass(FruitPartition.class);
185 
186         // 分组函数
187         job.setGroupingComparatorClass(GroupingComparator.class);
188 
189         // map输出key类型
190         job.setMapOutputKeyClass(Fruit.class);
191         // map输出value类型
192         job.setMapOutputValueClass(NullWritable.class);
193 
194         // reduce输出key类型
195         job.setOutputKeyClass(Text.class);
196         // reduce输出value类型
197         job.setOutputValueClass(NullWritable.class);
198 
199         // 输入格式
200         job.setInputFormatClass(TextInputFormat.class);
201         // 输出格式
202         job.setOutputFormatClass(TextOutputFormat.class);
203 
204         return job.waitForCompletion(true) ? 0 : 1;
205     }
206 
207     public static void main(String[] args) throws Exception{
208         int exitCode = ToolRunner.run(new SecondrySort(), args);
209         System.exit(exitCode);
210     }
211 }  

测试数据:

20180906 Apple 200
20180904 Apple 200
20180905 Banana 100
20180906 Orange 300
20180906 Banana 400
20180904 Orange 100
20180905 Apple 400
20180904 Banana 300
20180905 Orange 500

运行结果:

20180906 Banana 400
20180906 Orange 300
20180906 Apple 200
20180905 Orange 500
20180905 Apple 400
20180905 Banana 100
20180904 Banana 300
20180904 Apple 200
20180904 Orange 100

 

总结:

1、在使用实现WritableComparable接口的方式实现自定义比较器时,必须有一个无参的构造函数。否则会报Unable to initialize any output collector的错误。
2、readFields和write方法中处理字段的顺序必须一致,否则会报MapReduce Error: java.io.EOFException at java.io.DataInputStream.readFully(DataInputStream.java:197)的错误。

 

了解更多大数据的知识请关注我的微信公众号:summer_bigdata

欢迎可以扫码关注本人的公众号:

 

转载于:https://www.cnblogs.com/airnew/p/9631718.html

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

一起学Hadoop——二次排序算法的实现 的相关文章

  • Hadoop 构建在 Windows 中失败:native.sln 中缺少 zconf.h?

    我正在尝试在使用 Windows 10 家庭版的开发计算机上构建以下 hadoop 版本 hadoop 2 7 3 src 以下是我本地开发环境的详细信息 Windows 10家庭版 英特尔酷睿 i5 6200U CPU 2 30GHz 内
  • 如何为 HDFS 递归列出子目录?

    我在 HDFS 中递归创建了一组目录 如何列出所有目录 对于普通的 UNIX 文件系统 我可以使用以下命令来做到这一点 find path type d print 但我想为 HDFS 得到类似的东西 递归列出目录内容hadoop dfs
  • Amazon MapReduce 日志分析最佳实践

    我正在解析 Apache Nginx Darwin 视频流服务器 生成的访问日志 并按日期 引用者 用户代理聚合每个交付文件的统计信息 每小时都会生成大量日志 而且这个数字在不久的将来可能会急剧增加 因此通过 Amazon Elastic
  • Sqoop Import --password-file 功能在 sqoop 1.4.4 中无法正常工作

    我使用的是hadoop 1 2 1 sqoop版本是1 4 4 我正在尝试运行以下查询 sqoop import connect jdbc mysql IP 3306 database name table clients target d
  • Curl下载到HDFS

    我有这个代码 curl o fileName csv url xargs hdfs dfs moveFromLocal 1 somePath 当我执行此代码时 curl 将请求中的值放入 fileName csv 中 该文件将移动到 HDF
  • Hive查询快速查找表大小(行数)

    是否有 Hive 查询可以快速查找表大小 即行数 而无需启动耗时的 MapReduce 作业 这就是为什么我想避免COUNT I tried DESCRIBE EXTENDED 但这产生了numRows 0这显然是不正确的 对新手问题表示歉
  • 在 Hadoop 中按文件中的值排序

    我有一个文件 其中每行包含一个字符串 然后是一个空格 然后是一个数字 例子 Line1 Word 2 Line2 Word1 8 Line3 Word2 1 我需要按降序对数字进行排序 然后将结果放入文件中 为数字分配排名 所以我的输出应该
  • 如果 HBase 不是运行在分布式环境中,它还有意义吗?

    我正在构建数据索引 这将需要以形式存储大量三元组 document term weight 我将存储多达几百万个这样的行 目前我正在 MySQL 中将其作为一个简单的表来执行 我将文档和术语标识符存储为字符串值 而不是其他表的外键 我正在重
  • Sqoop mysql错误-通信链路故障

    尝试运行以下命令 sqoop import connect jdbc mysql 3306 home credit risk table bureau target dir home sqoop username root password
  • 是否值得购买 Mahout in Action 以跟上 Mahout 的速度,或者还有其他更好的来源吗?

    我目前是一个非常随意的用户阿帕奇马胡特 http mahout apache org 我正在考虑购买这本书象夫在行动 http www manning com owen 不幸的是 我很难理解这本书的价值 并且认为它是一本曼宁早期访问计划 h
  • hive查询无法通过jdbc生成结果集

    我是 Hive 和 Hadoop 的新手 在我的教程中 我想将表创建为 import java sql SQLException import java sql Connection import java sql ResultSet im
  • Namenode高可用客户端请求

    谁能告诉我 如果我使用java应用程序请求一些文件上传 下载操作到带有Namenode HA设置的HDFS 这个请求首先去哪里 我的意思是客户端如何知道哪个名称节点处于活动状态 如果您提供一些工作流程类型图或详细解释请求步骤 从开始到结束
  • 获取行 HBase 的特定列族中的列

    我正在编写一个应用程序 通过 JSP 显示 HBase 中特定表中的数据 我想获取一行的特定列族中的所有列 有什么办法可以做到这一点吗 public String getColumnsInColumnFamily Result r Stri
  • 在 Amazon EMR 上使用 java 中的 hbase 时遇到问题

    因此 我尝试使用作为 MapReduce 步骤启动的自定义 jar 来查询 Amazon ec2 上的 hbase 集群 我的 jar 在地图函数内 我这样调用 Hbase public void map Text key BytesWri
  • hive 从两个数组创建映射或键/值对

    我有两个具有相同数量值的数组 它们映射为 1 1 我需要从这两个数组创建一个键 值对或映射 键 值 任何想法或提示都会有帮助 当前表结构 USA WEST NUMBER Street City 135 Pacific Irvine USA
  • Hive - 线程安全的自动递增序列号生成

    我遇到一种情况 需要将记录插入到特定的 Hive 表中 其中一列需要是自动递增的序列号 即在任何时间点都必须严格遵循 max value 1 规则 记录从许多并行的 Hive 作业插入到这个特定的表中 这些作业每天 每周 每月批量运行 现在
  • 非 hdfs 文件系统上的 hadoop/yarn 和任务并行化

    我已经实例化了 Hadoop 2 4 1 集群 并且发现运行 MapReduce 应用程序的并行化方式会有所不同 具体取决于输入数据所在的文件系统类型 使用 HDFS MapReduce 作业将生成足够的容器 以最大限度地利用所有可用内存
  • 我可以在没有 Hadoop 的情况下使用 Spark 作为开发环境吗?

    我对大数据和相关领域的概念非常陌生 如果我犯了一些错误或拼写错误 我很抱歉 我想了解阿帕奇火花 http spark apache org 并使用它仅在我的电脑中 在开发 测试环境中 由于Hadoop包含HDFS Hadoop分布式文件系统
  • HDFS:使用 Java / Scala API 移动多个文件

    我需要使用 Java Scala 程序移动 HDFS 中对应于给定正则表达式的多个文件 例如 我必须移动所有名称为 xml从文件夹a到文件夹b 使用 shell 命令我可以使用以下命令 bin hdfs dfs mv a xml b 我可以
  • Windows 上的 Apache Pig 在运行“pig -x local”时出现“hadoop-config.cmd”未被识别为内部或外部命令”错误

    如果您由于以下错误而无法在 Windows 上运行 Apache Pig hadoop 2 4 0 bin hadoop config cmd is not recognized as an internal or external com

随机推荐