Programming Model
The key idea in Structured Streaming is to treat a live data stream as a table that is being continuously appended to. This yields a stream processing model very similar to the batch model: you express the streaming computation as a standard batch-like query, as if it ran on a static table, and Spark runs it as an incremental query over the unbounded input table. Let's look at this model in more detail.
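A minimal sketch of this idea, independent of Kafka: the word count below is written exactly as it would be for a static table, but because the source is a stream (here a socket source on localhost:9999, an assumed address fed by e.g. nc -lk 9999), Spark executes it incrementally over the unbounded input table. It can be pasted into spark-shell.

import spark.implicits._

// The socket source is read as an unbounded table with a single "value" column
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")   // assumed host/port for illustration only
  .option("port", "9999")
  .load()

// The query is written exactly as it would be on a static table
val wordCount = lines.as[String]
  .flatMap(_.split(" "))
  .groupBy("value")
  .count()

// Spark runs the query incrementally; each trigger refreshes the full result table
wordCount.writeStream
  .outputMode("complete")
  .format("console")
  .start()
  .awaitTermination()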
Integration with Kafka
1. Reference documentation
http://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html
2. Kafka version
Kafka broker version 0.10.0 or higher
3. Example 1: running in the IDE
groupId = org.apache.spark
artifactId = spark-sql-kafka-0-10_2.11
version = 2.2.0
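If the project is built with sbt instead of Maven, the same coordinates can be expressed as shown below (a sketch; it assumes scalaVersion is set to 2.11.x so that %% resolves the _2.11 artifact, and that spark-sql is already on the classpath for SparkSession):

// sbt equivalent of the Maven coordinates above
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.2.0"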
################ Write the code and run it in the IDE ##################
import org.apache.spark.sql.SparkSession

object StructuredStreamingKafka {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("StructuredStreamingKafka")
      .getOrCreate()
    import spark.implicits._

    // Read the Kafka topic as a streaming DataFrame
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "bigdata-pro02.kfk.com:9092")
      .option("subscribe", "weblog")
      .load()

    // Cast the binary "value" column to a string and expose it as a Dataset[String]
    val lines = df.selectExpr("CAST(value AS STRING)")
      .as[String]

    val wordCount = lines.flatMap(_.split(" ")).groupBy("value").count()

    // Start the query; output modes are complete, update and append
    val query = wordCount.writeStream
      .outputMode("complete")
      .format("console") // write the results to the console
      .start()

    query.awaitTermination()
  }
}
############### Start Kafka and a message producer on the designated node ##################
bin/kafka-server-start.sh config/server.properties
bin/kafka-topics.sh --create --zookeeper bigdata-pro01.kfk.com:2181,bigdata-pro02.kfk.com:2181,bigdata-pro03.kfk.com:2181 --replication-factor 3 --partitions 1 --topic weblog
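The header above also calls for a message producer; on Kafka 0.10 a console producer for the weblog topic can be started with a command like the one below (broker address taken from the code example):

bin/kafka-console-producer.sh --broker-list bigdata-pro02.kfk.com:9092 --topic weblog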
4. Example 2: running in spark-shell
The following jars need to be placed in Spark's jars directory (or otherwise added to the classpath):
kafka_2.11-0.10.0.0.jar
kafka-clients-0.10.0.0.jar
spark-sql-kafka-0-10_2.11-2.2.0.jar
spark-streaming-kafka-0-10_2.11-2.1.0.jar
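Alternatively (not part of the original setup, just an option), spark-shell can resolve the connector and its transitive Kafka client dependency automatically using the Maven coordinates from the IDE example:

bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0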
##################### Start spark-shell #################
bin/spark-shell
:paste   (enter the REPL's paste mode, paste the block below, then finish with Ctrl+D)
import spark.implicits._

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "bigdata-pro02.kfk.com:9092")
  .option("subscribe", "weblog")
  .load()

val lines = df.selectExpr("CAST(value AS STRING)")
  .as[String]

val wordCount = lines.flatMap(_.split(" ")).groupBy("value").count()

val query = wordCount.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()
Integration with MySQL: writing the output to MySQL
Spark 2.2.0 has no built-in API for writing streaming output directly to MySQL, but you can extend ForeachWriter and write each row to MySQL yourself. If the data volume is very large, it is recommended to write to Kafka first and let Kafka, acting as an ordered queue, feed the writes into MySQL.
The jdbcSink class
package toMysql

import java.sql._

import org.apache.spark.sql.{ForeachWriter, Row}

/**
  * Created by zhongyuan on 2018/3/18.
  */
class jdbcSink(url: String, user: String, pwd: String) extends ForeachWriter[Row] {
  val driver = "com.mysql.jdbc.Driver"
  var statement: Statement = _
  var connection: Connection = _

  // Open the JDBC connection (called once per partition for each trigger)
  override def open(partitionId: Long, version: Long): Boolean = {
    Class.forName(driver)
    connection = DriverManager.getConnection(url, user, pwd)
    statement = connection.createStatement()
    true
  }

  // Execute the insert for each result row
  override def process(value: Row): Unit = {
    statement.executeUpdate(
      "insert into wordcount values('" + value.getAs[String]("value") + "'," + value.getAs[Long]("count") + ")")
  }

  // Release resources
  override def close(errorOrNull: Throwable): Unit = {
    connection.close()
  }
}
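The concatenated SQL above breaks if a word contains a quote character; a hedged sketch of process() rewritten with a PreparedStatement (replacing the method inside jdbcSink, same assumed wordcount table) could look like this:

// Sketch only: quoting-safe variant of process(), assuming the same wordcount(value, count) table
override def process(value: Row): Unit = {
  val ps = connection.prepareStatement("insert into wordcount values(?, ?)")
  ps.setString(1, value.getAs[String]("value"))
  ps.setLong(2, value.getAs[Long]("count"))
  ps.executeUpdate()
  ps.close()
}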
Main program: StructuredStreamingKafkaMysql
package toMysql

import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.streaming.ProcessingTime

/**
  * Created by zhongyuan on 2018/3/18.
  */
object StructuredStreamingKafkaMysql {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("StructuredStreamingKafkaMysql")
      .getOrCreate()
    import spark.implicits._

    // Read the Kafka topic as a streaming DataFrame
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "bigdata-pro02.kfk.com:9092")
      .option("subscribe", "weblog")
      .load()

    // Cast the binary "value" column to a string and expose it as a Dataset[String]
    val lines = df.selectExpr("CAST(value AS STRING)")
      .as[String]

    val wordCount = lines.flatMap(_.split(" ")).groupBy("value").count()

    // Write the results to the external MySQL database
    val url = "jdbc:mysql://bigdata-pro03.kfk.com/spark"
    val user = "root"
    val pwd = "123456"
    val writer: ForeachWriter[Row] = new jdbcSink(url, user, pwd) // instantiate the custom sink

    val query = wordCount
      .writeStream
      .foreach(writer) // foreach() accepts only a ForeachWriter[Row], hence the explicit type on writer
      .outputMode("update")
      .trigger(ProcessingTime("25 seconds"))
      .start()

    query.awaitTermination()
  }
}
Execution order
1. Start ZooKeeper on all designated nodes.
2. Then start Kafka on the designated node.
3. Start a message producer for the weblog topic on the designated node.
4. Start MySQL on the designated node.
5. Run the program from the IDE.
6. Use the producer to send messages.
7. Query MySQL to check whether the data has arrived (see the verification sketch below).
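Besides querying with the mysql command-line client, step 7 can also be checked with a plain batch read through Spark's JDBC source; this is only a sketch that reuses the connection parameters from the main program and the wordcount table name assumed by jdbcSink, and can be run in spark-shell:

// Batch read-back of the sink table, reusing the URL and credentials from the example above
val props = new java.util.Properties()
props.setProperty("user", "root")
props.setProperty("password", "123456")

spark.read
  .jdbc("jdbc:mysql://bigdata-pro03.kfk.com/spark", "wordcount", props)
  .show()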