Selecting Only the Columns You Need in Structured Streaming

There are several ways to select columns from a Structured Streaming DataFrame:

  1. df.select("columnNameStr") — select columns by name, e.g.
val dayDevice = beginTimeDevice.groupBy($"subsId",$"eventBeginDay",$"serviceType")
      .agg("duration" -> "sum").withColumnRenamed("sum(duration)", "durationForDay")
      .select("subsId", "durationForDay")
  2. df.select(df("columnNameStr")) — select via the DataFrame's own column reference, e.g.
val hourDevice = beginTimeDevice.groupBy($"subsId",$"eventBeginHour",$"serviceType")
      .agg("duration" -> "sum").withColumnRenamed("sum(duration)", "durationForHour")
    val subsHourDevice = hourDevice.select(hourDevice("subsId"), hourDevice("durationForHour"))

    val queryHour = subsHourDevice.writeStream
      .outputMode("update")
      .option("truncate", "false")
      .format("console")
      .start()
  3. df.drop("columnNameStr") — instead of selecting the columns you need, drop the ones you don't
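The article gives no code for the drop approach, so here is a minimal batch sketch (the session setup and sample rows are illustrative, not from the article; the column names mirror the aggregated schema of the first example). Dropping the grouping columns leaves the same projection as select("subsId", "durationForDay"):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("drop-example")
  .master("local[1]")
  .getOrCreate()
import spark.implicits._

// Hypothetical rows mirroring the schema after groupBy/agg in the article.
val dayDevice = Seq(
  ("s001", "2018-06-19", "NB-IoT", 120L),
  ("s002", "2018-06-19", "4G", 300L)
).toDF("subsId", "eventBeginDay", "serviceType", "durationForDay")

// drop() removes the named columns and silently ignores names that do
// not exist, so it is safe even if the schema changes slightly.
val projected = dayDevice.drop("eventBeginDay", "serviceType")
projected.show()
```

The result contains only subsId and durationForDay, equivalent to the select form in the first example.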

A complete example:

package com.spark.sunny.structuredstreaming

import com.spark.sunny.util.UdfUtil
import org.apache.spark.sql.{Encoders, SparkSession}
import org.apache.spark.sql.functions._

case class CdrDto(eventSrcId : String, city : String, billingCycleId : String, subsId : String, custId : String,
                  acctId : String, billingNumber : String, usage : String, duration : String, eventBeginTime : String,
                  cellId : String, roamArea : String, pdpIndex : String, serviceType : String, imsi : String, imei : String
                 )
/**
  * <Description> <br>
  *
  * @author Sunny<br>
  * @taskId: <br>
  * @version 1.0<br>
  * @createDate 2018/06/19 19:45 <br>
  * @see com.whalecloud.iot.cmp.streaming.receiver <br>
  */
object CdrReceiver {
  def main(args: Array[String]): Unit = {
//    val jdbcHostname = "10.45.82.76"
//    val jdbcPort = 3306
//    val jdbcDatabase ="cmpcc"
//    val jdbcUrl = s"jdbc:mysql://${jdbcHostname}:${jdbcPort}/${jdbcDatabase}"
//    val jdbcUsername = "dcv"
//    val jdbcPassword = "DCVsmart$123"
//    val driverClass = "com.mysql.jdbc.Driver"
//    import java.util.Properties
//    val connectionProperties = new Properties()
//
//    connectionProperties.put("user", s"${jdbcUsername}")
//    connectionProperties.put("password", s"${jdbcPassword}")
//    connectionProperties.setProperty("Driver", driverClass)

    val spark = SparkSession
        .builder()
        .appName("cmp-streaming")
        .master("local")
        .getOrCreate()

//    val iotSubs = spark.read.jdbc(jdbcUrl, "iot_subs", connectionProperties)
//    iotSubs.show()

    import spark.implicits._

    val schema = Encoders.product[CdrDto].schema
    val lines =  spark
      .readStream
      .format("json")
      .schema(schema)
      .load("C:\\Users\\yaj\\Desktop\\dashboard\\test")

    val beginTimeCdr = lines
      .withColumn("eventBeginTime", UdfUtil.fmtTimestampUdf($"eventBeginTime", lit("yyyy-MM-dd HH:mm:ss")))
      .withColumn("eventBeginHour", substring($"eventBeginTime", 0, 13))
      .withColumn("eventBeginDay", substring($"eventBeginTime", 0, 10))

    val hourCdr = beginTimeCdr.groupBy($"subsId",window($"eventBeginHour", "1 hour", "1 hour"),$"serviceType")
      .agg("duration" -> "sum").withColumnRenamed("sum(duration)", "durationForHour")

    val queryHour = hourCdr.writeStream
      .outputMode("update")
      .option("truncate", "false")
      .format("console")
      .start()

    val dayCdr = beginTimeCdr.groupBy($"subsId",window($"eventBeginDay", "1 day", "1 day"),$"serviceType")
      .agg("duration" -> "sum").withColumnRenamed("sum(duration)", "durationForDay")

    val queryDay = dayCdr.writeStream
      .outputMode("update")
      .option("truncate", "false")
      .format("console")
      .start()

    // awaitTermination() on queryHour alone would block forever, so the
    // call for queryDay would never be reached; wait on the manager instead.
    spark.streams.awaitAnyTermination()

  }
}
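Besides the string-name form used above, select also accepts Column expressions built with the $ interpolator (available via spark.implicits._) or org.apache.spark.sql.functions.col. A minimal batch sketch (the session setup and sample data are illustrative, not from the article) showing that all three forms produce the same projection:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder()
  .appName("select-forms")
  .master("local[1]")
  .getOrCreate()
import spark.implicits._

// Hypothetical rows with the column names used in the article's hourly example.
val df = Seq(("s001", 120L), ("s002", 300L)).toDF("subsId", "durationForHour")

// Three equivalent projections:
val byName = df.select("subsId", "durationForHour")   // string names
val byDollar = df.select($"subsId", $"durationForHour") // $ interpolator
val byCol = df.select(col("subsId"), col("durationForHour")) // functions.col
```

The Column-expression forms are needed once you want to transform while selecting (e.g. select($"durationForHour" / 60)), which the plain string form cannot express.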
