spark streaming offset管理方式

kafka自动管理

enable.auto.commit=true
为了更好地理解这一章节中提到的内容,我们先来做一些铺垫。如果是使用spark-streaming-kafka-0-10,那么我们建议将enable.auto.commit设为false。这个配置只是在这个版本生效,enable.auto.commit如果设为true的话,那么意味着offsets会按照auto.commit.interval.ms中所配置的间隔来周期性自动提交到Kafka中

enable.auto.commit
    Consumer 在commit offset时有两种模式:自动提交,手动提交。手动提交在前面已经说过。自动提交:是Kafka Consumer会在后台周期性的去commit。
默认值是true。
auto.commit.interval.ms
    自动提交间隔。范围:[0,Integer.MAX],默认值是 5000 (5 s)

checkpoints

对Kafka Stream 执行checkpoint操作使得offset保存在checkpoint中,如果是应用挂掉的话,那么SparkStreamig应用功能可以从保存的offset中开始读取消息。但是,如果是对Spark Streaming应用进行升级的话,那么很抱歉,不能checkpoint的数据没法使用,所以这种机制并不可靠,特别是在严格的生产环境中,我们不推荐这种方式。

自动提交到kafka

新版消费者不再保存偏移量到zookeeper中,而是保存在Kafka的一个内部主题中“__consumer_offsets”,该主题默认有50个分区,每个分区3个副本,分区数量有参数offset.topic.num.partition设置。通过消费者组ID的哈希值和该参数取模的方式来确定某个消费者组已消费的偏移量保存到__consumer_offsets主题的哪个分区中

def createKafkaRDD(ssc: StreamingContext, config: Source) = {
    var SparkDStream: InputDStream[ConsumerRecord[String, String]] = null
    try {
      SparkDStream = {
        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> config.servers,
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> config.group,
          "auto.offset.reset" -> config.offset
        )
/*
          "enable.auto.commit" -> config.getString("kafkaSource.enable.auto.commit"))*/
        // val subscribeTopics = config.getStringList("kafkaSource.topics").toIterable
        import scala.collection.JavaConversions._
        val kafkaStream = KafkaUtils.createDirectStream[String, String](
          ssc,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](config.topic.toList, kafkaParams)
        )
        kafkaStream
      }
    } catch {
      case e: Throwable => {
        throw new Exception("Couldn't init Spark stream processing", e)
      }
    }
    SparkDStream
  }

var inputDStream: InputDStream[ConsumerRecord[String, String]] = createKafkaRDD()
inputDStream.foreachRDD { rdd =>
            val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
            // 更新 Offset 值
            inputDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
        }

存储到redis

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._

import scala.collection.JavaConverters._
import scala.util.Try

/**
  * Created by chouyarn of BI on 2018/8/21
  */
object KafkaUtilsRedis {
  /**
    * 根据groupId保存offset
    * @param ranges
    * @param groupId
    */
  def storeOffset(ranges: Array[OffsetRange], groupId: String): Unit = {
    for (o <- ranges) {
      val key = s"bi_kafka_offset_${groupId}_${o.topic}_${o.partition}"
      val value = o.untilOffset
      JedisUtil.set(key, value.toString)
    }
  }

  /**
    * 根据topic,groupid获取offset
    * @param topics
    * @param groupId
    * @return
    */
  def getOffset(topics: Array[String], groupId: String): (Map[TopicPartition, Long], Int) = {
    val fromOffSets = scala.collection.mutable.Map[TopicPartition, Long]()

    topics.foreach(topic => {
      val keys = JedisUtil.getKeys(s"bi_kafka_offset_${groupId}_${topic}*")
      if (!keys.isEmpty) {
        keys.asScala.foreach(key => {
          val offset = JedisUtil.get(key)
          val partition = Try(key.split(s"bi_kafka_offset_${groupId}_${topic}_").apply(1)).getOrElse("0")
          fromOffSets.put(new TopicPartition(topic, partition.toInt), offset.toLong)
        })
      }
    })
    if (fromOffSets.isEmpty) {
      (fromOffSets.toMap, 0)
    } else {
      (fromOffSets.toMap, 1)
    }
  }

  /**
    * 创建InputDStream,如果auto.offset.reset为latest则从redis读取
    * @param ssc
    * @param topic
    * @param kafkaParams
    * @return
    */
  def createStreamingContextRedis(ssc: StreamingContext, topic: Array[String],
                                  kafkaParams: Map[String, Object]): InputDStream[ConsumerRecord[String, String]] = {
    var kafkaStreams: InputDStream[ConsumerRecord[String, String]] = null
    val groupId = kafkaParams.get("group.id").get
    val (fromOffSet, flag) = getOffset(topic, groupId.toString)
    val offsetReset = kafkaParams.get("auto.offset.reset").get
    if (flag == 1 && offsetReset.equals("latest")) {
      kafkaStreams = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe(topic, kafkaParams, fromOffSet))
    } else {
      kafkaStreams = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe(topic, kafkaParams))
    }
    kafkaStreams
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("offSet Redis").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(60))
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "group.id" -> "binlog.test.rpt_test_1min",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean),
      "session.timeout.ms" -> "20000",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer]
    )
    val topic = Array("binlog.test.rpt_test", "binlog.test.hbase_test", "binlog.test.offset_test")
    val groupId = "binlog.test.rpt_test_1min"
    val lines = createStreamingContextRedis(ssc, topic, kafkaParams)
    lines.foreachRDD(rdds => {
      if (!rdds.isEmpty()) {
        println("##################:" + rdds.count())
      }
      storeOffset(rdds.asInstanceOf[HasOffsetRanges].offsetRanges, groupId)
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
import java.util

import com.typesafe.config.ConfigFactory
import org.apache.kafka.common.serialization.StringDeserializer
import redis.clients.jedis.{HostAndPort, JedisCluster, JedisPool, JedisPoolConfig}

object JedisUtil {
  private val config = ConfigFactory.load("realtime-etl.conf")

  private val redisHosts: String = config.getString("redis.server")
  private val port: Int = config.getInt("redis.port")

  private val hostAndPortsSet: java.util.Set[HostAndPort] = new util.HashSet[HostAndPort]()
  redisHosts.split(",").foreach(host => {
    hostAndPortsSet.add(new HostAndPort(host, port))
  })


  private val jedisConf: JedisPoolConfig = new JedisPoolConfig()
  jedisConf.setMaxTotal(5000)
  jedisConf.setMaxWaitMillis(50000)
  jedisConf.setMaxIdle(300)
  jedisConf.setTestOnBorrow(true)
  jedisConf.setTestOnReturn(true)
  jedisConf.setTestWhileIdle(true)
  jedisConf.setMinEvictableIdleTimeMillis(60000l)
  jedisConf.setTimeBetweenEvictionRunsMillis(3000l)
  jedisConf.setNumTestsPerEvictionRun(-1)

  lazy val redis = new JedisCluster(hostAndPortsSet, jedisConf)

  def get(key: String): String = {
    try {
      redis.get(key)
    } catch {
      case e: Exception => e.printStackTrace()
        null
    }
  }

  def set(key: String, value: String) = {
    try {
      redis.set(key, value)
    } catch {
      case e: Exception => {
        e.printStackTrace()
      }
    }
  }


  def hmset(key: String, map: java.util.Map[String, String]): Unit = {
    //    val redis=pool.getResource
    try {
      redis.hmset(key, map)
    }catch {
      case e:Exception => e.printStackTrace()
    }
  }

  def hset(key: String, field: String, value: String): Unit = {
    //    val redis=pool.getResource
    try {
      redis.hset(key, field, value)
    } catch {
      case e: Exception => {
        e.printStackTrace()
      }
    }
  }

  def hget(key: String, field: String): String = {
    try {
      redis.hget(key, field)
    }catch {
      case e:Exception => e.printStackTrace()
        null
    }
  }

  def hgetAll(key: String): java.util.Map[String, String] = {
    try {
      redis.hgetAll(key)
    } catch {
      case e: Exception => e.printStackTrace()
        null
    }
  }
}

存储ZK

在这个方案中,Spark Streaming任务在启动时会去Zookeeper中读取每个分区的offsets。如果有新的分区出现,那么他的offset将会设置在最开始的位置。在每批数据处理完之后,用户需要可以选择存储已处理数据的一个offset或者最后一个offset。
注意: Kafka offset在ZooKeeper中的存储路径为/consumers/[groupId]/offsets/topic/[partitionId], 存储的值为offset
此外,新消费者将使用跟旧的Kafka 消费者API一样的格式将offset保存在ZooKeeper中。因此,任何追踪或监控Zookeeper中Kafka Offset的工具仍然生效的。

val zkClientAndConnection = ZkUtils.createZkClientAndConnection(zkUrl, sessionTimeout, connectionTimeout)

val zkUtils = new ZkUtils(zkClientAndConnection._1, zkClientAndConnection._2, false)

Method for retrieving the last offsets stored in ZooKeeper of the consumer group and topic list.

def readOffsets(topics: Seq[String], groupId:String):
 Map[TopicPartition, Long] = {
 val topicPartOffsetMap = collection.mutable.HashMap.empty[TopicPartition, Long]
 val partitionMap = zkUtils.getPartitionsForTopics(topics)
 // /consumers/<groupId>/offsets/<topic>/
 partitionMap.foreach(topicPartitions => {
   val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, topicPartitions._1)
   topicPartitions._2.foreach(partition => {
     val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + partition
     try {
       val offsetStatTuple = zkUtils.readData(offsetPath)
       if (offsetStatTuple != null) {
         LOGGER.info("retrieving offset details - topic: {}, partition: {}, offset: {}, node path: {}", Seq[AnyRef](topicPartitions._1, partition.toString, offsetStatTuple._1, offsetPath): _*)
         topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)),
           offsetStatTuple._1.toLong)
       }
     } catch {
       case e: Exception =>
         LOGGER.warn("retrieving offset details - no previous node exists:" + " {}, topic: {}, partition: {}, node path: {}", Seq[AnyRef](e.getMessage, topicPartitions._1, partition.toString, offsetPath): _*)
         topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), 0L)
     }
   })
 })
 topicPartOffsetMap.toMap
}

def persistOffsets(offsets: Seq[OffsetRange], groupId: String, storeEndOffset: Boolean): Unit = {
 offsets.foreach(or => {
   val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, or.topic);
   val acls = new ListBuffer[ACL]()
   val acl = new ACL
   acl.setId(ANYONE_ID_UNSAFE)
   acl.setPerms(PERMISSIONS_ALL)
   acls += acl
   val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + or.partition;
   val offsetVal = if (storeEndOffset) or.untilOffset else or.fromOffset
 zkUtils.updatePersistentPath(zkGroupTopicDirs.consumerOffsetDir + "/"
     + or.partition, offsetVal + "", JavaConversions.bufferAsJavaList(acls))
   LOGGER.debug("persisting offset details - topic: {}, partition: {}, offset: {}, node path: {}", Seq[AnyRef](or.topic, or.partition.toString, offsetVal.toString, offsetPath): _*)
 })
}

使用zk获取的offset来初始化direct dstream

val inputDStream = KafkaUtils.createDirectStream(ssc, PreferConsistent, ConsumerStrategies.Subscribe[String,String](topics, kafkaParams, fromOffsets))
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,874评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,102评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,676评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,911评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,937评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,935评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,860评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,660评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,113评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,363评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,506评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,238评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,861评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,486评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,674评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,513评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,426评论 2 352