[SPARK-19680] OffsetOutOfRangeException 解决方案

当kafka中的数据丢失时,Spark程序消费kafka中数据的时候就可能会出现以下异常:

Lost task 12.0 in stage 398.0 (TID 2311, localhost, executor driver): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {abc_2018-0=151260}
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:588)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:70)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:463)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

原因分析

Spark在创建Kafka RDD时会将kafkaParams 中的 auto.offset.reset 强制修改为none因此,当在zookeeper中获取到的offset超出kafka中topic有效offset范围时,就会报这个异常。这个异常通常出现在kafka中的数据丢失或过期所导致。
问题源码参考:

DirectKafkaInputDStream.scala:218
DirectKafkaInputDStream.scala:63
KafkaUtils.scala:205

解决方案

在创建KafkaRDD时,设置验证过的offset,代码如下:

/**
  * Kafka辅助处理工具
  */
object MyKafkaUtils {
  private val logger: Logger = LoggerFactory.getLogger(this.getClass)

  /**
    * 获取最小offset
    *
    * @param consumer   消费者
    * @param partitions topic分区
    * @return
    */
  def getEarliestOffsets(consumer: Consumer[_, _], partitions: Set[TopicPartition]): Map[TopicPartition, Long] = {
    consumer.seekToBeginning(partitions)
    partitions.map(tp => tp -> consumer.position(tp)).toMap
  }

  /**
    * 获取最小offset
    * Returns the earliest (lowest) available offsets, taking new partitions into account.
    *
    * @param kafkaParams kafka客户端配置
    * @param topics      获取获取offset的topic
    */
  def getEarliestOffsets(kafkaParams: Map[String, Object], topics: Iterable[String]): Map[TopicPartition, Long] = {
    val newKafkaParams = mutable.Map[String, Object]()
    newKafkaParams ++= kafkaParams
    newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    val consumer: KafkaConsumer[String, Array[Byte]] = new KafkaConsumer[String, Array[Byte]](newKafkaParams)
    consumer.subscribe(topics)
    val parts = consumer.assignment()
    consumer.seekToBeginning(parts)
    consumer.pause(parts)
    val offsets = parts.map(tp => tp -> consumer.position(tp)).toMap
    consumer.unsubscribe()
    consumer.close()
    offsets
  }

  /**
    * 获取最大offset
    *
    * @param consumer   消费者
    * @param partitions topic分区
    * @return
    */
  def getLatestOffsets(consumer: Consumer[_, _], partitions: Set[TopicPartition]): Map[TopicPartition, Long] = {
    consumer.seekToEnd(partitions)
    partitions.map(tp => tp -> consumer.position(tp)).toMap
  }

  /**
    * 获取最大offset
    * Returns the latest (highest) available offsets, taking new partitions into account.
    *
    * @param kafkaParams kafka客户端配置
    * @param topics      需要获取offset的topic
    **/
  def getLatestOffsets(kafkaParams: Map[String, Object], topics: Iterable[String]): Map[TopicPartition, Long] = {
    val newKafkaParams = mutable.Map[String, Object]()
    newKafkaParams ++= kafkaParams
    newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
    val consumer: KafkaConsumer[String, Array[Byte]] = new KafkaConsumer[String, Array[Byte]](newKafkaParams)
    consumer.subscribe(topics)
    val parts = consumer.assignment()
    consumer.seekToEnd(parts)
    consumer.pause(parts)
    val offsets = parts.map(tp => tp -> consumer.position(tp)).toMap
    consumer.unsubscribe()
    consumer.close()
    offsets
  }

  /**
    * 获取消费者当前offset
    *
    * @param consumer   消费者
    * @param partitions topic分区
    * @return
    */
  def getCurrentOffsets(consumer: Consumer[_, _], partitions: Set[TopicPartition]): Map[TopicPartition, Long] = {
    partitions.map(tp => tp -> consumer.position(tp)).toMap
  }

  /**
    * 获取offsets
    *
    * @param kafkaParams kafka参数
    * @param topics      topic
    * @return
    */
  def getCurrentOffset(kafkaParams: Map[String, Object], topics: Iterable[String]): Map[TopicPartition, Long] = {
    val offsetResetConfig = kafkaParams.getOrElse(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest").toString.toLowerCase
    val newKafkaParams = mutable.Map[String, Object]()
    newKafkaParams ++= kafkaParams
    newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
    val consumer: KafkaConsumer[String, Array[Byte]] = new KafkaConsumer[String, Array[Byte]](newKafkaParams)
    consumer.subscribe(topics)
    val notOffsetTopicPartition = mutable.Set[TopicPartition]()
    try {
      consumer.poll(0)
    } catch {
      case ex: NoOffsetForPartitionException =>
        logger.warn(s"consumer topic partition offset not found:${ex.partition()}")
        notOffsetTopicPartition.add(ex.partition())
    }
    val parts = consumer.assignment().toSet
    consumer.pause(parts)
    val topicPartition = parts.diff(notOffsetTopicPartition)
    //获取当前offset
    val currentOffset = mutable.Map[TopicPartition, Long]()
    topicPartition.foreach(x => {
      try {
        currentOffset.put(x, consumer.position(x))
      } catch {
        case ex: NoOffsetForPartitionException =>
          logger.warn(s"consumer topic partition offset not found:${ex.partition()}")
          notOffsetTopicPartition.add(ex.partition())
      }
    })
    //获取earliestOffset
    val earliestOffset = getEarliestOffsets(consumer, parts)
    earliestOffset.foreach(x => {
      val value = currentOffset.get(x._1)
      if (value.isEmpty) {
        currentOffset(x._1) = x._2
      } else if (value.get < x._2) {
        logger.warn(s"kafka data is lost from partition:${x._1} offset ${value.get} to ${x._2}")
        currentOffset(x._1) = x._2
      }
    })
    //获取lastOffset
    val latestOffset = if (offsetResetConfig.equalsIgnoreCase("earliest")) {
      getLatestOffsets(consumer, topicPartition)
    } else {
      getLatestOffsets(consumer, parts)
    }
    latestOffset.foreach(x => {
      val value = currentOffset.get(x._1)
      if (value.isEmpty || value.get > x._2) {
        currentOffset(x._1) = x._2
      }
    })
    consumer.unsubscribe()
    consumer.close()
    currentOffset.toMap
  }
}

Spark Kafka RDD 创建:

val offset = MyKafkaUtils.getCurrentOffset(kafkaParams.toMap,topics)
    val kafkaStreams: DStream[ConsumerRecord[String, ObjectNode]] = KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe(topics, kafkaParams,offset))

到些问题基本解决,但是如果是从checkpoint里面恢复时,依然会出现问题,这个就得使用commit了

//自动提交新的offset
    kafkaStreams.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      kafkaStreams.asInstanceOf[CanCommitOffsets].commitAsync(offset)
    }
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,657评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,662评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,143评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,732评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,837评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,036评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,126评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,868评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,315评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,641评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,773评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,859评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,584评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,676评论 2 351

推荐阅读更多精彩内容