Spark Streaming WordCount

在spark官网讲解spark streaming的时候,举了一个word count的例子,通过监听一个端口的TCP连接,统计单词的个数。程序如下(in scala):

import org.apache.spark._
import org.apache.spark.streaming._

object NetworkWordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
 val ssc = new StreamingContext(conf, Seconds(5))
 Logger.getRootLogger.setLevel(Level.ERROR)

val lines = ssc.socketTextStream("localhost", 9998)
val wordcount=lines.flatMap(_.split("\\W+"))
                          .map((_,1))
                          .reduceByKey(_+_)
wordcount.print()

ssc.start()
ssc.awaitTermination()

 }
}

然后,再在linux命令窗口中使用nc -lk 9998,输入一串单词,就可以统计单词出现的频率。
但是上面的写法只能统计当时输入的内容,而不能加上以前统计的结果。

有两个方法可以实现:
一,使用一个HashMap来存储以前统计的结果
二,使用DStream提供的updateStateByKey方法

先来看第一种方法(只显示main方法中的code):

    val wordCountMap=new mutable.HashMap[String,Int]()
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(5))
    Logger.getRootLogger.setLevel(Level.ERROR)
    val lines = ssc.socketTextStream("localhost", 9998)
    val wordcount=lines.flatMap(_.split("\\W+"))
                          .map((_,1))
                          .reduceByKey(_+_)
        //map与reduceByKey两步可以合成一步:countByValue(),但是此
        //时,次数是Long类型,而不是Int类型了
      wordcount.foreachRDD(line=>{
          val array = line.collect()
          array.map(w=>{
          var count= wordCountMap.get(w._1)
          if(count==None){
          wordCountMap+=w
        }else{
          var count2=count.asInstanceOf[Some[Int]].get
          count2+=w._2
          wordCountMap+=Tuple2(w._1,count2)
        }
      })

      var time=new Date(System.currentTimeMillis()).toString
      println(s"---------------Time:$time------------------")
      for(w<-wordCountMap.iterator) println(w)
      println("---------------------end---------------")
    })

 ssc.start()
 ssc.awaitTermination()

此方法使用DStream中的foreachRDD方法,操作RDD,当每一次有新内容进入时,统计单词出现的频率,并累加到之前统计的结果上。

第二种方法(只显示main中的code)

val wordCountMap=new mutable.HashMap[String,Long]()
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(5))
    Logger.getRootLogger.setLevel(Level.ERROR)
    val lines = ssc.socketTextStream("localhost", 9998)
    ssc.sparkContext.setCheckpointDir("d:/spark_check")
    val wordcount=lines.flatMap(_.split("\\W+"))
                          .map((_,1))
                          .updateStateByKey(updateFunction _)
    wordcount.print()

ssc.start()
ssc.awaitTermination()

需要有一个更新状态的函数:updateFunction

  def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    var newCount = if(runningCount!=None) runningCount.get else 0
    if(newValues.size>0)
     newCount +=1
    Some(newCount)
  }

需要注意的是需要设置checkpoint目录,因为spark需要此目录保存状态信息,如果是在windows中运行这段程序,还需要设置HADOOP_HOME环境变量,而且HADOOP_HOME的bin目录下还需要有winutils.exe。每当有新内容进入时,spark都会调用updateFunction函数,newValues是新进入的内容,runningCount是上一个状态。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。