Kafka监控安装
Kafka集群监控方案选择:
- Kafka只能依靠
kafka-run-class.sh
等命令进行管理 -
Kafka Manager(CMAK)是目前比较常用的监控工具,它有如下功能:
- 管理多个集群
- 轻松检查群集状态(主题,消费者,偏移,代理,副本分发,分区分发)
- 运行首选副本选举
- 使用选项生成分区分配以选择要使用的代理
- 运行分区重新分配(基于生成的分配)
- 使用可选主题配置创建主题(0.8.1.1具有与0.8.2+不同的配置)
- 删除主题(仅支持0.8.2+并记住在代理配置中设置delete.topic.enable = true)
- 主题列表现在指示标记为删除的主题(仅支持0.8.2+)
- 批量生成多个主题的分区分配,并可选择要使用的代理
- 批量运行重新分配多个主题的分区
- 将分区添加到现有主题
- 更新现有主题的配置
- 可随意开启对broker级别或者对topic级别的JMX轮询
- 可方便的过滤出没有id 、所有者、延迟或目录等的消费者
所以本小节先介绍该监控工具的安装及配置,到如下地址可以下载各个版本的Kafka Manager:
我这里下载的是当前最新的3.0.0.5
版本,需要注意的是3.x版本需要基于JDK11环境,2.0.0.2
及以下版本才兼容JDK1.8。复制下载链接后,使用wget
命令进行下载:
[root@localhost ~]# cd /usr/local/src
[root@localhost /usr/local/src]# wget https://github.com/yahoo/CMAK/releases/download/3.0.0.5/cmak-3.0.0.5.zip
-
Tips:
2.0.0.2
版本只有源码安装这一种安装方式,如果选用2.0.0.2
版本的话可以参考此文的安装步骤:https://blog.51cto.com/liqingbiao/2417010
解压下载好的压缩包,放到kafka的安装目录下,并且更名为kafka-manager
,不然总感觉cmak
这个名称容易和cmake
搞混:
[root@localhost /usr/local/src]# unzip cmak-3.0.0.5.zip
[root@localhost /usr/local/src]# mv cmak-3.0.0.5 /usr/local/kafka/kafka-manager
然后修改一下配置文件,主要是配置Kafka集群中Zookeeper的连接地址,也就是要监控哪个Kafka集群就配置哪个Kafka集群的Zookeeper地址:
[root@localhost ~]# vim /usr/local/kafka/kafka-manager/conf/application.conf
# 配置多个地址使用逗号分隔即可
cmak.zkhosts="127.0.0.1:2181"
配置完成后,使用如下命令启动即可:
[root@localhost ~]# nohup /usr/local/kafka/kafka-manager/bin/cmak &
如下,正常监听了9000
端口代表已经启动成功:
[root@localhost ~]# netstat -lntp |grep 9000
tcp6 0 0 :::9000 :::* LISTEN 25237/java
[root@localhost ~]#
如果你的机器打开了防火墙的话,还需要在防火墙上放开9000
端口:
[root@localhost ~]# firewall-cmd --zone=public --add-port=9000/tcp --permanent
[root@localhost ~]# firewall-cmd --reload
然后就可以在浏览器上打开监控界面了,如下所示:
Kafka监控界面
当我们首次打开CMAK的监控页面时,是一片空白的。因为此时我们还没有添加任何需要被监控的集群,所以首先第一步就是要添加集群:
需要注意的是,如果要开启JMX轮询,则必须事先在Kafka的启动脚本中打开JMX的端口号:
[root@localhost ~]# vim /usr/local/kafka/bin/kafka-server-start.sh
# 打开JMX端口
export JMX_PORT=9999
然后重启Kafka:
[root@localhost ~]# kafka-server-stop.sh
[root@localhost ~]# nohub kafka-server-start.sh /usr/local/kafka/config/server.properties &
剩下的配置基本保持默认即可,然后点击“Save”进行保存:
保存成功后,点击“Go to cluster view”:
就可以查看到我们添加的这个集群信息,在“Cluster Summary”一栏中显示了该集群的Topic数量和Broker数量:
点击“Topics”的数字就可以进入到Topic的监控界面:
- Partitions:Topic的Partition数量
- Brokers:Topic的Broker数量
- Replicas:Topic中Partition的副本数量
- Under Replicated:Topic中的Partition副本处于同步失败或失效状态的比例,该指标过高则代表副本没有复制到足够的Broker上
- Producer Message/Sec:生产者每秒投递的消息数量
- Summed Recent Offsets:当前总计的消费偏移量
- Brokers Spread:看作broker使用率,如kafka集群9个broker,某topic有7个partition,则broker spread: 7 / 9 = 77%
- Brokers Skew:partition是否存在倾斜,如kafka集群9个broker,某topic有18个partition,正常每个broker应该2个partition。若其中有3个broker上的partition数>2,则broker skew: 3 / 9 = 33%
- Brokers Leader Skew:leader partition是否存在倾斜,如kafka集群9个broker,某topic14个partition,则正常每个broker有2个leader partition。若其中一个broker有0个leader partition,一个有4个leader partition,则broker leader skew: (4 - 2) / 14 = 14%
点击“Brokers”的数字就则是进入到Broker的监控界面:
- Messages in /sec:每秒流入的消息数
- Bytes in /sec:每秒流入的字节数
- Bytes out /sec:每秒流出的字节数
- Bytes rejected /sec:每秒拒绝的字节数
- Failed fetch request /sec:每秒失败抓取数据请求数
- Failed produce request /sec:每秒失败的生产数据请求数
Tips:这些指标的监控需要打开JMX
在Topic的监控界面中,点击一个具体的Topic可以进入到该Topic的监控页面,并且提供了对Topic的操作支持:
Topic的监控指标里也有与Broker一样的监控指标,只不过这里是针对Topic的,指标的含义都一样:
Kafka SSL签名库生成
Kafka的安全措施:
- Kafka提供了SSL或SASL机制来保障集群安全
- Kafka提供了Broker到zk连接的安全机制
- Kafka支持Client的读写验证
值得一提的是通常情况下都不会给Kafka加安全措施,类似的其他中间件也是。因为通常我们都会将这些中间件部署在一个可信的网络里,例如与外网隔离的内部网络,并且有防火墙进行保护。
而且给Kafka加上SSL或SASL安全机制也会导致性能有所损耗,通常这个损耗在20~30%左右。但如果你的Kafka是允许在外网进行访问的话,那么就需要考虑增加安全机制了。
在本文中主要介绍一下SSL这种安全机制,SSL如今也不算啥冷门知识了,对HTTPS有所了解的话都应该清楚,这里就不进行赘述了。首先我们知道SSL是需要证书的,所以第一步就是创建证书,但在此之前需要先创建密钥仓库,用于存储证书文件。具体命令如下:
[root@localhost ~]# mkdir ca-store # 创建一个目录
[root@localhost ~]# cd ca-store/ # 进入该目录
[root@localhost ~/ca-store]# keytool -keystore server.keystore.jks -alias mykafka -validity 100000 -genkey # 创建密钥仓库
Enter keystore password: # 输入密码
Re-enter new password: # 确认密码
What is your first and last name? # 输入你的姓名
[Unknown]: lingyi
What is the name of your organizational unit? # 输入你的组织单位
[Unknown]: zj
What is the name of your organization? # 输入你的组织名称
[Unknown]: zj
What is the name of your City or Locality? # 输入你的所在城市
[Unknown]: beijing
What is the name of your State or Province? # 输入你的所在省份
[Unknown]: beijing
What is the two-letter country code for this unit? # 输入两个字母的国家代码
[Unknown]: cn
Is CN=lingyi, OU=zj, O=zj, L=beijing, ST=beijing, C=cn correct?
[no]: y # 输入y确认以上信息
[root@localhost ~/ca-store]# ls
server.keystore.jks # 创建完成后,当前目录下会有这样一个文件
[root@localhost ~/ca-store]#
创建CA证书:
[root@localhost ~]# openssl req -new -x509 -keyout ca-key -out ca-cert -days 100000
Generating a 2048 bit RSA private key
........................+++
........+++
writing new private key to 'ca-key'
Enter PEM pass phrase: # 输入密码
Verifying - Enter PEM pass phrase: # 确认密码
-----
You are about to be asked to enter information that will be incorporated
into your certificate request.
What you are about to enter is what is called a Distinguished Name or a DN.
There are quite a few fields but you can leave some blank
For some fields there will be a default value,
If you enter '.', the field will be left blank.
-----
Country Name (2 letter code) [XX]:cn # 输入两个字母的国家代码
State or Province Name (full name) []:beijing # 输入你的所在省份
Locality Name (eg, city) [Default City]:beijing # 输入你的所在城市
Organization Name (eg, company) [Default Company Ltd]:zj # 输入你的组织单位
Organizational Unit Name (eg, section) []:zj # 输入你的组织名称
Common Name (eg, your name or your server's hostname) []:lingyi # 输入你的姓名或服务器名称
Email Address []:email@example.com # 输入你的邮箱地址
[root@localhost ~/ca-store]# ls # 创建完成后,当前目录下会多出两个文件
ca-cert ca-key server.keystore.jks
[root@localhost ~/ca-store]#
将生成的CA添加到客户端信任库:
[root@localhost ~]# keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
Enter keystore password: # 输入密码
Re-enter new password: # 确认密码
Owner: EMAILADDRESS=binary0_1@163.com, CN=lingyi, OU=zj, O=zj, L=beijing, ST=beijing, C=cn
Issuer: EMAILADDRESS=binary0_1@163.com, CN=lingyi, OU=zj, O=zj, L=beijing, ST=beijing, C=cn
Serial number: e6fa410f7c90ff2a
Valid from: Mon Jul 06 22:20:50 CST 2020 until: Sat Apr 21 22:20:50 CST 2294
Certificate fingerprints:
SHA1: 1F:F3:8C:F4:37:9C:47:45:42:A4:51:77:7D:DA:05:E5:59:27:0C:9F
SHA256: F6:7E:F6:E2:A9:12:8B:C4:04:6E:F0:23:49:6F:0D:3C:94:5F:AD:D6:76:42:42:63:24:69:96:C6:EE:02:70:91
Signature algorithm name: SHA256withRSA
Subject Public Key Algorithm: 2048-bit RSA key
Version: 3
Extensions:
#1: ObjectId: 2.5.29.35 Criticality=false
AuthorityKeyIdentifier [
KeyIdentifier [
0000: 9A 1D 7C 61 ED 94 C0 BC 13 EA 20 3B 59 05 6A F9 ...a...... ;Y.j.
0010: 40 3B E8 4D @;.M
]
]
#2: ObjectId: 2.5.29.19 Criticality=false
BasicConstraints:[
CA:true
PathLen:2147483647
]
#3: ObjectId: 2.5.29.14 Criticality=false
SubjectKeyIdentifier [
KeyIdentifier [
0000: 9A 1D 7C 61 ED 94 C0 BC 13 EA 20 3B 59 05 6A F9 ...a...... ;Y.j.
0010: 40 3B E8 4D @;.M
]
]
Trust this certificate? [no]: y # 是否信任此证书
Certificate was added to keystore
[root@localhost ~/ca-store]# ls
ca-cert ca-key client.truststore.jks server.keystore.jks
[root@localhost ~/ca-store]#
为Broker提供信任库以及所有客户端签名了密钥的CA证书:
[root@localhost ~/ca-store]# keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
... 与之前的内容一致,略 ...
[root@localhost ~/ca-store]# ls
ca-cert ca-key client.truststore.jks server.keystore.jks server.truststore.jks
[root@localhost ~/ca-store]#
完成以上步骤后,就是对证书进行签名,也就是用自己生成的CA来签名前面生成的证书。
1、从密钥仓库导出证书:
[root@localhost ~/ca-store]# keytool -keystore server.keystore.jks -alias mykafka -certreq -file cert-file
Enter keystore password: # 这里输入server.keystore.jks的密码
[root@localhost ~/ca-store]#
2、用CA签名:
[root@localhost ~/ca-store]# openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 100000 -CAcreateserial -passin pass:123456
Signature ok
subject=/C=cn/ST=beijing/L=beijing/O=zj/OU=zj/CN=lingyi
Getting CA Private Key
[root@localhost ~/ca-store]#
3、导入CA的证书和已签名的证书到密钥仓库:
[root@localhost ~/ca-store]# keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
... 与之前的内容一致,略 ...
[root@localhost ~/ca-store]# keytool -keystore server.keystore.jks -alias mykafka -import -file cert-signed
Enter keystore password:
Certificate reply was installed in keystore
[root@localhost ~/ca-store]# ls # 完成所有步骤后,当前目录下会有如下文件
ca-cert ca-cert.srl ca-key cert-file cert-signed client.truststore.jks server.keystore.jks server.truststore.jks
[root@localhost ~/ca-store]#
Kafka SSL服务端集成
Kafka SSL服务端集成其实也比较简单,只需要修改一下Kafka的server.properties
配置文件即可。具体如下所示:
[root@localhost ~]# vim /usr/local/kafka/config/server.properties
# 在原本的配置上追加SSL的监听端口及协议配置
listeners=PLAINTEXT://192.168.220.128:9092,SSL://192.168.220.128:8989
advertised.listeners=PLAINTEXT://192.168.220.128:9092,SSL://192.168.220.128:8989
# 增加SSL相关配置
ssl.keystore.location=/root/ca-store/server.keystore.jks
ssl.keystore.password=123456
ssl.key.password=123456
ssl.truststore.location=/root/ca-store/server.truststore.jks
ssl.truststore.password=123456
完成配置的修改后重启Kafka:
[root@localhost ~]# kafka-server-stop.sh
[root@localhost ~]# kafka-server-start.sh /usr/local/kafka/config/server.properties &
然后就可以使用openssl
测试一下SSL配置是否成功,执行如下命令并输出了类似的内容则代表配置成功,已经能够通过SSL协议连接:
[root@localhost ~]# openssl s_client -debug -connect 192.168.220.128:8989 -tls1
CONNECTED(00000003)
write to 0xfcd230 [0xfd6d63] (181 bytes => 181 (0xB5))
0000 - 16 03 01 00 b0 01 00 00-ac 03 01 0c a9 85 ea 8f ................
0010 - f2 f3 c1 ac fb 9d f6 78-9c ed f4 60 97 ad 91 33 .......x...`...3
0020 - 32 ab b2 81 9e 81 6b 3f-e0 db da 00 00 64 c0 14 2.....k?.....d..
0030 - c0 0a 00 39 00 38 00 37-00 36 00 88 00 87 00 86 ...9.8.7.6......
0040 - 00 85 c0 0f c0 05 00 35-00 84 c0 13 c0 09 00 33 .......5.......3
0050 - 00 32 00 31 00 30 00 9a-00 99 00 98 00 97 00 45 .2.1.0.........E
0060 - 00 44 00 43 00 42 c0 0e-c0 04 00 2f 00 96 00 41 .D.C.B...../...A
0070 - c0 12 c0 08 00 16 00 13-00 10 00 0d c0 0d c0 03 ................
0080 - 00 0a 00 07 c0 11 c0 07-c0 0c c0 02 00 05 00 04 ................
0090 - 00 ff 01 00 00 1f 00 0b-00 04 03 00 01 02 00 0a ................
00a0 - 00 0a 00 08 00 17 00 19-00 18 00 16 00 23 00 00 .............#..
00b0 - 00 0f 00 01 01 .....
read from 0xfcd230 [0xfd2813] (5 bytes => 5 (0x5))
0005 - <SPACES/NULS>
write to 0xfcd230 [0xfdc2b0] (7 bytes => 7 (0x7))
0000 - 15 03 01 00 02 02 46 ......F
140425165125520:error:1408F10B:SSL routines:SSL3_GET_RECORD:wrong version number:s3_pkt.c:365:
---
no peer certificate available
---
No client certificate CA names sent
---
SSL handshake has read 5 bytes and written 7 bytes
---
New, (NONE), Cipher is (NONE)
Secure Renegotiation IS NOT supported
Compression: NONE
Expansion: NONE
No ALPN negotiated
SSL-Session:
Protocol : TLSv1
Cipher : 0000
Session-ID:
Session-ID-ctx:
Master-Key:
Key-Arg : None
Krb5 Principal: None
PSK identity: None
PSK identity hint: None
Start Time: 1594126681
Timeout : 7200 (sec)
Verify return code: 0 (ok)
---
Kafka SSL客户端集成
完成服务端的配置后,接下来继续完成客户端的配置。首先我们需要把之前生成的client.truststore.jks
文件从服务器上download下来,并存放在客户端工程目录里,例如resources
目录:
然后在创建Producer客户端的时候,增加SSL相关配置项。如下代码示例:
/**
* 创建支持SSL的Producer实例
*/
public static Producer<String, String> createProducerWitchSSL() {
Properties properties = new Properties();
// 注意,这里的端口一定得是Kafka服务器上配置的SSL协议监听端口
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.119.23:8989");
properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "1");
properties.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
, "org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
, "org.apache.kafka.common.serialization.StringSerializer");
// 配置事务支持
properties.setProperty(ProducerConfig.RETRIES_CONFIG, "1");
properties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "trans-id");
// SSL配置
properties.setProperty(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
properties.setProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "src\\main\\resources\\client.truststore.jks");
properties.setProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "123456");
properties.setProperty("security.protocol", "SSL");
return new KafkaProducer<>(properties);
}
/**
* 发送消息
*/
public static void producerAsyncSend() {
String topicName = "MyTopic";
String key = "test-key";
String value = "this is test message!";
Producer<String, String> producer = createProducerWitchSSL();
// 初始化事务
producer.initTransactions();
try {
// 开启事务
producer.beginTransaction();
// 构建消息对象
ProducerRecord<String, String> record =
new ProducerRecord<>(topicName, key, value);
// 发送一条消息
Future<RecordMetadata> future = producer.send(record);
System.out.println(future.get().timestamp());
// 提交事务
producer.commitTransaction();
} catch (Exception e) {
e.printStackTrace();
// 发生异常回滚事务
producer.abortTransaction();
} finally {
producer.close();
}
}
Consumer客户端也是同样的,只需要在创建客户端实例的时候增加相同的SSL配置即可。完整代码如下:
/**
* 创建支持SSL的Consumer实例
*/
public static Consumer<String, String> createConsumerWithSSL() {
Properties props = new Properties();
// 注意,这里的端口一定得是Kafka服务器上配置的SSL协议监听端口
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.119.23:8989");
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
// SSL配置
props.setProperty(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
props.setProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "src\\main\\resources\\client.truststore.jks");
props.setProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "123456");
props.setProperty("security.protocol", "SSL");
return new KafkaConsumer<>(props);
}
/**
* 消费消息
*/
public static void autoCommitOffset() {
Consumer<String, String> consumer = createConsumerWithSSL();
List<String> topics = List.of("MyTopic");
// 订阅一个或多个Topic
consumer.subscribe(topics);
while (true) {
// 从Topic中拉取数据,每1000毫秒拉取一次
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
// 每次拉取可能都是一组数据,需要遍历出来
for (ConsumerRecord<String, String> record : records) {
System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n",
record.partition(), record.offset(), record.key(), record.value());
}
}
}
Kafka最佳实践配置项
服务端必要参数
zookeeper.connect
:必配参数,建议在kafka集群的每台实例都配置所有的zk节点
broker.id
:必配参数。集群节点的标示符,不得重复,取值范围0~n
log.dirs
:不要使用默认的“/tmp/kafka-logs
”,因为/tmp
目录的性质没法保证数据的持久性
服务端推荐参数
advertised.host.name
:注册到zk供用户使用的主机名
advertised.port
:注册到zk供用户使用的服务端口
num.partitions
:创建topic时的默认partition数量,默认是1
default.replication.factor
:自动创建topic的默认副本数量,建议至少修改为2
min.insync.replicasISR
:提交生成者请求的最小副本数,建议至少2~3个
unclean.leader.election.enable
:是否允许不具备ISR资格的replicas被选举为leader,建议设置为否,除非能够允许数据的丢失
controlled.shutdown.enable
:在kafka收到stop命令或者异常终止时,允许自动同步数据,建议开启
可动态调整的参数
unclean.leader.election.enable
:不严格的leader选举,有助于集群健壮,但是存在数据丢失风险。
min.insync.replicas
:如果同步状态的副本小于该值,服务器将不再接受request.required.acks
为-1或all的写入请求。
max.message.bytes
:单条消息的最大长度。如果修改了该值,那么replica.fetch.max.bytes
和消费者的fetch.message.max.bytes
也要跟着修改。
cleanup.policy
:生命周期终结数据的处理,默认删除。
flush.messages
:强制刷新写入的最大缓存消息数。
flush.ms
:强制刷新写入的最大等待时长。
客户端配置:
Producer
客户端:ack、压缩、同步生产 vs 异步生产、批处理大小(异步生产)
Consumer
客户端方面主要考虑:partition数量及获取消息的大小
Kafka服务器配置最佳实践
JVM参数建议:
1、能分配较大堆的情况下使用JVM的G1垃圾回收器
以下是一段基于24GB内存、四核英特尔至强处理器,8x7200转的SATA硬盘机器的配置参考示例,可以参照该示例代入自己的机器配置进行调整:
-Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC-XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80
机器硬件建议:
1. 内存:建议使用64G内存的机器
2. CPU:尽量选择更多核心,将会获得多核带来的更好的并发处理性能
3. 磁盘:RAID是优先推荐的,SSD可以考虑
4. 网络:最好是万兆网络,千兆也可
5. 文件系统:ext4是最佳选择
6. 操作系统:任何Unix系统上运行良好,并且已经在Linux和Solaris上进行了测试
核心参数调整建议:
- 文件描述符数量调整:
(number_of_partitions)*(partition_size / segment_size)
,通常都在100000
左右 - 视具体情况调整最大套接字缓冲区大小
- pagecache:尽量分配与大多数日志的激活日志段大小一致
- 禁用swap
- 设计broker的数量:单broker上的分区数小于2000;分区大小则建议不要超过25GB
- 设计partition的数量:
- 至少和最大的消费者组中consumer的数量一致
- 分区不要太大,建议小于25GB