spark-Streaming

总结一下,避免后面再重复踩坑。

Spark Streaming是近实时(near real time)的小批处理系统, 可以对接各类消息中间或者直接监控Hdfs目录, 可以做为实时大数据流式计算,也可以做一些按时间窗口的数据聚合分析,比如流量监控之类的, 主要的优势是和spark-sql, spark-mlib, spark-graphx无缝结合的生态系统。

官方地址: http://spark.apache.org/docs/2.2.0/streaming-programming-guide.html

Spark Streaming

上游数据可以是Kafka, Flume, Hdfs或者是TCP Sockets;处理后的下游数据可以是落到HDFS, 数据库, 或者重新写回消息中间件,随意处理。

maven环境

<dependency>
 <groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.0</version>`
</dependency>

spark-streaming2.20适配的消息中间件

Source Artifact
Kafka spark-streaming-kafka-0-8_2.11
Flume spark-streaming-flume_2.11
Kinesis spark-streaming-kinesis-asl_2.11 [Amazon Software License]

官方给了一些例子:

nc -lk 9999 同一台机器上socket, 端口9999

$ ./bin/run-example streaming.NetworkWordCount localhost 9999

Spark Streaming的优势在于:

  • 能运行在100+的结点上,并达到秒级延迟(最小设置batch-time为500ms,再小就很容易task大量堆积)。

  • 使用基于内存的Spark作为执行引擎,具有高效和容错的特性。

  • 能集成Spark的批处理和交互查询。

  • 为实现复杂的算法提供和批处理类似的简单接口

spark Streaming封装了kafka的高级接口: Kafka Integration Guide.

Spark Streaming

DStream是spark-streaming提供的一个抽象数据类型, 就是按时间切分的一组有序RDD集合。

关于更多的概念和方法参考官网教程, 这里总结一下使用的一些坑和优化:

一, kerberos 认证问题:

问题: 我们的hadoop访问有kerberos的认证机制,默认是7天更换,刚开始没注意这个问题,spark-streaming的程序每隔一周崩一次

解决:

  1. --deploy-mode 由 yarn-client模式改为yarn-cluster模式;

  2. --keytab /home/xxx/xxx.keytab --principal xxx@cloudera.xxx.com (刚开始客户端是2.1.0没生效,升级为2.2.0)

二, 优雅结束:

问题:application被人为中断,当前batch的数据没处理完

解决:源代码在spark.stop() 之前加了一个钩子, 来达到优雅退出, 保存断点checkpoint
--conf spark.streaming.stopGracefullyOnShutdown=true;

也可以自己在JVM关闭之前添加钩子, 来附加做一些邮件报警之类的事情(发送kill命令关闭driver进程,不要使用(-9)强制关闭,不然钩子无法捕获)

Runtime.getRuntime().addShutdownHook(

         new Thread() { override def run() {`

            log("Gracefully stop Spark Streaming")            `

              streamingContext.stop(true, true) } }`

      )

三, 数据缓存和清除:

cache或者persist的数据一定要在foreachRDD中清除掉,不然内存爆炸

spark.streaming.unpersist=true 这个配置只是自动推测并清除缓存数据, 最好还是代码中处理

四,batch的最大处理量,

根据内存和batchDuration设定合理的值, 保证batchDuration时间内能处理完,不造成堆积, 也和流数据大小有关。

– conf spark.streaming.kafka.maxRatePerPartition=1000

五, 应用程序失败自动重启次数, 和重试间隔

  --conf spark.yarn.maxAppAttempts=4
  --conf [spark.yarn.am](http://spark.yarn.am).attemptFailuresValidityInterval=1h

六,使用YARN Capacity Scheduler调度, 且提交到单独的Yarn队列

     --queue realtime_queue

七,开启spark推测执行

# 推测执行开启

spark.speculation                     true

# 检测周期

spark.speculation.interval 100

# 完成task的百分比时启动推测

spark.speculation.quantile 0.75

# 比其他的慢多少倍时启动推测

spark.speculation.multiplier 1.5

八, 避免单个任务阻塞:

spark.streaming.concurrentJobs=4

九,合理的batchDuration:

不要小于500ms, 太小,会积压数据, 太大,实时性不好

十,合理GC: 开启并行Mark-Sweep垃圾回收机制, 其它的参照JVM的调优,减少full-GC

--conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC"

十一,计算效率:

实时计算对效率要求很高(不然大量任务堆积), 所以spark的性能优化的方法在这里通用, 比如:

  1. 合理的并行度partition, 一般是core的2~5倍, spark。 spark.default.parallelism=200

  2. spark.sql.shuffle.partitions 设置大一点, 个人比较喜欢spark-sql处理逻辑,这个是sql shuffle时的并行度

  3. spark.rdd.compress=true 缓存时压缩, 默认是false, 节省内存, 但是增加性能损耗

  4. 参照 http://spark.apache.org/docs/latest/tuning.html

十二, 代码优化:

根据实际情况优化,在线任务和离线任务还是区别很大的,更多关注效率。

  1. 处理空Batch:
    空batch比较多, 不判断直接写的话会形成很多空文件
    if(rdd.count() != 0) 或者 if(!rdd.partitions.isEmpty)
    推荐第二种, 数据量比较大时 count很费时间的

  2. 高性能算子(平时要加强总结):

   groupByKey  →  reduceByKey/aggregateByKey
   map →  mapPartitions
   foreachPartitions  →  foreach
  1. 序列化(广播变量, cache, 自定义对象):

通常图省事, 直接继承 java的Serializable 接口。

Spark支持使用Kryo序列化机制, 大概效率是java序列化的10倍, 变少网络传输的数据,减少在集群中耗费的内存资源。

spark.serializer=org.apache.spark.serializer.KryoSerializer

spark.kryo.registrationRequired=true // 应用的类没有注册会报错,默认false

  • 使用:需要先注册算子里边用到的类,不然会存储每个对象的全类名(full class name),这样的使用方式往往比默认的 Java serialization 还要浪费更多的空间。

    • 需要序列化的类继承 java.io.Serializable
    • 注册类继承KryoRegistrato并且注册那些需要序列化的类
    • 在sparkConf中设置spark.serializer和spark.kryo.registrator

十三,其它

checkpoint: http://bit1129.iteye.com/blog/2217505 没用到自带的checkpoint机制


Kyro序列化


import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

case class UserInfo(name: String ,age: Int,gender: String, addr: String)

class MyRegisterKryo extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[UserInfo])
  }
}

import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer
import scala.util.Random


/**
  * 需要序列化的类继承java.io.Serializable
  * 注册类继承KryoRegistrator并且注册那些需要序列化的类
  * 在sparkConf中设置spark.serializer和spark.kryo.registrator
  */

object KyroExample {

  def kyroExample() {
    val conf = new SparkConf().setMaster("local[1]").setAppName("KyroTest")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.set("spark.kryo.registrator", "tools.MyRegisterKryo")
    conf.registerKryoClasses(Array(classOf[UserInfo], classOf[scala.collection.mutable.WrappedArray.ofRef[_]]))
    val sc = new SparkContext(conf)

    val arr = new ArrayBuffer[UserInfo]()

    val nameArr = Array[String]("lsw","yyy","lss")
    val genderArr = Array[String]("male","female")
    val addressArr = Array[String]("beijing","shanghai","shengzhen","wenzhou","hangzhou")

    for(i <- 1 to 1000){
      val name = nameArr(Random.nextInt(3))
      val age = Random.nextInt(100)
      val gender = genderArr(Random.nextInt(2))
      val address = addressArr(Random.nextInt(5))
      arr.+=(UserInfo(name,age,gender,address))
    }
    val start = System.currentTimeMillis()

    val rdd = sc.parallelize(arr)

    //序列化的方式将rdd存到内存
    rdd.persist(StorageLevel.MEMORY_ONLY_SER)
    println(System.currentTimeMillis() - start)
    sc.stop()
  }
}
 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") //使用Kryo序列化库

                  val sc = new SparkContext(conf)

 def saveAsObjectFile[T: ClassTag](rdd: RDD[T], path: String) {

                        val kryoSerializer = new KryoSerializer(rdd.context.getConf)    //  KryoSerializer对象, rdd.context.getConf获取缓存大小

rdd.mapPartitions(iter => iter.grouped(10)
      .map(_.toArray))
      .map(splitArray => {
      //initializes kyro and calls your registrator class
      val kryo = kryoSerializer.newKryo()   //map种创建Kryo实例, 线程不安全,只能放在map或者mappartition中
 
      //convert data to bytes
      val bao = new ByteArrayOutputStream()    
      val output = kryoSerializer.newKryoOutput()  
      output.setOutputStream(bao)
      kryo.writeClassAndObject(output, splitArray)
      output.close()
 
      // We are ignoring key field of sequence file
      val byteWritable = new BytesWritable(bao.toByteArray)
      (NullWritable.get(), byteWritable)
    }).saveAsSequenceFile(path)

}

def objectFile[T](sc: SparkContext, path: String, minPartitions: Int = 1)

    (implicit ct: ClassTag[T]) = {

    val kryoSerializer = new KryoSerializer(sc.getConf)

    sc.sequenceFile(path, classOf[NullWritable], classOf[BytesWritable],

       minPartitions)

       .flatMap(x => {

       val kryo = kryoSerializer.newKryo()

       val input = new Input()

       input.setBuffer(x._2.getBytes)

       val data = kryo.readClassAndObject(input)

       val dataObject = data.asInstanceOf[Array[T]]

       dataObject

    })

  }

参考:

Kryo读写硬盘: https://www.iteblog.com/archives/1328.html

Kryo使用: https://blog.csdn.net/cjuexuan/article/details/51485427

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,590评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,808评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,151评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,779评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,773评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,656评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,022评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,678评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,038评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,756评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,411评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,005评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,973评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,053评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,495评论 2 343

推荐阅读更多精彩内容