一个Spark DataFrame操作的例子

1 定义数据库连接

Class.forName("com.mysql.jdbc.Driver").newInstance()
val DB_URL_R = "jdbc:mysql://10.1.11.18/medical_waste?useSSL=false&characterEncoding=utf8"
val PROPERTIES = new java.util.Properties()
PROPERTIES.setProperty("user", "********")
PROPERTIES.setProperty("password", "********")

2 读取两个表

val rfidCardDF = spark.read.jdbc(DB_URL_R, "t_rfid_card", Array("card_type = 22"), PROPERTIES).select("card_id","card_label").cache()
val medicalWasteDF = spark.read.jdbc(DB_URL_R, "t_medical_waste", Array("YEAR(rec_ts) = 2017"), PROPERTIES).select("team_id","mw_weight").cache()

3 连接

val df2 = medicalWasteDF.join(rfidCardDF,medicalWasteDF("team_id") equalTo(rfidCardDF("card_id"))).drop("card_id").cache()

使用join,默认是left out join。条件判断是相等。然后删除掉一个重复的列card_id。

4 统计

df2.groupBy("card_label").agg(sum("mw_weight")).orderBy(col("sum(mw_weight)").desc).show()​

group by操作,生成一个新的数据集,增加了一列sum操作,生成一个默认列名sum(mw_weight)的列,然后倒序排个序。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容