Overview
Spark Streaming is not real-time computation in the strict sense: under the hood it still processes the data source in micro-batches, so there is a certain amount of latency. It is unlike Storm, which processes each record in real time as it arrives.
Installing the nc service
To make testing easier, we can use a socket to feed data to Streaming. Later this can be replaced by a message broker such as Kafka.
Install
rpm -ivh nc-1.84-22.el6.x86_64.rpm
Start the nc service
nc -lk 9999
The two ways to initialize a StreamingContext

[Figure: the two StreamingContext constructors: from an existing SparkContext, new StreamingContext(sc, Seconds(n)), or from a SparkConf, new StreamingContext(conf, Seconds(n))]
On the command line we prefer the first one, since spark-shell already provides sc.
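A minimal sketch of the two styles (the app name and batch interval here are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// 1) From an existing SparkContext (spark-shell already provides one as sc)
val ssc1 = new StreamingContext(sc, Seconds(5))

// 2) From a SparkConf; Spark Streaming creates the SparkContext internally
val conf = new SparkConf().setAppName("init-demo").setMaster("local[2]")
val ssc2 = new StreamingContext(conf, Seconds(5))

Note that only one StreamingContext can be active in a JVM at a time, so in practice we create one or the other, not both.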
Running Streaming locally, reading socket stream data
Note 1: when running locally, a master URL with the default parallelism of 1 gives only a single thread, and that thread does only one thing: it reads the stream data and never gets to process it. So when starting spark-shell we must request more than one thread, e.g. local[2]. In cluster mode this does not need to be specified.

Note 2: if we just want to start a local spark-shell and do not want to depend on the metastore, Hive, etc., we need to remove hive-site.xml from the conf directory, so that startup does not go looking for the metastore.
Note 3: if we do not want to depend on HDFS, comment out HADOOP_CONF_DIR in spark-env.sh. A sketch of the shell steps for both notes follows.
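A sketch of the corresponding shell steps, assuming a standard $SPARK_HOME/conf layout (the backup file name is just an example):

# Note 2: move hive-site.xml out of the way so spark-shell does not look for the metastore
mv $SPARK_HOME/conf/hive-site.xml $SPARK_HOME/conf/hive-site.xml.bak
# Note 3: comment out the HADOOP_CONF_DIR line in $SPARK_HOME/conf/spark-env.sh
# export HADOOP_CONF_DIR=...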
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

def main(args: Array[String]): Unit = {
  // Create the SparkSession
  val spark = SparkSession
    .builder
    .appName("test")
    .master("yarn-cluster") // specify the master URL
    .getOrCreate()
  // Get the SparkContext out of the SparkSession
  val sc = spark.sparkContext
  // Create the StreamingContext with a 5-second batch interval
  val ssc = new StreamingContext(sc, Seconds(5))
  val lines = ssc.socketTextStream("bigdata-pro02.kfk.com", 9999)
  val words = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
  words.print()
  ssc.start()
  ssc.awaitTermination()
}
---------------------------- the following code is entered directly in spark-shell ---------------------------
bin/spark-shell --master local[2]
:paste
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(5))
val lines = ssc.socketTextStream("bigdata-pro02.kfk.com", 9999)
val words = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
words.print()
ssc.start()
ssc.awaitTermination()
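To test, type a few words into the nc terminal started earlier; every 5 seconds the counts for that batch are printed. Illustrative input and output (the timestamp is made up):

hello world hello

-------------------------------------------
Time: 1496324825000 ms
-------------------------------------------
(hello,2)
(world,1)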
Saving Streaming data to an external system
Saving to MySQL
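Before running the code below, the target database and table must already exist on the MySQL side; a minimal sketch (the column types are assumptions):

create database if not exists spark;
create table if not exists spark.test1 (
  id   int,
  name varchar(100)
);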
import java.sql.DriverManager
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

def main(args: Array[String]): Unit = {
  // Create the SparkSession
  val spark = SparkSession
    .builder
    .appName("test")
    .master("yarn-cluster") // specify the master URL
    .getOrCreate()
  // Get the SparkContext out of the SparkSession
  val sc = spark.sparkContext
  // Create the StreamingContext with a 5-second batch interval
  val ssc = new StreamingContext(sc, Seconds(5))
  val lines = ssc.socketTextStream("bigdata-pro02.kfk.com", 9999)
  val words = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
  // Iterate over every RDD, then over every partition of the distributed data,
  // opening one connection per partition rather than one per record
  words.foreachRDD(wd => wd.foreachPartition(data => {
    Class.forName("com.mysql.jdbc.Driver")
    val conn = DriverManager.getConnection("jdbc:mysql://bigdata-pro03.kfk.com/spark", "root", "123456")
    try {
      // Take each record out of the partition; a parameterized statement
      // avoids quoting problems in the inserted word
      for (row <- data) {
        val stmt = conn.prepareStatement("insert into test1(id,name) values(?,?)")
        stmt.setInt(1, row._2)
        stmt.setString(2, row._1)
        stmt.executeUpdate()
        stmt.close()
      }
    } finally {
      if (conn != null) {
        conn.close()
      }
    }
  }))
  ssc.start()
  ssc.awaitTermination()
}
--------------------------------------------------------------------------------------
bin/spark-shell --master local[2]
:paste
import java.sql.DriverManager
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(5))
val lines = ssc.socketTextStream("bigdata-pro02.kfk.com", 9999)
val words = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
words.foreachRDD(wd => wd.foreachPartition(data => {
  Class.forName("com.mysql.jdbc.Driver")
  val conn = DriverManager.getConnection("jdbc:mysql://bigdata-pro03.kfk.com/spark", "root", "123456")
  try {
    for (row <- data) {
      val stmt = conn.prepareStatement("insert into test1(id,name) values(?,?)")
      stmt.setInt(1, row._2)
      stmt.setString(2, row._1)
      stmt.executeUpdate()
      stmt.close()
    }
  } finally {
    if (conn != null) {
      conn.close()
    }
  }
}))
ssc.start()
ssc.awaitTermination()
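After sending a few words through nc, the inserted rows can be checked from the mysql client (illustrative):

select * from test1;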
Integrating Streaming with Kafka
There are two jar artifacts for the integration; be sure to check version compatibility.

[Figure: the two integration artifacts: spark-streaming-kafka-0-8 (for Kafka broker 0.8.2.1 or higher) and spark-streaming-kafka-0-10 (for broker 0.10.0 or higher)]
Add the dependency
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_${scala.binary.version}</artifactId>
    <version>2.2.0</version>
</dependency>
import kafka.serializer.StringDecoder
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

def main(args: Array[String]): Unit = {
  // Create the SparkSession
  val spark = SparkSession.builder()
    .appName("kafkaToStreaming")
    .master("local[2]")
    .getOrCreate()
  // Create the StreamingContext with a 5-second batch interval
  val ssc = new StreamingContext(spark.sparkContext, Seconds(5))
  val kafkaMapParams = Map[String, String]("metadata.broker.list" -> "bigdata-pro01.kfk.com:9092")
  val topicSet = Set("weblog")
  // This KafkaUtils is the one in the org.apache.spark.streaming.kafka package (the 0-8 direct API)
  val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaMapParams, topicSet)
  // messages is a DStream of (key, value) pairs; the Kafka payload is the value, so we take _._2
  val lines = messages.map(_._2)
  val words = lines.flatMap(_.split(" ")).map(x => (x, 1L)).reduceByKey(_ + _)
  words.print()
  ssc.start()
  ssc.awaitTermination()
}
Starting the program from IDEA
Start ZooKeeper on every node
Start Kafka on every node
Start a Kafka console producer
Send messages, and the program in IDEA picks them up (the commands are sketched below)
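A sketch of the corresponding shell commands, assuming they are run from the Kafka install directory (the broker host and topic name match the code above):

# start ZooKeeper (if using the copy bundled with Kafka)
bin/zookeeper-server-start.sh config/zookeeper.properties
# start the Kafka broker
bin/kafka-server-start.sh config/server.properties
# start a console producer on the weblog topic; each line typed is one message
bin/kafka-console-producer.sh --broker-list bigdata-pro01.kfk.com:9092 --topic weblog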