To reuse resources, this case uses the same data as the previous article. The technique and ideas behind this hands-on case are fairly common in real-world development work. Taking advertising as the example, the requirement is:
Requirement: compute the click-count trend of each advertisement over the last hour.
1. Consuming the topic data from Kafka
[root@cdh101 kafka]# bin/kafka-console-consumer.sh --bootstrap-server cdh101:9092,cdh102:9092,cdh103:9092 --topic luchangyin --from-beginning
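If the topic is still empty, you can feed it some mock records first. Below is a minimal Scala producer sketch, not from the original article: the object name and the random value ranges are invented for illustration, and it assumes the five-field record format used later (timestamp, region, city, user id, ad id).

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import java.util.Properties
import scala.util.Random

// Hypothetical mock-data producer for the luchangyin topic
object MockAdClickProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "cdh101:9092,cdh102:9092,cdh103:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    val regions = Seq(("华东", "上海"), ("华北", "北京"))
    while (true) {
      val (region, city) = regions(Random.nextInt(regions.size))
      // Record format: timestamp,region,city,userId,adId
      val value = s"${System.currentTimeMillis()},$region,$city,${100 + Random.nextInt(10)},${1 + Random.nextInt(5)}"
      producer.send(new ProducerRecord[String, String]("luchangyin", value))
      Thread.sleep(200)
    }
  }
}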
2. Code implementation
2.1 Consuming the source data from Kafka:
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import java.text.SimpleDateFormat
import java.util.Date
/**
 * Desc: Requirement: compute each ad's click-count trend over the last hour,
 *       updated every 6s (per-minute click counts for each ad within the last hour)
 * - Batch (collection) interval: 3s
 * - Last hour: the window length is 1 hour
 * - Updated every 6s: the window's slide interval
 * - Per-minute click count: ((advId, hhmm), 1)
 */
object RealTime_App02 {
  def main(args: Array[String]): Unit = {
    // Create the Spark configuration object
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HighKafka")
    // Create the StreamingContext with a 3s batch interval
    val ssc = new StreamingContext(conf, Seconds(3))
    // Declare the Kafka parameters
    val brokers = "cdh101:9092,cdh102:9092,cdh103:9092"
    val topic = "luchangyin"
    val group = "cloudera_mirrormaker"
    val deserialization = "org.apache.kafka.common.serialization.StringDeserializer"
    val autooffsetreset = "latest"
    val kafkaParams = Map(
      ConsumerConfig.GROUP_ID_CONFIG -> group,
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> autooffsetreset,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> deserialization,
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> deserialization
    )
    // Set the checkpoint directory (required for windowed/stateful operations)
    ssc.checkpoint("D:\\MySoftware\\StudySoftware\\MyIdea\\luchangyin2021\\MyFirstBigScreen\\TestFSLJavaDemon\\src\\main\\ck2")
    // Create the direct Kafka DStream
    val kafkaDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set(topic), kafkaParams)
    )
    // Take the value from each Kafka key-value record, e.g. 1616683286749,华东,上海,102,1
    val dataDS = kafkaDS.map(_.value())
    dataDS.print()

    // 2.2 Define the window size and slide interval, then transform and aggregate (added below)

    ssc.start()
    ssc.awaitTermination()
  }
}
Running result: each batch prints the raw records consumed from the topic, e.g. 1616683286749,华东,上海,102,1.
2.2 Transforming and aggregating the data:
// 2.2 Define the window size and slide interval, then transform and aggregate
// Demo-sized window: 6s length, 3s slide (the stated requirement would be Seconds(3600), Seconds(6))
val windowDS: DStream[String] = dataDS.window(Seconds(6), Seconds(3))
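// Note: both window arguments must be integer multiples of the batch
// interval (3s here); otherwise Spark Streaming fails at startup.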
// Transform each record into (advId_time, 1)
val mapDS: DStream[(String, Int)] = windowDS.map {
  line => {
    val fields: Array[String] = line.split(",")
    val timeStmp: Long = fields(0).toLong
    val day: Date = new Date(timeStmp)
    // Format the timestamp; "mm:ss" makes changes visible within the short demo
    // window (the 1-hour requirement would use "HH:mm" for per-minute buckets)
    val sdf = new SimpleDateFormat("mm:ss")
    val time: String = sdf.format(day)
    (fields(4) + "_" + time, 1)
  }
}
// Aggregate the counts per key
val resDS: DStream[(String, Int)] = mapDS.reduceByKey(_ + _)
resDS.print() // e.g. (5_44:05,10)
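To match the stated requirement exactly (last hour, updated every 6s, per-minute buckets), here is a minimal sketch along the same lines, reusing dataDS from step 2.1; the 3600s/6s values come from the requirement itself, while the final sorting step is an optional extra so each ad's trend prints in time order.

// Sketch only, assuming the same record format as above
val hourDS: DStream[(String, Int)] = dataDS
  .window(Seconds(3600), Seconds(6)) // window length: 1 hour, slide: 6s
  .map { line =>
    val fields = line.split(",")
    val sdf = new SimpleDateFormat("HH:mm") // per-minute bucket
    (fields(4) + "_" + sdf.format(new Date(fields(0).toLong)), 1)
  }
  .reduceByKey(_ + _)
// Optional: sort within each batch so the trend prints chronologically
hourDS.transform(rdd => rdd.sortByKey()).print()

Since the checkpoint directory is already set, reduceByKeyAndWindow(_ + _, _ - _, Seconds(3600), Seconds(6)) could compute the same counts incrementally, subtracting the records that slide out of the window instead of re-reducing the whole hour on every slide.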
The final running result is what resDS.print() shows above, e.g. (5_44:05,10). And with that, this case is done. It's really quite simple, isn't it? Alright, off you go, young man...