PySpark 读写 MySQL

SparkSQL 有着强大的与其他存储介质交互的功能,其中就包括MySQL,这里简单介绍一下我在工作中用到的使用 PySpark 读写 MySQL 的使用。

写出到 MySQL

df_weekmile.write.mode("append") \
.format("jdbc") \
.option("url","jdbc:mysql://ip地址:端口号/库名称") \
.option("dbtable","表名称") \
.option("user","用户名") \
.option("password","密码") \
.save()

df_wekmile 是之前生成的 spark 的 DataFrame,不多说了。mode 可以根据自己的需要设置 append 或者 overwrite。这里要注意 DataFrame 的字段数与字段类型,要与 mysql 中的对应,否则无法写入。

读取 MySQL 数据

df_total_trip = spark.read.format("jdbc") \
.option("url","jdbc:mysql://ip地址:端口号/库名称") \
.option("dbtable","表名称") \
.option("user","用户名") \
.option("password","密码") \
.load()

df_total_trip.show()

可以看到,和写入其实非常类似,只不过换了几个方法。 将之前的 write 方法换成了 read 方法,最后的 save 换成了 load。 读取后直接就生成了 Spark 的 DataFrame。

但是上述这种把整个 MySQL 的表都读进来了,有时候我们可能只需要几个字段,或者是需要符合特定条件的数据,这种读取方式也是支持的。

按条件读取
dbtable = "(select * from mysql_table where mileage_tag = 4000) tmp"

    df_total_trip = spark.read.format("jdbc") \
    .option("url","jdbc:mysql://ip地址:端口号/库名称") \
    .option("dbtable",dbtable) \
    .option("user","用户名") \
    .option("password","密码") \
    .load()

    df_total_trip.show()

其实按条件读取,就是将自己的逻辑提前写好,然后写成一张临时表的格式,作为 dbtable 参数的值。而且这个临时表是必须的,不能直接写如: select ...... 这样的语句,必须在外面套上一层。

在工作中,我的数据源部分来自 MySQL 表,部分来自 hive 表,但是对于 SparkSQL 来说,处理这样的多个数据源的情况还是 so easy。

也许有更高效的读取 MySQL 的方法,如果发现,会更新到这里,如果有大神愿意指点一二,那真是感激不尽。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。