Kafka版本 0.11.0 Scala版本 2.11
官方文档 https://kafka.apache.org/documentation/#adminclientconfigs
前情提要
之前编写的kafka监控数据采集agent里使用了AdminClient来获取了所有的消费者组(ConsumerGroup)的信息 使用方法如下:
client = AdminClient.createSimplePlaintext(host + ":" + kafkaPort);
List<GroupOverview> groupOverviews = scala.collection.JavaConversions.seqAsJavaList(client.listAllGroupsFlattened().toSeq());
问题简介
在给业务方布这个agent的时候发现业务方的kafka集群采用了SASL认证,AdminClient无法连接成功,获取不到kafka的数据。
解决方法
创建配置文件(sasl.properties) 写入如下内容
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required\n\
username="alice"\n\
password="alice";
上述配置只适用于SASL明文认证的情况 其他情况请进行相应更改
注意:配置中最后的\n和;是必不可少的
然后改用如下创建方式使用AdminClient
saslProperties = new Properties();
saslProperties.load(new FileInputStream(yourConfigFilePath));
saslProperties.put("bootstrap.servers", host + ":" + kafkaPort);
client = AdminClient.create(properties);
不将servers地址放在配置中的原因是有多个broker的需求 如果没有可以放在配置中
如果只想解决问题看到这里就够了
解决思路
我没有在网络上找到该问题现成的解决方案,所以记录下我的解决思路以供大家思考
① 首先 查看AdminClient.class的代码
public static AdminClient create(AdminClient.AdminConfig var0) {
return AdminClient$.MODULE$.create(var0);
}
public static AdminClient create(Map<String, ?> var0) {
return AdminClient$.MODULE$.create(var0);
}
public static AdminClient create(Properties var0) {
return AdminClient$.MODULE$.create(var0);
}
public static AdminClient createSimplePlaintext(String var0) {
return AdminClient$.MODULE$.createSimplePlaintext(var0);
}
可以看到上面三个create调用的都是同一套接口,我们跟着看
public AdminClient create(AdminConfig config) {
Time time = Time.SYSTEM;
Metrics metrics = new Metrics(time);
Metadata metadata = new Metadata(100L, 3600000L, true);
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
Integer requestTimeoutMs = config.getInt("request.timeout.ms");
Long retryBackoffMs = config.getLong("retry.backoff.ms");
List brokerUrls = config.getList("bootstrap.servers");
List brokerAddresses = ClientUtils.parseAndValidateAddresses(brokerUrls);
Cluster bootstrapCluster = Cluster.bootstrap(brokerAddresses);
metadata.update(bootstrapCluster, Collections.emptySet(), 0L);
Selector selector = new Selector((long)this.DefaultConnectionMaxIdleMs(), metrics, time, "admin", channelBuilder);
NetworkClient networkClient = new NetworkClient(selector, metadata, (new StringBuilder()).append("admin-").append(BoxesRunTime.boxToInteger(this.AdminClientIdSequence().getAndIncrement())).toString(), this.DefaultMaxInFlightRequestsPerConnection(), (long)this.DefaultReconnectBackoffMs(), (long)this.DefaultReconnectBackoffMax(), this.DefaultSendBufferBytes(), this.DefaultReceiveBufferBytes(), .MODULE$.Integer2int(requestTimeoutMs), time, true, new ApiVersions());
ConsumerNetworkClient highLevelClient = new ConsumerNetworkClient(networkClient, metadata, time, .MODULE$.Long2long(retryBackoffMs), (long).MODULE$.Integer2int(requestTimeoutMs));
return new AdminClient(time, .MODULE$.Integer2int(requestTimeoutMs), .MODULE$.Long2long(retryBackoffMs), highLevelClient, ((TraversableOnce)scala.collection.JavaConverters..MODULE$.asScalaBufferConverter(bootstrapCluster.nodes()).asScala()).toList());
}
可以看到 可能使用到我们认证参数的只有
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
这一行代码 我们进去看
public static ChannelBuilder createChannelBuilder(AbstractConfig config) {
SecurityProtocol securityProtocol = SecurityProtocol.forName(config.getString("security.protocol"));
if (!SecurityProtocol.nonTestingValues().contains(securityProtocol)) {
throw new ConfigException("Invalid SecurityProtocol " + securityProtocol);
} else {
String clientSaslMechanism = config.getString("sasl.mechanism");
return ChannelBuilders.clientChannelBuilder(securityProtocol, Type.CLIENT, config, (ListenerName)null, clientSaslMechanism, true);
}
}
从这里开始已经可以看到关于认证的内容了
首先获取了security.protocol的值 如果合法的话获取了sasl.mechanism的值
继续跟进去
public static ChannelBuilder clientChannelBuilder(SecurityProtocol securityProtocol, Type contextType, AbstractConfig config, ListenerName listenerName, String clientSaslMechanism, boolean saslHandshakeRequestEnable) {
if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT || securityProtocol == SecurityProtocol.SASL_SSL) {
if (contextType == null) {
throw new IllegalArgumentException("`contextType` must be non-null if `securityProtocol` is `" + securityProtocol + "`");
}
if (clientSaslMechanism == null) {
throw new IllegalArgumentException("`clientSaslMechanism` must be non-null in client mode if `securityProtocol` is `" + securityProtocol + "`");
}
}
return create(securityProtocol, Mode.CLIENT, contextType, config, listenerName, clientSaslMechanism, saslHandshakeRequestEnable, (CredentialCache)null);
}
继续
private static ChannelBuilder create(SecurityProtocol securityProtocol, Mode mode, Type contextType, AbstractConfig config, ListenerName listenerName, String clientSaslMechanism, boolean saslHandshakeRequestEnable, CredentialCache credentialCache) {
Map configs;
if (listenerName == null) {
configs = config.values();
} else {
configs = config.valuesWithPrefixOverride(listenerName.configPrefix());
}
Object channelBuilder;
switch(securityProtocol) {
case SSL:
requireNonNullMode(mode, securityProtocol);
channelBuilder = new SslChannelBuilder(mode);
break;
case SASL_SSL:
case SASL_PLAINTEXT:
requireNonNullMode(mode, securityProtocol);
JaasContext jaasContext = JaasContext.load(contextType, listenerName, configs);
channelBuilder = new SaslChannelBuilder(mode, jaasContext, securityProtocol, clientSaslMechanism, saslHandshakeRequestEnable, credentialCache);
break;
case PLAINTEXT:
case TRACE:
channelBuilder = new PlaintextChannelBuilder();
break;
default:
throw new IllegalArgumentException("Unexpected securityProtocol " + securityProtocol);
}
((ChannelBuilder)channelBuilder).configure(configs);
return (ChannelBuilder)channelBuilder;
}
前后看了两个版本的AdminClient(0.11.0 1.1.1)代码 格式有所区别 但是原理是一样的
我们是SASL_PLAINTEXT 跟着走 首先看这行
JaasContext jaasContext = JaasContext.load(contextType, listenerName, configs);
public static JaasContext load(JaasContext.Type contextType, ListenerName listenerName, Map<String, ?> configs) {
String listenerContextName;
String globalContextName;
switch(contextType) {
case CLIENT:
if (listenerName != null) {
throw new IllegalArgumentException("listenerName should be null for CLIENT");
}
globalContextName = "KafkaClient";
listenerContextName = null;
break;
case SERVER:
if (listenerName == null) {
throw new IllegalArgumentException("listenerName should not be null for SERVER");
}
globalContextName = "KafkaServer";
listenerContextName = listenerName.value().toLowerCase(Locale.ROOT) + "." + "KafkaServer";
break;
default:
throw new IllegalArgumentException("Unexpected context type " + contextType);
}
return load(contextType, listenerContextName, globalContextName, configs);
}
我们是客户端连接 KafkaClient
继续
static JaasContext load(JaasContext.Type contextType, String listenerContextName, String globalContextName, Map<String, ?> configs) {
Password jaasConfigArgs = (Password)configs.get("sasl.jaas.config");
if (jaasConfigArgs != null) {
if (contextType == JaasContext.Type.SERVER) {
throw new IllegalArgumentException("JAAS config property not supported for server");
} else {
JaasConfig jaasConfig = new JaasConfig(globalContextName, jaasConfigArgs.value());
AppConfigurationEntry[] clientModules = jaasConfig.getAppConfigurationEntry(globalContextName);
int numModules = clientModules == null ? 0 : clientModules.length;
if (numModules != 1) {
throw new IllegalArgumentException("JAAS config property contains " + numModules + " login modules, should be 1 module");
} else {
return new JaasContext(globalContextName, contextType, jaasConfig);
}
}
} else {
return defaultContext(contextType, listenerContextName, globalContextName);
}
}
这里获取了sasl.jaas.config的值 看这一行
JaasConfig jaasConfig = new JaasConfig(globalContextName, jaasConfigArgs.value());
public JaasConfig(String loginContextName, String jaasConfigParams) {
StreamTokenizer tokenizer = new StreamTokenizer(new StringReader(jaasConfigParams));
tokenizer.slashSlashComments(true);
tokenizer.slashStarComments(true);
tokenizer.wordChars(45, 45);
tokenizer.wordChars(95, 95);
tokenizer.wordChars(36, 36);
try {
this.configEntries = new ArrayList();
while(tokenizer.nextToken() != -1) {
this.configEntries.add(this.parseAppConfigurationEntry(tokenizer));
}
if (this.configEntries.isEmpty()) {
throw new IllegalArgumentException("Login module not specified in JAAS config");
} else {
this.loginContextName = loginContextName;
}
} catch (IOException var5) {
throw new KafkaException("Unexpected exception while parsing JAAS config");
}
}
这里就是在分割config字符串获得对应每一个的值了 看这一行
while(tokenizer.nextToken() != -1) {
this.configEntries.add(this.parseAppConfigurationEntry(tokenizer));
}
private AppConfigurationEntry parseAppConfigurationEntry(StreamTokenizer tokenizer) throws IOException {
String loginModule = tokenizer.sval;
if (tokenizer.nextToken() == -1) {
throw new IllegalArgumentException("Login module control flag not specified in JAAS config");
} else {
LoginModuleControlFlag controlFlag = this.loginModuleControlFlag(tokenizer.sval);
HashMap options = new HashMap();
while(true) {
if (tokenizer.nextToken() != -1 && tokenizer.ttype != 59) {
String key = tokenizer.sval;
if (tokenizer.nextToken() == 61 && tokenizer.nextToken() != -1 && tokenizer.sval != null) {
String value = tokenizer.sval;
options.put(key, value);
continue;
}
throw new IllegalArgumentException("Value not specified for key '" + key + "' in JAAS config");
}
if (tokenizer.ttype != 59) {
throw new IllegalArgumentException("JAAS config entry not terminated by semi-colon");
}
return new AppConfigurationEntry(loginModule, controlFlag, options);
}
}
}
loginModule就是config里第一行那个导入模块
然后 看这里
LoginModuleControlFlag controlFlag = this.loginModuleControlFlag(tokenizer.sval);
private LoginModuleControlFlag loginModuleControlFlag(String flag) {
String var3 = flag.toUpperCase(Locale.ROOT);
byte var4 = -1;
switch(var3.hashCode()) {
case -848090850:
if (var3.equals("SUFFICIENT")) {
var4 = 2;
}
break;
case -810754599:
if (var3.equals("REQUISITE")) {
var4 = 1;
}
break;
case 389487519:
if (var3.equals("REQUIRED")) {
var4 = 0;
}
break;
case 703609696:
if (var3.equals("OPTIONAL")) {
var4 = 3;
}
}
LoginModuleControlFlag controlFlag;
switch(var4) {
case 0:
controlFlag = LoginModuleControlFlag.REQUIRED;
break;
case 1:
controlFlag = LoginModuleControlFlag.REQUISITE;
break;
case 2:
controlFlag = LoginModuleControlFlag.SUFFICIENT;
break;
case 3:
controlFlag = LoginModuleControlFlag.OPTIONAL;
break;
default:
throw new IllegalArgumentException("Invalid login module control flag '" + flag + "' in JAAS config");
}
return controlFlag;
}
如果没写\n 这里读到的就是requiredusername 就会报如下错误
"Invalid login module control flag '" + flag + "' in JAAS config"
后面就没什么好说的了 到了这里 所有需要我们提供的配置都已经出现。我们只要按照要求的格式提供给AdminClient就可以正常使用了