kafka鉴权PLAIN策略的自定义扩展

kafka鉴权PLAIN策略的自定义扩展

官方文档中PLAIN策略不推荐在线上环境使用jaas文件进行用户名密码的配置,但如何进行扩展讲的不是很清楚,这里记录下具体的实践过程

扩展接口开发

配置kafka依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.13</artifactId>
    <version>2.5.0</version>
</dependency>

实现鉴权接口

package org.apache.kafka.common.security.auth;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Map;

import javax.security.auth.callback.Callback;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.plain.PlainAuthenticateCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;

public class ServerCallbackHandler implements AuthenticateCallbackHandler {
    
    private List<AppConfigurationEntry> jaasConfigEntries;
    private final Logger logger = LoggerFactory.getLogger(ServerCallbackHandler.class);;
    private HikariDataSource dataSource;
    private final String SQL = "SELECT password FROM user WHERE username = ?";
    
    @Override
    public void configure(Map<String, ?> configs, String mechanism, List<AppConfigurationEntry> jaasConfigEntries) {
        this.jaasConfigEntries = jaasConfigEntries;
        
        Map<String,?> ops =  jaasConfigEntries.get(0).getOptions();
        String url = "jdbc:mysql://" + ops.get(MysqlConsts.HOSTNAME) + ":" + ops.get(MysqlConsts.PORT) + "/" + ops.get(MysqlConsts.DATABASE_NAME) + "?autoReconnect=true";   
        HikariConfig config = new HikariConfig();
        config.setDriverClassName("com.mysql.jdbc.Driver");
        config.setJdbcUrl(url);
        config.setUsername(ops.get(MysqlConsts.USER).toString());
        config.setPassword(ops.get(MysqlConsts.PASSWORD).toString());
        config.setAutoCommit(true);
        config.setMaximumPoolSize(5);
        config.addDataSourceProperty("cachePrepStmts", "true");
        config.addDataSourceProperty("prepStmtCacheSize", "250");
        config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
        config.addDataSourceProperty("useServerPrepStmts", "true");
        config.addDataSourceProperty("useServerPrepStmts", "true");
        config.addDataSourceProperty("autoReconnect", "true");
        dataSource =  new HikariDataSource(config);
    }
    
    @Override
    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        String username = null;
        for (Callback callback: callbacks) {
            if (callback instanceof NameCallback)
                username = ((NameCallback) callback).getDefaultName();
            else if (callback instanceof PlainAuthenticateCallback) {
                PlainAuthenticateCallback plainCallback = (PlainAuthenticateCallback) callback;
                boolean authenticated = authenticate(username, plainCallback.password());
                plainCallback.authenticated(authenticated);
            } else
                throw new UnsupportedCallbackException(callback);
        }
    }
    
    protected boolean authenticate(String username, char[] password) throws IOException {
        if (username == null)
            return false;
        else {
            Connection conn = null;
            try {
                conn = dataSource.getConnection();
                PreparedStatement statement = conn.prepareStatement(SQL);
                statement.setString(1, username);
                logger.info("sql:{},username:{}",SQL,username);
                ResultSet resultSet = statement.executeQuery();
                String clientPassword = new String(password);
                if(resultSet.next()) {
                    String serverPassword = resultSet.getString("password");
                    logger.info("server password:{}",serverPassword);
                    logger.info("client password:{}",clientPassword);
                    if(serverPassword.equalsIgnoreCase(clientPassword)) {
                        logger.info("login success!");
                        conn.close();
                        return true;
                    }else {
                        conn.close();
                        return false;
                    }
                }else {
                    conn.close();
                    return false;
                }
            }  catch (SQLException t) {
                try {
                    if(conn != null) {
                        conn.close();
                    }
                } catch (SQLException e) {
                    logger.error("SQLException",e);
                }
                return false;
            } 
        }
    }
    
    @Override
    public void close() throws KafkaException {
        dataSource.close();
    }
}

在jaas文件中配置mysql

$ vi config/kafka_server_jaas.conf  
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    hostname="192.168.0.12"
    port="3306"
    databaseName="db"
    user="root"
    password="123456"
    ;
};

在服务端配置鉴权接口

$ vi config/server.properties 
listener.name.sasl_ssl.plain.sasl.server.callback.handler.class=org.apache.kafka.common.security.auth.ServerCallbackHandler

在服务端部署接口

将以上代码打成jar包,连同mysql驱动,连接池jar包一起放入kafka的libs目录
重启server之后,就可以用mysql进行用户名密码的管理了

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。