Kafka安全机制解析及重构(三)| 重构示例及脑洞

2018.12.01注:详细代码由于在内网无法直接奉上,所以这篇介绍的只是基本的代码结构,可能还需要再调试一下才能通。调试过程中需要注意的是:

  1. server.properties中需要配置上集群内部之间交互走PLAIN模式
    sasl.enabled.mechanism=ABC,PLAIN
    security.inter.broker.protocol=PLAINTEXT
    sasl.mechanism.inter.broker.protocol=PLAIN
  2. hasInitialResponse经常会出问题,建议在client和server的交互过程中加入debug日志
  3. 如果还需要做队列权限的校验,authID很重要,是用来做acl控制里的用户的。建议配置成username。

在前两篇文章中讲解了Kafka的SASL安全机制,并看了PLAIN的源码:
Kafka安全机制解析及重构(一)
Kafka安全机制解析及重构(二)

接下来我们要做的就是自己实现一套Kafka的安全认证机制ABC。这套机制要实现以下的需求:

  • 客户端配置时指定使用机制ABC
  • Broker之间无认证,Broker接收客户端请求时使用ABC机制校验
  • 密码加密传输

根据上一篇中的介绍,我们需要实现的类有以下几个:

  1. LoginModule
  2. Provider(client和server)
  3. SaslClientFactory
  4. SaslClient
  5. SaslServerFactory
  6. SaslServer

LoginModule

要先实现LoginModule。这个LoginModule中在客户端需要初始化SaslClientProvider,服务端需要初始化SaslServerProvider,为了方便,这里直接初始化两个Provider。如果客户端是给外部用户使用的话,为了安全,只需要初始化SaslClientProvider即可。

import javax.security.auth.Subject;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.login.LoginException;
import javax.security.auth.spi.LoginModule;
import java.util.Map;

public class ABCLoginModule implements LoginModule {

    private static final String USERNAME = "username";
    private static final String PASSWORD = "password";

    static {
        ABCSaslClientProvider.initialize();
        ABCSaslServerProvider.initialize();
    }

    @Override
    public void initialize(Subject subject, CallbackHandler callbackHandler, Map<String, ?> sharedState, Map<String, ?> optiABC) {
        String accessKey = (String) optiABC.get(USERNAME);
        if (accessKey != null)
            subject.getPublicCredentials().add(accessKey);
        String secretKey = (String) optiABC.get(PASSWORD);
        if (secretKey != null)
            subject.getPrivateCredentials().add(secretKey);
    }

    @Override
    public boolean login() throws LoginException {
        return true;
    }

    @Override
    public boolean logout() throws LoginException {
        return true;
    }

    @Override
    public boolean commit() throws LoginException {
        return true;
    }

    @Override
    public boolean abort() throws LoginException {
        return false;
    }
}

Provider

而后我们分别实现两个Provider,Client和Server,代码太简单了就不多赘述了。

public class ABCSaslClientProvider extends Provider {

    private static final long serialVersionUID = 1L;


    protected ABCSaslClientProvider() {
        super("Simple SASL/ABC Client Provider", 1.0, "Simple SASL/ABC Client Provider for Kafka");
        super.put("SaslClientFactory." + ABCSaslClient.ABC_MECHANISM, ABCSaslClientFactory.class.getName());
    }

    public static void initialize() {
        Security.addProvider(new ABCSaslClientProvider());
    }
}
public class ABCSaslServerProvider extends Provider {

    private static final long serialVersionUID = 1L;

    protected ABCSaslServerProvider() {
        super("Simple SASL/ABC Server Provider", 1.0, "Simple SASL/ABC Server Provider for Kafka");
        super.put("SaslServerFactory." + ABCSaslServer.ABC_MECHANISM, ABCSaslServerFactory.class.getName());
    }

    public static void initialize() {
        Security.addProvider(new ABCSaslServerProvider());
    }
}

SaslClientFactory

SaslClientFactory中需要做的事有以下几个:

  • 判断客户端的安全机制是否是ABC,只支持ABC的安全机制
  • 从callbackHandler对象中取出用户密码
  • 使用用户密码来构造出SaslClient
public class ABCSaslClientFactory implements SaslClientFactory{
    private static final String[] myMechs = new String[]{ABCSaslClient.ABC_MECHANISM};

    @Override
    public SaslClient createSaslClient(String[] mechanisms, String authorizationId, String protocol, String serverName,
            Map<String, ?> props, CallbackHandler cbh) throws SaslException {
        if (mechanisms.length != 1 || !mechanisms[0].equals(ABCSaslClient.ABC_MECHANISM)) {
            throw new SaslException("Mechanism ERROR, only support mechanism:" + ABCSaslClient.ABC_MECHANISM);
        }
        String mech = ABCSaslClient.ABC_MECHANISM;
        String username = null;
        String password = null;
        if(cbh == null) {
            throw new SaslException("Callback handler to get username/password required");
        } else {
            try {
                String namePrompt = mech + " authentication id: ";
                String passwordPrompt = mech + " password: ";
                NameCallback nameCallBack = new NameCallback(namePrompt);
                PasswordCallback passwdCallBack = new PasswordCallback(passwordPrompt, false);
                cbh.handle(new Callback[]{nameCallBack, passwdCallBack});
                char[] pwdBytes = passwdCallBack.getPassword();

                password = null;
                if(pwdBytes != null) {
                    password = new String(pwdBytes);
                    passwdCallBack.clearPassword();
                }

                username = nameCallBack.getName();
            } catch (IOException var11) {
                throw new SaslException("Cannot get password", var11);
            } catch (UnsupportedCallbackException var12) {
                throw new SaslException("Cannot get userid/password", var12);
            }
        }
        return new SaslClient(username,password);
    }

    @Override
    public String[] getMechanismNames(Map<String, ?> props) {
        return myMechs;
    }

}

SaslClient

SaslClient中需要实现以下几个功能:

  • 保存username和password
  • 保存Sasl客户端的校验状态(complete)
  • 判断服务端的报文,是需要发送校验数据还是校验通过
  • 构造加密报文

本例中的加密报文构造方法是

  1. 先构造出一个byte数组:格式为[0,username.getBytes(),0,password.getBytes()]
  2. 将构造出的byte数组加密(方法略),可以直接使用SHA-256不可逆加密,得到一个新的byte数组encrypt。
  3. 将用户名放在encrypt前面变成最终发送给服务端的报文,格式为[0, username.getBytes(), 0, encrypt]
public class ABCSaslClient implements SaslClient{
    public static final String ABC_MECHANISM="ABC";
    private String username;
    private String password;
    private boolean complete=false;
    private byte SEP=0;
    
    public ABCSaslClient(String username, String password) throws SaslException {
        if (username == null || password == null) {
            throw new SaslException("username/password must be specified");
        }

        this.username=username;
        this.password=password;
    }

    @Override
    public String getMechanismName() {
        return ABC_MECHANISM;
    }

    // hasInitialResponse()返回false表示INITIAL发送一个空包给服务端,返回true则表示会发送一个自定义的包,详见SaslClientAuthenticator.createSaslToken
    @Override
    public boolean hasInitialResponse() {
        return false;
    }

    @Override
    public byte[] evaluateChallenge(byte[] challenge) throws SaslException {
        if(this.complete) {
            throw new IllegalStateException("Authentication already completed");
        } else {
            //收到服务端返回的空包,表示通过校验,complete置为true并退出
            if (challenge.length == 0) {
                this.complete = true;
                return null;
            }
            /**
             * 如果收到的不是空包,表示可以构造校验报文了,这里有个小trick,可以在服务端生成随机数或时间戳,作为server token发送到客户端
             * 客户端可以将server token中的数据放到加密串中,增强报文的安全性。
             */

            byte[] data = ABCUtil.join(SEP, username.getBytes("UTF-8"),password.getBytes("UTF-8"));
            byte[] encrypt = ABCUtil.encrypt(data);

            byte[] response = ABCUtil.join(SEP, username.getBytes("UTF-8"), encrypt);
            return response;
        }
    }

    @Override
    public boolean isComplete() {
        return this.complete;
    }

    @Override
    public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException {
        if(this.complete) {
            throw new SaslException("PLAIN supports neither integrity nor privacy");
        } else {
            throw new IllegalStateException("PLAIN authentication not completed");
        }
    }

    @Override
    public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException {
        if(this.complete) {
            throw new SaslException("PLAIN supports neither integrity nor privacy");
        } else {
            throw new IllegalStateException("PLAIN authentication not completed");
        }
    }

    @Override
    public Object getNegotiatedProperty(String propName) {
        if(this.complete) {
            return propName.equals("javax.security.sasl.qop")?"auth":null;
        } else {
            throw new IllegalStateException("PLAIN authentication not completed");
        }
    }

    @Override
    public void dispose() throws SaslException {
        password=null;
    }

}

至此,完成客户端的部分。下面构造服务端的SaslServer。

SaslServerFactory

SaslServerFactory和Client很类似,需要实现的功能有:

  • 判断client的安全机制是否是ABC
  • 构造出SaslServer

代码比SaslClientFactory更简单一些

public class ABCSaslServerFactory implements SaslServerFactory{
    private static final String[] myMechs = new String[]{ABCSaslServer.ABC_MECHANISM};

    @Override
    public SaslServer createSaslServer(String mechanism, String protocol, String serverName, Map<String, ?> props,
            CallbackHandler cbh) throws SaslException {
        if (!mechanism.equals(ABCSaslClient.ABC_MECHANISM)) {
            throw new SaslException("Mechanism ERROR, only support mechanism:" + ABCSaslServer.ABC_MECHANISM);
        }       
        return new ABCSaslServer(cbh);
    }

    @Override
    public String[] getMechanismNames(Map<String, ?> props) {
        return myMechs;
    }
}

SaslServer

SaslServer需要处理两种报文,需要实现的功能和SaslClient类似:

  • 记录校验状态(complete)
  • 收到client端INITIAL阶段送的报文,处理并返回server token
  • 收到client端的校验数据,校验信息
  • 校验通过返回空包

这里有两个点需要注意,一个是complete一定要有,还有就是authorizationID一定要有,因为都是在Kafka中会被调用的方法。

public class ABCSaslServer implements SaslServer {
    public static final String ABC_MECHANISM = "ABC";
    private boolean complete = false;
    private String authID;

    public ABCSaslServer(CallbackHandler cbh) throws SaslException {

    }

    @Override
    public String getMechanismName() {
        return ABC_MECHANISM;
    }

    @Override
    public boolean isComplete() {
        return this.complete;
    }

    @Override
    public Object getNegotiatedProperty(String propName) {
        if (!complete)
            throw new IllegalStateException("Authentication exchange has not completed");
        return null;
    }

    @Override
    public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException {
        if (!complete)
            throw new IllegalStateException("Authentication exchange has not completed");
        return Arrays.copyOfRange(incoming, offset, offset + len);
    }

    @Override
    public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException {
        if (!complete)
            throw new IllegalStateException("Authentication exchange has not completed");
        return Arrays.copyOfRange(outgoing, offset, offset + len);
    }

    @Override
    public void dispose() throws SaslException {
    }

    @Override
    public byte[] evaluateResponse(byte[] response) throws SaslException {
        //收到客户端INITIAL阶段发送的空包,返回一个server token
        if (response.length == 0) {
            return "ABC by MisterCH".getBytes("UTF-8");
        } 
        String[] tokens;
        try {
            tokens = new String(response, "UTF-8").split("\u0000");
        } catch (UnsupportedEncodingException e) {
            throw new SaslException("UTF-8 encoding not supported", e);
        }
        if (tokens.length != 3)
            throw new SaslException("Invalid SASL/ABC response: expected 3 tokens, got " + tokens.length);
        // 从包中提取出用户名和加密串
        authID = tokens[0];
        String username = tokens[1];
        String encrypt = tokens[2];

        if (username.isEmpty()) {
            throw new SaslException("Authentication failed: username not specified");
        }
        if (authID.isEmpty())
            authID = username;
        // 在服务端拿到用户对应的密码
        String password = ABCUtil.getPassword(username);

        byte[] data = ABCUtil.join(SEP, username.getBytes("UTF-8"),password.getBytes("UTF-8"));
        byte[] expectedToken = ABCUtil.encrypt(data);
        
        // 比对服务端加密的串和客户端加密的串是否一致(对称加密),一致则通过校验
        if (!ABCUtils.isEqual(encrypt, expectedToken);) {
            throw new SaslException("Authentication failed: Invalid username or password");
        }
        complete = true;
        return new byte[0];
    }

    @Override
    public String getAuthorizationID() {
        return this.authID;
    }
}

配置修改

写完这些代码,打包成jar文件,而后放在kafka的libs目录下。服务端需要配置jaas文件作为jaas的入口,并修改server.properties。

KafkaServer{
  com.company.MisterCH.ABCLoginModule required
  username="test"
  password="test"
  user_ch="chsss";
};

修改Kafka的启动脚本不赘述,server.properties中需要修改几个地方

#broker内部通信使用PLAIN机制9092端口,外部通信使用SASL_PLAINTEXT的9093端口
listeners=PLAINTEXT://hostname:9093,SASL_PALINTEXT://hostname:9092
sasl.mechanism=ABC
security.protocol=SASL_PLAINTEXT

客户端可以不用配置jaas文件,直接在producer.properties或consumer.properties中修改如下配置

security.protocol=SASL_PLAINTEXT
sasl.mechanism=ABC
sasl.jaas.config=com.company.MisterCH.ABCLoginModule required \
   ch \
   chsss \;

DONE

开脑洞环节

通过这本文中的代码,大家可以自己构造出属于自己的安全机制。本文中省略了密码加密,通过用户获取密码这些方法,这些方法都可以自己实现。此外,更进一步还可以实现这些功能:

  1. 客户端密码加密:客户端配置明文,在Provider初始化时,判断是否是明文,如果是则将明文中的密码给自动加密。
  2. 客户端密码获取:从别的位置获取用户密码,这个可以在LoginModule中配置username和password的获取方法,比如从用户中心中拿到等等。
  3. 密码密文传输:看了下阿里的源码,阿里的校验报文构造时,会将服务端的server token中的时间戳和随机数放到报文中,保证每次传输的加密报文都不一样。
  4. 服务端密码获取:从客户端的报文中拿到用户名以后,服务端可以从任意位置获取密码,比如用户中心,比如一个定时热加载的配置文件。由于kafka一般是集群部署,推荐有一个集中的位置来存放用户密码信息,这样可以实现动态用户密码的更新。当然,服务端去用户中心获取密码的动作不能是由客户端连接触发,而是应该另开线程定时更新。
  5. 客户端其他信息校验:服务端收到的客户端鉴权信息中,可以要求服务端带上其他信息,比如kafka客户端的版本号,比如客户端的IP信息等等,实现版本控制,IP黑白名单等等简单的功能,还能将每个通过校验的连接具体信息发送到管理系统,用于视图,监控等等。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,753评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,668评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,090评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,010评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,054评论 6 395
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,806评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,484评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,380评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,873评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,021评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,158评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,838评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,499评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,044评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,159评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,449评论 3 374
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,136评论 2 356