从Structured Streaming的DataFrame中选取列,有以下几种方式:
-
df.select("columnNameStr")
, e.g.
val dayDevice = beginTimeDevice.groupBy($"subsId",$"eventBeginDay",$"serviceType")
.agg("duration" -> "sum").withColumnRenamed("sum(duration)", "durationForDay")
.select("subsId", "durationForDay")
-
df.select(df("columnNameStr"))
, e.g.
val hourDevice = beginTimeDevice.groupBy($"subsId",$"eventBeginHour",$"serviceType")
.agg("duration" -> "sum").withColumnRenamed("sum(duration)", "durationForHour")
val subsHourDevice = hourDevice.select(hourDevice("subsId"), hourDevice("durationForHour"))
val queryHour = subsHourDevice.writeStream
.outputMode("update")
.option("truncate", "false")
.format("console")
.start()
-
df.drop("columnNameStr")
,通过drop删除不需要的列
完整的示例如下:
package com.spark.sunny.structuredstreaming
import com.spark.sunny.util.UdfUtil
import org.apache.spark.sql.{Encoders, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Column
/**
 * One call-detail record (CDR) as ingested from the JSON stream.
 *
 * All fields are kept as raw strings because the source is schemaless JSON;
 * parsing/normalization (e.g. of eventBeginTime) happens downstream.
 * `final` because case classes should not be extended by other case classes.
 *
 * @param eventSrcId     id of the system that produced the event
 * @param duration       session duration; summed downstream (Spark casts to numeric)
 * @param eventBeginTime raw event start timestamp, reformatted to "yyyy-MM-dd HH:mm:ss" downstream
 */
final case class CdrDto(eventSrcId : String, city : String, billingCycleId : String, subsId : String, custId : String,
                        acctId : String, billingNumber : String, usage : String, duration : String, eventBeginTime : String,
                        cellId : String, roamArea : String, pdpIndex : String, serviceType : String, imsi : String, imei : String
                       )
/**
* <Description> <br>
*
* @author Sunny<br>
* @taskId: <br>
* @version 1.0<br>
* @createDate 2018/06/19 19:45 <br>
* @see com.whalecloud.iot.cmp.streaming.receiver <br>
*/
object CdrReceiver {

  /** Default input directory for the JSON stream; override by passing a path as args(0). */
  private val DefaultInputPath = "C:\\Users\\yaj\\Desktop\\dashboard\\test"

  /**
   * Entry point: reads CDR JSON files as a stream and emits per-subscriber
   * hourly and daily duration aggregates to the console.
   *
   * @param args optional; args(0) overrides the input directory (backward compatible:
   *             with no args the original hard-coded path is used)
   */
  def main(args: Array[String]): Unit = {
    // NOTE(review): a commented-out JDBC prototype previously lived here and
    // embedded a plaintext database password. Removed — credentials must come
    // from configuration/secret storage, never from source control.
    val spark = SparkSession
      .builder()
      .appName("cmp-streaming")
      .master("local")
      .getOrCreate()

    import spark.implicits._

    val inputPath = args.headOption.getOrElse(DefaultInputPath)

    // Derive the stream schema from the case class rather than declaring it by hand.
    val schema = Encoders.product[CdrDto].schema
    val lines = spark
      .readStream
      .format("json")
      .schema(schema)
      .load(inputPath)

    // Normalize the timestamp to "yyyy-MM-dd HH:mm:ss", then carve out the
    // hour ("yyyy-MM-dd HH") and day ("yyyy-MM-dd") prefixes for windowing.
    // Spark's substring is 1-based; the original used pos 0, which Spark
    // happens to treat like 1, but 1 is the correct idiom.
    val beginTimeCdr = lines
      .withColumn("eventBeginTime", UdfUtil.fmtTimestampUdf($"eventBeginTime", lit("yyyy-MM-dd HH:mm:ss")))
      .withColumn("eventBeginHour", substring($"eventBeginTime", 1, 13))
      .withColumn("eventBeginDay", substring($"eventBeginTime", 1, 10))

    // Hourly duration per subscriber/service, tumbling 1-hour windows.
    val hourCdr = beginTimeCdr
      .groupBy($"subsId", window($"eventBeginHour", "1 hour", "1 hour"), $"serviceType")
      .agg("duration" -> "sum").withColumnRenamed("sum(duration)", "durationForHour")
    hourCdr.writeStream
      .outputMode("update")
      .option("truncate", "false")
      .format("console")
      .start()

    // Daily duration per subscriber/service, tumbling 1-day windows.
    val dayCdr = beginTimeCdr
      .groupBy($"subsId", window($"eventBeginDay", "1 day", "1 day"), $"serviceType")
      .agg("duration" -> "sum").withColumnRenamed("sum(duration)", "durationForDay")
    dayCdr.writeStream
      .outputMode("update")
      .option("truncate", "false")
      .format("console")
      .start()

    // BUG FIX: the original called queryHour.awaitTermination() followed by
    // queryDay.awaitTermination(); the second call was unreachable until the
    // hour query terminated, so a failure in the day query went unnoticed.
    // awaitAnyTermination() blocks on all active queries and surfaces the
    // first termination (or failure) of either.
    spark.streams.awaitAnyTermination()
  }
}