(hadoop学习-1)mapreduce实现数据过滤、聚合与排序

2023-05-16

翻译:http://blog.ditullio.fr/2015/12/24/hadoop-basics-filter-aggregate-sort-mapreduce/
数据源:
对样例数据集Donations进行过滤、聚合与排序操作。数据来自donaorschoose。解压缩后是一个1.5G的CSV文件,共460万行记录。具体详见官方网站(http://data.donorschoose.org/open-data/overview/)。
注意,
1)该数据集的换行符格式是CR/LF。在linux下,需要首先删除CR格式(\r,0x0D),并对换行符(CR)所在的上下两行合并。具体可以使用perl。
perl -pe ’s/\\\\\n/ /;’ filename
2)该数据集每个字段结果都有双引号,字符串分割时需要留意分割字符。
3)主要数据字段说明
_donationid
_projectid
_donor_acctid
_cartid
donor_city
donor_state
donor_zip: 因隐私原因,最后两位隐藏
is_teacher_acct: 教师发起的捐献
donation_timestamp
dollar_amount
donation_included_optional_support
payment_included_acct_credit
payment_included_campaign_gift_card
payment_included_web_purchased_gift_card
via_giving_page: 通过特定活动开展的捐献
for_honoree: 包括被捐献人的活动
thank_you_packet_mailed:
查询要求:
查询对所有捐赠者不是教师的记录,按照城市对捐赠金额汇总,输出城市名、捐赠金额,并按照捐赠金额排降序。城市名不区分大小写。
<span style="font-size:14px;"><span style="font-size:18px;"><span style="font-size:14px;">SELECT SUM(dollar_amount) as sumtotal, UPPER(donor_city) as city
FROM donations
WHERE is_teacher_acct != 't'
GROUP BY UPPER(donor_city)
ORDER BY sumtotal DESC;</span></span></span>

解决方案:
利用chain mapreduce,依次执行两个mapreduce Job。第一个Job抽取donor_city(城市名)、total(捐赠金额)字段,并按照城市名实现捐赠金额聚合,实现数据过滤、聚合;第二个Job,按照捐赠金额排降序。

- 第一个Job Mapper:抽取donor_city(城市名)、total(捐赠金额)字段。
- 第一个Job Combiner:按照donor_city,累加该filesplit的total。减少中间数据传送。
- 第一个Job Reducer: 按照donor_city,累加total。输出数据存储为donor_city、total。
- 第二个Job Mapper:读入第一个Job Reducer输出结果,交换key、value,输出total、donor_city。
- 第二个Job Reducer:自定义sortComparator,实现double按照降序排序。经过shuffle排序后,输出排降序的total、donor_city。
注意事项:
1、对内置数据类型,如DoubleWritable,自定义排序顺序时候,可以使用sortComparatorClass()。通过自定义排序类,继承自对应数据类型,实现排序。
2、chain mapreduce,上一个job的输出文件(存放在hdfs),直接作为下一个job的输入文件。
3、对于可能的异常,可以使用Mrunit进行测试。
执行结果:
$ hdfs dfs -cat output/donation-price/p* |head -n 20
3514021.3 New York
2328154.0 San Francisco
815354.1 Seattle
677975.6 Chicago
508308.2 West Berlin
500588.5 Los Angeles
447923.0 Brooklyn
418111.1 Oklahoma City
343251.9 Indianapolis
215072.7 Framingham
209319.9 Springfield
158270.3 Charlotte
153875.1 San Ramon
149707.1 Washington
131766.5 Tulsa
119922.8 Raleigh
115334.9 Houston
108732.2 Baltimore
101028.8 Dallas

==========donation job1==========

User:     xuefei
Name:     donation-job1
Application Type:     MAPREDUCE
Application Tags:     
YarnApplicationState:     FINISHED
FinalStatus Reported by AM:     SUCCEEDED
Started:     星期五 五月 13 09:24:57 +0800 2016
Elapsed:     57sec
Tracking URL:     History
Diagnostics:     

        Map input records=4631337
        Map output records=1502321
        Map output bytes=27150482
        Map output materialized bytes=2375954
        Input split bytes=1632
        Combine input records=1502321
        Combine output records=115926
        Reduce input groups=24224
        Reduce shuffle bytes=2375954
        Reduce input records=115926
        Reduce output records=24224  //压缩了80%记录数,减少shuffle数据量
        Spilled Records=231852
        Shuffled Maps =12 //共12个split,启动12个Map进程
        Failed Shuffles=0
        Merged Map outputs=12
        GC time elapsed (ms)=6325
        CPU time spent (ms)=93630
        Physical memory (bytes) snapshot=3480043520
        Virtual memory (bytes) snapshot=10956181504
        Total committed heap usage (bytes)=2666004480

==========donation job2==========

User:     xuefei
Name:     donation-job2
Application Type:     MAPREDUCE
Application Tags:     
YarnApplicationState:     FINISHED
FinalStatus Reported by AM:     SUCCEEDED
Started:     星期五 五月 13 09:25:56 +0800 2016
Elapsed:     21sec
Tracking URL:     History
Diagnostics:     


        Map input records=24224
        Map output records=24224
        Map output bytes=454936
        Map output materialized bytes=503390
        Input split bytes=132
        Combine input records=0
        Combine output records=0
        Reduce input groups=5990
        Reduce shuffle bytes=503390
        Reduce input records=24224
        Reduce output records=24224
        Spilled Records=48448
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=62
        CPU time spent (ms)=4010
        Physical memory (bytes) snapshot=451493888
        Virtual memory (bytes) snapshot=1703575552
        Total committed heap usage (bytes)=402653184

程序代码:
<span style="font-size:14px;"><span style="font-size:18px;"><span style="font-size:14px;">package donation1;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import donation1.Donation.Descdouble;

public class Donation extends Configured implements Tool {


     //自定义sortComparatorClass,对第二个job实现按照total排降序
    public static class Descdouble extends WritableComparator {

        public Descdouble() {
            super(DoubleWritable.class, true);
            // TODO Auto-generated constructor stub
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            // TODO Auto-generated method stub
            DoubleWritable lhs=(DoubleWritable)a;
            DoubleWritable rhs=(DoubleWritable)b;
            return ((rhs.get()-lhs.get())>0)?1:-1;
        }

    }

    public static class Djob2mapper extends
            Mapper<LongWritable, Text, DoubleWritable, Text>{
        DoubleWritable outputKey=new DoubleWritable();
        Text outputValue=new Text();
        @Override
        protected void map(LongWritable key, Text value,
                Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            String []words=StringUtils.split(value.toString(), '\t');
            outputKey.set(Double.parseDouble(words[1]));
            outputValue.set(words[0]);
            context.write(outputKey, outputValue);
        }

    }

    public static class Djobreducer1 extends
            Reducer<Text, DoubleWritable, Text, DoubleWritable>{

        DoubleWritable outputValue=new DoubleWritable();
        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values,
                Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            double sumtotal=0.0;
            for(DoubleWritable value:values){
                sumtotal+=value.get();
            }
            outputValue.set(sumtotal);
            context.write(key, outputValue);
        }

    }

    public static class Djobmapper1 extends
            Mapper<LongWritable, Text, Text, DoubleWritable> {
        Text outputKey=new Text();
        DoubleWritable outputValue=new DoubleWritable();
        @Override
        protected void map(LongWritable key, Text value,
                Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub

          //对输入行按照“,”进行分列
            String []words=value.toString().split("\",\"");

            if(words[0].equals("_donationid")||words[7].substring(1, 1).equals("t")||words[4].isEmpty() || words[11].isEmpty())
                return;

            String city=words[4];
            String strprice=words[11];
            strprice=strprice.substring(1, strprice.length()-1); //删除price末尾的双引号
            Double total=Double.parseDouble(strprice);
            outputKey.set(city);
            outputValue.set(total);

            context.write(outputKey, outputValue);

        }

    }

    public int run(String []args) throws Exception{
        Job job1=Job.getInstance(getConf(), "donation-job1");

        Configuration conf1=job1.getConfiguration();
        job1.setJarByClass(getClass());

        FileInputFormat.setInputPaths(job1, new Path("data/donation"));
        Path out1=new Path("output/donation-city");
        out1.getFileSystem(conf1).delete(out1, true);
        FileOutputFormat.setOutputPath(job1, out1);

        job1.setInputFormatClass(TextInputFormat.class);
        job1.setOutputFormatClass(TextOutputFormat.class);

        job1.setMapperClass(Djobmapper1.class);
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(DoubleWritable.class);

        job1.setCombinerClass(Djobreducer1.class);
        job1.setReducerClass(Djobreducer1.class);
        job1.setOutputKeyClass(DoubleWritable.class);
        job1.setOutputValueClass(Text.class);

        //实现chain mapreduce的关键。如果job1能够成功执行,则继续继续后面代码;否则退出。
        if(job1.waitForCompletion(true)==false)
            return 1;

        Job job2=Job.getInstance(getConf(), "donation-job2");
        Configuration conf2=job2.getConfiguration();
        job2.setJarByClass(getClass());

        FileInputFormat.setInputPaths(job2, out1);
        Path out2=new Path("output/donation-price");
        out2.getFileSystem(conf2).delete(out2, true);
        FileOutputFormat.setOutputPath(job2, out2);

        job2.setInputFormatClass(TextInputFormat.class);
        job2.setOutputFormatClass(TextOutputFormat.class);

        job2.setMapperClass(Djob2mapper.class);
        job2.setMapOutputKeyClass(DoubleWritable.class);
        job2.setMapOutputValueClass(Text.class);
        job2.setSortComparatorClass(Descdouble.class);
        job2.setReducerClass(Reducer.class);
        job2.setOutputKeyClass(DoubleWritable.class);
        job2.setOutputValueClass(Text.class);

        return job2.waitForCompletion(true)?0:1;
    }

    public static void main(String []args){
        int result=0;
        try{
            result=ToolRunner.run(new Configuration(), new Donation(), args);
        }catch(Exception e){
            e.printStackTrace();
        }
        System.exit(result);
    }
}

========Mrunit========

package donation1test;

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Before;
import org.junit.Test;

import donation1.Donation.Djobmapper1;

public class DonationTest {
    MapDriver<LongWritable, Text, Text, DoubleWritable> mapdriver;

    @Before
    public void setup(){
        Djobmapper1 djm=new Djobmapper1();
        mapdriver=MapDriver.newMapDriver(djm);
    }

    @Test
    public void testMapper() throws IOException{
        LongWritable inputKey = new LongWritable(0);
        Text inputValue=new Text("\"b1e82d0b63b949927b205441c543f249\",\"8a61c8ab4d91632dbf608ae6b1a832f3\",\"90b8c62c2e07a03d2cae3a0a52f18687\",\"\",\"NEWYORK\",\"NY\",\"100\",\"f\",\"2007-12-21 18:55:13.722\",\"85.00\",\"15.00\",\"100.00\",\"100_and_up\",\"t\",\"no_cash_received\",\"f\",\"t\",\"f\",\"f\",\"f\",\"f\",\"t\",\"\"");
        mapdriver.withInput(inputKey, inputValue);
        Text outputKey = new Text("NEWYORK");
        mapdriver.withOutput(outputKey, new DoubleWritable(0));
        mapdriver.runTest();
    }

}</span></span></span>



本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

(hadoop学习-1)mapreduce实现数据过滤、聚合与排序 的相关文章

  • ubuntu 终端打不开解决办法

    由于ubuntu自带的是python3 5 在新安装了python3 6以后 xff0c 开机突然发现无论是点击图标还是使用快捷键终端都无法打开 xff0c 解决办法如下 xff1a xff11 xff0e 按Ctrl 43 Alt 43
  • Jack server already installed in "/***/.jack-server" 异常

    xff08 1 xff09 在新增新用户后 xff0c 进行android编译 xff0c 出现如下异常 xff1a Ensure Jack server is installed and started FAILED bin bash c
  • gstreamer移植qnx(二):交叉编译glib

    一 简介 这里以glib的2 63 0版本 xff0c QNX系统的版本是 xff1a 6 6 这里是为了编译gstreamer的依赖库 xff0c 也就是说最终目标 xff0c 是将gstreamer移植到QNX6 6系统上 我选择的是g
  • repo安装与简单使用

    一 概述 当一个大的项目需要拆分成很多的子项目 xff0c 或者说一个软件系统拆分成多个子系统 每一个子项目或者子系统都对应一个git repository 这种需求在实际项目当中是很常见的 xff0c 有的可能就直接写一个shell脚本来
  • 通过qemu-img命令将raw image转换成VMware虚拟硬盘vmdk

    为了在VMware中跑QNX系统 xff0c 我需要想办法将编译BSP生成的img文件固化到VMware的虚拟硬盘中去 xff0c 之前一直找不到方法 xff0c 到渐渐的只能用很笨的方法几次中专 将生成的img文件通过win32DiskI
  • WSL2 Ubuntu安装Qt(包括QtCreator)

    最近因为需要在Linux下使用qtcreator做一些界面开发的预研和学习 xff0c 主要是因为要交叉编译Qt 但又不想再使用虚拟机了 xff0c 真的太消耗内存了 于是就想着直接使用Windows10 下面的WSL2 怎么安装WSL2这
  • 架构师成长之路工具篇(1):markdown撰写文档

    今天笔者想说的工具就是markdown xff0c 正所谓工欲善其事必先利其器 xff0c 选择高效的工具自然能提升工作效率 笔者使用的markdown工具是 xff1a typora word太重 xff0c 太复杂 xff0c 在写文档
  • Artifact xxxx:Web exploded: Error during artifact deployment. See server log........

    从Git上拉取了一个新项目到idea xff0c 结果一运行就报错 xff0c 错误下图 看大家的解决方法基本都是重新部署Tomcat Maven或者项目 xff0c 还有什么jar包冲突要删除的 xff0c 齐齐试了一遍 xff0c 并没
  • 如何优雅的退出qemu虚拟环境

    在console环境下 xff0c 先 按 ctrl 43 a xff0c 释放之后再按 x 键 既可terminate qemu 注 xff1a 1 a 和 x 均为小写 2 必须先释放ctrl 43 a 之后 再按x键
  • xmake经验总结1:解决c++ future/promise抛出std::system_error的问题

    1 背景 1 1 场景 编译器 xff1a gcc 9 4 运行系统 xff1a Ubuntu 20 04 4 LTS xmake v2 6 7 场景 xff1a 其大致场景是使用c 43 43 的future promise功能 xff0
  • 神经网络实现手写数字识别(MNIST)

    一 缘起 原本想沿着 传统递归算法实现迷宫游戏 gt 遗传算法实现迷宫游戏 gt 神经网络实现迷宫游戏的思路 xff0c 在本篇当中也写如何使用神经网络实现迷宫的 xff0c 但是研究了一下 xff0c 感觉有些麻烦不太好弄 xff0c 所
  • 从高考到吃“软”饭

    上大学之前 xff0c 我是一个连本科和专科都分不清的农村小娃 那时的我天真的以为 xff0c 专科就是教授比较专业的知识 xff0c 而本科就是学得比较广而不深 上大学之后 xff0c 我算是开眼界了 xff0c 各种社团真是百花齐放 对
  • 解决visio对象在word中显示不全的问题

    作为一个软件工程师 xff0c 编写技术文档是常有的事情 xff0c 使用visio绘制各种图形 如 xff0c 流程图 xff0c 结构图 xff0c 框架图 xff0c 状态图等等 也是再正常不过的事情 如果我们在word中撰写文档时
  • git submodule使用以及注意事项

    一 背景 在平时的软件开发过程中常常会有这样的场景 xff0c 自己负责的某个模块会依赖其他模块或者第三方的library 这时你自己的模块是一个独立的代码仓库 xff0c 你想要实现这样一种功能 xff0c 当你从你的模块的代码仓库里把代
  • Webpack5 - 基本使用

    一 webpack有何作用 webpack是一个Javascript应用程序的模块打包器 它可以递归地构建一个应用程序的模块依赖关系图 xff0c 然后将所有模块打包在一起 为什么需要模块打包器 xff1a 现在的应用程序模块文件很多 xf
  • Vue.js - VueRouter的Hash与History模式 / 手写VueRouter

    一 Hash与History模式 Hash模式History模式url地址外观http localhost 8081 abouthttp localhost 8080 about原理基于锚点 xff0c 监听锚点变化时触发的onhashch
  • Vue.js - Vue.js响应式原理(1/2)

    一 数据驱动 数据响应式 xff1a 数据改变 xff0c 则视图改变 xff0c 避免频繁的Dom操作 xff0c 提高运行效率 双向绑定 xff1a 数据改变 xff0c 则视图改变 xff1b 视图改变 xff0c 则数据也随之改变
  • Vue.js - 模拟Vue.js响应式原理(2/2)

    项目仓库 xff1a https gitee com big right right vue responsive tree master L8 一 类的说明 Vue类 xff1a 保存传入的选项数据 xff0c 把选项data中的成员注入
  • OpenFlow Switch Specification 1.3.0 (三)

    六 OpenFlow 安全通道 xff08 OpenFlow Channel xff09 OpenFlow 通道是连接每一个交换到控制器的接口 通过这个接口 xff0c 控制器配置和管理交换机 xff0c 从交换机接收事件 xff0c 向交
  • MATLAB并行加速方法

    用MATLAB运行计算任务时 xff0c 有时会遇到程序中有很多重复计算部分 xff0c 多次循环中 xff0c 每一次的计算之间无相互依赖 xff08 即后一次的计算不需要使用到前一次的计算结果 xff09 xff0c 可能仅改变了输入参

随机推荐