Spark Streaming: Managing Kafka Offsets in ZooKeeper

The whole wide world is jianghu; the clever among men end up fools.
Storms rise on the field of fame and fortune; whoever wins in the end still loses.
----Ode to Gallantry

First, the Maven dependencies. Note the Jackson exclusions on the Kafka artifacts: they keep Kafka's Jackson from clashing with the version Spark pulls in, which is pinned explicitly below.



 <properties>
        <fastjson.version>1.2.68</fastjson.version>
        <scala.version>2.11.12</scala.version>
        <spark.version>2.4.0</spark.version>
        <hadoop.version>3.0.0</hadoop.version>
        <zookeeper.version>3.4.5</zookeeper.version>
        <kafka.version>2.1.0</kafka.version>
    </properties>

    <dependencies>

        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>${kafka.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>


        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.6.6</version>
        </dependency>

        <!-- scala-library comes in transitively through the Spark artifacts, so it is not declared here -->

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>


        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>${zookeeper.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>

        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

    </dependencies>

Next, the Spark Streaming driver. On startup it reads any offsets previously saved to ZooKeeper and resumes from them; after every batch it writes the new offsets back:

import java.util.Properties
import com.tiens.bigdata.order.insert.tradeParentOrders.analyzed.TradeParentOrderesAna
import com.tiens.bigdata.utils.{KafkaOffset, KafkaSink, PropertiesUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.{Assign, Subscribe}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}  

// Driver entry point (the object name is illustrative)
object InsertTradeParentOrders {

  def main(args: Array[String]): Unit = {
    val bootstrapServer = PropertiesUtils.getProperties("kafka.bootstrap.server")
    // Kafka consumer parameters
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> bootstrapServer,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "test_insert_trade_parent_orders",
//      "auto.offset.reset" -> "latest"
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean))

    // Initialize the SparkContext. Backpressure plus maxRatePerPartition keeps
    // the first batches after a restart from pulling in the whole backlog at once.
    val sparkConf = new SparkConf()
      .set("spark.streaming.stopGracefullyOnShutdown", "true")
      .set("spark.streaming.backpressure.enabled", "true")
      .set("spark.streaming.backpressure.initialRate", "100")
      .set("spark.streaming.kafka.maxRatePerPartition", "100")
      .setMaster(PropertiesUtils.getProperties("model"))
      .setAppName("insertTradeParentOrders")
    val sparkContext = new SparkContext(sparkConf)
    // Set the log level
    sparkContext.setLogLevel("WARN")

    // Topic(s) to consume
    val topics: Array[String] = Array("olap_ods_ordercenter_trade_parent_orders_v3")
    val zkPath = PropertiesUtils.getProperties("kafkaOffsetUrl") + topics(0)
    val zkHosts = PropertiesUtils.getProperties("zkCi")
    // Create the ZooKeeper client (session timeout and connection timeout in ms)
    val zkClient = new ZkClient(zkHosts, 30000, 30000)

    // Build the StreamingContext; all business logic happens inside this method
    val ssc = setupSsc(sparkContext, kafkaParams, topics, bootstrapServer, zkClient, zkPath, zkHosts)
    ssc.start()
    ssc.awaitTermination()
  }

  def setupSsc(sparkContext: SparkContext, kafkaParams: Map[String, Object], topics: Array[String],
               bootstrapServer: String, zkClient: ZkClient, zkPath: String, zkHosts: String) = {
    // Create the StreamingContext with a 1-second batch interval
    val streamingContext = new StreamingContext(sparkContext, Seconds(1))
    // Create a Kafka producer and broadcast it as a KafkaSink so every
    // executor reuses a single producer instance
    val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
      val kafkaProducerConfig = {
        val p = new Properties()
        p.setProperty("bootstrap.servers", bootstrapServer)
        p.setProperty("key.serializer", classOf[StringSerializer].getName)
        p.setProperty("value.serializer", classOf[StringSerializer].getName)
        p
      }
      streamingContext.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))
    }

    // Create the direct stream, resuming from the offsets saved in ZooKeeper if any exist
    val stream: InputDStream[ConsumerRecord[String, String]] = getOrCreateDirectStream(streamingContext, kafkaParams, topics, zkClient, zkPath, zkHosts)

    stream.foreachRDD { rdd =>
      // Business logic goes here ...

      // Save the offsets to ZooKeeper only after the batch has been processed,
      // so a restart replays at most the one batch that was in flight
      KafkaOffset.saveOffsets(zkClient, zkHosts, zkPath, rdd)
    }

    streamingContext
  }

  def getOrCreateDirectStream(streamingContext: StreamingContext, kafkaParams: Map[String, Object], topics: Array[String],
                              zkClient: ZkClient, zkPath: String, zkHosts: String): InputDStream[ConsumerRecord[String, String]] = {

    // Read the offsets previously saved in ZooKeeper. Two cases:
    //   1. Nothing found in ZooKeeper: subscribe normally, so auto.offset.reset
    //      decides where to start (earliest here, i.e. from the beginning).
    //   2. Offsets found: assign the partitions explicitly and resume from them.
    // Note: readOffsets handles only the first topic in the array.
    val offsets: Option[Map[TopicPartition, Long]] = KafkaOffset.readOffsets(zkClient, zkHosts, zkPath, topics(0))
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = offsets match {
      case None => KafkaUtils.createDirectStream[String, String](streamingContext, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
      case Some(fromOffset) =>
        KafkaUtils.createDirectStream[String, String](
          streamingContext,
          PreferConsistent,
          Assign[String, String](fromOffset.keys.toList, kafkaParams, fromOffset))
    }
    kafkaDStream
  }
}
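
The driver above broadcasts a KafkaSink, one of the author's utility classes that the post does not show. Below is a minimal sketch of what such a broadcast-safe producer wrapper usually looks like; this is an assumption following the common lazy-producer pattern, not the author's actual class. PropertiesUtils is likewise assumed to be a plain key lookup over a properties file.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Sketch of a broadcastable producer wrapper (assumed implementation): the
// wrapper itself is Serializable, while the non-serializable KafkaProducer is
// built lazily from a factory function once per executor JVM.
class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
  lazy val producer = createProducer()

  def send(topic: String, value: V): Unit =
    producer.send(new ProducerRecord[K, V](topic, value))
}

object KafkaSink {
  def apply[K, V](config: Properties): KafkaSink[K, V] = {
    val createProducerFunc = () => {
      val producer = new KafkaProducer[K, V](config)
      // Flush buffered records before the executor JVM exits
      sys.addShutdownHook { producer.close() }
      producer
    }
    new KafkaSink(createProducerFunc)
  }
}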

Next, the ZooKeeper utility object that actually reads and writes the offsets:

import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.{ZkNoNodeException, ZkNodeExistsException}
import org.apache.kafka.common.TopicPartition
import org.apache.log4j.Logger
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka010.HasOffsetRanges
import org.apache.zookeeper.data.Stat
object KafkaOffset {
  val logger = Logger.getLogger("KafkaOffset")

  /**
   * Read data from the given path in ZooKeeper.
   * @param client ZkClient instance
   * @param path   znode path to read
   * @return the data as an Option (None if the node does not exist), plus its Stat
   */
  def readDataMaybeNull(client: ZkClient, path: String): (Option[String], Stat) = {
    val stat: Stat = new Stat()
    val dataAndStat = try {
      (Some(client.readData(path, stat)), stat)
    } catch {
      case e: ZkNoNodeException =>
        (None, stat)
      case e2: Throwable => throw e2
    }
    dataAndStat
  }

  /**
   * Write data to the given znode; if the node or its parent path does not
   * exist yet, create it first.
   * @param client ZkClient instance
   * @param path   znode path to write
   * @param data   data to store
   */
  def updatePersistentPath(client: ZkClient, path: String, data: String) = {
    try {
      client.writeData(path, data)
    } catch {
      case e: ZkNoNodeException => {
        createParentPath(client, path)
        try {
          client.createPersistent(path, data)
        } catch {
          case e: ZkNodeExistsException =>
            client.writeData(path, data)
          case e2: Throwable => throw e2
        }
      }
      case e2: Throwable => throw e2
    }
  }

  /**
   * Create the parent path of a znode, including missing intermediate nodes.
   * @param client ZkClient instance
   * @param path   znode path whose parent should be created
   */
  private def createParentPath(client: ZkClient, path: String): Unit = {
    val parentDir = path.substring(0, path.lastIndexOf('/'))
    if (parentDir.length != 0)
      client.createPersistent(parentDir, true)
  }

  /**
   * Read the previously saved Kafka offsets from ZooKeeper.
   * @param zkClient ZkClient instance
   * @param zkHosts  ZooKeeper connection string
   * @param zkPath   znode path where the offsets are stored
   * @param topic    Kafka topic the offsets belong to
   * @return a partition-to-offset map, or None if nothing has been saved yet
   */
  def readOffsets(zkClient: ZkClient,
                  zkHosts: String,
                  zkPath: String,
                  topic: String): Option[Map[TopicPartition, Long]] = {
    logger.info("Reading offsets from Zookeeper")
    val stopwatch = new Stopwatch()
    val (offsetsRangesStrOpt, _) = readDataMaybeNull(zkClient, zkPath)
    offsetsRangesStrOpt match {
      case Some(offsetsRangesStr) =>
        logger.info(s"Read offset ranges: ${offsetsRangesStr}")
        val offsets = offsetsRangesStr.split(",")
          .map(s => s.split(":"))
          .map { case Array(partitionStr, offsetStr) => (new TopicPartition(topic, partitionStr.toInt) -> offsetStr.toLong) }
          .toMap
        logger.info("Done reading offsets from Zookeeper. Took " + stopwatch)
        Some(offsets)
      case None =>
        logger.info("No offsets found in Zookeeper. Took " + stopwatch)
        None
    }
  }

  /**
   * Save the offset ranges of a direct-stream RDD to ZooKeeper, with logging.
   * @param zkClient ZkClient instance
   * @param zkHosts  ZooKeeper connection string
   * @param zkPath   znode path to store the offsets under
   * @param rdd      an RDD taken directly from the stream (must implement HasOffsetRanges)
   */
  def saveOffsets(zkClient: ZkClient,
                  zkHosts: String,
                  zkPath: String,
                  rdd: RDD[_]): Unit = {
    logger.info("Saving offsets to Zookeeper")
    val stopwatch = new Stopwatch()
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    // Serialize all ranges into one comma-separated "partition:offset" string
    // and write it to ZooKeeper once. untilOffset marks the end of the processed
    // batch, so a restart resumes right after the last record this batch handled.
    val offsetsRangesStr = offsetRanges
      .map(offsetRange => offsetRange.partition + ":" + offsetRange.untilOffset)
      .mkString(",")
    logger.info("Writing offsets to Zookeeper zkHosts=" + zkHosts + " zkPath=" + zkPath + " offsetsRangesStr=" + offsetsRangesStr)
    updatePersistentPath(zkClient, zkPath, offsetsRangesStr)
    logger.info("Done updating offsets in Zookeeper. Took " + stopwatch)
  }

  /**
   * Simple stopwatch for timing operations in log messages.
   */
  class Stopwatch {
    private val start = System.currentTimeMillis()
    override def toString: String = (System.currentTimeMillis() - start) + " ms"
  }
}
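
For reference, after saveOffsets runs, the znode holds one flat string with a partition:offset pair per partition, e.g. 0:1052,1:987,2:1130. Here is a standalone sketch of the parsing half of that round trip, with a made-up topic name, partitions, and offsets:

import org.apache.kafka.common.TopicPartition

// Hypothetical znode contents written by saveOffsets
val stored = "0:1052,1:987,2:1130"

// The same parsing that readOffsets performs
val resumeFrom: Map[TopicPartition, Long] = stored
  .split(",")
  .map(_.split(":"))
  .map { case Array(p, o) => new TopicPartition("my_topic", p.toInt) -> o.toLong }
  .toMap
// resumeFrom: Map(my_topic-0 -> 1052, my_topic-1 -> 987, my_topic-2 -> 1130)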

Nothing more to it; I'm just curious how many people this approach has helped.

