(十一)SparkStreaming数据零丢失--使用jdbc存储offset

1.MySQL创建存储offset的表格

 mysql> use test
 mysql> create table hlw_offset(
        topic varchar(32),
        groupid varchar(50),
        partitions int,
        fromoffset bigint,
        untiloffset bigint,
        primary key(topic,groupid,partitions)
        );

2. Maven依赖包

   <scala.version>2.11.8</scala.version>
   <spark.version>2.3.1</spark.version>
   <scalikejdbc.version>2.5.0</scalikejdbc.version>
<dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

<dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.27</version>
    </dependency>

<!-- https://mvnrepository.com/artifact/org.scalikejdbc/scalikejdbc -->
<dependency>
    <groupId>org.scalikejdbc</groupId>
    <artifactId>scalikejdbc_2.11</artifactId>
    <version>2.5.0</version>
</dependency>

<dependency>
    <groupId>org.scalikejdbc</groupId>
    <artifactId>scalikejdbc-config_2.11</artifactId>
    <version>2.5.0</version>
</dependency>

<dependency>
      <groupId>com.typesafe</groupId>
      <artifactId>config</artifactId>
      <version>1.3.0</version>
    </dependency>

<dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
      <version>3.5</version>
    </dependency>

3. 处理思路

1)StreamingContext
2)从kafka中获取数据(从外部存储获取offset-->根据offset获取kafka中的数据)
3)根据业务进行逻辑处理
4)将处理结果存到外部存储中--保存offset
5)启动程序,等待程序结束

4. 代码实现

4.1 SparkStreaming主体代码如下:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scalikejdbc._
import scalikejdbc.config._

object JDBCOffsetApp {
  def main(args: Array[String]): Unit = {
    //创建SparkStreaming入口
    val conf = new SparkConf().setMaster("local[2]").setAppName("JDBCOffsetApp")
    val ssc = new StreamingContext(conf,Seconds(5))
    //kafka消费主题
    val topics = ValueUtils.getStringValue("kafka.topics").split(",").toSet
    //kafka参数
    //这里应用了自定义的ValueUtils工具类,来获取application.conf里的参数,方便后期修改
    val kafkaParams = Map[String,String](
      "metadata.broker.list"->ValueUtils.getStringValue("metadata.broker.list"),
      "auto.offset.reset"->ValueUtils.getStringValue("auto.offset.reset"),
      "group.id"->ValueUtils.getStringValue("group.id")
    )
    //先使用scalikejdbc从MySQL数据库中读取offset信息
    //+------------+------------------+------------+------------+-------------+
    //| topic      | groupid          | partitions | fromoffset | untiloffset |
    //+------------+------------------+------------+------------+-------------+
    //MySQL表结构如上,将“topic”,“partitions”,“untiloffset”列读取出来
    //组成 fromOffsets: Map[TopicAndPartition, Long],后面createDirectStream用到
    DBs.setup()
    val fromOffset = DB.readOnly( implicit session => {
      SQL("select * from hlw_offset").map(rs => {
        (TopicAndPartition(rs.string("topic"),rs.int("partitions")),rs.long("untiloffset"))
      }).list().apply()
    }).toMap
    //如果MySQL表中没有offset信息,就从0开始消费;如果有,就从已经存在的offset开始消费
      val messages = if (fromOffset.isEmpty) {
        println("从头开始消费...")
        KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)
      } else {
        println("从已存在记录开始消费...")
        val messageHandler = (mm:MessageAndMetadata[String,String]) => (mm.key(),mm.message())
        KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder,(String,String)](ssc,kafkaParams,fromOffset,messageHandler)
      }

      messages.foreachRDD(rdd=>{
        if(!rdd.isEmpty()){
          //输出rdd的数据量
          println("数据统计记录为:"+rdd.count())
          //官方案例给出的获得rdd offset信息的方法,offsetRanges是由一系列offsetRange组成的数组
//          trait HasOffsetRanges {
//            def offsetRanges: Array[OffsetRange]
//          }
          val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          offsetRanges.foreach(x => {
            //输出每次消费的主题,分区,开始偏移量和结束偏移量
            println(s"---${x.topic},${x.partition},${x.fromOffset},${x.untilOffset}---")
           //将最新的偏移量信息保存到MySQL表中
            DB.autoCommit( implicit session => {
              SQL("replace into hlw_offset(topic,groupid,partitions,fromoffset,untiloffset) values (?,?,?,?,?)")
            .bind(x.topic,ValueUtils.getStringValue("group.id"),x.partition,x.fromOffset,x.untilOffset)
              .update().apply()
            })
          })
        }
      })

    ssc.start()
    ssc.awaitTermination()
  }
}

4.2 自定义的ValueUtils工具类如下:

import com.typesafe.config.ConfigFactory
import org.apache.commons.lang3.StringUtils

object ValueUtils {
val load = ConfigFactory.load()
  def getStringValue(key:String, defaultValue:String="") = {
val value = load.getString(key)
    if(StringUtils.isNotEmpty(value)) {
      value
    } else {
      defaultValue
    }
  }
}

4.3 application.conf内容如下

metadata.broker.list = "192.168.137.251:9092"
auto.offset.reset = "smallest"
group.id = "hlw_offset_group"
kafka.topics = "hlw_offset"
serializer.class = "kafka.serializer.StringEncoder"
request.required.acks = "1"


# JDBC settings
db.default.driver = "com.mysql.jdbc.Driver"
db.default.url="jdbc:mysql://hadoop000:3306/test"
db.default.user="root"
db.default.password="123456"

4.4 自定义kafka producer

import java.util.{Date, Properties}
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

object KafkaProducer {
  def main(args: Array[String]): Unit = {

    val properties = new Properties()
    properties.put("serializer.class",ValueUtils.getStringValue("serializer.class"))
    properties.put("metadata.broker.list",ValueUtils.getStringValue("metadata.broker.list"))
    properties.put("request.required.acks",ValueUtils.getStringValue("request.required.acks"))

    val producerConfig = new ProducerConfig(properties)
    val producer = new Producer[String,String](producerConfig)

    val topic = ValueUtils.getStringValue("kafka.topics")
    //每次产生100条数据    
    var i = 0
    for (i <- 1 to 100) {
      val runtimes = new Date().toString
     val messages = new KeyedMessage[String, String](topic,i+"","hlw: "+runtimes)
      producer.send(messages)
    }
    println("数据发送完毕...")
  }
}

4.启动kafka服务,并创建主题

[hadoop@hadoop000 bin]$ ./kafka-server-start.sh -daemon /home/hadoop/app/kafka_2.11-0.10.0.1/config/server.properties
[hadoop@hadoop000 bin]$ ./kafka-topics.sh --list --zookeeper localhost:2181/kafka
[hadoop@hadoop000 bin]$ ./kafka-topics.sh --create --zookeeper localhost:2181/kafka --replication-factor 1 --partitions 1 --topic hlw_offset

5.过程及输出结果展示

5.1查看MySQL中offset表,刚开始是个空表
mysql> select * from hlw_offset;
Empty set (0.00 sec)
5.2通过kafka producer产生500条数据
5.3启动SparkStreaming程序
//控制台输出结果:
从头开始消费...
数据统计记录为:500
---hlw_offset,0,0,500---

查看MySQL表,offset记录成功

mysql> select * from hlw_offset;
+------------+------------------+------------+------------+-------------+
| topic      | groupid          | partitions | fromoffset | untiloffset |
+------------+------------------+------------+------------+-------------+
| hlw_offset | hlw_offset_group |          0 |          0 |         500 |
+------------+------------------+------------+------------+-------------+
5.4关闭SparkStreaming程序,再使用kafka producer生产300条数据,再次启动spark程序(如果spark从500开始消费,说明成功读取了offset,做到了只读取一次语义)
//控制台结果输出:
从已存在记录开始消费...
数据统计记录为:300
---hlw_offset,0,500,800---

查看MySQL

mysql> select * from hlw_offset;
+------------+------------------+------------+------------+-------------+
| topic      | groupid          | partitions | fromoffset | untiloffset |
+------------+------------------+------------+------------+-------------+
| hlw_offset | hlw_offset_group |          0 |        500 |         800 |
+------------+------------------+------------+------------+-------------+
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 221,635评论 6 515
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,543评论 3 399
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 168,083评论 0 360
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,640评论 1 296
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,640评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,262评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,833评论 3 421
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,736评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,280评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,369评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,503评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,185评论 5 350
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,870评论 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,340评论 0 24
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,460评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,909评论 3 376
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,512评论 2 359

推荐阅读更多精彩内容

  • 姓名:周小蓬 16019110037 转载自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw阅读 34,727评论 13 425
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,701评论 18 139
  • kafka的定义:是一个分布式消息系统,由LinkedIn使用Scala编写,用作LinkedIn的活动流(Act...
    时待吾阅读 5,326评论 1 15
  • 上个月宝宝把奶瓶嘴咬坏,更换新奶瓶却不适应,从此拒绝喝奶粉,想想已经19个月可以断奶粉就,不喝也罢。而刚刚在正规妇...
    魔宁阅读 682评论 10 2
  • 第一次用着个软件写东西,这感觉就像是从eclipse转到AS一样的不习惯,就当是本人学习的日常吧,不用说,排版我自...
    破音男阅读 212评论 0 1