How to Write a Simple MapReduce Program for Hadoop in Python


Michael G. Noll described on his blog how to write a MapReduce program for Hadoop in Python, and gogamza from Korea described on his blog how to do the same in C (I modified his program slightly, because his Map split words on the tab character). Here I merge their two articles so that Hadoop users in China can also write MapReduce programs in languages other than Java.

First you need a working Hadoop cluster; there are plenty of guides online, and here is one link ("Hadoop学习笔记二 安装部署", Hadoop study notes 2: installation and deployment). Hadoop Streaming lets us use MapReduce from languages other than Java: Streaming exchanges data with the Map and Reduce programs we write through STDIN (standard input) and STDOUT (standard output). Any language that can read STDIN and write STDOUT can therefore be used to write MapReduce programs, for example Python with sys.stdin and sys.stdout, or C with stdin and stdout.
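Because Streaming only cares about lines flowing through STDIN and STDOUT, even standard Unix tools can play the role of Map and Reduce. A minimal sketch (the streaming jar path is the one used for the jobs later in this article; the input and output directories are placeholders):

bin/hadoop jar /home/hadoop/contrib/hadoop-0.15.1-streaming.jar -mapper /bin/cat -reducer /usr/bin/wc -input doc/* -output wc-output

Here /bin/cat passes every input line straight through as the map output, and /usr/bin/wc counts what it receives in the reduce step.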

We again use Hadoop's WordCount example to show how to write MapReduce: in WordCount we want to compute how often each word occurs in a set of documents. The Map program receives these documents one line at a time; our Map code splits the line on whitespace into an array of words, iterates over the array, and writes each word to standard output followed by "1", meaning that this word occurred once. In Reduce we then sum up the occurrences of each word.
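To make the data flow concrete, take the test sentence used later in this article, "foo foo quux labs foo bar quux". The Map step emits one tab-delimited pair per word:

foo 1
foo 1
quux 1
labs 1
foo 1
bar 1
quux 1

Before Reduce runs, Hadoop sorts these pairs by key, so identical words arrive on consecutive lines and Reduce only has to add up the 1s, giving bar 1, foo 3, labs 1, quux 2.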

Python Code

Map: mapper.py

#!/usr/bin/env python
import sys

# maps words to their counts
word2count = {}

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words while removing any empty strings
    words = filter(lambda word: word, line.split())
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output)
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print '%s\t%s' % (word, 1)

Reduce: reducer.py

#!/usr/bin/env python
from operator import itemgetter
import sys

# maps words to their counts
word2count = {}

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # parse the input we got from mapper.py
    word, count = line.split()
    # convert count (currently a string) to int
    try:
        count = int(count)
        word2count[word] = word2count.get(word, 0) + count
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        pass

# sort the words lexicographically
#
# this step is NOT required, we just do it so that our
# final output will look more like the official Hadoop
# word count examples
sorted_word2count = sorted(word2count.items(), key=itemgetter(0))

# write the results to STDOUT (standard output)
for word, count in sorted_word2count:
    print '%s\t%s' % (word, count)
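Both scripts above use Python 2 syntax (the print statement). If your nodes run Python 3, a minimal adaptation, assuming nothing else about your setup, is to turn the two print statements into function calls; the rest of the code works unchanged:

# last line of mapper.py under Python 3
print('%s\t%s' % (word, 1))

# last line of reducer.py under Python 3
print('%s\t%s' % (word, count))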

C Code

Map: Mapper.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>

#define BUF_SIZE 2048
#define DELIM "\n"

int main(int argc, char *argv[])
{
    char buffer[BUF_SIZE];
    while (fgets(buffer, BUF_SIZE - 1, stdin)) {
        int len = strlen(buffer);
        if (buffer[len - 1] == '\n')
            buffer[len - 1] = 0;

        char *querys = index(buffer, ' ');
        char *query = NULL;
        if (querys == NULL) continue;
        querys += 1; /* not to include '\t' */

        query = strtok(buffer, " ");
        while (query) {
            printf("%s\t1\n", query);
            query = strtok(NULL, " ");
        }
    }
    return 0;
}

Reduce: Reducer.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>

#define BUFFER_SIZE 1024
#define DELIM "\t"

int main(int argc, char *argv[])
{
    char strLastKey[BUFFER_SIZE];
    char strLine[BUFFER_SIZE];
    int count = 0;

    *strLastKey = '\0';
    *strLine = '\0';

    while (fgets(strLine, BUFFER_SIZE - 1, stdin)) {
        char *strCurrKey = NULL;
        char *strCurrNum = NULL;

        strCurrKey = strtok(strLine, DELIM);
        strCurrNum = strtok(NULL, DELIM); /* necessary to check error but.... */

        if (strLastKey[0] == '\0') {
            strcpy(strLastKey, strCurrKey);
        }
        if (strcmp(strCurrKey, strLastKey)) {
            printf("%s\t%d\n", strLastKey, count);
            count = atoi(strCurrNum);
        } else {
            count += atoi(strCurrNum);
        }
        strcpy(strLastKey, strCurrKey);
    }
    printf("%s\t%d\n", strLastKey, count); /* flush the count */
    return 0;
}

First, let's test the code locally:

chmod +x mapper.py

chmod +x reducer.py

echo "foo foo quux labs foo bar quux" | ./mapper.py | ./reducer.py

bar 1
foo 3
labs 1
quux 2

g++ Mapper.c -o Mapper

g++ Reducer.c -o Reducer

chmod +x Mapper

chmod +x Reducer

echo "foo foo quux labs foo bar quux" | ./Mapper | ./Reducer

bar 1
foo 2
labs 1
quux 1
foo 1
quux 1

You may notice that the C output differs from the Python output. That is because the Python reducer accumulates the counts in a dictionary, while the C reducer only merges identical keys that arrive on consecutive lines. When we run under Hadoop, the framework sorts the map output before it reaches the reducer, so identical words do appear on consecutive lines of standard input.
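You can simulate that sorting locally by placing the Unix sort command between Mapper and Reducer; with sorted input the C pair produces the same totals as the Python pair:

echo "foo foo quux labs foo bar quux" | ./Mapper | sort | ./Reducer
bar 1
foo 3
labs 1
quux 2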

Running the programs on Hadoop

First we download a test document: wget http://www.gutenberg.org/dirs/etext04/7ldvc10.txt. We store it under /tmp/doc and then copy it into HDFS:

bin/hadoop dfs -copyFromLocal /tmp/doc doc

Run bin/hadoop dfs -ls doc to check that the copy succeeded. Next we run our MapReduce jobs:

bin/hadoop jar /home/hadoop/contrib/hadoop-0.15.1-streaming.jar -mapper /home/hadoop/Mapper \

-reducer /home/hadoop/Reducer -input doc/* -output c-output -jobconf mapred.reduce.tasks=1

additionalConfSpec_:null

null=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming

packageJobJar: [] [/home/msh/hadoop-0.15.1/contrib/hadoop-0.15.1-streaming.jar] /tmp/streamjob60816.jar tmpDir=null

08/03/04 19:03:13 INFO mapred.FileInputFormat: Total input paths to process : 1

08/03/04 19:03:13 INFO streaming.StreamJob: getLocalDirs(): [/home/msh/data/filesystem/mapred/local]

08/03/04 19:03:13 INFO streaming.StreamJob: Running job: job_200803031752_0003

08/03/04 19:03:13 INFO streaming.StreamJob: To kill this job, run:

08/03/04 19:03:13 INFO streaming.StreamJob: /home/msh/hadoop/bin/../bin/hadoop job -Dmapred.job.tracker=192.168.2.92:9001 -kill job_200803031752_0003

08/03/04 19:03:13 INFO streaming.StreamJob: Tracking URL: http://hadoop-master:50030/jobdetails.jsp?jobid=job_200803031752_0003

08/03/04 19:03:14 INFO streaming.StreamJob: map 0% reduce 0%

08/03/04 19:03:15 INFO streaming.StreamJob: map 33% reduce 0%

08/03/04 19:03:16 INFO streaming.StreamJob: map 100% reduce 0%

08/03/04 19:03:19 INFO streaming.StreamJob: map 100% reduce 100%

08/03/04 19:03:19 INFO streaming.StreamJob: Job complete: job_200803031752_0003

08/03/04 19:03:19 INFO streaming.StreamJob: Output: c-output

bin/hadoop jar /home/hadoop/contrib/hadoop-0.15.1-streaming.jar -mapper /home/hadoop/mapper.py \

-reducer /home/hadoop/reducer.py -input doc/* -output python-output -jobconf mapred.reduce.tasks=1

additionalConfSpec_:null

null=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming

packageJobJar: [] [/home/hadoop/hadoop-0.15.1/contrib/hadoop-0.15.1-streaming.jar] /tmp/streamjob26099.jar tmpDir=null

08/03/04 19:05:40 INFO mapred.FileInputFormat: Total input paths to process : 1

08/03/04 19:05:41 INFO streaming.StreamJob: getLocalDirs(): [/home/msh/data/filesystem/mapred/local]

08/03/04 19:05:41 INFO streaming.StreamJob: Running job: job_200803031752_0004

08/03/04 19:05:41 INFO streaming.StreamJob: To kill this job, run:

08/03/04 19:05:41 INFO streaming.StreamJob: /home/msh/hadoop/bin/../bin/hadoop job -Dmapred.job.tracker=192.168.2.92:9001 -kill job_200803031752_0004

08/03/04 19:05:41 INFO streaming.StreamJob: Tracking URL: http://hadoop-master:50030/jobdetails.jsp?jobid=job_200803031752_0004

08/03/04 19:05:42 INFO streaming.StreamJob: map 0% reduce 0%

08/03/04 19:05:48 INFO streaming.StreamJob: map 33% reduce 0%

08/03/04 19:05:49 INFO streaming.StreamJob: map 100% reduce 0%

08/03/04 19:05:52 INFO streaming.StreamJob: map 100% reduce 100%

08/03/04 19:05:52 INFO streaming.StreamJob: Job complete: job_200803031752_0004

08/03/04 19:05:52 INFO streaming.StreamJob: Output: python-output
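The two jobs above assume that Mapper, Reducer, mapper.py and reducer.py already sit at the same absolute path on every node of the cluster. If they do not, Hadoop Streaming's -file option can ship a script together with the job; a sketch for the Python job (same jar and paths as above; the exact -file usage is taken from the Streaming documentation of that era and not verified here):

bin/hadoop jar /home/hadoop/contrib/hadoop-0.15.1-streaming.jar -file /home/hadoop/mapper.py -mapper mapper.py \
-file /home/hadoop/reducer.py -reducer reducer.py -input doc/* -output python-output -jobconf mapred.reduce.tasks=1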

After a job has been submitted, we can also follow its progress in the web UI at http://localhost:50030/.

When the jobs have finished, we find some files under c-output and python-output:

bin/hadoop dfs -ls c-output

The following command shows the result of the MapReduce run:

bin/hadoop dfs -cat c-output/part-00000

Running MapReduce through Hadoop Streaming is slower than writing the job against the Java API, for two reasons:

1. In terms of speed, Java API >> C Streaming >> Perl Streaming: with Streaming the data has to flow through pipes between processes, and that I/O blocks.

2. Unlike a Java job, where Reduce can start fetching data once a certain amount of Map output is ready, with Streaming the Reduce program does not start until all of the Maps have finished.

If you write MapReduce in Python, another option is to use Jython to compile the Python into Java bytecode. For C programmers an even better choice is Hadoop's newer C++ MapReduce API, Pipes. Either way, Hadoop gives us a way to do distributed computation without writing Java.

Finally, below is a MapReduce program written in PHP, taken from http://www.lunchpauze.com/2007/10/writing-hadoop-mapreduce-program-in-php.html, for PHP programmers' reference:

Map: mapper.php

#!/usr/bin/php
<?php

$word2count = array();

// input comes from STDIN (standard input)
while (($line = fgets(STDIN)) !== false) {
    // remove leading and trailing whitespace and lowercase
    $line = strtolower(trim($line));
    // split the line into words while removing any empty string
    $words = preg_split('/\W/', $line, 0, PREG_SPLIT_NO_EMPTY);
    // increase counters
    foreach ($words as $word) {
        $word2count[$word] += 1;
    }
}

// write the results to STDOUT (standard output)
// what we output here will be the input for the
// Reduce step, i.e. the input for reducer.php
foreach ($word2count as $word => $count) {
    // tab-delimited
    echo $word, chr(9), $count, PHP_EOL;
}

?>

Reduce: reducer.php

#!/usr/bin/php
<?php

$word2count = array();

// input comes from STDIN
while (($line = fgets(STDIN)) !== false) {
    // remove leading and trailing whitespace
    $line = trim($line);
    // parse the input we got from mapper.php
    list($word, $count) = explode(chr(9), $line);
    // convert count (currently a string) to int
    $count = intval($count);
    // sum counts
    if ($count > 0) $word2count[$word] += $count;
}

// sort the words lexicographically
//
// this step is NOT required, we just do it so that our
// final output will look more like the official Hadoop
// word count examples
ksort($word2count);

// write the results to STDOUT (standard output)
foreach ($word2count as $word => $count) {
    echo $word, chr(9), $count, PHP_EOL;
}

?>

Author: 马士华, published 2008-03-05