sparkStreaming拉取kafka010的bug

  • 目前kafka版本中,很多公司在用kafka010,但是在用sparkstreaming去消费kafka时,很多人都会碰到一个错误
Caused by: java.lang.AssertionError: assertion failed: Failed to get records for spark-executor-abcd1 test1 8 1 after polling for 512
at scala.Predef$.assert(Predef.scala:170)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
  • 解决步骤
  1. 根据报错信息
kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
  • 可以看出是在这里跑出的错误,进入CachedKafkaConsumer.scala类,找到74行
if (!buffer.hasNext()) { poll(timeout) }
assert(buffer.hasNext(),
      s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
var record = buffer.next()
  • 继续进入poll(timeout)方法
#如果经过timeout的时间都没有拉取到数据,那么poll会返回一个空的消息集合,这里是kafka consumer的设计
private def poll(timeout: Long): Unit = {
    val p = consumer.poll(timeout)
    val r = p.records(topicPartition)
    logDebug(s"Polled ${p.partitions()}  ${r.size}")
    buffer = r.iterator
  }
  • 可以看出这里是用poll去拉取kafka数据放到buffer
#这是buffer的声明
protected var buffer = ju.Collections.emptyList[ConsumerRecord[K, V]]().iterator
  • 在回到
#这里判断buffer是否有数据,如果没有数据就去poll,但是当poll也没有拉到数据
if (!buffer.hasNext()) { poll(timeout) }
#这里spark断言buffer里肯定有数据,如果没有就会抛出上面那个错误
assert(buffer.hasNext(),
      s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
var record = buffer.next()
  • 其实解决方法很简单
    KafkaRDD.scala
#这里spark设置的默认poll的timeout是512毫秒,如果512毫秒后没拉到数据就会抛异常
private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms", 512)
  • 只需将这个参数在session中设置大于512就可以了,具体设置多少要根据实际情况看,例如:
sparkConf.set("spark.streaming.kafka.consumer.poll.ms", 10000)

在kafka consumer API中如果poll(timeout)没有拉到数据会返回空的消息集合
但是spark在这里抛了异常

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 姓名:周小蓬 16019110037 转载自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw阅读 34,742评论 13 425
  • Kafka简介 Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下: 以时间复杂度为O(1)的方...
    Alukar阅读 3,097评论 0 43
  • 本文转载自http://dataunion.org/?p=9307 背景介绍Kafka简介Kafka是一种分布式的...
    Bottle丶Fish阅读 5,491评论 0 34
  • kafka的定义:是一个分布式消息系统,由LinkedIn使用Scala编写,用作LinkedIn的活动流(Act...
    时待吾阅读 5,353评论 1 15
  • 6月执行易效能情况 一、收获 在生活这一块,我觉得自己坚持得很好,早起——反思——饮食——运动——冥想这一系列的线...
    朱林115阅读 242评论 0 1