kafka鉴权PLAIN策略的自定义扩展
Kafka官方文档不推荐在生产环境中通过jaas文件静态配置PLAIN机制的用户名和密码,但对于如何进行自定义扩展讲得不够清楚,这里记录一下具体的实践过程
扩展接口开发
配置kafka依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.13</artifactId>
<version>2.5.0</version>
</dependency>
实现鉴权接口
package org.apache.kafka.common.security.auth;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Map;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.plain.PlainAuthenticateCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
/**
 * Server-side SASL/PLAIN callback handler that checks client credentials against a MySQL
 * table instead of a static user list.
 *
 * <p>The MySQL connection settings (host, port, database name, user, password) are read from
 * the options of the first JAAS configuration entry passed to {@link #configure}. Lookups go
 * through a HikariCP pool that is created in {@link #configure} and released in
 * {@link #close()}.
 *
 * <p>NOTE(review): the {@code password} column is compared verbatim with the password sent by
 * the client, i.e. the table holds plain-text passwords. Storing a salted hash
 * (bcrypt/scrypt/argon2) would be safer, but requires migrating the existing data.
 */
public class ServerCallbackHandler implements AuthenticateCallbackHandler {
    private static final Logger logger = LoggerFactory.getLogger(ServerCallbackHandler.class);

    /** Looks up the stored password of a single user; the user name is bound as a parameter. */
    private static final String SQL = "SELECT password FROM user WHERE username = ?";

    private List<AppConfigurationEntry> jaasConfigEntries;
    private HikariDataSource dataSource;

    /**
     * Builds the MySQL connection pool from the options of the first JAAS entry.
     *
     * @param configs broker configuration (not used by this handler)
     * @param mechanism SASL mechanism this handler is configured for
     * @param jaasConfigEntries JAAS entries of the listener; the first entry must carry the
     *     MySQL options named by {@code MysqlConsts}
     * @throws KafkaException if there is no JAAS entry or a required option is missing
     */
    @Override
    public void configure(Map<String, ?> configs, String mechanism, List<AppConfigurationEntry> jaasConfigEntries) {
        if (jaasConfigEntries == null || jaasConfigEntries.isEmpty()) {
            throw new KafkaException("No JAAS configuration entry available for mechanism " + mechanism);
        }
        this.jaasConfigEntries = jaasConfigEntries;
        Map<String, ?> ops = jaasConfigEntries.get(0).getOptions();

        String url = "jdbc:mysql://" + requiredOption(ops, MysqlConsts.HOSTNAME)
                + ":" + requiredOption(ops, MysqlConsts.PORT)
                + "/" + requiredOption(ops, MysqlConsts.DATABASE_NAME)
                + "?autoReconnect=true";

        HikariConfig config = new HikariConfig();
        config.setDriverClassName("com.mysql.jdbc.Driver");
        config.setJdbcUrl(url);
        config.setUsername(requiredOption(ops, MysqlConsts.USER));
        config.setPassword(requiredOption(ops, MysqlConsts.PASSWORD));
        config.setAutoCommit(true);
        config.setMaximumPoolSize(5);
        config.addDataSourceProperty("cachePrepStmts", "true");
        config.addDataSourceProperty("prepStmtCacheSize", "250");
        config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
        config.addDataSourceProperty("useServerPrepStmts", "true");
        config.addDataSourceProperty("autoReconnect", "true");
        dataSource = new HikariDataSource(config);
    }

    /**
     * Processes the callbacks of one authentication attempt: the user name is taken from the
     * {@link NameCallback} and the {@link PlainAuthenticateCallback} is answered with the
     * result of {@link #authenticate}.
     *
     * @throws UnsupportedCallbackException for any other callback type
     */
    @Override
    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        String username = null;
        for (Callback callback : callbacks) {
            if (callback instanceof NameCallback) {
                username = ((NameCallback) callback).getDefaultName();
            } else if (callback instanceof PlainAuthenticateCallback) {
                PlainAuthenticateCallback plainCallback = (PlainAuthenticateCallback) callback;
                plainCallback.authenticated(authenticate(username, plainCallback.password()));
            } else {
                throw new UnsupportedCallbackException(callback);
            }
        }
    }

    /**
     * Checks the given credentials against the {@code user} table.
     *
     * @param username user name sent by the client; {@code null} is rejected
     * @param password password sent by the client; {@code null} is rejected
     * @return {@code true} only if the user exists and the stored password matches exactly
     *     (case-sensitive); {@code false} otherwise, including when the database lookup fails
     */
    protected boolean authenticate(String username, char[] password) throws IOException {
        if (username == null || password == null) {
            return false;
        }
        // try-with-resources returns the connection to the pool and closes the statement and
        // result set on every path, including exceptions.
        try (Connection conn = dataSource.getConnection();
             PreparedStatement statement = conn.prepareStatement(SQL)) {
            statement.setString(1, username);
            try (ResultSet resultSet = statement.executeQuery()) {
                if (!resultSet.next()) {
                    logger.info("login failed, unknown user:{}", username);
                    return false;
                }
                String serverPassword = resultSet.getString("password");
                if (serverPassword == null) {
                    // A NULL password column can never match a supplied password.
                    logger.info("login failed, no password stored for user:{}", username);
                    return false;
                }
                // Exact comparison via MessageDigest.isEqual so that the check is
                // case-sensitive and does not leak the position of the first mismatch.
                // Passwords are deliberately never written to the log.
                boolean matches = MessageDigest.isEqual(
                        serverPassword.getBytes(StandardCharsets.UTF_8),
                        new String(password).getBytes(StandardCharsets.UTF_8));
                if (matches) {
                    logger.info("login success!");
                } else {
                    logger.info("login failed, wrong password for user:{}", username);
                }
                return matches;
            }
        } catch (SQLException e) {
            // Fail closed: a database problem must never authenticate a client.
            logger.error("SQLException while authenticating user:{}", username, e);
            return false;
        }
    }

    /** Shuts down the connection pool, if {@link #configure} got far enough to create it. */
    @Override
    public void close() throws KafkaException {
        if (dataSource != null) {
            dataSource.close();
        }
    }

    /** Returns the string value of a mandatory JAAS option or fails with a clear message. */
    private static String requiredOption(Map<String, ?> options, Object key) {
        Object value = options.get(key);
        if (value == null) {
            throw new KafkaException("Missing required JAAS option '" + key + "'");
        }
        return value.toString();
    }
}
在jaas文件中配置mysql
$ vi config/kafka_server_jaas.conf
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
hostname="192.168.0.12"
port="3306"
databaseName="db"
user="root"
password="123456"
;
};
在服务端配置鉴权接口
$ vi config/server.properties
listener.name.sasl_ssl.plain.sasl.server.callback.handler.class=org.apache.kafka.common.security.auth.ServerCallbackHandler
在服务端部署接口
将以上代码打成jar包,连同MySQL驱动(mysql-connector-java)和连接池(HikariCP)的jar包一起放入kafka的libs目录
重启server之后,就可以用mysql进行用户名密码的管理了