优雅关闭spark streaming job填坑之路

业务相关代码已接近尾声,接下来要做下维护相关的工作,不可不提的就是spark streaming 优雅退出。

集群环境&参数

   spark:  2.2
   kafka:  0.10.1
    mode:  spark on yarn cluster 
   offset: enable.auto.commit=false 

driver kill

    ss -tanlp |  grep {driver port }|awk '{print $6}'|awk  -F, '{print $2}'    
    获取到driver pid进程号;然后 kill pid(不要使用kill -9)

spark 提供的参数

   设置:spark.streaming.stopGracefullyOnShutdown=true
   实验效果:当前batch未进行完就直接退出, 有重复消费数据的问题;
   分析:  该参数在yarn cluster mode 下不起作用。 please refer:https://www.inovex.de/blog/247-spark-streaming-on-yarn-in-production/

Add Shutdown Hook

   add hook code:
   Runtime.getRuntime().addShutdownHook(
             new Thread(() -> {
                 logger.warn("start to stop gracefully .....................");
                 streamContext.stop(true,true);
                 logger.warn("right now to stop gracefully .....................");
             })); 
  实验效果: 会等待当前batch 处理完毕;并且不再接受新的input;能达到效果
  问   题: 但是面临的一个问题最后一个batch能正常处理,但是由于 consumer 已经killed; 该 kafka topic 在 当前 group下发生啦reblance;导致偏移量提交失败。
  分   析:    提交偏移量代码如下,提交是一个异步的过程
     ((CanCommitOffsets)   stream.inputDStream()).commitAsync(offsetRanges, (offsets, exception) -> {
                            System.err.println("onComplete:" + offsets.size());
                            System.err.println("is failed " + exception);
                            System.err.println("commit flag inner over .........." + offsetRanges[0].fromOffset() + "    " + offsetRanges[0].untilOffset());

                        });
                        System.err.println("commit flag outer over .........." + offsetRanges[0].fromOffset()  +"    "+offsetRanges[0].untilOffset());
         实验结果多次发现:回调函数内未有日志打印,  相反 "commit flag outer over" 该日志片段却有在打印。导致下次重新启动;当前batch又被重新处理
  改   进: 坚持以本次任务本次解决的宗旨,尝试在 hook code 中 再次提交失败的偏移量。
             使用 原生 Kafka consumer API,依据缓存起来的 offsetRange[] 信息,再次提交偏移量;有以下方案:
         params:max.poll.records=1
         1:enable.auto.commit=false;consumer.commitSync();失败后异步提交:consumer.commitAsync(); 主要是kafka topic有在做 reblance的情况;
         2:enable.auto.commit=true; 自动提交偏移量
  目前使用使用的方案1;经过多次测试;可以满足需求

add stop maker

   依赖于hdfs设置maker file,单独线程扫描;有该文件就使用stop(true,true)关闭服务
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容