今天闲来无事,发现很久之前机器上还有hadoop的安装文件,于是来尝试一下MapReduce的例子,顺便来考验一下自己曾经失去的python开发经验 :(
例子是一串输入 "a b c d a b c",所谓Map和Reduce,其实是两个计算过程,第一个过程就是map,它用来拆分任务,罗列详细统计结果,代码如下:
import sys
def read_input(file):
for line in file:
yield line.split()
def main():
data = read_input(sys.stdin)
for words in data:
for word in words:
print("%s%s%d"%(word, '\t', 1)) #按照一定的格式统计每一个字符出现的次数,可以重复
if __name__ == '__main__':
main()
在python解释器中尝试一下
[training@localhost training_materials]$ echo "a b c d a b c" | python2.7 hdfs_map.py
a 1
b 1
c 1
d 1
a 1
b 1
c 1
于是再来讨论第二个过程,reduce就是把结果分组进行汇总,得到最终的统计结果!
import sys
from operator import itemgetter
from itertools import groupby
def read_mapper_output(file, separator='\t'):
for line in file:
yield line.rstrip().split(separator, 1)
def main():
data = read_mapper_output(sys.stdin)
for current_word, group in groupby(data, itemgetter(0)): #分组统计
total_count = sum(int(count) for current_word, count in group)
print ("%s%s%d"%(current_word, '\t', total_count))
if __name__ == '__main__':
main()
再次运行来看结果吧!
[training@localhost training_materials]$ echo "a b c d a b c" | python2.7 hdfs_map.py | sort -k1,1 | python2.7 hdfs_reduce.py
a 2
b 2
c 2
d 1
顺便要说一下yield的概念,这个其实是一个迭代器,用来在迭代过程之中记录next记录的优化方案!