kafka-kafka+sparkstreaming不丢数据的方案

一、从kafka读数据保证不丢失的方案

 0.8版本
lines.foreachRDD( rdd =>{
      //业务逻辑代码处理
     // rdd.map(_._2).flatMap(_.split(","))
     rdd.asInstanceOf[HasOffsetRanges].offsetRanges.map( o =>{
       println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
       //更新偏移量就可以了。
     })
    })

lines对象里面有偏移量,但是凡事代码对这个对象做了任何算子的操作(比如map),偏移量就丢失了.所以调用了createDirectStream之后,只要在多调用一个其他的算子,偏移量就丢失了,只能调用foreachRDD,然后进行业务逻辑处理.
但是有一个小小的遗憾,就是KafkaUtils.createDirectStream返回的其实是一个DStream,但是调用了foreachRDD之后变成了RDD把streaming的处理,变成了spark core 去处理了..虽然问题不大,但是streaming特有的算子就不能用了,比如 window、updatestatebykey也就是说,他不能适用100%的场景

 那么怎么写可以100%适用呢?

整体思想:如果我们能在读取kafka的时候,传递进去offset的信息,并且保证这个offset的更新跟数据处理成功与否是原子性的,就能保证数据不丢失

image.png

1、扩展createDirectStream方法,新建KafkaManager
2、设置一个监听器,帮我们完成偏移量的提交(因为0.8默认offset保存在zk,所以我们需要调用kafka往zk写offset的方法,重点是什么时候写)
3、在读取kafka的数据之前,先把偏移量更新到zk(setOrUpdateOffsets),然后在读取偏移量(kc.getConsumerOffsets)
注意0.8版本是保存在zk的,所以我们只需要把offset写入zk,然后调用kafka的方法去zk读取就好了
4、保证提交偏移量和数据处理逻辑的原子性

NOTE:最后这个方案,在架构设计上有一个缺点:
如果企业中spark streaming太多(上千个-->几十个其实是没问题的),会频繁的往zk写数据,这对zk来说是一个高并发的场景,而zk是不适合高并发的,所以用redis代替是可以的

分别解释一下:
新建KafkaManager类,新建一个createDirectStream方法和setOrUpdateOffsets方法

class KakfaManager {
  def createDirectStream(){
  // 在zookeeper上读取offsets前先根据实际情况更新offsets
  setOrUpdateOffsets()
  ...
  //获取offset
  val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
  //还是调用的sparkStermaing的API
  KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](
        ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message))
}
}

setOrUpdateOffsets(){
...
//根据实际消费情况更新消费offsets
kc.setConsumerOffsets(groupId, offsets)
...
}

监听器(我们只需要把offset放入zk就可以了)

class MyListener implements StreamingListener(){
  @Override
  //batchCompleted对象里面是有任务的offset信息
    public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {
    //提交偏移量
    //这个方法是 0.8的依赖提供的API
    //这样话,就自动帮我把offset存到了ZK
   kc.setConsumerOffsets(kafkaParams.get("group.id").get(), topicAndPartitionObjectMap);
}
    /**
     * 批次完成时调用的方法
     * @param batchCompleted
     *
     * batchCompleted 对象里面带有了偏移量对信息,所以我提交偏移量对时候
     * 就是从这个对象里面读取offset就可以了。
     *
     * 注意:任务运行完了,但是有可能是成功的,也有可能是失败的
     */
    @Override
    public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {//这个对象里面是有任务的offset信息
      
        //任务运行完了以后会(任务成功,)提交偏移量
        //
        //如果本批次里面有任务失败了,那么就终止偏移量提交
        scala.collection.immutable.Map<Object, OutputOperationInfo> opsMap = batchCompleted.batchInfo().outputOperationInfos();
//
        Map<Object, OutputOperationInfo> javaOpsMap = JavaConversions.mapAsJavaMap(opsMap);
        for (Map.Entry<Object, OutputOperationInfo> entry : javaOpsMap.entrySet()) {
            //failureReason不等于None(是scala中的None),说明有异常,不保存offset
            //OutputOperationInfo task
            //task -> failureReason
            if (!"None".equalsIgnoreCase(entry.getValue().failureReason().toString())) {
                return;
            }
        }
      //  long batchTime = batchCompleted.batchInfo().batchTime().milliseconds();
        /**
         * topic,分区,偏移量
         */
        Map<String, Map<Integer, Long>> offset = getOffset(batchCompleted);

        for (Map.Entry<String, Map<Integer, Long>> entry : offset.entrySet()) {
            String topic = entry.getKey();
            Map<Integer, Long> paritionToOffset = entry.getValue();
            //我只需要在这里把offset放入到zookeeper就可以了。
            for(Map.Entry<Integer,Long> p2o : paritionToOffset.entrySet()){
                Map<TopicAndPartition, Object> map = new HashMap<TopicAndPartition, Object>();

                TopicAndPartition topicAndPartition =
                        new TopicAndPartition(topic,p2o.getKey());

                map.put(topicAndPartition,p2o.getValue());
                scala.collection.immutable.Map<TopicAndPartition, Object>
                        topicAndPartitionObjectMap = TypeHelper.toScalaImmutableMap(map);
                //提交偏移量

                //这个方法是 0.8的依赖提供的API
                //这样话,就自动帮我把offset存到了ZK
                kc.setConsumerOffsets(kafkaParams.get("group.id").get(), topicAndPartitionObjectMap);
            }

        }
    }

private Map<String, Map<Integer, Long>> getOffset(StreamingListenerBatchCompleted batchCompleted) {
        Map<String, Map<Integer, Long>> map = new HashMap<>();

        scala.collection.immutable.Map<Object, StreamInputInfo> inputInfoMap =
                batchCompleted.batchInfo().streamIdToInputInfo();

        Map<Object, StreamInputInfo> infos = JavaConversions.mapAsJavaMap(inputInfoMap);

        infos.forEach((k, v) -> {
            Option<Object> optOffsets = v.metadata().get("offsets");
            if (!optOffsets.isEmpty()) {
                Object objOffsets = optOffsets.get();
                if (List.class.isAssignableFrom(objOffsets.getClass())) {
                    List<OffsetRange> scalaRanges = (List<OffsetRange>) objOffsets;

                    Iterable<OffsetRange> ranges = JavaConversions.asJavaIterable(scalaRanges);
                    for (OffsetRange range : ranges) {
                        if (!map.containsKey(range.topic())) {
                            map.put(range.topic(), new HashMap<>());
                        }
                        map.get(range.topic()).put(range.partition(), range.untilOffset());
                    }
                }
            }
        });

        return map;
    }
 }

0.10版本

subscribe方式

1、创建一个监听器,继承StreamingListener,在onBatchCompleted方法中去提交offset

//获取数据源(主题里面读取offset)
val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))
    //设置监听器
    ssc.addStreamingListener(new MyListener(stream))
    val result = stream.map(_.value()).flatMap(_.split(","))
      .map((_, 1))
      .reduceByKey(_ + _)
    result.foreachRDD( rdd =>{
      /**
       * 其它的操作
       */
    })

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
if(offsetRanges != null){
     提交偏移量(Kafka)
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges);
    }

因为0.10版本的kafka的offset默认已经不下zk,而是写kafka了(_consumer_offset)

如何保证数据exactly-once语义

我们程序是通过kafka的offset去读取数据,我们一般是处理数据,然后写offset,
正常情况是没问题的,但是如果处理完了,但是offset没写成功,或者处理失败,offset写成功了
就出现了,重复消费,或者丢数据的情况;
所以原则上就是保证事务或者保证原子性,就是处理写库操作(保存mysql、HBase)和写offset操作要么一起成功,要么一起失败就行了
还有就是设计成幂等操作,重复了也无所谓

import scalikejdbc.{ConnectionPool, DB, _}
//第一步:在Driver端创建数据库连接池
ConnectionPool.singleton("jdbc:mysql://**", "", "")
//第二步:从数据库读取offset
//1)初次启动或重启时,从指定的Partition、Offset构建TopicPartition
//2)运行过程中,每个Partition、Offset保存在内部currentOffsets = Map[TopicPartition, Long]()变量中
//3)后期Kafka Topic分区动态扩展,在运行过程中不能自动感知
val initOffset=DB.readOnly(implicit session=>{
      sql"select `partition`,offset from kafka_topic_offset where topic =${topic} and `group`=${group}"
        .map(item=> new TopicPartition(topic, item.get[Int]("partition")) -> item.get[Long]("offset"))
        .list().apply().toMap
    })
//第三步:CreateDirectStream
//从指定的Topic、Partition、Offset开始消费
    val sourceDStream =KafkaUtils.createDirectStream[String,String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Assign[String,String](initOffset.keys,kafkaParams,initOffset)
    )
sourceDStream.foreachRDD(rdd =>{
//业务处理
//在Driver端存储数据、提交Offset
//结果存储与Offset提交在同一事务中原子执行
//这里将偏移量保存在Mysql中
//事务
DB.localTx(
//储存结果到mysql
//储存offset到mysql
)
})

参考文档:奈学教育

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容