Kafka删除日志源码分析

问题描述

1.问题定位

最近需要利用flume采集神策的历史数据,数据量比较大,每天大概有2000万条数据,大概要采集一个月的。然后发现数据还没来得及消费就被删除了,

2.问题处理

查看kafka配置后,发现两个关于删除日志策略的参数都设置了。
log.retention.hourslog.retention.bytes
1. 查看kafka0.9.0.0源码,LogManager类

   def cleanupLogs() {
    debug("Beginning log cleanup...")
    var total = 0
    val startMs = time.milliseconds
    for(log <- allLogs; if !log.config.compact) {
      debug("Garbage collecting '" + log.name + "'")
      total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)
    }
    debug("Log cleanup completed. " + total + " files deleted in " +
                  (time.milliseconds - startMs) / 1000 + " seconds")
  }

2. 查看total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)两个方法,删除逻辑是或
首先查看cleanupExpiredSegments(log) 方法,

 private def cleanupExpiredSegments(log: Log): Int = {
    if (log.config.retentionMs < 0)
      return 0
    val startMs = time.milliseconds
    log.deleteOldSegments(startMs - _.lastModified > log.config.retentionMs)
  }```
 startMs 这个参数的源码查看在这里
```scala
private def getLogRetentionTimeMillis: Long = {
    val millisInMinute = 60L * 1000L
    val millisInHour = 60L * millisInMinute

    val millis: java.lang.Long =
      Option(getLong(KafkaConfig.LogRetentionTimeMillisProp)).getOrElse(
        Option(getInt(KafkaConfig.LogRetentionTimeMinutesProp)) match {
          case Some(mins) => millisInMinute * mins
          case None => getInt(KafkaConfig.LogRetentionTimeHoursProp) * millisInHour
        })

    if (millis < 0) return -1
    millis   //总之就是 log.retention.hours log.retention.minutes log.retention.ms配置文件这三个来决定的,把他们转为毫秒
  }

所以cleanupExpiredSegments(log) 方法会根据日志超时时间来删除

查看cleanupSegmentsToMaintainSize(log)方法

private def cleanupSegmentsToMaintainSize(log: Log): Int = {
    if(log.config.retentionSize < 0 || log.size < log.config.retentionSize)
      return 0
    var diff = log.size - log.config.retentionSize
    def shouldDelete(segment: LogSegment) = {
      if(diff - segment.size >= 0) {
        diff -= segment.size
        true
      } else {
        false
      }
    }
    log.deleteOldSegments(shouldDelete)
  }

根据日志大小来决定,这个参数是log.retention.bytes
总结:kafka删除日志的逻辑是通过时间和日志大小来删除的,配置参数log.retention.hourslog.retention.bytes,如果只要按时间删除,则不设置log.retention.bytes,默认是-1,今天的问题就在于两个参数都设置了,所以出现日志过大就被删除了。

3.最后

Kafka日志是多久删除一次的呢?
LogManager类中的startup

def startup() {
    /* Schedule the cleanup task to delete old logs */
    if(scheduler != null) {
      info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
      scheduler.schedule("kafka-log-retention", 
                         cleanupLogs, 
                         delay = InitialTaskDelayMs, 
                         period = retentionCheckMs, 
                         TimeUnit.MILLISECONDS)
      info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
      scheduler.schedule("kafka-log-flusher", 
                         flushDirtyLogs, 
                         delay = InitialTaskDelayMs, 
                         period = flushCheckMs, 
                         TimeUnit.MILLISECONDS)
      scheduler.schedule("kafka-recovery-point-checkpoint",
                         checkpointRecoveryPointOffsets,
                         delay = InitialTaskDelayMs,
                         period = flushCheckpointMs,
                         TimeUnit.MILLISECONDS)
    }
    if(cleanerConfig.enableCleaner)
      cleaner.startup()
  }```
寻找` retentionCheckMs,`是由这个参数决定的,具体过程不详细秒速,`log.retention.check.interval.ms`,默认是300秒,就是5分钟执行清理逻辑一次。



©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 135,323评论 19 139
  • 本文转载自http://dataunion.org/?p=9307 背景介绍Kafka简介Kafka是一种分布式的...
    Bottle丶Fish阅读 10,805评论 0 34
  • kafka的定义:是一个分布式消息系统,由LinkedIn使用Scala编写,用作LinkedIn的活动流(Act...
    时待吾阅读 10,719评论 1 15
  • 在软件项目的生命周期中,开发只占开始的一小部分,大部分时间我们要对项目进行运行维护,Kafka相关的项目也不例外。...
    柴诗雨阅读 12,516评论 0 7
  • ** 今天看了一下kafka官网,尝试着在自己电脑上安装和配置,然后学一下官方document。** Introd...
    RainChang阅读 10,436评论 1 30