MRJob 十分钟入门: 用 Python 轻松运行MapReduce

概览

前言

本教程取材翻译于mrjob v0.5.10 documentation。有删减。最近在学mapreduce, 用到mrjob,在网上没有找到好的中文教程,就自己翻译了一下官方文档的重点。

简介

mrjob是用来写能在hadoop运行的python程序的最简便方法。其最突出的特点就是在mrjob的帮助下,无需安装hadoop或部署任何集群,我们可以在本地机器上运行代码(进行测试)。同时,mrjob可以轻松运行于Amazon Elastic MapReduce。
为了达到简便实用的目的,一些功能被mrjob省去了。如果追求更多的功能,可以尝试Dumbo,Pydoop等package。

安装

使用pip安装。

pip install mrjob

anaconda的使用者推荐使用conda安装。

conda install -c conda-forge mrjob 

第一个mrjob程序

在这里我们使用一个统计文本中字符数的程序fc.py作为例子。

from mrjob.job import MRJob

class FrequencyCount(MRJob):
    def mapper(self, _, line):
        yield "chars", len(line)
        yield "words", len(line.split())
        yield "lines", 1
    
    def reducer(self, key, values):
        yield key, sum(values)
    
if __name__ == '__main__':
    FrequencyCount.run()

要运行该程序,只需在命令行中运行

python fc.py testfile.txt

即可。其中testfile.txt可以是任意文本文件。
接下来我们简单地解释这段程序。 mrjob中所有的任务都是通过一个继承MRJob的类来定义的。在这个类中,可以包含mapper,combiner和reducer。这三个函数的参数均是一个(key, value)键值对。在本mapper函数中,键被忽略(写作_),值为文本的每一行line。在reducer中,对mapper生成的每一个键(chars,words,lines)求和,生成的和为对应的值输出。
另一个注意点是最后的if判断,该if判断是必须的。在这个if判断中,mrjob才明确我们的目标(job class)是什么。

MapReduce简介

mapreduce是一种用来在分布式系统上处理海量数据的系统。其基础是MapReduce: Simplified Data Processing on Large Clusters这篇论文。mapreduce将海量数据分成小的数据集,并行地进行相同的任务,最后将所有的子结果整理并合并成最终的结果。其中拆分数据进行相同的步骤称为mapper,后面合并整理的步骤称为reducer。而combiner可以看作是一个优化器,但不是必须的。

编写任务脚本

一步任务

一步任务(one step job)是最简单的mrjob脚本,前文中*第一个mrjob程序* fc.py 就是一个一步工作脚本。
要编写一步工作脚本,只需继承(subclass)MRjob类,并覆盖(override)mapper, combiner, reducer 等方法即可。

多步任务

在编写多步任务(Multi step job)时,需要覆盖steps方法,并在step中返回一个由mapper, combiner, reducer等组成的list。
以下是一个多步任务的例子。在这个例子中,输入文件中的最高频词汇将被输出。

from mrjob.job import MRJob
from mrjob.step import MRStep
import re

WORD_RE = re.compile(r"[\w']+")

class MRMostUsedWord(MRJob):
    
    def steps(self):
        return[
            MRStep(mapper = self.mapper_get_words,
                   combiner = self.combiner_count_words,
                   reducer = self.reducer_count_words),
            MRStep(reducer = self.reducer_find_max_word)
        ]
    
    def mapper_get_words(self,_,line):
        #yield each word in the line
        for word in WORD_RE.findall(line):
            yield (word.lower(),1)
    
    def combiner_count_words(self, word, counts):
        #optimization: count the words we have seen so far
        yield (word, sum(counts))
    
    def reducer_count_words(self, word, counts):
        #send all (num_occupences, word) pairs to same reducer
        #use sum(num_occupences) to get the total num of occupences of each word
        yield None, (sum(counts), word)
    
    def reducer_find_max_word(self, _, word_count_pairs):
        #none key in this function because in reducer_count_words we discard the key
        #each item of word_count_pairs is (count, word), yield one result: the value(word) of max count
        yield max(word_count_pairs)

#never forget       
if __name__ == '__main__':
    MRMostUsedWord.run()

在step方法中共返回4个mapper, combiner 和 reducer。

开始和结束

在任务的开始前和结束后,可以通过特定的方法进行设置:*_init() 和 *_final()方法,前面的 * 可以是mapper, combiner, reducer 任意一种。

命令行语句的使用

第一种用法是在任务前先单独运行一条命令行指令,通过将*_cmd设置为参数传入MRStep或在MRJob中覆盖同名方法。
另一种用法是用命令行指令过滤(filter)输入文件,方法是在MRStep中加入mapper_pre_filterreducer_pre_filter,或在MRJob类中覆盖同名的方法。mapper_pre_filter='grep "kitty"'就表示在mapper前,只输入含有kitty的行。

协议

协议(Protocol)主要是关于mrjob中数据的格式。每一个任务都有input protocol , output protocol 和 internal protocol。
每一个协议都有read()write()方法,read()将原始数据的字节转化为python使用的键值对,write()将python使用的键值对转化回字节。
input protocol用来将input的字节读入第一个mapper(当不存在mapper时读入第一个reducer),output protocol用来最后输出output,internal protocol是将一步的输出转化成下一步的输入。
以上三种协议在应用中都可以自己设置,同时使用者也可以编写完全不同的全新协议。

运行器Runner

简介

MRJob类可以将任务置于MapReduce框架下运行,而运行器Runner包装并提交任务,在不同的环境下运行任务,并向使用者报告运行结果。
通常情况下,使用者是通过命令行以及设置文件(configuration file)与运行器进行交互的。当使用者通过命令行运行程序时,程序会根据不同的参数--runner去创建不同的运行器,使任务运行在不同的环境。
使用者一般不需要手动创建运行器,当程序运行时,会自动为任务生成运行器。当然,使用者也可以用my_job.make_runner()手动创建运行器。

本地环境运行

要在本地运行任务,只需要使用如下代码。

python your_mr_job_sub_class.py <log_file_or_whatever> output

使用者也可以单独运行若干个步骤。

python your_mr_job_sub_class.py --mapper

这行代码只运行了任务中的mapper部分。

Hadoop集群环境运行

首先对Hadoop集群进行设置.
接下来在运行任务时加入-r Hadoop参数。

python your_mr_job_sub_class.py -r Hadoop input output

EMR环境运行

首先设置aws.
接下来在运行任务时加入-r emr参数。

python your_mr_job_sub_class.py -r emr input output

Dataproc环境运行

首先谷歌云平台Google Cloud Platform.
接下来在运行任务时加入-r dataproc参数。

python your_mr_job_sub_class.py -r dataproc input output

手动编写运行器脚本

使用者可以手动编写运行器脚本,并在这个脚本中用make_runner()运行Runner来调用其他脚本中的任务。
手动编写运行器也常被用在测试中。

mr_job = MRWordCounter(args=['-r', 'emr'])
with mr_job.make_runner() as runner:
    runner.run()
    for key, value in mr_job.parse_output(runner.cat_output()):
        ..# do something with the parsed output

在这段代码中,使用者实例化了一个MRJob,用make_runner创建了一个Runner,用runner.run()运行任务,并利用cat_output()将结果转化为一个字节流(bytes stream),最后用parse_output将字节流进行解析。
在手动编写运行器脚本时,必须格外注意,绝对不能将用来生成运行器的make_runner和描述任务的类文件(job class)放在一个文件中。
以下是一个错误的示范:

from mrjob.job import MRJob

class MyJob(MRJob):
    # (your job)

# no, stop, what are you doing?!?!
mr_job = MyJob(args=[args])
with mr_job.make_runner() as runner:
    runner.run()
    # ... etc

运行这段代码,将导致类似以下的报错信息出现

UsageError: make_runner() was called with --steps. This probably means you
            tried to use it from __main__, which doesn't work.

正确的做法是将你的任务放入一个脚本,将手动编写的运行器放入另一个。以下是对刚才错误示范进行修正的两个对应文件.

# job.py
from mrjob.job import MRJob

class MyJob(MRJob):
    # (your job)

if __name__ == '__main__':
    MyJob.run()
# run.py
from job import MyJob
mr_job = MyJob(args=[args])
with mr_job.make_runner() as runner:
    runner.run()
    # ... etc

后记

在官方文档中还包含Spark和emr在mrjob中的使用等内容,因为精力有限,没有写进这篇教程中。我是第一次进行这种翻译教程的工作,因为能力有限,不免有些疏漏。如果有翻译和内容错误之处,欢迎大家指正。

Comments
Write a Comment