Spark Streaming优雅的关闭策略优化

前面文章介绍了不少有关Spark Streaming的offset的管理以及如何优雅的关闭Spark Streaming的流程序。
到目前为止还有几个问题:

  1. 有关spark streaming集成kafka时,如果kafka新增分区, 那么spark streaming程序能不能动态识别到而不用重启?
  2. 如果需要重启,那么在自己管理offset时,如何才能识别到新增的分区?
  3. spark streaming优雅关闭的策略还有那些?

问题一,如果kafka要新增分区,对于正在运行的实时流程序能否动态识别到?

经过测试,是不能识别的,我推测使用createDirectStream创建流对象一旦创建就是不可变的,也就是说创建实例那一刻的分区数量,会一直使用直到流程序结束,就算中间kafka的分区数量扩展了,流程序也是不能识别到的。所以在扩展kafka分区前,一定要先把流程序给停掉,然后扩展完成后需要再次重启流程序。

问题二,如果需要重启,那么在自己管理offset时,如何才能识别到新增的分区?

如果是我们自己管理offset时,一定要考虑到kafka扩展分区的情况,每次启动程序前都得检测下目前保存的偏移量里面的kafka的分区个数是否小于kafka实际元数据里面实际的分区个数,正常没扩展分区的情况下两个值应该是相等的,如果值不一致,就说明是kafka分区得到扩展了,所以我们的程序需要能够兼容这种情况。
核心代码如下:

//这个topic在zk里面最新的分区数量
        val  lastest_partitions= ZkUtils.getPartitionsForTopics(zkClient,Seq(topic)).get(topic).get
        var offsets = offsetsRangesStr.split(",")//按逗号split成数组
          .map(s => s.split(":"))//按冒号拆分每个分区和偏移量
          .map { case Array(partitionStr, offsetStr) => (TopicAndPartition(topic, partitionStr.toInt) -> offsetStr.toLong) }//加工成最终的格式
          .toMap//返回一个Map

        //说明有分区扩展了
        if(offsets.size<lastest_partitions.size){
          //得到旧的所有分区序号
          val old_partitions=offsets.keys.map(p=>p.partition).toArray
          //通过做差集得出来多的分区数量数组
          val add_partitions=lastest_partitions.diff(old_partitions)
          if(add_partitions.size>0){
            log.warn("发现kafka新增分区:"+add_partitions.mkString(","))
            add_partitions.foreach(partitionId=>{
              offsets += (TopicAndPartition(topic,partitionId)->0)
              log.warn("新增分区id:"+partitionId+"添加完毕....")
            })

          }

        }else{
          log.warn("没有发现新增的kafka分区:"+lastest_partitions.mkString(","))
        }

上面的代码在每次启动程序时,都会检查当前我们自己管理的offset的分区数量与zk元数据里面实际的分区数量,如果不一致就会把新增的分区id给加到TopicAndPartition里面并放入到Map对象里面,这样在启动前就会传入到createDirectStream对象中,就能兼容新增的分区了。

spark streaming优雅关闭的策略还有那些?

前面的文章谈到过我们可以有两种方式来更加优雅的停止流程序,分别是通过http暴露服务,和通过HDFS做消息中转来定时扫描mark文件是否存在来触发关闭服务。

通过http暴露服务的核心代码:
/****
    * 负责启动守护的jetty服务
    * @param port 对外暴露的端口号
    * @param ssc Stream上下文
    */
  def daemonHttpServer(port:Int,ssc: StreamingContext)={
    val server=new Server(port)
    val context = new ContextHandler();
    context.setContextPath( "/close" );
    context.setHandler( new CloseStreamHandler(ssc) )
    server.setHandler(context)
    server.start()
  }

  /*** 负责接受http请求来优雅的关闭流
    * @param ssc  Stream上下文
    */
  class CloseStreamHandler(ssc:StreamingContext) extends AbstractHandler {
    override def handle(s: String, baseRequest: Request, req: HttpServletRequest, response: HttpServletResponse): Unit ={
      log.warn("开始关闭......")
      ssc.stop(true,true)//优雅的关闭
      response.setContentType("text/html; charset=utf-8");
      response.setStatus(HttpServletResponse.SC_OK);
      val out = response.getWriter();
      out.println("close success");
      baseRequest.setHandled(true);
      log.warn("关闭成功.....")
    }
  }
另一种通过扫描HDFS文件的方式:
/***
    * 通过一个消息文件来定时触发是否需要关闭流程序
    * @param ssc StreamingContext
    */
  def stopByMarkFile(ssc:StreamingContext):Unit= {
    val intervalMills = 10 * 1000 // 每隔10秒扫描一次消息是否存在
    var isStop = false
    val hdfs_file_path = "/spark/streaming/stop" //判断消息文件是否存在,如果存在就
    while (!isStop) {
      isStop = ssc.awaitTerminationOrTimeout(intervalMills)
      if (!isStop && isExistsMarkFile(hdfs_file_path)) {
        log.warn("2秒后开始关闭sparstreaming程序.....")
        Thread.sleep(2000)
        ssc.stop(true, true)
      }

    }
  }

    /***
      * 判断是否存在mark file
      * @param hdfs_file_path  mark文件的路径
      * @return
      */
    def isExistsMarkFile(hdfs_file_path:String):Boolean={
      val conf = new Configuration()
      val path=new Path(hdfs_file_path)
      val fs =path.getFileSystem(conf);
      fs.exists(path)
    }

上面是两种方式的核心代码,最后提下触发停止流程序:

第一种需要在启动服务的机器上,执行下面封装的脚本:

## tx.log是提交spark任务后的输出log重定向的log 
## &> tx.log &  

 #!/bin/bash
    driver=`cat tx.log | grep ApplicationMaster | grep -Po '\d+.\d+.\d+.\d+'`

    echo $driver

    curl http://$driver:port/close/

  echo "stop finish"

第二种方式,找到一个拥有HDFS客户端机器,向HDFS上写入指定的文件:

#生成文件后,10秒后程序就会自动停止
hadoop fs -touch /spark/streaming/stop

#下次启动前,需要清空这个文件,否则程序启动后就会停止
hadoop fs -rm -r /spark/streaming/stop

所有代码,已经同步更新到我的github上,有兴趣的朋友可以参考这个链接:https://github.com/Talefairy/sparkStreaming-offset-to-zk

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,753评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,668评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,090评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,010评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,054评论 6 395
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,806评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,484评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,380评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,873评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,021评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,158评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,838评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,499评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,044评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,159评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,449评论 3 374
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,136评论 2 356

推荐阅读更多精彩内容