kafka入门

Apache Kafka 是一款开源的消息引擎系统,由LinkedIn 公司内部孵化的项目。Raft 算法和 Paxos 算法分布式系统的一致性算法,zookeeper等的原理。Google 的 Protocol Buffer 或 Facebook 的 Thrift,rpc框架。 Kafka使用的是纯二进制的字节序列kafka同时支持点对点模式和发布订阅模式(通过group机制实现)

JMS 是 Java Message Service,它也是支持上面这两种消息引擎模型的。JMS是一组API,是一个规范,很多消息引擎也支持,比如 ActiveMQ、RabbitMQ、IBM 的 WebSphere MQ 和 Apache Kafka。kafka等消息引擎的主要作用就是解耦生产和发布。

Kafka 的三层消息架构:

1、第一层是主题层(topic),每个主题可以配置 M 个分区,而每个分区又可以配置 N 个副本,一条消息只能在一个分区

2、第二层是分区层(partition),每个分区的 N 个副本中只能有一个充当领导者角色,对外提供服务;其他 N-1 个副本是追随者副本,只是提供数据冗余之用。

3、第三层是消息层(message),分区中包含若干条消息,每条消息的位移从 0 开始,依次递增。

4、最后,客户端程序只能与分区的领导者副本进行交互。


消息:Record。Kafka 是消息引擎嘛,这里的消息就是指 Kafka 处理的主要对象。

主题:Topic。主题是承载消息的逻辑容器,在实际使用中多用来区分具体的业务。

分区:Partition。一个有序不变的消息序列。每个主题下可以有多个分区。

消息位移:Offset。表示分区中每条消息的位置信息,是一个单调递增且不变的值。

副本:Replica。Kafka 中同一条消息能够被拷贝到多个地方以提供数据冗余,这些地方就是所谓的副本。副本还分为领导者副本和追随者副本,各自有不同的角色划分。副本是在分区层级下的,即每个分区可配置多个副本实现高可用。

生产者:Producer。向主题发布新消息的应用程序。

消费者:Consumer。从主题订阅新消息的应用程序。

消费者位移:Consumer Offset。表征消费者消费进度,每个消费者都有自己的消费者位移。

消费者组:Consumer Group。多个消费者实例共同组成的一个组,同时消费多个分区以实现高吞吐。

重平衡:Rebalance。消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段。


1、Apache Kafka,也称社区版 Kafka。优势在于迭代速度快,社区响应度高,使用它可以让你有更高的把控度;缺陷在于仅提供基础核心组件,缺失一些高级的特性。

2、Confluent Kafka,Confluent 公司提供的 Kafka。优势在于集成了很多高级特性且由 Kafka 原班人马打造,质量上有保证;缺陷在于相关文档资料不全,普及率较低,没有太多可供参考的范例。

3、CDH/HDP Kafka,大数据云公司提供的 Kafka,内嵌 Apache Kafka。优势在于操作简单,节省运维成本;缺陷在于把控度低,演进速度较慢。

kafka版本:

kafka-2.11-2.1.1    2.11代表scala版本,2.1.1代表大版本,小版本和patch。

0.8.2.0 版本社区引入了新版本 Producer API,即需要指定 Broker 地址的 Producer。老版本需指定zookeeper地址。


消息压缩:

Producer 端压缩、Broker 端保持、Consumer 端解压缩。producer和broker都有一个参数compression.type,尽量保持一致,避免不必要的解压缩,否则broker端的cpu很可能飙升。每个压缩过的消息集合在 Broker 端写入时都要发生解压缩操作,目的就是为了对消息执行各种验证

在 Kafka 2.1.0 版本之前,Kafka 支持 3 种压缩算法:GZIPSnappy LZ4。从 2.1.0 开始,Kafka 正式支持 Zstandard 算法(简写为 zstd)。它是 Facebook 开源的一个压缩算法,能够提供超高的压缩比(compression ratio)。

在实际使用中,GZIP、Snappy、LZ4 甚至是 zstd 的表现各有千秋。但对于 Kafka 而言,它们的性能测试结果却出奇得一致,即在吞吐量方面:LZ4 > Snappy > zstd 和 GZIP;而在压缩比方面,zstd > LZ4 > GZIP > Snappy。具体到物理资源,使用 Snappy 算法占用的网络带宽最多,zstd 最少,这是合理的,毕竟 zstd 就是要提供超高的压缩比;在 CPU 使用率方面,各个算法表现得差不多,只是在压缩时 Snappy 算法使用的 CPU 较多一些,而在解压缩时 GZIP 算法则可能使用更多的 CPU。

高性能原因:

1、页缓存技术(系统page cache) + 磁盘顺序写,类似各种db

2、零拷贝技术,省略了把os cache里的数据拷贝到应用缓存,再从应用缓存拷贝到Socket缓存了,省去2此内存拷贝,系统内存直接到IO。

3、吞吐量高,线性增加。partiton、replica机制。

Exactly-once:kafka默认at least once。

最多一次(at most once):消息可能会丢失,但绝不会被重复发送。禁止重试即可

至少一次(at least once):消息不会丢失,但有可能被重复发送。 Broker 的应答没有成功发送回 Producer 端,它只能选择重试。默认

精确一次(exactly once):消息不会丢失,也不会被重复发送。

幂等性(Idempotence):

其最大的优势在于我们可以安全地重试任何幂等性操作,反正它们也不会破坏我们的系统状态。在 0.11 之后,指定 Producer 幂等性的方法很简单,仅需要设置一个参数即可,即 props.put(“enable.idempotence”, ture),或 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)。Kafka 自动帮你做消息的重复去重。底层具体的原理很简单,就是经典的用空间去换时间的优化思路,即在 Broker 端多保存一些字段。当 Producer 发送了具有相同字段值的消息后,Broker 能够自动知晓这些消息已经重复了,于是可以在后台默默地把它们“丢弃”掉。

幂等性缺陷:幂等性 Producer 能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。其次,它只能实现单会话上的幂等性,不能实现跨会话的幂等性。这里的会话,你可以理解为 Producer 进程的一次运行。当你重启了 Producer 进程之后,这种幂等性保证就丧失了。

事务:实际生产中很少使用,性能影响较大

事务型 Producer 能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。另外,事务型 Producer 也不惧进程的重启。Producer 重启回来后,Kafka 依然保证它们发送消息的精确一次处理。

1、和幂等性 Producer 一样,开启 enable.idempotence = true。

2、设置 Producer 端参数 transctional. id。最好为其设置一个有意义的名字。

producer.initTransactions();

try {

            producer.beginTransaction();

            producer.send(record1);

            producer.send(record2);

            producer.commitTransaction();

} catch (KafkaException e) {

            producer.abortTransaction();

}

在 Consumer 端,读取事务型 Producer 发送的消息也是需要一些变更的。修改起来也很简单,设置 isolation.level 参数的值即可。当前这个参数有两个取值:

1、read_uncommitted:这是默认值,表明 Consumer 能够读取到 Kafka 写入的任何消息,不论事务型 Producer 提交事务还是终止事务,其写入的消息都可以读取。很显然,如果你用了事务型 Producer,那么对应的 Consumer 就不要使用这个值。

2、read_committed:表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息。当然了,它也能看到非事务型 Producer 写入的所有消息。

producer端:

1、使用 producer.send(msg, callback)。一定要有回调,接收异常消息。

2、acks,0、1(一个成功即可)、-1或者all(副本全部成功才算已提交,不会丢失,但吞吐量低)。

3、配置retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。

broker端:

1、设置 unclean.leader.election.enable = false。不允许落后太多的broker参与选主。

2、设置 replication.factor >= 3。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。

3、置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。

4、确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1。

consumer端:

1、设置enable.auto.commit=false,关闭consumer端自动提交,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。


kafka拦截器:

Kafka 拦截器可以应用于包括客户端监控、端到端系统性能检测消息审计等多种功能在内的场景。可实现端到端的统计和审计。

Producer端:

Properties props = new Properties();

List<String> interceptors = new ArrayList<>();

interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor"); // 拦截器 1

interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor"); // 拦截器 2

props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

1、onSend:该方法会在消息发送之前被调用。如果你想在发送之前对消息“美美容”,这个方法是你唯一的机会。

2、onAcknowledgement:该方法会在消息成功提交或发送失败之后被调用。还记得我在上一期中提到的发送回调通知 callback 吗?onAcknowledgement 的调用要早于 callback 的调用。值得注意的是,这个方法和 onSend 不是在同一个线程中被调用的,因此如果你在这两个方法中调用了某个共享可变对象,一定要保证线程安全哦。还有一点很重要,这个方法处在 Producer 发送的主路。

consumer端:

指定消费者拦截器也是同样的方法,只是具体的实现类要实现 org.apache.kafka.clients.consumer.ConsumerInterceptor 接口,这里面也有两个核心方法。

1、onConsume:该方法在消息返回给 Consumer 程序之前调用。也就是说在开始正式处理消息之前,拦截器会先拦一道,搞一些事情,之后再返回给你。

2、onCommit:Consumer 在提交位移之后调用该方法。通常你可以在该方法中做一些记账类的动作,比如打日志等。

TCP管理:

对最新版本的 Kafka(2.1.0)而言,Java Producer 端管理 TCP 连接的方式是:

1、KafkaProducer 实例创建时启动 Sender 线程,从而创建与 bootstrap.servers 中所有 Broker 的 TCP 连接。

2、KafkaProducer 实例首次更新元数据信息之后,还会再次创建与集群中所有 Broker 的 TCP 连接。

3、如果 Producer 端发送消息到某台 Broker 时发现没有与该 Broker 的 TCP 连接,那么也会立即创建连接。

4、如果设置 Producer 端 connections.max.idle.ms 参数大于 0,则步骤 1 中创建的 TCP 连接会被自动关闭;如果设置该参数 =-1,那么步骤 1 中创建的 TCP 连接将无法被关闭,从而成为“僵尸”连接。


消费者组:

1、Consumer Group 下可以有一个或多个 Consumer 实例。这里的实例可以是一个单独的进程,也可以是同一进程下的线程。在实际场景中,使用进程更为常见一些。

2、Group ID 是一个字符串,在一个 Kafka 集群中,它标识唯一的一个 Consumer Group。

3、Consumer Group 下所有实例订阅的主题的单个分区只能分配给组内的某个 Consumer 实例消费。这个分区当然也可以被其他的 Group 消费。极端情况下可实现点对点模型(所有consumer在一个组)和发布 / 订阅模型(每个consumer单独一个组)。

理想情况下,Consumer 实例的数量应该等于该 Group 订阅主题的分区总数。

老版本的 Consumer Group 把位移保存在 ZooKeeper 中。现在比较流行的提法是将服务器节点做成无状态的,这样可以自由地扩缩容,实现超强的伸缩性。但是ZooKeeper 这类元框架其实并不适合进行频繁的写更新,而 Consumer Group 的位移更新却是一个非常频繁的操作。新版本中,将位移保存在 Broker 端的内部主题中,__consumer_offsets。

Rebalance:

Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅 Topic 的每个分区

1、组成员数发生变更。比如有新的 Consumer 实例加入组或者离开组,抑或是有 Consumer 实例崩溃被“踢出”组。

2、订阅主题数发生变更。Consumer Group 可以使用正则表达式的方式订阅主题,比如 consumer.subscribe(Pattern.compile(“t.*c”)) 就表明该 Group 订阅所有以字母 t 开头、字母 c 结尾的主题。在 Consumer Group 的运行过程中,你新创建了一个满足这样条件的主题,那么该 Group 就会发生 Rebalance。

3、订阅主题的分区数发生变更。Kafka 当前只能允许增加一个主题的分区数。当分区数增加时,就会触发订阅该主题的所有 Group 开启 Rebalance。

要避免不必要的rebalance,否则影响broker性能。

第一类非必要 Rebalance 是因为未能及时发送心跳,导致 Consumer 被“踢出”Group 而引发的。因此,你需要仔细地设置session.timeout.ms 和 heartbeat.interval.ms的值。我在这里给出一些推荐数值,你可以“无脑”地应用在你的生产环境中。

设置 session.timeout.ms = 6s。

设置 heartbeat.interval.ms = 2s。

要保证 Consumer 实例在被判定为“dead”之前,能够发送至少 3 轮的心跳请求,即 session.timeout.ms >= 3 * heartbeat.interval.ms。

将 session.timeout.ms 设置成 6s 主要是为了让 Coordinator 能够更快地定位已经挂掉的 Consumer。毕竟,我们还是希望能尽快揪出那些“尸位素餐”的 Consumer,早日把它们踢出 Group。希望这份配置能够较好地帮助你规避第一类“不必要”的 Rebalance。

第二类非必要 Rebalance 是 Consumer 消费时间过长导致的。我之前有一个客户,在他们的场景中,Consumer 消费数据时需要将消息处理之后写入到 MongoDB。显然,这是一个很重的消费逻辑。MongoDB 的一丁点不稳定都会导致 Consumer 程序消费时长的增加。此时,max.poll.interval.ms参数值的设置显得尤为关键。如果要避免非预期的 Rebalance,你最好将该参数值设置得大一点,比你的下游最大处理时间稍长一点。就拿 MongoDB 这个例子来说,如果写 MongoDB 的最长时间是 7 分钟,那么你可以将该参数设置为 8 分钟左右。

Consumer 端的 GC,比如是否出现了频繁的 Full GC 导致的长时间停顿,从而引发了 Rebalance。

位移主题:__consumer_offsets用于保存消费者位移信息的内部主题

如果位移主题是 Kafka 自动创建的,那么该主题的分区数是 50,副本数是 3。

Consumer端位移提交:

位移提交的语义保障是由consumer来负责的,Kafka 只会“无脑”地接受你提交的位移。从用户的角度来说,位移提交分为自动提交手动提交;从 Consumer 端的角度来说,位移提交分为同步提交异步提交

开启自动提交位移的方法很简单。Consumer 端有个参数 enable.auto.commit,把它设置为 true 或者压根不设置它就可以了。因为它的默认值就是 true,即 Java Consumer 默认就是自动提交位移的。如果启用了自动提交,Consumer 端还有个参数就派上用场了:auto.commit.interval.ms。它的默认值是 5 秒,表明 Kafka 每 5 秒会为你自动提交一次位移。

enable.auto.commit=true; # 开启自动提交

auto.commit.interval.ms=5000; # 设置自动提交间隔,默认5s

当在默认提交的时间间隔内,若发生rebalance,将会发生重复消费

consumer.commitSync(); # 同步提交offset,会影响consumer性能

consumer.commitAsync(); # 不能重试

对于常规性、阶段性的手动提交,我们调用 commitAsync() 避免程序阻塞,而在 Consumer 要关闭前,我们调用 commitSync() 方法执行同步阻塞式的位移提交,以确保 Consumer 关闭前能够保存正确的位移数据。将两者结合后,我们既实现了异步无阻塞式的位移管理,也确保了 Consumer 位移的正确性。


CommitFailedException异常:

1、缩短单条消息处理的时间。比如,之前下游系统消费一条消息的时间是 100 毫秒,优化之后成功地下降到 50 毫秒,那么此时 Consumer 端的 TPS 就提升了一倍。

2、增加 Consumer 端允许下游系统消费一批消息的最大时长。这取决于 Consumer 端参数 max.poll.interval.ms 的值。在最新版的 Kafka 中,该参数的默认值是 5 分钟。如果你的消费逻辑不能简化,那么提高该参数值是一个不错的办法。值得一提的是,Kafka 0.10.1.0 之前的版本是没有这个参数的,因此如果你依然在使用 0.10.1.0 之前的客户端 API,那么你需要增加 session.timeout.ms 参数的值。不幸的是,session.timeout.ms 参数还有其他的含义,因此增加该参数的值可能会有其他方面的“不良影响”,这也是社区在 0.10.1.0 版本引入 max.poll.interval.ms 参数,将这部分含义从 session.timeout.ms 中剥离出来的原因之一。

3、减少下游系统一次性消费的消息总数。这取决于 Consumer 端参数 max.poll.records 的值。当前该参数的默认值是 500 条,表明调用一次 KafkaConsumer.poll 方法,最多返回 500 条消息。可以说,该参数规定了单次 poll 方法能够返回的消息总数的上限。如果前两种方法对你都不适用的话,降低此参数值是避免 CommitFailedException 异常最简单的手段。

4、下游系统使用多线程来加速消费。这应该算是“最高级”同时也是最难实现的解决办法了。具体的思路就是,让下游系统手动创建多个消费线程处理 poll 方法返回的一批消息。之前你使用 Kafka Consumer 消费数据更多是单线程的,所以当消费速度无法匹及 Kafka Consumer 消息返回的速度时,它就会抛出 CommitFailedException 异常。如果是多线程,你就可以灵活地控制线程数量,随时调整消费承载能力,再配以目前多核的硬件条件,该方法可谓是防止 CommitFailedException 最高档的解决之道。事实上,很多主流的大数据流处理框架使用的都是这个方法,比如 Apache Flink 在集成 Kafka 时,就是创建了多个 KafkaConsumerThread 线程,自行处理多线程间的数据消费。不过,凡事有利就有弊,这个方法实现起来并不容易,特别是在多个线程间如何处理位移提交这个问题上,更是极容易出错。在专栏后面的内容中,我将着重和你讨论一下多线程消费的实现方案。

副本机制:

kafka追随者副本不提供服务,所有读写都在主broker上。因为如果提供读服务可能会有写入到读取的延时,同时akfka不像mysql它的读一般是一次性的,跟写有对应关系,提供读服务将不能保证分区间的线性读,实现起来复杂。

In-sync Replicas(ISR)副本,ISR 不只是追随者副本集合,它必然包括 Leader 副本。甚至在某些情况下,ISR 只有 Leader 这一个副本。而加入isr副本的标准就是 Broker 端参数 replica.lag.time.max.ms 参数值。这个参数的含义是 Follower 副本能够落后 Leader 副本的最长时间间隔,当前默认值是 10 秒。这就是说,只要一个 Follower 副本落后 Leader 副本的时间不连续超过 10 秒,那么 Kafka 就认为该 Follower 副本与 Leader 是同步的,即使此时 Follower 副本中保存的消息明显少于 Leader 副本中的消息。

Unclean 领导者选举(Unclean Leader Election),不在isr副本集合中的追随者被认为是unclean leader,即落后leader较多的副本。Broker 端参数 unclean.leader.election.enable 控制是否允许 Unclean 领导者选举。并且isr会根据follower延时时间动态调整,能加入也能剔除。

一个分布式系统通常只能同时满足一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)中的两个。显然,在这个问题上,Kafka 赋予你选择 C 或 A 的权利。若是允许unclean选举,则因为其延时可能会丢数据,但是保证了系统的可用性,即C和A的选择。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,132评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,802评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,566评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,858评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,867评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,695评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,064评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,705评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,915评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,677评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,796评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,432评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,041评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,992评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,223评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,185评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,535评论 2 343

推荐阅读更多精彩内容