1,概念
mapreduce 运算框架主要实现hadoop 的数据处理,数据处理中 流经过5个节点。
数据流:input -> spilt -> map -> shuffle -> reduce(最后reduce输出)
1.1,input:是将被运算的数据切成默认64M的快(block),方便后续运算。
1.2,split:切片,将 input 中的快按照行切成片(键值对),方便后续 map 运算。
1.3,map(开发):
对 split 的片(行)进行数据处理,处理成键值对。
workcount map 数据处理:将每行拆分成每一个单词作为输出键,个数设置为1 作为输出值
1.4,shuffle:
将所有的 map 运算结果都重新按照键分组,输出键值对。
wordcount 中 shuffle 将 map 的相同键的数据合并成一条,值是一个固定值为1的数组。
1.5,reduce(开发):
将shuffle 的结果集做数据处理。
wordcount 的数据处理:将键对应的值(值为1的数字)做累加,既得出我们的每个单词出现个数。
1.6,输出(output)
2,maprecue 开发
准备工作:
1)新建 Map/Reduce Project 项目 new -> project -> Map/Reduce Project :wordcountdemo
2)增加配置文件 core-site.xml,log4j.xml
新建 resource 文件夹放入配置文件:项目 -> new -> Source Folder
3)新建一个class WordCountJob(开发 map,开发 reduce,)
package org.kgc1803.demo;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* 词频
* @author hduser
*
*/
public class WordCountJob {
/**
* 开发 map
* @author hduser
* 第一个参数:map 输入键的类型
* 第二个参数:map 输入值的类型
* 第三个参数:map 输出键的类型
* 第四个参数:map 输出值的类型
*/
public static class WordCountMapper extends Mapper<Object, Text, Text, IntWritable>{
/**
* 每一个切片会执行一次map方法
* wordcount map 把 malue 拆分成单个单词
* keyIn:是每一行的键
* valueIn:是每一行的值
* context:是上下文容器,用于将 map 的结果输出到下一步
* @throws InterruptedException
* @throws IOException
*/
public void map(Object keyIn,Text valueIn,Context context) throws IOException, InterruptedException{
//固定值1,作为输出值
IntWritable valueOut = new IntWritable(1);
Text keyOut = null;
//按空格,逗号,~~ 拆分
StringTokenizer token = new StringTokenizer(valueIn.toString());
//按照迭代器用法使用
while(token.hasMoreTokens()){
String key = token.nextToken();
keyOut = new Text(key);
context.write(keyOut, valueOut);
}
}
}
//开发reduce
public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
public void reduce(Text keyIn,Iterable<IntWritable> valuesIn,Context context) throws IOException, InterruptedException{
Text keyOut = keyIn;
//输出值
IntWritable valueOut = new IntWritable();
int sum = 0;
//循环混洗后的数字数组,如[1.1.1.1.1]
for(IntWritable val:valuesIn){
sum += val.get(); //做累加
}
//将累加的结果转化为IntWritable
valueOut.set(sum);
//输出到下一步
context.write(keyOut, valueOut);
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{
//创建job 执行 job
//加载 hdfs 配置文件(配置 hdfs 访问入口)
Configuration conf = new Configuration();
//创建一个 job 并设置 job(运算作业)的主启动类
Job job = Job.getInstance(conf);
job.setJarByClass(WordCountJob.class);
//设置 job 的 map 自定义静态类
job.setMapperClass(WordCountMapper.class);
//设置 job 的 reduce 自定义静态类
job.setReducerClass(WordCountReducer.class);
//配置最终输出(reduce)的输出键和值的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//mapreduce 作业需要的资源位置(总输入位置)
Path inputPath1 = new Path("hdfs://node1:9000/input/zsc/*.txt");
//Path inputPath2 = new Path("hdfs://node1:9000/input/zsc/2.txt");
FileInputFormat.addInputPath(job, inputPath1);
//FileInputFormat.addInputPath(job, inputPath2);
//mapreduce 作业结果的保存位置(总输出位置)
Path outputPath = new Path("hdfs://node1:9000/output/wc7");
FileOutputFormat.setOutputPath(job, outputPath);
//启动
System.exit(job.waitForCompletion(true)?0:1);
}
}
2.1,map 开发
map要求:
1)静态类
2)继承 hadoop 的 Mapper 父类
3)重写 map() 方法
2.2,reduce 开发
reduce 要求:
1)静态类
2)继承 hadoop 的 Reducer 父类
3)重写 reduce() 方法
2.3,创建并启动 job
1)加载 hdfs 配置文件(配置 hdfs 访问入口)
2)创建一个 job 并设置 job(运算作业)的主启动类
3)设置 job 的 map 自定义静态类
4)设置 job 的 reduce 自定义静态类
5)配置最终输出(reduce)的输出键和值的类型
6)mapreduce 作业需要的资源位置(总输入位置)
7)mapreduce 作业结果的保存位置(总输出位置)
8)启动
3,hdfs 的数据类型
1)字符串 Text,等同于 java 中的字符串,在hdfs 中Text类型是字节文件
Text -> String:
Text t 转换成 String: t.toString();
String -> Text:
Text t = new Text(字符串);
2)整型数字 IntWriteable,等同于 java 的 Integer
IntWriteable转 int:
IntWriteable a;
int b = a.get();
int 转 IntWriteable:
IntWriteable a = new IntWriteable(数字);
或
IntWriteable a =new IntWriteable();
a.set(数字);
3)长整型 LongWritable ,等同于java 的 Long
4,mapreduce 优化
1,小项目
音乐排行榜播放记录数据格式:
记录id | 记录id | 播放时间 | 歌手 |
1001 | music1 | 20180101 | sing1 |
1002 | music2 | 20180301 | sing2 |
1003 | music3 | 20180211 | sing1 |
1004 | music2 | 20180709 | sing2 |
1005 | music2 | 20180621 | sing2 |
1006 | music1 | 20180511 | sing1 |
统计信息:
1)如:
歌名 | 播放次数 |
music1 | 2 |
music2 | 3 |
music3 | 1 |
2)按照播放次数降序来显示步骤1的统计信息
歌名 | 播放次数 |
music2 | 3 |
music1 | 2 |
music3 | 1 |
2,排序
2.1,概念
mapreduce 的排序指的主要用于 mapreduce 中 key 的排序,默认按照 key升序排序,排序主要用于最终输出之前。
2.2,实现排序
2.2.1,定义一个排序类
1)定义一个静态排序类,继承需要的排序父类(我们样例使用的是数字排序父类)
2)重写排序父类中的核心方法
//排序,继承整型的排序
public static class MusicCountDesc extends IntWritable.Comparator{
//重写两个排序
public int compare(Object a,Object b){
//对比,默认是升序,- 取反
return -super.compare(a, b);
}
public int compare(byte[] b1,int s1,int l1,byte[] b2,int s2,int l2){
return -super.compare(b1, s1, l1, b2, s2, l2);
}
}
2.2.2,配置排序类到job
//排序不用默认排序而是用我们自定义的排序类
scjob.setSortComparatorClass(MusicCountDesc.class);
3,SequenceFileOutputFormat 和 SequenceFileInputFormat
3.1,SequenceFileOutputFormat
主要用于 mapreduce 最终输出阶段 设置 输出的格式类型为SequenceFile 类型(字节对象型),类似于 ObjectOutputStream(对象输出流)
3.2,SequenceFileInputFormat
主要用于 mapreduce 录入的数据是 SequenceFile 类型,该类型数据必须设置 job 的 inputFormat 为SequenceFileInputFormat。
4,Combiner(合并)
4.1,概念:
map处理后的数据直接混洗,网络成本太大,如有1亿条数据,就需要将处理过的这1亿数据都传输给混洗节点。如果将map 处理后的数据做一下合并后,总数据量就会少很多,节省了网络开支。
4.2,语法:
//一般配置的都是reduce处理类
job.setCombinerClass(WordCountReducer.class);
5,Partitioner(分区)
5.1,概念
mapreduce job 混洗的数据交给 reduce task(任务),默认只有一个分区,所以只有一个 reduce task,一个 reduce task 产生一个输出文件。reduce 之前分区
5.2,语法
1)定义一个静态分区类
//分区
public static class MyPartitioner extends Partitioner<IntWritable, Text>{
public int getPartition(IntWritable key, Text arg1, int arg2) {
//key 值>=2
if(key.get() >= 2){
return 0;
}else {
return 1;
}
}
}
2)将自定义的静态分区类配置给job
3)设置 job 的 reduce task数量(即有几个分区)
//配置分区
scjob.setPartitionerClass(MyPartitioner.class);
//设置 reduce 数量
scjob.setNumReduceTasks(2);
6,job 和 task
一个job(作业)-> 多个map task(任务)-> shuffle -> reduce
每一个 reduce 一个输出文件
7,优化总代码如下:
package org.kgc1803.demo;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
public class WordCount {
/**
* 开发 map
* @author hduser
* 第一个参数:map 输入键的类型
* 第二个参数:map 输入值的类型
* 第三个参数:map 输出键的类型
* 第四个参数:map 输出值的类型
*/
public static class CountMapper extends Mapper<Object, Text, Text, IntWritable>{
/**
* 每一个切片会执行一次map方法
* wordcount map 把 malue 拆分成单个单词
* keyIn:是每一行的键
* valueIn:是每一行的值
* context:是上下文容器,用于将 map 的结果输出到下一步
* @throws InterruptedException
* @throws IOException
*/
int count = 0;
public void map(Object keyIn,Text valueIn,Context context) throws IOException, InterruptedException{
//固定值1,作为输出值
IntWritable valueOut = new IntWritable(1);
Text keyOut = null;
//按空格,逗号,~~ 拆分
StringTokenizer token = new StringTokenizer(valueIn.toString());
//按照迭代器用法使用
// while(token.hasMoreTokens()){
// String key = token.nextToken();
// keyOut = new Text(key);
// context.write(keyOut, valueOut);
// }
String[] str = valueIn.toString().split("\\s+");
if(str.length>=3 && count>0){
context.write(new Text(str[1]), valueOut);
}
count++;
}
}
//开发reduce
public static class CountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
public void reduce(Text keyIn,Iterable<IntWritable> valuesIn,Context context) throws IOException, InterruptedException{
Text keyOut = keyIn;
//输出值
IntWritable valueOut = new IntWritable();
int sum = 0;
//循环混洗后的数字数组,如[1.1.1.1.1]
for(IntWritable val:valuesIn){
sum += val.get(); //做累加
}
//将累加的结果转化为IntWritable
valueOut.set(sum);
//输出到下一步
context.write(keyOut, valueOut);
}
}
//反转键值
public static class SoutMusicMapper extends Mapper<Text, IntWritable , IntWritable, Text>{
public void map(Text keyIn,IntWritable valueIn,Context context) throws IOException, InterruptedException{
context.write(valueIn, keyIn);
}
}
//排序,继承整型的排序
public static class MusicCountDesc extends IntWritable.Comparator{
//重写两个排序
public int compare(Object a,Object b){
//对比,默认是升序,- 取反
return -super.compare(a, b);
}
public int compare(byte[] b1,int s1,int l1,byte[] b2,int s2,int l2){
return -super.compare(b1, s1, l1, b2, s2, l2);
}
}
//分区
public static class MyPartitioner extends Partitioner<IntWritable, Text>{
public int getPartition(IntWritable key, Text arg1, int arg2) {
//key 值>=2
if(key.get() >= 2){
return 0;
}else {
return 1;
}
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{
//创建job 执行 job
//加载 hdfs 配置文件(配置 hdfs 访问入口)
Configuration conf = new Configuration();
//创建一个 job 并设置 job(运算作业)的主启动类
Job job = Job.getInstance(conf);
job.setJarByClass(WordCount.class);
//设置 job 的 map 自定义静态类
job.setMapperClass(CountMapper.class);
//设置 job 的 reduce 自定义静态类
job.setReducerClass(CountReducer.class);
//配置最终输出(reduce)的输出键和值的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//配置combin
job.setCombinerClass(CountReducer.class);
//配置分区
//job.setPartitionerClass(MyPartitioner.class);
//mapreduce 作业需要的资源位置(总输入位置)
Path inputPath1 = new Path("hdfs://node1:9000/input/zsc/3.txt");
//Path inputPath2 = new Path("hdfs://node1:9000/input/zsc/2.txt");
FileInputFormat.addInputPath(job, inputPath1);
//FileInputFormat.addInputPath(job, inputPath2);
//mapreduce 作业结果的保存位置(总输出位置)
Path outputPath = new Path("hdfs://node1:9000/output/wc5");
if(FileSystem.get(conf).exists(outputPath)){
FileSystem.get(conf).delete(outputPath,true);
}
FileOutputFormat.setOutputPath(job, outputPath);
//设置输出内容是sequence 类似与 对象输出流
job.setOutputFormatClass(SequenceFileOutputFormat.class);
//启动
//System.exit(job.waitForCompletion(true)?0:1);
if(job.waitForCompletion(true)){
//创建一个 job 并设置 job(运算作业)的主启动类
Job scjob = Job.getInstance(conf);
scjob.setJarByClass(WordCount.class);
//设置 job 的 map 自定义静态类
scjob.setMapperClass(SoutMusicMapper.class);
scjob.setInputFormatClass(SequenceFileInputFormat.class);
//配置最终输出(reduce)的输出键和值的类型
scjob.setOutputKeyClass(IntWritable.class);
scjob.setOutputValueClass(Text.class);
//mapreduce 作业需要的资源位置(总输入位置)
FileInputFormat.addInputPath(scjob, outputPath);
//mapreduce 作业结果的保存位置(总输出位置)
Path outputPath1 = new Path("hdfs://node1:9000/output/wc4");
if(FileSystem.get(conf).exists(outputPath1)){
FileSystem.get(conf).delete(outputPath1,true);
}
FileOutputFormat.setOutputPath(scjob, outputPath1);
//排序不用默认排序而是用我们自定义的排序类
scjob.setSortComparatorClass(MusicCountDesc.class);
//配置分区
scjob.setPartitionerClass(MyPartitioner.class);
//设置 reduce 数量
scjob.setNumReduceTasks(2);
System.exit(scjob.waitForCompletion(true)?0:1);
}
}
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)