Kafka - 新消费者

Kafka - 新消费者


一、数据来源

数据使用上一个博文所配置的 Flume,将文本数据写入到 Kafka中。不过这次有所改变,数据的监控目录 有所改变,写入的Kafka的主题名也变更为A25

flume写入数据.png-36kB
flume写入数据.png-36kB

这里我们可以看到 Flume 对于新传上去的 A91 数据已经完成消费。

二、消费者代码

2.1 创建消费者

创建消费者所使用的属性和生产者使用的属性差距不是很大:

  1. bootstrap.servers:指定了 Kafka 集群的连接字符串。
  2. key.deserializer 和 value.deserializer 与生产者的 serializer 定义也很类似,不过它们不是使用指定的类把 Java 对象转成字节数组,而是使用指定的类把字节数组转成 Java 对象。
  3. group.id:非必需,指定了 KafkaConsumer 属于哪一个消费者群组。

创建消费者的代码如下:

Properties props = new Properties();
props.put("bootstrap.servers", "master:9092");
props.put("group.id", "TestConsumer");
props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

2.2 订阅主题

consumer.subscribe(Collections.singletonList("A25"));

因为我们使用Flume对数据进行 Sinks 消费的时候,指定的主题为A25,因此我们这里在对数据进行订阅的时候,也是A25。

同时,可以对订阅的主题传递正则表达式进行匹配,一次订阅多个主题。

2.3 轮询消费

try {
    while(true) {
        ConsumerRecords<String, String> records = consumer.poll(100);

        logger.info("records length = {}", records.count());

        for (ConsumerRecord record : records) {
            logger.info("topic = {}, partition = {}, offset = {}, key = {}, value = {}\n",
                    record.topic(), record.partition(), record.offset(),
                    record.key(), record.value());
        }
    }
} finally {
    consumer.close();
}

这是一个无限循环。消费者实际上是一个长期运行的应用程序,它通过持续轮询向 Kafka 请求数据。

消费者必须持续对 Kafka 进行轮询,否则会被认为已经死亡,它的分区会被移交给群组里的其他消费者。传给 poll() 方法的参数是一个超时时间,用于控制 poll() 方法的阻塞时间(在消费者的缓冲区里没有可用数据时会发生阻塞)。如果该参数被设为 0,poll() 会立即返回,否则它会在指定的毫秒数内一直等待 broker 返回数据。

poll() 方法返回一个记录列表。每条记录都包含了记录所属主题的信息、记录所在分区的信息、记录在分区里的偏移量,以及记录的键值对。我们一般会遍历这个列表,逐条处理这些记录。

在退出应用程序之前使用 close() 方法关闭消费者。网络连接和 socket 也会随之关闭,并立即触发一次再均衡,而不是等待群组协调器发现它不再发送心跳并认定它已死亡,因为那样需要更长的时间,导致整个群组在一段时间内无法读取消息。

运行结果如下:

<center>
kafka消费结果.png-30kB
kafka消费结果.png-30kB

</center>

三、其他配置

3.1 pom文件

 <properties>
    <java.version>1.8</java.version>
    <kafka.version>1.1.0</kafka.version>
</properties>


<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>${kafka.version}</version>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.21</version>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.21</version>
    </dependency>
</dependencies>

3.2 log4j.properties

log4j.rootLogger=INFO,console
log4j.additivity.org.apache=true

# 控制台(console)
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Threshold=DEBUG
log4j.appender.console.ImmediateFlush=true
log4j.appender.console.Target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} [%p] %c  -  %m%n
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,172评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,346评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,788评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,299评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,409评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,467评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,476评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,262评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,699评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,994评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,167评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,827评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,499评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,149评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,387评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,028评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,055评论 2 352

推荐阅读更多精彩内容