前面我们已经了解了sparksql的使用,这一节我们将了解spark当中的流处理即spark-streaming。
1 系统、软件以及前提约束
- CentOS 7 64 工作站 作者的机子ip是192.168.100.200,请读者根据自己实际情况设置
- 已完成spark中的DataFrame编程
https://www.jianshu.com/nb/37554943 - xshell
- 为去除权限对操作的影响,所有操作都以root进行
- 确保hadoop,spark已经启动
2 操作
- 1 在linux命令行中执行以下命令
# 启动8888端口,可以输入值
nc -lk 8888
- 2 启动另外一个窗口,执行以下命令:
# 进入spark的bin目录
cd /root/spark-2.2.1-bin-hadoop2.7/bin
# 进入scala命令行
./spark-shell
# 在命令行中执行以下语句
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel
val ssc = new StreamingContext(sc, Seconds(1))
val lines = ssc.socketTextStream("192.168.100.200", 8888, StorageLevel.MEMORY_AND_DISK_SER)
val wordCounts = lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).print()
ssc.start()
ssc.awaitTermination()
- 3 测试
在前一个nc窗口不断输入字符串,回车,我们会看到spark命令行中正在统计这一秒的输入的字符串的词频。
以上,就是sparkstreaming监听端口并进行词频统计的过程。