Kafka系列之(6)——Kafka Consumer

1、Consumer工作过程

Consumer工作过程.png

(1)、在启动时或者协调节点故障转移时,消费者发送ConsumerMetadataRequest给bootstrap brokers列表中的任意一个brokers。在ConsumerMetadataResponse中,它接收消费者对应的消费组所属的协调节点GroupCoordinator的位置信息。
(2)、消费者连接协调节点GroupCoordinator,并发送HeartbeatRequest。如果返回的HeartbeatResponse中返回IllegalGeneration错误码,说明协调节点GroupCoordinator已经在初始化平衡。消费者就会停止抓取数据,提交offsets,发送JoinGroupRequest给协调节点。在JoinGroupResponse,它接收消费者应该拥有的topic-partitions列表以及当前消费组的新的generation编号。这个时候消费组管理已经完成,消费者就可以开始抓取数据,并为它拥有的partitions提交offsets。
(3)、如果HeartbeatResponse没有错误返回,消费者会从它上次拥有的partitions列表继续抓取数据,这个过程是不会被中断的。
交互数据格式
图片.png

对于每个消费者组,GroupCoordinator存储的数据:

  1. 对每个存在的topic,可以有多个消费组订阅同一个topic(对应消息系统中的广播)
  2. 对每个消费组,元数据如下:
    消费组订阅的topics列表
    Group配置信息,包括session timeout等
    组中每个消费者的元数据。消费者元数据包括主机名,consumer id
    每个正在消费的topic partition的当前offsets
    Partition的ownership元数据,包括consumer到分配给消费者的partitions映射
    GroupCoordinator工作过程参加上篇:
    Consumer id的分配
    消费者启动后,从协调者接收到的第一次JoinGroupResponse中有consumer id。从这里开始,消费者的每次心跳以及提交offset请求都必须要包含这个consumer id,作为消费者的唯一标识。协调者在成功rebalance时,会为消费者分配一个consumer id。(rebalance之前,协调者也会根据JoinGroupRequest中consumer id判断是否消费者都重新申请入组)
    如果消费者发送的JoinGroupRequest带了consumer id,但是不匹配当前组成员的ids,协调者会在JoinGroupResponse中返回UnknownConsumer错误码,避免这个消费者加入到不认识的消费组中。这也不会触发组中其他消费者的rebalance操作。

2、KafkaConsumer示例和offset提交

消费者可以定时自动地提交offset,或者手动控制什么时候提交offset。
手动提交时,使用commitSync手动提交commitOffset,会阻塞调用线程,直到offsets成功被提交,或者在提交过程中发生错误。使用commitAsync则是非阻塞方式,会在成功提交或者失败时,触发OffsetCommitCallback回调函数的执行。
手动提交适合的场景
手动提交适合消息消费和业务处理逻辑耦合的场景。比如我们消费了一批记录,并且在内存中暂时保存,当有足够的记录时插入到数据库中,插入数据库成功,才允许提交offset。这样可以保存数据插入数据库成功才算正常消费消息。但是也会出现另外一种情况,插入数据库成功后,应用挂掉,导致提交offset失败,应用恢复是就会重新消费该消息。就是说,对于kafka而言,只能保证消息”至少发送一次”,但不能保证”正好一次”(交给了客户端自己实现,如可以将offset存储到kafka外部,保存offset和消费结果放在同一个事务里)。
自动提交

// 配置信息
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");

// 创建消费者实例, 并且订阅topic
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));

// 消费者消费消息
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
}

手动提交

props.put("enable.auto.commit", "false");  // 设置autoCommit为false

int commitInterval = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<ConsumerRecord<String, String>>();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
        if (buffer.size() >= commitInterval) {
            insertIntoDb(buffer);
            consumer.commitSync();
            buffer.clear();
        }
    }
}

消费者订阅指定分区(静态订阅)
在动态分配partition的场景下,消费者的加入和删除,都会导致partition的重新分配给其他的消费者。而静态分配partition下,如果消费者挂掉后,分配给这个消费者的partition并不会负载给其他消费者。静态分配partition的模式,消费者不是订阅主题,而是订阅指定的partition。

Properties props = new Properties();
props.put("metadata.broker.list", "localhost:9092");
KafkaConsumer consumer = new KafkaConsumer(props);

// subscribe to some partitions of topic foo
TopicPartition partition0 = new TopicPartition("foo", 0);
TopicPartition partition1 = new TopicPartition("foo", 1);
TopicPartition[] partitions = new TopicPartition[2];
partitions[0] = partition0;
partitions[1] = partition1;
consumer.subscribe(partitions);

// seek to the last committed offsets to avoid duplicates
Map<TopicPartition, Long> lastCommittedOffsets = getLastCommittedOffsetsFromCustomStore();
consumer.seek(lastCommittedOffsets);        

// find the offsets of the latest available messages to know where to stop consumption
Map<TopicPartition, Long> latestAvailableOffsets = 
    consumer.offsetsBeforeTime(-2, partition0, partition1);
boolean isRunning = true;
Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
while(isRunning) {
    Map<String, ConsumerRecords> records = consumer.poll(100, TimeUnit.MILLISECONDS);
    Map<TopicPartition, Long> lastConsumedOffsets = process(records);
    consumedOffsets.putAll(lastConsumedOffsets);
    // commit offsets for partitions 0,1 for topic foo to custom store,offset存储到kafka外部
    commitOffsetsToCustomStore(consumedOffsets);
    for(TopicPartition partition : partitions) {
        if(consumedOffsets.get(partition) >= latestAvailableOffsets.get(partition))
            isRunning = false;
        else isRunning = true;
    }            
}      
commitOffsetsToCustomStore(consumedOffsets);   
consumer.close();

多线程消费
kafka的消费者(KafkaConsumer对象)并不是线程安全的。客户端代码需要自己确保多线程的访问是同步的。 唯一例外的是wakeup方法(是线程安全的):它可以被外部线程用来安全地中断一个进行中的操作。对于阻塞在wakeup方法上的线程会抛出WakeupException。可以被另外的线程用来作为关闭consumer的钩子。

public class KafkaConsumerRunner implements Runnable {
   private final AtomicBoolean closed = new AtomicBoolean(false);
   private final KafkaConsumer consumer;

   public void run() {
       try {
           consumer.subscribe("topic");
           while (!closed.get()) {
               ConsumerRecords records = consumer.poll(10000);
               // 处理新的记录
           }
       } catch (WakeupException e) {
           if (!closed.get()) throw e; //如果关闭了忽略异常
       } finally {
           consumer.close();
       }
   }
   // 关闭钩子,可以在另一个线程中调用
   public void shutdown() {
       closed.set(true);
       consumer.wakeup();
   }
}
  1. 一个线程一个消费者

每个线程都有自己的消费者实例,消息消费逻辑和消息处理逻辑都在消费者线程中完成。这种方式的利弊:

优点:很容易实现,执行很快,因为没有线程之间的交互和协调。
优点:对于每个partition要保证顺序处理比较容易实现。每个线程只需要按照顺序处理它接收到的消息即可。
缺点:更多的消费者意味着集群的TCP连接也很多。不过kafka处理连接是很高效的,所以这个代价并不是很大。
缺点:多个消费者意味着发送更多的请求给服务器,每一批发送的数据变少(发送更多批),就会降低I/O吞吐量。
缺点:所有进程之间的线程数量会被partitions的数量所限制。
  1. 解耦消费和处理逻辑,共享消费者线程

另一种方式是有一个或多个消费者线程用来消费消息,并将消费结果ConsumerRecords转移一个阻塞队列中,
它会被消息处理线程池消费,消息处理线程顾名思义就是处理消息的线程。这种方式的利弊:

优点:可以相互独立地扩展消费者数量和处理器数量。可以只用一个消费者线程服务于多个处理线程,避免partitions的限制。
缺点:在处理器线程之间保证消息处理的顺序是比较困难的。因为线程之间是独立的,线程之间的顺序是无法保证的。所以即使是比较早的数据块也有可能比靠后面的数据块更晚被处理到。如果要求消息的处理是无序的,当然是没有问题的。
缺点:手动提交offset变得困难,因为它需要所有的线程协调起来确保这个partition的消息已经被处理完毕。

解决上面的缺点有多种方式。比如每个处理线程都可以有自己的队列,消费者可以对TopicPartition的hash结果放入不同处理线程的队列中,这样也可以确保消息被顺序地消费,并且简化提交offset的逻辑。

ConsumerRebalanceListener
ConsumerRebalanceListener用于在Rebalance之后,添加回调逻辑。
onPartitionsAssigned监听分区分配事件
onPartitionsRevoked监听分区撤销事件

KafkaConsumer consumer = new KafkaConsumer(props,
    new ConsumerRebalanceListener() {
        boolean rewindOffsets = true;  // should be retrieved from external application config
        public void onPartitionsAssigned(Consumer consumer, TopicPartition...partitions) {
            Map<TopicPartition, Long> latestCommittedOffsets = consumer.committed(partitions);
            if(rewindOffsets)
                Map<TopicPartition, Long> newOffsets = rewindOffsets(latestCommittedOffsets, 100);
            consumer.seek(newOffsets);
        }
        public void onPartitionsRevoked(Consumer consumer, TopicPartition...partitions) {
            consumer.commit();
        }
        // this API rewinds every partition back by numberOfMessagesToRewindBackTo messages 
        private Map<TopicPartition, Long> rewindOffsets(Map<TopicPartition, Long> currentOffsets,
                                                        long numberOfMessagesToRewindBackTo) {
            Map<TopicPartition, Long> newOffsets = new HashMap<TopicPartition, Long>();
            for(Map.Entry<TopicPartition, Long> offset : currentOffsets.entrySet()) 
                newOffsets.put(offset.getKey(), offset.getValue() - numberOfMessagesToRewindBackTo);
            return newOffsets;
        }
});
consumer.subscribe("foo", "bar");
//...同上调用了process消费消息,并保存到consumedOffsets内存中
consumer.close();

控制消费者的position
kafka允许通过seek(TopicPartition,long)指定新的位置,或者seekToBeginning,seekToEnd定位到最早或最近的offset。

int commitInterval = 100;
int numRecords = 0;
boolean isRunning = true;
Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
while(isRunning) {
    Map<String, ConsumerRecords> records = consumer.poll(100, TimeUnit.MILLISECONDS);
    try {
        Map<TopicPartition, Long> lastConsumedOffsets = process(records);
        consumedOffsets.putAll(lastConsumedOffsets);
        numRecords += records.size();
        // commit offsets for all partitions of topics foo, bar synchronously, owned by this consumer instance
        if(numRecords % commitInterval == 0) consumer.commit();
    } catch(Exception e) {
        try {
            // rewind consumer's offsets for failed partitions
            // assume failedPartitions() returns the list of partitions for which the processing of the last batch of messages failed
            List<TopicPartition> failedPartitions = failedPartitions();   
            Map<TopicPartition, Long> offsetsToRewindTo = new HashMap<TopicPartition, Long>();
            for(TopicPartition failedPartition : failedPartitions) {
                // rewind to the last consumed offset for the failed partition. Since process() failed for this partition, the consumed offset
                // should still be pointing to the last successfully processed offset and hence is the right offset to rewind consumption to.
                offsetsToRewindTo.put(failedPartition, consumedOffsets.get(failedPartition));
            }
            // seek to new offsets only for partitions that failed the last process()
            consumer.seek(offsetsToRewindTo);
        } catch(Exception e) {  break; } // rewind failed
    }
}
consumer.close();

private Map<TopicPartition, Long> process(Map<String, ConsumerRecords> records) {
     Map<TopicPartition, Long> processedOffsets = new HashMap<TopicPartition, Long>();
     for(Entry<String, ConsumerRecords> recordMetadata : records.entrySet()) {
          List<ConsumerRecord> recordsPerTopic = recordMetadata.getValue().records();
          for(int i = 0;i < recordsPerTopic.size();i++) {
               ConsumerRecord record = recordsPerTopic.get(i);
               // process record
               processedOffsets.put(record.partition(), record.offset());                
          }
     }
     return processedOffsets; 
}

refer:
http://blog.csdn.net/u014393917/article/details/52043317
http://www.cnblogs.com/huxi2b/p/6124937.html
http://blog.csdn.net/louisliaoxh/article/details/51577117
http://blog.csdn.net/chunlongyu/article/details/52663090
http://blog.csdn.net/u014393917/article/details/52043317
http://zqhxuyuan.github.io/2016/02/22/2016-02-22-Kafka-Consumer-new/

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342

推荐阅读更多精彩内容