SparkStreaming优雅关闭

开启另外一个线程每5秒监听HDFS上一个文件是否存在。如果检测到存在,调用ssc.stop()方法关闭SparkStreaming任务(当你要关闭任务时,可以创建你自定义监控的文件目录)

object SparkStreaming12_Stop {

    def main(args: Array[String]): Unit = {
        // 使用DStream的窗口操作
        val sparkConf = new SparkConf().setMaster("local[2]").setAppName("SparkStreaming12_Stop")
        sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")

        val ssc = new StreamingContext(sparkConf, Seconds(3))
        val lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream("linux1", 9999)

        // 将采集的数据进行扁平化操作
        val wordDStream: DStream[String] = lineDStream.flatMap(line=>line.split( " " ))

        // 将扁平化数据进行结构的转变:(word, one)
        val wordToOneDStream: DStream[(String, Long)] = wordDStream.map {
            word => (word, 1L)
        }

        // 将转变解构后的数据进行聚合统计
        val wordToCountDStream: DStream[(String, Long)] = wordToOneDStream.reduceByKey(_+_)

        wordToCountDStream.foreachRDD(rdd=>rdd.foreach(println))

        // 打印结果
        wordToCountDStream.print()

        // 启动新的线程,希望在特殊的场合关闭SparkStreaming
        new Thread(new Runnable {
            override def run(): Unit = {

                while ( true ) {
                    try {
                        Thread.sleep(5000)
                    } catch {
                        case ex : Exception => println(ex)
                    }

                    // 监控HDFS文件的变化
                    val fs: FileSystem = FileSystem.get(new URI("hdfs://hadoop102:9000"), new Configuration(), "root")

                    val state: StreamingContextState = ssc.getState()
                    // 如果环境对象处于活动状态,可以进行关闭操作
                    if ( state == StreamingContextState.ACTIVE ) {

                        // 判断路径是否存在
                        val flg: Boolean = fs.exists(new Path("hdfs://linux1:9000/stopSpark4"))
                        if ( flg ) {
                            // 关闭采集器和Driver:优雅的关闭
                            ssc.stop(true, true)
                            System.exit(0)
                        }

                    }
                }

            }
        }).start()


        // 启动采集器
        ssc.start()
        // Driver程序等待采集器的执行完毕
        ssc.awaitTermination()


    }
}
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • Swift1> Swift和OC的区别1.1> Swift没有地址/指针的概念1.2> 泛型1.3> 类型严谨 对...
    cosWriter阅读 11,145评论 1 32
  • Zookeeper用于集群主备切换。 YARN让集群具备更好的扩展性。 Spark没有存储能力。 Spark的Ma...
    Yobhel阅读 7,357评论 0 34
  • layout: posttitle: 《Java并发编程的艺术》笔记categories: Javaexcerpt...
    xiaogmail阅读 5,876评论 1 19
  • 一、简历准备 1、个人技能 (1)自定义控件、UI设计、常用动画特效 自定义控件 ①为什么要自定义控件? Andr...
    lucas777阅读 5,266评论 2 54
  • 我们都知道SparkStreaming程序是一个长服务,一旦运转起来不会轻易停掉,那么如果我们想要停掉正在运行的程...
    尼小摩阅读 3,106评论 1 9