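1. Data Preparation
Before working through multi-table joins in Spark SQL, some test data is needed. Create one DataFrame for employees and one for departments, and register each as a temporary view: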
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("aggregations").master("local[2]").getOrCreate()
val empDF = spark.read.json("/usr/file/json/emp.json")
empDF.createOrReplaceTempView("emp")
val deptDF = spark.read.json("/usr/file/json/dept.json")
deptDF.createOrReplaceTempView("dept")
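The main fields of the two tables are as follows:
emp (employee table)
|-- ENAME: employee name
|-- DEPTNO: department number
|-- EMPNO: employee number
|-- HIREDATE: hire date
|-- JOB: job title
|-- MGR: employee number of the manager
|-- SAL: salary
|-- COMM: commission
dept (department table)
|-- DEPTNO: department number
|-- DNAME: department name
|-- LOC: city where the department is located
Data: dept.json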
{"DEPTNO": 10,"DNAME": "ACCOUNTING","LOC": "NEW YORK"}
{"DEPTNO": 20,"DNAME": "RESEARCH","LOC": "DALLAS"}
{"DEPTNO": 30,"DNAME": "SALES","LOC": "CHICAGO"}
{"DEPTNO": 40,"DNAME": "OPERATIONS","LOC": "BOSTON"}
Data: emp.json
{"EMPNO": 7369,"ENAME": "SMITH","JOB": "CLERK","MGR": 7902,"HIREDATE": "1980-12-17 00:00:00","SAL": 800.00,"COMM": null,"DEPTNO": 20}
{"EMPNO": 7499,"ENAME": "ALLEN","JOB": "SALESMAN","MGR": 7698,"HIREDATE": "1981-02-20 00:00:00","SAL": 1600.00,"COMM": 300.00,"DEPTNO": 30}
{"EMPNO": 7521,"ENAME": "WARD","JOB": "SALESMAN","MGR": 7698,"HIREDATE": "1981-02-22 00:00:00","SAL": 1250.00,"COMM": 500.00,"DEPTNO": 30}
{"EMPNO": 7566,"ENAME": "JONES","JOB": "MANAGER","MGR": 7839,"HIREDATE": "1981-04-02 00:00:00","SAL": 2975.00,"COMM": null,"DEPTNO": 20}
{"EMPNO": 7654,"ENAME": "MARTIN","JOB": "SALESMAN","MGR": 7698,"HIREDATE": "1981-09-28 00:00:00","SAL": 1250.00,"COMM": 1400.00,"DEPTNO": 30}
{"EMPNO": 7698,"ENAME": "BLAKE","JOB": "MANAGER","MGR": 7839,"HIREDATE": "1981-05-01 00:00:00","SAL": 2850.00,"COMM": null,"DEPTNO": 30}
{"EMPNO": 7782,"ENAME": "CLARK","JOB": "MANAGER","MGR": 7839,"HIREDATE": "1981-06-09 00:00:00","SAL": 2450.00,"COMM": null,"DEPTNO": 10}
{"EMPNO": 7788,"ENAME": "SCOTT","JOB": "ANALYST","MGR": 7566,"HIREDATE": "1987-04-19 00:00:00","SAL": 1500.00,"COMM": null,"DEPTNO": 20}
{"EMPNO": 7839,"ENAME": "KING","JOB": "PRESIDENT","MGR": null,"HIREDATE": "1981-11-17 00:00:00","SAL": 5000.00,"COMM": null,"DEPTNO": 10}
{"EMPNO": 7844,"ENAME": "TURNER","JOB": "SALESMAN","MGR": 7698,"HIREDATE": "1981-09-08 00:00:00","SAL": 1500.00,"COMM": 0.00,"DEPTNO": 30}
{"EMPNO": 7876,"ENAME": "ADAMS","JOB": "CLERK","MGR": 7788,"HIREDATE": "1987-05-23 00:00:00","SAL": 1100.00,"COMM": null,"DEPTNO": 20}
{"EMPNO": 7900,"ENAME": "JAMES","JOB": "CLERK","MGR": 7698,"HIREDATE": "1981-12-03 00:00:00","SAL": 950.00,"COMM": null,"DEPTNO": 30}
{"EMPNO": 7902,"ENAME": "FORD","JOB": "ANALYST","MGR": 7566,"HIREDATE": "1981-12-03 00:00:00","SAL": 3000.00,"COMM": null,"DEPTNO": 20}
{"EMPNO": 7934,"ENAME": "MILLER","JOB": "CLERK","MGR": 7782,"HIREDATE": "1982-01-23 00:00:00","SAL": 1300.00,"COMM": null,"DEPTNO": 10}
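2. Join Types
Spark supports several join types:
- Inner Join : returns only the rows whose keys match in both tables;
- Full Outer Join : returns all rows from both tables, with NULL where there is no match;
- Left Outer Join : returns all rows from the left table, with NULL for unmatched right-side columns;
- Right Outer Join : returns all rows from the right table, with NULL for unmatched left-side columns;
- Left Semi Join : returns the left-table rows that have a match in the right table (left columns only);
- Left Anti Join : returns the left-table rows that have no match in the right table;
- Natural Join : joins implicitly on the columns that have the same name in both tables;
- Cross (or Cartesian) Join : pairs every row of the left table with every row of the right table.
Inner joins, outer joins, and Cartesian products behave the same as in ordinary relational databases, as shown in the figure below: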
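Left semi joins and left anti joins deserve a short explanation. They correspond to the IN and NOT IN clauses of a relational database (strictly speaking, to EXISTS and NOT EXISTS: NOT IN returns no rows at all when the subquery yields a NULL, which a left anti join does not do):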
-- LEFT SEMI JOIN
SELECT * FROM emp LEFT SEMI JOIN dept ON emp.deptno = dept.deptno
-- Equivalent to the following IN statement
SELECT * FROM emp WHERE deptno IN (SELECT deptno FROM dept)
-- LEFT ANTI JOIN
SELECT * FROM emp LEFT ANTI JOIN dept ON emp.deptno = dept.deptno
-- Equivalent to the following NOT IN statement
SELECT * FROM emp WHERE deptno NOT IN (SELECT deptno FROM dept)
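To make the semantics concrete, here is a minimal sketch in plain Scala collections (no Spark involved) of what a left semi join and a left anti join return. The object SemiAntiSketch and the case class Emp are hypothetical names introduced only for illustration, and the rows are a small subset of the data above.

```scala
// Plain-Scala sketch of left semi / left anti join semantics (not Spark code).
// SemiAntiSketch and Emp are hypothetical names used for illustration only.
object SemiAntiSketch {
  final case class Emp(ename: String, deptno: Int)

  // Left semi join: keep the left rows whose key appears on the right side.
  // Only left-side columns are returned, and each left row appears at most once.
  def leftSemi(emps: Seq[Emp], deptNos: Set[Int]): Seq[Emp] =
    emps.filter(e => deptNos.contains(e.deptno))

  // Left anti join: keep the left rows whose key does NOT appear on the right side.
  def leftAnti(emps: Seq[Emp], deptNos: Set[Int]): Seq[Emp] =
    emps.filter(e => !deptNos.contains(e.deptno))

  def main(args: Array[String]): Unit = {
    val emps = Seq(Emp("SMITH", 20), Emp("ALLEN", 30), Emp("CLARK", 10))
    val deptNos = Set(10, 20) // pretend that dept only contains departments 10 and 20

    println(leftSemi(emps, deptNos).map(_.ename)) // List(SMITH, CLARK)
    println(leftAnti(emps, deptNos).map(_.ename)) // List(ALLEN)
  }
}
```

Unlike an inner join, neither operation ever duplicates a left row or adds right-side columns, which is why both behave like filters on the left table.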
Sample code for each join type follows:
2.1 INNER JOIN
// 1. Define the join expression
val joinExpression = empDF.col("deptno") === deptDF.col("deptno")
// 2. Run the join query
empDF.join(deptDF,joinExpression).select("ename","dname").show()
// The equivalent SQL:
spark.sql("SELECT ename,dname FROM emp JOIN dept ON emp.deptno = dept.deptno").show()
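When the join is written as an expression, the result keeps the deptno column from both sides, so a later reference to deptno by name is ambiguous. The following is a minimal sketch of one alternative, the join overload that takes a sequence of column names and keeps a single key column. The rows are a small inline subset of the data above, and the object name UsingColumnJoin is hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// UsingColumnJoin is a hypothetical name; the rows are a subset of emp.json / dept.json.
object UsingColumnJoin {
  val spark: SparkSession =
    SparkSession.builder().appName("join-using-columns").master("local[2]").getOrCreate()
  import spark.implicits._

  val empDF: DataFrame =
    Seq(("SMITH", 20), ("ALLEN", 30), ("CLARK", 10)).toDF("ename", "deptno")
  val deptDF: DataFrame =
    Seq((10, "ACCOUNTING"), (20, "RESEARCH"), (30, "SALES")).toDF("deptno", "dname")

  // Joining with an expression keeps deptno from both sides (4 columns in total).
  val byExpression: DataFrame =
    empDF.join(deptDF, empDF.col("deptno") === deptDF.col("deptno"))

  // Joining on a list of column names keeps a single deptno column (3 columns in total).
  val byName: DataFrame = empDF.join(deptDF, Seq("deptno"))

  def main(args: Array[String]): Unit = {
    println(byExpression.columns.mkString(", ")) // ename, deptno, deptno, dname
    println(byName.columns.mkString(", "))       // deptno, ename, dname
    byName.show()
  }
}
```

The same overload accepts a join type as a third argument, for example empDF.join(deptDF, Seq("deptno"), "left_outer").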
2.2 FULL OUTER JOIN
empDF.join(deptDF, joinExpression, "outer").show()
spark.sql("SELECT * FROM emp FULL OUTER JOIN dept ON emp.deptno = dept.deptno").show()
2.3 LEFT OUTER JOIN
empDF.join(deptDF, joinExpression, "left_outer").show()
spark.sql("SELECT * FROM emp LEFT OUTER JOIN dept ON emp.deptno = dept.deptno").show()
2.4 RIGHT OUTER JOIN
empDF.join(deptDF, joinExpression, "right_outer").show()
spark.sql("SELECT * FROM emp RIGHT OUTER JOIN dept ON emp.deptno = dept.deptno").show()
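In the sample data, department 40 (OPERATIONS) has no employees, so it appears only in the right and full outer joins, with NULL in every emp column. The sketch below uses that property to list departments without employees. It runs on a small inline subset of the data, and the object name OuterJoinUnmatched is hypothetical.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}

// OuterJoinUnmatched is a hypothetical name; the rows are a subset of emp.json / dept.json.
object OuterJoinUnmatched {
  val spark: SparkSession =
    SparkSession.builder().appName("outer-join-unmatched").master("local[2]").getOrCreate()
  import spark.implicits._

  val empDF: DataFrame =
    Seq(("SMITH", 20), ("ALLEN", 30), ("CLARK", 10)).toDF("ename", "deptno")
  val deptDF: DataFrame =
    Seq((10, "ACCOUNTING"), (20, "RESEARCH"), (30, "SALES"), (40, "OPERATIONS"))
      .toDF("deptno", "dname")

  val joinExpression: Column = empDF.col("deptno") === deptDF.col("deptno")

  // The right outer join keeps every dept row; emp columns are NULL where nothing matched.
  // Filtering on a NULL emp column therefore leaves the departments without employees.
  val emptyDepts: DataFrame = empDF
    .join(deptDF, joinExpression, "right_outer")
    .where(empDF.col("ename").isNull)
    .select(deptDF.col("dname"))

  def main(args: Array[String]): Unit =
    emptyDepts.show() // a single row: OPERATIONS
}
```

A left anti join from the dept side, deptDF.join(empDF, joinExpression, "left_anti"), selects the same departments without the intermediate NULL columns.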
2.5 LEFT SEMI JOIN
empDF.join(deptDF, joinExpression, "left_semi").show()
spark.sql("SELECT * FROM emp LEFT SEMI JOIN dept ON emp.deptno = dept.deptno").show()
2.6 LEFT ANTI JOIN
empDF.join(deptDF, joinExpression, "left_anti").show()
spark.sql("SELECT * FROM emp LEFT ANTI JOIN dept ON emp.deptno = dept.deptno").show()
2.7 CROSS JOIN
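// Note: because a join condition is supplied, both statements return the same rows as an inner join.
empDF.join(deptDF, joinExpression, "cross").show()
spark.sql("SELECT * FROM emp CROSS JOIN dept ON emp.deptno = dept.deptno").show()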
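A Cartesian product in the strict sense pairs every row of one table with every row of the other and takes no join condition. The following is a minimal sketch, assuming Spark 2.1 or later, where Dataset.crossJoin is available. The inline rows, the view names emp_small and dept_small, and the object name CartesianProduct are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// CartesianProduct, emp_small and dept_small are hypothetical names for illustration.
object CartesianProduct {
  val spark: SparkSession =
    SparkSession.builder().appName("cartesian-product").master("local[2]").getOrCreate()
  import spark.implicits._

  val empDF: DataFrame =
    Seq(("SMITH", 20), ("ALLEN", 30), ("CLARK", 10)).toDF("ename", "deptno")
  val deptDF: DataFrame =
    Seq((10, "ACCOUNTING"), (20, "RESEARCH")).toDF("deptno", "dname")

  // Every emp row is paired with every dept row: 3 x 2 = 6 rows.
  val product: DataFrame = empDF.crossJoin(deptDF)

  def main(args: Array[String]): Unit = {
    product.show()

    // The SQL form: CROSS JOIN with no ON clause.
    empDF.createOrReplaceTempView("emp_small")
    deptDF.createOrReplaceTempView("dept_small")
    spark.sql("SELECT * FROM emp_small CROSS JOIN dept_small").show()
  }
}
```

The result grows as the product of the two row counts, so on the full sample data it would contain 14 x 4 = 56 rows.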
2.8 NATURAL JOIN
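A natural join looks for columns that have the same name (and compatible data types) in both tables, joins on them automatically, and returns all rows that match.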
spark.sql("SELECT * FROM emp NATURAL JOIN dept").show()
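In the natural join query above, Spark automatically infers that the join should use the deptno column, which exists in both tables. It is effectively equivalent to the following, except that the natural join returns deptno only once: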
spark.sql("SELECT * FROM emp JOIN dept ON emp.deptno = dept.deptno").show()
Because natural joins often produce unexpected results, they are not recommended.
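One way a natural join can surprise: every column name shared by both tables silently becomes part of the join key. The sketch below gives the employee rows a hypothetical loc column (it does not exist in emp.json) to show the effect. The view names and the object name NaturalJoinPitfall are likewise assumptions made for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// NaturalJoinPitfall and the *_with_loc views are hypothetical names for illustration.
object NaturalJoinPitfall {
  val spark: SparkSession =
    SparkSession.builder().appName("natural-join-pitfall").master("local[2]").getOrCreate()
  import spark.implicits._

  // Hypothetical: employees also carry a loc column, which dept has as well.
  Seq(("SMITH", 20, "BOSTON"), ("CLARK", 10, "NEW YORK"))
    .toDF("ename", "deptno", "loc")
    .createOrReplaceTempView("emp_with_loc")
  Seq((10, "ACCOUNTING", "NEW YORK"), (20, "RESEARCH", "DALLAS"))
    .toDF("deptno", "dname", "loc")
    .createOrReplaceTempView("dept_with_loc")

  // NATURAL JOIN matches on BOTH shared columns (deptno AND loc),
  // so SMITH (20, BOSTON) no longer matches RESEARCH (20, DALLAS).
  val natural: DataFrame =
    spark.sql("SELECT * FROM emp_with_loc NATURAL JOIN dept_with_loc")

  // An explicit condition states the intended key and keeps both employees.
  val explicit: DataFrame = spark.sql(
    "SELECT e.ename, d.dname FROM emp_with_loc e JOIN dept_with_loc d ON e.deptno = d.deptno")

  def main(args: Array[String]): Unit = {
    println(natural.count())  // 1
    println(explicit.count()) // 2
  }
}
```

Adding a column to either table can therefore change the result of an existing natural join without any change to the query text.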
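3. Join Execution
When two large tables are joined, Spark usually performs a shuffle join: the partitions of both tables exchange data across nodes in an all-to-all pattern. Such queries are expensive and put a heavy load on network I/O.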
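The idea behind a shuffle join can be sketched in plain Scala: both tables are repartitioned by the hash of the join key so that rows with equal keys land in the same partition, and each partition is then joined locally. This is a conceptual sketch under simplifying assumptions, not Spark's implementation. ShuffleJoinSketch, Row, and numPartitions are hypothetical names.

```scala
// Conceptual sketch of a shuffle (hash-partitioned) join; not Spark's actual implementation.
object ShuffleJoinSketch {
  type Row = (Int, String) // (join key, payload)

  // "Shuffle": route each row to a partition chosen by the hash of its key.
  def partitionByKey(rows: Seq[Row], numPartitions: Int): Map[Int, Seq[Row]] =
    rows.groupBy { case (key, _) => Math.floorMod(key.hashCode, numPartitions) }

  // Co-partition both sides, then join partition by partition.
  def shuffleJoin(left: Seq[Row], right: Seq[Row], numPartitions: Int): Seq[(Int, String, String)] = {
    val leftParts = partitionByKey(left, numPartitions)
    val rightParts = partitionByKey(right, numPartitions)
    (0 until numPartitions).flatMap { p =>
      val l = leftParts.getOrElse(p, Seq.empty)
      val r = rightParts.getOrElse(p, Seq.empty)
      // Local join: rows with equal keys are guaranteed to share a partition.
      for ((lk, lv) <- l; (rk, rv) <- r if lk == rk) yield (lk, lv, rv)
    }
  }

  def main(args: Array[String]): Unit = {
    val emp = Seq((20, "SMITH"), (30, "ALLEN"), (10, "CLARK"))
    val dept = Seq((10, "ACCOUNTING"), (20, "RESEARCH"), (40, "OPERATIONS"))
    shuffleJoin(emp, dept, numPartitions = 4).foreach(println)
  }
}
```

In a cluster, the partitionByKey step is where rows move between nodes, which is the source of the network cost described above.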
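Joins between a large table and a small table can be optimized to some extent. If the small table fits in the memory of a worker node, Spark considers broadcasting it to every worker node and running the join locally on each one. This reduces network I/O at the cost of more CPU work on each worker node. By default, Spark makes this choice automatically only when the estimated size of the small table is below spark.sql.autoBroadcastJoinThreshold (10 MB by default).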
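A broadcast join can be pictured the same way in plain Scala: the small table is turned into a lookup map once, a copy is handed to every partition of the large table, and each partition probes the map locally without moving its own rows. This is a conceptual sketch only. BroadcastJoinSketch is a hypothetical name, and the sketch assumes the keys of the small table are unique.

```scala
// Conceptual sketch of a broadcast hash join; not Spark's actual implementation.
object BroadcastJoinSketch {
  def broadcastJoin(
      largePartitions: Seq[Seq[(Int, String)]],
      small: Seq[(Int, String)]): Seq[Seq[(Int, String, String)]] = {
    // Built once, then "broadcast" to every partition (assumes unique keys in the small table).
    val lookup: Map[Int, String] = small.toMap
    largePartitions.map { partition =>
      // Each partition joins locally; rows of the large table never leave their partition.
      partition.flatMap { case (key, ename) =>
        lookup.get(key).map(dname => (key, ename, dname))
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val empPartitions = Seq(Seq((20, "SMITH"), (30, "ALLEN")), Seq((10, "CLARK")))
    val dept = Seq((10, "ACCOUNTING"), (20, "RESEARCH"), (30, "SALES"))
    broadcastJoin(empPartitions, dept).foreach(println)
  }
}
```

The partitioning of the large table is unchanged in the output, which reflects why no shuffle is needed.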
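Whether a broadcast join is used depends on Spark's own assessment of the small table. To request a broadcast join explicitly, wrap the small table with the broadcast function (from org.apache.spark.sql.functions) in the DataFrame API: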
import org.apache.spark.sql.functions.broadcast

empDF.join(broadcast(deptDF), joinExpression).show()
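One way to check whether the hint took effect is to inspect the physical plan. The sketch below disables automatic broadcasting first, so that any broadcast in the plan can only come from the explicit hint. The inline rows and the names BroadcastHintCheck and planOf are hypothetical, and the exact plan text may differ between Spark versions.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.broadcast

// BroadcastHintCheck and planOf are hypothetical names for illustration.
object BroadcastHintCheck {
  val spark: SparkSession =
    SparkSession.builder().appName("broadcast-hint-check").master("local[2]").getOrCreate()
  import spark.implicits._

  // Turn off automatic broadcasting so that only the explicit hint can trigger it.
  spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

  val empDF: DataFrame =
    Seq(("SMITH", 20), ("ALLEN", 30), ("CLARK", 10)).toDF("ename", "deptno")
  val deptDF: DataFrame =
    Seq((10, "ACCOUNTING"), (20, "RESEARCH"), (30, "SALES")).toDF("deptno", "dname")
  val joinExpression: Column = empDF.col("deptno") === deptDF.col("deptno")

  val plain: DataFrame = empDF.join(deptDF, joinExpression)
  val hinted: DataFrame = empDF.join(broadcast(deptDF), joinExpression)

  // The physical plan as text, suitable for a quick search for the join operator.
  def planOf(df: DataFrame): String = df.queryExecution.executedPlan.toString

  def main(args: Array[String]): Unit = {
    plain.explain()  // expected to show a non-broadcast join operator
    hinted.explain() // expected to show a BroadcastHashJoin operator
  }
}
```

In SQL, the corresponding hint is written as a comment after SELECT, for example SELECT /*+ BROADCAST(dept) */ * FROM emp JOIN dept ON emp.deptno = dept.deptno.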