Kafka基础运维

在软件项目的生命周期中,开发只占开始的一小部分,大部分时间我们要对项目进行运行维护,Kafka相关的项目也不例外。所以这里介绍了一些Kafka服务器基础的运维方法。

关于Kafka的日志

日志的英语是“log”,但Kafka的数据文件也被称为log,所以很多时候会造成一定的歧义。在Kafka中,日志分为两种:

  • 数据日志
  • 操作日志

数据日志是指Kafka的topic中存储的数据,这种日志的路径是在$KAFKA_HOME/config/server.properties文件中配置,配置项为log.dirs。如果此项没有被配置,默认会使用配置项 log.dir(请仔细观察,两个配置项最后差了一个s)。log.dir的默认路径为/tmp/kafka-logs,大家知道,/tmp路径下的文件在计算机重启的时候是会被删除的,因此,强烈推荐将文件目录设置在其他可以永久保存的路径。另一种日志是操作日志,类似于我们在自己开发的程序中输出的log日志(log4j),这种日志的路径是在启动Kafka的路径下。比如一般我们在KAFKA_HOME路径下启动Kafka服务,那么操作日志的路径为KAFKA_HOME/logs。

数据日志清理

数据日志有两种类型的清理方式,一种是按照日志被发布的时间来删除,另一种是按照日志文件的size来删除。有专门的配置项可以配置这个删除策略:

按时间删除:

Kafka提供了配置项让我们可以按照日志被发布的时间来删除。它们分别是:

  • log.retention.ms
  • log.retention.minutes
  • log.retention.hours

根据配置项的名称很容易理解它们的含义。log.retention.ms表示日志会被保留多少毫秒,如果为null,则Kafka会使用使用log.retention.minutes配置项。log.retention.minutes表示日志会保留多少分钟,如果为null,则Kafka会使用log.retention.hours选项。默认情况下,log.retention.ms和log.retention.minutes均为null,log.retention.hours为168,即Kafka的数据日志默认会被保留7天。如果想修改Kafka中数据日志被保留的时间长度,可以通过修改这三个选项来实现。

按size删除

Kafka除了提供了按时间删除的配置项外,也提供了按照日志文件的size来删除的配置项:

  • log.retention.bytes

即日志文件到达多少byte后再删除日志文件。默认为-1,即无限制。需要注意的是,这个选项的值如果小于segment文件大小的话是不起作用的。segment文件的大小取决于log.segment.bytes配置项,默认为1G。
另外,Kafka的日志删除策略并不是非常严格的(比如如果log.retention.bytes设置了10G的话,并不是超过10G的部分就会立刻删除,只是被标记为待删除,Kafka会在恰当的时候再真正删除),所以请预留足够的磁盘空间。当磁盘空间剩余量为0时,Kafka服务会被kill掉。

操作日志清理

目前Kafka的操作日志暂时不提供自动清理的机制,需要运维人员手动干预,比如使用shell和crontab命令进行定时备份、清理等。

Kafka配置IP访问

在默认情况下,访问Kafka集群需要在/etc/hosts文件下配置所有Kafka集群的hostname,这样比较麻烦。可以通过配置advertised.listeners配置项来使用ip来访问Kafka集群,比如单台Kafka broker的ip地址为192.168.1.100,那么可以使用如下配置来使客户端来使用ip访问:

advertised.listeners=PLAINTEXT://192.168.1.100:9092

Kafka的基本操作

接下来简单介绍一下关于Kafka集群的基本操作。所有这些用到的工具全部在Kafka目录下的/bin目录下可以找到。如果不添加任何参数运行这些工具脚本,控制台上会打印使用的提示。

添加topic

在Kafka中添加topic有两种方式,一种是手动添加,另一种是在数据第一次写入的时候动态添加。一般情况下还是推荐手动添加topic。
使用topic工具创建topic:

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name --partitions 20 --replication-factor 3 --config x=y

replication-factor参数用来指定需要多少个副本(连同leader在内),一般比较推荐设置为2或3。如果设置太少(比如1)导致可用性下降,如果设置太大会影响Kafka的性能。

partitions参数用来设置topic有多少个partition。首先,每个partition必须存在于单台服务器上,就是说如果某个topic有20个partition,那么它们将最多分布在20台服务器上(不算replicas)。另外很重要的一点是,partition的数量将直接影响消费端程序的并行度。所以创建topic时要根据实际情况决定partition数量。

修改topic

修改topic主要是为了修改topic的partition数量和topic层面的配置。修改partition数量可以使用此命令:

bin/kafka-topics.sh --zookeeper zk_host:port --alter --topic my_topic_name --partitions 40

关于修改partition数量,有两点需要注意:

  1. partition只能增加,不支持减少
  2. 新增的partition只会对将来写入的数据起作用,以前存在的数据不会被移动到新的partition中

除了修改partition数量,我们还可以修改topic层面的配置,增加配置项:

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --config x=y

删除配置项:

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --delete-config x

删除topic

Kafka删除topic的方法也很简单。首先,需要在config/server.properties文件中配置

delete.topic.enable=true

这个配置项默认是关闭的,也就是不支持删除,我们需要手动配置为true。然后使用命令:

./bin/kafka-topics.sh --zookeeper zk_host:port --delete --topic my_topic_name

就可以删除topic了。

查看消费者位置

有些时候我们需要查看某个消费者组的消费情况,我们可以使用工具kafka-consumer-groups.sh来查看。

./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test

查看当前正在使用的消费者组

自从kafka 0.9.0.0版本,Kafka引入了新的Java consumer模块代替了原来的Scala consumer模块。消费者组的元数据从zookeeper迁移到了broker中,所以使用新的API查看消费者组列表:

./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

查看topic的每个partition状态

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -1

扩展Kafka集群

本节基本翻译自官网:Expanding your cluster
向Kafka集群添加服务器是很简单的,分配给它们一个唯一的broker id并且在新服务器上启动Kafka服务器就可以了。但是这些新服务器却不能被自动分配数据分区(partition),所以除非partition被移动到这些服务器上,否者这些服务器直到新建topic的时候才会发挥作用。所以,通常情况下当你添加服务器到你的集群中时,你都希望迁移一些已经存在的数据到这些服务器上。

迁移数据的的过程是手动启动的,但是执行是自动化的。内在过程就是Kafka将这些新服务器添加作为它们正在迁移的partition的follower并且允许它全部复制这个分区中已存在的数据。当新服务器完全复制此分区的内容并加入同步副本时,现有副本之一将删除其分区的数据。

重新分配分区工具(partition reassignment tool)可以被用来在brokers间移动partition。理想的partition分配会确保跨broker的均匀的数据负载和分区大小。重新分配分区工具并没有自动获取Kafka集群中的数据分布信息并均衡负载的能力。因此,管理员必须清楚要移动哪些topic或partition。

重新分配分区工具有以下三种互斥的运行模式:

  • generate:在这种模式下,给出topic列表和broker列表,分区工具将生成一个候选的分区方案用来将指定topic的所有分区移动到新broker上。这个选项仅仅提供了一种通过给出的topic列表和broker列表生成分区重新分配方案的简便方式。
  • execute:这种模式下,分区工具将根据用户提供的重新分区方案开始重新分区(使用 -- --reassignment-json-file )。这个选项既可以执行管理员手写的重新分区文件也可以执行通过--generate命令生成的重新分区文件。
  • verify:这种模式下,分区工具验证上次执行-generate以来被重新分区的所有partition的状态。这些状态可能是successfully completed(成功完成), failed(失败)或in progress(进行中)

自动迁移数据到新的服务器上

重新分区工具可以将当前brokers上的topics移动到新的brokers上。这在扩展现有集群时很有用,因为将整个topic移动到新brokers上比一次移动一个partition更容易。然后,分区工具可以在新的brokers中均匀分配给定topic集合中的所有partition。在移动过程中,replication factor(副本数)保持不变。

举例来说,下面的例子将会把topics foo1、foo2的所有partition移动到新的brokers 5,6。当移动行为结束以后,foo1和foo2的所有partition将仅存在于broker 5、6。

因为分区工具接受将topic列表写入json文件的形式,所以你需要指定需要移动的topics并创建json文件,就像下面这样:

cat topics-to-move.json
{ "topics": [{"topic": "foo1"}, {"topic": "foo2"}], "version":1 }

当json文件准备好 了以后,使用重新分区工具自动生成一个候选的重新分配计划:

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate</br>
Current partition replica assignment</br>
{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
{"topic":"foo1","partition":0,"replicas":[3,4]},
{"topic":"foo2","partition":2,"replicas":[1,2]},
{"topic":"foo2","partition":0,"replicas":[3,4]},
{"topic":"foo1","partition":1,"replicas":[2,3]},
{"topic":"foo2","partition":1,"replicas":[2,3]}]

}</br>
    Proposed partition reassignment configuration</br>
{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
                {"topic":"foo1","partition":0,"replicas":[5,6]},
                {"topic":"foo2","partition":2,"replicas":[5,6]},
                {"topic":"foo2","partition":0,"replicas":[5,6]},
                {"topic":"foo1","partition":1,"replicas":[5,6]},
                {"topic":"foo2","partition":1,"replicas":[5,6]}]
  }

分区工具生成的候选计划将会将topic foo1,foo2的所有分区移动到broker 5,6。记住,尽管到了这一步,partition的移动却尚未开始,而仅仅告诉你当前的分区情况(Current partition replica assignment)和建议的新分区方案(Proposed partition reassignment configuration)。当前分区情况应该保留起来,以防你想要回滚。新分区方案应该被存入一个josn文件中,(比如:expand-cluster-reassignment.json),然后作为分区工具 --execute选项的输入文件。就像下面这样:

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute</br>
Current partition replica assignment</br>
{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
{"topic":"foo1","partition":0,"replicas":[3,4]},
{"topic":"foo2","partition":2,"replicas":[1,2]},
{"topic":"foo2","partition":0,"replicas":[3,4]},
{"topic":"foo1","partition":1,"replicas":[2,3]},
{"topic":"foo2","partition":1,"replicas":[2,3]}]
}</br>
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
{"topic":"foo1","partition":0,"replicas":[5,6]},
{"topic":"foo2","partition":2,"replicas":[5,6]},
{"topic":"foo2","partition":0,"replicas":[5,6]},
{"topic":"foo1","partition":1,"replicas":[5,6]},
{"topic":"foo2","partition":1,"replicas":[5,6]}]
}

最后,--verify选项可以用来检查重新分配分区的状态。注意此处应该使用与执行 --execute 选项时相同的expand-cluster-reassignment.json 文件:

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition [foo1,0] completed successfully
Reassignment of partition [foo1,1] is in progress
Reassignment of partition [foo1,2] is in progress
Reassignment of partition [foo2,0] completed successfully
Reassignment of partition [foo2,1] completed successfully
Reassignment of partition [foo2,2] completed successfully

自定义分区分配和移动

分区重新分配工具也可以被用来移动broker被选中的分区。当使用这种方式的时候,我们假设用户了解重新分配分区方案并且不需要使用分区工具去自动生成候选重新分区方案。为了高效,我们跳过了--generate步骤,直接来到--execute步骤

举例来说,下面的例子将topic foo1的partition 0移动到 broker 5,6,将topic foo2的partition 1 移动到broker 2,3:
第一步来手写自定义重新分区计划的json文件:

cat custom-reassignment.json
{"version":1,"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},{"topic":"foo2","partition":1,"replicas":[2,3]}]}

然后,使用--execute选项和json文件来开始重新分配分区的操作:

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --execute
Current partition replica assignment</br>
{"version":1,
"partitions":[{"topic":"foo1","partition":0,"replicas":[1,2]},
{"topic":"foo2","partition":1,"replicas":[3,4]}]
}</br>
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},
{"topic":"foo2","partition":1,"replicas":[2,3]}]
}

用--verify选项来查看被重新分配分区的状态。注意此处应该使用与执行 --execute 选项时相同的expand-cluster-reassignment.json 文件:

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --verify
Status of partition reassignment:

  Reassignment of partition [foo1,0] completed successfully
  Reassignment of partition [foo2,1] completed successfully
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,732评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,496评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,264评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,807评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,806评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,675评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,029评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,683评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,704评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,666评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,773评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,413评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,016评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,204评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,083评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,503评论 2 343

推荐阅读更多精彩内容