第一关:成绩统计
任务描述
本关任务:使用Map/Reduce
计算班级中年龄最大的学生。
相关知识
为了完成本关任务,你需要掌握:1.什么是MapReduce
,2.如何使用MapReduce
进行运算。
什么是MapReduce
MapReduce
是一种可用于数据处理的编程模型,我们现在设想一个场景,你接到一个任务,任务是:挖掘分析我国气象中心近年来的数据日志,该数据日志大小有3T
,让你分析计算出每一年的最高气温,如果你现在只有一台计算机,如何处理呢?我想你应该会读取这些数据,并且将读取到的数据与目前的最大气温值进行比较。比较完所有的数据之后就可以得出最高气温了。不过以我们的经验都知道要处理这么多数据肯定是非常耗时的。
如果我现在给你三台机器,你会如何处理呢?看到下图你应该想到了:最好的处理方式是将这些数据切分成三块,然后分别计算处理这些数据(Map
),处理完毕之后发送到一台机器上进行合并(merge
),再计算合并之后的数据,归纳(reduce
)并输出。
这就是一个比较完整的MapReduce
的过程了。
如何使用MapReduce进行运算
我们通过一个示例,来体验Map/Reduce
的使用。
我们从一个问题入手:目前我们想统计两个文本文件中,每个单词出现的次数。
首先我们在当前目录下创建两个文件:
创建file01
输入内容:
mkdir input
echo "Hello World Bye World" >> input/file01
cat input/file01
创建file02
输入内容:
touch file02
echo "Hello Hadoop Goodbye Hadoop" >> input/file02
cat input/file02
编写文件mapper.py
和reducer.py
:
vim mapper.py
输入字母i,进入编辑状态,输入以下内容:
#! /usr/bin/python3
import sys
def main():
# 从标准输入流中接受数据行,对每一行调用mapper函数来处理
for line in sys.stdin:
line = line.strip()
mapper(line)
# 每行分割为一个个单词,用word表示
# hadoop streaming要求用"键\t值"形式输出键值对
def mapper(line):
words = line.split(' ')
for word in words:
if len(word.strip()) == 0:
continue
print("%s\t%s" % (word, 1))
if __name__ == '__main__':
main()
保存文件并退出编辑 1.敲“ESC”键,编辑状态; 2.输入冒号“:”然后输入小写字母"wq",并敲回车键,就可以保存文件并返回命令行了。
继续用上面的方法编写文件reducer.py
,内容如下:
#! /usr/bin/python3
import sys
from operator import itemgetter
# 对values求和,并按"单词\t词频"的形式输出。
def reducer(k, values):
print("%s\t%s" % (k, sum(values)))
def main():
current_key = None
values = []
_key, _value = '', 0
for line in sys.stdin:
line = line.strip()
_key, _value = line.split('\t', 1)
_value = eval(_value)
if current_key == _key:
values.append(_value)
else:
if current_key:
reducer(current_key, values)
values = []
values.append(_value)
current_key = _key
# 不要忘记最后一个键值对
if current_key == _key:
reducer(current_key, values)
if __name__ == '__main__':
main()
保存并退出。
调试程序
hadoop-streaming可以在本地调试运行,也可以在hadoop集群上运行。一般在本地调试通过后在放到集群运行。
本地调试
在命令行输入以下命令即可在本地调试:
cat input/file* |python3 mapper.py |sort -k1,1|python3 reducer.py
其中cat是显示文件内容; sort -k1,1完成maprduce的排序功能,表示对内容按照第一列排序。
集群调试
接下来在hadoop集群上运行次程序:
# 启动hadoop集群:start-all.sh
# 在集群上创建文件夹,并上传数据文件至集群
hadoop fs -mkdir /user/test
hadoop fs -mkdir /user/test/input
hadoop fs -put input/file01 /user/test/input
hadoop fs -put input/file02 /user/test/input
# 删除目标目录
hadoop fs -rm -r /user/test/output
# 执行程序,以下10-15行是一条命令,\是命令的换行符,应该一起输入。
hadoop jar \
/usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.7.4.jar \
-mapper mapper.py \
-reducer reducer.py \
-input /user/test/input/ \
-output /user/test/output > log.txt
# 查看生成的文件
hadoop fs -ls /user/test/output
# 如果程序运行成功,应该生成_SUCCESS和part-00000两个文件,第一个表示运行成功,第二个就是生成的输出。
#可以使用cat命令查看:
hadoop fs -cat /user/test/output/part-00000
# 如果没有得到正确结果或没有生成输出文件,可以通过log.txt来查错:
cat log.txt | more
# 最后删除上传的文件和生成的结果
hadoop fs -rm -r /user/test/input
hadoop fs -rm -r /user/test/output
代码解释
示例中,Map/Reduce
程序总共分为两块即:Map,Reduce
,Map
负责处理输入文件的内容。
mapper
方法,它以空格为分隔符将一行切分为若干tokens
,之后,输出< <word>, 1>
形式的键值对。
对于示例中的第一个输入,map
输出是: < Hello, 1>
< World, 1>
< Bye, 1>
< World, 1>
第二个输入,map
输出是: < Hello, 1>
< Hadoop, 1>
< Goodbye, 1>
< Hadoop, 1>
第一个map
的输出是: < Bye, 1>
< Hello, 1>
< World, 2>
第二个map
的输出是: < Goodbye, 1>
< Hadoop, 2>
< Hello, 1>
reduce
收到的数据是这样的:
< Bye , [1]>
< GoodBye , [1]>
< Hadoop , [1,1]>
< Hello , [1,1]>
< World , [1,1]>
Reducer
中的reduce
方法 仅是将每个key
(本例中就是单词)出现的次数求和。 hadoop-streaming
只来自各个mapper
的键值对按照键排序,不会合并,因此我们需要自己将排序后的键值对合并成<键, 值列表>
的形式后再发给reduce
程序处理,recuder.py
中的main()
就是为了实现这个功能。
因此这个作业的输出就是: < Bye, 1>
< Goodbye, 1>
< Hadoop, 2>
< Hello, 2>
< World, 2>
请同学们先熟悉上面的流程,然后再进入下面的任务。
编程要求
使用MapReduce
计算班级每个学生的最好成绩,输入文件路径为/user/test/input
,请将计算后的结果输出到/user/test/output/
目录下。
测试说明
输入文件在你每次点击评测的时候,平台会为你创建,无需你自己创建,只需要启动HDFS
,编写python
代码即可。
输入文件的数据格式如下: 张三 12
李四 13
张三 89
李四 92
...
依照如上格式你应该输出:
张三 89
李四 92
答案:
mapper.py:
def mapper(line):
########## begin ############
group = line.split('\\n')
for people in group:
if len(people.strip()) == 0:
continue
name, age = people.split(' ')
########### End ################
mapper函数输入的是一行一行的数据,对每一行数据用‘\n’进行分割得到的是<姓名 年龄>这样的字符串,再次分割得到name = 姓名,age = 年龄 ,输出。
reducer.py:
# 找出values的最大值,并按name\tmax_age的形式输出。
def reducer(k, values):
##### Begin #########
print("%s\t%s" % (k,max(values)))
##### End #########
reducer函数的输入是<姓名,年龄数组>,要求最大的年龄只需要用max(values)即可,然后输出。
第二关:文件内容合并去重
任务描述
本关任务:使用Map/Reduce
编程实现文件合并和去重操作。
相关知识
通过上一小节的学习我们了解了MapReduce
大致的使用方式,本关我们来了解一下Mapper
类,Reducer
类和调用方法。
编程要求
接下来我们通过一个练习来巩固学习到的MapReduce
知识吧。
对于两个输入文件,即文件file1
和文件file2
,请编写MapReduce
程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新的输出文件file3
。 为了完成文件合并去重的任务,你编写的程序要能将含有重复内容的不同文件合并到一个没有重复的整合文件,规则如下:
- 第一列按学号排列;
- 学号相同,按
x,y,z
排列; - 输入文件路径为:
/user/tmp/input/
; - 输出路径为:
/user/tmp/output/
。
注意:输入文件后台已经帮你创建好了,不需要你再重复创建。
测试说明
程序会对你编写的代码进行测试: 输入已经指定了测试文本数据:需要你的程序输出合并去重后的结果。 下面是输入文件和输出文件的一个样例供参考。
输入文件file1
的样例如下: 20150101 x
20150102 y
20150103 x
20150104 y
20150105 z
20150106 x
输入文件file2
的样例如下: 20150101 y
20150102 y
20150103 x
20150104 z
20150105 y
根据输入文件file1
和file2
合并得到的输出文件file3
的样例如下:
20150101 x
20150101 y
20150102 y
20150103 x
20150104 y
20150104 z
20150105 y
20150105 z
20150106 x
mapper.py:
def mapper(line):
########## Begin ###############
items = line.split('\\n')
for item in items:
key,value = item.split()
print("%s\t%s" % (key,value))
########### End #############
mapper函数中,因为数据是一行一行的处理,没有办法实现去重,所以去重的任务只能由reducer函数来完成。
reducer.py
def reducer(k, values):
############ Begin ################
value = sorted(list(set(values)))
for v in value:
print("%s\t%s" % (k,v))
############ End ################
mapper函数中,因为数据是一行一行的处理,没有办法实现去重,所以去重的任务只能由reducer函数来完成。
第三关:信息挖掘 - 挖掘父子关系:
任务描述
本关任务:对给定的表格进行信息挖掘。
编程要求
你编写的程序要能挖掘父子辈关系,给出祖孙辈关系的表格。规则如下:
- 孙子在前,祖父在后;
- 输入文件路径:
/user/reduce/input
; - 输出文件路径:
/user/reduce/output
。
测试说明
程序会对你编写的代码进行测试: 下面给出一个child-parent
的表格,要求挖掘其中的父子辈关系,给出祖孙辈关系的表格。
输入文件内容如下: child parent
Steven Lucy
Steven Jack
Jone Lucy
Jone Jack
Lucy Mary
Lucy Frank
Jack Alice
Jack Jesse
David Alice
David Jesse
Philip David
Philip Alma
Mark David
Mark Alma
输出文件内容如下:
grand_child grand_parent
Mark Jesse
Mark Alice
Philip Jesse
Philip Alice
Jone Jesse
Jone Alice
Steven Jesse
Steven Alice
Steven Frank
Steven Mary
Jone Frank
Jone Mary
mapper.py
def mapper(line):
############### Begin ############
items = line.split("\\n")
for item in items:
child,parent = item.split()
print("%s\t%s" % (child,"p-"+parent))
print("%s\t%s" % (parent,"c-"+child))
############### End #############
由于mapper只能一行一行的处理数据,所以按照<子女,父母><父母,子女>的形式进行两次输出,并且在第二个值前加上‘c-’或‘p-’以区分这个人到底是父母还是子女。
reducer.py
def reducer(k, values):
############## Begin ################
grandc = []
grandp = []
for value in values:
if value[:2] == 'c-':
grandc.append(value[2:])
elif value[:2] == 'p-':
grandp.append(value[2:])
for c in grandc:
for p in grandp:
print("%s\t%s" % (c,p))
############## End #################
reducer函数进行的工作就是将得到的数据进行整合,根据数据前面的‘c-’,‘p-’来判断这个人是父母还是子女,父母放入grandp列表中,子女放入grandc列表中(放入列表时不要忘记去掉前面的‘c-’和‘p-’),最后将其对应的关系输出。
这就是本次作业所有的内容了,希望大家能从中获得一些什么。蟹蟹
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)