Kafka+redis+sparkstream整合

现有数据类型如下(部分)

A 202.106.196.115 手机 iPhone8 8000

B 202.106.0.20 服装 布莱奥尼西服 199

C 202.102.152.3 家具 婴儿床 2000

D 202.96.96.68 家电 电饭锅 1000

F 202.98.0.68 化妆品 迪奥香水 200

H 202.96.75.68 食品 奶粉 600

将此类型的数据先将ip转换成归属地,再求计算成交金额、计算商品分类、计算区域成交金额,并将结果写入到Redis中

代码如下:

package day10

import day09.IpUtils

import kafka.common.TopicAndPartition

import kafka.message.MessageAndMetadata

import kafka.serializer.StringDecoder

import kafka.utils.{ZKGroupTopicDirs, ZkUtils}

import org.I0Itec.zkclient.ZkClient

import org.apache.spark.SparkConf

import org.apache.spark.rdd.RDD

import org.apache.spark.streaming.dstream.InputDStream

import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

import org.apache.spark.streaming.{Duration, StreamingContext}

object OrderCount {

def main(args: Array[String]): Unit = {

//指定组名

    val group="g001"

    //创建SparkConf

    val conf=new SparkConf().setMaster("local[4]").setAppName("OrderCount")

//创建SparkStreaming,并设置间隔时间

    val ssc=new StreamingContext(conf,Duration(5000))

val broadcastRef=IpUtils.broadcastIpRules(ssc,"D:\\data\\ip\\ip.txt")

//指定消费者的名字

    val topic="orders"

    //(sparkstream的Task直连到kafka的分区上,使用更加底层的API,效率更高)

    val brokerList="pro01:9092,pro02:9092,pro03:9092,pro04:9092"

    //指定zookeeper的地址,后期更新消费的偏移量使用(以后可以使用Redis,Mysql来记录偏移量)

    val zkQuorum="pro01:2181,pro02:2181,pro03:2181,pro04:2181"

    //创建stream时使用的topic名字集合,SaprkStreaming可以同时消费多个topic

    val topics=Set(topic)

//创建一个ZKGroupTopicDirs对象,其实是指定往zookeeper中写入数据的目录,用于保存偏移量

    val topicDirs=new ZKGroupTopicDirs(group,topic)

//获取zookeeper中的路径“/g001/offsets/wordcount”

    val zkTopicPath=s"${topicDirs.consumerOffsetDir}"

    //准备kafka的参数

    val kafkaParams=Map(

// "key.deserializer" -> classOf[StringDeserializer],

//      "value.deserializer"->classOf[StringDeserializer],

//      "deserializer.encoding"->"GB2312",//配置读取kafka中数据的编码

"metadata.broker.list"->brokerList,

"group.id"->group,

//从头开始读取数据

      "auto.offset.reset"->kafka.api.OffsetRequest.SmallestTimeString

    )

//zookeeper的host和ip,创建一个client,用于更新偏移量的

//是zookeeper的客户端,可以从zookeeper读取偏移量数据,并更新偏移量

    val zkClient=new ZkClient(zkQuorum)

//查询该路径下是否有子节点(默认有子节点为我们自己保存不同partition时生成的)

//  /g001/offsets/wordcount/0/10001"

//  /g001/offsets/wordcount/1/30001"

//  /g001/offsets/wordcount/2/10001"

    val children=zkClient.countChildren(zkTopicPath)

var kafkaStream: InputDStream[(String,String)] =null

    //如果zookeeper中有保存offset,我们会利用这个offset作为kafkaStream的起始位置

    var  fromOffsets:Map[TopicAndPartition,Long]=Map()

//如果保存过offset

    if(children>0){

for(i<-0 until children){

//  /g001/offsets/wordcount/0/10001"

//  /g001/offsets/wordcount/

        val  partitionOffset:String =zkClient.readData[String](s"$zkTopicPath/${i}")

//owrdcount/0

        val tp =TopicAndPartition(topic,i)

//将不同partition对应的offset增加到fromOffsets中

//wordcount/0->10001

        fromOffsets+=(tp -> partitionOffset.toLong)

}

//Key:kafka的的key  values:"hello tom hello jerry"

//这个会将kafka的消息进行transform,最终kafka的数据都会变成(kafka的的key,message)这样的tuple

      val messageHandler=(mmd:MessageAndMetadata[String,String])=>(mmd.topic,mmd.message())

//通过kafkaUtils创建直连的Dstream(fromOffset参数的作用是:按照前面计算好了的偏移量继续消费数据)

//[String,String,StringDecoder,StringDecoder,    (String,String)]

//  key value key的解码方式  value的解码方式

      kafkaStream=KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder,(String,String)](ssc,kafkaParams,fromOffsets,messageHandler)

}else{

//如果未保存,根据kafkaParam的配置使用最新(largest)或者最旧的(smallest)Offset

      kafkaStream=KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)

}

//偏移量的范围

    var offsetRanges=Array[OffsetRange]()

//直连方式只有在KafkaDstream的RDD中才能获取偏移量,那么就不能调用Dstream的Transformation

//所以只能在kafkaStream调用foreachRDD,获取RDD的偏移量,然后就是对RDD进行操作了

//依次迭代kafkaDstream中的kafkaRDD

    kafkaStream.foreachRDD{ kafkaRDD=>

//判断当前的kafkaStream中的RDD是否有数据

  if(!kafkaRDD.isEmpty()){

//只有kafkaRDD可以强转成HasOffsetRanges,并获取到偏移量

  offsetRanges= kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges

val lines: RDD[String] = kafkaRDD.map(_._2)

//整理数据

  val fields = lines.map(_.split(" "))

//计算成交金额

  Cacal.calculateIncome(fields)

//计算商品分类

  Cacal.calculateItem(fields)

//计算区域成交金额

  Cacal.calculateZone(fields,broadcastRef)

lines.foreachPartition(partition=>

partition.foreach(x=>{

println(x)

})

)

for(o<-offsetRanges){

//  /g001/offsets/wordcount/2

    val  zkPath=s"${topicDirs.consumerOffsetDir}/${o.partition}"

    //将partition的offset保存到zookeeper

//  /g001/offsets/wordcount/2/10001"

    ZkUtils.updatePersistentPath(zkClient,zkPath,o.untilOffset.toString)

}

}

}

//启动SparkStreaming程序

    ssc start()

//等待优雅的退出(当前的任务结束完了再退出)

    ssc.awaitTermination()

}

}

Conset类:

package day10

object Conset {

val TOTAL_INCOME="TOTAL_INCOME"

}

Cacal类:

package day10

import day01.ip.TestIp

import org.apache.spark.broadcast.Broadcast

import org.apache.spark.rdd.RDD

object Cacal {

def calculateIncome(fields:RDD[Array[String]]) = {

//将数据进行计算写入到Redis

    val priceRDD = fields.map(arr => {

val price = arr(4).toDouble

price

})

//reduce是一个Action,会把结果返回到Deiver端

//将当前批次的总金额返回

    val sum = priceRDD.reduce(_+_)

//获取一个jedis连接

    val conn = JedisDemo.getConnection()

//将历史值和当前的值进行累加

//    conn.set(Conset.TOTAL_INCOME,sum.toString)

    conn.incrByFloat(Conset.TOTAL_INCOME,sum)

//释放连接

    conn.close()

}

//计算分类的成交金额

  def calculateItem(fields:RDD[Array[String]]) = {

//对filed的map方法是在driver端调用的

    val itemser: RDD[(String, Double)] = fields.map(arr => {

//商品分类

      val item = arr(2)

//价格

      val price = arr(4).toDouble

(item, price)

})

//按商品分类进行聚合

val reduced = itemser.reduceByKey(_+_)

//获取一个Jedis连接

//在driver端拿连接不好

//    val conn=JedisDemo.getConnection()

//将当前批次累计的数据放入redis中

//foreachPartition是一个Action

    reduced.foreachPartition(part=>{

//获取一个Jedis连接

      val conn=JedisDemo.getConnection()

part.foreach(t=>{

conn.incrByFloat(t._1,t._2)

})

//将当前分区中的数据更新完后关闭连接

    })

}

def calculateZone(fiels:RDD[Array[String]],broadcastRef:Broadcast[Array[(Long,Long,String)]])={

val provinceAndPrice = fiels.map(arr => {

val ip = arr(1)

val price = arr(4).toDouble

val ipNum = TestIp.ip2Long(ip)

//在Executor中获取到广播变量的全部规则

    val allRules: Array[(Long, Long,String)] = broadcastRef.value

//二分法查找

    val index: Int = TestIp.binarySearch(allRules, ipNum)

var province ="未知"

    if (index != -1) {

province = allRules(index)._3

}

//省份,,订单金额

    (province,1)

})

//按省份进行聚合

  val reduced = provinceAndPrice.reduceByKey(_+_)

//将数据更新到Redis

  reduced.foreachPartition(part=>{

val conn=JedisDemo.getConnection()

part.foreach(t=>{

conn.incrByFloat(t._1,t._2)

})

conn.close()

})

}

}

TestIp类:

package day01.ip

import scala.io.Source

class TestIp {

}

object TestIp{

def ip2Long(ip:String):Long={

val fragments=ip.split("[.]")

var ipNum =0L

    for (i<-0 until fragments.length){

ipNum=fragments(i).toLong | ipNum <<8L

    }

ipNum

}

def  readRules(path:String):Array[(Long,Long,String)]={

val bf = Source.fromFile(path)

val lines = bf.getLines()

//对ip规则进行整理

    val rules = lines.map(line => {

val fileds = line.split("[|]")

val startNum = fileds(2).toLong

val endNum = fileds(3).toLong

val province = fileds(6)

(startNum, endNum, province)

}).toArray

rules

}

def binarySearch(lines:Array[(Long,Long,String)],ip:Long):Int= {

var low =0

    var high = lines.length -1

    while (low <= high) {

val middle = (low + high) /2

      if ((ip >= lines(middle)._1) && (ip <= lines(middle)._2))

return middle

if (ip < lines(middle)._1)

high = middle -1

      else {

low = middle +1

      }

}

-1

  }

def main(args: Array[String]): Unit = {

val rules: Array[(Long, Long,String)] =readRules("D:\\data\\ip\\ip.txt")

//将IP地址转换成十进制

    val ipNum=ip2Long("111.198.38.182")

//查找

    val i =binarySearch(rules,ipNum)

//根据脚本到rules中查找对应数据

    val tp=rules(i)

val  province=tp._3

println(province)

}

}

写完后在kafka创建一个orders的topic,并启动生产者写入数据


//我的kafka是连接zookeeper的,启动kafka之前也先启动zookeeper

而我们的消费者要获取到生产者写入的数据


然后我们在写一个小程序连接到redis查看我们的结果数据写入了没有


连接redis代码

执行结果:


最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,657评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,662评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,143评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,732评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,837评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,036评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,126评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,868评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,315评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,641评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,773评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,859评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,584评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,676评论 2 351

推荐阅读更多精彩内容