总结一下,避免后面再重复踩坑。
Spark Streaming是近实时(near real time)的小批处理系统, 可以对接各类消息中间或者直接监控Hdfs目录, 可以做为实时大数据流式计算,也可以做一些按时间窗口的数据聚合分析,比如流量监控之类的, 主要的优势是和spark-sql, spark-mlib, spark-graphx无缝结合的生态系统。
官方地址: http://spark.apache.org/docs/2.2.0/streaming-programming-guide.html
上游数据可以是Kafka, Flume, Hdfs或者是TCP Sockets;处理后的下游数据可以是落到HDFS, 数据库, 或者重新写回消息中间件,随意处理。
maven环境
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.0</version>`
</dependency>
spark-streaming2.20适配的消息中间件
Source | Artifact |
---|---|
Kafka | spark-streaming-kafka-0-8_2.11 |
Flume | spark-streaming-flume_2.11 |
Kinesis | spark-streaming-kinesis-asl_2.11 [Amazon Software License] |
官方给了一些例子:
nc -lk 9999
同一台机器上socket, 端口9999
$ ./bin/run-example streaming.NetworkWordCount localhost 9999
Spark Streaming的优势在于:
能运行在100+的结点上,并达到秒级延迟(最小设置batch-time为500ms,再小就很容易task大量堆积)。
使用基于内存的Spark作为执行引擎,具有高效和容错的特性。
能集成Spark的批处理和交互查询。
为实现复杂的算法提供和批处理类似的简单接口
spark Streaming封装了kafka的高级接口: Kafka Integration Guide.
DStream是spark-streaming提供的一个抽象数据类型, 就是按时间切分的一组有序RDD集合。
关于更多的概念和方法参考官网教程, 这里总结一下使用的一些坑和优化:
一, kerberos 认证问题:
问题: 我们的hadoop访问有kerberos的认证机制,默认是7天更换,刚开始没注意这个问题,spark-streaming的程序每隔一周崩一次
解决:
--deploy-mode 由 yarn-client模式改为yarn-cluster模式;
--keytab /home/xxx/xxx.keytab --principal xxx@cloudera.xxx.com (刚开始客户端是2.1.0没生效,升级为2.2.0)
二, 优雅结束:
问题:application被人为中断,当前batch的数据没处理完
解决:源代码在spark.stop() 之前加了一个钩子, 来达到优雅退出, 保存断点checkpoint
--conf spark.streaming.stopGracefullyOnShutdown=true;
也可以自己在JVM关闭之前添加钩子, 来附加做一些邮件报警之类的事情(发送kill命令关闭driver进程,不要使用(-9)强制关闭,不然钩子无法捕获)
Runtime.getRuntime().addShutdownHook(
new Thread() { override def run() {`
log("Gracefully stop Spark Streaming") `
streamingContext.stop(true, true) } }`
)
三, 数据缓存和清除:
cache或者persist的数据一定要在foreachRDD中清除掉,不然内存爆炸
spark.streaming.unpersist=true 这个配置只是自动推测并清除缓存数据, 最好还是代码中处理
四,batch的最大处理量,
根据内存和batchDuration设定合理的值, 保证batchDuration时间内能处理完,不造成堆积, 也和流数据大小有关。
– conf spark.streaming.kafka.maxRatePerPartition=1000
五, 应用程序失败自动重启次数, 和重试间隔
--conf spark.yarn.maxAppAttempts=4
--conf [spark.yarn.am](http://spark.yarn.am).attemptFailuresValidityInterval=1h
六,使用YARN Capacity Scheduler调度, 且提交到单独的Yarn队列
--queue realtime_queue
七,开启spark推测执行
# 推测执行开启
spark.speculation true
# 检测周期
spark.speculation.interval 100
# 完成task的百分比时启动推测
spark.speculation.quantile 0.75
# 比其他的慢多少倍时启动推测
spark.speculation.multiplier 1.5
八, 避免单个任务阻塞:
spark.streaming.concurrentJobs=4
九,合理的batchDuration:
不要小于500ms, 太小,会积压数据, 太大,实时性不好
十,合理GC: 开启并行Mark-Sweep垃圾回收机制, 其它的参照JVM的调优,减少full-GC
--conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC"
十一,计算效率:
实时计算对效率要求很高(不然大量任务堆积), 所以spark的性能优化的方法在这里通用, 比如:
合理的并行度partition, 一般是core的2~5倍, spark。 spark.default.parallelism=200
spark.sql.shuffle.partitions 设置大一点, 个人比较喜欢spark-sql处理逻辑,这个是sql shuffle时的并行度
spark.rdd.compress=true 缓存时压缩, 默认是false, 节省内存, 但是增加性能损耗
十二, 代码优化:
根据实际情况优化,在线任务和离线任务还是区别很大的,更多关注效率。
处理空Batch:
空batch比较多, 不判断直接写的话会形成很多空文件
if(rdd.count() != 0) 或者 if(!rdd.partitions.isEmpty)
推荐第二种, 数据量比较大时 count很费时间的高性能算子(平时要加强总结):
groupByKey → reduceByKey/aggregateByKey
map → mapPartitions
foreachPartitions → foreach
- 序列化(广播变量, cache, 自定义对象):
通常图省事, 直接继承 java的Serializable 接口。
Spark支持使用Kryo序列化机制, 大概效率是java序列化的10倍, 变少网络传输的数据,减少在集群中耗费的内存资源。
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.kryo.registrationRequired=true // 应用的类没有注册会报错,默认false
-
使用:需要先注册算子里边用到的类,不然会存储每个对象的全类名(full class name),这样的使用方式往往比默认的 Java serialization 还要浪费更多的空间。
- 需要序列化的类继承 java.io.Serializable
- 注册类继承KryoRegistrato并且注册那些需要序列化的类
- 在sparkConf中设置spark.serializer和spark.kryo.registrator
十三,其它
checkpoint: http://bit1129.iteye.com/blog/2217505 没用到自带的checkpoint机制
Kyro序列化
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator
case class UserInfo(name: String ,age: Int,gender: String, addr: String)
class MyRegisterKryo extends KryoRegistrator {
override def registerClasses(kryo: Kryo): Unit = {
kryo.register(classOf[UserInfo])
}
}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
/**
* 需要序列化的类继承java.io.Serializable
* 注册类继承KryoRegistrator并且注册那些需要序列化的类
* 在sparkConf中设置spark.serializer和spark.kryo.registrator
*/
object KyroExample {
def kyroExample() {
val conf = new SparkConf().setMaster("local[1]").setAppName("KyroTest")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrator", "tools.MyRegisterKryo")
conf.registerKryoClasses(Array(classOf[UserInfo], classOf[scala.collection.mutable.WrappedArray.ofRef[_]]))
val sc = new SparkContext(conf)
val arr = new ArrayBuffer[UserInfo]()
val nameArr = Array[String]("lsw","yyy","lss")
val genderArr = Array[String]("male","female")
val addressArr = Array[String]("beijing","shanghai","shengzhen","wenzhou","hangzhou")
for(i <- 1 to 1000){
val name = nameArr(Random.nextInt(3))
val age = Random.nextInt(100)
val gender = genderArr(Random.nextInt(2))
val address = addressArr(Random.nextInt(5))
arr.+=(UserInfo(name,age,gender,address))
}
val start = System.currentTimeMillis()
val rdd = sc.parallelize(arr)
//序列化的方式将rdd存到内存
rdd.persist(StorageLevel.MEMORY_ONLY_SER)
println(System.currentTimeMillis() - start)
sc.stop()
}
}
- 用Kyro来读写硬盘文件 https://www.iteblog.com/archives/1328.html
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") //使用Kryo序列化库
val sc = new SparkContext(conf)
def saveAsObjectFile[T: ClassTag](rdd: RDD[T], path: String) {
val kryoSerializer = new KryoSerializer(rdd.context.getConf) // KryoSerializer对象, rdd.context.getConf获取缓存大小
rdd.mapPartitions(iter => iter.grouped(10)
.map(_.toArray))
.map(splitArray => {
//initializes kyro and calls your registrator class
val kryo = kryoSerializer.newKryo() //map种创建Kryo实例, 线程不安全,只能放在map或者mappartition中
//convert data to bytes
val bao = new ByteArrayOutputStream()
val output = kryoSerializer.newKryoOutput()
output.setOutputStream(bao)
kryo.writeClassAndObject(output, splitArray)
output.close()
// We are ignoring key field of sequence file
val byteWritable = new BytesWritable(bao.toByteArray)
(NullWritable.get(), byteWritable)
}).saveAsSequenceFile(path)
}
def objectFile[T](sc: SparkContext, path: String, minPartitions: Int = 1)
(implicit ct: ClassTag[T]) = {
val kryoSerializer = new KryoSerializer(sc.getConf)
sc.sequenceFile(path, classOf[NullWritable], classOf[BytesWritable],
minPartitions)
.flatMap(x => {
val kryo = kryoSerializer.newKryo()
val input = new Input()
input.setBuffer(x._2.getBytes)
val data = kryo.readClassAndObject(input)
val dataObject = data.asInstanceOf[Array[T]]
dataObject
})
}
参考:
Kryo读写硬盘: https://www.iteblog.com/archives/1328.html
Kryo使用: https://blog.csdn.net/cjuexuan/article/details/51485427