[SPARK-19680] Offsets out of range with no configured reset policy for partitions

In our company's wind-power big-data project, we ran into an error like the following:
Job aborted due to stage failure: Task 2 in stage 111.0 failed 4 times, most recent failure: Lost task 2.3 in stage 111.0 (TID 1270, 194.232.55.23, executor 2): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {winSink-2=161803385}

To achieve exactly-once semantics, we manage offsets ourselves: after the Spark app is submitted, it resumes reading messages from the position where the previous run ended. The catch is the OffsetOutOfRangeException shown above, which is usually caused by Kafka's retention expiration.
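The "store offsets yourself" pattern can be sketched as follows. This is an illustrative mock, not our project's actual code: the in-memory dict stands in for a real external store (ZooKeeper, HBase, an RDBMS, ...), and the function names are hypothetical. The key point is that offsets are persisted only after a batch is fully processed, so a restart resumes exactly where the last successful batch ended.

```python
offset_store = {}  # (topic, partition) -> next offset to read; a stand-in for a durable store

def load_start_offsets(topic, partitions):
    """On startup, resume from the last committed position (0 if none)."""
    return {p: offset_store.get((topic, p), 0) for p in partitions}

def process_batch(topic, partition, records, first_offset):
    """Process a batch, then record the new position."""
    for record in records:
        pass  # business logic goes here
    # Commit only after processing succeeds, so a crash mid-batch replays it.
    offset_store[(topic, partition)] = first_offset + len(records)

# Simulated run: partition 0 of "winSink" starts at 0 and processes 5 records.
start = load_start_offsets("winSink", [0, 1])
process_batch("winSink", 0, ["r%d" % i for i in range(5)], start[0])
print(load_start_offsets("winSink", [0, 1]))  # {0: 5, 1: 0}
```

The weakness of this scheme is exactly what the rest of this post is about: the durable store can remember an offset that Kafka has since deleted.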

Among Kafka's consumer configuration options, pay attention to this one:

public static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset";
public static final String AUTO_OFFSET_RESET_DOC = "What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted): <ul><li>earliest: automatically reset the offset to the earliest offset<li>latest: automatically reset the offset to the latest offset</li><li>none: throw exception to the consumer if no previous offset is found for the consumer's group</li><li>anything else: throw exception to the consumer.</li></ul>";

When you access Kafka directly through, say, the Kafka client, even if you specify a nonexistent offset (above the upper bound or below the lower bound), Kafka will reset your offset according to this setting, for example to earliest or latest.
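A minimal sketch of what this reset policy does, heavily simplified (the real logic lives inside the Kafka client, and the function name here is made up): given a requested offset outside [earliest, latest], the policy decides where to resume, or raises the very exception from the top of this post.

```python
def resolve_offset(requested, earliest, latest, policy):
    """Simplified model of auto.offset.reset for a single partition."""
    if earliest <= requested <= latest:
        return requested              # offset still valid, use it as-is
    if policy == "earliest":
        return earliest               # jump back to the oldest retained record
    if policy == "latest":
        return latest                 # skip ahead to the newest record
    raise Exception(
        "Offsets out of range with no configured reset policy "
        "for partitions: {winSink-2=%d}" % requested)

# Retention has expired everything below offset 161803400:
print(resolve_offset(161803385, 161803400, 161900000, "earliest"))  # 161803400
print(resolve_offset(161803385, 161803400, 161900000, "latest"))    # 161900000
```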

But when you specify an out-of-range initial offset in Spark Streaming, Spark ignores your auto.offset.reset and instead fails with the error from the beginning of this article: Offsets out of range with no configured reset policy for partitions.

The discussion of this behavior can be found in SPARK-19680; some excerpts:

The issue here is likely that you have lost data (because of retention expiration) between the time the batch was defined on the driver, and the time the executor attempted to process the batch. Having executor consumers obey auto offset reset would result in silent data loss, which is a bad thing.

There's a more detailed description of the semantic issues around this for kafka in KAFKA-3370 and for structured streaming kafka in SPARK-17937.

If you've got really aggressive retention settings and are having trouble getting a stream started, look at specifying earliest + some margin on startup as a workaround. If you're having this trouble after a stream has been running for a while, you need more retention or smaller batches.

If you got an OffsetOutOfRangeException after a job had been running for 4 days, it likely means that your job was not running fast enough to keep up, and retention expired records that had not yet been processed.

Therefore, to avoid this problem in the scenario above, the application should validate the stored offsets against Kafka's current offset bounds at startup. If a stored offset is below the partition's minimum, adjust it up to that minimum. How to check the bounds? See my other post: Fetch Offset range in Kafka.
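The startup validation can be sketched like this (hypothetical helper; with kafka-python, for instance, the real bounds could be fetched via KafkaConsumer.beginning_offsets() and end_offsets()). Stored offsets that fell below the earliest retained offset are clamped up to it:

```python
def clamp_offsets(stored, earliest, latest):
    """Clamp each partition's stored offset into [earliest, latest]."""
    fixed = {}
    for partition, offset in stored.items():
        lo, hi = earliest[partition], latest[partition]
        if offset < lo:
            # Retention already deleted [offset, lo): data was lost, log it!
            fixed[partition] = lo
        elif offset > hi:
            fixed[partition] = hi
        else:
            fixed[partition] = offset
    return fixed

stored   = {0: 161803385, 1: 161850000}   # what our store remembered
earliest = {0: 161803400, 1: 161800000}   # oldest offsets Kafka still has
latest   = {0: 161900000, 1: 161900000}   # newest offsets in Kafka
print(clamp_offsets(stored, earliest, latest))  # {0: 161803400, 1: 161850000}
```

Partition 0 is clamped up because retention expired its stored offset; partition 1 is still in range and passes through untouched.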

Of course, this kind of data loss normally should not happen at all; when it does, it should be logged, and ideally prevented in the first place.

  • On offset management, see your-own-data-store
  • Questions and exchanges about Flume, Kafka, Spark, and TSDB are welcome