[Kafka] Kafka Consumer 0.9 Notes

What's New

  1. SSL/SASL support
  2. Support for the new group management protocol, which lets consumer groups scale as the number of brokers grows
  3. Fewer dependencies; the client no longer depends on kafka core

Concepts

  1. Consumer group
    A set of consumers that consume the same topic. Whenever a consumer joins or leaves, the partition assignment across the group's consumers is rebalanced.
    One broker acts as the coordinator; it stores the partition assignments and the group's membership.
  2. Offset Management
    The starting offset position (earliest or latest) is read from the configuration. Offsets can be committed automatically or manually; in automatic mode, a commit is issued periodically at a fixed interval.

Configuration

  1. Core Configuration
    Always set bootstrap.servers; it is also good practice to set a client.id.
  2. Group Configuration
  • Set group.id.
  • session.timeout.ms: normally 30s. If the consumer polls and processes records in the same thread, consider raising this value to avoid overly eager rebalances. The only downside is that it takes longer to detect a failed consumer, so some partitions are consumed slowly in the meantime; normally, though, a cleanly exiting consumer notifies the coordinator right away.
  • heartbeat.interval.ms: raise it to reduce rebalances.
  3. Offset Management

    • enable.auto.commit
    • auto.commit.interval.ms
    • auto.offset.reset (earliest/latest)
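
Pulling the settings above together, a minimal sketch (the broker address, client/group names, and values are placeholders, not recommendations):

Properties config = new Properties();
config.put("bootstrap.servers", "host:9092");   // required
config.put("client.id", "my-client");           // makes the client identifiable in broker logs
config.put("group.id", "my-group");             // enables group management
config.put("session.timeout.ms", "30000");      // raise if polling and processing share a thread
config.put("heartbeat.interval.ms", "3000");    // must stay below session.timeout.ms
config.put("enable.auto.commit", "false");      // manual commit mode
config.put("auto.offset.reset", "earliest");    // where to start when no committed offset exists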

Administration

  • List Groups

    bin/kafka-consumer-groups --bootstrap-server host:9092 --new-consumer --list

  • Describe Group

    bin/kafka-consumer-groups --bootstrap-server host:9092 --new-consumer --describe --group foo

Examples

Basic Poll Loop

import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public abstract class BasicConsumeLoop<K, V> implements Runnable {
  private final KafkaConsumer<K, V> consumer;
  private final List<String> topics;
  private final AtomicBoolean shutdown;
  private final CountDownLatch shutdownLatch;

  public BasicConsumeLoop(Properties config, List<String> topics) {
    this.consumer = new KafkaConsumer<>(config);
    this.topics = topics;
    this.shutdown = new AtomicBoolean(false);
    this.shutdownLatch = new CountDownLatch(1);
  }

  public abstract void process(ConsumerRecord<K, V> record);

  public void run() {
    try {
      consumer.subscribe(topics);
      while (!shutdown.get()) {
        // Alternative wakeup pattern: poll with consumer.poll(Long.MAX_VALUE)
        // here and have another thread call consumer.wakeup() to break out.
        ConsumerRecords<K, V> records = consumer.poll(500);
        records.forEach(record -> process(record));
      }
    } finally {
      consumer.close();
      shutdownLatch.countDown();
    }
  }

  public void shutdown() throws InterruptedException {
    shutdown.set(true);
    // Only return once consumer.close() has completed
    shutdownLatch.await();
  }
}
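
A hypothetical usage of the loop above (the topic name, broker address, and deserializers are placeholders):

Properties config = new Properties();
config.put("bootstrap.servers", "host:9092");
config.put("group.id", "test-group");
config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

BasicConsumeLoop<String, String> loop =
    new BasicConsumeLoop<String, String>(config, Arrays.asList("my-topic")) {
      public void process(ConsumerRecord<String, String> record) {
        System.out.printf("%s-%d-%d: %s%n",
            record.topic(), record.partition(), record.offset(), record.value());
      }
    };
new Thread(loop).start();
// ... later, from another thread (shutdown() propagates InterruptedException):
// loop.shutdown();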

Committing Offsets

The problem with auto-commit is that a restart can cause records to be processed twice; you can reduce the amount of duplication by shortening the commit interval.
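
For example (the interval value is illustrative), a shorter interval bounds how much work can be replayed after a restart:

config.put("enable.auto.commit", "true");
config.put("auto.commit.interval.ms", "1000"); // commit every second: at most ~1s of records is reprocessed on restart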

Synchronous commits are the safest option:

private void doCommitSync() {
  try {
    consumer.commitSync();
  } catch (WakeupException e) {
    // we're shutting down, but finish the commit first and then
    // rethrow the exception so that the main loop can exit
    doCommitSync();
    throw e;
  } catch (CommitFailedException e) {
    // the commit failed with an unrecoverable error. if there is any
    // internal state which depended on the commit, you can clean it
    // up here. otherwise it's reasonable to ignore the error and go on
    log.debug("Commit failed", e);
  }
}

public void run() {
  try {
    consumer.subscribe(topics);

    while (true) {
      ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
      records.forEach(record -> process(record));
      doCommitSync();
    }
  } catch (WakeupException e) {
    // ignore, we're closing
  } catch (Exception e) {
    log.error("Unexpected error", e);
  } finally {
    consumer.close();
    shutdownLatch.countDown();
  }
}

If the group has already been rebalanced, the commit fails with a CommitFailedException. The session may also time out while records are being processed; there are two ways to handle this:

  1. Raise session.timeout.ms high enough, and lower max.partition.fetch.bytes to reduce the number of records returned per batch.
  2. Move record processing to another thread, e.g. by handing records off to a BlockingQueue.
    The catch is that heartbeat requests are only sent between poll() calls, so if the hand-off (offer) blocks for a long time, the consumer is kicked out of the group.
    See: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design

Option 1 is recommended; a decoupled-processing sketch for option 2 appears in the multi-threading section below.
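
A minimal sketch of option 1 (the values are illustrative, not tuned recommendations):

config.put("session.timeout.ms", "60000");        // allow more time between polls before the consumer is considered dead
config.put("max.partition.fetch.bytes", "65536"); // smaller fetches return fewer records per poll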

Delivery Guarantees

  • At least once:
    auto-commit, or commit synchronously after processing (as in the loop above)
  • At most once: commit before processing, as in the following loop:
private boolean doCommitSync() {
  try {
      consumer.commitSync();
      return true;
  } catch (CommitFailedException e) {
    // the commit failed with an unrecoverable error. if there is any
    // internal state which depended on the commit, you can clean it
    // up here. otherwise it's reasonable to ignore the error and go on
    log.debug("Commit failed", e);
    return false;
  }
}

public void run() {
  try {
    consumer.subscribe(topics);

    while (true) {
      ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
      if (doCommitSync())
        records.forEach(record -> process(record));
    }
  } catch (WakeupException e) {
    // ignore, we're closing
  } catch (Exception e) {
    log.error("Unexpected error", e);
  } finally {
    consumer.close();
    shutdownLatch.countDown();
  }
}
  • Exactly-once delivery
    Not supported.

Asynchronous Offset Commits

Asynchronous commits can improve throughput, but they carry a risk: a failed commit is not retried. You can track the failed offsets yourself in the commit callback:

public void run() {
  try {
    consumer.subscribe(topics);

    while (true) {
      ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
      records.forEach(record -> process(record));
      consumer.commitAsync(new OffsetCommitCallback() {
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
          if (exception != null)
            log.debug("Commit failed for offsets {}", offsets, exception);
        }
      });
    }
  } catch (WakeupException e) {
    // ignore, we're closing
  } catch (Exception e) {
    log.error("Unexpected error", e);
  } finally {
    consumer.close();
    shutdownLatch.countDown();
  }
}

An offset commit failure usually does not cause much trouble, because no source data has been read twice. A common pattern is to commit asynchronously inside the poll loop while committing synchronously on rebalance and on shutdown:

private void doCommitSync() {
  try {
    consumer.commitSync();
  } catch (WakeupException e) {
    // we're shutting down, but finish the commit first and then
    // rethrow the exception so that the main loop can exit
    doCommitSync();
    throw e;
  } catch (CommitFailedException e) {
    // the commit failed with an unrecoverable error. if there is any
    // internal state which depended on the commit, you can clean it
    // up here. otherwise it's reasonable to ignore the error and go on
    log.debug("Commit failed", e);
  }
}

public void run() {
  try {
    consumer.subscribe(topics, new ConsumerRebalanceListener() {
      public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        doCommitSync();
      }

      public void onPartitionsAssigned(Collection<TopicPartition> partitions) {}
    });

    while (true) {
      ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
      records.forEach(record -> process(record));
      consumer.commitAsync();
    }
  } catch (WakeupException e) {
    // ignore, we're closing
  } catch (Exception e) {
    log.error("Unexpected error", e);
  } finally {
    try {
      doCommitSync();
    } finally {
      consumer.close();
      shutdownLatch.countDown();
    }
  }
}

Asynchronous commits only work for at-least-once delivery. They cannot provide at-most-once, because the consumer cannot wait to confirm that the commit succeeded before processing the data; that would require an "unread" semantic, which Kafka does not have.

Multi-threaded Processing

The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application making the call. It is the responsibility of the user to ensure that multi-threaded access is properly synchronized; un-synchronized access will result in a ConcurrentModificationException. The only exception to this rule is wakeup(), which can safely be used from an external thread to interrupt an active operation. In this case, a WakeupException will be thrown from the thread blocking on the operation. This can be used to shut down the consumer from another thread. The following snippet shows the typical pattern:

public class KafkaConsumerRunner implements Runnable {
  private final AtomicBoolean closed = new AtomicBoolean(false);
  private final KafkaConsumer<String, String> consumer;

  public KafkaConsumerRunner(KafkaConsumer<String, String> consumer) {
    this.consumer = consumer;
  }

  public void run() {
    try {
      consumer.subscribe(Arrays.asList("topic"));
      while (!closed.get()) {
        ConsumerRecords<String, String> records = consumer.poll(10000);
        // Handle new records
      }
    } catch (WakeupException e) {
      // Ignore exception if closing
      if (!closed.get()) throw e;
    } finally {
      consumer.close();
    }
  }

  // Shutdown hook which can be called from a separate thread
  public void shutdown() {
    closed.set(true);
    consumer.wakeup();
  }
}

Then in a separate thread, the consumer can be shut down by setting the closed flag and waking up the consumer:

 closed.set(true);
 consumer.wakeup();

We have intentionally avoided implementing a particular threading model for processing. This leaves several options for implementing multi-threaded processing of records.

  1. One Consumer Per Thread
    A simple option is to give each thread its own consumer instance (a sketch follows after this list). Here are the pros and cons of this approach:
    PRO: It is the easiest to implement.
    PRO: It is often the fastest as no inter-thread co-ordination is needed.
    PRO: It makes in-order processing on a per-partition basis very easy to implement (each thread just processes messages in the order it receives them).
    CON: More consumers means more TCP connections to the cluster (one per thread). In general Kafka handles connections very efficiently so this is generally a small cost.
    CON: Multiple consumers means more requests being sent to the server and slightly less batching of data, which can cause some drop in I/O throughput.
    CON: The number of total threads across all processes will be limited by the total number of partitions.

  2. Decouple Consumption and Processing
    Another alternative is to have one or more consumer threads that do all data consumption and hand off ConsumerRecords instances to a blocking queue consumed by a pool of processor threads that actually handle the record processing. This option likewise has pros and cons:
    PRO: This option allows independently scaling the number of consumers and processors. This makes it possible to have a single consumer that feeds many processor threads, avoiding any limitation on partitions.
    CON: Guaranteeing order across the processors requires particular care as the threads will execute independently: an earlier chunk of data may actually be processed after a later chunk of data just due to the luck of thread execution timing. For processing that has no ordering requirements this is not a problem.
    CON: Manually committing the position becomes harder as it requires that all threads co-ordinate to ensure that processing is complete for that partition.
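
A minimal sketch of the one-consumer-per-thread option, reusing the KafkaConsumerRunner above (the thread count and config are assumptions):

int numThreads = 3; // more threads than total partitions would sit idle
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
List<KafkaConsumerRunner> runners = new ArrayList<>();
for (int i = 0; i < numThreads; i++) {
  // each runner gets its own KafkaConsumer; instances are never shared across threads
  KafkaConsumerRunner runner = new KafkaConsumerRunner(new KafkaConsumer<String, String>(config));
  runners.add(runner);
  executor.submit(runner);
}
// on shutdown:
// runners.forEach(KafkaConsumerRunner::shutdown);
// executor.shutdown();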

There are many possible variations on this approach. For example each processor thread can have its own queue, and the consumer threads can hash into these queues using the TopicPartition to ensure in-order consumption and simplify commit.
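
A minimal sketch of that variation (the class and field names are illustrative; the worker threads that drain the queues are omitted):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

public class PartitionedDispatcher {
  private final List<BlockingQueue<ConsumerRecord<String, String>>> queues = new ArrayList<>();

  public PartitionedDispatcher(int numWorkers) {
    for (int i = 0; i < numWorkers; i++)
      queues.add(new LinkedBlockingQueue<ConsumerRecord<String, String>>());
    // each worker thread i would repeatedly take() from queues.get(i)
  }

  // Records from the same TopicPartition always hash to the same queue,
  // so per-partition ordering is preserved across the worker pool.
  public void dispatch(ConsumerRecords<String, String> records) throws InterruptedException {
    for (ConsumerRecord<String, String> record : records) {
      TopicPartition tp = new TopicPartition(record.topic(), record.partition());
      int idx = (tp.hashCode() & 0x7fffffff) % queues.size();
      // put() blocks if a worker falls behind; note that blocking here delays
      // the next poll() and can trigger a session timeout (see the caveat above)
      queues.get(idx).put(record);
    }
  }
}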


http://docs.confluent.io/2.0.1/clients/consumer.html#asynchronous-commits
