python mapreduce函数_用python写MapReduce函数——以WordCount为例

2023-05-16

尽管Hadoop框架是用java写的,可是Hadoop程序不限于java,能够用python、C++、ruby等。本例子中直接用python写一个MapReduce实例,而不是用Jython把python代码转化成jar文件。css

例子的目的是统计输入文件的单词的词频。html

输入:文本文件

输出:文本(每行包括单词和单词的词频,二者之间用'\t'隔开)

1. Python MapReduce 代码

使用python写MapReduce的“诀窍”是利用Hadoop流的API,经过STDIN(标准输入)、STDOUT(标准输出)在Map函数和Reduce函数之间传递数据。java

咱们惟一须要作的是利用Python的sys.stdin读取输入数据,并把咱们的输出传送给sys.stdout。Hadoop流将会帮助咱们处理别的任何事情。python

1.1 Map阶段:mapper.pyshell

在这里,咱们假设把文件保存到hadoop-0.20.2/test/code/mapper.pysegmentfault

#!/usr/bin/env python

importsysfor line insys.stdin:

line=line.strip()

words=line.split()for word inwords:print "%s\t%s" % (word, 1)

文件从STDIN读取文件。把单词切开,并把单词和词频输出STDOUT。Map脚本不会计算单词的总数,而是输出 1。在咱们的例子中,咱们让随后的Reduce阶段作统计工做。ruby

为了是脚本可执行,增长mapper.py的可执行权限app

chmod +x hadoop-0.20.2/test/code/mapper.py

1.2 Reduce阶段:reducer.py框架

在这里,咱们假设把文件保存到hadoop-0.20.2/test/code/reducer.py分布式

#!/usr/bin/env python

from operator importitemgetterimportsys

current_word=None

current_count=0

word=Nonefor line insys.stdin:

line=line.strip()

word, count= line.split('\t', 1)try:

count=int(count)exceptValueError: #count若是不是数字的话,直接忽略掉continue

if current_word ==word:

current_count+=countelse:ifcurrent_word:print "%s\t%s" %(current_word, current_count)

current_count=count

current_word=wordif word == current_word: #不要忘记最后的输出print "%s\t%s" % (current_word, current_count)

文件会读取mapper.py 的结果做为reducer.py 的输入,并统计每一个单词出现的总的次数,把最终的结果输出到STDOUT。

为了是脚本可执行,增长reducer.py的可执行权限

chmod +x hadoop-0.20.2/test/code/reducer.py

细节:split(chara, m),第二个参数的做用,下面的例子很给力

str = 'server=mpilgrim&ip=10.10.10.10&port=8080'

print str.split('=', 1)[0] #1表示=只截一次print str.split('=', 1)[1]print str.split('=')[0]print str.split('=')[1]

输出

server

mpilgrim&ip=10.10.10.10&port=8080

server

mpilgrim&ip

1.3 测试代码(cat data | map | sort | reduce)

这里建议你们在提交给MapReduce job以前在本地测试mapper.py 和reducer.py脚本。不然jobs可能会成功执行,可是结果并不是本身想要的。

功能性测试mapper.py 和 reducer.py

[rte@hadoop-0.20.2]$cd test/code

[rte@code]$echo "foo foo quux labs foo bar quux" | ./mapper.py

foo1

foo1

quux1

labs1

foo1

bar1

quux1

[rte@code]$echo "foo foo quux labs foo bar quux" | ./mapper.py | sort -k1,1 | ./reducer.py

bar1

foo3

labs1

quux2

细节:sort -k1,1  参数何意?

-k, -key=POS1[,POS2]     键以pos1开始,以pos2结束

有时候常常使用sort来排序,须要预处理把须要排序的field语言在最前面。实际上这是

彻底没有必要的,利用-k参数就足够了。

好比sort all

1 4

2 3

3 2

4 1

5 0

若是sort -k 2的话,那么执行结果就是

5 0

4 1

3 2

2 3

1 4

2. 在Hadoop上运行python代码

2.1 数据准备

下载如下三个文件的

我把上面三个文件放到hadoop-0.20.2/test/datas/目录下

2.2 运行

把本地的数据文件拷贝到分布式文件系统HDFS中。

bin/hadoop dfs -copyFromLocal /test/datas hdfs_in

查看

bin/hadoop dfs -ls

结果

drwxr-xr-x - rte supergroup 0 2014-07-05 15:40 /user/rte/hdfs_in

查看具体的文件

bin/hadoop dfs -ls /user/rte/hdfs_in

执行MapReduce job

bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar \-file test/code/mapper.py -mapper test/code/mapper.py \-file test/code/reducer.py -reducer test/code/reducer.py \-input /user/rte/hdfs_in/*-output /user/rte/hdfs_out

实例输出

f9369212fcec192d8181abe414a31f3c.png

查看输出结果是否在目标目录/user/rte/hdfs_out

bin/hadoop dfs -ls /user/rte/hdfs_out

输出

Found 2 items

drwxr-xr-x - rte supergroup 0 2014-07-05 20:51 /user/rte/hdfs_out2/_logs

-rw-r--r-- 2 rte supergroup 880829 2014-07-05 20:51 /user/rte/hdfs_out2/part-00000

查看结果

bin/hadoop dfs -cat /user/rte/hdfs_out2/part-00000

输出

237c6dc36ae82698634a77f5774f8fa1.png

以上已经达成目的了,可是能够利用python迭代器和生成器优化

3. 利用python的迭代器和生成器优化Mapper 和 Reducer代码

3.1 python中的迭代器和生成器

3.2 优化Mapper 和 Reducer代码

mapper.py

#!/usr/bin/env python

importsysdefread_input(file):for line infile:yieldline.split()def main(separator='\t'):

data=read_input(sys.stdin)for words indata:for word inwords:print "%s%s%d" % (word, separator, 1)if __name__ == "__main__":

main()

reducer.py

#!/usr/bin/env python

from operator importitemgetterfrom itertools importgroupbyimportsysdef read_mapper_output(file, separator = '\t'):for line infile:yield line.rstrip().split(separator, 1)def main(separator = '\t'):

data= read_mapper_output(sys.stdin, separator =separator)for current_word, group ingroupby(data, itemgetter(0)):try:

total_count= sum(int(count) for current_word, count ingroup)print "%s%s%d" %(current_word, separator, total_count)exceptvalueError:pass

if __name__ == "__main__":

main()

细节:groupby

from itertools importgroupbyfrom operator importitemgetter

things= [('2009-09-02', 11),

('2009-09-02', 3),

('2009-09-03', 10),

('2009-09-03', 4),

('2009-09-03', 22),

('2009-09-06', 33)]

sss=groupby(things, itemgetter(0))for key, items insss:printkeyfor subitem initems:printsubitemprint '-' * 20

结果

>>>

2009-09-02

('2009-09-02', 11)

('2009-09-02', 3)

--------------------

2009-09-03

('2009-09-03', 10)

('2009-09-03', 4)

('2009-09-03', 22)

--------------------

2009-09-06

('2009-09-06', 33)

--------------------

groupby(things, itemgetter(0)) 以第0列为排序目标

groupby(things, itemgetter(1))以第1列为排序目标

groupby(things)以整行为排序目标

4. 参考

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

python mapreduce函数_用python写MapReduce函数——以WordCount为例 的相关文章

  • 为什么应用程序会在 sys.exit 命令之后显示?

    我正在关注这个教程 http zetcode com tutorials pyqt4 firstprograms http zetcode com tutorials pyqt4 firstprograms 在第一个示例中 我不明白为什么应
  • 通过 Python 与 Windows 控制台应用程序交互

    我在 Windows 上使用 python 2 5 我希望通过 Popen 与控制台进程交互 我目前有一小段代码 p Popen console app exe stdin PIPE stdout PIPE issue command 1
  • 使用 Flask SQLAlchemy 进行表(模型)继承

    我遵循了这个建议question https stackoverflow com questions 1337095 sqlalchemy inheritance但我仍然收到此错误 sqlalchemy exc NoForeignKeysE
  • Python中#和"""注释的区别

    开始用 Python 编程 我看到一些带有注释的脚本 and comments 这两种评论方式有什么区别 最好的事情就是阅读PEP 8 Python 代码风格指南 https www python org dev peps pep 0008
  • 同情因子简单关系

    我在 sympy 中有一个简单的因式分解问题 无法解决 我在 sympy 处理相当复杂的积分方面取得了巨大成功 但我对一些简单的事情感到困惑 如何得到 phi 2 2 phi phi 0 phi 0 2 8 因式分解 phi phi 0 2
  • 运行源代码中包含 Unicode 字符的 Python 2.7 代码

    我想运行一个在源代码中包含 unicode utf 8 字符的 Python 源文件 我知道这可以通过添加评论来完成 coding utf 8 在一开始的时候 但是 我希望不使用这种方法来做到这一点 我能想到的一种方法是以转义形式编写 un
  • Keras,如何获取每一层的输出?

    我已经用 CNN 训练了一个二元分类模型 这是我的代码 model Sequential model add Convolution2D nb filters kernel size 0 kernel size 1 border mode
  • 用于打印 C/C++ 文件的所有函数定义的 Python 脚本

    我想要一个 python 脚本来打印 C C 文件中定义的所有函数的列表 e g abc c定义两个函数为 void func1 int func2 int i printf d i return 1 我只想搜索文件 abc c 并打印其中
  • 在 C# 中实例化 python 类

    我已经用 python 编写了一个类 我想通过 IronPython 将其包装到 net 程序集中 并在 C 应用程序中实例化 我已将该类迁移到 IronPython 创建了一个库程序集并引用了它 现在 我如何真正获得该类的实例 该类看起来
  • Python3模拟用另一个函数替换函数

    如何使用 python 中的另一个函数来模拟一个函数 该函数也将提供一个模拟对象 我有类似以下操作的代码 def foo arg1 arg2 r bar arg1 does interesting things 我想替换的实现bar函数 让
  • 打印一个 Jupyter 单元中定义的所有变量

    有没有一种更简单的方法来以漂亮的方式显示单个单元格中定义的所有变量的名称和值 我现在做的方式是这样的 但是当有30个或更多变量时我浪费了很多时间 您可以使用whos http ipython readthedocs io en stable
  • python 语言环境奇怪的错误。这究竟是怎么回事?

    所以今天我升级到了 bazaar 2 0 2 我开始收到这条消息 顺便说一句 我在雪豹上 bzr warning unknown locale UTF 8 Could not determine what text encoding to
  • Docker:通过 Gunicorn 运行 Flask 应用程序 - Worker 超时?表现不佳?

    我正在尝试创建一个用Python Flask编写的新应用程序 由gunicorn运行 然后进行dockerized 我遇到的问题是 docker 容器内的性能非常差 不一致 我最终得到了响应 但我不明白为什么性能会下降 有时我会在日志中看到
  • keras 预测内存交换无限期增加

    我使用keras实现了一个分类程序 我有一大组图像 我想使用 for 循环来预测每个图像 然而 每次计算新图像时 交换内存都会增加 我尝试删除预测函数内部的所有变量 并且我确信该函数内部存在问题 但内存仍然增加 for img in ima
  • 指定 Parquet 属性 pyspark

    如何在 PySpark 中指定 Parquet 块大小和页面大小 我到处搜索 但找不到任何有关函数调用或导入库的文档 根据火花用户档案 https mail archives apache org mod mbox spark user 2
  • pandas apply:函数名是否带引号的区别

    简单数据框定义示例 df pd DataFrame A 2 4 1 B 8 4 1 C 6 2 7 df A B C 0 2 8 6 1 4 4 2 2 1 1 7 尝试理解以下块中函数参数调用的差异 df apply sum df app
  • 如何在 Pandas 数据框中用 NaN 替换一系列值?

    我有一个巨大的数据框 我应该如何用 NaN 替换一系列值 200 100 数据框 您可以使用pd DataFrame mask https pandas pydata org pandas docs stable generated pan
  • Synapse Notebook 参考 - 使用参数从另一个笔记本调用 Synapse Notebook

    我有一个带有参数的突触笔记本 我试图从另一个笔记本调用该笔记本 我正在使用 run 命令 我应该如何将参数从基本笔记本传递到正在调用的笔记本 另外 对我来说 上述答案不起作用 作为对此问题的单独解决方案 下面是一个答案 打开笔记本并转到最右
  • 如何获取所有Python标准库模块的列表?

    我想要类似的东西sys builtin module names标准库除外 其他不起作用的事情 sys modules 只显示已经加载的模块 sys prefix 包含非标准库模块并且似乎无法在 virtualenv 内工作的路径 我想要这
  • 将笔记本生成的 HTML 片段转换为 LaTeX 和 PDF

    在我的笔记本里有时会有 from IPython display import display HTML display HTML h3 The s is important h3 question of the day 但当我后来将笔记本

随机推荐