22 spark-streaming

概述

spark-streaming 并不是真正意义上的实时计算,底层还是对数据源进行分批处理,会有一定时间的延迟。不想storm那样对每条数据进行实时处理

安装nc服务

为了方便测试,我们可以使用socket向streaming传送数据。以后可以用kafka等消息中间件来代替

安装
 -ivh nc-1.84-22.el6.x86_64.rpm
启动nc服务
nc -lk 9999

streaming的2中初始化方式

image.png

在命令行上我们喜欢使用第一种

本地运行streaming,读取socket流数据

注意点1:本地运行默认指定 master uri 的并行度为1 即只有一个线程,那这个线程只会干一件事,读取流数据,不会去做处理。所以我们在启动spark-shell的时候需要指定的线程数一定要大于1.在集群模式下就可以不指定。


image.png

注意点2 :我们指向启动本地spark-shell,不想依赖metastore,hive等,则需要把hive-site.xml删掉,这样启动的时候就不会寻找metastore
注意点3:不想依赖hdfs,需要把spark-env.xml中的hadoop_conf_dir注释掉

 def main(args: Array[String]): Unit = {
    //创建SparkSession
    val spark = SparkSession
      .builder
      .appName("test")
      .master("yarn-cluster")//指定master url
      .getOrCreate()//获取sparkSession

    //把SparkSession转换成sparkContext
    val sc = spark.sparkContext;

    //创建StreamingContext
    val ssc = new StreamingContext(sc,Seconds(5));

    val lines = ssc.socketTextStream("bigdata-pro02.kfk.com",9999);
    val words = lines.flatMap(_.split(" ")).map(word => (word,1)).reduceByKey(_+_)
    words.print();
    ssc.start();
    ssc.awaitTermination();

  }
----------------------------------一下代码直接在命令行输入---------------------------
bin/spark-shell --master local[2]
:paste
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc,Seconds(5));
val lines = ssc.socketTextStream("bigdata-pro02.kfk.com",9999);
val words = lines.flatMap(_.split(" ")).map(word => (word,1)).reduceByKey(_+_)
words.print();
ssc.start();
ssc.awaitTermination();

streaming 保存数据到外部系统

保存到mysql中

def main(args: Array[String]): Unit = {
    //创建SparkSession
    val spark = SparkSession
      .builder
      .appName("test")
      .master("yarn-cluster") //指定master url
      .getOrCreate() //获取sparkSession

    //把SparkSession转换成sparkContext
    val sc = spark.sparkContext;

    //创建StreamingContext
    val ssc = new StreamingContext(sc, Seconds(5));

    val lines = ssc.socketTextStream("bigdata-pro02.kfk.com", 9999);
    val words = lines.flatMap(_.split(" ")).map(word => (word,1)).reduceByKey(_+_)
    //将遍历所有的RDD,再遍历所有的分布块存储的数据集
    words.foreachRDD(wd => wd.foreachPartition(data => {
      Class.forName("com.mysql.jdbc.Driver")
      val conn = DriverManager.getConnection("jdbc:mysql://bigdata-pro03.kfk.com/spark","root","123456");
      try{
        //将数据集中的每个数据拿出来
        for(row <- data){
          val sql = "insert into test1(id,name) values("+row._2+",'"+row._1+"')";
          conn.prepareStatement(sql).executeUpdate();
        }
      }finally {
        if(conn != null){
          conn.close();
        }
      }
    }))
    ssc.start();
    ssc.awaitTermination();
  }
--------------------------------------------------------------------------------------
bin/spark-shell --master local[2]
:paste
import java.sql.DriverManager
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
val ssc = new StreamingContext(sc, Seconds(5));
val lines = ssc.socketTextStream("bigdata-pro02.kfk.com", 9999);
val words = lines.flatMap(_.split(" ")).map(word => (word,1)).reduceByKey(_+_)
words.foreachRDD(wd => wd.foreachPartition(data => {
Class.forName("com.mysql.jdbc.Driver")
val conn = DriverManager.getConnection("jdbc:mysql://bigdata-pro03.kfk.com/spark","root","123456");
try{
for(row <- data){
val sql = "insert into test1(id,name) values("+row._2+",'"+row._1+"')";
conn.prepareStatement(sql).executeUpdate();
}
}finally {
if(conn != null){
conn.close();
}
}
}))
ssc.start();
ssc.awaitTermination();

streaming与kafka的集成

集成所依赖的jar包有2种,一定要注意版本兼容性


image.png
添加依赖
<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-8_${scala.binary.version}</artifactId>
      <version>2.2.0</version>
    </dependency>

def main(args: Array[String]): Unit = {
    //创建SparkSession
    val spark = SparkSession.builder()
      .appName("kafkaToStreaming")
      .master("local[2]")
      .getOrCreate()
    //创建SparkStreamingContext
    val ssc  = new StreamingContext(spark.sparkContext,Seconds(5));
    var kafkaMapParams = Map[String, String]("metadata.broker.list" -> "bigdata-pro01.kfk.com:9092")
    val topicSet = Set("weblog")

    //这里用的KafkaUtils是org.apache.spark.streaming.kafka包下的
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc,kafkaMapParams,topicSet)
    //message是map,从kafka取出来的信息在value上,所以我们只取_._2
    val lines = messages.map(_._2); // 返回值是List(),装的是map的value
    val words = lines.flatMap(_.split(" ")).map( x => (x,1L)).reduceByKey(_+_);
    words.print();

    ssc.start();
    ssc.awaitTermination();

  }

在idea上启动程序
启动所有节点的zookeeper
启动所有节点的kafka,
启动kafka的消息生产者
发送消息,idea上就能监听到了

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

友情链接更多精彩内容