Reading and Writing JDBC Data with Spark SQL

1. Reading a JDBC data source with Spark SQL in IDEA
First, take a look at the data in MySQL:

mysql> use test;

mysql> create table emp(empno int, ename varchar(100),job varchar(100),mgr int, hiredate varchar(100), sal double, comm double, deptno int);

mysql> load data infile '/usr/local/mysql/data/emp.txt' replace into table emp fields terminated by '\t';

mysql> create table dept(deptno int, dname varchar(100),loc varchar(100));   

mysql> load data infile '/usr/local/mysql/data/dept.txt' replace into table dept fields terminated by '\t';

mysql> show tables;
+----------------+
| Tables_in_test |
+----------------+
| dept           |
| emp            |
| user           |
+----------------+
3 rows in set (0.00 sec)

mysql> select * from emp;
+-------+--------+-----------+------+------------+-------+------+--------+
| empno | ename  | job       | mgr  | hiredate   | sal   | comm | deptno |
+-------+--------+-----------+------+------------+-------+------+--------+
|  7369 | SMITH  | CLERK     | 7902 | 1980-12-17 |   800 |    0 |     20 |
|  7499 | ALLEN  | SALESMAN  | 7698 | 1981-2-20  |  1600 |  300 |     30 |
|  7521 | WARD   | SALESMAN  | 7698 | 1981-2-22  |  1250 |  500 |     30 |
|  7566 | JONES  | MANAGER   | 7839 | 1981-4-2   |  2975 |    0 |     20 |
|  7654 | MARTIN | SALESMAN  | 7698 | 1981-9-28  |  1250 | 1400 |     30 |
|  7698 | BLAKE  | MANAGER   | 7839 | 1981-5-1   |  2850 |    0 |     30 |
|  7782 | CLARK  | MANAGER   | 7839 | 1981-6-9   |  2450 |    0 |     10 |
|  7788 | SCOTT  | ANALYST   | 7566 | 1987-4-19  |  3000 |    0 |     20 |
|  7839 | KING   | PRESIDENT |    0 | 1981-11-17 |  5000 |    0 |     10 |
|  7844 | TURNER | SALESMAN  | 7698 | 1981-9-8   |  1500 |    0 |     30 |
|  7876 | ADAMS  | CLERK     | 7788 | 1987-5-23  |  1100 |    0 |     20 |
|  7900 | JAMES  | CLERK     | 7698 | 1981-12-3  |   950 |    0 |     30 |
|  7902 | FORD   | ANALYST   | 7566 | 1981-12-3  |  3000 |    0 |     20 |
|  7934 | MILLER | CLERK     | 7782 | 1982-1-23  |  1300 |    0 |     10 |
|  8888 | HIVE   | PROGRAM   | 7839 | 1988-1-23  | 10300 |    0 |   NULL |
+-------+--------+-----------+------+------------+-------+------+--------+
15 rows in set (0.00 sec)

The code in IDEA is as follows:

import org.apache.spark.sql.SparkSession

object ExtDSApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("ExtDSApp")
      .master("local[2]")
      .getOrCreate()

    // Read the MySQL emp table through the JDBC data source.
    val emp = spark.read.format("jdbc")
      .options(Map(
        "url" -> "jdbc:mysql://hadoop000:3306/test?user=root&password=123456",
        "dbtable" -> "emp",
        "driver" -> "com.mysql.jdbc.Driver"))
      .load()

    emp.show()

    spark.stop()
  }
}

Running it throws an error:

Exception in thread "main" java.lang.ClassNotFoundException: com.mysql.jdbc.Driver

The JDBC driver cannot be found, so the driver jar needs to be added to the project.

Alternatively, the dependency can be added directly in the pom file:

<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.27</version>
</dependency>

After adding the jar and running again, there is another error:

Exception in thread "main" java.sql.SQLException: null,  message from server: "Host '192.168.137.1' is not allowed to connect to this MySQL server"

The client host has no permission to access the database, so access needs to be granted on the MySQL side:

mysql> grant all privileges on test.* to root@'192.168.137.251' identified by '123456';
Query OK, 0 rows affected (0.00 sec)
mysql> flush privileges;
Query OK, 0 rows affected (0.00 sec)
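If the grant is in doubt, connectivity can be checked with plain JDBC before involving Spark at all. This is a minimal sketch (not from the original article); it assumes the same host, user, and password as above, and that mysql-connector-java is on the classpath:

```scala
import java.sql.DriverManager

// Sketch: verify MySQL connectivity with plain JDBC.
// Assumes mysql-connector-java is on the classpath and the server is reachable.
Class.forName("com.mysql.jdbc.Driver")
val conn = DriverManager.getConnection(
  "jdbc:mysql://hadoop000:3306/test", "root", "123456")
try {
  val rs = conn.createStatement().executeQuery("select count(*) from emp")
  if (rs.next()) println(s"emp rows: ${rs.getInt(1)}")
} finally {
  conn.close()  // always release the connection
}
```

If this fails with the same "Host … is not allowed to connect" message, the problem is the MySQL grant, not Spark.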

Run it again, and the read succeeds:

+-----+------+---------+----+----------+-------+------+------+
|empno| ename|      job| mgr|  hiredate|    sal|  comm|deptno|
+-----+------+---------+----+----------+-------+------+------+
| 7369| SMITH|    CLERK|7902|1980-12-17|  800.0|   0.0|    20|
| 7499| ALLEN| SALESMAN|7698| 1981-2-20| 1600.0| 300.0|    30|
| 7521|  WARD| SALESMAN|7698| 1981-2-22| 1250.0| 500.0|    30|
| 7566| JONES|  MANAGER|7839|  1981-4-2| 2975.0|   0.0|    20|
| 7654|MARTIN| SALESMAN|7698| 1981-9-28| 1250.0|1400.0|    30|
| 7698| BLAKE|  MANAGER|7839|  1981-5-1| 2850.0|   0.0|    30|
| 7782| CLARK|  MANAGER|7839|  1981-6-9| 2450.0|   0.0|    10|
| 7788| SCOTT|  ANALYST|7566| 1987-4-19| 3000.0|   0.0|    20|
| 7839|  KING|PRESIDENT|   0|1981-11-17| 5000.0|   0.0|    10|
| 7844|TURNER| SALESMAN|7698|  1981-9-8| 1500.0|   0.0|    30|
| 7876| ADAMS|    CLERK|7788| 1987-5-23| 1100.0|   0.0|    20|
| 7900| JAMES|    CLERK|7698| 1981-12-3|  950.0|   0.0|    30|
| 7902|  FORD|  ANALYST|7566| 1981-12-3| 3000.0|   0.0|    20|
| 7934|MILLER|    CLERK|7782| 1982-1-23| 1300.0|   0.0|    10|
| 8888|  HIVE|  PROGRAM|7839| 1988-1-23|10300.0|   0.0|  null|
+-----+------+---------+----+----------+-------+------+------+
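The same connection parameters recur in every example below, so one convenient pattern (a sketch, nothing Spark-specific) is to build the options Map once and reuse it with `spark.read.format("jdbc").options(jdbcOptions).load()`:

```scala
// Reusable JDBC connection options; the values mirror those used above.
val jdbcOptions = Map(
  "url"     -> "jdbc:mysql://hadoop000:3306/test?user=root&password=123456",
  "dbtable" -> "emp",
  "driver"  -> "com.mysql.jdbc.Driver"
)

// Credentials are embedded in the URL here, but they can equally be
// supplied as separate "user" and "password" entries in this Map.
```

To read a different table, only the `"dbtable"` entry needs to change.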

2. Testing with spark-shell

[hadoop@hadoop000 bin]$ ./spark-shell --master local[2] --jars ~/software/mysql-connector-java-5.1.27.jar 
scala> val empDF = spark.read.format("jdbc").options(Map("url"->"jdbc:mysql://hadoop000:3306/test?user=root&password=123456","dbtable"->"emp","driver"->"com.mysql.jdbc.Driver")).load()
empDF: org.apache.spark.sql.DataFrame = [empno: int, ename: string ... 6 more fields]

scala> empDF.show
+-----+------+---------+----+----------+-------+------+------+
|empno| ename|      job| mgr|  hiredate|    sal|  comm|deptno|
+-----+------+---------+----+----------+-------+------+------+
| 7369| SMITH|    CLERK|7902|1980-12-17|  800.0|   0.0|    20|
| 7499| ALLEN| SALESMAN|7698| 1981-2-20| 1600.0| 300.0|    30|
| 7521|  WARD| SALESMAN|7698| 1981-2-22| 1250.0| 500.0|    30|
| 7566| JONES|  MANAGER|7839|  1981-4-2| 2975.0|   0.0|    20|
| 7654|MARTIN| SALESMAN|7698| 1981-9-28| 1250.0|1400.0|    30|
| 7698| BLAKE|  MANAGER|7839|  1981-5-1| 2850.0|   0.0|    30|
| 7782| CLARK|  MANAGER|7839|  1981-6-9| 2450.0|   0.0|    10|
| 7788| SCOTT|  ANALYST|7566| 1987-4-19| 3000.0|   0.0|    20|
| 7839|  KING|PRESIDENT|   0|1981-11-17| 5000.0|   0.0|    10|
| 7844|TURNER| SALESMAN|7698|  1981-9-8| 1500.0|   0.0|    30|
| 7876| ADAMS|    CLERK|7788| 1987-5-23| 1100.0|   0.0|    20|
| 7900| JAMES|    CLERK|7698| 1981-12-3|  950.0|   0.0|    30|
| 7902|  FORD|  ANALYST|7566| 1981-12-3| 3000.0|   0.0|    20|
| 7934|MILLER|    CLERK|7782| 1982-1-23| 1300.0|   0.0|    10|
| 8888|  HIVE|  PROGRAM|7839| 1988-1-23|10300.0|   0.0|  null|
+-----+------+---------+----+----------+-------+------+------+
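The `dbtable` option can also take a parenthesized subquery instead of a table name, which pushes the filtering down to MySQL rather than pulling the whole table across. A sketch, assuming the same session and connection values as above:

```scala
// Sketch: push a filter down to MySQL by passing a subquery as "dbtable".
// Assumes the same SparkSession and connection values used above.
val salesDF = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://hadoop000:3306/test")
  .option("dbtable", "(select empno, ename, sal from emp where deptno = 30) t")
  .option("user", "root")
  .option("password", "123456")
  .option("driver", "com.mysql.jdbc.Driver")
  .load()

salesDF.show()  // only the deptno = 30 rows are transferred from MySQL
```

The subquery must be given an alias (the trailing `t`) so that MySQL accepts it as a derived table.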

3. Combining Spark SQL/Hive data with JDBC data
The data in Spark SQL is as follows:

scala> spark.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| default|     dept|      false|
| default|      emp|      false|
+--------+---------+-----------+

scala> spark.sql("select * from dept").show
+------+----------+--------+
|deptno|     dname|     loc|
+------+----------+--------+
|    10|ACCOUNTING|NEW YORK|
|    20|  RESEARCH|  DALLAS|
|    30|     SALES| CHICAGO|
|    40|OPREATIONS|  BOSTON|
+------+----------+--------+

Now join this with the emp table from MySQL (the MySQL emp table was already loaded into empDF above):

scala> val deptDF = spark.table("dept")
deptDF: org.apache.spark.sql.DataFrame = [deptno: int, dname: string ... 1 more field]

scala> deptDF.show
+------+----------+--------+
|deptno|     dname|     loc|
+------+----------+--------+
|    10|ACCOUNTING|NEW YORK|
|    20|  RESEARCH|  DALLAS|
|    30|     SALES| CHICAGO|
|    40|OPREATIONS|  BOSTON|
+------+----------+--------+

scala> empDF.join(deptDF, empDF.col("deptno")===deptDF.col("deptno")).show
+-----+------+---------+----+----------+------+------+------+------+----------+--------+
|empno| ename|      job| mgr|  hiredate|   sal|  comm|deptno|deptno|     dname|     loc|
+-----+------+---------+----+----------+------+------+------+------+----------+--------+
| 7369| SMITH|    CLERK|7902|1980-12-17| 800.0|   0.0|    20|    20|  RESEARCH|  DALLAS|
| 7499| ALLEN| SALESMAN|7698| 1981-2-20|1600.0| 300.0|    30|    30|     SALES| CHICAGO|
| 7521|  WARD| SALESMAN|7698| 1981-2-22|1250.0| 500.0|    30|    30|     SALES| CHICAGO|
| 7566| JONES|  MANAGER|7839|  1981-4-2|2975.0|   0.0|    20|    20|  RESEARCH|  DALLAS|
| 7654|MARTIN| SALESMAN|7698| 1981-9-28|1250.0|1400.0|    30|    30|     SALES| CHICAGO|
| 7698| BLAKE|  MANAGER|7839|  1981-5-1|2850.0|   0.0|    30|    30|     SALES| CHICAGO|
| 7782| CLARK|  MANAGER|7839|  1981-6-9|2450.0|   0.0|    10|    10|ACCOUNTING|NEW YORK|
| 7788| SCOTT|  ANALYST|7566| 1987-4-19|3000.0|   0.0|    20|    20|  RESEARCH|  DALLAS|
| 7839|  KING|PRESIDENT|   0|1981-11-17|5000.0|   0.0|    10|    10|ACCOUNTING|NEW YORK|
| 7844|TURNER| SALESMAN|7698|  1981-9-8|1500.0|   0.0|    30|    30|     SALES| CHICAGO|
| 7876| ADAMS|    CLERK|7788| 1987-5-23|1100.0|   0.0|    20|    20|  RESEARCH|  DALLAS|
| 7900| JAMES|    CLERK|7698| 1981-12-3| 950.0|   0.0|    30|    30|     SALES| CHICAGO|
| 7902|  FORD|  ANALYST|7566| 1981-12-3|3000.0|   0.0|    20|    20|  RESEARCH|  DALLAS|
| 7934|MILLER|    CLERK|7782| 1982-1-23|1300.0|   0.0|    10|    10|ACCOUNTING|NEW YORK|
+-----+------+---------+----+----------+------+------+------+------+----------+--------+
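The result above carries two deptno columns, one from each side. Joining on the column name instead keeps a single deptno column, and the combined data can then be aggregated. A sketch, assuming the same empDF and deptDF as above:

```scala
import org.apache.spark.sql.functions.avg

// Join on the column name so the result keeps a single deptno column,
// then aggregate: average salary per department name.
empDF.join(deptDF, Seq("deptno"))
  .groupBy("dname")
  .agg(avg("sal").as("avg_sal"))
  .show()
```

Note that, as with the join above, the HIVE row with a NULL deptno is dropped by the inner join.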

4. Writing from Spark SQL to MySQL

scala> val empDF = spark.table("emp");
empDF: org.apache.spark.sql.DataFrame = [empno: int, ename: string ... 6 more fields]

scala> empDF.write.format("jdbc").option("url", "jdbc:mysql://hadoop000:3306").option("dbtable", "test.emp_sparksql").option("user", "root").option("password", "123456").option("driver", "com.mysql.jdbc.Driver").save()

Check MySQL:

mysql> show tables;
+----------------+
| Tables_in_test |
+----------------+
| dept           |
| emp            |
| emp_sparksql   |
| user           |
+----------------+
4 rows in set (0.00 sec)

Writing a second time raises an error: the table already exists.

scala> empDF.write.format("jdbc").option("url", "jdbc:mysql://hadoop000:3306").option("dbtable", "test.emp_sparksql").option("user", "root").option("password", "123456").option("driver", "com.mysql.jdbc.Driver").save()
org.apache.spark.sql.AnalysisException: Table or view 'test.emp_sparksql' already exists. SaveMode: ErrorIfExists.;

A mode parameter needs to be added:

scala> empDF.write.mode("overwrite").format("jdbc").option("url", "jdbc:mysql://hadoop000:3306").option("dbtable", "test.emp_sparksql").option("user", "root").option("password", "123456").option("driver", "com.mysql.jdbc.Driver").save()

There is also mode("append"), which can be used when needed.
After writing this way, there is one difference:

mysql> desc emp_sparksql;
+----------+---------+------+-----+---------+-------+
| Field    | Type    | Null | Key | Default | Extra |
+----------+---------+------+-----+---------+-------+
| empno    | int(11) | YES  |     | NULL    |       |
| ename    | text    | YES  |     | NULL    |       |
| job      | text    | YES  |     | NULL    |       |
| mgr      | int(11) | YES  |     | NULL    |       |
| hiredate | text    | YES  |     | NULL    |       |
| salary   | double  | YES  |     | NULL    |       |
| comm     | double  | YES  |     | NULL    |       |
| deptno   | int(11) | YES  |     | NULL    |       |
+----------+---------+------+-----+---------+-------+
8 rows in set (0.00 sec)

mysql> desc emp;
+----------+--------------+------+-----+---------+-------+
| Field    | Type         | Null | Key | Default | Extra |
+----------+--------------+------+-----+---------+-------+
| empno    | int(11)      | YES  |     | NULL    |       |
| ename    | varchar(100) | YES  |     | NULL    |       |
| job      | varchar(100) | YES  |     | NULL    |       |
| mgr      | int(11)      | YES  |     | NULL    |       |
| hiredate | varchar(100) | YES  |     | NULL    |       |
| sal      | double       | YES  |     | NULL    |       |
| comm     | double       | YES  |     | NULL    |       |
| deptno   | int(11)      | YES  |     | NULL    |       |
+----------+--------------+------+-----+---------+-------+
8 rows in set (0.00 sec)

The data types have changed: the varchar columns were written as text. A createTableColumnTypes option can be added to specify the type of each column:

scala> empDF.write.mode("overwrite").format("jdbc").option("url", "jdbc:mysql://hadoop000:3306").option("dbtable", "test.emp1_sparksql").option("user", "root").option("password", "123456").option("driver", "com.mysql.jdbc.Driver").option("createTableColumnTypes", "ename varchar(100),job varchar(100), hiredate varchar(100)").save()

mysql> desc emp1_sparksql;
+----------+--------------+------+-----+---------+-------+
| Field    | Type         | Null | Key | Default | Extra |
+----------+--------------+------+-----+---------+-------+
| empno    | int(11)      | YES  |     | NULL    |       |
| ename    | varchar(100) | YES  |     | NULL    |       |
| job      | varchar(100) | YES  |     | NULL    |       |
| mgr      | int(11)      | YES  |     | NULL    |       |
| hiredate | varchar(100) | YES  |     | NULL    |       |
| salary   | double       | YES  |     | NULL    |       |
| comm     | double       | YES  |     | NULL    |       |
| deptno   | int(11)      | YES  |     | NULL    |       |
+----------+--------------+------+-----+---------+-------+
8 rows in set (0.00 sec)
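Related to this: by default, mode("overwrite") drops and recreates the target table, which is why the column types revert to Spark's defaults. If the table should keep its existing schema, the JDBC writer's `truncate` option empties it with TRUNCATE TABLE instead of dropping it. A sketch, assuming the same connection values as above:

```scala
// Sketch: overwrite by truncating rather than drop-and-recreate,
// so the existing MySQL column types are preserved.
empDF.write.mode("overwrite").format("jdbc")
  .option("url", "jdbc:mysql://hadoop000:3306")
  .option("dbtable", "test.emp1_sparksql")
  .option("user", "root")
  .option("password", "123456")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("truncate", "true")  // TRUNCATE TABLE instead of DROP TABLE
  .save()
```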

Appendix: the source of the table function:

/**
 * Returns the specified table/view as a `DataFrame`.
 *
 * @param tableName is either a qualified or unqualified name that designates a table or view.
 *                  If a database is specified, it identifies the table/view from the database.
 *                  Otherwise, it first attempts to find a temporary view with the given name
 *                  and then match the table/view from the current database.
 *                  Note that, the global temporary view database is also valid here.
 * @since 2.0.0
 */
def table(tableName: String): DataFrame = {
  table(sessionState.sqlParser.parseTableIdentifier(tableName))
}

5. Reading JDBC data with plain SQL

[hadoop@hadoop000 bin]$ ./spark-sql --master local[2] --driver-class-path ~/software/mysql-connector-java-5.1.27.jar
spark-sql> CREATE TEMPORARY VIEW emp_mysql USING org.apache.spark.sql.jdbc OPTIONS (url "jdbc:mysql://hadoop000:3306/", dbtable "test.emp", user 'root', password '123456');
Time taken: 0.517 seconds
18/09/26 02:53:17 INFO thriftserver.SparkSQLCLIDriver: Time taken: 0.517 seconds
spark-sql> show tables;
default dept    false
default emp     false
        emp_mysql       true
Time taken: 0.142 seconds, Fetched 3 row(s)
spark-sql> select * from emp_mysql;
7369    SMITH   CLERK   7902    1980-12-17      800.0   0.0     20
7499    ALLEN   SALESMAN        7698    1981-2-20       1600.0  300.0   30
7521    WARD    SALESMAN        7698    1981-2-22       1250.0  500.0   30
7566    JONES   MANAGER 7839    1981-4-2        2975.0  0.0     20
7654    MARTIN  SALESMAN        7698    1981-9-28       1250.0  1400.0  30
7698    BLAKE   MANAGER 7839    1981-5-1        2850.0  0.0     30
7782    CLARK   MANAGER 7839    1981-6-9        2450.0  0.0     10
7788    SCOTT   ANALYST 7566    1987-4-19       3000.0  0.0     20
7839    KING    PRESIDENT       0       1981-11-17      5000.0  0.0     10
7844    TURNER  SALESMAN        7698    1981-9-8        1500.0  0.0     30
7876    ADAMS   CLERK   7788    1987-5-23       1100.0  0.0     20
7900    JAMES   CLERK   7698    1981-12-3       950.0   0.0     30
7902    FORD    ANALYST 7566    1981-12-3       3000.0  0.0     20
7934    MILLER  CLERK   7782    1982-1-23       1300.0  0.0     10
8888    HIVE    PROGRAM 7839    1988-1-23       10300.0 0.0     NULL
Time taken: 2.844 seconds, Fetched 15 row(s)