pyspark:dataframe与rdd的一点小事

大纲

  1. 问题描述
  2. 解决方案
  3. 代码以及效果
  4. 总结

1.问题描述

要做的事情

从一堆房源hive表和hdfs数据中读取信息,并将同一id的信息整合到一起。共有5个hive表,2个hdfs文件;每个表所需操作的id数是千万数量级,每个表中字段20~200不等。

当前做法

用pyspark读取hive表以及hdfs的数据,并转换成rdd,然后用leftOuterJoin将信息整合;这样做需消耗至少30min,甚至1h的时间,速度太慢。本文就是针对该问题进行的优化。

2.解决方案

错误尝试

对于程序的性能问题,其实最开始的做法应该是打印每个步骤所需的时间,然后从最耗费时间的步骤开始。

但是我最开始并没有这样做,因为leftOuterJoin的操作是O(n^2), n为表的大小,因此我以为leftOuterJoin是最耗时的。于是我直接将leftOuterJoin换成了rdd.union(), 然后再reduceByKey的方式。但是当我将所有的leftOuterJoin操作改过来后,发现总的时间还是需要30min。

另外提醒下,当数据量过大时,用leftOuterJoin容易出现内存溢出的问题;当两个rdd数量级差别较大时,用reduceByKey容易出现数据倾斜的问题。如果是两个rdd取交集的话,建议先用filter过滤,然后再用reduceByKey进行合并操作。

找对方向

hive数据直接用dataframe操作

说实话,看到这个效果有点沮丧。不过,我在等待程序运行的过程中,发现将两个rdd用reduceByKey操作的时间并不长,大量的时间其实花在读取hive表并转成rdd的阶段。这时我才意识到,我应该打印出每步操作的时间,针对性地优化。很快,验证了我的想法,确实在转换rdd耗费了不少时间。

转换成rdd是为了做两张表合并的操作,然后我查资料发现,其实针对结构化的数据,dataframe和dataset比rdd处理更快;而数据从hive表中读出后本身就是dataframe的格式,其实完全没有必要转成rdd。实验后,果然省下不少时间。另外,用dataframe的join操作也比rdd的leftOuterJoin或者reduceByKey快很多。

其实dataframe就像SQL操作一样,自身带有join,left join, right join等操作,速度比rdd的left join快很多。但是具体原理还没有细究,在第3部分会介绍下dataframe的某些用法。

hdfs数据转换成dataframe

由于还有两份数据存在hdfs,因此有个问题:是将rdd转成dataframe操作更快呢?还是将dataframe转换成rdd更快呢?实验后发现,前者的时间更快,而且rdd转换成dataframe比反过来操作更快。(dataframe转成rdd我是用json格式存储的,怀疑与这个也有关系。)

存储dataframe

目前看到的dataframe的存储方式有3种,但没有进行比较:

  • 直接存成csv或者json格式
  • 转成rdd存储
  • 存到hive表

3.代码说明

from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext, SQLContext
from pyspark import StorageLevel
from pyspark.sql import Row

conf = (SparkConf().setAppName("spark_job_name")
            .set("spark.hadoop.validateOutputSpecs", "false")
            .set("spark.akka.frameSize", "300")
            .set("spark.driver.maxResultSize", "6g")
            )

3.1rdd与dataframe的相互转换

### rdd2dataframe
### 方法1
rdd = sc.parallelize([Row(name='Alice', age=5, height=80),Row(name='Bob', age=5, height=80),Row(name='Cycy', age=10, height=80),Row(name='Cycy', age=10, height=80)])
df = rdd.toDF()
df.show()

### 方法2
l = [('Ankit',25),('Jalfaizy',22),('saurabh',20),('Bala',26)]
rdd = sc.parallelize(l)
people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
schemaPeople = sqlContext.createDataFrame(people)
### 推荐使用第一种

### 如果数据是dict格式,可以使用下面的方式
info = {"a": 1, "b": 2, "c": 3}
new_row = Row(**info)
#Row(a=1, b=2, c=3)
new_row.asDict()
#{'a': 1, 'c': 3, 'b': 2}


### dataframe2rdd
_rdd = df.rdd.map(lambda x: x.asDict())
print _rdd.take(3)
#[{'age': 5, 'name': u'Alice', 'height': 80}, {'age': 5, 'name': u'Bob', 'height': 80}, {'age': 10, 'name': u'Cycy', 'height': 80}]

3.2 dataframe之join的用法

(接上段代码)

df1 = sc.parallelize([Row(name='Alice', score=78),Row(name='Bob', score=80),Row(name='Cycy', score=80)]).toDF()

### join的用法
#方法1
df_join = df.join(df1, df.name==df1.name)

#方法2
df_join = df.join(df1, ["name"])
#DataFrame[name: string, age: bigint, height: bigint, score: bigint]
#发现第二种方法只会得到4个字段,但是第一种会得到5个字段

3.3 groupBy的用法

df_g = df.groupBy("name").sum("height")
df_g.show()
'''
+-----+-----------+                                                             
| name|sum(height)|
+-----+-----------+
| Cycy|        160|
|  Bob|         80|
|Alice|         80|
+-----+-----------+

'''
import pyspark.sql.functions as sf
df_g1=rdd.groupBy("name").agg(sf.sum("height").alias('height_sum'))
df_g1.show()
'''
+-----+----------+                                                              
| name|height_sum|
+-----+----------+
| Cycy|       160|
|  Bob|        80|
|Alice|        80|
+-----+----------+

'''

3.4 dataframe保存到hdfs

本文尝试了3种存储的方式,直接存到hive表还没尝试。

###### 转换成rdd,以json格式保存
import decimal
def to_json(row):
    for k, v in row.items():
        if isinstance(v, decimal.Decimal):
            row[k] = float(v)
    return json.dumps(row, ensure_ascii=False).encode("utf-8")

rdd = df.rdd.map(lambda x: x.asDict()).map(to_json)
rdd.saveAsTextFile(output_path, compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")

###### 直接存成csv格式
df.write.csv(csv_output_path, header='true')

###### 直接存成parquet
df.write.parquet(output_path)

由于json不支持decimal类型,因此如果有decimal类型的数据,存成json之前要转换成float类型。

对比以上三种方式,数据量14372302 * 90,耗时情况如下:

-- rdd+json csv parquet
save 145.837 126.671 147.9598
write 6.07 18.1 ---

parquet方式的读取暂时有bug,还没解决。其他方式的读取可以参见pyspark系列--pyspark读写dataframe

目前采用dataframe转rdd,以json格式存储,完整的流程耗时:当hive表的数据量为100w+时,用时328.78s; 当数据量为1000w+时,用时408.02s。

当然,spark的运行速度还与spark的资源以及spark-submit的配置有关系。

3.5 未解决的问题

到这里,貌似所有问题都解决了,但是将所有流程串起来后,发现如果hive和hdfs两种来源的数据交叉操作的话,很容易报错,错误有如下几种:

###
AttributeError: 'PipelinedRDD' object has no attribute 'toDF'

###
pyspark.sql.utils.AnalysisException: u"Table or view not found:

###
pyspark.sql.utils.AnalysisException: u'org.apache.hadoop.hive.ql.metadata.HiveException: org.apache.thrift.transport.TTransportException;'

现在还没弄清是什么原因,但是先hive后hdfs就没问题。

4.总结

  • dataframe更适合结构化的数据,spark还是有很多方法值得探索。
  • 找问题要有方法,不能臆测。其实上述讲到的错误尝试和正确方法花费的时间差不多,但是如果我从开始就打印出每个步骤的运行时间,我就可以省去错误尝试的时间。
  • 快速迭代离不开高效的数据处理。
  • 确定目标可行后,一定要寻找解决方案,你会发现方案比你想象的要多。

参考资料

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,744评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,505评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,105评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,242评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,269评论 6 389
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,215评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,096评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,939评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,354评论 1 311
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,573评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,745评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,448评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,048评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,683评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,838评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,776评论 2 369
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,652评论 2 354

推荐阅读更多精彩内容