学习Hadoop MapReduce与WordCount例子分析

2023-05-16

/*
MapReduce框架一直围绕着key-value这样的数据结构,下面以官方自带的WordCount为例子,自己分析MapReduce的工作机制。MapReduce可以分为Map和Reduce过程,
代码实现了两个类,分别是继承Mapper和Reduceer,Mapper类里面有map接口,Reduceer类有reduce接口,对于统计单词这个例子来说,MapReduce会把文件以行为
拆分对象,每分析一行就会调用Mapper类里面的map接口,然后map接口里面的代码由程序员实现其逻辑,然后把map接口处理完的结果输送给Reduceer的reduce的接
口,中间还可以插入一个combiner的接口用于对map接口的数据进行中间结果处理再丢给reduce做最终的汇总。具体流程看代码注释。
*/

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{
    /*
    Mapper他是一个模板类,Class Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>,
    KEYIN 输入key的类型,VALUEIN输入value的类型
    KEYOUT 输出key的类型,VALUEOUT输出value的类型
    四个类型决定了map接口的输入与输出类型

    比较形象地描述key,value,在map,combiner,reduce流转的
    (input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)

    其中还有规定,就是KEY和VALUE类型必须是实现了Writeable接口的,KEY类型还需要额外实现WritableComparable接口

    通常在Mapper模板里面,KEYIN是不需要特指定的,就用基类Object就可以了,VAULEIN指定为Text,这个Text是<pre name="code" class="java">    org.apache.hadoop.io.Text,这个Text已经满足了实现Writeable接口的条件了,在这个例子里面VALUE就是文件的行内
    容,所以定义类型为Text。
    对于KEYOUT和VALUEOUT,作为输出key类型和value类型,这里定义为Text和IntWritable,keyout就是需要统计单词个数
    的单词,IntWriteable就是对应某个单词的次数,其实这个就是一个Int类型,为了符合接口需要所以就基础了Writeable
    Context它是一个贯通map接口<-->combiner接口<-->reduce接口的上下文数据,在map接口里面,单词对应次数会保存在context
    里面,到了reduce接口,MapReduce会把之前map的context用key对应结果集合的形式给reduce接口。
    */

    private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } }

    /*
    下面是对两个文件统计单词调用map接口之后的context结果
    For the given sample input the first map emits:
    < Hello, 1>
    < World, 1>
    < Bye, 1>
    < World, 1>

    The second map emits:
    < Hello, 1>
    < Hadoop, 1>
    < Goodbye, 1>
    < Hadoop, 1>
    */


    public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { /*
    Reduceer也是一个类模板,跟Mapper一样需要指定KEYIN,VALUEIN,KEYOUT,VALUEOUT,
    其中KEYIN和VALUEIN必须跟Mapper的KEYOUT,VALUEOUT一一对应,因为map接口输出的结果key->value
    就是reduce接口的输入,只是MapReduce框架把map接口里面相同的key变成一个key->values
    的values集合,所以在reduce接口里面KEYIN是Text也就是单词,VALUEOUT是IntWriteable集合的
    迭代器Interable<IntWriteable>,context就是reduce的输出结果了

    */

    private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } }

    /*
    在例子里面,还指定了combiner,其实cominer和reduce都是同一个接口reduce,第一次调用reduce接口是combiner过程,把每个文件
    的单词做了key->value 到 key->values的汇总,结果如下
    The output of the first map:
    < Bye, 1>
    < Hello, 1>
    < World, 2>

    The output of the second map:
    < Goodbye, 1>
    < Hadoop, 2>
    < Hello, 1>
    */


    /*
    第二次调用reduce接口,就是reduce的过程,把combiner处理过的中间结果做一次最终的汇总
    < Bye, 1>
    < Goodbye, 1>
    < Hadoop, 2>
    < Hello, 2>
    < World, 2>
    */

    public static void main(String[] args) throws Exception { 
        Configuration conf = new Configuration(); 
        Job job = Job.getInstance(conf, "word count"); 
        job.setJarByClass(WordCount.class); 
        job.setMapperClass(TokenizerMapper.class); 
        job.setCombinerClass(IntSumReducer.class); 
        job.setReducerClass(IntSumReducer.class); 
        job.setOutputKeyClass(Text.class); 
        job.setOutputValueClass(IntWritable.class); 
        FileInputFormat.addInputPath(job, new Path(args[0])); 
        FileOutputFormat.setOutputPath(job, new Path(args[1])); 
        System.exit(job.waitForCompletion(true) ? 0 : 1); 
    }
} 


本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

学习Hadoop MapReduce与WordCount例子分析 的相关文章

  • MongoDB:在没有并行性的情况下使用 MapReduce 有什么意义?

    Quoting http www mongodb org display DOCS MapReduce MapReduce Parallelism http www mongodb org display DOCS MapReduce Ma
  • Python - Map/Reduce - 如何在使用 DISCO 计数单词示例中读取 JSON 特定字段

    我正在按照 DISCO 示例来计算文件中的单词数 将单词数作为 Map Reduce 作业 http discoproject org doc disco start tutorial html 我对此工作没有任何问题 但是我想尝试从包含
  • 使用 Hadoop 映射两个数据集

    假设我有两个键值数据集 数据集A和B 我们称它们为数据集A和B 我想用 B 组的数据更新 A 组中的所有数据 其中两者在键上匹配 因为我要处理如此大量的数据 所以我使用 Hadoop 进行 MapReduce 我担心的是 为了在 A 和 B
  • Spark scala - 按数组列分组[重复]

    这个问题在这里已经有答案了 我对 Spark Scala 很陌生 感谢你的帮助 我有一个数据框 val df Seq a a1 Array x1 x2 a b1 Array x1 a c1 Array x2 c c3 Array x2 a
  • hadoop2.2.0追加文件发生AlreadyBeingCreatedException

    我遇到了一个关于hadoop2 2 0追加操作的问题 我通过 HDFS java API 将一些字节附加到 hdfs 文件 首先 如果在附加操作之前文件不存在 我将创建目标文件 代码如下 String fileUri hdfs hadoop
  • hive创建表的多个转义字符

    我正在尝试将带有管道分隔符的 csv 加载到配置单元外部表 数据值包含单引号 双引号 括号等 使用 Open CSV 版本 2 3 测试文件 csv id name phone 1 Rahul 123 2 Kumar s 456 3 Nee
  • 如何将Hive数据表迁移到MySql?

    我想知道如何将日期从 Hive 转移到 MySQL 我看过有关如何将 Hive 数据移动到 Amazon DynamoDB 的示例 但没有看到有关如何将 Hive 数据移动到 MySQL 等 RDBMS 的示例 这是我在 DynamoDB
  • 如何在 Hadoop 中将 String 对象转换为 IntWritable 对象

    我想转换String反对IntWritableHadoop 中的对象 任何过程都可以进行转换 IntWritable value new IntWritable Integer parseInt someString 并处理以下可能性par
  • 在 Amazon EMR 上使用 java 中的 hbase 时遇到问题

    因此 我尝试使用作为 MapReduce 步骤启动的自定义 jar 来查询 Amazon ec2 上的 hbase 集群 我的 jar 在地图函数内 我这样调用 Hbase public void map Text key BytesWri
  • 如何通过sparkSession向worker提交多个jar?

    我使用的是火花2 2 0 下面是我在 Spark 上使用的 java 代码片段 SparkSession spark SparkSession builder appName MySQL Connection master spark ip
  • RavenDB:为什么我会在此多重映射/归约索引中获得字段空值?

    受到 Ayende 文章的启发https ayende com blog 89089 ravendb multi maps reduce indexes https ayende com blog 89089 ravendb multi m
  • 在映射器的单个输出上运行多个减速器

    我正在使用地图缩减实现左连接功能 左侧有大约 6 亿条记录 右侧有大约 2300 万条记录 在映射器中 我使用左连接条件中使用的列来创建键 并将键值输出从映射器传递到减速器 我遇到性能问题 因为两个表中的值数量都很高的映射器键很少 例如分别
  • 适用于 Hadoop 的 DynamoDB 输入格式

    我必须使用 Hadoop mapreduce 处理保留在 Amazon Dynamodb 中的一些数据 我在互联网上搜索 Dynamo DB 的 Hadoop InputFormat 但找不到它 我对 Dynamo DB 不熟悉 所以我猜测
  • HDFS:使用 Java / Scala API 移动多个文件

    我需要使用 Java Scala 程序移动 HDFS 中对应于给定正则表达式的多个文件 例如 我必须移动所有名称为 xml从文件夹a到文件夹b 使用 shell 命令我可以使用以下命令 bin hdfs dfs mv a xml b 我可以
  • 如何从hdfs读取文件[重复]

    这个问题在这里已经有答案了 我在 project1目录下的hadoop文件系统中有一个文本文件名mr txt 我需要编写 python 代码来读取文本文件的第一行 而不将 mr txt 文件下载到本地 但我无法从 hdfs 打开 mr tx
  • Sqoop - 绑定到 YARN 队列

    因此 使用 MapReduce v2 您可以使用绑定到某些 YARN 队列来管理资源和优先级 基本上通过使用 hadoop jar xyz jar D mapreduce job queuename QUEUE1 input output
  • 更改 Spark Streaming 中的输出文件名

    我正在运行一个 Spark 作业 就逻辑而言 它的性能非常好 但是 当我使用 saveAsTextFile 将文件保存在 s3 存储桶中时 输出文件的名称格式为 part 00000 part 00001 等 有没有办法更改输出文件名 谢谢
  • Hadoop 推测任务执行

    在Google的MapReduce论文中 他们有一个备份任务 我认为这与Hadoop中的推测任务是一样的 推测任务是如何实现的 当我启动一项推测任务时 该任务是从一开始就作为较旧且缓慢的任务开始 还是从较旧的任务到达的位置开始 如果是这样
  • 如何在 MapReduce 作业中导入自定义模块?

    我有一个 MapReduce 作业定义在main py 它导入了lib模块来自lib py 我使用 Hadoop Streaming 将此作业提交到 Hadoop 集群 如下所示 hadoop jar usr lib hadoop mapr
  • pyspark.sql.utils.AnalysisException:u'Path不存在

    我正在使用标准 hdfs 与 amazon emr 运行 Spark 作业 而不是 S3 来存储我的文件 我在 hdfs user hive warehouse 有一个配置单元表 但当我的 Spark 作业运行时找不到它 我配置了 Spar

随机推荐

  • 网络安全产品认知——边界防护

    边界防护的安全理念 边界防护 网络边界 具有不同安全级别的网络之间的分界线都可以定义为网络边界 网络边界防护 xff1a 针对不同网络环境所设置的安全防御措施 企业网络常见边界 企业内部网络与外部网络 企业部门之间 gt 业务类型 重要部门
  • python列表

    目录 1 列表 xff08 list 线性表 xff09 2 定义一个列表 1 直接用 2 用list 3 常见的方法 1 append object 向列表尾部追加元素 2 insert index object 向指定位置 xff08
  • kubernetes应用flannel失败

    按照官网给的命令 kubectl apply f https raw githubusercontent com coreos flannel master Documentation kube flannel yml 回头查看k8s的运行
  • 腾讯祭出大招VasSonic,让你的H5页面首屏秒开!

    作者简介 xff1a 陈志兴 xff0c 腾讯SNG增值产品部高级工程师 xff0c 主要负责手Q个性化业务 手Q WebView等项目 喜欢阅读优秀的开源项目 xff0c 听听音乐 xff0c 偶尔也会打打竞技类游戏 本文根据作者在201
  • 直流电机和步进电机-第1季第12部分-朱有鹏-专题视频课程

    直流电机和步进电机 第1季第12部分 1966人已学习 课程介绍 本课程是 朱有鹏老师单片机完全学习系列课程 第1季第12个课程 xff0c 主要讲解了直流电机和步进电机 xff0c 其中步进电机是关键 xff0c 通过学习让大家初步掌握步
  • 无需后台接入?带你玩转VasSonic 2.0里的Local Server

    腾讯手Q增值团队于今年8月份正式开源了VasSonic xff0c 一个轻量级高性能的Hybrid框架 VasSonic框架使用并行加载 动态缓存 增量更新等手段 xff0c 实现了终端H5页面的秒开 xff0c 对用户体验的优化做的非常极
  • gcc中的-w -W和-Wall选项

    今天在看一个makefile时看到了gcc W Wall 这句 xff0c 不明其理 xff0c 专门查看了gcc的使用手册 w的意思是关闭编译时的警告 xff0c 也就是编译后不显示任何warning xff0c 因为有时在编译之后编译器
  • VNC Connect使用参数填充VNC配置文件

    VNC Server xff0c VNC Viewer和支持程序由参数控制 xff0c 为大多数用户提供了合适的默认值 您可以通过为参数指定新值来配置程序 xff1a 1 在程序启动之前 2 在启动时在命令行上 3 程序运行时 xff0c
  • arXiv Journal 2021-01-11

    想来想去 xff0c 觉得还是把每次在arXiv上扫过的文章简单记录下来 2021 01 11 hep ph 2 papershep th 2 papershep lat 1 paper hep ph 2 papers Title QCD
  • HJ28 素数伴侣

    描述 题目描述 若两个正整数的和为素数 xff0c 则这两个正整数称之为 素数伴侣 xff0c 如2和5 6和13 xff0c 它们能应用于通信加密 现在密码学会请你设计一个程序 xff0c 从已有的 N xff08 N 为偶数 xff09
  • ZRAM SWAP

    1 ZRAM 1 1 zram的理解 ZRAM xff08 压缩内存 xff09 的意思是说在内存中开辟一块区域压缩数据 就是说假设原来150MB的可用内存现在可以放下180MB的东西 本身不会提高内存容量和运行速度 只是让后台程序更少被系
  • 最简单的神经网络--BP神经网络介绍

    今天从网上看到一篇介绍BP神经网络的文章 xff0c 感觉非常好 xff0c 转载保存 转载地址 xff1a https blog csdn net weixin 40432828 article details 82192709
  • 【没有哪个港口是永远的停留~ 论文解读】SphereFace

    论文 xff1a SphereFace Deep Hypersphere Embedding for Face Recognition 代码 xff1a at https github com wy1iu sphereface 摘要 本文讨
  • 【没有哪个港口是永远的停留~ 论文解读】AM - softmax

    论文 xff1a Additive Margin Softmax for Face Verification 代码 xff1a https github com happynear AMSoftm 相似论文 xff1a CosFace La
  • 串口通信和RS485-第1季第13部分-朱有鹏-专题视频课程

    串口通信和RS485 第1季第13部分 5373人已学习 课程介绍 本课程是 朱有鹏老师单片机完全学习系列课程 第1季第13个课程 xff0c 主要讲解了串行通信UART及其扩展RS485 本课程很重要 xff0c 因为串口通信是我们接触的
  • 每天一分钟玩转golang:基础类型之浮点型(二)

    大家好 xff0c 我是加摩斯 xff0c 觉得文章有帮助的小伙伴 xff0c 记得一键三连哟 xff5e 申明 xff1a 本系列两天更新一篇 xff0c 纯原创 xff0c 转载前请与我沟通 Go使用两种浮点型变量来存储小数 xff0c
  • Linux Deploy踩坑指南之二:开启zram块设备

    参考 xff1a https sleeplessbeastie eu 2021 03 17 how to use compressed ram based block devices 当android设备有相对充足的ram xff0c 就可
  • 一文彻底搞懂webpack devtool

    为什么需要Source Map 首先根据谷歌开发者文档的介绍 xff0c Source Map一般与下列类型的预处理器搭配使用 xff1a 转译器 xff08 Babel xff09 编译器 xff08 TypeScript xff09 M
  • DOCKER默认虚拟网卡IP地址与局域网冲突解决

    一 背景 docker启动时默认会创建一个docker0网桥 xff0c 它在内核层连通了其他的物理或虚拟网卡 xff0c 相当于将所有容器和其主机都放到同一个网络 但是部署在内网中的IP段存在有此网段的IP时 xff0c 会导致冲突 xf
  • 学习Hadoop MapReduce与WordCount例子分析

    MapReduce框架一直围绕着key value这样的数据结构 xff0c 下面以官方自带的WordCount为例子 xff0c 自己分析MapReduce的工作机制 MapReduce可以分为Map和Reduce过程 xff0c 代码实