MapReduce编程之连接Join

2023-05-16

------------本文笔记整理自《Hadoop海量数据处理:技术详解与项目实战》范东来

一、设计思路

HDFS上存放两个文件,一个记录了学生基本信息(姓名,学号),文件名“student_info.txt”,文件内容为:

Jenny    00001

Hardy    00002

Bardley  00003

...

另一个文件记录了学生的选课信息(学号,课程名),文件名“student_class_info.txt”,文件内容为:

00001   Chinese

00001   Math

00002   Music

00002   Math

00003   Physic

...

 现在要对这两个文件进行join操作,得到结果为:

Jenny    Chinese

Jenny    Math

Hardy    Music

Hardy    Math

Bardley  Physic

...

该操作和SQL中的join操作类似,具体实现方式:先在map阶段读入student_info.txt和student_class_info.txt两个文件,并将学号作为输出键,姓名或课程在附加上文件名后作为输出值;再在reduce阶段对map中间结果进行笛卡尔乘积。

二、代码实现

1.Mapper类

package com.hadoop.mr.join;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/*
 * JoinMapper类
 * map输入信息:
 * 文件1:student_info.txt
 * Jenny    00001
 * Hardy    00002
 * Bradley  00003
 * ...
 * 文件2:student_class_info.txt
 * 00001    Chinese
 * 00001    Math
 * 00002    Music
 * 00002    Math
 * 00003    Physic
 * ...
 * Mapper处理后输出的中间结果:
 * 00001    Jenny	student_info.txt
 * 00001    Chinese	student_class_info.txt
 * 00001    Math	student_class_info.txt
 * 00002    Hardy	student_info.txt
 * 00002    Music	student_class_info.txt
 * 00002    Math	student_class_info.txt
 * 00003    Bradley	student_info.txt
 * 00003    Physic	student_class_info.txt
 */
public class JoinMapper extends Mapper<LongWritable, Text, Text, Text>{

	//定义文件名称标识
	private static final String LEFT_FILENAME = "student_info.txt";
	private static final String RIGHT_FILENAME = "student_class_info.txt";
	
	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		
		//从输入分片信息中取得文件路径
		//FileSplit 是 抽象类InputSplit 的实现类,记录了文件的具体切片信息。
		String filePath = ((FileSplit)context.getInputSplit()).getPath().toString();
		//文件标识
		String fileFlag = null;
		//输出键(学号)
		String outKey = null;
		//输出值(姓名 或 课程)
		String outValue = null;
		//行记录的信息
		String[] infos = value.toString().split(" ");
		
		//判断行记录所来自的文件
		if (filePath.contains(LEFT_FILENAME)) {
			fileFlag = LEFT_FILENAME;
			outKey = infos[1];
			outValue = infos[0];
		} 
		else if (filePath.contains(RIGHT_FILENAME)) {
			fileFlag = RIGHT_FILENAME;
			outKey = infos[0];
			outValue = infos[1];
		}
		
		//输出键值对,并在值上标记文件名
		context.write(new Text(outKey), new Text(outValue + "\t" + fileFlag));
	}
	
}

 2.Reducer类

package com.hadoop.mr.join;


import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/*
 * JoinReducer类
 * Reducer的输入数据(Mapper的输出中间结果):
 * 00001    Jenny	student_info.txt
 * 00001    Chinese	student_class_info.txt
 * 00001    Math	student_class_info.txt
 * 00002    Hardy	student_info.txt
 * 00002    Music	student_class_info.txt
 * 00002    Math	student_class_info.txt
 * 00003    Bradley	student_info.txt
 * 00003    Physic	student_class_info.txt
 * ...
 * Reducer的输出结果(join结果)
 * Jenny    Chinese
 * Jenny    Math
 * Hardy    Music
 * Hardy    Math
 * Bardley  Physic
 * ...
 * 
 */
public class JoinReducer extends Reducer<Text, Text, Text, Text>{
	
	//定义文件名称标识
	private static final String LEFT_FILENAME = "student_info.txt";
	private static final String RIGHT_FILENAME = "student_class_info.txt";
	
        private static int num = 0;

	@Override
	protected void reduce(Text key, Iterable<Text> values, Context context)
			throws IOException, InterruptedException {
		
                //计数reduce调用次数,输出key
		{
			num++;
			System.out.println(num + " " +key);
		}

		//学生姓名
		String studentName = null;
		//学生课程名数组
		List<String> studentClassNames = new ArrayList<String>();
		
		//根据文件名标识信息,将姓名、课程归类
		for (Text value : values) {
			String[] infos = value.toString().split("\t"); 
			if(LEFT_FILENAME.equals(infos[1])) {
				studentName = infos[0];
			}
			else if (RIGHT_FILENAME.equals(infos[1])){
				studentClassNames.add(infos[0]);
			}
		}
                //去除无法建立内连接的信息
		if (studentName == null || studentClassNames.size() == 0) {
			return;
		}
		
		//将姓名-课程 键值对遍历输出
		for (int i = 0; i < studentClassNames.size(); i++) {
			context.write(new Text(studentName), new Text(studentClassNames.get(i)));
		}
		
	}

}

3.驱动类

package com.hadoop.mr.join;

import java.io.IOException;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/*
 * join的驱动类
 */
public class MR_Join {

	public static void main(String[] args) throws IOException,
		InterruptedException, ClassNotFoundException {
		
		//加载hadoop配置信息
		Configuration conf = new Configuration();
		
		//初始化作业信息
		Job job = Job.getInstance(conf, "MR_Join");
		job.setJarByClass(MR_Join.class);
		//设置Mapper/Reducer类型
		job.setMapperClass(JoinMapper.class);
		job.setReducerClass(JoinReducer.class);
		//设置输出键值对类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
                //设置reduce任务个数(即分区数上限)
		//job.setNumReduceTasks(2);
		//设置文件输入/输出路径
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		//提交作业
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

三、打包Jar,并运行程序

1.将com.hadoop.mr.join包右键导出为 JAR file,命名为:"MR_Join.jar";
2.利用Windows的cmd或者PowerShell(推荐)将JAR文件上传到Linux服务器
  命令如下:(在JAR文件目录下执行)
  > scp MR_Join.jar root@remoteIP:~/myJars/mapreduce/
 (其中remoteIP为远程服务器IP)
3.启动hadoop
  --创建学生信息输入文件
  > cd ~/myJars/mapreduce/
  > touch student_info.txt
  > vi student_info.txt
  按键"i",进入编辑模式,向student_info.txt文件中输入内容,如下:
  Jenny    00001
  Hardy    00002
  Bradley  00003
  按键"ESC"-->"shift q"-->输入"wq!",回车,保存
  --查看文件
  > cat student_info.txt
  --创建学生选课文件 
  > touch student_class_info.txt
  > vi student_class_info.txt
  按键"i",进入编辑模式,向student_class_info.txt文件中输入内容,如下:
  00001    Chinese
  00001    Math
  00002    Music
  00002    Math
  00003    Physic
  按键"ESC"-->"shift q"-->输入"wq!",回车,保存
  --查看文件
  > cat student_class_info.txt
  --在HDFS中创建输入文件目录
  > hadoop fs -mkdir /user/hadoop/joininput
  --在HDFS中查看输入文件目录
  > hadoop fs -ls /user/hadoop/joininput
  --将本地的两个文件拷贝到HDFS的输入目录中(在"~/myJars/mapreduce/"下执行)
  --可以多个文件一起传输
  > hadoop fs -copyFromLocal student_info.txt student_class_info.txt /user/hadoop/joininput/
4.执行JAR,运行程序
  命令如下:(在JAR文件目录"~/myJars/mapreduce/"下执行)
  > hadoop jar MR_Join.jar com.hadoop.mr.join.MR_Join /user/hadoop/joininput /user/hadoop/joinoutput
  运行过程中,屏幕会输出执行过程,直到完成
5.查看单词统计结果
  成功执行完后,目录"/user/hadoop/joinoutput/"下会产生两个文件
  /user/hadoop/joinoutput/_SUCCESS    --成功执行完的空标识文件
  /user/hadoop/joinoutput/part-r-00000 --作业输出结果文件
  --查看输出文件
  > hadoop fs -cat /user/hadoop/joinoutput/part-r-00000
  Jenny    Chinese
  Jenny    Math
  Hardy    Music
  Hardy    Math
  Bardley  Physic
  <此即为join结果>

四、结果分析 

1.查看日志文件,发现:
  MapReduce任务为两个输入文件各创建了一个Mapper类来读入数据。
  一共2个map任务(对应2个文件分片),1个reduce任务(默认)。
  (因为两个文件都很小,所以一个文件就是一个输入切片)
  注:reduce任务可通过job.setNumReduceTasks(2);来设置。
2.如果在MR_Join类中增加代码 job.setNumReduceTasks(2);,将reduce任务设置成2个(默认1个),
  那么,会发现,输出结果文件变成了两个part-r-00000和part-r-00001
  文件part-r-00000中:(分区一)
  Jenny    Math
  Jenny    Chinese
  Bardley  Physic
  文件part-r-00001中:(分区二)
  Hardy    Math
  Hardy    Music
  可见 1.reduce任务个数是分区个数的上限
       (因为求解分区时hadoop默认对key进行hash分区,对reduce个数求余)
      2.一个reduce任务对应一个输出文件
3.通过在JoinReducer类中增加reduce方法调用次数的计数输出代码
  (注:程序的标准输出和错误输出不会在控制台上打印,会分别输出到对应任务的日志文件stdout和stderr中)
  并在下列两个日志(最后两个日志是reduce任务日志)中
http://master:50070/logs/userlogs/application_id_0005/container_id_0005_01_000004/stdout
    输出:
    1 00001
    2 00003
http://master:50070/logs/userlogs/application_id_0005/container_id_0005_01_000005/stdout
    输出:
    1 00002
  可见:同一个分区中是包含不同key的记录的,Reducer任务会为每一个key调用一次reduce方法。

五、拓展链接(不同实现方式)

1.MapReduce:实现join的几种方法

2.MapReduce实现join操作

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

MapReduce编程之连接Join 的相关文章

  • Jetson带CUDA编译的opencv4.5安装教程与踩坑指南,cmake配置很重要!

    环境 xff1a Jetson NX 43 Jetpack4 4 43 Ubuntu18 04 0 jtop前后对比1 安装教程2 踩坑指南2 1 cv2 error OpenCV 4 4 0 tmp pip build 2do2xloi
  • C++ 获取系统当前时间

    C 43 43 获取系统当前时间 c 43 43 time函数 C 43 43 的日期和时间函数获取系统当前时间实例大体思路具体使用与解析四 回顾与梳理 c 43 43 time函数 C 43 43 的日期和时间函数 C 43 43 标准库
  • KAFKA结构图

    转载 https blog csdn net sillyzhangye article details 86181323 utm medium 61 distribute pc relevant none task blog BlogCom
  • vue-企业微信绑定和解绑

    一 企业微信绑定解绑 项目中需要对账号进行企业微信的绑定和解绑 企业微信页面显示 xff1a span class token operator lt span FormItem label span class token operato
  • cannot currently show the desktop 树莓派 vnc viewer 无法显示的解决方法

    文章由来 2021 04 24 xff0c 周六 xff0c 一个人在家没什么事情 xff0c 突然发现了自己的树莓派4B xff0c 那个时候就买来的时候用了一会 xff0c 今天突然想用它搭一个服务器 那个时候玩的时候资料没有整理 xf
  • AI就是闭上眼想要一份凉皮,睁开眼就会有一份凉皮摆在眼前

    回答这个问题之前 xff0c 先听下这段对话 xff1a 你愿意让别人帮你干活吗 xff1f 愿意 xff01 那么你愿意让别人干你的活吗 xff1f 愿意 最后问你 xff0c 你愿意让别人替代你吗 xff1f 作为一个对AI没有过深的了
  • 什么是主线程?

    主线程 是执行主 main 方法的线程 单线程程序 java程序中只有一个线程 执行从main方法开始 从上到下依次执行 主线程的概念 JVM执行main方法 main方法会进入到栈内存 JVM会找操作系统开辟一条main方法通向cpu的执
  • Python爬虫Xpath方法返回值有[]引号等问题解决方法

    在搞Python爬虫的时候会使用到Xpath方法 xff0c 比如在爬取某个链接的时候返回值会是 39 http www baidu com demo jpg 39 示例链接 而后面的代码会因为有这几个字符串而受到影响 解决方法 xff1a
  • error: no configuration has been provided, try setting KUBERNETES_MASTER environment variable

    k8s报error no configuration has been provided try setting KUBERNETES MASTER environment variable错误解决方法1 18 1版本 使用vi编辑器打开
  • ubuntu 安装PHP+Apache

    安装Apache sudo apt install apache2 安装PHP 7 4 Ubuntu 20 04默认安装的版本 sudo apt install php libapache2 mod php 安装完成重启Apache服务 s
  • ASCII码判断大小

    ASCII码的值大小排列 xff1a a z gt A Z gt 0 9 在小写字母中z最大 xff0c a最小 在大写字母中Z最大 xff0c A最小 在0到9中 9最大 xff0c 0最小 故选D选项
  • 二进制运算相关的题目

    将二进制数01011010 转换成十进制数 xff0c 然后将十进制数 x 2 得到的值将其化为二进制数 01011010 十进制数为90 180 二进制数为10110100 故选C选项
  • 重装Ubuntu系统及系列软件安装

    重装Ubuntu系统及系列软件安装 1 安装ubuntu20 04 03系统下载Ubuntu20 04 03系统更换系统apt的源 2 CUDA和cudnn安装下载CUDA安装包并安装 3 pycharm的安装 4 conda的安装 5 兼
  • Qt中UI对象只能在主线程中操作,那如何在子线程中操作UI呢

    为什么在Qt中UI对象只能在主线程中操作 在Qt中 xff0c UI对象只能在主线程中操作 xff0c 因为Qt采用了事件循环机制 xff0c UI事件 xff08 如鼠标点击 键盘输入 xff09 都是在主线程中处理的 如果在子线程中直接
  • python学习记录

    python学习记录 python学习python运行顺序类的使用模块导入import不同级模块导入模块导入的重名问题 xff1a 注意的点 修饰器 正则表达式 python学习 本文是关于python的学习记录文章 比较基本 比较潦草 p
  • 案例二:基于MapReduce分词统计

    案例二 xff1a 基于MapReduce的分词统计 一 准备条件 xff1a 1 数据源 xff1a 六个 txt 诗经文件 2 Hadoop集群环境 五 功能实现 前提准备 xff1a 引入类库将Hadoop包解压 xff0c 并进入相
  • RHEL8.3 配置VNC 远程

    一 安装VNC 选取比较流行的tigervnc dnf install tigervnc span class token operator span server tigervnc span class token operator sp
  • 一次使用Dockerfile构建tomcat镜像时遇到的Linux内核问题

    当前版本 span class token punctuation span root 64 zyrox test alpine images span class token punctuation span span class tok
  • Java中Lambda表达式的使用

    Lambda表达式是Java SE 8中一个重要的新特性 lambda表达式允许你通过表达式来代替功能接口 lambda表达式就和方法一样 它提供了一个正常的参数列表和一个使用这些参数的主体 Lambda表达式还增强了集合库 Java SE
  • 用mscomm控件编写串口通信出现error reading comm device错误-已解决

    一 问题描述 用mscomm控件编写串口通信过程中 xff0c 在消息响应函数OnComm函数下接受串口发送过来的数据时 xff0c 在get Input 函数会出现error reading comm devece 错误 具体代码如下 x

随机推荐

  • 使用Spring Security后,页面iframe加载不出来

    错误 Refused to display http localhost 8080 console in a frame because it set X Frame Options to deny 解决办法 授权的时候开启iframe的加
  • 软件工程笔记八__面向对象

    1 面向对象方法四要点 xff08 1 xff09 对象 xff1a 客观世界有各种对象组成 xff0c 任何事物都是对象 xff0c 比如一支笔 xff0c 一张纸 xff0c 复杂的对象可由简单的对象以某种方式组合而成 对象分解取代功能
  • C++如何获取当前时间

    导读 文章首先介绍了使用C库的接口来获取当前时间的方法 xff0c 然后介绍了使用C 43 43 11标准库中的函数来获取当前时间的方法 此外 xff0c 文章还介绍了使用函数strftime来格式化时间字符串的方法 xff0c 并列举了常
  • Java笔记(一):volatile、synchronized关键字

    volatile关键字 volatile字面意思为易变的 不稳定的 xff0c 事实上也正是如此 这个关键字的作用就是告诉编译器 xff0c 只要是被此关键字修饰的变量都是易变的 xff0c 不稳定的 主要是volatile所修饰的变量是直
  • 读取cv.VideoCapture(0)的frame帧转变为PIL.Image图片格式时遇到的坑

    在项目中遇到一个问题是想将VideoCapture 读到的frame图片转变成PIL的Image图片格式 坑1 xff1a BGR还是RGB模式 xff1f 兴冲冲地在在网上找到一段代码 xff0c 将opencv中的imread 图像转成
  • linux命令记录

    linux三剑客 xff1a grep xff0c awk xff0c sed 1 grep命令 2 awk命令 linux中的awk命令是一种处理文本的工具 AWK命名来源于三位创始人的家族名称首字母 可以分行对文本进行处理 其命令格式如
  • [joysticker]使用Ubuntu读取USB手柄/方向盘的输出控制

    摘要 xff1a 在淘宝上买到的游戏手柄 USB卖家只给了Windows下的驱动 xff0c 本来以为Ubuntu下没有驱动 xff0c 没想到网上早已经有人用cpp开发出了USB手柄的驱动 xff0c 搜索很多博客的方法终于从手柄拿到数据
  • [numpy问题]The truth value of an array with more than one element is ambiguous.

    问题描述 xff1a 在进行Hough圆变换时 xff0c 需要输出一个圆的坐标 xff1a circles 61 cv2 span class hljs preprocessor HoughCircles span canny cv2 s
  • [PyQt5]点击主窗口弹出另一个窗口

    1 先使用Qt designer设计两个窗口 xff0c 一个是主窗口 xff0c 一个是子窗口 其中主窗口是新建 Main Window 子窗口是Dialog窗体 两个窗口不能是同一类型 否则会崩溃 并保存为EyeTracking mai
  • matlab的for循环

    https blog csdn net zhyoulun article details 78606382
  • 【总结】自然语言处理(NLP)算法:概述与分类

    摘要 xff1a NLP概述 主要参考自然语言处理 xff08 NLP xff09 知识结构总结和知乎上的一些问答 目录 NLP界神级人物NLP知识结构 1 概述2 形式语言与自动机3 语言模型4 概率图模型 xff0c 生成模型与判别模型
  • 【Win10】【开始菜单打不开】任务栏修复

    真是 xff0c 朋友说一句 xff0c 电脑出问题了你有本事别重装系统 我记住了 作为一个 强迫症患者 xff0c 最近电脑C盘内存快满以及翻墙代理污染了一部分ipv4网站就让我很毛 xff0c 很想重装系统 但是我忍住了 根据网上的教程
  • 「PyQt5」使用Qtdesigner设计好界面后写一个驱动程序

    使用Python写界面最方便的就是Pycharm 43 Qtdesigner 用Qt designer画用户界面比较容易 xff0c 保存成 ui文件然后转化成 py即可 这里 xff0c 我们展示下一步运行程序让这个界面显示 xff1a
  • 「git」Linux下将文件都上传到github上

    最近在整理自己平时写的一些代码 xff0c 第一选择就是上传到github上作为一个备份和说明 xff0c 防止自己遗忘 上次用版本控制已经是好几个月前了 xff0c 所以难免有些生疏 所以就从新按照CSDN大佬们的基础教程重新操作了一遍
  • 高质量嵌入式Linuxc编程

    第一天 xff1a 根目录 命令 注意 xff1a 命令和参数要区分开 xff0c 他们之间要有空格 cd 进入目录 ls 列举目录内容 ls a 列举所有文件包括隐藏的文件 所有隐藏的文件都是以点 xff08 xff09 开始的 ls l
  • 读写位宽不同的FIFO,数据输入输出顺序是怎么样的?BRAM又如何呢?

    原文地址 xff1a https wenku baidu com view 7d7cf156284ac850ac0242b6 html 对于BRAM xff1a 1 xff09 写位宽小于读位宽 xff1a 先入存低位 xff0c 后入存高
  • apache httpd在centos上手动安装

    Apache Bench手动安装 简介httpd及依赖包安装ab扩充最大并发量 简介 apache bench简称 xff08 ab xff09 可以做压力测试 xff0c 本文介绍手动安装方法 httpd及依赖包安装 以下包因为存在依赖关
  • Cartographer最新版完整安装教程(2020.8.7成功安装)

    2020 8 7更 xff1a 春节之后重装了系统 xff0c 重新安装Cartographer又遇到了困难 xff0c 发现之前的教程naive xff0c 历尽千辛万苦今天终于安装成功 xff0c 而且更加简单方便 xff0c 给大家作
  • 没有可用的软件包××,但是它被其他的软件包引用了——解决方法

    在ubuntu下安装gcc xff1a sudo apt install gcc 谁知这么简单的命令居然不成功 解决方法 xff1a sudo apt get update 待更新完毕后再次输入安装命令即可
  • MapReduce编程之连接Join

    本文笔记整理自 Hadoop海量数据处理 xff1a 技术详解与项目实战 范东来 一 设计思路 HDFS上存放两个文件 xff0c 一个记录了学生基本信息 xff08 姓名 xff0c 学号 xff09 xff0c 文件名 student