Problem Description
1. Locating the Problem
Recently I needed to use Flume to collect historical data from Sensors Analytics (神策). The volume was fairly large: roughly 20 million records per day, and about a month's worth had to be collected. I then found that data was being deleted from Kafka before it could be consumed.
2. Handling the Problem
Checking the Kafka broker configuration, I found that both parameters governing the log-deletion policy were set: log.retention.hours and log.retention.bytes.
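For illustration, the offending combination looked roughly like this (the values below are examples, not our real settings; these keys normally live in the broker's server.properties). A minimal Scala sketch, just to show the pair of keys together:

```scala
import java.util.Properties

object OffendingRetentionConfig extends App {
  // Example values only: with BOTH keys set, a partition's old segments can be
  // removed by either the time-based check or the size-based check.
  val brokerProps = new Properties()
  brokerProps.put("log.retention.hours", "168")          // time limit: 7 days
  brokerProps.put("log.retention.bytes", "10737418240")  // size limit per partition: 10 GiB
  println(brokerProps)
}
```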
1. Look at the Kafka 0.9.0.0 source, the LogManager class:
```scala
def cleanupLogs() {
  debug("Beginning log cleanup...")
  var total = 0
  val startMs = time.milliseconds
  for(log <- allLogs; if !log.config.compact) {
    debug("Garbage collecting '" + log.name + "'")
    total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)
  }
  debug("Log cleanup completed. " + total + " files deleted in " +
        (time.milliseconds - startMs) / 1000 + " seconds")
}
```
2. Look at total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log). The deletion logic is an OR of these two methods: segments are deleted when either check fires.
First, the cleanupExpiredSegments(log) method:
```scala
private def cleanupExpiredSegments(log: Log): Int = {
  if (log.config.retentionMs < 0)
    return 0
  val startMs = time.milliseconds
  log.deleteOldSegments(startMs - _.lastModified > log.config.retentionMs)
}
```
The retention time it compares against, log.config.retentionMs, comes by default from getLogRetentionTimeMillis in KafkaConfig:
```scala
private def getLogRetentionTimeMillis: Long = {
  val millisInMinute = 60L * 1000L
  val millisInHour = 60L * millisInMinute
  val millis: java.lang.Long =
    Option(getLong(KafkaConfig.LogRetentionTimeMillisProp)).getOrElse(
      Option(getInt(KafkaConfig.LogRetentionTimeMinutesProp)) match {
        case Some(mins) => millisInMinute * mins
        case None => getInt(KafkaConfig.LogRetentionTimeHoursProp) * millisInHour
      })
  if (millis < 0) return -1
  millis // in short, the retention time is decided by log.retention.ms, log.retention.minutes and log.retention.hours (in that order of precedence), converted to milliseconds
}
```
So the cleanupExpiredSegments(log) method deletes segments based on the retention time.
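To make the time-based rule concrete, here is a small standalone sketch (not Kafka code; the values are made up) that mirrors the precedence log.retention.ms > log.retention.minutes > log.retention.hours and the expiry test above:

```scala
object RetentionTimeSketch extends App {
  // Mirrors the precedence above: ms beats minutes, minutes beat hours.
  def resolveRetentionMs(ms: Option[Long], minutes: Option[Int], hours: Int): Long = {
    val millis = ms.getOrElse(minutes match {
      case Some(m) => m * 60L * 1000L
      case None    => hours * 60L * 60L * 1000L
    })
    if (millis < 0) -1L else millis
  }

  // e.g. only log.retention.hours=168 (the broker default) is set -> 604800000 ms (7 days)
  val retentionMs = resolveRetentionMs(ms = None, minutes = None, hours = 168)

  // The expiry test from cleanupExpiredSegments: delete once now - lastModified > retentionMs.
  val now = System.currentTimeMillis()
  val segmentLastModified = now - 8L * 24 * 60 * 60 * 1000 // pretend the segment is 8 days old
  val expired = now - segmentLastModified > retentionMs
  println(s"retentionMs=$retentionMs expired=$expired") // expired=true
}
```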
Next, the cleanupSegmentsToMaintainSize(log) method:
```scala
private def cleanupSegmentsToMaintainSize(log: Log): Int = {
  if(log.config.retentionSize < 0 || log.size < log.config.retentionSize)
    return 0
  var diff = log.size - log.config.retentionSize
  def shouldDelete(segment: LogSegment) = {
    if(diff - segment.size >= 0) {
      diff -= segment.size
      true
    } else {
      false
    }
  }
  log.deleteOldSegments(shouldDelete)
}
```
This one decides based on total log size; the controlling parameter is log.retention.bytes (applied per partition).
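As a worked example (a standalone sketch with made-up segment sizes, not Kafka code): with retention.bytes = 10 and five segments of size 4 (oldest first), diff = 20 - 10 = 10, so the two oldest segments are removed and 12 bytes remain, still above the limit, because only whole segments are ever deleted:

```scala
object SizeRetentionSketch extends App {
  // Made-up numbers: 5 segments of 4 "bytes" each (oldest first), retention.bytes = 10.
  val segmentSizes = List(4L, 4L, 4L, 4L, 4L)
  val retentionSize = 10L

  var diff = segmentSizes.sum - retentionSize // 20 - 10 = 10
  // Same walk as shouldDelete above: drop oldest segments while they still fit in diff.
  val deleted = segmentSizes.takeWhile { size =>
    if (diff - size >= 0) { diff -= size; true } else false
  }
  println(s"deleted ${deleted.size} segments, ${segmentSizes.sum - deleted.sum} bytes remain")
  // -> deleted 2 segments, 12 bytes remain
}
```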
Summary: Kafka deletes log segments based on both time and size, controlled by log.retention.hours and log.retention.bytes. If you only want time-based deletion, do not set log.retention.bytes; its default is -1, which disables the size check. Today's problem was exactly that both parameters were set, so once the log grew past the size limit the data was deleted before we had consumed it.
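A minimal sketch of the corrected setup (illustrative values again), keeping only time-based retention:

```scala
import java.util.Properties

object TimeOnlyRetentionConfig extends App {
  val brokerProps = new Properties()
  // Only the time limit is set; 720 hours = 30 days, e.g. enough to cover the backfill.
  brokerProps.put("log.retention.hours", "720")
  // log.retention.bytes is deliberately left unset, so it stays at its default of -1
  // and cleanupSegmentsToMaintainSize returns 0 without deleting anything.
  println(brokerProps)
}
```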
3. Finally
How often does Kafka actually run this log cleanup?
See the startup method of the LogManager class:
```scala
def startup() {
  /* Schedule the cleanup task to delete old logs */
  if(scheduler != null) {
    info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
    scheduler.schedule("kafka-log-retention",
                       cleanupLogs,
                       delay = InitialTaskDelayMs,
                       period = retentionCheckMs,
                       TimeUnit.MILLISECONDS)
    info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
    scheduler.schedule("kafka-log-flusher",
                       flushDirtyLogs,
                       delay = InitialTaskDelayMs,
                       period = flushCheckMs,
                       TimeUnit.MILLISECONDS)
    scheduler.schedule("kafka-recovery-point-checkpoint",
                       checkpointRecoveryPointOffsets,
                       delay = InitialTaskDelayMs,
                       period = flushCheckpointMs,
                       TimeUnit.MILLISECONDS)
  }
  if(cleanerConfig.enableCleaner)
    cleaner.startup()
}
```
The period is retentionCheckMs, which is set from the log.retention.check.interval.ms parameter (I won't trace that in detail here). Its default is 300000 ms, i.e. the cleanup logic runs once every 5 minutes.
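To illustrate what that scheduling amounts to, here is a rough equivalent (Kafka uses its own KafkaScheduler; this sketch just uses a plain ScheduledExecutorService):

```scala
import java.util.concurrent.{Executors, TimeUnit}

object RetentionScheduleSketch extends App {
  val retentionCheckMs = 300000L // default of log.retention.check.interval.ms: 5 minutes

  val scheduler = Executors.newSingleThreadScheduledExecutor()
  scheduler.scheduleAtFixedRate(
    new Runnable {
      // Stand-in for cleanupLogs(): would run the time-based and size-based checks.
      def run(): Unit = println("cleanupLogs() would run here")
    },
    retentionCheckMs, // initial delay (Kafka uses InitialTaskDelayMs here)
    retentionCheckMs, // period
    TimeUnit.MILLISECONDS)
}
```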