Learning Spark [6] - Spark SQL高级函数

collect, collect_list, collect_set

collect常用的有两个函数:collect_list(不去重)和collect_set(去重)

# build a test dataframe
df = pd.DataFrame({'type1':['a1', 'a1', 'a1', 'a2', 'a2', 'a2'],
                   'type2':['b1', 'b2', 'b3', 'b4', 'b5', 'b5'],
                   'value':[1, 2, 3, 4, 5, 6]})
df = spark.createDataFrame(df)
df.createOrReplaceTempView('collect_test')
df.view()
+-----+-----+-----+
|type1|type2|value|
+-----+-----+-----+
|   a1|   b1|    1|
|   a1|   b2|    2|
|   a1|   b3|    3|
|   a2|   b4|    4|
|   a2|   b5|    5|
|   a2|   b6|    6|
+-----+-----+-----+

collect_list

spark.sql('''SELECT type1, COLLECT_LIST(type2) as type2 
             FROM collect_test 
             GROUP BY type1''').show()
+-----+------------+
|type1|       type2|
+-----+------------+
|   a2|[b4, b5, b5]|
|   a1|[b1, b2, b3]|
+-----+------------+

collect_set

spark.sql('''SELECT type1, COLLECT_SET(type2) as type2 
             FROM collect_test 
             GROUP BY type1''').show()
+-----+------------+
|type1|       type2|
+-----+------------+
|   a2|    [b4, b5]|
|   a1|[b1, b3, b2]|
+-----+------------+

collect后返回的是一个数组,可以通过array[x]来调用数据。通过这点我们可以进行透视表的操作,类似定义array[0] as a1, array[1] as a2...

explode

explode的定义是将数组的每个数据展开,如下我们就可以将上面的dataframe还原为最初的样式。

spark.sql('''SELECT type1, EXPLODE(type2) as type2
             FROM(SELECT type1, COLLECT_LIST(type2) as type2 
                  FROM collect_test 
                  GROUP BY type1) a''').show()
+-----+-----+
|type1|type2|
+-----+-----+
|   a2|   b4|
|   a2|   b5|
|   a2|   b5|
|   a1|   b1|
|   a1|   b2|
|   a1|   b3|
+-----+-----+

posexplode可以在拆分列的同时,增加一列序号

spark.sql('''SELECT type1, posexplode(type2) as (index, type2)
             FROM(SELECT type1, COLLECT_LIST(type2) as type2 
                  FROM collect_test 
                  GROUP BY type1) a''').show()
+-----+-----+-----+
|type1|index|type2|
+-----+-----+-----+
|   a2|    0|   b4|
|   a2|    1|   b5|
|   a2|    2|   b5|
|   a1|    0|   b1|
|   a1|    1|   b2|
|   a1|    2|   b3|
+-----+-----+-----+

但是如果表内有如下两个一一对应的数组,我们该如何拆分呢?

+-----+------------+---------+
|type1|       type2|    value|
+-----+------------+---------+
|   a2|[b4, b5, b5]|[4, 5, 6]|
|   a1|[b1, b2, b3]|[1, 2, 3]|
+-----+------------+---------+

按照直觉,我们尝试分别explode()

spark.sql('''SELECT type1, explode(type2) as type2, explode(value) as value
             FROM(SELECT type1, COLLECT_LIST(type2) as type2
                    , COLLECT_LIST(value) as value 
                  FROM collect_test 
                  GROUP BY type1) a''').show()
AnalysisException: Only one generator allowed per select clause but found 2: explode(type2), explode(value);

解决这个问题,我们需要使用LATERAL VIEW

lateral view

lateral view可以理解为创建了一个表,然后JOIN到了查询的表上,这样就避免了两个生成器的问题

spark.sql('''SELECT type1, exploded_type2.type2, exploded_value.value
             FROM(SELECT type1, COLLECT_LIST(type2) as type2
                    , COLLECT_LIST(value) as value 
                  FROM collect_test 
                  GROUP BY type1) a
             LATERAL VIEW POSEXPLODE(type2) exploded_type2 as type_index, type2
             LATERAL VIEW POSEXPLODE(value) exploded_value as value_index, value
             WHERE type_index = value_index -- 避免为笛卡尔积
             ''').show()

split

split则是将一个字符串根据分隔符,变化为一个数组

df = pd.DataFrame({'type1':['a', 'b', 'c'],
                   'type2':['1_2_3', '1_23', '_1']})
df = spark.createDataFrame(df)
df.createOrReplaceTempView('collect_test')
spark.sql('''SELECT * FROM collect_test''').show()
+-----+-----+
|type1|type2|
+-----+-----+
|    a|1_2_3|
|    b| 1_23|
|    c|   _1|
+-----+-----+
spark.sql('''SELECT type1, split(type2, '_') as splited_type2 FROM collect_test''').show()
+-----+-------------------+
|type1|splited_type2|
+-----+-------------------+
|    a|          [1, 2, 3]|
|    b|            [1, 23]|
|    c|              [, 1]|
+-----+-------------------+

transform

transform会引用一个函数在数组的每个元素上,返回一个数列

schema = StructType([StructField('celsius', ArrayType(IntegerType()))])
t_list = [[35, 36, 32, 30, 40, 42, 38]], [[31, 32, 34, 55, 56]] 
t_c = spark.createDataFrame(t_list, schema) 
t_c.createOrReplaceTempView("tC")
t_c.show()
+--------------------+
|             celsius|
+--------------------+
|[35, 36, 32, 30, ...|
|[31, 32, 34, 55, 56]|
+--------------------+
spark.sql(""" SELECT celsius, TRANSFORM(celsius, t -> ((t * 9) div 5) + 32) as fahrenheit 
              FROM tC """).show()
+--------------------+--------------------+
|             celsius|          fahrenheit|
+--------------------+--------------------+
|[35, 36, 32, 30, ...|[95, 96, 89, 86, ...|
|[31, 32, 34, 55, 56]|[87, 89, 93, 131,...|
+--------------------+--------------------+

filter

filter为通过条件删选,返回一个数列

spark.sql(""" SELECT celsius, FILTER(celsius, t -> t >= 40) as high_temp 
              FROM tC """).show()
+--------------------+---------+
|             celsius|high_temp|
+--------------------+---------+
|[35, 36, 32, 30, ...| [40, 42]|
|[31, 32, 34, 55, 56]| [55, 56]|
+--------------------+---------+

exists

exists为判断是否包含该元素,返回一个布尔值

spark.sql(""" SELECT celsius, EXISTS(celsius, t -> t >= 40) as is_high_temp 
              FROM tC """).show()
+--------------------+------------+
|             celsius|is_high_temp|
+--------------------+------------+
|[35, 36, 32, 30, ...|        true|
|[31, 32, 34, 55, 56]|        true|
+--------------------+------------+

reduce

reduce为通过两个函数,将数组聚合为一个值,然后对该值进行运算

spark.sql(""" SELECT celsius, 
                     reduce(celsius
                            , (t, acc) -> ((t * 9) div 5) + 32 + acc
                            , acc -> (acc div size(celsius))) as avgFahrenheit 
              FROM tC """).show()
+--------------------+-------------+ 
|             celsius|avgFahrenheit| 
+--------------------+-------------+ 
|[35, 36, 32, 30, ...|           96| 
|[31, 32, 34, 55, 56]|          105| 
+--------------------+-------------+

其他函数

Spark SQL高级函数 part1

Spark SQL高级函数 part2

Spark SQL高级函数 part3

Reference
Learning Spark 2nd - Lightning Fast Big Data Analysis by Jules S. Damji, Brooke Wenig, Tathagata Das, and Denny Lee

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,277评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,689评论 3 393
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,624评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,356评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,402评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,292评论 1 301
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,135评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,992评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,429评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,636评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,785评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,492评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,092评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,723评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,858评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,891评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,713评论 2 354

推荐阅读更多精彩内容