Spark 连接kafka两种方式及区别(direct和receiver)

Kafka Direct 跟Receiver方式的区别

Receiver

Receiver

Receiver是使用Kafka的 High-Level Consumer API来实现的。Receiver从Kafka中获取的数据都存储在Spark Executor的内存中的(如果数据暴增,数据大量堆积,容易出现oom的问题),Spark Streaming启动的job会去处理那些数据。
在默认的配置下,这种方式可能会因为底层的失败而丢失数据,如果要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log,WAL),该机制会同步地将接收到的Kafka数据写入分布式文件系统(比如HDFS,S3)上的预写日志中,所以当底层节点出现了失败,可以通过WAL中的数据进行恢复,但是效率会下降。

使用时注意事项:

1.操作简单,代码量少,不需要手动管理offset,需要开启wal机制,可以保证数据不丢失,但效率会减低,并且为了保证数据不丢失,将一份数据存两份,浪费资源
2.无法保证数据只被处理一次,在写入外部存储的数据还未将offset更新到zk就挂掉,这些数据会被重复消费
3.kafka的topic的分区和spark streaming生成的rdd分区不相关,增加topic的分区数,只会增加reciver读取分区数据的线程数,并不会提高spark的处理数据的并行度

Direct

Direct

Direct 使用Kafka的Low-Level Consumer api读取kafka数据,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。当处理数据的job启动时,就会使用Kafka的Low-Level Consumer api来获取Kafka指定offset范围的数据。

使用时注意事项:

1.当读取topic的数据时候,会自动对应topic的分区生成对应的RDD分区并行从Kafka中读取数据,在Kafka partition和RDD partition之间,有一对一的映射关系。
2.不需要开启WAL机制,只要Kafka中作了数据的备份,那么就可以使用通过Kafka的副本进行恢复。
3.Spark内部一定时同步的,所以可以自己跟踪offset并保存到checkpoint中,可以保证数据不会被重复消费
4.操作复杂,代码量大,并且需要自己对offset监控维护,增加用户开发成本

Receiver配合着WAL机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的。基于direct的方式,使用kafka的简单api,SparkStreaming自己就负责追踪消费的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。

区别

Receiver Direct
需要开启WAL 不需要开启WAL
使用高层次 api 使用简单api
zk自动维护 手动维护offset
无法保证数据被处理一次 数据只被处理一次
代码简单,量少 代码复杂,量大
topic分区与rdd分区不是一对一的关系 topic分区与rdd分区是一对一的关系
由receiver拉取kafka数据 由rdd分区拉取对应分区的数据(kafka与rdd分区相等的情况)
.. ..


连接kafka的两种方式 (receiver&direct) 栗子

Maven依赖

 <properties>
 <scala.version>2.11.8</scala.version>
 <spark.version>2.1.3</spark.version>
 <scala.binary.version>2.11</scala.binary.version>
 </properties>
 <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- spark-core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- spark-streaming -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
       <!-- spark-streaming kafka -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
 </dependencies>

Receiver

package xzw.shuai.kafka.demo

import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object SparkKafkaReceiver {

  private val topics = "receiver-test"
  private val HDFS_PATH = "hdfs://node01:9000/kafka-ck"
  private val numThreads = 1

  def main(args: Array[String]): Unit = {
    //当应用程序停止的时候,会将当前批次的数据处理完成后在停止
    System.setProperty("spark.streaming.stopGracefullyOnShutdown", "true")
    //1000*分区数*采样时间=拉取数据量
    System.setProperty("spark.streaming.kafka.maxRatePerPartition", "1000")
    val conf = new SparkConf().setMaster("local[2]").setAppName("receiver")
      //设置监控级别
      .set("spark.metrics.conf.executor.source.jvm.class", "org.apache.spark.metrics.source.JvmSource")

    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))
    ssc.checkpoint(HDFS_PATH)
    val kafkaParams = Map(
      "metadata.broker.list" -> "node01:9091,node02:9092,node03:9092",
      "zookeeper.connect" -> "node01:2181,node02:2181,node03:2181",
      "group.id" -> "receiver"
    )
    
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val kafkaDStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_2)

    // word count
    kafkaDStream
      .map(_._2) // 1是分区号,2是具体kafka中数
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print(10) // 输出结果

    ssc.start()
    ssc.awaitTermination()
  }
}

Direct

package xzw.shuai.kafka.demo

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.{ZkNoNodeException, ZkNodeExistsException}
import org.apache.log4j.Logger
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.zookeeper.data.Stat

object SparkKafkaDirect {
  private val zkHosts = "node01:2181,node02:2181,node03:2181"
  private val logger = Logger.getLogger("SparkKafkaDirect")
  private val zkPath = "/kafka-direct-test"
  private val topic = Set("direct-test")
  private val HDFS_PATH="hdfs://node01:9000/kafka-ck"
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("receiver")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))
    val ssc = new StreamingContext(sc, Seconds(5))    

    val kafkaParams = Map(
      "metadata.broker.list" -> "node01:9091,node02:9092,node03:9092",
      "group.id" -> "direct"
    )

    val zkClient: ZkClient = new ZkClient(zkHosts)

    // 读取 offset
    val offsets: Option[Map[TopicAndPartition, Long]] = readOffset(zkClient)

    // 获取到kafka数据
    val kafkaDStream: InputDStream[(String, String)] = offsets match {
      // 使用 direct方式消费kafka数据
      case None =>
        print("start from scratch")
        KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
      case Some(offset) =>
        print("start with the offset")
        val messageHeader = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())
        KafkaUtils.createDirectStream[String, String, StringDecoder,
          StringDecoder, (String, String)](ssc, kafkaParams, offset, messageHeader)
    }

    // word count
    kafkaDStream.map(_._2) // 1是分区号,2是具体kafka中数
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .foreachRDD(print(_)) // 输出结果
    
    // 保存偏移量到zk中 , 也可自定义到其他存储介质
    kafkaDStream.foreachRDD(rdd =>
      saveOffset(zkClient, zkHosts, zkPath, rdd)
    )

    ssc.start()
    ssc.awaitTermination()

  }

  // 保存 offset
  def saveOffset(zkClient: ZkClient, zkHost: String, zkPath: String, rdd: RDD[_]): Unit = {
    logger.info("save offsets to Zookeeper")
    val stopwatch = new Stopwatch()
    val offsetsRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    offsetsRanges.foreach(offsetRange => logger.debug(s"  Using $offsetRange"))
    val offsetsRangesStr = offsetsRanges.map(offsetRange => s"${offsetRange.partition}:${offsetRange.fromOffset}")
      .mkString(",")
    logger.info("writing offsets to Zookeeper zkClient=" + zkClient + "  zkHosts=" + zkHosts + "zkPath=" + zkPath + "  offsetsRangesStr:" + offsetsRangesStr)
    updatePersistentPath(zkClient, zkPath, offsetsRangesStr)
    logger.info("done updating offsets in zookeeper. took " + stopwatch)
  }

  // 读取 offset
  def readOffset(zkClient: ZkClient): Option[Map[TopicAndPartition, Long]] = {

    val stopwatch = new Stopwatch()

    val stat = new Stat()
    val dataAndStat: (Option[String], Stat) = try {
      (Some(zkClient.readData(zkPath, stat)), stat)
    } catch {
      case _ => (None, stat)
      case e2: Throwable => throw e2
    }

    // 获取offset
    dataAndStat._1 match {
      case Some(offsetsRangeStr) =>
        logger.info(s" Read offset ranges: $offsetsRangeStr")
        val offset: Map[TopicAndPartition, Long] = offsetsRangeStr.split(",")
          .map(str => str.split(":"))
          .map {
            case Array(partitions, offset) =>
              TopicAndPartition(topic.last, partitions.toInt) -> offset.toLong
          }.toMap
        logger.info("Done reading offsets from Zookeeper. Took " + stopwatch)
        Some(offset)
      case None =>
        logger.info(" No offsets found in Zookeeper. Took " + stopwatch)
        None
    }
  }

  // 更新 zk中的 offset
  def updatePersistentPath(zkClient: ZkClient, zkPath: String, offsetsRangesStr: String): Unit = {
    try {
      zkClient.writeData(zkPath, offsetsRangesStr)
    } catch {
      // 如果失败了 ==> 没有此目录,则创建目录
      case _: ZkNoNodeException =>
        createParentPath(zkClient, zkPath)
        try {
          // 创建一个持久的节点 ==> 即 目录
          // 在offset写入到 该节点
          zkClient.createPersistent(zkPath, offsetsRangesStr)
        } catch {
          case _: ZkNodeExistsException =>
            zkClient.writeData(zkPath, offsetsRangesStr)
          case e2: Throwable => throw e2
        }
      case e2: Throwable => throw e2
    }
  }

  // 如果path不存在,则创建
  def createParentPath(zkClient: ZkClient, zkPath: String): Unit = {
    val parentDir = zkPath.substring(0, zkPath.lastIndexOf('/'))
    if (parentDir.length != 0)
      zkClient.createPersistent(parentDir, true)
  }

  // 过程时间
  class Stopwatch {
    private val start = System.currentTimeMillis()

    override def toString: String = (System.currentTimeMillis() - start) + " ms"
  }

}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,928评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,192评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,468评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,186评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,295评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,374评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,403评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,186评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,610评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,906评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,075评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,755评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,393评论 3 320
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,079评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,313评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,934评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,963评论 2 351

推荐阅读更多精彩内容