一、原理介绍
- 概述
Hadoop Streaming是Hadoop提供的一个编程工具,它允许用户使用任何可执行文件或者脚本文件作为Mapper和Reducer,例如:
采用shell脚本语言中的一些命令作为mapper和reducer(cat作为mapper,wc作为reducer)
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper cat \
-reducer wc
- Hadoop Streaming原理
Streaming的原理是用Java实现一个包装用户程序的MapReduce程序,该程序负责调用MapReduce Java接口获取key/value对输入,创建一个新的进程启动包装的用户程序,将数据通过管道传递给包装的用户程序处理,然后调用MapReduce Java接口将用户程序的输出切分成key/value对输出。
mapper和reducer会从标准输入中读取用户数据,一行一行处理后发送给标准输出。Streaming工具会创建MapReduce作业,发送给各个tasktracker,同时监控整个作业的执行过程。
如果一个文件(可执行或者脚本)作为mapper,mapper初始化时,每一个mapper任务会把该文件作为一个单独进程启动,mapper任务运行时,它把输入切分成行并把每一行提供给可执行文件进程的标准输入。 同时,mapper收集可执行文件进程标准输出的内容,并把收到的每一行内容转化成key/value对,作为mapper的输出。 默认情况下,一行中第一个tab之前的部分作为key,之后的(不包括tab)作为value。如果没有tab,整行作为key值,value值为null。
对于reducer,类似。
以上是Map/Reduce框架和streaming mapper/reducer之间的基本通信协议。 - Streaming优点
发效率高,便于移植。只要按照标准输入输出格式进行编程,就可以满足hadoop要求。因此单机程序稍加改动就可以在集群上进行使用。 同样便于测试-只要按照 cat input | mapper | sort | reducer > output 进行单机测试即可。如果单机测试通过,大多数情况是可以在集群上成功运行的,只要控制好内存就好了。
提高程序效率-有些程序对内存要求较高,如果用java控制内存毕竟不如C/C++。
Streaming不足
1.Streaming中的mapper和reducer默认只能向标准输出写数据,不能方便地处理多路输出Hadoop Streaming用法
Usage: $HADOOP_HOME/bin/hadoop jar \
$HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar [options]
options:
(1)-input:输入文件路径
(2)-output:输出文件路径
(3)-mapper:用户自己写的mapper程序,可以是可执行文件或者脚本
(4)-reducer:用户自己写的reducer程序,可以是可执行文件或者脚本
(5)-file:打包文件到提交的作业中,可以是mapper或者reducer要用的输入文件,如配置文件,字典等。
(6)-partitioner:用户自定义的partitioner程序
(7)-combiner:用户自定义的combiner程序(必须用java实现)
(8)-D:作业的一些属性(以前用的是-jonconf),具体有:
1)mapred.map.tasks:map task数目
2)mapred.reduce.tasks:reduce task数目
3)stream.map.input.field.separator/stream.map.output.field.separator: map task输入/输出数
据的分隔符,默认均为\t。
4)stream.num.map.output.key.fields:指定map task输出记录中key所占的域数目
5)stream.reduce.input.field.separator/stream.reduce.output.field.separator:reduce task输入/输出数据的分隔符,默认均为\t。
6)stream.num.reduce.output.key.fields:指定reduce task输出记录中key所占的域数目
另外,Hadoop本身还自带一些好用的Mapper和Reducer:
(1) Hadoop聚集功能
Aggregate提供一个特殊的reducer类和一个特殊的combiner类,并且有一系列的“聚合器”(例如“sum”,“max”,“min”等)用于聚合一组value的序列。用户可以使用Aggregate定义一个mapper插件类,这个类用于为mapper输入的每个key/value对产生“可聚合项”。Combiner/reducer利用适当的聚合器聚合这些可聚合项。要使用Aggregate,只需指定“-reducer aggregate”。
(2)字段的选取(类似于Unix中的‘cut’)
Hadoop的工具类org.apache.hadoop.mapred.lib.FieldSelectionMapReduc帮助用户高效处理文本数据,就像unix中的“cut”工具。工具类中的map函数把输入的key/value对看作字段的列表。 用户可以指定字段的分隔符(默认是tab),可以选择字段列表中任意一段(由列表中一个或多个字段组成)作为map输出的key或者value。 同样,工具类中的reduce函数也把输入的key/value对看作字段的列表,用户可以选取任意一段作为reduce输出的key或value。
二、shell脚本实现实例
- 首先准备测试文件
用下面的脚本生成NG大小的test.txt
#! /bin/sh
while [ "1" == "1" ]
do
echo "noe two three" >> test.txt
done
- 将测试文件上传到HDFS文件系统
[hadoop@master ~]$ hdfs dfs -mkdir /test
[hadoop@master ~]$ hdfs dfs -put test.txt /test/
[hadoop@master ~]$ hdfs dfs -ls /test
Found 1 items
-rw-r--r-- 2 hadoop supergroup 4880841000 2017-03-20 10:45 /test/test.txt
- mapper脚本代码
#! /bin/bash
while read LINE; do
for word in $LINE
do
#-e使得\t转义(escape)为tab
echo -e "$word\t1"
done
done
- reducer脚本代码
#! /bin/sh
count=0
started=0
word=""
while read LINE;do
newword=`echo $LINE | cut -d ' ' -f 1`
if [ "$word" != "$newword" ];then
[ $started -ne 0 ] && echo -e "$word\t$count"
word=$newword
count=1
started=1
else
count=$(( $count + 1 ))
fi
done
echo -e "$word\t$count"
- 执行任务
hadoop jar hadoop-2.7.3/share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar -input test -output output2 -mapper mapper.sh -reducer reducer.sh -file mapper.sh -file reducer.sh
- 可以在slave节点上看到相关的进程,这些mapper进程在master节点上是不存在的
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
7822 hadoop 20 0 103m 1628 1076 R 100.0 0.0 2:53.34 mapper.sh
7819 hadoop 20 0 103m 1632 1076 R 100.0 0.0 2:53.33 mapper.sh
7836 hadoop 20 0 103m 1636 1076 R 100.0 0.0 2:53.21 mapper.sh
7833 hadoop 20 0 103m 1628 1076 R 100.0 0.0 2:53.23 mapper.sh
29993 root 20 0 164g 346m 10m R 100.2 0.3 113:27.07 gpu_executor
7653 hadoop 20 0 912m 224m 18m S 81.4 0.2 2:34.85 java
7650 hadoop 20 0 899m 229m 18m S 80.4 0.2 2:33.40 java
7651 hadoop 20 0 922m 235m 18m S 79.4 0.2 2:32.60 java
7652 hadoop 20 0 915m 237m 18m S 79.1 0.2 2:31.61 java
28961 root 20 0 564m 9m 6252 S 18.8 0.0 23:43.42 TaskTracker
31606 hadoop 20 0 1838m 338m 18m S 4.3 0.3 0:25.58 java
6102 liuhao 20 0 931m 28m 18m S 1.6 0.0 132:25.21 knotify4
- map完成后,reduce阶段会看到reduce进程
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
29993 root 20 0 164g 346m 10m R 97.7 0.3 222:36.66 gpu_executor
28961 root 20 0 564m 9m 6252 S 19.7 0.0 46:10.45 TaskTracker
17016 hadoop 20 0 103m 1664 1100 S 11.8 0.0 0:33.40 reducer.sh
- 相关问题
- 首先遇到“找不到或无法加载类hadoop-streaming-2.7.3.jar”的问题,后来发现是因为命令丢了“jar”这个参数
- 然后又遇到job发不下去,一直卡在
INFO mapreduce.Job: Running job: job_1489999749396_0004
后来将slaves节点的hostname也修正为IP映射表内对应的名字,解决?
- 运行阶段卡在reduce
17/03/20 19:50:29 INFO mapreduce.Job: map 74% reduce 0%
17/03/20 19:50:30 INFO mapreduce.Job: map 77% reduce 0%
17/03/20 19:50:33 INFO mapreduce.Job: map 84% reduce 0%
17/03/20 19:50:36 INFO mapreduce.Job: map 91% reduce 0%
17/03/20 19:50:39 INFO mapreduce.Job: map 97% reduce 0%
17/03/20 19:50:40 INFO mapreduce.Job: map 98% reduce 0%
17/03/20 19:50:41 INFO mapreduce.Job: map 100% reduce 0%
17/03/20 19:50:50 INFO mapreduce.Job: map 100% reduce 67%
根据一位外国友人的说明,在reduce阶段 ,0-33%阶段是 shuffle 阶段,就是根据键值 来讲本条记录发送到指定的reduce,这个阶段应该是在map还没有完全完成的时候就已经开始了,因为我们会看到map在执行到一个百分比后reduce也启动了,这样做也提高了程序的执行效率。
34%-65%阶段是sort阶段,就是reduce根据收到的键值进行排序。map阶段也会发生排序,map的输出结果是以键值为顺序排序后输出,可以通过只有map阶段处理的输出来验证(以前自己验证过,貌似确有这么回事,大家自己再验证下,免得我误人子弟啊)。
66%-100%阶段是处理阶段,这个阶段才是真正的处理阶段,如果程序卡在这里,估计就是你的reduce程序有问题了。
索性等了一晚上,第二天终于有动静了
17/03/21 07:01:53 INFO mapreduce.Job: map 100% reduce 77%
和上面的记录对比发现,从%67到%77用了11个小时!这明显是reduce程序效率太慢了。也可能是数据倾斜问题。中间也试过增加reducer的数量,但无果。最终我索性减少了输入文件的行数,使其只有三行:
one two three
one two three
one two three
然后重新运行程序,瞬间得到了结果:
[hadoop@master ~]$ hdfs dfs -cat output2/part-00002
one\t3
three\t3
two\t3
可见,结果是正确的。
Python版本
- mapper脚本
#!/usr/bin/env python
import sys
# input comes from STDIN (standard input)
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# split the line into words while removing any empty strings
words = filter(lambda word: word, line.split())
# increase counters
for word in words:
# write the results to STDOUT (standard output);
# what we output here will be the input for the
# Reduce step, i.e. the input for reducer.py
#
# tab-delimited; the trivial word count is 1
print '%s\t%s' % (word, 1)
- reducer脚本
#!/usr/bin/env python
from operator import itemgetter
import sys
# maps words to their counts
word2count = {}
# input comes from STDIN
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# parse the input we got from mapper.py
word, count = line.split()
# convert count (currently a string) to int
try:
count = int(count)
word2count[word] = word2count.get(word, 0) + count
except ValueError:
# count was not a number, so silently
# ignore/discard this line
pass
# sort the words lexigraphically;
#
# this step is NOT required, we just do it so that our
# final output will look more like the official Hadoop
# word count examples
sorted_word2count = sorted(word2count.items(), key=itemgetter(0))
# write the results to STDOUT (standard output)
for word, count in sorted_word2count:
print '%s\t%s'% (word, count)
- 单机测试
[hadoop@master ~]$ echo "one two three" | ./mapper.py | sort | ./reducer.py
one 1
three 1
two 1
- 运行
[hadoop@master ~]$ hadoop jar hadoop-2.7.3/share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar -D mapred.reduce.tasks=3 -input test -output output3 -mapper mapper.py -reducer reducer.py -file mapper.py -file reducer.py
令人诧异的是很快就执行完了,难道真的是shell脚本不适合做类似统计这样的事情吗?
[hadoop@master ~]$ hdfs dfs -ls output3
Found 4 items
-rw-r--r-- 2 hadoop supergroup 0 2017-03-22 10:12 output3/_SUCCESS
-rw-r--r-- 2 hadoop supergroup 14 2017-03-22 10:12 output3/part-00000
-rw-r--r-- 2 hadoop supergroup 0 2017-03-22 10:12 output3/part-00001
-rw-r--r-- 2 hadoop supergroup 24 2017-03-22 10:12 output3/part-00002
[hadoop@master ~]$ hdfs dfs -cat output3/part-00002
noe 1166202
two 1166202
[hadoop@master ~]$ hdfs dfs -cat output3/part-00000
three 1166202
[hadoop@master ~]$