Differences between the Kafka Direct and Receiver approaches
Receiver
The Receiver approach is implemented with Kafka's high-level consumer API. The data the receiver pulls from Kafka is stored in the memory of the Spark executors (if the data volume surges and piles up, OOM errors can easily occur), and the jobs started by Spark Streaming then process that data.
Under the default configuration, this approach can lose data when the underlying infrastructure fails. To get high reliability with zero data loss, you must enable Spark Streaming's write-ahead log (WAL) mechanism, which synchronously writes the received Kafka data to a write-ahead log on a distributed file system (such as HDFS or S3). When an underlying node fails, the data can then be recovered from the WAL, at the cost of reduced throughput.
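For reference, a minimal sketch of what enabling the WAL for the Receiver approach looks like (spark.streaming.receiver.writeAheadLog.enable is the actual Spark configuration key; the app name and checkpoint path are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Enable the receiver write-ahead log; the WAL files are written under the checkpoint directory,
// so the checkpoint must point at a fault-tolerant file system such as HDFS.
val conf = new SparkConf()
  .setAppName("receiver-with-wal") // placeholder app name
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
val ssc = new StreamingContext(conf, Seconds(5))
ssc.checkpoint("hdfs://node01:9000/kafka-ck") // placeholder path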
Notes on usage:
1. Simple to use and little code: offsets do not have to be managed by hand. The WAL must be enabled to guarantee no data loss, which lowers throughput, and because every record is then stored twice, it wastes resources.
2. Exactly-once processing cannot be guaranteed: if the application dies after the results have been written to external storage but before the offsets have been updated in ZooKeeper, that data will be consumed again.
3. The Kafka topic's partitions are unrelated to the RDD partitions generated by Spark Streaming: adding topic partitions only adds threads inside the receiver that reads them, it does not increase Spark's processing parallelism (to scale ingestion you run several receivers, as in the sketch after this list).
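As a rough sketch of the last point, ingestion parallelism in the Receiver approach is scaled by starting several receivers and unioning their streams, not by adding Kafka partitions (assuming a StreamingContext named ssc; the ZooKeeper address, group id, and topic name are taken from the full example later in this post, and the receiver count is illustrative):

import org.apache.spark.streaming.kafka.KafkaUtils

// Each createStream call starts one receiver; parallelism on the read side
// comes from the number of receivers.
val numReceivers = 3
val streams = (1 to numReceivers).map { _ =>
  KafkaUtils.createStream(ssc, "node01:2181,node02:2181,node03:2181", "receiver", Map("receiver-test" -> 1))
}
val unioned = ssc.union(streams)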
Direct
The Direct approach uses Kafka's low-level (simple) consumer API to read data from Kafka. It queries the latest offset of each topic+partition and uses that to define the offset range of each batch. When the job that processes a batch starts, it uses the low-level consumer API to fetch exactly that offset range from Kafka.
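As a minimal sketch of this, every RDD produced by a direct stream carries the exact offset range it was read from (kafkaDStream here stands for a stream created with createDirectStream, as in the full example later in this post):

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

// Only the RDDs generated directly by createDirectStream implement HasOffsetRanges,
// so read the ranges before applying any transformation.
kafkaDStream.foreachRDD { rdd =>
  val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ranges.foreach(r => println(s"${r.topic}-${r.partition}: [${r.fromOffset}, ${r.untilOffset})"))
}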
Notes on usage:
1. When a topic is read, an RDD partition is generated for each of the topic's partitions and data is read from Kafka in parallel; there is a one-to-one mapping between Kafka partitions and RDD partitions.
2. The WAL does not need to be enabled: as long as Kafka keeps replicas of the data, lost data can be recovered from Kafka's replicas.
3. Spark itself stays in sync with the offsets it consumes, so it can track them and save them to a checkpoint (see the sketch after this list), which guarantees that data is not consumed more than once.
4. More complex to use and more code: the offsets have to be monitored and maintained by the user, which increases development cost.
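A minimal sketch of point 3, letting Spark Streaming keep the offsets in its own checkpoint rather than in an external store (the checkpoint path and app name are placeholders; with checkpoint recovery the context has to be built through StreamingContext.getOrCreate):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs://node01:9000/kafka-ck" // placeholder path

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("direct-with-checkpoint") // placeholder app name
  val ssc = new StreamingContext(conf, Seconds(5))
  ssc.checkpoint(checkpointDir) // the direct stream's offsets are saved as part of the checkpoint
  // ... create the direct stream and define the processing here ...
  ssc
}

// On restart the context, including the consumed offsets, is restored from the checkpoint.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()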
Receiver combined with the WAL provides high reliability with zero data loss, but it cannot guarantee that data is processed once and only once; it may be processed twice, because Spark and ZooKeeper can get out of sync. With the Direct approach, which uses Kafka's simple API, Spark Streaming itself tracks the consumed offsets and saves them in a checkpoint. Spark is always in sync with itself, so it can guarantee that data is consumed once and only once.
Comparison
Receiver | Direct
---|---
Requires the WAL to be enabled | Does not need the WAL
Uses the high-level consumer API | Uses the simple (low-level) consumer API
Offsets are maintained automatically in ZooKeeper | Offsets are maintained manually
Cannot guarantee that data is processed only once | Data is processed once and only once
Simple code, small amount of it | More complex code, more of it
Topic partitions and RDD partitions are not one-to-one | Topic partitions and RDD partitions are one-to-one
Data is pulled from Kafka by the receiver | Each RDD partition pulls the data of its corresponding Kafka partition (when Kafka and RDD partition counts are equal)
Examples of the two ways to connect to Kafka (receiver & direct)
Maven dependencies
<properties>
<scala.version>2.11.8</scala.version>
<spark.version>2.1.3</spark.version>
<scala.binary.version>2.11</scala.binary.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- spark-streaming -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- spark-streaming kafka -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
Receiver
package xzw.shuai.kafka.demo

import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object SparkKafkaReceiver {

  private val topics = "receiver-test"
  private val HDFS_PATH = "hdfs://node01:9000/kafka-ck"
  private val numThreads = 1

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("receiver")
      // When the application is stopped, finish processing the current batch before shutting down.
      .set("spark.streaming.stopGracefullyOnShutdown", "true")
      // maxRatePerPartition * number of partitions * batch interval = records pulled per batch.
      .set("spark.streaming.kafka.maxRatePerPartition", "1000")
      // Enable the receiver write-ahead log so received data can be recovered after a failure.
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
      // Expose JVM metrics for monitoring.
      .set("spark.metrics.conf.executor.source.jvm.class", "org.apache.spark.metrics.source.JvmSource")

    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))
    ssc.checkpoint(HDFS_PATH)

    val kafkaParams = Map(
      "metadata.broker.list" -> "node01:9091,node02:9092,node03:9092",
      "zookeeper.connect" -> "node01:2181,node02:2181,node03:2181",
      "group.id" -> "receiver"
    )
    val topicMap = topics.split(",").map((_, numThreads)).toMap

    val kafkaDStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_2)

    // word count
    kafkaDStream
      .map(_._2) // _1 is the Kafka message key, _2 is the message value
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print(10) // print the results

    ssc.start()
    ssc.awaitTermination()
  }
}
Direct
package xzw.shuai.kafka.demo

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.{ZkNoNodeException, ZkNodeExistsException}
import org.apache.log4j.Logger
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.zookeeper.data.Stat

object SparkKafkaDirect {

  private val zkHosts = "node01:2181,node02:2181,node03:2181"
  private val logger = Logger.getLogger("SparkKafkaDirect")
  private val zkPath = "/kafka-direct-test"
  private val topic = Set("direct-test")
  private val HDFS_PATH = "hdfs://node01:9000/kafka-ck"

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("direct")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))

    val kafkaParams = Map(
      "metadata.broker.list" -> "node01:9091,node02:9092,node03:9092",
      "group.id" -> "direct"
    )

    val zkClient: ZkClient = new ZkClient(zkHosts)

    // Read the previously saved offsets, if any.
    val offsets: Option[Map[TopicAndPartition, Long]] = readOffset(zkClient)

    // Create the direct stream.
    val kafkaDStream: InputDStream[(String, String)] = offsets match {
      // No saved offsets: consume from the default starting position.
      case None =>
        logger.info("start from scratch")
        KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
      // Saved offsets found: resume from them.
      case Some(offset) =>
        logger.info("start with the saved offsets")
        val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())
        KafkaUtils.createDirectStream[String, String, StringDecoder,
          StringDecoder, (String, String)](ssc, kafkaParams, offset, messageHandler)
    }

    // word count
    kafkaDStream.map(_._2) // _1 is the Kafka message key, _2 is the message value
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .foreachRDD(rdd => rdd.take(10).foreach(println)) // print the results

    // Save the offsets to ZooKeeper; any other storage could be used instead.
    kafkaDStream.foreachRDD(rdd =>
      saveOffset(zkClient, zkHosts, zkPath, rdd)
    )

    ssc.start()
    ssc.awaitTermination()
  }

  // Save the offsets.
  def saveOffset(zkClient: ZkClient, zkHost: String, zkPath: String, rdd: RDD[_]): Unit = {
    logger.info("save offsets to Zookeeper")
    val stopwatch = new Stopwatch()
    val offsetsRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    offsetsRanges.foreach(offsetRange => logger.debug(s" Using $offsetRange"))
    // Save untilOffset (the position to resume from) for each partition as "partition:offset".
    val offsetsRangesStr = offsetsRanges.map(offsetRange => s"${offsetRange.partition}:${offsetRange.untilOffset}")
      .mkString(",")
    logger.info("writing offsets to Zookeeper zkClient=" + zkClient + " zkHosts=" + zkHosts + " zkPath=" + zkPath + " offsetsRangesStr:" + offsetsRangesStr)
    updatePersistentPath(zkClient, zkPath, offsetsRangesStr)
    logger.info("done updating offsets in zookeeper. took " + stopwatch)
  }

  // Read the offsets.
  def readOffset(zkClient: ZkClient): Option[Map[TopicAndPartition, Long]] = {
    val stopwatch = new Stopwatch()
    val stat = new Stat()
    val dataAndStat: (Option[String], Stat) = try {
      (Some(zkClient.readData(zkPath, stat)), stat)
    } catch {
      case _: ZkNoNodeException => (None, stat)
      case e2: Throwable => throw e2
    }
    // Parse the offsets.
    dataAndStat._1 match {
      case Some(offsetsRangeStr) =>
        logger.info(s" Read offset ranges: $offsetsRangeStr")
        val offset: Map[TopicAndPartition, Long] = offsetsRangeStr.split(",")
          .map(str => str.split(":"))
          .map {
            case Array(partition, offset) =>
              TopicAndPartition(topic.last, partition.toInt) -> offset.toLong
          }.toMap
        logger.info("Done reading offsets from Zookeeper. Took " + stopwatch)
        Some(offset)
      case None =>
        logger.info(" No offsets found in Zookeeper. Took " + stopwatch)
        None
    }
  }

  // Update the offsets stored in ZooKeeper.
  def updatePersistentPath(zkClient: ZkClient, zkPath: String, offsetsRangesStr: String): Unit = {
    try {
      zkClient.writeData(zkPath, offsetsRangesStr)
    } catch {
      // The node does not exist yet: create the parent path, then the node itself.
      case _: ZkNoNodeException =>
        createParentPath(zkClient, zkPath)
        try {
          // Create a persistent node and write the offsets into it.
          zkClient.createPersistent(zkPath, offsetsRangesStr)
        } catch {
          case _: ZkNodeExistsException =>
            zkClient.writeData(zkPath, offsetsRangesStr)
          case e2: Throwable => throw e2
        }
      case e2: Throwable => throw e2
    }
  }

  // Create the parent path if it does not exist.
  def createParentPath(zkClient: ZkClient, zkPath: String): Unit = {
    val parentDir = zkPath.substring(0, zkPath.lastIndexOf('/'))
    if (parentDir.length != 0)
      zkClient.createPersistent(parentDir, true)
  }

  // Measures elapsed time.
  class Stopwatch {
    private val start = System.currentTimeMillis()
    override def toString: String = (System.currentTimeMillis() - start) + " ms"
  }
}