实验六 MapReduce实验:二次排序

2023-05-16

实验指导:

 

6.1 实验目的

基于MapReduce思想,编写SecondarySort程序。

6.2 实验要求

要能理解MapReduce编程思想,会编写MapReduce版本二次排序程序,然后将其执行并分析执行过程。

6.3 实验原理

MR默认会对键进行排序,然而有的时候我们也有对值进行排序的需求。满足这种需求一是可以在reduce阶段排序收集过来的values,但是,如果有数量巨大的values可能就会导致内存溢出等问题,这就是二次排序应用的场景——将对值的排序也安排到MR计算过程之中,而不是单独来做。

二次排序就是首先按照第一字段排序,然后再对第一字段相同的行按照第二字段排序,注意不能破坏第一次排序的结果。

6.4 实验步骤

6.4.1 编写程序

程序主要难点在于排序和聚合。

对于排序我们需要定义一个IntPair类用于数据的存储,并在IntPair类内部自定义Comparator类以实现第一字段和第二字段的比较。

对于聚合我们需要定义一个FirstPartitioner类,在FirstPartitioner类内部指定聚合规则为第一字段。

此外,我们还需要开启MapReduce框架自定义Partitioner 功能和GroupingComparator功能。

IntPair 类:

package mr;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;

public class IntPair implements WritableComparable {
    private IntWritable first;
    private IntWritable second;
    public void set(IntWritable first, IntWritable second) {
        this.first = first;
        this.second = second;
    }
    //注意:需要添加无参的构造方法,否则反射时会报错。
    public IntPair() {
        set(new IntWritable(), new IntWritable());
    }
    public IntPair(int first, int second) {
        set(new IntWritable(first), new IntWritable(second));
    }
    public IntPair(IntWritable first, IntWritable second) {
        set(first, second);
    }
    public IntWritable getFirst() {
        return first;
    }
    public void setFirst(IntWritable first) {
        this.first = first;
    }
    public IntWritable getSecond() {
        return second;
    }
    public void setSecond(IntWritable second) {
        this.second = second;
    }
    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }
    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
    }
    public int hashCode() {
        return first.hashCode() * 163 + second.hashCode();
    }
    public boolean equals(Object o) {
        if (o instanceof IntPair) {
            IntPair tp = (IntPair) o;
            return first.equals(tp.first) && second.equals(tp.second);
        }
        return false;
    }
    public String toString() {
        return first + "\t" + second;
    }
    public int compareTo(IntPair tp) {
        int cmp = first.compareTo(tp.first);
        if (cmp != 0) {
            return cmp;
        }
        return second.compareTo(tp.second);
    }
}

完整代码:

package mr;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SecondarySort {
    static class TheMapper extends Mapper<LongWritable, Text, IntPair, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split("\t");
            int field1 = Integer.parseInt(fields[0]);
            int field2 = Integer.parseInt(fields[1]);
            context.write(new IntPair(field1,field2), NullWritable.get());
        }
    }
    static class TheReducer extends Reducer<IntPair, NullWritable,IntPair, NullWritable> {
        //private static final Text SEPARATOR = new Text("------------------------------------------------");
        @Override
        protected void reduce(IntPair key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }
    public static class FirstPartitioner extends Partitioner<IntPair, NullWritable> {
        public int getPartition(IntPair key, NullWritable value,
                int numPartitions) {
            return Math.abs(key.getFirst().get()) % numPartitions;
        }
    }
    //如果不添加这个类,默认第一列和第二列都是升序排序的。
//这个类的作用是使第一列升序排序,第二列降序排序
    public static class KeyComparator extends WritableComparator {
        //无参构造器必须加上,否则报错。
        protected KeyComparator() {
            super(IntPair.class, true);
        }
        public int compare(WritableComparable a, WritableComparable b) {
            IntPair ip1 = (IntPair) a;
            IntPair ip2 = (IntPair) b;
            //第一列按升序排序
            int cmp = ip1.getFirst().compareTo(ip2.getFirst());
            if (cmp != 0) {
                return cmp;
            }
            //在第一列相等的情况下,第二列按倒序排序
            return -ip1.getSecond().compareTo(ip2.getSecond());
        }
    }
    //入口程序
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SecondarySort.class);
        //设置Mapper的相关属性
        job.setMapperClass(TheMapper.class);
        //当Mapper中的输出的key和value的类型和Reduce输出
//的key和value的类型相同时,以下两句可以省略。
        //job.setMapOutputKeyClass(IntPair.class);
        //job.setMapOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        //设置分区的相关属性
        job.setPartitionerClass(FirstPartitioner.class);
        //在map中对key进行排序
        job.setSortComparatorClass(KeyComparator.class);
        //job.setGroupingComparatorClass(GroupComparator.class);
        //设置Reducer的相关属性
        job.setReducerClass(TheReducer.class);
        job.setOutputKeyClass(IntPair.class);
        job.setOutputValueClass(NullWritable.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        //设置Reducer数量
        int reduceNum = 1;
        if(args.length >= 3 && args[2] != null){
            reduceNum = Integer.parseInt(args[2]);
        }
        job.setNumReduceTasks(reduceNum);
        job.waitForCompletion(true);
    }
}

6.4.2 打包提交

使用Eclipse开发工具将该代码打包,选择主类为mr.Secondary。如果没有指定主类,那么在执行时就要指定须执行的类。假定打包后的文件名为Secondary.jar,主类SecondarySort位于包mr下,则可使用如下命令向Hadoop集群提交本应用。

[root@master hadoop]# bin/hadoop jar SecondarySort.jar mr.Secondary /user/mapreduce/secsort/in/secsortdata.txt  /user/mapreduce/secsort/out  1

其中“hadoop”为命令,“jar”为命令参数,后面紧跟打的包,/user/mapreduce/secsort/in/secsortdata.txt”为输入文件在HDFS中的位置,如果HDFS中没有这个文件,则自己自行上传。“/user/mapreduce/secsort/out/”为输出文件在HDFS中的位置,“1”为Reduce个数。

6.5 实验结果

6.5.1 输入数据

输入数据如下:secsortdata.txt ('\t'分割)(数据放在/root/data/6目录下):

7    444
3    9999
7    333
4    22
3    7777
7    555
3    6666
6    0
3    8888
4    11

6.5.2 执行结果

在master上执行对hdfs上的文件/user/mapreduce/secsort/out/part-r-00000内容查看的操作

[root@master hadoop]# bin/hadoop fs -cat  /user/mapreduce/secsort/out/p*

如图6-1所示:

图6-1

实验操作:

步骤1:搭建Hadoop集群

步骤2:上传数据文件至HDFS

步骤3:编写IntPair程序

步骤4:编写SecondarySort程序

步骤5:打包程序

步骤6:运行程序

步骤7:查看运行结果

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

实验六 MapReduce实验:二次排序 的相关文章

  • 机试第一节

    问题 xff1a 1 6中的n的阶乘太大时发生溢出怎么处理 分割线 1 双精度浮点类型的定义 xff1a double 输出 lf 单精度浮点类型的定义 xff1a float 输出 xff1a f 2鸡兔同笼问题 xff0c 判断所给样例
  • Python小技巧之——巧用with语句实现异常处理

    Python的异常处理语句try except大家都很熟悉了 xff0c 例如 xff1a try 1 0 except Exception as ex print ex integer division or modulo by zero
  • 将lwip1.4.1工程移植至lwip2.1.2记录

    将lwip1 4 1工程移植至lwip2 1 2记录 1 ip addr结构体2 etharp h3 cc h与arch h4 tcp impl h 关于二者文件 功能等差异网上已有很多文章介绍 xff0c 类似这个文档有简单说了下这两个版
  • 在Windows和Ubuntu上安装VNC连接远程服务器

    如果你是公用的服务器的管理员需要添加一些用户 xff0c 并配置VNC服务以便远程访问 xff0c 简要介绍一下配置方法 具体的命令可以参照命令手册去查看 man span class hljs command span class hlj
  • 变频器的工作原理及其电路分析

    变频器简单的说就是结合了变频技术和微电子技术研制出来的可以改变输入电源的频率得到另外一种频率电源输出的设备 其输入的电源就是我们工业上面使用的电源 xff0c 一般都是电压和频率都固定不变的交流电 240v或者380v交流电 通过内置的一些
  • 欠拟合、过拟合及其解决方法

    在我们机器学习或者训练深度神经网络的时候经常会出现欠拟合和过拟合这两个问题 xff0c 但是 xff0c 一开始我们的模型往往是欠拟合的 xff0c 也正是因为如此才有了优化的空间 xff0c 我们需要不断的调整算法来使得模型的表达能拿更强
  • ubuntu18.04安装ROS Melodic的详细过程以及填坑经历

    一 版本说明 ROS官方将在2021年不再维护Kinetic xff0c 后续使用Ubuntu18 04 43 Melodic组合 xff0c Melodic支持时间到2023年5月 二 安装前Ubuntu18 04设置 打开Ubuntu1
  • win10和ubuntu20双系统设置默认启动系统为win10

    在win10下安装了Ubuntu20 04系统 xff0c 默认情况下 xff0c 启动的是Ubuntu系统 要将默认启动系统设置成win10 xff0c 方法如下 xff1a 1 进入ubuntu系统 xff0c 按住Ctrl 43 Al
  • Keil添加芯片支持包(Pack)

    1 前言 一直用STM32的芯片 xff0c 现在想看看工程是否可以在其他厂家的芯片上跑 xff0c 可是keil的Device中只有ST厂家的 因此 xff0c 尝试在keil中添加其他厂家的芯片支持包 2 keil软件内安装 点击工具栏
  • Qt 设置窗体大小和背景颜色

    1 一种方法是设置它的最大窗口值和最小窗口值 xff0c 并且使最大值和最小值相等 简单的示例 xff1a setMinimumSize 370 150 setMaximumSize 370 150 此时窗口大小便被固定为 xff08 37
  • Shell 脚本详解

    简介 shell xff1a 蛋 壳 shell脚本是在操作系统外 xff0c 可以直接调用系统内核命令的一个脚本语言 shell脚本可以分为两大类组成 xff1a 1 命令行 xff08 系统命令行 xff09 2 脚本语法 xff08
  • Windows——电脑不能连接手机热点(WLAN显示已经禁用)的解决办法

    笔记本电脑提示 xff1a 已关闭无线功能 基于这篇博客之上 xff0c 在第二步中 xff0c 关闭WLAN AutoConfig 服务 xff0c 之后重新打开WLAN AutoConfig 服务 xff0c 即可
  • Ubuntu——系统语言由英文切换到中文的方法

    一 方法一 ubuntu设置系统语言为中文 二 方法二 若方法一中不能拖动中文输入法到第一行 xff0c 则可以直接采取卸载英文输入法 xff0c 这样就中文输入法到第一行了 xff0c 切换成中文了 英文输入法可以根据需要考虑是否安装 一
  • RealSense D435——基本介绍

    一 结构介绍 采用的是结构光Tof成像方案 正面的四个摄像头从左至右 xff0c 依次是左红外相机 红外点阵投影仪 右红外相机 RGB相机 xff08 前三个负责形成深度图 xff0c 最后一个就形成RGB图 xff09 二 小贴士 RGB
  • RealSense D435——相机内参获取

    RealSense D435 相机内参获取 一 参考博客二 小贴士2 1 遇到的问题及解决方案问题一描述问题一解决方法问题二描述问题二解决方法 一 参考博客 RealSense D435内参获取环境配置 xff1a Realsense D4
  • Vscode——报错解决:Unable to start debugging.Unexpected GDB output from command. 或 程序点击运行一直无结果

    一 报错截图 1 Unable to start debugging Unexpected GDB output from command 2 程序点击运行一直无结果 二 原因 路径中含有中文 三 解决办法 将文件放入不包含中文的路径下
  • Github——合并分支

    一 当两个分支不一样时 xff0c 会出现下面的标志 xff08 前提是设定了分支保护 xff09 xff0c 点击Compare amp pull request 二 选择双方分支 三 处理请求 四 确认请求
  • 基于四旋翼飞行器的陀螺仪、加速度计、磁力计传感器说明

    一 什么是磁力计 加速度计和陀螺仪以及他们之间的区别 1 什么是陀螺仪 加速度计和磁力计 xff1f xff08 1 xff09 陀螺仪 xff08 Gyroscope GYRO Sensor xff09 也叫地感器 xff0c 三轴陀螺仪
  • 操作系统(二) -- 操作系统的接口与实现

    前言操作系统的接口 什么是操作系统的接口POSIX标准 系统调用的实现 1 xff0c 用户程序能不能直接调用系统内核2 xff0c 如果不能直接调用 xff0c 为什么 xff1f 如何实现的3 xff0c 用户程序如何才能调用系统内核系
  • 自动驾驶路径规划技术-高速公路路径规划

    Path Planning Highway Driving project Github https github com williamhyin CarND Path Planning Email williamhyin 64 outlo

随机推荐