kafka 0.10.1一些使用经验

概述

  • 最近公司对老版本的kafka做升级,我们的集群很小,就三台机器。主要用来爬取数据实时任务传输用的。老版本用的0.8版本的,这个版本zookeeper的依赖还是比较大,每次kafka读取消费者topic偏移量都是从zk上读过来,连接消耗比较大。在kafka0.9版本后,就不依赖zk去记录offset的位置了,而是统一记录在broker端,通过一个内置的一个topic :_consumer_offsets (来记录各个消费组内格topic的位置。下面就集群搭建和使用过程中一些坑记录一下,记录下学习笔记

集群搭建

  • 集群搭建很简单,主要步骤网上都有,我这里就记录下自己主要配置
    zookeeper的

dataDir=/home/maijia/zookeeper-data-new --这个是zk日志目录
clientPort=2182 --配置的zk端口
initLimit=10
syncLimit=5
tickTime=2000
server.1=192.168.xx.xx:4888:5888
server.2=192.168.xx.xx:4888:5888 ---配置的zk选取端口,采用的是内网ip,只要互通都没有问题
server.3=192.168.xx.xx:4888:5888

  • 下面是kafka broker部分配置,其他都是采用默认

broker.id=4 --broker代号,这个唯一,不同机器不同即可
port=9093
listeners=PLAINTEXT://xx.xx.xx.xx:9093 --监听当台机器外网ip地址和kafka的端口。消费者和生产者都会连接这个地址进行通信
log.dirs=/home/maijia/kafka-logs-new
zookeeper.connect=192.168.xx.xx:2182,192.168.xx.xx:2182,192.168.xx.xx:2182
zookeeper.connection.timeout.ms=20000
delete.topic.enable=true --配置为true就是删除topic比较方便,命令行可直接删除无用topic

这里说明一下几个broker参数


image.png
  • 这几个和listeners只需要使用listeners,1和3是过时的,老版本出现过。第二个主要是把监听发布到zk上。总之这几个只需配置listeners这一个就行了。启动kafka时候需要解压包里bin下命令文件直接启动。具体命令行命令下面再说。我之前就是一直用老版本的bin目录下启动脚本配上上面的配置,消费者死活不成功。所以一定要版本一致。

kafka命令行使用

生产者和消费者爬坑

使用API maven地址

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.1.0</version>
        </dependency>
  • 生产者还好,没有遇到大问题,采用java客户端api最好和集群的kafka的版本一致,这样防止出现一些莫名其妙的幺蛾子,生产者注意发送内容的key和value的字符格式解析,

key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
我这边是使用key采用string解析,value采用字符数组

消费者使用详细说下。新版本的消费者变化比较大
1.首先消费者现在可以支持手动提交offset,并且手动支持俩种方式同步和异步,我记得老版本好像都是只是自动提交。
2.消费者可以自己指定消费的分区和位置。以前老版本想改的,只能通过修改zk上对应节点值才能做到。
这俩点在使用中变化比较大

  • 下面说下具体的消费者配置,常规的
        props.put("enable.auto.commit", "true");
        //自动提交间隔
        props.put("auto.commit.interval.ms", 1000);
        props.put("max.poll.interval.ms",300000);
        props.put("max.poll.records",10);  
        //设置消费者心跳间隔
        props.put("heartbeat.interval.ms",3000);
        props.put("session.timeout.ms", 10000); 

使用过程中重点遇到问题消费者消费一段时间后,停止消费了,offset位置一直没发生变化。调了很久才知道,consumer.poll(100);消费者每次poll阻塞拉取的时候拉取的任务太多,然而数据处理程序太慢,俩次poll之间时间差超过max.poll.interval.ms这个配置里的值,broker就认为这个消费者挂了,就会重新把它从组内删除,并且重新平衡。后来通过设置max.poll.records这个值来设定每次poll拉取最多拉取任务就可以了。poll方法里的参数是每次拉取的阻塞时间ms。

下面具体配置说明一下

session.timeout.ms 这个值是会话超时时间,什么意思了,就是说如果发送心跳时间超过这个时间,broker就会认为消费者死亡了,默认值是10000ms,也就是10s(这个值一般默认没问题)
heartbeat.interval.ms 这个值是心跳时间,表示多长时间想broker报告一次,这个默认值3000ms,这个值官方推荐不要高于session.timeout.ms 的1/3(这个值默认没问题)
enable.auto.commit 是否启用自动提交。
auto.commit.interval.ms 自动提交间隔
max.poll.interval.ms 每俩次poll拉取数据时间间隔最大超时时间,超过这个值,broker就会认为你这个消费者挂了,并且重新平衡,这时候就消费不到信息了,如果你用kafka自带的命令行工具查看
sh kafka-consumer-groups.sh --bootstrap-server localhost:9093 --group group2 --describe 就会有这样的显示
Consumer group group2 is rebalancing
max.poll.records 这个值的意思是每次poll拉取数据的最大任务数,设置为5,就是一次poll里拉取5条偏移量数据
key.deserializer 序列化解析key值这个根据消费者配置而来org.apache.kafka.common.serialization.StringDeserializer
value.deserializer 序列化解析value值。org.apache.kafka.common.serialization.StringDeserializer
如果生产者或者消费者采用不同字符解析器,采取对应配置,例如 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
我这个comsumer使用的是string字符解析格式 还有ByteArraySerializer字符数组这种字符格式。org.apache.kafka.common.serialization都是在这个包里

  • 贴一下代码

        Properties props = new Properties();
        //服务器位置
        props.put("bootstrap.servers", "xxxxx");
        //消费组id
        props.put("group.id", "group3");
        //是否启动自动提交
        props.put("enable.auto.commit", "false");
        //自动提交间隔
        props.put("auto.commit.interval.ms", 1000);
        props.put("auto.offset.reset", "latest");
        props.put("max.poll.interval.ms",300000);
        props.put("max.poll.records",10);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//        动态从分区获取消息,负载均衡的获取消息,如果想手动指定位置分区和offset,则使用consumer.assign();
         consumer.subscribe(Arrays.asList("fsc"));

//        手动指定方式从分区获取数据
//        TopicPartition p0=new TopicPartition("fsc",0);
//        TopicPartition p1=new TopicPartition("fsc",1);
//        TopicPartition p2=new TopicPartition("fsc",2);
//        List<TopicPartition> topicPartitionList=new ArrayList<TopicPartition>();
//        topicPartitionList.add(p0);
//        topicPartitionList.add(p1);
//        topicPartitionList.add(p2);
////        指定分区和offset方式消费数据,
//        consumer.assign(topicPartitionList);
////         调到所有分区最开始的位置
//        consumer.seekToBeginning(topicPartitionList);
////        调到分区最后的位置
//        consumer.seekToEnd(topicPartitionList);
////        指定分区和offset进行消费
//        consumer.seek(p2,40);
//        consumer.seek(p1,40);
        System.out.println("start consumering");
        while (true) {
            ConsumerRecords<String,String> records = consumer.poll(100);
            for (ConsumerRecord<String,String> record : records) {
//              在这里进行插入数据库操作,数据在valus里的json格式
                System.out.println("分区:"+record.partition()+" offset:"+record.offset()+"key:"+record.key());
//              同步提交     
                consumer.commitSync();
            }
        }

记录完毕

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,294评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,493评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,790评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,595评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,718评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,906评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,053评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,797评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,250评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,570评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,711评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,388评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,018评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,796评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,023评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,461评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,595评论 2 350

推荐阅读更多精彩内容

  • 姓名:周小蓬 16019110037 转载自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw阅读 34,715评论 13 425
  • 发行说明 - Kafka - 版本1.0.0 以下是Kafka 1.0.0发行版中解决的JIRA问题的摘要。有关该...
    全能程序猿阅读 2,854评论 2 7
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,633评论 18 139
  • kafka的定义:是一个分布式消息系统,由LinkedIn使用Scala编写,用作LinkedIn的活动流(Act...
    时待吾阅读 5,311评论 1 15
  • 记忆 如落叶浮萍 镜花水月 存在时多绚丽 再回首就有多苦痛 记忆 如利刀重锤 一雕一刻 深入骨髓 对你而言 我不过...
    一笑误终生阅读 428评论 2 3