阿里开源Canal--⑤投递到Kerberos认证的Kafka

在前一章节中,Billow介绍了如何通过1.1.1以上的canal配置将binlog数据投递到kafka。在实际的生产环境中,我们的kafka很多都会集成Kerberos作为安全认证。那么在本节,Billow将介绍如何通过修改源码使Canal可配置为投递数据到Kerberos认证的Kafka集群。

1.导入Canal源码

canal已开源到github。下载地址为:https://github.com/alibaba/canal.git

1.1 在idea中导入git项目。

导入后的项目目录为:


1.2 修改canal启动类

canal独立版本的入口类为:com.alibaba.otter.canal.deployer.CanalLauncher

在该类的main方法中,做了以下几件事情:
1、加载配置。
2、根据配置启动CanalStater

...
...
logger.info("## load canal configurations");
            String conf = System.getProperty("canal.conf", "classpath:canal.properties");
            Properties properties = new Properties();
            RemoteConfigLoader remoteConfigLoader = null;
            if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
                conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
                properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));
            } else {
                properties.load(new FileInputStream(conf));
            }
...
...
final CanalStater canalStater = new CanalStater();
            canalStater.start(properties);

在CanalStater.start方法中,通过配置项初始化MQ的生产者。此处Billow配置为Kafka,因此我们只关注kafka。


在初始化CanalKafkaProducer之后,会读取配置文件中的mq配置。


在canal.properties中的mq配置如下:

##################################################
#########            MQ              #############
##################################################
canal.mq.servers = 127.0.0.1:6667
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
# use transaction for kafka flatMessage batch produce
canal.mq.transaction = false


canal.mq.kafka.kerberos.enable = false
canal.mq.kafka.kerberos.krb5FilePath = "/usr/keytab/krb5.conf"
canal.mq.kafka.kerberos.jaasFilePath = "/usr/keytab/jaas.conf"

其中canal.mq.kafka.kerberos为前缀的配置是Billow的自定义kerberos配置项。说明:

  • canal.mq.kafka.kerberos.enable
    此配置项为true跟false,为true时表示kafka集群开启了kerberos认证,那么会读取接下来的两个配置项内容。

  • canal.mq.kafka.kerberos.krb5FilePath
    此配置项当canal.mq.kafka.kerberos.enable为true时才会读取,配置为kerberos集群中的krb5.conf文件。示例:

[logging]
 default = FILE:/var/log/krb5libs.log
 kdc = FILE:/var/log/krb5kdc.log
 admin_server = FILE:/var/log/kadmind.log

[libdefaults]
 default_realm = HADOOP.COM
 dns_lookup_realm = false
 dns_lookup_kdc = false
 ticket_lifetime = 24h
 renew_lifetime = 7d
 forwardable = true

[realms]
 BETA.COM = {
  kdc = hadoop1.com
  admin_server = hadoop1.com
 }

[domain_realm]
 .hadoop1.com = HADOOP.COM
 hadoop1.com = HADOOP.COM

  • canal.mq.kafka.kerberos.jaasFilePath
    此配置项当canal.mq.kafka.kerberos.enable为true时才会读取,配置为连接kafka时的jaas配置项。示例:
KafkaClient {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   keyTab="E:/resources/billow.keytab"
   principal="billow@HADOOP.COM"
   client=true;
};

此处Billow在配置文件中配置了自定义的配置项,那么在代码中,需要添加这几项配置项的读取。
CanalStater的buildMQProperties方法中添加配置项的读取。

/**
     * 构造MQ对应的配置
     * 
     * @param properties canal.properties 配置
     * @return
     */
    private static MQProperties buildMQProperties(Properties properties) {
        MQProperties mqProperties = new MQProperties();
        ......
        ......
        String kafkaKerberosEnable = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_KAFKA_KERBEROS_ENABLE);
        if (!StringUtils.isEmpty(kafkaKerberosEnable)) {
            mqProperties.setKerberosEnable(Boolean.valueOf(kafkaKerberosEnable));
        }
        String kafkaKerberosKrb5Filepath = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_KAFKA_KERBEROS_KRB5FILEPATH);
        if (!StringUtils.isEmpty(kafkaKerberosKrb5Filepath)) {
            mqProperties.setKerberosKrb5FilePath(kafkaKerberosKrb5Filepath);
        }
        String kafkaKerberosJaasFilepath = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_KAFKA_KERBEROS_JAASFILEPATH);
        if (!StringUtils.isEmpty(kafkaKerberosJaasFilepath)) {
            mqProperties.setKerberosJaasFilePath(kafkaKerberosJaasFilepath);
        }
        return mqProperties;
    }

对应的CanalConstants类中,添加常量信息配置:

/**
 * 启动常用变量
 *
 * @author jianghang 2012-11-8 下午03:15:55
 * @version 1.0.0
 */
public class CanalConstants {
...
...
    public static final String CANAL_MQ_KAFKA_KERBEROS_ENABLE    = ROOT + "." + "mq.kafka.kerberos.enable";
    public static final String CANAL_MQ_KAFKA_KERBEROS_KRB5FILEPATH  = ROOT + "." + "mq.kafka.kerberos.krb5FilePath";
    public static final String CANAL_MQ_KAFKA_KERBEROS_JAASFILEPATH  = ROOT + "." + "mq.kafka.kerberos.jaasFilePath";
...
...

1.3 配置CanalKafkaProducer

上一小节中,Billow介绍了如何添加关于Kerberos的开关配置。在这节我们来看看如何配置kafkaProducer为安全模式。

观察源码发现,在CanalStater的start方法中初始化了一个CanalKafkaProducer对象。在此对象的init方法里面,有关于kafkaproduct的相关配置。
在此处,Billow添加了判断,如果配置文件中开启了kerberos认证,那么就会配置kafkaProperty为安全模式。并添加了系统环境配置。

 if (kafkaProperties.isKerberosEnable()){
            //kafka集群开启了kerberos认证
            System.setProperty("java.security.krb5.conf", kafkaProperties.getKerberosKrb5FilePath());
            System.setProperty("java.security.auth.login.config", kafkaProperties.getKerberosJaasFilePath());
            System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
            properties.put("security.protocol", "SASL_PLAINTEXT");
            properties.put("sasl.kerberos.service.name", "kafka");

        }

具体位置为:

public class CanalKafkaProducer implements CanalMQProducer {

...
...


    @Override
    public void init(MQProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
        Properties properties = new Properties();
        properties.put("bootstrap.servers", kafkaProperties.getServers());
        properties.put("acks", kafkaProperties.getAcks());
        properties.put("compression.type", kafkaProperties.getCompressionType());
        properties.put("batch.size", kafkaProperties.getBatchSize());
        properties.put("linger.ms", kafkaProperties.getLingerMs());
        properties.put("max.request.size", kafkaProperties.getMaxRequestSize());
        properties.put("buffer.memory", kafkaProperties.getBufferMemory());
        properties.put("key.serializer", StringSerializer.class.getName());
        if(kafkaProperties.getTransaction()){
            properties.put("transactional.id", "canal-transactional-id");
        } else {
            properties.put("retries", kafkaProperties.getRetries());
        }
        if (kafkaProperties.isKerberosEnable()){
            //kafka集群开启了kerberos认证
            System.setProperty("java.security.krb5.conf", kafkaProperties.getKerberosKrb5FilePath());
            System.setProperty("java.security.auth.login.config", kafkaProperties.getKerberosJaasFilePath());
            System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
            properties.put("security.protocol", "SASL_PLAINTEXT");
            properties.put("sasl.kerberos.service.name", "kafka");

        }
        if (!kafkaProperties.getFlatMessage()) {
            properties.put("value.serializer", MessageSerializer.class.getName());
            producer = new KafkaProducer<String, Message>(properties);
        } else {
            properties.put("value.serializer", StringSerializer.class.getName());
            producer2 = new KafkaProducer<String, String>(properties);
        }
        if (kafkaProperties.getTransaction()) {
            if (!kafkaProperties.getFlatMessage()) {
                producer.initTransactions();
            } else {
                producer2.initTransactions();
            }
        }
    }

...
...
}

2、测试

修改好源码后,编译打包。

mvn clean install -Dmaven.test.skip -Denv=release

命令执行成功后会在项目的target文件夹下面生成压缩包:



将deployer包拷贝至服务器,配置好集群环境的krb5.conf、jaas.conf以及canal.properties文件。启动canal,查看日志,并启动kafka消费者进行数据的消费。
Billow已测试成功,有不懂的童鞋可以私信公众号问~

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,539评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,594评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,871评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,963评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,984评论 6 393
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,763评论 1 307
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,468评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,357评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,850评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,002评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,144评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,823评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,483评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,026评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,150评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,415评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,092评论 2 355

推荐阅读更多精彩内容