MRJob 十分钟入门: 用 Python 轻松运行 MapReduce

概览

前言

本教程取材翻译于mrjob v0.5.10 documentation。有删减。最近在学mapreduce, 用到mrjob,在网上没有找到好的中文教程,就自己翻译了一下官方文档的重点。

简介

mrjob是用来写能在hadoop运行的python程序的最简便方法。其最突出的特点就是在mrjob的帮助下,无需安装hadoop或部署任何集群,我们可以在本地机器上运行代码(进行测试)。同时,mrjob可以轻松运行于Amazon Elastic MapReduce。
为了达到简便实用的目的,一些功能被mrjob省去了。如果追求更多的功能,可以尝试Dumbo,Pydoop等package。

安装

使用pip安装。

pip install mrjob

anaconda的使用者推荐使用conda安装。

conda install -c conda-forge mrjob 

第一个mrjob程序

在这里我们使用一个统计文本中字符数的程序fc.py作为例子。

from mrjob.job import MRJob

class FrequencyCount(MRJob):
    def mapper(self, _, line):
        yield "chars", len(line)
        yield "words", len(line.split())
        yield "lines", 1
    
    def reducer(self, key, values):
        yield key, sum(values)
    
if __name__ == '__main__':
    FrequencyCount.run()

要运行该程序,只需在命令行中运行

python fc.py testfile.txt

即可。其中testfile.txt可以是任意文本文件。
接下来我们简单地解释这段程序。 mrjob中所有的任务都是通过一个继承MRJob的类来定义的。在这个类中,可以包含mapper,combiner和reducer。这三个函数的参数均是一个(key, value)键值对。在本mapper函数中,键被忽略(写作_),值为文本的每一行line。在reducer中,对mapper生成的每一个键(chars,words,lines)求和,生成的和为对应的值输出。
另一个注意点是最后的if判断,该if判断是必须的。在这个if判断中,mrjob才明确我们的目标(job class)是什么。

MapReduce简介

mapreduce是一种用来在分布式系统上处理海量数据的系统。其基础是MapReduce: Simplified Data Processing on Large Clusters这篇论文。mapreduce将海量数据分成小的数据集,并行地进行相同的任务,最后将所有的子结果整理并合并成最终的结果。其中拆分数据进行相同的步骤称为mapper,后面合并整理的步骤称为reducer。而combiner可以看作是一个优化器,但不是必须的。

编写任务脚本

一步任务

一步任务(one step job)是最简单的mrjob脚本,前文中第一个mrjob程序 fc.py 就是一个一步工作脚本。
要编写一步工作脚本,只需继承(subclass)MRjob类,并覆盖(override)mapper, combiner, reducer 等方法即可。

多步任务

在编写多步任务(Multi step job)时,需要覆盖steps方法,并在step中返回一个由mapper, combiner, reducer等组成的list。
以下是一个多步任务的例子。在这个例子中,输入文件中的最高频词汇将被输出。

from mrjob.job import MRJob
from mrjob.step import MRStep
import re

WORD_RE = re.compile(r"[\w']+")

class MRMostUsedWord(MRJob):
    
    def steps(self):
        return[
            MRStep(mapper = self.mapper_get_words,
                   combiner = self.combiner_count_words,
                   reducer = self.reducer_count_words),
            MRStep(reducer = self.reducer_find_max_word)
        ]
    
    def mapper_get_words(self,_,line):
        #yield each word in the line
        for word in WORD_RE.findall(line):
            yield (word.lower(),1)
    
    def combiner_count_words(self, word, counts):
        #optimization: count the words we have seen so far
        yield (word, sum(counts))
    
    def reducer_count_words(self, word, counts):
        #send all (num_occupences, word) pairs to same reducer
        #use sum(num_occupences) to get the total num of occupences of each word
        yield None, (sum(counts), word)
    
    def reducer_find_max_word(self, _, word_count_pairs):
        #none key in this function because in reducer_count_words we discard the key
        #each item of word_count_pairs is (count, word), yield one result: the value(word) of max count
        yield max(word_count_pairs)

#never forget       
if __name__ == '__main__':
    MRMostUsedWord.run()

在step方法中共返回4个mapper, combiner 和 reducer。

开始和结束

在任务的开始前和结束后,可以通过特定的方法进行设置:*_init() 和 *_final()方法,前面的 * 可以是mapper, combiner, reducer 任意一种。

命令行语句的使用

第一种用法是在任务前先单独运行一条命令行指令,通过将*_cmd设置为参数传入MRStep或在MRJob中覆盖同名方法。
另一种用法是用命令行指令过滤(filter)输入文件,方法是在MRStep中加入mapper_pre_filterreducer_pre_filter,或在MRJob类中覆盖同名的方法。mapper_pre_filter='grep "kitty"'就表示在mapper前,只输入含有kitty的行。

协议

协议(Protocol)主要是关于mrjob中数据的格式。每一个任务都有input protocol , output protocol 和 internal protocol。
每一个协议都有read()write()方法,read()将原始数据的字节转化为python使用的键值对,write()将python使用的键值对转化回字节。
input protocol用来将input的字节读入第一个mapper(当不存在mapper时读入第一个reducer),output protocol用来最后输出output,internal protocol是将一步的输出转化成下一步的输入。
以上三种协议在应用中都可以自己设置,同时使用者也可以编写完全不同的全新协议。

运行器Runner

简介

MRJob类可以将任务置于MapReduce框架下运行,而运行器Runner包装并提交任务,在不同的环境下运行任务,并向使用者报告运行结果。
通常情况下,使用者是通过命令行以及设置文件(configuration file)与运行器进行交互的。当使用者通过命令行运行程序时,程序会根据不同的参数--runner去创建不同的运行器,使任务运行在不同的环境。
使用者一般不需要手动创建运行器,当程序运行时,会自动为任务生成运行器。当然,使用者也可以用my_job.make_runner()手动创建运行器。

本地环境运行

要在本地运行任务,只需要使用如下代码。

python your_mr_job_sub_class.py <log_file_or_whatever> output

使用者也可以单独运行若干个步骤。

python your_mr_job_sub_class.py --mapper

这行代码只运行了任务中的mapper部分。

Hadoop集群环境运行

首先对Hadoop集群进行设置.
接下来在运行任务时加入-r Hadoop参数。

python your_mr_job_sub_class.py -r Hadoop input output

EMR环境运行

首先设置aws.
接下来在运行任务时加入-r emr参数。

python your_mr_job_sub_class.py -r emr input output

Dataproc环境运行

首先谷歌云平台Google Cloud Platform.
接下来在运行任务时加入-r dataproc参数。

python your_mr_job_sub_class.py -r dataproc input output

手动编写运行器脚本

使用者可以手动编写运行器脚本,并在这个脚本中用make_runner()运行Runner来调用其他脚本中的任务。
手动编写运行器也常被用在测试中。

mr_job = MRWordCounter(args=['-r', 'emr'])
with mr_job.make_runner() as runner:
    runner.run()
    for key, value in mr_job.parse_output(runner.cat_output()):
        ..# do something with the parsed output

在这段代码中,使用者实例化了一个MRJob,用make_runner创建了一个Runner,用runner.run()运行任务,并利用cat_output()将结果转化为一个字节流(bytes stream),最后用parse_output将字节流进行解析。
在手动编写运行器脚本时,必须格外注意,绝对不能将用来生成运行器的make_runner和描述任务的类文件(job class)放在一个文件中。
以下是一个错误的示范:

from mrjob.job import MRJob

class MyJob(MRJob):
    # (your job)

# no, stop, what are you doing?!?!
mr_job = MyJob(args=[args])
with mr_job.make_runner() as runner:
    runner.run()
    # ... etc

运行这段代码,将导致类似以下的报错信息出现

UsageError: make_runner() was called with --steps. This probably means you
            tried to use it from __main__, which doesn't work.

正确的做法是将你的任务放入一个脚本,将手动编写的运行器放入另一个。以下是对刚才错误示范进行修正的两个对应文件.

# job.py
from mrjob.job import MRJob

class MyJob(MRJob):
    # (your job)

if __name__ == '__main__':
    MyJob.run()

# run.py
from job import MyJob
mr_job = MyJob(args=[args])
with mr_job.make_runner() as runner:
    runner.run()
    # ... etc

后记

在官方文档中还包含Spark和emr在mrjob中的使用等内容,因为精力有限,没有写进这篇教程中。我是第一次进行这种翻译教程的工作,因为能力有限,不免有些疏漏。如果有翻译和内容错误之处,欢迎大家指正。
如果用微信手机端的阅读体验不佳,欢迎点击链接,来我的博客阅读。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,185评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,445评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,684评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,564评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,681评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,874评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,025评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,761评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,217评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,545评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,694评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,351评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,988评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,778评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,007评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,427评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,580评论 2 349

推荐阅读更多精彩内容

  • 做数据分析最好的语言当然要数Python,虽然Hadoop由JAVA写成,但Python也可以很好地操控他。O’R...
    C就要毕业了阅读 51,441评论 4 43
  • 摘要:大数据计算服务(MaxCompute)的功能详解和使用心得 点此查看原文:http://click.aliy...
    猫耳呀阅读 1,411评论 0 0
  • 摘自:http://staticor.io/post/hadoop/2016-01-23hadoop-defini...
    wangliang938阅读 583评论 0 1
  • MapReduce框架结构## MapReduce是一个用于大规模数据处理的分布式计算模型MapReduce模型主...
    Bloo_m阅读 3,730评论 0 4
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,633评论 18 139