Kafka 新版消费者 API(四):优雅的退出消费者程序、多线程消费者以及独立消费者

1. 优雅的退出消费者程序

package com.bonc.rdpe.kafka110.consumer;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

/**
 * @author YangYunhe
 * @date 2018-07-17 11:05:39
 * @description: 优雅的退出消费者
 */
public class QuitConsumer {

    public static void main(String[] args) {
        
        Properties props = new Properties();
        props.put("bootstrap.servers", "rdpecore4:9092,rdpecore5:9092,rdpecore6:9092");
        props.put("group.id", "dev3-yangyunhe-topic001-group005");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    
        consumer.subscribe(Arrays.asList("dev3-yangyunhe-topic001"));
        
        final Thread mainThread = Thread.currentThread();
        
        /*
         * 退出循环需要通过另一个线程调用consumer.wakeup()方法
         * 调用consumer.wakeup()可以退出poll(),并抛出WakeupException异常
         * 我们不需要处理 WakeupException,因为它只是用于跳出循环的一种方式
         * consumer.wakeup()是消费者唯一一个可以从其他线程里安全调用的方法
         * 如果循环运行在主线程里,可以在 ShutdownHook里调用该方法
         */ 
        Runtime.getRuntime().addShutdownHook(new Thread() {
             public void run() {
                 System.out.println("Starting exit...");
                 consumer.wakeup();
                 try {
                     // 主线程继续执行,以便可以关闭consumer,提交偏移量
                     mainThread.join();
                 } catch (InterruptedException e) {
                     e.printStackTrace();
                 }
             }
        });
        
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("topic = " + record.topic() + ", partition = " + record.partition() 
                        + ", offset = " + record.offset());
                }
                consumer.commitAsync();
            }
        }catch (WakeupException e) {
            // 不处理异常
        } finally {
            // 在退出线程之前调用consumer.close()是很有必要的,它会提交任何还没有提交的东西,并向组协调器发送消息,告知自己要离开群组。
            // 接下来就会触发再均衡,而不需要等待会话超时。
            consumer.commitSync();
            consumer.close();
            System.out.println("Closed consumer and we are done");
        }   
    }
}

2. 多线程消费者

KafkaConsumer是非线程安全的,多线程需要处理好线程同步,多线程的实现方式有多种,这里介绍一种:每个线程各自实例化一个KakfaConsumer对象,这种方式的缺点是:当这些线程属于同一个消费组时,线程的数量受限于分区数,当消费者线程的数量大于分区数时,就有一部分消费线程一直处于空闲状态

多线程消费者的线程实现类代码如下:

package com.bonc.rdpe.kafka110.thread;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;

/**
 * @author YangYunhe
 * @date 2018-07-17 10:48:45
 * @description: 多线程消费者的线程实现类
 */
public class ConsumerLoop implements Runnable {

    private final KafkaConsumer<String, String> consumer;
    private final List<String> topics;
    private final int id;

    public ConsumerLoop(int id, String groupId, List<String> topics) {
        this.id = id;
        this.topics = topics;
        Properties props = new Properties();
        props.put("bootstrap.servers", "rdpecore4:9092,rdpecore5:9092,rdpecore6:9092");
        props.put("group.id", groupId);
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<>(props);
      }

    @Override
    public void run() {
        try {
            consumer.subscribe(topics);
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
                for (ConsumerRecord<String, String> record : records) {
                    Map<String, Object> data = new HashMap<>();
                    data.put("partition", record.partition());
                    data.put("offset", record.offset());
                    data.put("value", record.value());
                    System.out.println(this.id + ": " + data);
                }
            }
        } catch (WakeupException e) {
            // ignore for shutdown
        } finally {
            consumer.close();
        }
    }

    public void shutdown() {
        consumer.wakeup();
    }

}

多线程消费者主程序代码如下:

package com.bonc.rdpe.kafka110.consumer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import com.bonc.rdpe.kafka110.thread.ConsumerLoop;

/**
 * @author YangYunhe
 * @date 2018-07-17 10:39:25
 * @description: 多线程消费者主程序
 */
public class MultiThreadConsumer {

    public static void main(String[] args) { 
          
        int numConsumers = 3;
        String groupId = "dev3-yangyunhe-topic001-group004";
        List<String> topics = Arrays.asList("dev3-yangyunhe-topic001");
        ExecutorService executor = Executors.newFixedThreadPool(numConsumers);

        final List<ConsumerLoop> consumers = new ArrayList<>();
        for (int i = 0; i < numConsumers; i++) {
            ConsumerLoop consumer = new ConsumerLoop(i, groupId, topics);
            consumers.add(consumer);
            executor.submit(consumer);
        }

        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                for (ConsumerLoop consumer : consumers) {
                    consumer.shutdown();
                } 
                executor.shutdown();
                try {
                    executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

运行结果:
2: {partition=2, offset=1216, value=...}
......
1: {partition=1, offset=1329, value=...}
......
0: {partition=0, offset=1292, value=...}
......

3. 独立消费者

有时候你可能只需要一个消费者从一个主题的所有分区或者某个特定的分区读取数据。这个时候就不需要消费者群组和再均衡了,只需要把主题或者分区分配给消费者,然后开始读取消息并提交偏移量。如果是这样的话,就不需要订阅主题,取而代之的是为自己分配分区。一个消费者可以订阅主题(并加入消费者群组),或者为自己分配分区,但不能同时做这两件事情。以下是独立消费者的示例代码:

package com.bonc.rdpe.kafka110.consumer;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

/**
 * @author YangYunhe
 * @date 2018-07-17 12:44:50
 * @description: 独立消费者
 */
public class AloneConsumer {
    
    public static void main(String[] args) {
        
        Properties props = new Properties();
        props.put("bootstrap.servers", "rdpecore4:9092,rdpecore5:9092,rdpecore6:9092");
        // 独立消费者不需要设置消费组
        // props.put("group.id", "dev3-yangyunhe-topic001-group003");
        props.put("auto.offset.reset", "earliest");
        props.put("auto.commit.offset", false);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        
        /*
         * consumer.partitionsFor(topic)用于获取topic的分区信息
         * 当有新的分区加入或者原有的分区被改变后,这个方法是不能动态感知的
         * 所以要么周期性的执行这个方法,要么当分区数改变的时候,你需要重新执行这个程序
         */
        List<PartitionInfo> partitionInfos = consumer.partitionsFor("dev3-yangyunhe-topic001");
        List<TopicPartition> partitions = new ArrayList<>();
        
        if(partitionInfos != null && partitionInfos.size() != 0) {
            for (PartitionInfo partition : partitionInfos) {
                partitions.add(new TopicPartition(partition.topic(), partition.partition()));
            }
            
            consumer.assign(partitions);
            
            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println("partition = " + record.partition() + ", offset = " + record.offset());
                    }
                    consumer.commitAsync();
                }
            } finally {
                consumer.commitSync();
                consumer.close();
            } 
        }
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,590评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,808评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,151评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,779评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,773评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,656评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,022评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,678评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,038评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,756评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,411评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,005评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,973评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,053评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,495评论 2 343

推荐阅读更多精彩内容

  • 原文地址 当kafka最初被创建的时候,它附带一个Scala的生产者和消费者客户端。随着时间的推移,我们逐渐意识到...
    明翼阅读 6,604评论 0 9
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,596评论 18 139
  • 1. 订阅主题 (1)订阅主题的全部分区 (2) 用正则表达式来订阅主题的全部分区 (3) 订阅指定的分区 2. ...
    CoderJed阅读 3,022评论 0 2
  • 骑行中,无意间看到一棵树,瞬间有一种被震撼到的感觉。也许是落日相映,也许是心有所悟,仿佛在苍穹之中勃然英发。 独自...
    牧戎云越阅读 338评论 0 0
  • 也许一个人最好的状态,就是你的本事配的上你的情怀,你可以脚踏实地,又可以仰望星空,从容不迫的与岁月相处。而你心中所...
    左眼殇暮光阅读 250评论 5 12