Spark Development--Spark SQL--Built-in Functions (12)

Documentation: http://spark.apache.org/docs/latest/api/sql/index.html

I. Common Functions

The built-in functions mostly live in the org.apache.spark.sql.functions class. They include aggregate functions, collection functions, date/time functions, string functions, math functions, sorting functions, window functions, and more, roughly 299 functions in total.
Test data:

{"EMPNO": 7369,"ENAME": "SMITH","JOB": "CLERK","MGR": 7902,"HIREDATE": "1980-12-17 00:00:00","SAL": 800.00,"COMM": null,"DEPTNO": 20}
{"EMPNO": 7499,"ENAME": "ALLEN","JOB": "SALESMAN","MGR": 7698,"HIREDATE": "1981-02-20 00:00:00","SAL": 1600.00,"COMM": 300.00,"DEPTNO": 30}
{"EMPNO": 7521,"ENAME": "WARD","JOB": "SALESMAN","MGR": 7698,"HIREDATE": "1981-02-22 00:00:00","SAL": 1250.00,"COMM": 500.00,"DEPTNO": 30}
{"EMPNO": 7566,"ENAME": "JONES","JOB": "MANAGER","MGR": 7839,"HIREDATE": "1981-04-02 00:00:00","SAL": 2975.00,"COMM": null,"DEPTNO": 20}
{"EMPNO": 7654,"ENAME": "MARTIN","JOB": "SALESMAN","MGR": 7698,"HIREDATE": "1981-09-28 00:00:00","SAL": 1250.00,"COMM": 1400.00,"DEPTNO": 30}
{"EMPNO": 7698,"ENAME": "BLAKE","JOB": "MANAGER","MGR": 7839,"HIREDATE": "1981-05-01 00:00:00","SAL": 2850.00,"COMM": null,"DEPTNO": 30}
{"EMPNO": 7782,"ENAME": "CLARK","JOB": "MANAGER","MGR": 7839,"HIREDATE": "1981-06-09 00:00:00","SAL": 2450.00,"COMM": null,"DEPTNO": 10}
{"EMPNO": 7788,"ENAME": "SCOTT","JOB": "ANALYST","MGR": 7566,"HIREDATE": "1987-04-19 00:00:00","SAL": 1500.00,"COMM": null,"DEPTNO": 20}
{"EMPNO": 7839,"ENAME": "KING","JOB": "PRESIDENT","MGR": null,"HIREDATE": "1981-11-17 00:00:00","SAL": 5000.00,"COMM": null,"DEPTNO": 10}
{"EMPNO": 7844,"ENAME": "TURNER","JOB": "SALESMAN","MGR": 7698,"HIREDATE": "1981-09-08 00:00:00","SAL": 1500.00,"COMM": 0.00,"DEPTNO": 30}
{"EMPNO": 7876,"ENAME": "ADAMS","JOB": "CLERK","MGR": 7788,"HIREDATE": "1987-05-23 00:00:00","SAL": 1100.00,"COMM": null,"DEPTNO": 20}
{"EMPNO": 7900,"ENAME": "JAMES","JOB": "CLERK","MGR": 7698,"HIREDATE": "1981-12-03 00:00:00","SAL": 950.00,"COMM": null,"DEPTNO": 30}
{"EMPNO": 7902,"ENAME": "FORD","JOB": "ANALYST","MGR": 7566,"HIREDATE": "1981-12-03 00:00:00","SAL": 3000.00,"COMM": null,"DEPTNO": 20}
{"EMPNO": 7934,"ENAME": "MILLER","JOB": "CLERK","MGR": 7782,"HIREDATE": "1982-01-23 00:00:00","SAL": 1300.00,"COMM": null,"DEPTNO": 10}

How to test:
Using the built-in functions programmatically:

import org.apache.spark.sql.functions._
// df is a DataFrame read from the test data above, e.g. val df = spark.read.json("emp.json")
scala> df.select(lower(col("ename")).as("name"), col("sal")).show()
+------+------+
|  name|   sal|
+------+------+
| smith| 800.0|
| allen|1600.0|
|miller|1300.0|
+------+------+
(output abridged)

Using them via SQL:

df.createOrReplaceTempView("emp")
scala> spark.sql("select lower(ename) as name,sal from emp").show()
+------+------+
|  name|   sal|
+------+------+
| smith| 800.0|
| allen|1600.0|
| james| 950.0|
|  ford|3000.0|
|miller|1300.0|
+------+------+
(output abridged)

1. String Functions

1) concat: concatenates strings

concat(str1, str2, ..., strN) - Returns the concatenation of str1, str2, ..., strN.

scala>  spark.sql("SELECT concat('Spark', 'SQL')").show
+------------------+
|concat(Spark, SQL)|
+------------------+
|          SparkSQL|
+------------------+

2) concat_ws: concatenates strings, inserting a separator between them

concat_ws(sep, [str | array(str)]+) - Returns the concatenation of the strings separated by sep.
# Insert a space between the words.
scala>  spark.sql("SELECT concat_ws(' ', 'Spark', 'SQL')").show
+------------------------+
|concat_ws( , Spark, SQL)|
+------------------------+
|               Spark SQL|
+------------------------+
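
Per the signature above, concat_ws also accepts array arguments; a quick sketch (the result should be a,b,c):

scala>  spark.sql("SELECT concat_ws(',', array('a', 'b', 'c'))").show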

3) decode: decodes binary data into a string using the given charset

decode(bin, charset) - Decodes the first argument using the second argument character set.

scala>  spark.sql("SELECT decode(encode('abc', 'utf-8'), 'utf-8')").show
+---------------------------------+
|decode(encode(abc, utf-8), utf-8)|
+---------------------------------+
|                              abc|
+---------------------------------+

4) encode: encodes a string into binary using the given charset

encode(str, charset) - Encodes the first argument using the second argument character set.

scala>  spark.sql("SELECT encode('abc', 'utf-8')").show
+------------------+
|encode(abc, utf-8)|
+------------------+
|        [61 62 63]|
+------------------+

5) format_string / printf: formats a string

format_string(strfmt, obj, ...) - Returns a formatted string from printf-style format strings.

scala>  spark.sql("SELECT format_string('Hello World %d %s', 100, 'days')").show
+-------------------------------------------+
|format_string(Hello World %d %s, 100, days)|
+-------------------------------------------+
|                       Hello World 100 days|
+-------------------------------------------+

6) initcap: uppercases the first letter of each word and lowercases the rest; lower converts everything to lowercase, upper to uppercase

initcap(str) - Returns str with the first letter of each word in uppercase. All other letters are in lowercase. Words are delimited by white space.

scala>  spark.sql("SELECT initcap('sPark sql')").show
+------------------+
|initcap(sPark sql)|
+------------------+
|         Spark Sql|
+------------------+
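
lower and upper, mentioned above, follow the same pattern; a minimal sketch (expected results: spark sql and SPARK SQL):

scala>  spark.sql("SELECT lower('sPark sql'), upper('sPark sql')").show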

7) length: returns the length of a string (note the trailing space in the example counts, giving 10)

scala>  spark.sql("SELECT length('Spark SQL ')").show
+------------------+
|length(Spark SQL )|
+------------------+
|                10|
+------------------+

8) levenshtein: edit distance (the minimum number of single-character edits needed to turn one string into the other)

levenshtein(str1, str2) - Returns the Levenshtein distance between the two given strings.

scala>  spark.sql("SELECT levenshtein('kitten', 'sitting')").show
+----------------------------+
|levenshtein(kitten, sitting)|
+----------------------------+
|                           3|
+----------------------------+

9) lpad: pads a string on the left to a fixed length with the given pad characters; rpad pads on the right

lpad(str, len, pad) - Returns str, left-padded with pad to a length of len. If str is longer than len, the return value is shortened to len characters.

scala>  spark.sql("SELECT lpad('hi', 5, '??')").show
+---------------+
|lpad(hi, 5, ??)|
+---------------+
|          ???hi|
+---------------+
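
rpad, mentioned above, pads on the right instead; a minimal sketch (expected result: hi???):

scala>  spark.sql("SELECT rpad('hi', 5, '??')").show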

10) ltrim: removes leading spaces or a given set of leading characters; rtrim trims on the right, trim on both sides

ltrim(str) - Removes the leading space characters from str.

ltrim(trimStr, str) - Removes from str the leading characters that appear in trimStr.

scala>  spark.sql("SELECT ltrim('    SparkSQL   ')").show
+----------------------+
|ltrim(    SparkSQL   )|
+----------------------+
|           SparkSQL   |
+----------------------+

scala>  spark.sql("SELECT ltrim('Sp', 'SSparkSQLS')").show
+---------------------+
|ltrim(SSparkSQLS, Sp)|
+---------------------+
|              arkSQLS|
+---------------------+
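
rtrim and trim, mentioned above, work the same way on the right side and on both sides; a minimal sketch (expected results: "    SparkSQL" and "SparkSQL"):

scala>  spark.sql("SELECT rtrim('    SparkSQL   ')").show
scala>  spark.sql("SELECT trim('    SparkSQL   ')").show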

11) regexp_extract: extracts substrings using a regular expression; regexp_replace replaces matches

# A literal \ in the regex must be escaped as \\\\ inside the quoted string.
scala>  spark.sql("SELECT regexp_extract('100-200', '(\\\\d+)-(\\\\d+)', 1) as str").show
+---+
|str|
+---+
|100|
+---+

scala>  spark.sql("SELECT regexp_extract('foothebar', 'foo(.*?)(bar)',1) as str").show
+---+
|str|
+---+
|the|
+---+

spark.sql("SELECT regexp_replace('100-200', '(\\\\d+)', 'num') as str").show

12) repeat: repeats the given string n times

scala>  spark.sql("SELECT repeat('123', 2)").show
+--------------+
|repeat(123, 2)|
+--------------+
|        123123|
+--------------+

13) instr: returns the position of a substring within a string; locate is similar

instr(str, substr) - Returns the (1-based) index of the first occurrence of substr in str.

scala>  spark.sql("SELECT instr('SparkSQL', 'SQL')").show
+--------------------+
|instr(SparkSQL, SQL)|
+--------------------+
|                   6|
+--------------------+

scala>  spark.sql(" SELECT locate('bar', 'foobarbar')").show
+-------------------------+
|locate(bar, foobarbar, 1)|
+-------------------------+
|                        4|
+-------------------------+
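
locate also takes an optional 1-based start position; a sketch, where the search starts at index 5 and should therefore find the second bar at position 7:

scala>  spark.sql("SELECT locate('bar', 'foobarbar', 5)").show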

14) space: returns a string of n spaces (used here to prefix a string with two spaces)

space(n) - Returns a string consisting of n spaces.

scala>  spark.sql("SELECT concat(space(2), '1')").show
+-------------------+
|concat(space(2), 1)|
+-------------------+
|                  1|
+-------------------+

15) split: splits a string on a regex

split(str, regex) - Splits str around occurrences that match regex.

scala>  spark.sql("SELECT split('oneAtwoBthreeC', '[ABC]')").show
+----------------------------+
|split(oneAtwoBthreeC, [ABC])|
+----------------------------+
|         [one, two, three, ]|
+----------------------------+
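
In newer versions, split also accepts a limit on the number of resulting elements; a sketch, assuming Spark 3.0 or later (expected result: [one, twoBthreeC]):

scala>  spark.sql("SELECT split('oneAtwoBthreeC', '[ABC]', 2)").show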

16) substr: extracts a substring; substring_index returns the part of the string before the count-th occurrence of a delimiter

scala>  spark.sql("SELECT substr('Spark SQL', 5)").show
+-----------------------------------+
|substring(Spark SQL, 5, 2147483647)|
+-----------------------------------+
|                              k SQL|
+-----------------------------------+

scala>  spark.sql("SELECT substr('Spark SQL', -3)").show
+------------------------------------+
|substring(Spark SQL, -3, 2147483647)|
+------------------------------------+
|                                 SQL|
+------------------------------------+

scala>  spark.sql("SELECT substr('Spark SQL', 5, 1)").show
+--------------------------+
|substring(Spark SQL, 5, 1)|
+--------------------------+
|                         k|
+--------------------------+

scala>  spark.sql("SELECT substring_index('www.apache.org', '.', 2)").show
+-------------------------------------+
|substring_index(www.apache.org, ., 2)|
+-------------------------------------+
|                           www.apache|
+-------------------------------------+
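
A negative count makes substring_index count delimiters from the right; a sketch (expected result: apache.org):

scala>  spark.sql("SELECT substring_index('www.apache.org', '.', -2)").show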

17) translate: per-character replacement; each character from the second argument found in the input is replaced by the character at the same position in the third argument

scala>  spark.sql("SELECT translate('AaBbCc', 'abc', '123')").show
+---------------------------+
|translate(AaBbCc, abc, 123)|
+---------------------------+
|                     A1B2C3|
+---------------------------+

18) get_json_object

get_json_object(json_txt, path) - Extracts a json object from path.

scala>  spark.sql("SELECT get_json_object('{\"a\":\"b\"}', '$.a')").show
+-------------------------------+
|get_json_object({"a":"b"}, $.a)|
+-------------------------------+
|                              b|
+-------------------------------+
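
The path expression can also index into JSON arrays; a sketch (expected result: 1):

scala>  spark.sql("SELECT get_json_object('{\"a\":[1,2,3]}', '$.a[0]')").show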

19) unhex

unhex(expr) - Converts hexadecimal expr to binary.

scala>  spark.sql("SELECT decode(unhex('537061726B2053514C'), 'UTF-8')").show
+----------------------------------------+
|decode(unhex(537061726B2053514C), UTF-8)|
+----------------------------------------+
|                               Spark SQL|
+----------------------------------------+
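
hex is the inverse direction, turning a string's UTF-8 bytes into hexadecimal; a sketch (expected result: 537061726B2053514C):

scala>  spark.sql("SELECT hex('Spark SQL')").show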

20) to_json

to_json(expr[, options]) - Returns a json string with a given struct value
# struct
scala>  spark.sql("SELECT to_json(named_struct('a', 1, 'b', 2))").show
+---------------------------------------+
|structstojson(named_struct(a, 1, b, 2))|
+---------------------------------------+
|                          {"a":1,"b":2}|
+---------------------------------------+
# timestamp with a custom format
scala>  spark.sql("SELECT to_json(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy'))").show(false)
+---------------------------------------------------------------------------+
|structstojson(named_struct(time, to_timestamp('2015-08-26', 'yyyy-MM-dd')))|
+---------------------------------------------------------------------------+
|{"time":"26/08/2015"}                                                      |
+---------------------------------------------------------------------------+

# array
scala>  spark.sql("SELECT to_json(array(named_struct('a', 1, 'b', 2)))").show
+----------------------------------------------+
|structstojson(array(named_struct(a, 1, b, 2)))|
+----------------------------------------------+
|                               [{"a":1,"b":2}]|
+----------------------------------------------+

# nested object
scala>  spark.sql("SELECT to_json(map('a', named_struct('b', 1)))").show
+-----------------------------------------+
|structstojson(map(a, named_struct(b, 1)))|
+-----------------------------------------+
|                            {"a":{"b":1}}|
+-----------------------------------------+
# map with struct keys (note how the key is rendered)
scala>  spark.sql("SELECT to_json(map(named_struct('a', 1),named_struct('b', 2)))").show
+----------------------------------------------------------+
|structstojson(map(named_struct(a, 1), named_struct(b, 2)))|
+----------------------------------------------------------+
|                                           {"[1]":{"b":2}}|
+----------------------------------------------------------+

# numeric values
scala>  spark.sql("SELECT to_json(map('a', 1))").show
+------------------------+
|structstojson(map(a, 1))|
+------------------------+
|                 {"a":1}|
+------------------------+

# array of maps
scala>  spark.sql("SELECT to_json(array((map('a', 1))))").show
+-------------------------------+
|structstojson(array(map(a, 1)))|
+-------------------------------+
|                      [{"a":1}]|
+-------------------------------+
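
Going the other way, from_json parses a JSON string into a struct given a schema; a minimal sketch (the result should be a struct with a=1 and b=2):

scala>  spark.sql("SELECT from_json('{\"a\":1,\"b\":2}', 'a INT, b INT')").show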

2. Aggregate Functions

These examples use the test data above.
Create the DataFrame:

// import Spark SQL's built-in functions
import org.apache.spark.sql.functions._
// val spark = SparkSession.builder().appName("aggregations").master("local[2]").getOrCreate()
val empDF = spark.read.json("/usr/file/json/emp.json")
// register as a temporary view
empDF.createOrReplaceTempView("emp")
empDF.show()
1) avg & mean

Built-in functions for computing the average of a column.

scala> spark.sql("select sal from emp").select(avg("sal")).show

scala> spark.sql("select sal from emp").select(mean("sal")).show

empDF.select(avg("sal")).show()
2) count

Counting.

scala> spark.sql("select empno from emp").select(count("empno")).show

// count the number of employees
empDF.select(count("ename")).show()
3) countDistinct

Distinct count.

scala> spark.sql("select deptno from emp").select(countDistinct("deptno")).show

// count the number of distinct departments
empDF.select(countDistinct("deptno")).show()
4) min & max

最小值 & 最大值
获取 DataFrame 中指定列的最小值或者最大值。

scala> spark.sql("select id from aaa").select(max("id")).show

scala> spark.sql("select id from aaa").select(min("id")).show

empDF.select(min("sal"),max("sal")).show()

5)sum & sumDistinct

求和以及求指定列所有不相同的值的和。

scala> spark.sql("select id from aaa").select(sum("id")).show

empDF.select(sum("sal")).show()
empDF.select(sumDistinct("sal")).show()

6) approx_count_distinct: approximate distinct count

When working with large data sets you often only need an approximate count rather than an exact one, and approx_count_distinct computes it far more cheaply than countDistinct. The optional second argument rsd sets the maximum estimation error allowed (default 0.05); it must lie between 0 and 0.4 (exclusive), and smaller values give more accurate but more expensive estimates.

scala> spark.sql("select ename from emp").select(approx_count_distinct("ename", 0.1)).show

empDF.select(approx_count_distinct("ename", 0.1)).show()
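
For a side-by-side check, a minimal sketch comparing the exact and approximate counts (on this 14-row data set the exact answer is 14 distinct names):

scala>  empDF.select(countDistinct("ename"), approx_count_distinct("ename", 0.1)).show()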
7) first & last

获取 DataFrame 中指定列的第一个值或者最后一个值。

empDF.select(first("ename"),last("job")).show()
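
Both functions also take an ignoreNulls flag, handy on a mostly-null column like comm; a sketch, assuming the first(columnName, ignoreNulls) overload:

scala>  empDF.select(first("comm", ignoreNulls = true), last("comm", ignoreNulls = true)).show()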
8) collect_set & collect_list: aggregate values into collections
scala>  empDF.agg(collect_set("job"), collect_list("ename")).show()
Output:
+--------------------+--------------------+
|    collect_set(job)| collect_list(ename)|
+--------------------+--------------------+
|[MANAGER, SALESMA...|[SMITH, ALLEN, WA...|
+--------------------+--------------------+
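
The same collectors work in SQL against the emp view registered earlier; a minimal sketch:

scala>  spark.sql("SELECT collect_set(job), collect_list(ename) FROM emp").show()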

3. Math Functions

Spark SQL also supports a variety of statistical aggregate functions. Some common examples:

1) Variance and standard deviation
// population variance var_pop, sample variance var_samp, population standard deviation stddev_pop, sample standard deviation stddev_samp
empDF.select(var_pop("sal"), var_samp("sal"), stddev_pop("sal"), stddev_samp("sal")).show()
2) Skewness and kurtosis
// skewness and kurtosis
empDF.select(skewness("sal"), kurtosis("sal")).show()
3) Pearson correlation coefficient
// Pearson correlation, sample covariance, and population covariance of two columns
// (for demonstration only; empno and sal are not actually related)
empDF.select(corr("empno", "sal"), covar_samp("empno", "sal"),covar_pop("empno", "sal")).show()
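
These statistics are available as SQL aggregates too; a minimal sketch against the emp view:

scala>  spark.sql("SELECT var_pop(sal), stddev_samp(sal), corr(empno, sal) FROM emp").show()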

4. Grouping and Aggregation

1) Simple grouping
empDF.groupBy("deptno", "job").count().show()
// equivalent SQL
spark.sql("SELECT deptno, job, count(*) FROM emp GROUP BY deptno, job").show()
Output:
+------+---------+-----+
|deptno|      job|count|
+------+---------+-----+
|    10|PRESIDENT|    1|
|    30|    CLERK|    1|
|    10|  MANAGER|    1|
|    30|  MANAGER|    1|
|    20|    CLERK|    2|
|    30| SALESMAN|    4|
|    20|  ANALYST|    2|
|    10|    CLERK|    1|
|    20|  MANAGER|    1|
+------+---------+-----+
2) Grouped aggregation
empDF.groupBy("deptno").agg(count("ename").alias("headcount"), sum("sal").alias("total_sal")).show()
// equivalent syntax
empDF.groupBy("deptno").agg("ename" -> "count", "sal" -> "sum").show()
// equivalent SQL
spark.sql("SELECT deptno, count(ename), sum(sal) FROM emp GROUP BY deptno").show()
Output:
+------+---------+---------+
|deptno|headcount|total_sal|
+------+---------+---------+
|    10|        3|   8750.0|
|    30|        6|   9400.0|
|    20|        5|   9375.0|
+------+---------+---------+