一、从kafka读数据保证不丢失的方案
0.8版本
lines.foreachRDD( rdd =>{
//业务逻辑代码处理
// rdd.map(_._2).flatMap(_.split(","))
rdd.asInstanceOf[HasOffsetRanges].offsetRanges.map( o =>{
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
//更新偏移量就可以了。
})
})
lines对象里面有偏移量,但是凡事代码对这个对象做了任何算子的操作(比如map),偏移量就丢失了.所以调用了createDirectStream之后,只要在多调用一个其他的算子,偏移量就丢失了,只能调用foreachRDD,然后进行业务逻辑处理.
但是有一个小小的遗憾,就是KafkaUtils.createDirectStream返回的其实是一个DStream,但是调用了foreachRDD之后变成了RDD把streaming的处理,变成了spark core 去处理了..虽然问题不大,但是streaming特有的算子就不能用了,比如 window、updatestatebykey也就是说,他不能适用100%的场景
那么怎么写可以100%适用呢?
整体思想:如果我们能在读取kafka的时候,传递进去offset的信息,并且保证这个offset的更新跟数据处理成功与否是原子性的,就能保证数据不丢失
1、扩展createDirectStream方法,新建KafkaManager
2、设置一个监听器,帮我们完成偏移量的提交(因为0.8默认offset保存在zk,所以我们需要调用kafka往zk写offset的方法,重点是什么时候写)
3、在读取kafka的数据之前,先把偏移量更新到zk(setOrUpdateOffsets),然后在读取偏移量(kc.getConsumerOffsets)
注意0.8版本是保存在zk的,所以我们只需要把offset写入zk,然后调用kafka的方法去zk读取就好了
4、保证提交偏移量和数据处理逻辑的原子性
NOTE:最后这个方案,在架构设计上有一个缺点:
如果企业中spark streaming太多(上千个-->几十个其实是没问题的),会频繁的往zk写数据,这对zk来说是一个高并发的场景,而zk是不适合高并发的,所以用redis代替是可以的
分别解释一下:
新建KafkaManager类,新建一个createDirectStream方法和setOrUpdateOffsets方法
class KakfaManager {
def createDirectStream(){
// 在zookeeper上读取offsets前先根据实际情况更新offsets
setOrUpdateOffsets()
...
//获取offset
val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
//还是调用的sparkStermaing的API
KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](
ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message))
}
}
setOrUpdateOffsets(){
...
//根据实际消费情况更新消费offsets
kc.setConsumerOffsets(groupId, offsets)
...
}
监听器(我们只需要把offset放入zk就可以了)
class MyListener implements StreamingListener(){
@Override
//batchCompleted对象里面是有任务的offset信息
public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {
//提交偏移量
//这个方法是 0.8的依赖提供的API
//这样话,就自动帮我把offset存到了ZK
kc.setConsumerOffsets(kafkaParams.get("group.id").get(), topicAndPartitionObjectMap);
}
/**
* 批次完成时调用的方法
* @param batchCompleted
*
* batchCompleted 对象里面带有了偏移量对信息,所以我提交偏移量对时候
* 就是从这个对象里面读取offset就可以了。
*
* 注意:任务运行完了,但是有可能是成功的,也有可能是失败的
*/
@Override
public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {//这个对象里面是有任务的offset信息
//任务运行完了以后会(任务成功,)提交偏移量
//
//如果本批次里面有任务失败了,那么就终止偏移量提交
scala.collection.immutable.Map<Object, OutputOperationInfo> opsMap = batchCompleted.batchInfo().outputOperationInfos();
//
Map<Object, OutputOperationInfo> javaOpsMap = JavaConversions.mapAsJavaMap(opsMap);
for (Map.Entry<Object, OutputOperationInfo> entry : javaOpsMap.entrySet()) {
//failureReason不等于None(是scala中的None),说明有异常,不保存offset
//OutputOperationInfo task
//task -> failureReason
if (!"None".equalsIgnoreCase(entry.getValue().failureReason().toString())) {
return;
}
}
// long batchTime = batchCompleted.batchInfo().batchTime().milliseconds();
/**
* topic,分区,偏移量
*/
Map<String, Map<Integer, Long>> offset = getOffset(batchCompleted);
for (Map.Entry<String, Map<Integer, Long>> entry : offset.entrySet()) {
String topic = entry.getKey();
Map<Integer, Long> paritionToOffset = entry.getValue();
//我只需要在这里把offset放入到zookeeper就可以了。
for(Map.Entry<Integer,Long> p2o : paritionToOffset.entrySet()){
Map<TopicAndPartition, Object> map = new HashMap<TopicAndPartition, Object>();
TopicAndPartition topicAndPartition =
new TopicAndPartition(topic,p2o.getKey());
map.put(topicAndPartition,p2o.getValue());
scala.collection.immutable.Map<TopicAndPartition, Object>
topicAndPartitionObjectMap = TypeHelper.toScalaImmutableMap(map);
//提交偏移量
//这个方法是 0.8的依赖提供的API
//这样话,就自动帮我把offset存到了ZK
kc.setConsumerOffsets(kafkaParams.get("group.id").get(), topicAndPartitionObjectMap);
}
}
}
private Map<String, Map<Integer, Long>> getOffset(StreamingListenerBatchCompleted batchCompleted) {
Map<String, Map<Integer, Long>> map = new HashMap<>();
scala.collection.immutable.Map<Object, StreamInputInfo> inputInfoMap =
batchCompleted.batchInfo().streamIdToInputInfo();
Map<Object, StreamInputInfo> infos = JavaConversions.mapAsJavaMap(inputInfoMap);
infos.forEach((k, v) -> {
Option<Object> optOffsets = v.metadata().get("offsets");
if (!optOffsets.isEmpty()) {
Object objOffsets = optOffsets.get();
if (List.class.isAssignableFrom(objOffsets.getClass())) {
List<OffsetRange> scalaRanges = (List<OffsetRange>) objOffsets;
Iterable<OffsetRange> ranges = JavaConversions.asJavaIterable(scalaRanges);
for (OffsetRange range : ranges) {
if (!map.containsKey(range.topic())) {
map.put(range.topic(), new HashMap<>());
}
map.get(range.topic()).put(range.partition(), range.untilOffset());
}
}
}
});
return map;
}
}
0.10版本
subscribe方式
1、创建一个监听器,继承StreamingListener,在onBatchCompleted方法中去提交offset
//获取数据源(主题里面读取offset)
val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))
//设置监听器
ssc.addStreamingListener(new MyListener(stream))
val result = stream.map(_.value()).flatMap(_.split(","))
.map((_, 1))
.reduceByKey(_ + _)
result.foreachRDD( rdd =>{
/**
* 其它的操作
*/
})
ssc.start()
ssc.awaitTermination()
ssc.stop()
if(offsetRanges != null){
提交偏移量(Kafka)
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges);
}
因为0.10版本的kafka的offset默认已经不下zk,而是写kafka了(_consumer_offset)
如何保证数据exactly-once语义
我们程序是通过kafka的offset去读取数据,我们一般是处理数据,然后写offset,
正常情况是没问题的,但是如果处理完了,但是offset没写成功,或者处理失败,offset写成功了
就出现了,重复消费,或者丢数据的情况;
所以原则上就是保证事务或者保证原子性,就是处理写库操作(保存mysql、HBase)和写offset操作要么一起成功,要么一起失败就行了
还有就是设计成幂等操作,重复了也无所谓
import scalikejdbc.{ConnectionPool, DB, _}
//第一步:在Driver端创建数据库连接池
ConnectionPool.singleton("jdbc:mysql://**", "", "")
//第二步:从数据库读取offset
//1)初次启动或重启时,从指定的Partition、Offset构建TopicPartition
//2)运行过程中,每个Partition、Offset保存在内部currentOffsets = Map[TopicPartition, Long]()变量中
//3)后期Kafka Topic分区动态扩展,在运行过程中不能自动感知
val initOffset=DB.readOnly(implicit session=>{
sql"select `partition`,offset from kafka_topic_offset where topic =${topic} and `group`=${group}"
.map(item=> new TopicPartition(topic, item.get[Int]("partition")) -> item.get[Long]("offset"))
.list().apply().toMap
})
//第三步:CreateDirectStream
//从指定的Topic、Partition、Offset开始消费
val sourceDStream =KafkaUtils.createDirectStream[String,String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Assign[String,String](initOffset.keys,kafkaParams,initOffset)
)
sourceDStream.foreachRDD(rdd =>{
//业务处理
//在Driver端存储数据、提交Offset
//结果存储与Offset提交在同一事务中原子执行
//这里将偏移量保存在Mysql中
//事务
DB.localTx(
//储存结果到mysql
//储存offset到mysql
)
})
参考文档:奈学教育