工作中,我们常常会遇到数据统计的需求,如果数据在DB里,那么写SQL即可。但是如果DB满足不了需求,那我们就需要统计日志,但是对于高度服务化的互联网应用,基本都是分布式的服务,日志基本也都是分布式的,这意味着Shell 和 Python统计的方式,弊端很多,首先数据聚合就是很大的问题。
所幸,公司有Hadoop平台,以前做日志的统计都是在Hadoop上写Mapreduce,但是后来发现,对于简单的日志统计和过滤,写一个Mapreduce成本确实很高,同时编程模型不是很灵活,实现一个并行或者多次迭代的场景的数据统计确实比较麻烦。所以果断投入了Spark的怀抱。
今天碰见一个需求,需要对日志中的用户注册渠道数据进行统计,因为需要统计很多天的数据,同时渠道也有很多,写MapReduce还是比较复杂,主要还是惧怕代码量和反复打包上传。
所以用Spark简单的实现了一个统计场景,难度不高于经典“Word Count”,写出来只为记录自己学习的点滴:
import pyspark as sp;
from pyspark import SparkConf;
import time;
def countReg():
spConf = SparkConf()
context = sp.SparkContext(appName="regCount")
regFile = context.textFile("/home/xxx/xxx/xxx/*");
firstFileterd = regFile.filter(lambda a: "reg_mobile" in a.encode("UTF-8") or "reg_email" in a.encode("UTF-8") or "reg_dynamic_mobile" in a.encode("UTF-8")).cache();
filterMobileFile = firstFileterd.filter(lambda a: "reg_mobile" in a.encode("UTF-8") and a.encode("UTF-8").split("\001")[0]=="reg_mobile");
filterEmailFile = firstFileterd.filter(lambda a: "reg_email" in a.encode("UTF-8") and a.encode("UTF-8").split("\001")[0]=="reg_email");
dynamicMobileFile = firstFileterd.filter(lambda a: "reg_dynamic_mobile" in a.encode("UTF-8") and a.encode("UTF-8").split("\001")[3]=="null");
mobileCollect = filterMobileFile.map(mapfunc).reduceByKey(lambda a,b:a+b).sortBy(lambda m:m[0]).collect()
emailCollect = filterEmailFile.map(mapfunc).reduceByKey(lambda a,b:a+b).sortBy(lambda m:m[0]).collect()
dynamicMobileCollect = dynamicMobileFile.map(mapfuncForDynamic).reduceByKey(lambda a,b:a+b).sortBy(lambda m:m[0]).collect()
print("-------------------------------mobile---------------------------------->\n")
for line in mobileCollect:
print(line)
print("--------------------------------email--------------------------------->\n")
for line in emailCollect:
print(line)
print("---------------------------------dynamic_mobile-------------------------------->\n")
for line in dynamicMobileCollect:
print(line)
def mapfunc(line):
timeStamp = line.encode("UTF-8").split("\001")[-1]
if(timeStamp!='' and timeStamp!=None):
timeStamp = long(timeStamp)/1000
timeRaw = time.localtime(timeStamp)
datestr = time.strftime("%Y-%m-%d",timeRaw)
return (datestr,1)
else:
return ("error",1)
def mapfuncForDynamic(line):
timeStamp = line.encode("UTF-8").split("\001")[-2]
if(timeStamp!='' and timeStamp!=None):
timeStamp = long(timeStamp)/1000
timeRaw = time.localtime(timeStamp)
datestr = time.strftime("%Y-%m-%d",timeRaw)
return (datestr,1)
else:
return ("error",1)
if __name__=="__main__":
countReg();
上述是对三个来源的注册数据进行统计,虽然不难,但是能说明Spark比Hadoop的好处:
- 代码简单,编程比较灵活
- 脚本开发,不用反复打包
- MaprReduce实现上述场景要么起三个Job,要么重写Partitioner,分配到不同的Reducer。Spark实现起来就比较简单了。
因为之前比较习惯Python开发,但是对于数据开发方向的话,Python还是比较不合适的,对各种存储支持的不是很好,对SparkStreaming支持的也不是很好。所以以后准备学习Scala去开发Spark,毕竟是Spark的构建语言。