[Offset management strategies] Consuming Kafka with Spark Streaming

Summary: offset management, Spark Streaming, Kafka

Offset management in Spark Streaming

Three ways to manage offsets

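  • Auto-committing offsets (just give up on this approach entirely): enable.auto.commit=true. Offsets are committed on a timer whether or not the data has been processed, so once a consumer crashes you can get data loss or duplicate consumption. The offsets are not under your control.
  • Kafka's own offset management: with the Kafka 0.10+ consumer used by Spark's kafka-0-10 integration, committed offsets are stored by default in a built-in topic named __consumer_offsets instead of ZooKeeper. Spark Streaming provides a dedicated commitAsync() API for committing offsets; set enable.auto.commit=false to use it. This approach does not lose data, but it can consume data twice: after the streaming application is stopped and started again, it re-consumes the last batch processed before the stop. This is probably because the commit is asynchronous: commitAsync only queues the offsets, they are sent to Kafka while a later batch is generated, so the committed offset lags behind.
  • Custom offset storage: offsets can be kept in third-party storage such as an RDBMS, Redis, ZooKeeper, Elasticsearch, or even Kafka itself. If the consumed data is written to a component that supports transactions, it is strongly recommended to store the offsets there too and write both in one transaction, which gives exactly-once semantics.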
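The last option can be sketched as below. This is a toy, in-memory illustration, not a real database: the hypothetical `TransactionalSink` class stands in for a transactional store, and its `synchronized` block stands in for one database transaction (a real store would also roll back on failure). Because the business result and the next offset to read are updated in the same step, replaying a batch after a crash does not count it twice.

```scala
import scala.collection.mutable

// Hypothetical message type: one Kafka record with its partition and offset.
final case class Msg(partition: Int, offset: Long, value: Int)

// Hypothetical stand-in for a transactional store (e.g. an RDBMS that holds
// both the results and the offsets). Not a real library class.
final class TransactionalSink {
  private val totals  = mutable.Map.empty[Int, Long] // business result per partition
  private val offsets = mutable.Map.empty[Int, Long] // next offset to read per partition

  /** Where a restarted job should resume reading a partition. */
  def nextOffset(partition: Int): Long = synchronized(offsets.getOrElse(partition, 0L))

  def total(partition: Int): Long = synchronized(totals.getOrElse(partition, 0L))

  /** Writes results and offsets together; the lock plays the role of a DB transaction. */
  def commitBatch(batch: Seq[Msg]): Unit = synchronized {
    // Drop records already covered by the stored offset, so a replayed batch is a no-op.
    val fresh = batch.filter(r => r.offset >= offsets.getOrElse(r.partition, 0L))
    fresh.groupBy(_.partition).foreach { case (p, rs) =>
      totals(p) = totals.getOrElse(p, 0L) + rs.map(_.value.toLong).sum
      offsets(p) = rs.map(_.offset).max + 1 // store the NEXT offset to read
    }
  }
}
```

On restart, a job would read nextOffset for each partition and use those values as its starting offsets (for example with ConsumerStrategies.Assign, as the ZooKeeper example in this article does).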

Auto-committing offsets

The code consumes JSON data; the goal is simply to print each JSON record to the console.

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}

object OffsetAutoCommit {
  Logger.getRootLogger.setLevel(Level.WARN)

  def main(args: Array[String]): Unit = {
    val groupId = args(0)
    // Spark configuration
    val sparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("OffsetAutoCommit")
      .set("spark.streaming.kafka.maxRatePerPartition", "100")

    // Spark Streaming context (2-second batches)
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Kafka parameters
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "cloudera01:9092,cloudera02:9092,cloudera03:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (true: java.lang.Boolean),
      ConsumerConfig.GROUP_ID_CONFIG -> groupId
    )

    // Create the direct DStream
    val kafkaDirectStream = KafkaUtils.createDirectStream(
      ssc,
      PreferConsistent,
      Subscribe[String, String](List("test_gp3"), kafkaParams) // subscribe to the topic list
    )

    val saveRdd = kafkaDirectStream.map(x => {
      val jsonObject = JSON.parse(x.value()).asInstanceOf[JSONObject]
      jsonObject.put("partition", x.partition())
      jsonObject.put("offset", x.offset())
      jsonObject
    })

    saveRdd.foreachRDD(rdd => {
      rdd.foreach(x => println(x.toJSONString))
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

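Change the format of the data sent to Kafka so that the Spark job throws an error and exits, then restart it: the record at offset 10 is lost. Its offset had already been committed when Spark received the data, but processing it failed, so it is not consumed again after the restart.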

{"partition":0,"offset":9,"id":"1"}
{"partition":1,"offset":9,"id":"0"}
20/10/10 16:53:40 ERROR executor.Executor: Exception in task 0.0 in stage 27.0 (TID 54)
com.alibaba.fastjson.JSONException: not close json text, token : :
    at com.alibaba.fastjson.parser.DefaultJSONParser.close(DefaultJSONParser.java:1427)
    at com.alibaba.fastjson.JSON.parse(JSON.java:156)
    at com.alibaba.fastjson.JSON.parse(JSON.java:143)
    at OffsetAutoCommit$$anonfun$1.apply(OffsetAutoCommit.scala:42)
    at OffsetAutoCommit$$anonfun$1.apply(OffsetAutoCommit.scala:41)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:927)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:927)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:381)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
......
[root@cloudera01 MyStreamTest]# bash run_auto_commit.sh 
20/10/10 16:54:10 WARN kafka010.KafkaUtils: overriding enable.auto.commit to false for executor
20/10/10 16:54:10 WARN kafka010.KafkaUtils: overriding auto.offset.reset to none for executor
20/10/10 16:54:10 WARN kafka010.KafkaUtils: overriding executor group.id to spark-executor-OffsetAutoCommit_group_1
20/10/10 16:54:10 WARN kafka010.KafkaUtils: overriding receive.buffer.bytes to 65536 see KAFKA-3135
{"partition":1,"offset":11,"id":"0"}
{"partition":0,"offset":11,"id":"1"}
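The log above can be reproduced with a toy model (plain Scala, no Spark or Kafka involved) in which the offset is committed as soon as a record is received, before it is processed. This is a simplification: real auto-commit commits periodically (auto.commit.interval.ms), but it shows the failure mode seen in the log. The object name `CommitOnReceive` is hypothetical.

```scala
// Toy model of "commit on receipt" (like enable.auto.commit=true):
// the offset moves past a record before the record is processed.
object CommitOnReceive {
  /** One run of the job starting at `committed`; returns (processed records, committed offset). */
  def run(log: Vector[String], committed: Int, isBad: String => Boolean): (Vector[String], Int) = {
    @annotation.tailrec
    def loop(next: Int, done: Vector[String]): (Vector[String], Int) =
      if (next >= log.length) (done, next)
      else {
        val afterCommit = next + 1 // the commit happens on receipt
        if (isBad(log(next))) (done, afterCommit) // processing crashes, but the commit already happened
        else loop(afterCommit, done :+ log(next))
      }
    loop(committed, Vector.empty)
  }
}
```

With the log Vector("a", "b", "BAD", "c"), the first run processes "a" and "b" and crashes with offset 3 already committed; the restart begins at offset 3, so "BAD" is never processed, just as offset 10 disappears in the log.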

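To summarize how offsets are managed in auto_commit mode:

  • As soon as the Spark DStream receives data from Kafka, it counts as consumed and the offset is committed right away.
  • If the job fails while processing a message it has pulled from Kafka, restarting the Spark job does not consume that message again: the data is lost.
  • Each partition's offsets are assigned on the Kafka broker when the producer sends messages. They have nothing to do with consumers or consumer groups; they already exist once the producer has written to Kafka. So if you switch consumer groups, the offsets you consume keep increasing rather than starting again from 0; where a new group starts is decided by auto.offset.reset (latest here).
    [Figure: offset management strategy with auto-commit]

    Record 4 is lost without ever being processed successfully. Because the offset of the bad record 4 was committed, the restarted job does not get stuck on it and keeps consuming. Switching consumer groups does not reset the offsets, and consumption does not start over from the beginning.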
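The last point can be captured in a small model of where a consumer group begins reading a partition: a valid committed offset wins; otherwise auto.offset.reset picks the log start (earliest) or the log end (latest). This is a simplified sketch of Kafka consumer behavior, not Kafka code, and `StartPosition` and its parameters are hypothetical names. Note that the log start is not necessarily 0 once retention has deleted old segments.

```scala
// Simplified model of a consumer group's start position in one partition.
object StartPosition {
  sealed trait Reset
  case object Earliest extends Reset // auto.offset.reset=earliest
  case object Latest extends Reset   // auto.offset.reset=latest

  /**
   * @param committed offset committed by this group, if any
   * @param logStart  first offset still stored on the broker
   * @param logEnd    offset the next produced record will get
   */
  def startOffset(committed: Option[Long], logStart: Long, logEnd: Long, reset: Reset): Long =
    committed match {
      case Some(off) if off >= logStart && off <= logEnd => off // resume where the group left off
      case _ =>                                                // new group or out-of-range offset
        reset match {
          case Earliest => logStart
          case Latest   => logEnd
        }
    }
}
```

A new group with latest therefore starts at the current end of the log, which is why switching groups skips records already written instead of restarting from 0.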

Manual commit with the built-in API

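Cast the RDDs of the DStream returned directly by createDirectStream (they are KafkaRDDs) to HasOffsetRanges to get the offsets of each batch, then commit them with the commitAsync API. The cast only succeeds on that first-hand stream: after a transformation such as map, the RDDs are MapPartitionsRDDs and the cast throws an exception. For this reason, reading the offsets, processing the data, and committing are all written in the same foreachRDD block on kafkaDirectStream; splitting them up as in the commented-out code below fails.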

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}

object OffsetAPICommit {
  Logger.getRootLogger.setLevel(Level.WARN)

  def main(args: Array[String]): Unit = {
    val groupId = args(0)
    // Spark configuration
    val sparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("OffsetAPICommit")
      .set("spark.streaming.kafka.maxRatePerPartition", "100")

    // Spark Streaming context (2-second batches)
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Kafka parameters (auto-commit disabled)
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "cloudera01:9092,cloudera02:9092,cloudera03:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
      ConsumerConfig.GROUP_ID_CONFIG -> groupId
    )

    // Create the direct DStream
    val kafkaDirectStream = KafkaUtils.createDirectStream(
      ssc,
      PreferConsistent,
      Subscribe[String, String](List("test_gp3"), kafkaParams) // subscribe to the topic list
    )

    kafkaDirectStream.foreachRDD(rdd => {
      rdd.foreach(x => {
        val jsonObject = JSON.parse(x.value()).asInstanceOf[JSONObject]
        jsonObject.put("partition", x.partition())
        jsonObject.put("offset", x.offset())
        println(jsonObject)
      })
      // Get the offset ranges of the current batch
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // Commit the offsets asynchronously
      kafkaDirectStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    })

    // Splitting it up like this fails with: spark.rdd.MapPartitionsRDD cannot be cast to streaming.kafka010.HasOffsetRange
    // Only the RDDs of kafkaDirectStream itself are KafkaRDDs that can be cast to HasOffsetRanges
//    val saveRdd = kafkaDirectStream.map(x => {
//      val jsonObject = JSON.parse(x.value()).asInstanceOf[JSONObject]
//      jsonObject.put("partition", x.partition())
//      jsonObject.put("offset", x.offset())
//      jsonObject
//    })
//
//    saveRdd.foreachRDD(rdd => {
//      // Get the offset ranges of the current batch
//      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
//      rdd.foreach(x => println(x.toJSONString))
//      // Commit the offsets asynchronously
//      saveRdd.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
//    })

    ssc.start()
    ssc.awaitTermination()
  }
}
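The offsetRanges used above describe, per partition, a half-open range [fromOffset, untilOffset), and what commitAsync ends up committing for a partition is untilOffset, the next offset to read. The sketch below is a simplified model in plain Scala (assuming backpressure is disabled) of how such a range is bounded by spark.streaming.kafka.maxRatePerPartition, which the code sets to 100 with 2-second batches. `Span` and `BatchSpans` are hypothetical names, not Spark's OffsetRange API.

```scala
// Simplified model of one batch's offset range for a partition
// (assumes backpressure is off; not Spark's actual implementation).
final case class Span(partition: Int, fromOffset: Long, untilOffset: Long) {
  def count: Long = untilOffset - fromOffset // half-open: [fromOffset, untilOffset)
}

object BatchSpans {
  /** Caps the batch at maxRatePerPartition * batchSeconds records, and at the log end. */
  def nextSpan(partition: Int, from: Long, logEnd: Long,
               maxRatePerPartition: Long, batchSeconds: Long): Span =
    Span(partition, from, math.min(logEnd, from + maxRatePerPartition * batchSeconds))

  /** The offset a commit records: the next one to read, not the last one processed. */
  def committedOffset(span: Span): Long = span.untilOffset
}
```

Keep the "next offset to read" convention in mind when storing offsets yourself: the ZooKeeper example in this article stores the last processed offset instead and adds 1 when reading it back.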

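Run the job and one message is consumed successfully. Then change the format of the data being sent so the job fails. Restarting the job fails again: consumption is stuck on this message because its offset was never committed.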

[root@cloudera01 MyStreamTest]# bash run_api_commit.sh 
20/10/12 10:42:30 WARN kafka010.KafkaUtils: overriding enable.auto.commit to false for executor
20/10/12 10:42:30 WARN kafka010.KafkaUtils: overriding auto.offset.reset to none for executor
20/10/12 10:42:30 WARN kafka010.KafkaUtils: overriding executor group.id to spark-executor-offset_api_commit_group_1
20/10/12 10:42:30 WARN kafka010.KafkaUtils: overriding receive.buffer.bytes to 65536 see KAFKA-3135
{"partition":1,"offset":22,"id":"0"}
20/10/12 10:43:16 ERROR executor.Executor: Exception in task 1.0 in stage 22.0 (TID 45)
com.alibaba.fastjson.JSONException: syntax error, position at 0, name id
    at com.alibaba.fastjson.parser.DefaultJSONParser.parseObject(DefaultJSONParser.java:570)
    at com.alibaba.fastjson.parser.DefaultJSONParser.parse(DefaultJSONParser.java:1318)
    at com.alibaba.fastjson.parser.DefaultJSONParser.parse(DefaultJSONParser.java:1284)
    at com.alibaba.fastjson.JSON.parse(JSON.java:152)
    at com.alibaba.fastjson.JSON.parse(JSON.java:143)
    at OffsetAPICommit$$anonfun$main$1$$anonfun$apply$1.apply(OffsetAPICommit.scala:44)
    at OffsetAPICommit$$anonfun$main$1$$anonfun$apply$1.apply(OffsetAPICommit.scala:43)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.streaming.kafka010.KafkaRDDIterator.foreach(KafkaRDD.scala:231)
...
[root@cloudera01 MyStreamTest]# bash run_api_commit.sh 
20/10/12 10:43:23 WARN kafka010.KafkaUtils: overriding enable.auto.commit to false for executor
20/10/12 10:43:23 WARN kafka010.KafkaUtils: overriding auto.offset.reset to none for executor
20/10/12 10:43:23 WARN kafka010.KafkaUtils: overriding executor group.id to spark-executor-offset_api_commit_group_1
20/10/12 10:43:23 WARN kafka010.KafkaUtils: overriding receive.buffer.bytes to 65536 see KAFKA-3135
20/10/12 10:43:29 ERROR executor.Executor: Exception in task 1.0 in stage 0.0 (TID 1)
com.alibaba.fastjson.JSONException: syntax error, position at 0, name id
    at com.alibaba.fastjson.parser.DefaultJSONParser.parseObject(DefaultJSONParser.java:570)
    at com.alibaba.fastjson.parser.DefaultJSONParser.parse(DefaultJSONParser.java:1318)
    at com.alibaba.fastjson.parser.DefaultJSONParser.parse(DefaultJSONParser.java:1284)
    at com.alibaba.fastjson.JSON.parse(JSON.java:152)
    at com.alibaba.fastjson.JSON.parse(JSON.java:143)
    at OffsetAPICommit$$anonfun$main$1$$anonfun$apply$1.apply(OffsetAPICommit.scala:44)
    at OffsetAPICommit$$anonfun$main$1$$anonfun$apply$1.apply(OffsetAPICommit.scala:43)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.streaming.kafka010.KafkaRDDIterator.foreach(KafkaRDD.scala:231)
...

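Change the consumer group in the launch command and consumption goes through. With a new consumer group and auto.offset.reset=latest, new data arriving after startup is consumed, but the message that failed earlier is simply lost.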

[root@cloudera01 MyStreamTest]# vim run_api_commit.sh 
[root@cloudera01 MyStreamTest]# bash run_api_commit.sh 
20/10/12 10:43:46 WARN kafka010.KafkaUtils: overriding enable.auto.commit to false for executor
20/10/12 10:43:46 WARN kafka010.KafkaUtils: overriding auto.offset.reset to none for executor
20/10/12 10:43:46 WARN kafka010.KafkaUtils: overriding executor group.id to spark-executor-offset_api_commit_group_2
20/10/12 10:43:46 WARN kafka010.KafkaUtils: overriding receive.buffer.bytes to 65536 see KAFKA-3135
{"partition":1,"offset":24,"id":"0"}
[Figure: offset management strategy with manual commit via the API]

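Because manual commit moves the commit point to the moment the job has actually processed the data, rather than when it pulled the message, the offset is never committed. Even after a restart the job consumes the same record again and keeps failing on it. If you then switch consumer groups, consumption starts from new data and the failed record is abandoned, which means data loss.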
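A toy model in plain Scala (hypothetical names, not Spark code) shows why the job gets stuck: the offset only moves after a whole batch has been processed, so a failing batch is retried from the same offset on every restart, and records processed before the failure in that batch are processed again. This mirrors committing with commitAsync after the output action.

```scala
// Toy model of "commit after success": a batch's offsets are committed
// only when every record in the batch was processed.
object CommitAfterBatch {
  /** One run of the job from `committed`; returns (records processed, committed offset). */
  def run(log: Vector[String], committed: Int, batchSize: Int,
          isBad: String => Boolean): (Vector[String], Int) = {
    @annotation.tailrec
    def loop(from: Int, done: Vector[String]): (Vector[String], Int) =
      if (from >= log.length) (done, from)
      else {
        val until = math.min(from + batchSize, log.length)
        val batch = log.slice(from, until)
        if (batch.exists(isBad))
          (done ++ batch.takeWhile(r => !isBad(r)), from) // job fails: nothing committed for this batch
        else
          loop(until, done ++ batch) // output finished, then commit `until`
      }
    loop(committed, Vector.empty)
  }
}
```

With the log Vector("a", "b", "x", "BAD") and batches of 2, every run after the first starts at offset 2, reprocesses "x", and fails again; only switching the group (or editing the committed offset) gets past the bad record.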


Reading and writing offsets in third-party storage

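Read the offsets from ZooKeeper at startup and write them back to ZooKeeper after the data is consumed, so that offsets can be viewed and edited directly in ZK to change where consumption starts. A ZKUtils class implements reading and writing ZK node values, creating ZK nodes, and fetching partition offsets from the brokers. (It relies on the old kafka.consumer.SimpleConsumer API from the Kafka core artifact, which has been removed from newer Kafka releases.)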

import java.util.Properties

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}

object OffsetUseZK {
  Logger.getRootLogger.setLevel(Level.WARN)

  def main(args: Array[String]): Unit = {
    val groupId = args(0)

    // Spark configuration
    val sparkConf = new SparkConf()
      .setAppName("OffsetUseZK")
      .setMaster("local[*]")
      .set("spark.streaming.kafka.maxRatePerPartition", "100")

    // Spark Streaming context (2-second batches)
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Broadcast the remaining configuration to the executors
    val properties: Properties = new Properties()
    properties.setProperty("bootstrap.servers", "cloudera01:9092,cloudera02:9092,cloudera03:9092")
    properties.setProperty("kafka.topic", "test_gp3")
    properties.setProperty("group.id", groupId)
    properties.setProperty("ZK_HOST", "cloudera01,cloudera02,cloudera03")
    val configProperties = ssc.sparkContext.broadcast(properties)

    // Kafka parameters
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "cloudera01:9092,cloudera02:9092,cloudera03:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean), // offsets are tracked in ZooKeeper instead
      ConsumerConfig.GROUP_ID_CONFIG -> groupId
    )

    // Get the Kafka DStream
    var stry = Subscribe[String, String](List("test_gp3"), kafkaParams)  // subscribe to the topic list by default
    val partitionOffsets = ZKUtils.getIns(configProperties.value).getPartitionOffset("/test/monitor/test_gp3/" + groupId)
    // Resume from the offsets recorded last time
    if (null != partitionOffsets) {
      println("Resuming from the last recorded offsets:", partitionOffsets)
      import scala.collection.JavaConverters._
      stry = ConsumerStrategies.Assign[String, String](partitionOffsets.keySet(), kafkaParams.asJava, partitionOffsets)
    }

    val kafkaDirectStream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent, // distribute partitions evenly across executors
      stry) // consumer strategy: Subscribe, or Assign with the stored offsets

    val saveRdd = kafkaDirectStream.map(x => {
      val jsonObject = JSON.parse(x.value()).asInstanceOf[JSONObject]
      jsonObject.put("partition", x.partition())
      jsonObject.put("offset", x.offset())
      jsonObject
    })

    saveRdd.foreachRDD(rdd => {
      rdd.foreach(x => {
        println(x.toJSONString)  // processing logic
        // write this record's offset to ZooKeeper
        ZKUtils.getIns(configProperties.value).setData(
          "/test/monitor/" + configProperties.value.getProperty("kafka.topic") + "/" +
            configProperties.value.getProperty("group.id") + "/" +
            x.getString("partition"),
          x.getString("offset"))}
      )
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

ZKUtils.scala

import org.apache.zookeeper._
import org.apache.zookeeper.common._
import java.util.Properties

import org.apache.kafka.common.TopicPartition
import org.apache.zookeeper.data.Stat
import kafka.api.{OffsetRequest, PartitionMetadata, PartitionOffsetRequestInfo, TopicMetadata, TopicMetadataRequest, TopicMetadataResponse}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer

import scala.collection.mutable

class ZKUtils() extends Watcher {
  private var zk: ZooKeeper = _

  private def this(pro: Properties) = {
    this()
    this.zk = new ZooKeeper(pro.getProperty("ZK_HOST"), 20000, this)
  }

  override def process(event: WatchedEvent): Unit = {
    // no watching
  }

  private def createPath(path: String, zk: ZooKeeper): Unit = {
    // Create every missing node along the path
    PathUtils.validatePath(path)
    var parentPath = path.substring(0, path.indexOf("/", 0) + 1)
    while (parentPath != null && !parentPath.isEmpty) {
      if (zk.exists(parentPath, false) == null) {
        // Synchronous create, fully open ACL, persistent node
        zk.create(parentPath, "".getBytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
      }

      // Stop once the current prefix equals the full path
      if (parentPath.equals(path)) {
        parentPath = null
      } else {
        // Extend the prefix up to the next "/"
        var idx = path.indexOf("/", parentPath.length() + 1)
        if (-1 == idx) {
          idx = path.length
        }
        parentPath = path.substring(0, idx)
      }
    }
  }

  def setData(path: String, data: String): Stat = {
    if (zk.exists(path, false) == null) {
      createPath(path, zk)
    }
    zk.setData(path, data.getBytes, -1)
  }

  def getData(path: String): String = {
    val stat = zk.exists(path, false)
    if (stat == null) {
      null
    } else {
      new String(zk.getData(path, false, stat))
    }
  }

  /**
   * Find the leader (partition metadata) of each partition with SimpleConsumer
   * @param seedBrokers
   * @param topic
   * @return
   */
  private def findLeader(seedBrokers: List[(String, Int)], topic: String): mutable.Map[Int, PartitionMetadata] = {
    val map: mutable.Map[Int, PartitionMetadata] = mutable.Map[Int, PartitionMetadata]()
    for (seed <- seedBrokers) {
      var consumer: SimpleConsumer = null;
      try {
        consumer = new SimpleConsumer(seed._1, seed._2, 100000, 64 * 1024,
          "leaderLookup" + new java.util.Date().getTime());
        val topics: Array[String] = Array[String](topic);
        val req: TopicMetadataRequest = new TopicMetadataRequest(topics, 0);
        val resp: TopicMetadataResponse = consumer.send(req);
        val metaData: Seq[TopicMetadata] = resp.topicsMetadata;
        for (item <- metaData) {
          for (part <- item.partitionsMetadata) {
            map += (part.partitionId -> part)
          }
        }
      } catch {
        case ex: Exception =>
          System.out.println(Thread.currentThread().getName + "[" + "Error communicating with Broker [" + seed + "] to find Leader for [" + topic
            + ", ] Reason: " + ex);
      } finally {
        if (consumer != null)
          consumer.close();
      }
    }
    println("map", map)  // Map(partition -> partition metadata)
    map
  }

  def getTopicOffsets(topic: String, bootstrap: String, where: String): java.util.Map[TopicPartition, java.lang.Long] = {
    val clientId = ""
    var brokers = List[(String, Int)]()
    // split() never returns null; drop empty entries such as a trailing comma
    val hostAry: Array[String] = bootstrap.split(",").map(_.trim).filter(_.nonEmpty)

    for (host <- hostAry) {
      val hostinfo: Array[String] = host.split(":")
      if (hostinfo.length == 1) {
        // No port given: fall back to the default Kafka port
        brokers = brokers.+:((host, 9092))
      } else if (hostinfo(0).nonEmpty && hostinfo(1).nonEmpty) {
        brokers = brokers.+:((hostinfo(0), Integer.parseInt(hostinfo(1))))
      }
    }
    println("brokers: ", brokers)  // List((cloudera03,9092), (cloudera02,9092), (cloudera01,9092))

    val metas = findLeader(brokers, topic)
    val ret = new java.util.HashMap[TopicPartition, java.lang.Long]()
    // Iterate over every partition and query its leader
    metas.keys.foreach(f => {
      val meta = metas(f)
      println("meta.leader: ", meta.leader)  // Some(BrokerEndPoint(79,cloudera03,9092))
      meta.leader match {
        case Some(leader) => {
          var consumer: SimpleConsumer = null
          try {
            consumer = new SimpleConsumer(leader.host, leader.port, 10000, 100000, clientId)
            val topicAndPartition = TopicAndPartition(topic, f)
            var request: OffsetRequest = null
            if (where.equals("earliest")) {
              request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
            } else if (where.equals("latest")) {
              request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
            }
            val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets
            println("offsets: ", offsets.head)
            ret.put(new TopicPartition(topic, f), new java.lang.Long(offsets.head))
          } catch {
            case ex: Exception => {
              ex.printStackTrace()
            }
          } finally {
            // consumer is still null if the constructor threw
            if (consumer != null) consumer.close()
          }
        }
        case None => {
          System.err.println(Thread.currentThread().getName + "[" + "Error: partition %d does not exist".format(f))
        }
      }
    })
    println("findLeader ret: ", ret)
    ret
  }

  /**
   * Get the next offset to consume for each partition
   * @param path
   * @return
   */
  def getPartitionOffset(path: String): java.util.Map[TopicPartition, java.lang.Long] = {
    if (zk.exists(path, false) == null) {
      null
    } else {
      val children: java.util.List[java.lang.String] = zk.getChildren(path, false)
      println("Children of the path: ", children)

      // Walk the offset stored in ZK for each partition
      if (children != null && children.size() > 0) {
        val topic = path.split("/")(3)
        println("Topic: ", topic)

        val earliestOffsets = getTopicOffsets(
          topic,
          "cloudera01:9092,cloudera02:9092,cloudera03:9092",
          "earliest")
        println("Earliest offsets: ", earliestOffsets)

        val latestOffsets = getTopicOffsets(
          topic,
          "cloudera01:9092,cloudera02:9092,cloudera03:9092",
          "latest")
        println("Latest offsets: ", latestOffsets)

        // Map holding the offset of every partition
        val ret = new java.util.HashMap[TopicPartition, java.lang.Long]()
        // Each child node is a partition number
        for (i <- 0 until children.size) {
          val keyObj = new TopicPartition(topic, children.get(i).toInt)
          // The node value is the last processed offset, so +1 is the next offset to fetch
          var offset = new java.lang.Long(getData(path + "/" + children.get(i)).toLong + 1)

          // If the offset in ZK is below the earliest available offset, move it up to the earliest
          if (offset.longValue() < earliestOffsets.get(keyObj).longValue()) {
            offset = earliestOffsets.get(keyObj)
          } else if (offset.longValue() > latestOffsets.get(keyObj).longValue()) {
            // If the offset in ZK is beyond the latest offset, move it back to the latest
            offset = latestOffsets.get(keyObj)
          }
          ret.put(keyObj, offset)
          println("ret: ", ret)
          // Remove partitions that have a ZK node
          earliestOffsets.remove(keyObj)
        }

        // Partitions without a ZK node start from the earliest offset
        ret.putAll(earliestOffsets)
        ret
      } else {
        null
      }
    }
  }
}


object ZKUtils {
  private var zkUtils: ZKUtils = _

  def getIns(prop: Properties): ZKUtils = {
    this.synchronized {
      if (zkUtils == null) {
        zkUtils = new ZKUtils(prop)
      }
      zkUtils
    }
  }
}
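The rule that getPartitionOffset applies can be restated as a pure function, which makes it easy to test without ZooKeeper or Kafka. This is a sketch, not a drop-in replacement: `ResumeOffsets.resolve` is a hypothetical name, and partitions are plain Ints instead of TopicPartition. ZK holds the last processed offset, so reading resumes at stored + 1, clamped to [earliest, latest]; partitions without a ZK node start at the earliest offset.

```scala
// Pure restatement of the offset rule in getPartitionOffset (no ZK/Kafka calls).
object ResumeOffsets {
  /**
   * @param stored   last processed offset per partition, as written to ZK
   * @param earliest earliest available offset per partition
   * @param latest   latest (log-end) offset per partition
   * @return next offset to consume for every partition in `earliest`
   */
  def resolve(stored: Map[Int, Long], earliest: Map[Int, Long], latest: Map[Int, Long]): Map[Int, Long] =
    earliest.map { case (partition, lo) =>
      val next = stored.get(partition) match {
        case Some(last) => math.min(math.max(last + 1, lo), latest(partition)) // clamp stored + 1
        case None       => lo // no ZK node yet: start from the earliest offset
      }
      partition -> next
    }
}
```

For example, stored values of 28 (partition 0) and 35 (partition 1) resolve to 29 and 36, the same starting offsets the restart log below prints.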

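On the first run there are no offset nodes in ZK, so the job simply subscribes to the topic. After interrupting the job and restarting it, consumption resumes from the last recorded offsets, using Assign to specify the partitions and their starting offsets.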

[root@cloudera01 MyStreamTest]# bash run_use_zk.sh 
20/10/12 11:55:25 WARN kafka010.KafkaUtils: overriding enable.auto.commit to false for executor
20/10/12 11:55:25 WARN kafka010.KafkaUtils: overriding auto.offset.reset to none for executor
20/10/12 11:55:25 WARN kafka010.KafkaUtils: overriding executor group.id to spark-executor-offset_use_zk_1
20/10/12 11:55:25 WARN kafka010.KafkaUtils: overriding receive.buffer.bytes to 65536 see KAFKA-3135
{"partition":1,"offset":32,"id":"0"}
{"partition":1,"offset":33,"id":"0"}
{"partition":1,"offset":34,"id":"0"}
{"partition":1,"offset":35,"id":"0"}
{"partition":0,"offset":28,"id":"0"}
^C[root@cloudera01 MyStreamTest]# bash run_use_zk.sh 
(Resuming from the last recorded offsets:,{test_gp3-0=29, test_gp3-1=36})
20/10/12 11:58:27 WARN kafka010.KafkaUtils: overriding enable.auto.commit to false for executor
20/10/12 11:58:27 WARN kafka010.KafkaUtils: overriding auto.offset.reset to none for executor
20/10/12 11:58:27 WARN kafka010.KafkaUtils: overriding executor group.id to spark-executor-offset_use_zk_1
20/10/12 11:58:27 WARN kafka010.KafkaUtils: overriding receive.buffer.bytes to 65536 see KAFKA-3135

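Change the format of the data being sent so the job fails. After a restart it keeps getting stuck at the offset of the bad record. Manually increase the ZK node value by 1 to move past it, and consumption proceeds again.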

[zk: localhost:2181(CONNECTED) 27] set /test/monitor/test_gp3/offset_use_zk_1/0 30
cZxid = 0x3d000e28ff
ctime = Mon Oct 12 11:57:50 CST 2020
mZxid = 0x3d000e344a
mtime = Mon Oct 12 14:08:10 CST 2020
pZxid = 0x3d000e28ff
cversion = 0
dataVersion = 2
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 2
numChildren = 0
[zk: localhost:2181(CONNECTED) 28] set /test/monitor/test_gp3/offset_use_zk_1/1 36
cZxid = 0x3d000e28cf
ctime = Mon Oct 12 11:56:02 CST 2020
mZxid = 0x3d000e344b
mtime = Mon Oct 12 14:08:18 CST 2020
pZxid = 0x3d000e28cf
cversion = 0
dataVersion = 5
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 2
numChildren = 0

Switch back to the correct data format; the job reads the adjusted offsets from ZK and continues consuming.

[root@cloudera01 MyStreamTest]# bash run_use_zk.sh 
(Resuming from the last recorded offsets:,{test_gp3-0=31, test_gp3-1=36})
20/10/12 14:08:41 WARN kafka010.KafkaUtils: overriding enable.auto.commit to false for executor
20/10/12 14:08:41 WARN kafka010.KafkaUtils: overriding auto.offset.reset to none for executor
20/10/12 14:08:41 WARN kafka010.KafkaUtils: overriding executor group.id to spark-executor-offset_use_zk_1
20/10/12 14:08:41 WARN kafka010.KafkaUtils: overriding receive.buffer.bytes to 65536 see KAFKA-3135
{"partition":1,"offset":36,"id":"0"}

[Figure: offset management strategy with ZooKeeper]

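As with commitAsync, the offset is not committed unless processing succeeds; in addition, the consumption position can be controlled by manually editing the offset values stored in ZK.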

© Copyright belongs to the author. For reprints or content cooperation, please contact the author.