MapReduce程序开发

2023-05-16

通过API操作之前要先了解几个基本知识

基本数据类型

Hadoop的基本数据类型和Java的基本数据类型是不一样的,但是都存在对应的关系

如下图
数据类型关系

如果需要定义自己的数据类型,则必须实现Writable
hadoop的数据类型可以通过get方法获得对应的java数据类型
而java的数据类型可以通过hadoop数据类名的构造函数,或者set方法转换

关于Hadoop的Writable接口,详情请看Hadoop I/O中的序列化部分

MapReduce执行的基本步骤

Hadoop提交作业的的步骤分为八个,可以理解为天龙八步

Map端工作

  • 1.1 读取要操作的文件–这步会将文件的内容格式化成键值对的形式,键为每一行的起始位置偏移,值为每一行的内容。
  • 1.2 调用map进行处理–在这步使用自定义的Mapper类来实现自己的逻辑,输入的数据为1.1格式化的键值对,输入的数据也是键值对的形式。
  • 1.3 对map的处理结果进行分区–map处理完毕之后可以根据自己的业务需求来对键值对进行分区处理,比如,将类型不同的结果保存在不同的文件中等。这里设置几个分区,后面就会有对应的几个Reducer来处理相应分区中的内容。
  • 1.4 分区之后,对每个分区的数据进行排序,分组–排序按照从小到大进行排列,排序完毕之后,会将键值对中,key相同的选项 的value进行合并。如,所有的键值对中,可能存在
    hello 1
    hello 1
    key都是hello,进行合并之后变成
    hello 2
    可以根据自己的业务需求对排序和合并的处理进行干涉和实现。
  • 1.5 归约(combiner)–简单的说就是在map端进行一次reduce处理,但是和真正的reduce处理不同之处在于:combiner只能处理本地数据,不能跨网络处理。通过map端的combiner处理可以减少输出的数据,因为数据都是通过网络传输的,其目的是为了减轻网络传输的压力和后边reduce的工作量。并不能取代reduce。

Reduce端工作

  • 2.1 通过网络将数据copy到各个reduce。
  • 2.2 调用reduce进行处理–reduce接收的数据是整个map端处理完毕之后的键值对,输出的也是键值对的集合,是最终的结果。
  • 2.3 将结果输出到hdfs文件系统的路径中。

程序开发

开发流程

一般情况下我们不可能直接在生产环境中直接拿海量数据进行程序开发,不说程序有没有bug,能不能跑起来都是一个问题
所以通常步骤是:

1.本地程序开发测试:从海量数据中抽取小部分数据到本地,使用IDE等工具进行开发,并在测试数据集上进行程序运行、逻辑等测试
2.集群环境运行:本地测试通过后,就可以将代码运行在海量数据中,这时候90%的小bug已经得到修复了,剩余要解决的就是生产环境中的问题了
3.调优:程序能够在集群中运行起来并不代表成功,通过一些集群、程序调优方式可以让你的代码跑的更好、更快

导包

新建一个java项目,并导入hadoop包,在项目选项上右键,如图选择
示例

找到hadoop的安装目录,选择所有的包
示例

在找到hadoop安装目录下的lib,导入其中的所有包
示例

代码

新建JMapper类为自定义的Mapper类

import org.apache.hadoop.io.LongWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Mapper;  

//自定义的Mapper类必须继承Mapper类,并重写map方法实现自己的逻辑  
public class JMapper extends Mapper<LongWritable, Text, Text, LongWritable> {  
    //处理输入文件的每一行都会调用一次map方法,文件有多少行就会调用多少次  
    protected void map(  
            LongWritable key,  
            Text value,  
            org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, LongWritable>.Context context)  
            throws java.io.IOException, InterruptedException {  
        //key为每一行的起始偏移量  
        //value为每一行的内容  

        //每一行的内容分割,如hello   world,分割成一个String数组有两个数据,分别是hello,world  
        String[] ss = value.toString().toString().split("\t");  
        //循环数组,将其中的每个数据当做输出的键,值为1,表示这个键出现一次  
        for (String s : ss) {  
            //context.write方法可以将map得到的键值对输出  
            context.write(new Text(s), new LongWritable(1));  
        }  
    };  
}  

新建JReducer类为自定义的Reducer

import org.apache.hadoop.io.LongWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Reducer;  

//自定义的Reducer类必须继承Reducer,并重写reduce方法实现自己的逻辑,泛型参数分别为输入的键类型,值类型;输出的键类型,值类型;之后的reduce类似  
public class JReducer extends Reducer<Text, LongWritable, Text, LongWritable> {  
    //处理每一个键值对都会调用一次reduce方法,有多少个键值对就调用多少次  
    protected void reduce(  
            Text key,  
            java.lang.Iterable<LongWritable> value,  
            org.apache.hadoop.mapreduce.Reducer<Text, LongWritable, Text, LongWritable>.Context context)  
            throws java.io.IOException, InterruptedException {  
        //key为每一个单独的单词,如:hello,world,you,me等  
        //value为这个单词在文本中出现的次数集合,如{1,1,1},表示总共出现了三次  
        long sum = 0;  
        //循环value,将其中的值相加,得到总次数  
        for (LongWritable v : value) {  
            sum += v.get();  
        }  
        //context.write输入新的键值对(结果)  
        context.write(key, new LongWritable(sum));  
    };  
}  

新建执行提交作业的类,取名JSubmit

import java.io.IOException;  
import java.net.URI;  
import java.net.URISyntaxException;  

import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.fs.FileSystem;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.LongWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  

public class JSubmit {  
    public static void main(String[] args) throws IOException,  
            URISyntaxException, InterruptedException, ClassNotFoundException {  
        //Path类为hadoop API定义,创建两个Path对象,一个输入文件的路径,一个输入结果的路径  
        Path outPath = new Path("hdfs://localhost:9000/out");  
        //输入文件的路径为本地linux系统的文件路径  
        Path inPath = new Path("/home/hadoop/word");  
        //创建默认的Configuration对象  
        Configuration conf = new Configuration();  
        //根据地址和conf得到hadoop的文件系统独享  
        //如果输入路径已经存在则删除  
        FileSystem fs = FileSystem.get(new URI("hdfs://localhost:9000"), conf);  
        if (fs.exists(outPath)) {  
            fs.delete(outPath, true);  
        }  
        //根据conf创建一个新的Job对象,代表要提交的作业,作业名为JSubmit.class.getSimpleName()  
        Job job = new Job(conf, JSubmit.class.getSimpleName());  
        //1.1  
        //FileInputFormat类设置要读取的文件路径  
        FileInputFormat.setInputPaths(job, inPath);  
        //setInputFormatClass设置读取文件时使用的格式化类  
        job.setInputFormatClass(TextInputFormat.class);  

        //1.2调用自定义的Mapper类的map方法进行操作  
        //设置处理的Mapper类  
        job.setMapperClass(JMapper.class);  
        //设置Mapper类处理完毕之后输出的键值对 的 数据类型  
        job.setMapOutputKeyClass(Text.class);  
        job.setMapOutputValueClass(LongWritable.class);  

        //1.3分区,下面的两行代码写和没写都一样,默认的设置  
        job.setPartitionerClass(HashPartitioner.class);  
        job.setNumReduceTasks(1);  
        //1.4排序,分组  

        //1.5归约,这三步都有默认的设置,如果没有特殊的需求可以不管
        //2.1将数据传输到对应的Reducer  

        //2.2使用自定义的Reducer类操作  
        //设置Reducer类  
        job.setReducerClass(JReducer.class);  
        //设置Reducer处理完之后 输出的键值对 的数据类型  
        job.setOutputKeyClass(Text.class);  
        job.setOutputValueClass(LongWritable.class);  

        //2.3将结果输出  
        //FileOutputFormat设置输出的路径  
        FileOutputFormat.setOutputPath(job, outPath);  
        //setOutputFormatClass设置输出时的格式化类  
        job.setOutputFormatClass(TextOutputFormat.class);  

        //将当前的job对象提交  
        job.waitForCompletion(true);  
    }  

运行

运行java程序,可以再控制台看到提交作业的提示
示例

在hdfs中查看输出的文件
示例

程序调试

在集群上调试程序是十分困难的,就算采用最常见的println打印信息你也不知道该信息会在集群上哪个节点打印出来

在MapReduce中,可以使用系统错误信息+计数器的组合来进行调试,在map或者reduce函数中,我们可以这样做:

System.out.println("一些错误信息");
context.setStatus("关于错误的一些提示")
context.getCounter(计数器组名一般为枚举类型).increment(1);

在CLI中可以通过以下命令查看计数器:

mapred job -counter jobId counterGroup counterName

counterGroup:计数器组名,一般为枚举类型的全类名
counterName:计数器名,一般为枚举类型的值

性能调优

参考:MapReduce性能调优记录

JobControl

开发MapReduce程序的时候,我们需要考虑如何把需求转换为MapReduce模型来解决问题
对于一些复杂的场景,我们通常是使用多个Job来完成任务,而不是一个非常复杂的单一Job

所以一个完成的任务就可能会有多个Job,一个Job有可能有多个Mapper的情况,参考:
多个Mapper和Reducer处理多个输入

开发技巧

参考:MapReduce开发技巧

更多应用场景

下面是自己在学习过程中收集整理的一些MapReduce场景Demo,可以提供参考和帮助:
GitHub - chubbyjiang/MapReduce: MapReduce Demo

作者:@小黑

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

MapReduce程序开发 的相关文章

随机推荐

  • SAP查看SPRO配置对应的事务码

    在SPRO里面 xff0c 选中配置项 xff0c 然后点击菜单 编辑 显示IMG活动 在显示实施指南活动 xff1a 已分配对象 维护对象 里面 xff0c 点维护对象 xff0c 找到定制对象 xff0c 就可以查看对应的事务码 但是不
  • 分卷压缩与解压缩

    分卷压缩命令格式 tar lt args gt lt filedir gt split d b lt size gt lt compress filename gt xff0c 意思是将 lt filedir gt 分卷压缩 xff0c 每
  • 27 RGB值 1颜色单位 2RGB 3用浓度表示颜色

    xff11 xff12 xff13 转载于 https www cnblogs com anvivi p 9703677 html
  • SAP有用的知识(持续更新)

    一 安装SAP 1 1 产品可用性矩阵 xff08 Product Availability Matrix xff09 SAP官网 Maintenance Product Availability Matrix xff0c 点击页面的Acc
  • ORA-01113 file 1 needs media recovery

    启动数据库时报错 ORA 01113 datafile1需要恢复 rman执行恢复 恢复后尝试打开数据库 xff0c 看结果 rman target recover datafile 1 alter database open 反复上述过程
  • 网络管理员比赛回顾01-基本操作和简单vlan

    目录 一 模拟器eNSP 二 基本操作 三 配置IP地址 四 VLAN 一 模拟器eNSP 使用eNSP模拟器 xff0c 来源于网络上的安装包 xff0c 学习一个 基本操作就不多说了 xff0c 在实践里慢慢记录 二 基本操作 认识3种
  • SAP有用的NOTE(持续更新)

    目录 2421240 Portal is not loaded on Chrome 56 or higher 66971 Supported SAP GUI platforms 66971 Supported SAP GUI platfor
  • 网络管理员比赛回顾02-网关、静态路由、动态路由

    目录 一 配置网关 二 配置静态路由 三 配置动态路由 3 1 使用RIP协议配置动态路由 3 2 使用OSPF协议配置动态路由 2021年9月参加青年网络管理员比赛 xff0c 因为网管超龄不能按照 青年 参赛 xff0c 临时培训我们这
  • 网络管理员比赛回顾03-单臂路由

    三层交换机sw1配置 xff0c 划分三个vlan xff0c 3个接入端口分别允许各自vlan通过 xff0c 一个骨干端口允许所有vlan通过 lt Huawei gt undo terminal monitor lt Huawei g
  • SAP创建webservice

    目录 一 创建webservice 二 更改webservice 三 SoapUI测试webservice 四 查看webservice日志及排错 一 创建webservice 以用户相关的函数User为例创建webservice xff0
  • SAP事务码f-02做账界面显示“页数”字段

    事务码 f 02 做账界面 xff0c 没有显示页数 用户账号的参数添加 CSF xff08 Country Specific Fields xff09 参数 xff0c 参数值为 CN xff08 伟大的China xff09 再次来到
  • 网络管理员比赛回顾04-DHCP

    目录 一 DHCP的配置 二 DHCP中继 2021年9月参加青年网络管理员比赛 xff0c 因为网管超龄不能按照 青年 参赛 xff0c 临时培训我们这批 青年 参赛 xff0c 回顾一下经过以及学到的技能 本节回顾DHCP 一 DHCP
  • SNMP服务配置

    由于服务器账号密码要定期更改 xff0c 监控服务器一般不应使用可登录的账号 xff0c 而应该使用SNMP协议获取服务器信息 记录一下CentOS 7配置SNMPv3服务的过程 安装SNMP服务 yum install net snmp
  • zabbix-agent安装

    最近新上了zabbix监控 xff0c 记录部署过程 一 Linux上安装 修改yum源配置 vim etc yum repos d lt your repo gt zabbix name 61 zabbix baseurl 61 http
  • SublimeText + Anaconda插件 打造 Python IDE

    目录 一 安装anaconda插件 二 创建构建环境 三 快捷键 自己尝试了SublimeText 43 Anaconda插件 xff0c 打造 Python IDE的方案是可行的 xff0c 记录配置过程 安装SublimeText的过程
  • 在Ubuntu22.4下搭建pytorch深度学习环境

    Anacnda安装 xff1a 这个在其他地方搜索 xff0c 可以搜到参考oknacUbuntu下安装Anaconda的步骤 xff08 带图 xff09 知乎 安装完毕source bashrc进入base环境 xff0c conda
  • leetcode算法题-两数之和

    解法一 xff1a 哈希表 class Solution def twoSum self nums List int target int gt List int hashmap 61 for ind num in enumerate nu
  • python 求列表中出现频率最高的元素

    def count l1 list return max set l1 key l1 count max set list key 61 list count 函数会根据关键词参数key 61 list count所指定带有一个参数的函数
  • 算法python回文数数学法

    class Solution def isPalindrome self x int gt bool if x lt 0 or x 10 61 61 0 and x 61 0 return False y 61 0 t 61 x while
  • MapReduce程序开发

    通过API操作之前要先了解几个基本知识 基本数据类型 Hadoop的基本数据类型和Java的基本数据类型是不一样的 xff0c 但是都存在对应的关系 如下图 如果需要定义自己的数据类型 xff0c 则必须实现Writable hadoop的