spark streaming+kafka ran out of message报错(java)

Caused by: java.lang.AssertionError: assertion failed: Ran out of messages before reaching ending offset 699792 for topic code-commit partition 7 start 699542. This should not happen, and indicates that messages may have been lost
    at scala.Predef$.assert(Predef.scala:170)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:211)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    ... 3 more

原因:
1、是由于要消费的kafka的offset(此处是699542~699792)的消息的大小超过了默认设置(1024*1024=1M),应该要将拉取消息的大小的配置扩大。
2、在消费端配置,此处在spark streaming里配置拉取kafka消息的参数,kafkaParameters.put("fetch.message.max.bytes","52428800");注意里面的值不能超过int类型,消费端是spark streaming

kafkaParameters.put("metadata.broker.list","node1:9092");
        kafkaParameters.put("fetch.message.max.bytes","52428800");
        String topic = "testTopic";
        Set<String> topics = new HashSet<>();
        topics.add(topic);
Map<TopicAndPartition, Long> fromOffsets = new HashMap<>();
fromOffsets.put(new TopicAndPartition("testTopic", 0),20680L);
JavaInputDStream<String> message = KafkaUtils.createDirectStream(
                jssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                String.class,
                kafkaParameters,
                fromOffsets,
                new Function<MessageAndMetadata<String, String>, String>() {
                    public String call(MessageAndMetadata<String, String> v1) throws Exception {
                        System.out.println(v1.message());
                        return v1.message();
                    }
                }
        );

补充:
--conf spark.streaming.kafka.maxRatePerPartition
这个配置是设置spark streaming每个batch拉取kafka的数据量的,如果kafka的topic的partition是8个,设置的时间间隔是10s,这个时候--conf spark.streaming.kafka.maxRatePerPartition=5,每个batch拉取的数据量就是8x10x5=400条数据。如果设置的时间间隔是60s,每个batch拉取的数据量就是8x60x5=400条数据2400条。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 姓名:何宏伟 【嵌牛导读】 在使用移动设备或者PC时,关键时刻总会遇到上网比较慢的问题,使用搜索引擎之后得到一个简...
    亨利何阅读 36,686评论 0 2
  • 原作者: Benjamin Hardy 文章来源:Medium 翻译:Joyce Cheng 译文仅供个人学习,不...
    JoyceCheng阅读 14,508评论 48 562
  • 正群雄纷争,又南蛮入侵。桃园结义扶社稷,铁索连环沉江底。铁骑奔腾万箭发,青龙丈八闪逃杀。龙胆在手能进退,乐不思蜀温...
    红叶竹马阅读 2,688评论 0 3
  • 怎样解决拖延症?重新审视目标,想为什么心不愿意去做的原因,然后理智的做出判断,2统一后立马行动! 1写作要找准自己...
    敢想才敢做阅读 988评论 0 1
  • 晚上睡得迷迷糊糊的时候,耳朵周围传来像飞机轰炸的“嗡嗡”声,人马上被惊醒了!几秒钟后,“嗡嗡”声又来了,脸上敏感的...
    斗米半升阅读 3,558评论 0 0