无标题文章

## Kafka开启Kerberos认证

> AS(Authentication Server): 认证服务器

KDC(Key Distribution Center): 密钥分发中心

TGT(Ticket Granting Ticket): 票据授权票据,票据的票据

TGS(Ticket Granting Server):票据授权服务器

SS(Service Server): 特定服务提供端

Principal: 被认证的个体

Ticket: 票据, 客户端用来证明身份真实性。包含: 用户名,IP,时间戳,有效期,会话秘钥。

### 1. 检查系统环境

> 检查当前系统是否安装kafka,没有则先自主安装

### 2. 安装Kerberos相关软件

```java

yum install krb5-server krb5-libs krb5-workstation -y

```

### 3. 配置kdc.conf

> vim /var/kerberos/krb5kdc/kdc.conf

```java

[kdcdefaults]

kdc_ports = 88

kdc_tcp_ports = 88

[realms]

KER.COM = {

  #master_key_type = aes256-cts

  acl_file = /var/kerberos/krb5kdc/kadm5.acl

  dict_file = /usr/share/dict/words

  admin_keytab = /var/kerberos/krb5kdc/kadm5.keytab

  supported_enctypes = aes128-cts:normal des3-hmac-sha1:normal arcfour-hmac:normal camellia256-cts:normal camellia128-cts:normal des-hmac-sha1:normal des-cbc-md5:normal des-cbc-crc:normal

}

```

> KER.COM:是设定的realms。名字随意。Kerberos可以支持多个realms,一般全用大写masterkeytype,supported_enctypes默认使用aes256-cts。由于,JAVA使用aes256-######cts验证方式需要安装额外的jar包,这里暂不使用

acl_file:标注了admin的用户权限。

admin_keytab:KDC进行校验的keytab

supported_enctypes:支持的校验方式。注意把aes256-cts去掉

### 4. 配置krb5.conf

> vim /etc/krb5.conf

```java

# Configuration snippets may be placed in this directory as well

includedir /etc/krb5.conf.d/

[logging]

default = FILE:/var/log/krb5libs.log

kdc = FILE:/var/log/krb5kdc.log

admin_server = FILE:/var/log/kadmind.log

[libdefaults]

dns_lookup_realm = false

dns_lookup_kdc = false

ticket_lifetime = 24h

# renew_lifetime = 7d

forwardable = true

# rdns = false

# pkinit_anchors = FILE:/etc/pki/tls/certs/ca-bundle.crt

default_realm = KER.COM

# default_ccache_name = KEYRING:persistent:%{uid}

[realms]

KER.COM = {

  kdc = 192.168.12.104

  admin_server = 192.168.12.104

}

[domain_realm]

.ker = KER.COM

ker = KER.COM

```

### 5. 初始化kerberos database

>  kdb5_util create -s -r KER.COM

### 6. 修改database administrator的ACL权限

>vim /var/kerberos/krb5kdc/kadm5.acl

>#修改如下

*/admin@KER.COM 

### 7. 启动kerberos daemons

> #启动

systemctl start kadmin krb5kdc

#设置开机自动启动

systemctl enable kadmin krb5kdc

#查看启动状态

systemctl status kadmin krb5kdc

### 8. 创建用户

> #1.首先以超管身份进入kadmin

kadmin.local

#2.查看用户

listprincs

#新增用户kafka/es1

addprinc ms_kafka/ker

#退出kadmin.local

exit

### 9. 生成用户keytab(该文件后续kafka配置和客户端认证需用到)

> kadmin.local -q "xst  -k /var/kerberos/krb5kdc/kadm5.keytab  ms_kafka/ker@KER.COM"

### 10. 在kafka安装目录下config创建kafka-jaas.conf

```java

KafkaServer {

com.sun.security.auth.module.Krb5LoginModule required

useKeyTab=true

storeKey=true

keyTab="/var/kerberos/krb5kdc/kadm5.keytab"

principal="ms_kafka/ker@KER.COM";

};

KafkaClient {

com.sun.security.auth.module.Krb5LoginModule required

useKeyTab=true

useTicketCache=true

keyTab="/var/kerberos/krb5kdc/kadm5.keytab"

principal="ms_kafka/ker@KER.COM";

};

```

### 11. 在config/server.properties末尾添加如下配置

```java

advertised.listeners=SASL_PLAINTEXT://192.168.12.104:9092

listeners=SASL_PLAINTEXT://0.0.0.0:9092

security.inter.broker.protocol=SASL_PLAINTEXT

sasl.mechanism.inter.broker.protocol=GSSAPI

sasl.enabled.mechanisms=GSSAPI

sasl.kerberos.service.name=ms_kafka

```

### 12. 启动项配置

> vi bin/kafka-server-start.sh

```java

在kafka-server-start.sh脚本倒数第二行插入

export KAFKA_OPTS="-Dzookeeper.sasl.client=false -Dzookeeper.sasl.client.username=zk-server -Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/usr/local/dxkafka/config/kafka-jaas.conf"

```

> vi bin/kafka-console-producer.sh

```java

在kafka-console-producer.sh脚本倒数第二行插入

export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/usr/local/dxkafka/config/kafka-jaas.conf"

```

> vi bin/kafka-console-consumer.sh

```java

在kafka-console-consumer.sh脚本倒数第二行插入

export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/usr/local/dxkafka/config/kafka-jaas.conf"

```

### 13. 启动kafka和zookeeper

> bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

> bin/kafka-server-start.sh -daemon config/server.properties

如果需要用命令创建生产者或者消费者需要配置如下

```java

配置config/producer.properties,kafka生产者kerberos配置

security.protocol = SASL_PLAINTEXT

sasl.mechanism = GSSAPI

sasl.kerberos.service.name =ms_kafka

配置config/consumer.properties,kafka消费者kerberos配置

security.protocol = SASL_PLAINTEXT

sasl.mechanism = GSSAPI

sasl.kerberos.service.name=ms_kafka

创建生产者

./bin/kafka-console-producer.sh --broker-list ker:9092 --topic test  --producer.config config/producer.properties

创建消费者

./bin/kafka-console-consumer.sh --bootstrap-server ker:9092 --topic test  --consumer.config config/consumer.properties

```

### 14. Java客户端消费

1.把krb5.conf和kafka-jaas.conf和kadm5.keytab下载到本地,并且修改kafka-jaas.conf文件的keytab路径:改为本地keytab路径

2.使用域名记得修改/etc/hosts文件,添加内容:192.168.12.104 ker

3. 示例:

```java

public class KerberosTest {

    public static void main(String[] args) {

        testProducer();

//        testConsumer();

    }


    private static void testProducer() {

        // JAAS配置文件路径和Kerberos配置文件路径

        System.setProperty("java.security.auth.login.config", "D:\\kafka-jaas.conf");

        System.setProperty("java.security.krb5.conf", "D:\\krb5.conf");

        // kafka属性配置

        Properties props = new Properties();

        props.put("bootstrap.servers", "192.168.12.104:9092");

        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // kerberos安全认证

        props.put("security.protocol", "SASL_PLAINTEXT");

        props.put("sasl.mechanism", "GSSAPI");

        props.put("sasl.kerberos.service.name", "ms_kafka");

        String topic = "http";

        String msg = "this is a test msg";

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

        ProducerRecord<String, String> record = new ProducerRecord<>(topic, msg);

        // 发送消息记录

        Future<RecordMetadata> future = kafkaProducer.send(record);

        try {

            RecordMetadata metadata = future.get();

            System.out.printf("Message sent to Kafka topic=%s, msg=%s, partition=%d, offset=%d\n", metadata.topic(),msg, metadata.partition(), metadata.offset());

        } catch (Exception e) {

            e.printStackTrace();

        }

        kafkaProducer.close();

    }

    public static void testConsumer(){

        Properties props = new Properties();

        props.put("bootstrap.servers", "192.168.12.104:9092"); // Kafka Broker 地址

        props.put("group.id", "test-group"); // 消费者组

        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // 安全协议和认证方式

        props.put("security.protocol", "SASL_PLAINTEXT");

        props.put("sasl.mechanism", "GSSAPI"); // 使用 Kerberos 认证

        props.put("sasl.kerberos.service.name", "ms_kafka"); // Kafka 服务 Principal 名字

        // Kerberos 配置

        System.setProperty("java.security.auth.login.config", "D:\\kafka-jaas.conf");

        System.setProperty("java.security.krb5.conf", "D:\\krb5.conf");

        // 创建消费者

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        consumer.subscribe(Collections.singletonList("http"));

        // 消费数据

        while (true) {

            ConsumerRecords<String, String> records = consumer.poll(1000);

            for (ConsumerRecord<String, String> record : records) {

                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());

            }

        }

        // 创建 topic

//        try (AdminClient adminClient = AdminClient.create(props)) {

//            // 创建 topic

//            NewTopic newTopic = new NewTopic("test_topic", 1, (short) 1); // name, partitions, replication-factor

//            adminClient.createTopics(Collections.singletonList(newTopic)).all().get();

//            System.out.println("Topic created successfully");

//        } catch (Exception e) {

//            e.printStackTrace();

//        }

    }

}

```

### 15. 注意

> 服务器时差问题:  ntpdate -u pool.ntp.org

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容