使用kafka connect,将数据批量写到hdfs完整过程

版权声明:本文为博主原创文章,未经博主允许不得转载

本文是基于hadoop 2.7.1,以及kafka 0.11.0.0。kafka-connect是以单节点模式运行,即standalone。


一. 首先,先对kafka和kafka connect做一个简单的介绍

kafka:Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。比较直观的解释就是其有一个生产者(producer)和一个消费者(consumer)。可以将kafka想象成一个数据容器,生产者负责发送数据到这个容器中,而消费者从容器中取出数据,在将数据做处理,如存储到hdfs。

kafka connect:Kafka Connect是一种用于在Kafka和其他系统之间可扩展的、可靠的流式传输数据的工具。它使得能够快速定义将大量数据集合移入和移出Kafka的连接器变得简单。即适合批量数据导入导出操作。


二. 下面将介绍如何用kafka connect将数据写入到hdfs中。包括在这个过程中可能碰到的一些问题说明。

首先启动kafka-connect:

bin/connect-standalone.sh config/connect-standalone.properties config/connector1.properties

这个命令后面两个参数,

  第一个是指定启动的模式,有分布式和单节点两种,这里是单节点。kafka自带,放于config目录下。

  第二个参数指向描述connector的属性的文件,可以有多个,这里只有一个connector用来写入到hdfs。需要自己创建。

接下来看看connector1.properties的内容,

name="test" #该connector的名字

#将自己按connect接口规范编写的代码打包后放在kafka/libs目录下,再根据项目结构引用对应

connector connector.class=hdfs.HdfsSinkConnector

#Task是导入导出的具体实现,这里是指定多少个task来并行运行导入导出作业,由多线程实现。由于hdfs中一个文件每次只能又一个文件操作,所以这里只能是1

tasks.max=1

#指定从哪个topic读取数据,这些其实是用来在connector或者task的代码中读取的。 topics=test #指定key以那种方式转换,需和Producer发送方指定的序列化方式一致 key.converter=org.apache.kafka.connect.converters.ByteArrayConverter value.converter=org.apache.kafka.connect.json.JsonConverter #同上

hdfs.url=hdfs://127.0.0.1:9000  #hdfs的url路径,在Connector中会被读取 hdfs.path=/test/file  #hdfs文件路径,同样Connector中被读取

key.converter.schemas.enable=true  #稍后介绍,可以true也可以false,影响传输格式 value.converter.schemas.enable=true  #稍后介绍,可以true也可以false

三. 接下来看代码,connect主要是导入导出两个概念,导入是source,导出时Sink。这里只使用Sink,不过Source和Sink的实现其实基本相同。

实现Sink其实不难,实现对应的接口,即SinkConnector和SinkTask两个接口,再打包放到kafka/libs目录下即可。其中SinkConnector只有一个,而Task可以有多

先是Connector

publicclassHdfsSinkConnectorextends SinkConnector {

    //这两项为配置hdfs的urlh和路径的配置项,需要在connector1.properties中指定publicstaticfinalString HDFS_URL = "hdfs.url";

    publicstaticfinalString HDFS_PATH = "hdfs.path";

    privatestaticfinalConfigDef CONFIG_DEF =new ConfigDef()

            .define(HDFS_URL, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "hdfs url")

            .define(HDFS_PATH, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "hdfs path");

    private String hdfsUrl;

    private String hdfsPath;

    @Override

    public String version() {

        returnAppInfoParser.getVersion();    }

//start方法会再初始的时候执行一次,这里主要用于配置    @Override

publicvoidstart(Map props) {

        hdfsUrl = props.get(HDFS_URL);

        hdfsPath = props.get(HDFS_PATH);

    }

  //这里指定了Task的类

    @Override

    publicClass taskClass() {

        returnHdfsSinkTask.class;

    }

  //用于配置Task的config,这些都是会在Task中用到

    @Override

    publicList> taskConfigs(int maxTasks) {

        ArrayList> configs =newArrayList<>();

        for(inti = 0; i < maxTasks; i++) {

            Map config =newHashMap<>();

            if(hdfsUrl !=null)

                config.put(HDFS_URL, hdfsUrl);

            if(hdfsPath !=null)

                config.put(HDFS_PATH, hdfsPath);

            configs.add(config);

        }

        return configs;

    }

  //关闭时的操作,一般是关闭资源。

    @Override

    publicvoid stop() {

        // Nothing to do since FileStreamSinkConnector has no background monitoring.    }

    @Override

    public ConfigDef config() {

        return CONFIG_DEF;

    }

}

接下来是Task

publicclassHdfsSinkTaskextends SinkTask {

    privatestaticfinalLogger log = LoggerFactory.getLogger(HdfsSinkTask.class);

    private String filename;

    publicstatic String hdfsUrl;

    publicstatic String hdfsPath;

    private Configuration conf;

    private FSDataOutputStream os;

    private FileSystem hdfs;

    public HdfsSinkTask(){

    }

    @Override

    public String version() {

        returnnew HdfsSinkConnector().version();

    }

  //Task开始会执行的代码,可能有多个Task,所以每个Task都会执行一次

    @Override

    publicvoidstart(Map props) {

        hdfsUrl = props.get(HdfsSinkConnector.HDFS_URL);

        hdfsPath = props.get(HdfsSinkConnector.HDFS_PATH);

        System.out.println("----------------------------------- start--------------------------------");

        conf =new Configuration();conf.set("fs.defaultFS", hdfsUrl);

        //这两个是与hdfs append相关的设置conf.setBoolean("dfs.support.append",true);

        conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");

        try{

            hdfs = FileSystem.get(conf);//            connector.hdfs = new Path(HDFSPATH).getFileSystem(conf);os = hdfs.append(new Path(hdfsPath));

        }catch (IOException e){

            System.out.println(e.toString());

        }

    }

  //核心操作,put就是将数据从kafka中取出,存放到其他地方去

    @Override

    publicvoidput(Collection sinkRecords) {

        for (SinkRecord record : sinkRecords) {

            log.trace("Writing line to {}: {}", logFilename(), record.value());

            try{

                System.out.println("write info------------------------" + record.value().toString() + "-----------------");

                os.write((record.value().toString()).getBytes("UTF-8"));

                os.hsync();

            }catch(Exception e){

                System.out.print(e.toString());

            }

        }

    }

    @Override

    publicvoidflush(Map offsets) {

        try{

            os.hsync();

        }catch(Exception e){            System.out.print(e.toString());        }    }

//同样是结束时候所执行的代码,这里用于关闭hdfs资源    @Override

publicvoid stop() {

        try {

            os.close();

        }catch(IOException e){

            System.out.println(e.toString());

        }

    }

    private String logFilename() {

        returnfilename ==null? "stdout" : filename;

    }

}

这里重点提一下,因为在connector1.propertise中设置了key.converter=org.apache.kafka.connect.converters.ByteArrayConverter,所以不能用命令行形式的

producer发送数据,而是要用程序的方式,并且在producer总也要设置key的序列化形式为org.apache.kafka.common.serialization.ByteArraySerializer。

编码完成,先用idea以开发程序与依赖包分离的形式打包成jar包,然后将程序对应的jar包(一般就是“项目名.jar”)放到kafka/libs目录下面,这样就能被找到。

四. 接下来对这个过程的问题做一个汇总。

1.在connector1.properties中的key.converter.schemas.enable=false和value.converter.schemas.enable=false的问题。

这个选项默认在connect-standalone.properties中是true的,这个时候发送给topic的Json格式是需要使用avro格式,具体情况可以百度,这里给出一个样例。

{

    "schema": {

        "type": "struct",

        "fields": [{

            "type": "int32",

            "optional": true,

            "field": "c1"

        }, {

            "type": "string",

            "optional": true,

            "field": "c2"

        }, {

            "type": "int64",

            "optional": false,

            "name": "org.apache.kafka.connect.data.Timestamp",

            "version": 1,

            "field": "create_ts"

        }, {

            "type": "int64",

            "optional": false,

            "name": "org.apache.kafka.connect.data.Timestamp",

            "version": 1,

            "field": "update_ts"

        }],

        "optional": false,

        "name": "foobar"

    },

    "payload": {

        "c1": 10000,

        "c2": "bar",

        "create_ts": 1501834166000,

        "update_ts": 1501834166000

    }

主要就是schema和payload这两个,不按照这个格式会报错如下

org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.

  at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:308)

如果想发送普通的json格式而不是avro格式的话,很简单key.converter.schemas.enable和value.converter.schemas.enable设置为false就行。这样就能发送普通的json格式数据。

2.在启动的过程中出现各种各样的java.lang.ClassNotFoundException。

在启动connector的时候,一开始总是会报各个各样的ClassNotFoundException,不是这个包就是那个包,查找问题一直说要么缺少包要么是包冲突。这个是什么原因呢?

其实归根结底还是依赖冲突的问题,因为kafka程序自定义的类加载器加载类的目录是在kafka/libs中,而写到hdfs需要hadoop的包。

我一开始的做法是将hadoop下的包路径添加到CLASSPATH中,这样子问题就来了,因为kafka和hadoop的依赖包是有冲突的,比如hadoop是guava-11.0.2.jar,而kafka是guava-20.0.jar,两个jar包版本不同,而我们是在kafka程序中调用hdfs,所以当jar包冲突时应该优先调用kafka的。但是注意kafka用的是程序自定义的类加载器,其优先级是低于CLASSPATH路径下的类的,就是说加载类时会优先加载CLASSPATH下的类。这样子就有问题了。

我的解决方案时将kafka和hadoop加载的jar包路径都添加到CLASSPATH中,并且kafka的路径写在hadoop前面,这样就可以启动connector成功。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,657评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,662评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,143评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,732评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,837评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,036评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,126评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,868评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,315评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,641评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,773评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,859评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,584评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,676评论 2 351

推荐阅读更多精彩内容