spark streaming 接收socket数据 实现单词统计
package streamingSocket
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object SparkStreamingSocket {

  /**
   * Streaming word count: receives lines of text over a socket and prints
   * per-batch word counts every 5 seconds.
   *
   * Note: `reduceByKey` here aggregates within each micro-batch only; no
   * state is carried across batches.
   */
  def main(args: Array[String]): Unit = {
    // local[2]: at least two threads are required — one for the socket
    // receiver, one for batch processing.
    val conf = new SparkConf()
      .setAppName("SparkStreamingSocket")
      .setMaster("local[2]")
    val sparkContext = new SparkContext(conf)
    sparkContext.setLogLevel("WARN")

    // Micro-batch interval of 5 seconds.
    val streamingContext = new StreamingContext(sparkContext, Seconds(5))

    // Text lines arriving on node-1:9999.
    val socketLines: ReceiverInputDStream[String] =
      streamingContext.socketTextStream("node-1", 9999)

    // Split lines into words, pair each word with 1, sum counts per batch.
    val wordCounts: DStream[(String, Int)] = socketLines
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    // print() shows at most the first 10 elements of each batch on stdout.
    wordCounts.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
自带的print是默认打印前10行
运行spark自带的example:
因为程序运行起来会不断地输出日志信息，导致无法看清楚单词计数的结果，所以将结果输出到一个文本文件中，这里起名为 wordCountResult，存放在 spark 的目录下。
这样一来日志不断的在终端输出,运算的结果输出到文本文件中。
vi wordCountResult
以下是在保存输出结果的文本 wordCountResult 中得到的内容: