sparkStreaming使用zookeeper管理KafkaOffset方案

天地四方皆江湖,世人聪明反糊涂。
名利场上风浪起,赢到头来却是输。
----侠客行

先上maven依赖



 <properties>
        <fastjson.version>1.2.68</fastjson.version>
        <scala.version>2.11.12</scala.version>
        <spark.version>2.4.0</spark.version>
        <hadoop.version>3.0.0</hadoop.version>
        <zookeeper.version>3.4.5</zookeeper.version>
        <kafka.version>2.1.0</kafka.version>
    </properties>

    <dependencies>

        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>${kafka.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>


        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.6.6</version>
        </dependency>

<!--        <dependency>-->
<!--            <groupId>org.scala-lang</groupId>-->
<!--            <artifactId>scala-library</artifactId>-->
<!--            <version>${scala.version}</version>-->
<!--        </dependency>-->

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>


        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>${zookeeper.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>

        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.14</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

    </dependencies>

先来个sparkStreaming的启动程序

import java.util.Properties
import com.tiens.bigdata.order.insert.tradeParentOrders.analyzed.TradeParentOrderesAna
import com.tiens.bigdata.utils.{KafkaOffset, KafkaSink, PropertiesUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.{Assign, Subscribe}
import org.apache.spark.streaming.kafka010.{ KafkaUtils}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}  

val bootstrapServer = PropertiesUtils.getProperties("kafka.bootstrap.server")
    //初始化kafka参数
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> bootstrapServer,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "test_insert_trade_parent_orders",
//      "auto.offset.reset" -> "latest"
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean))

    //初始化sparkcontext
      val sparkConf = new SparkConf()
        .set("spark.streaming.stopGracefullyOnShutdown","true")
        .set("spark.streaming.backpressure.enabled","true")
        .set("spark.streaming.backpressure.initialRate","100")
        .set("spark.streaming.kafka.maxRatePerPartition","100")
        .setMaster(PropertiesUtils.getProperties("model"))
        .setAppName("isnertTradeParentOrders")
      val sparkContext = new SparkContext(sparkConf)
    //设置日志级别
    sparkContext.setLogLevel("WARN")

    //定义要消费的topic
    val topics: Array[String] = Array("olap_ods_ordercenter_trade_parent_orders_v3")
    val zkPath = PropertiesUtils.getProperties("kafkaOffsetUrl")+topics(0)
    val zkHosts = PropertiesUtils.getProperties("zkCi")
    //创建zk的客户端实例
    val zkClient = new ZkClient(zkHosts, 30000, 30000)


    //创建这个方法,所有业务逻辑在此方法中进行
    val sss = setupEsc(sparkContext, kafkaParams, topics, bootstrapServer,zkClient,zkPath,zkHosts)
    sss.start()
    sss.awaitTermination()

  }

  def setupEsc(sparkContext: SparkContext, kafkaParams: Map[String, Object],topics: Array[String],
               bootstrapServer:String,zkClient: ZkClient,zkPath: String,zkHoust:String ) ={
    //初始化streaming对象
    val streamingContext = new StreamingContext(sparkContext,Seconds(1))
    //初始化Kafka生产者对象
    //#################kafka  生产者创建
    //     广播KafkaSink
    val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
      val kafkaProducerConfig = {
        val p = new Properties()
        p.setProperty("bootstrap.servers", bootstrapServer)
        p.setProperty("key.serializer", classOf[StringSerializer].getName)
        p.setProperty("value.serializer", classOf[StringSerializer].getName)
        p
      }
      streamingContext.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))
    }

    //创建数据流
    val stream: InputDStream[ConsumerRecord[String, String]] = getOrCreateDirectStream(streamingContext, kafkaParams, topics, zkClient, zkPath, zkHoust)

    stream.foreachRDD(rdd=>{
        //这里可以处理你的业务逻辑了
      })

    //把偏移量保存到自己的数据库中
    stream.foreachRDD { rdd =>
     KafkaOffset.saveOffsets(zkClient,zkHoust,zkPath,rdd)
    }


    streamingContext
  }

  def getOrCreateDirectStream(streamingContext:StreamingContext, kafkaParams: Map[String, Object],topics: Array[String],
                              zkClient: ZkClient,zkPath: String,zkHoust:String):InputDStream[ConsumerRecord[String, String]] = {

    //读取保存在zk中的偏移量
    val offsets: Option[Map[TopicPartition, Long]] = KafkaOffset.readOffsets(zkClient, zkHoust, zkPath, topics(0))
    //拿到zk中的偏移量需要考虑两种情况
    //1 如果zk中没有拿到数据,则从头消费
    //2如果zk中有数据,则通过偏移量进行消费
    // begin from the the offsets committed to the database
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = offsets match {
      case None => KafkaUtils.createDirectStream[String, String](streamingContext, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
      case Some(fromOffset) =>
        KafkaUtils.createDirectStream[String, String](
          streamingContext,
          PreferConsistent,
          Assign[String, String](fromOffset.keys.toList, kafkaParams, fromOffset))
    }
    kafkaDStream
  }

再来个zookeeper的工具类

import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.{ZkNoNodeException, ZkNodeExistsException}
import org.apache.kafka.common.TopicPartition
import org.apache.log4j.Logger
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka010.HasOffsetRanges
import org.apache.zookeeper.data.Stat
object KafkaOffset {
  val logger = Logger.getLogger("KafkaOffset")

  /**
   * 从zookeeper指定的path路径中去读数据
   * @param client zookeeperClient
   * @param path 路径
   * @return option 对象
   */
  def readDataMaybeNull(client: ZkClient, path: String): (Option[String], Stat) = {
    val stat: Stat = new Stat()
    val dataAndStat = try {
      (Some(client.readData(path, stat)), stat)
    } catch {
      case e: ZkNoNodeException =>
        (None, stat)
      case e2: Throwable => throw e2
    }
    dataAndStat
  }

  /**
   * 将数据写入zookeeper中指定的路径中,如果该路径存在父目录且父目录不存在则先创建父目录
   * @param client zookeeperClient
   * @param path 路径
   * @param data 数据
   */
  def updatePersistentPath(client: ZkClient, path: String, data: String) = {
    try {
      client.writeData(path, data)
    } catch {
      case e: ZkNoNodeException => {
        createParentPath(client, path)
        try {
          client.createPersistent(path, data)
        } catch {
          case e: ZkNodeExistsException =>
            client.writeData(path, data)
          case e2: Throwable => throw e2
        }
      }
      case e2: Throwable => throw e2
    }
  }

  /**
   * 创建父目录
   * @param client zookeeperClient
   * @param path 路径
   */
  private def createParentPath(client: ZkClient, path: String): Unit = {
    val parentDir = path.substring(0, path.lastIndexOf('/'))
    if (parentDir.length != 0)
      client.createPersistent(parentDir, true)
  }

  /*
   Read the previously saved offsets from Zookeeper
    */
  /**
   * 取kafka中的偏移量
   * @param zkClient zookeeperClient
   * @param zkHosts zookeeperhosts节点
   * @param zkPath  zookeeper路径
   * @param topic  kafka中的topic
   * @return  Option对象
   */
  def readOffsets(zkClient: ZkClient,
                  zkHosts: String,
                  zkPath: String,
                  topic: String): Option[Map[TopicPartition, Long]] = {
    logger.info("Reading offsets from Zookeeper")
    val stopwatch = new Stopwatch()
    val (offsetsRangesStrOpt, _) = readDataMaybeNull(zkClient, zkPath)
    offsetsRangesStrOpt match {
      case Some(offsetsRangesStr) =>
        logger.info(s"Read offset ranges: ${offsetsRangesStr}")
        val offsets = offsetsRangesStr.split(",")
          .map(s => s.split(":"))
          .map { case Array(partitionStr, offsetStr) => (new TopicPartition(topic, partitionStr.toInt) -> offsetStr.toLong) }
          .toMap
        logger.info("Done reading offsets from Zookeeper. Took " + stopwatch)
        Some(offsets)
      case None =>
        logger.info("No offsets found in Zookeeper. Took " + stopwatch)
        None
    }
  }

  /**
   * 将offest偏移量保存到zookeeper中,并打上日志
   * @param zkClient zookeeperClient
   * @param zkHosts  zookeeperhosts节点
   * @param zkPath  zookeeper路径
   * @param rdd 偏移量的rdd
   */
  def saveOffsets(zkClient: ZkClient,
                  zkHosts: String,
                  zkPath: String,
                  rdd: RDD[_]): Unit = {
    logger.info("Saving offsets to Zookeeper")
    val stopwatch = new Stopwatch()
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    offsetRanges.foreach(offsetRange=>{
      val offsetsRangesStr = offsetRanges.map(offsetRange=>offsetRange.partition+":"+offsetRange.fromOffset)
        .mkString(",")
      logger.info("Writing offsets to Zookeeper zkClient=" + zkClient + "  zkHosts=" + zkHosts + "zkPath=" + zkPath + "  offsetsRangesStr:" + offsetsRangesStr)
      updatePersistentPath(zkClient, zkPath, offsetsRangesStr)
      logger.info("Done updating offsets in Zookeeper. Took " + stopwatch)
    })
  }

  /**
   * 过程时间
   */
  class Stopwatch {
    private val start = System.currentTimeMillis()
    override def toString() = (System.currentTimeMillis() - start) + " ms"
  }

没有别的意思,就是看看这个方案帮助了多少朋友

。。。。。。。。。。。点赞 。。。。。。。。。。。。。。。。
。。。。。。。。。。。点赞 。。。。。。。。。。。。。。。。
。。。。。。。。。。。点赞 。。。。。。。。。。。。。。。。
。。。。。。。。。。。点赞 。。。。。。。。。。。。。。。。
。。。。。。。。。。。点赞 。。。。。。。。。。。。。。。。
。。。。。。。。。。。点赞 。。。。。。。。。。。。。。。。
。。。。。。。。。。。点赞 。。。。。。。。。。。。。。。。
。。。。。。。。。。。点赞 。。。。。。。。。。。。。。。。
。。。。。。。。。。。点赞 。。。。。。。。。。。。。。。。
。。。。。。。。。。。点赞 。。。。。。。。。。。。。。。。
。。。。。。。。。。。点赞 。。。。。。。。。。。。。。。。
。。。。。。。。。。。点赞 。。。。。。。。。。。。。。。。
。。。。。。。。。。。点赞 。。。。。。。。。。。。。。。。
。。。。。。。。。。。点赞 。。。。。。。。。。。。。。。。
。。。。。。。。。。。点赞 。。。。。。。。。。。。。。。。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,684评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,143评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,214评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,788评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,796评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,665评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,027评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,679评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,346评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,664评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,766评论 1 331
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,412评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,015评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,974评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,073评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,501评论 2 343

推荐阅读更多精彩内容