Sample data (partial):
A 202.106.196.115 手机 iPhone8 8000
B 202.106.0.20 服装 布莱奥尼西服 199
C 202.102.152.3 家具 婴儿床 2000
D 202.96.96.68 家电 电饭锅 1000
F 202.98.0.68 化妆品 迪奥香水 200
H 202.96.75.68 食品 奶粉 600
For data of this type, first convert the IP address to its region, then compute the total transaction amount, the per-category amount, and the per-region transaction amount, and write the results to Redis.
The code is as follows:
package day10
import day09.IpUtils
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Duration, StreamingContext}
object OrderCount {
def main(args: Array[String]): Unit = {
//specify the consumer group name
val group="g001"
//create the SparkConf
val conf=new SparkConf().setMaster("local[4]").setAppName("OrderCount")
//create the StreamingContext and set the batch interval
val ssc=new StreamingContext(conf,Duration(5000))
val broadcastRef=IpUtils.broadcastIpRules(ssc,"D:\\data\\ip\\ip.txt")
//specify the topic to consume
val topic="orders"
//(the Spark Streaming tasks connect directly to the Kafka partitions, using a lower-level API that is more efficient)
val brokerList="pro01:9092,pro02:9092,pro03:9092,pro04:9092"
//specify the zookeeper address, used later to update the consumed offsets (Redis or MySQL could also be used to record offsets)
val zkQuorum="pro01:2181,pro02:2181,pro03:2181,pro04:2181"
//the set of topic names used when creating the stream; Spark Streaming can consume multiple topics at once
val topics=Set(topic)
//create a ZKGroupTopicDirs object, which specifies the directory in zookeeper where the offsets are saved
val topicDirs=new ZKGroupTopicDirs(group,topic)
//get the zookeeper path, e.g. "/g001/offsets/orders"
val zkTopicPath=s"${topicDirs.consumerOffsetDir}"
//prepare the kafka parameters
val kafkaParams=Map(
// "key.deserializer" -> classOf[StringDeserializer],
// "value.deserializer"->classOf[StringDeserializer],
// "deserializer.encoding"->"GB2312",//configure the encoding used when reading data from kafka
"metadata.broker.list"->brokerList,
"group.id"->group,
//read data from the beginning
"auto.offset.reset"->kafka.api.OffsetRequest.SmallestTimeString
)
//the zookeeper hosts and ports; create a client used to update the offsets
//this zookeeper client can read offset data from zookeeper and update it
val zkClient=new ZkClient(zkQuorum)
//check whether the path has child nodes (the child nodes are created when we save the offsets of the different partitions), e.g.
// /g001/offsets/orders/0/10001
// /g001/offsets/orders/1/30001
// /g001/offsets/orders/2/10001
val children=zkClient.countChildren(zkTopicPath)
var kafkaStream: InputDStream[(String,String)] =null
//if offsets were saved in zookeeper, we use them as the starting positions of the kafkaStream
var fromOffsets:Map[TopicAndPartition,Long]=Map()
//if offsets have been saved before
if(children>0){
for(i<-0 until children){
// e.g. /g001/offsets/orders/0/10001
val partitionOffset:String =zkClient.readData[String](s"$zkTopicPath/${i}")
// orders/0
val tp =TopicAndPartition(topic,i)
//add the offset of each partition to fromOffsets
// e.g. orders/0 -> 10001
fromOffsets+=(tp -> partitionOffset.toLong)
}
//this transforms each kafka message so that the data ends up as a (topic, message) tuple
val messageHandler=(mmd:MessageAndMetadata[String,String])=>(mmd.topic,mmd.message())
//create a direct DStream through KafkaUtils (the fromOffsets parameter makes consumption continue from the offsets computed above)
//[String,String,StringDecoder,StringDecoder,(String,String)]
// key value key decoder value decoder result type of messageHandler
kafkaStream=KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder,(String,String)](ssc,kafkaParams,fromOffsets,messageHandler)
}else{
//if no offsets were saved, use the latest (largest) or earliest (smallest) offset according to the kafkaParams configuration
kafkaStream=KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)
}
//the offset ranges
var offsetRanges=Array[OffsetRange]()
//with the direct approach the offsets can only be obtained from the RDDs of the Kafka DStream, so we cannot call DStream transformations first
//instead we call foreachRDD on kafkaStream, read the offsets of each RDD, and then operate on the RDD itself
//iterate over the KafkaRDDs in the Kafka DStream one by one
kafkaStream.foreachRDD{ kafkaRDD=>
//check whether the RDD of the current batch contains any data
if(!kafkaRDD.isEmpty()){
//only a KafkaRDD can be cast to HasOffsetRanges to obtain the offsets
offsetRanges= kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges
val lines: RDD[String] = kafkaRDD.map(_._2)
//split each record into fields
val fields = lines.map(_.split(" "))
//compute the total transaction amount
Cacal.calculateIncome(fields)
//compute the per-category amount
Cacal.calculateItem(fields)
//compute the per-region transaction amount
Cacal.calculateZone(fields,broadcastRef)
lines.foreachPartition(partition=>
partition.foreach(x=>{
println(x)
})
)
for(o<-offsetRanges){
// e.g. /g001/offsets/orders/2
val zkPath=s"${topicDirs.consumerOffsetDir}/${o.partition}"
//save the partition's offset to zookeeper
// e.g. /g001/offsets/orders/2/10001
ZkUtils.updatePersistentPath(zkClient,zkPath,o.untilOffset.toString)
}
}
}
//start the Spark Streaming application
ssc.start()
//wait for a graceful shutdown (exit only after the current batch has finished)
ssc.awaitTermination()
}
}
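The IpUtils.broadcastIpRules helper imported from day09 is not shown in the original. A minimal sketch, assuming it simply reads the "|"-delimited IP rule file on the driver (the same format as TestIp.readRules below) and broadcasts the rules to the executors:
package day09

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.StreamingContext
import scala.io.Source

object IpUtils {
  //hypothetical sketch: load the ip rules on the driver and broadcast them
  def broadcastIpRules(ssc: StreamingContext, path: String): Broadcast[Array[(Long, Long, String)]] = {
    val source = Source.fromFile(path)
    val rules = source.getLines().map(line => {
      val fields = line.split("[|]")
      //(start ip as Long, end ip as Long, province)
      (fields(2).toLong, fields(3).toLong, fields(6))
    }).toArray
    source.close()
    //broadcast the rules so every Executor gets a read-only copy
    ssc.sparkContext.broadcast(rules)
  }
}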
The Conset class:
package day10
object Conset {
val TOTAL_INCOME="TOTAL_INCOME"
}
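The JedisDemo helper that Cacal uses to obtain Redis connections is also not shown in the original. A minimal sketch, assuming the Jedis client library and a Redis instance at localhost:6379 (host and port are placeholders, adjust them to your environment):
package day10

import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

object JedisDemo {
  //hypothetical sketch: a shared connection pool; host and port are assumptions
  private val pool = new JedisPool(new JedisPoolConfig, "localhost", 6379)

  //borrow a connection from the pool; the caller is responsible for closing it
  def getConnection(): Jedis = pool.getResource
}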
The Cacal class:
package day10
import day01.ip.TestIp
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
object Cacal {
def calculateIncome(fields:RDD[Array[String]]) = {
//compute the amounts and write the result to Redis
val priceRDD = fields.map(arr => {
val price = arr(4).toDouble
price
})
//reduce is an Action and returns the result to the Driver
//it returns the total amount of the current batch
val sum = priceRDD.reduce(_+_)
//get a jedis connection
val conn = JedisDemo.getConnection()
//accumulate the current value onto the historical value
// conn.set(Conset.TOTAL_INCOME,sum.toString)
conn.incrByFloat(Conset.TOTAL_INCOME,sum)
//release the connection
conn.close()
}
//compute the transaction amount per product category
def calculateItem(fields:RDD[Array[String]]) = {
//the map call on fields is invoked on the driver side
val itemser: RDD[(String, Double)] = fields.map(arr => {
//product category
val item = arr(2)
//price
val price = arr(4).toDouble
(item, price)
})
//aggregate by product category
val reduced = itemser.reduceByKey(_+_)
//getting a Jedis connection on the driver side is not a good idea
// val conn=JedisDemo.getConnection()
//write the data accumulated in the current batch into Redis
//foreachPartition is an Action
reduced.foreachPartition(part=>{
//get a Jedis connection
val conn=JedisDemo.getConnection()
part.foreach(t=>{
conn.incrByFloat(t._1,t._2)
})
//close the connection once the data of the current partition has been updated
conn.close()
})
}
def calculateZone(fields:RDD[Array[String]],broadcastRef:Broadcast[Array[(Long,Long,String)]])={
val provinceAndPrice = fields.map(arr => {
val ip = arr(1)
val price = arr(4).toDouble
val ipNum = TestIp.ip2Long(ip)
//fetch all the broadcast rules inside the Executor
val allRules: Array[(Long, Long,String)] = broadcastRef.value
//binary search
val index: Int = TestIp.binarySearch(allRules, ipNum)
var province ="未知"
if (index != -1) {
province = allRules(index)._3
}
//(province, order amount)
(province,price)
})
//aggregate by province
val reduced = provinceAndPrice.reduceByKey(_+_)
//update the data in Redis
reduced.foreachPartition(part=>{
val conn=JedisDemo.getConnection()
part.foreach(t=>{
conn.incrByFloat(t._1,t._2)
})
conn.close()
})
}
}
The TestIp class:
package day01.ip
import scala.io.Source
class TestIp {
}
object TestIp{
def ip2Long(ip:String):Long={
val fragments=ip.split("[.]")
var ipNum =0L
for (i<-0 until fragments.length){
ipNum=fragments(i).toLong | ipNum <<8L
}
ipNum
}
def readRules(path:String):Array[(Long,Long,String)]={
val bf = Source.fromFile(path)
val lines = bf.getLines()
//parse the ip rules: start number, end number, province
val rules = lines.map(line => {
val fields = line.split("[|]")
val startNum = fields(2).toLong
val endNum = fields(3).toLong
val province = fields(6)
(startNum, endNum, province)
}).toArray
//close the source once all lines have been materialized
bf.close()
rules
}
def binarySearch(lines:Array[(Long,Long,String)],ip:Long):Int= {
var low =0
var high = lines.length -1
while (low <= high) {
val middle = (low + high) /2
if ((ip >= lines(middle)._1) && (ip <= lines(middle)._2))
return middle
if (ip < lines(middle)._1)
high = middle -1
else {
low = middle +1
}
}
-1
}
def main(args: Array[String]): Unit = {
val rules: Array[(Long, Long,String)] =readRules("D:\\data\\ip\\ip.txt")
//convert the IP address to its decimal representation
val ipNum=ip2Long("111.198.38.182")
//binary search
val i =binarySearch(rules,ipNum)
//look up the matching rule by the returned index
val tp=rules(i)
val province=tp._3
println(province)
}
}
After writing the code, create an "orders" topic in Kafka and start a producer to write data into it (my Kafka cluster registers with ZooKeeper, so start ZooKeeper before starting Kafka). Our consumer then picks up the data written by the producer; a producer sketch follows below.
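A minimal sketch of such a producer, assuming the Kafka producer client is on the classpath; the broker list matches the one used in OrderCount and the sample record mirrors the data format shown at the top:
package day10

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object OrderProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    //broker list, matching the one used in OrderCount
    props.put("bootstrap.servers", "pro01:9092,pro02:9092,pro03:9092,pro04:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    //one sample record in the same space-separated format as the input data
    producer.send(new ProducerRecord[String, String]("orders", "A 202.106.196.115 手机 iPhone8 8000"))
    producer.close()
  }
}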
Then we write a small program that connects to Redis to check whether our result data has been written; a sketch of such a checker follows below.
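A minimal sketch of such a checker, assuming the hypothetical JedisDemo helper shown above; it prints TOTAL_INCOME plus every other key written by Cacal (one per category and per province):
package day10

import scala.collection.JavaConverters._

object RedisResultViewer {
  def main(args: Array[String]): Unit = {
    //borrow a connection from the (hypothetical) JedisDemo pool
    val conn = JedisDemo.getConnection()
    //list every key/value pair currently stored in Redis
    conn.keys("*").asScala.toSeq.sorted.foreach(key => println(s"$key = ${conn.get(key)}"))
    conn.close()
  }
}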
Execution result: