## Enabling Kerberos Authentication for Kafka
> AS (Authentication Server): the authentication server
KDC (Key Distribution Center): the key distribution center
TGT (Ticket Granting Ticket): a ticket-granting ticket, i.e. "a ticket for getting tickets"
TGS (Ticket Granting Server): the ticket-granting server
SS (Service Server): the server providing a specific service
Principal: the identity being authenticated
Ticket: what a client uses to prove its identity; it contains the user name, IP address, timestamp, validity period, and session key.
### 1. Check the system environment
> Check whether Kafka is already installed on this machine; if not, install it first.
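A quick way to check, assuming Kafka lives under /usr/local/dxkafka (the install path used later in this guide):
```bash
# Look for the Kafka startup script; adjust the path to your installation
ls /usr/local/dxkafka/bin/kafka-server-start.sh 2>/dev/null || echo "Kafka not found, install it first"
```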
### 2. Install the Kerberos packages
```bash
yum install krb5-server krb5-libs krb5-workstation -y
```
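To confirm the packages installed cleanly:
```bash
# Query the RPM database for the three Kerberos packages
rpm -q krb5-server krb5-libs krb5-workstation
```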
### 3. Configure kdc.conf
> vim /var/kerberos/krb5kdc/kdc.conf
```ini
[kdcdefaults]
  kdc_ports = 88
  kdc_tcp_ports = 88

[realms]
  KER.COM = {
    #master_key_type = aes256-cts
    acl_file = /var/kerberos/krb5kdc/kadm5.acl
    dict_file = /usr/share/dict/words
    admin_keytab = /var/kerberos/krb5kdc/kadm5.keytab
    supported_enctypes = aes128-cts:normal des3-hmac-sha1:normal arcfour-hmac:normal camellia256-cts:normal camellia128-cts:normal des-hmac-sha1:normal des-cbc-md5:normal des-cbc-crc:normal
  }
```
> KER.COM is the realm defined here; the name is arbitrary, though realms are conventionally written in uppercase, and Kerberos supports multiple realms.
master_key_type and supported_enctypes default to aes256-cts; because Java needs extra JCE policy jars to use aes256-cts, it is not used here.
acl_file: declares the admin users' privileges.
admin_keytab: the keytab the KDC uses for verification.
supported_enctypes: the supported encryption types. Note that aes256-cts has been removed from the list.
### 4. Configure krb5.conf
> vim /etc/krb5.conf
```ini
# Configuration snippets may be placed in this directory as well
includedir /etc/krb5.conf.d/

[logging]
  default = FILE:/var/log/krb5libs.log
  kdc = FILE:/var/log/krb5kdc.log
  admin_server = FILE:/var/log/kadmind.log

[libdefaults]
  dns_lookup_realm = false
  dns_lookup_kdc = false
  ticket_lifetime = 24h
  # renew_lifetime = 7d
  forwardable = true
  # rdns = false
  # pkinit_anchors = FILE:/etc/pki/tls/certs/ca-bundle.crt
  default_realm = KER.COM
  # default_ccache_name = KEYRING:persistent:%{uid}

[realms]
  KER.COM = {
    kdc = 192.168.12.104
    admin_server = 192.168.12.104
  }

[domain_realm]
  .ker = KER.COM
  ker = KER.COM
```
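Since [domain_realm] maps the short host name ker to the realm, that name must resolve. On a single test machine this can be done through /etc/hosts (the IP matches the KDC address above; step 14 relies on the same mapping):
```bash
# Map the host name used in the realm configuration
echo "192.168.12.104 ker" >> /etc/hosts
```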
### 5. Initialize the Kerberos database
> kdb5_util create -s -r KER.COM
(-s stashes the master key in a local file so the KDC can start without prompting for it; -r names the realm to create)
### 6. Set the database administrator's ACL privileges
> vim /var/kerberos/krb5kdc/kadm5.acl
Edit it to contain the following line, which grants full privileges to every principal with an /admin instance:
*/admin@KER.COM
### 7. Start the Kerberos daemons
> # Start the services
systemctl start kadmin krb5kdc
# Enable them at boot
systemctl enable kadmin krb5kdc
# Check their status
systemctl status kadmin krb5kdc
### 8. Create the principal
> # 1. Enter kadmin as the super administrator
kadmin.local
# 2. List the existing principals
listprincs
# 3. Add the principal ms_kafka/ker
addprinc ms_kafka/ker
# 4. Exit kadmin.local
exit
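addprinc prompts twice for the new principal's password. If nobody ever needs to log in with that password, a random key can be generated non-interactively instead (a variation, not what the steps above use):
```bash
# Create the principal with a random key instead of a password
kadmin.local -q "addprinc -randkey ms_kafka/ker"
```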
### 9. Generate the principal's keytab (used later by the Kafka configuration and by client authentication)
> kadmin.local -q "xst -k /var/kerberos/krb5kdc/kadm5.keytab ms_kafka/ker@KER.COM"
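It is worth verifying the keytab before moving on; a quick check with the standard MIT Kerberos tools:
```bash
# List the keytab entries to confirm the principal was exported
klist -kt /var/kerberos/krb5kdc/kadm5.keytab

# Obtain a TGT using the keytab; success without a password prompt means it works
kinit -kt /var/kerberos/krb5kdc/kadm5.keytab ms_kafka/ker@KER.COM
klist
```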
### 10. Create kafka-jaas.conf under config in the Kafka installation directory
```conf
KafkaServer {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/var/kerberos/krb5kdc/kadm5.keytab"
    principal="ms_kafka/ker@KER.COM";
};

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    useTicketCache=true
    keyTab="/var/kerberos/krb5kdc/kadm5.keytab"
    principal="ms_kafka/ker@KER.COM";
};
```
### 11. Append the following to config/server.properties
```properties
advertised.listeners=SASL_PLAINTEXT://192.168.12.104:9092
listeners=SASL_PLAINTEXT://0.0.0.0:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.enabled.mechanisms=GSSAPI
sasl.kerberos.service.name=ms_kafka
```
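> Note: sasl.kerberos.service.name must match the primary component of the broker principal, i.e. the ms_kafka in ms_kafka/ker@KER.COM; clients request a service ticket under exactly this name, so a mismatch will fail authentication.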
### 12. Configure the startup scripts
> vi bin/kafka-server-start.sh
```bash
# Insert on the second-to-last line of kafka-server-start.sh:
export KAFKA_OPTS="-Dzookeeper.sasl.client=false -Dzookeeper.sasl.client.username=zk-server -Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/usr/local/dxkafka/config/kafka-jaas.conf"
```
> vi bin/kafka-console-producer.sh
```bash
# Insert on the second-to-last line of kafka-console-producer.sh:
export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/usr/local/dxkafka/config/kafka-jaas.conf"
```
> vi bin/kafka-console-consumer.sh
```bash
# Insert on the second-to-last line of kafka-console-consumer.sh:
export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/usr/local/dxkafka/config/kafka-jaas.conf"
```
### 13. Start ZooKeeper and Kafka
> bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
> bin/kafka-server-start.sh -daemon config/server.properties
To produce or consume from the command line, the following configuration is also needed:
```bash
# Kerberos settings for the console producer; add to config/producer.properties:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=ms_kafka

# Kerberos settings for the console consumer; add to config/consumer.properties:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=ms_kafka

# Start a console producer
./bin/kafka-console-producer.sh --broker-list ker:9092 --topic test --producer.config config/producer.properties

# Start a console consumer
./bin/kafka-console-consumer.sh --bootstrap-server ker:9092 --topic test --consumer.config config/consumer.properties
```
### 14. Java client example
1. Download krb5.conf, kafka-jaas.conf, and kadm5.keytab to the local machine, and change the keyTab path in kafka-jaas.conf to the local keytab path.
2. If you connect by host name, remember to edit /etc/hosts and add: 192.168.12.104 ker
3. Example:
```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KerberosTest {

    public static void main(String[] args) {
        testProducer();
        // testConsumer();
    }

    private static void testProducer() {
        // Paths to the JAAS configuration file and the Kerberos configuration file
        System.setProperty("java.security.auth.login.config", "D:\\kafka-jaas.conf");
        System.setProperty("java.security.krb5.conf", "D:\\krb5.conf");

        // Kafka producer properties
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.12.104:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Kerberos authentication
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "GSSAPI");
        props.put("sasl.kerberos.service.name", "ms_kafka");

        String topic = "http";
        String msg = "this is a test msg";

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, msg);

        // Send the record
        Future<RecordMetadata> future = kafkaProducer.send(record);
        try {
            RecordMetadata metadata = future.get();
            System.out.printf("Message sent to Kafka topic=%s, msg=%s, partition=%d, offset=%d\n",
                    metadata.topic(), msg, metadata.partition(), metadata.offset());
        } catch (Exception e) {
            e.printStackTrace();
        }
        kafkaProducer.close();
    }

    public static void testConsumer() {
        // Paths to the JAAS configuration file and the Kerberos configuration file
        System.setProperty("java.security.auth.login.config", "D:\\kafka-jaas.conf");
        System.setProperty("java.security.krb5.conf", "D:\\krb5.conf");

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.12.104:9092"); // Kafka broker address
        props.put("group.id", "test-group");                   // consumer group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Security protocol and authentication mechanism
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "GSSAPI");               // Kerberos authentication
        props.put("sasl.kerberos.service.name", "ms_kafka"); // Kafka service principal name

        // Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("http"));

        // Poll for records
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
        }

        // Creating a topic with AdminClient:
        // try (AdminClient adminClient = AdminClient.create(props)) {
        //     NewTopic newTopic = new NewTopic("test_topic", 1, (short) 1); // name, partitions, replication factor
        //     adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
        //     System.out.println("Topic created successfully");
        // } catch (Exception e) {
        //     e.printStackTrace();
        // }
    }
}
```
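> The example requires the kafka-clients library on the classpath; the poll(Duration) call used above needs client version 2.0 or later.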
### 15. Notes
> Kerberos is sensitive to clock skew between machines; synchronize the clocks with: ntpdate -u pool.ntp.org
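ntpdate only corrects the clock once. For ongoing synchronization a running time service is preferable; a minimal sketch, assuming chrony is installed (it ships with recent CentOS/RHEL):
```bash
# Keep the clock continuously synchronized with the pool servers
systemctl enable --now chronyd
```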