Two ways Spark connects to Kafka and how they differ (direct and receiver)

Differences between the Kafka Direct and Receiver approaches

Receiver

The Receiver approach is implemented with Kafka's high-level consumer API. The data that the receiver fetches from Kafka is stored in the memory of the Spark executors (if traffic spikes and data piles up, this can easily cause OOM), and the jobs started by Spark Streaming then process that data.
With the default configuration, this approach can lose data when the underlying infrastructure fails. To get high reliability with zero data loss, you must enable Spark Streaming's write-ahead log (WAL), which synchronously writes the received Kafka data to a write-ahead log on a distributed file system (such as HDFS or S3). When an underlying node fails, the data can then be recovered from the WAL, at the cost of lower throughput.
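Enabling the WAL is mostly a configuration change. Below is a minimal sketch of what that might look like for a receiver-based job; the app name and the HDFS checkpoint path are placeholders, not part of the original example:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ReceiverWithWALSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("receiver-with-wal") // placeholder app name
      // turn on the write-ahead log for receiver-based input streams
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(conf, Seconds(5))
    // the WAL is written under the checkpoint directory, so it must live on a reliable FS (HDFS/S3)
    ssc.checkpoint("hdfs://node01:9000/kafka-ck")
    // ... create the receiver-based stream and the processing logic here ...
    ssc.start()
    ssc.awaitTermination()
  }
}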

Notes on usage:

1. It is simple to use and requires little code, and you do not manage offsets yourself. The WAL must be enabled to guarantee no data loss, which lowers throughput and effectively stores every record twice, wasting resources.
2. It cannot guarantee that data is processed only once: if the job dies after writing results to external storage but before updating the offsets in ZooKeeper, that data will be consumed again.
3. Kafka topic partitions have no relationship to the RDD partitions produced by Spark Streaming. Increasing the number of topic partitions only increases the number of threads the receiver uses to read them; it does not increase Spark's processing parallelism (to increase read parallelism you can run several receivers and union them, as in the sketch after this list).
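As a sketch of that last point, read parallelism with receivers is usually increased by creating several input streams and unioning them. The topic name, ZooKeeper addresses and group id below are simply reused from the full example later in this post:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MultiReceiverSketch {
  def main(args: Array[String]): Unit = {
    // local[4]: three cores for the three receivers plus one for processing
    val ssc = new StreamingContext(
      new SparkConf().setMaster("local[4]").setAppName("multi-receiver"), Seconds(5))
    val kafkaParams = Map(
      "zookeeper.connect" -> "node01:2181,node02:2181,node03:2181",
      "group.id" -> "receiver"
    )
    // several receivers, each reading the topic with one thread
    val streams = (1 to 3).map { _ =>
      KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, Map("receiver-test" -> 1), StorageLevel.MEMORY_AND_DISK_2)
    }
    // union them into a single DStream for downstream processing
    val unioned = ssc.union(streams)
    unioned.map(_._2).print()
    ssc.start()
    ssc.awaitTermination()
  }
}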

Direct

The Direct approach uses Kafka's low-level (simple) consumer API. It queries Kafka for the latest offset of every topic+partition and uses that to define the offset range of each batch. When the job that processes a batch starts, it uses the low-level consumer API to read exactly that offset range from Kafka.
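A small sketch of that idea: each batch RDD produced by a direct stream carries the exact offset range it covers, one range per Kafka partition. The broker list and topic name are the same placeholders used in the full example further down:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DirectOffsetRangesSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setMaster("local[2]").setAppName("direct-offsets"), Seconds(5))
    val kafkaParams = Map("metadata.broker.list" -> "node01:9092,node02:9092,node03:9092")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("direct-test"))
    stream.foreachRDD { rdd =>
      // one OffsetRange per Kafka partition: [fromOffset, untilOffset) for this batch
      val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      ranges.foreach(r => println(s"${r.topic}-${r.partition}: ${r.fromOffset} -> ${r.untilOffset}"))
    }
    ssc.start()
    ssc.awaitTermination()
  }
}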

Notes on usage:

1. When reading a topic, an RDD partition is created for each Kafka topic partition and data is read from Kafka in parallel, so there is a one-to-one mapping between Kafka partitions and RDD partitions.
2. The WAL does not need to be enabled: as long as Kafka keeps replicas of the data, lost data can be recovered from the Kafka replicas.
3. Spark is internally consistent, so it can track the offsets itself and save them in the checkpoint, guaranteeing that data is not consumed repeatedly (see the getOrCreate sketch after this list).
4. It is more complex to use and requires more code, and you have to monitor and maintain the offsets yourself, which increases development effort.
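For the checkpoint-based offset tracking mentioned in point 3, the usual pattern is StreamingContext.getOrCreate, so that a restart recovers the context, including the consumed offsets, from the checkpoint directory. A minimal sketch, with the checkpoint path as a placeholder:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DirectWithCheckpointSketch {
  private val checkpointDir = "hdfs://node01:9000/kafka-ck" // placeholder path

  private def createContext(): StreamingContext = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("direct-with-checkpoint")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint(checkpointDir)
    // ... build the direct stream and the processing graph here ...
    ssc
  }

  def main(args: Array[String]): Unit = {
    // first run: createContext() is called; on restart: state and offsets come from the checkpoint
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}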

The Receiver approach combined with the WAL guarantees zero data loss and high reliability, but it cannot guarantee that data is processed exactly once; records may be processed twice, because Spark and ZooKeeper can be out of sync. With the Direct approach, which uses Kafka's simple API, Spark Streaming itself tracks the consumed offsets and saves them in the checkpoint. Since Spark is internally consistent, data is guaranteed to be consumed exactly once.

Differences

Receiver | Direct
Requires the WAL to be enabled | Does not require the WAL
Uses the high-level API | Uses the simple (low-level) API
Offsets maintained automatically in ZooKeeper | Offsets maintained manually
Cannot guarantee data is processed only once | Data is processed exactly once
Simple code, small amount of it | Complex code, more of it
Topic partitions and RDD partitions are not one-to-one | Topic partitions and RDD partitions are one-to-one
The receiver pulls the data from Kafka | Each RDD partition pulls its corresponding Kafka partition (when Kafka and RDD partition counts match)


Examples of the two ways to connect to Kafka (receiver & direct)

Maven dependencies

    <properties>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.1.3</spark.version>
        <scala.binary.version>2.11</scala.binary.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- spark-core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- spark-streaming -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
       <!-- spark-streaming kafka -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>

Receiver

package xzw.shuai.kafka.demo

import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object SparkKafkaReceiver {

  private val topics = "receiver-test"
  private val HDFS_PATH = "hdfs://node01:9000/kafka-ck"
  private val numThreads = 1

  def main(args: Array[String]): Unit = {
    // when the application is stopped, finish processing the current batch before shutting down
    System.setProperty("spark.streaming.stopGracefullyOnShutdown", "true")
    // records pulled per batch = 1000 * number of partitions * batch interval
    System.setProperty("spark.streaming.kafka.maxRatePerPartition", "1000")
    val conf = new SparkConf().setMaster("local[2]").setAppName("receiver")
      // register the JVM metrics source to monitor the executor JVM
      .set("spark.metrics.conf.executor.source.jvm.class", "org.apache.spark.metrics.source.JvmSource")

    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))
    ssc.checkpoint(HDFS_PATH)
    val kafkaParams = Map(
      "metadata.broker.list" -> "node01:9091,node02:9092,node03:9092",
      "zookeeper.connect" -> "node01:2181,node02:2181,node03:2181",
      "group.id" -> "receiver"
    )
    
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val kafkaDStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_2)

    // word count
    kafkaDStream
      .map(_._2) // _1 is the message key, _2 is the message value from Kafka
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print(10) // print the first 10 results

    ssc.start()
    ssc.awaitTermination()
  }
}

Direct

package xzw.shuai.kafka.demo

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.{ZkNoNodeException, ZkNodeExistsException}
import org.apache.log4j.Logger
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.zookeeper.data.Stat

object SparkKafkaDirect {
  private val zkHosts = "node01:2181,node02:2181,node03:2181"
  private val logger = Logger.getLogger("SparkKafkaDirect")
  private val zkPath = "/kafka-direct-test"
  private val topic = Set("direct-test")
  private val HDFS_PATH="hdfs://node01:9000/kafka-ck"
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("direct")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))

    val kafkaParams = Map(
      "metadata.broker.list" -> "node01:9091,node02:9092,node03:9092",
      "group.id" -> "direct"
    )

    val zkClient: ZkClient = new ZkClient(zkHosts)

    // read previously saved offsets from ZooKeeper
    val offsets: Option[Map[TopicAndPartition, Long]] = readOffset(zkClient)

    // create the Kafka input stream
    val kafkaDStream: InputDStream[(String, String)] = offsets match {
      // no saved offsets: start consuming with the direct approach from the default position
      case None =>
        print("start from scratch")
        KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
      case Some(offset) =>
        print("start with the offset")
        val messageHeader = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())
        KafkaUtils.createDirectStream[String, String, StringDecoder,
          StringDecoder, (String, String)](ssc, kafkaParams, offset, messageHeader)
    }

    // word count
    kafkaDStream.map(_._2) // _1 is the message key, _2 is the message value from Kafka
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print(10) // print the first 10 results
    
    // save the offsets to ZooKeeper (any other storage could be used instead)
    kafkaDStream.foreachRDD(rdd =>
      saveOffset(zkClient, zkHosts, zkPath, rdd)
    )

    ssc.start()
    ssc.awaitTermination()

  }

  // save offsets to ZooKeeper
  def saveOffset(zkClient: ZkClient, zkHost: String, zkPath: String, rdd: RDD[_]): Unit = {
    logger.info("save offsets to Zookeeper")
    val stopwatch = new Stopwatch()
    val offsetsRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    offsetsRanges.foreach(offsetRange => logger.debug(s"  Using $offsetRange"))
    // only fromOffset is saved, so after a restart the most recent batch is re-read (at-least-once)
    val offsetsRangesStr = offsetsRanges.map(offsetRange => s"${offsetRange.partition}:${offsetRange.fromOffset}")
      .mkString(",")
    logger.info("writing offsets to Zookeeper zkClient=" + zkClient + "  zkHosts=" + zkHosts + "zkPath=" + zkPath + "  offsetsRangesStr:" + offsetsRangesStr)
    updatePersistentPath(zkClient, zkPath, offsetsRangesStr)
    logger.info("done updating offsets in zookeeper. took " + stopwatch)
  }

  // read offsets from ZooKeeper
  def readOffset(zkClient: ZkClient): Option[Map[TopicAndPartition, Long]] = {

    val stopwatch = new Stopwatch()

    val stat = new Stat()
    val dataAndStat: (Option[String], Stat) = try {
      (Some(zkClient.readData(zkPath, stat)), stat)
    } catch {
      case _: ZkNoNodeException => (None, stat) // no offsets stored yet
      case e: Throwable => throw e
    }

    // parse the stored offsets
    dataAndStat._1 match {
      case Some(offsetsRangeStr) =>
        logger.info(s" Read offset ranges: $offsetsRangeStr")
        val offset: Map[TopicAndPartition, Long] = offsetsRangeStr.split(",")
          .map(str => str.split(":"))
          .map {
            case Array(partitions, offset) =>
              TopicAndPartition(topic.last, partitions.toInt) -> offset.toLong
          }.toMap
        logger.info("Done reading offsets from Zookeeper. Took " + stopwatch)
        Some(offset)
      case None =>
        logger.info(" No offsets found in Zookeeper. Took " + stopwatch)
        None
    }
  }

  // update the offsets stored in ZooKeeper
  def updatePersistentPath(zkClient: ZkClient, zkPath: String, offsetsRangesStr: String): Unit = {
    try {
      zkClient.writeData(zkPath, offsetsRangesStr)
    } catch {
      // write failed because the node does not exist ==> create the parent path first
      case _: ZkNoNodeException =>
        createParentPath(zkClient, zkPath)
        try {
          // create a persistent node (i.e. the path)
          // and write the offsets into it
          zkClient.createPersistent(zkPath, offsetsRangesStr)
        } catch {
          case _: ZkNodeExistsException =>
            zkClient.writeData(zkPath, offsetsRangesStr)
          case e2: Throwable => throw e2
        }
      case e2: Throwable => throw e2
    }
  }

  // create the parent path if it does not exist
  def createParentPath(zkClient: ZkClient, zkPath: String): Unit = {
    val parentDir = zkPath.substring(0, zkPath.lastIndexOf('/'))
    if (parentDir.length != 0)
      zkClient.createPersistent(parentDir, true)
  }

  // simple stopwatch for measuring elapsed time
  class Stopwatch {
    private val start = System.currentTimeMillis()

    override def toString: String = (System.currentTimeMillis() - start) + " ms"
  }

}