1、流式数据处理的利器
Kafka 最初是 LinkedIn 的一个内部基础设施系统。
本质上是“数据架构”的一部分,一个能够处理持续数据流的组件。
Kafka 一开始被用在社交网络的实时应用和数据流当中,而现在已经成为下一代数据架构的基础。
Kafka 在这当中充当了怎样的角色?它与现有的系统有什么区别?
Kafka 是一个流平台:在这个平台上可以发布和订阅数据流,并把它们保存起来、进行处理,这就是构建 Kafka 的初衷。
Kafka 经常会被拿来与现有的技术作比较:企业级消息系统、大数据系统(如 Hadoop)和数据集成或 ETL工具。
Kafka 有点像消息系统,允许发布和订阅消息流。从这点来看,它类似于 ActiveMQ、
RabbitMQ 或 IBM 的 MQSeries 等产品。尽管看上去有些相似,但 Kafka 与这些传统的消
息系统仍然存在很多重要的不同点
2、数据连接层
消息系统只会传递
消息,而 Kafka 的流式处理能力让你只用很少的代码就能够动态地处理派生流和数据集。
Kafka 的这些独到之处足以让你刮目相看,它不只是“另一个消息队列”。
3、实时版的 Hadoop
Hadoop用来处理“存量”的大型数据集合数据文件(离线处理)。
Kafka可以存储和持续处理大型的数据流。
很多人将新兴的流式处理看成批处理的超集。它们之间的最大不同体现在持续的低延迟处理和批处理之间的差异上。
Hadoop 和大数据主要应用在数据分析上,而 Kafka 因其低延迟的特点更适合用在核心的业务应用上。业务事件时刻在发生,Kafka 能够及时对这些事件作出响应,基于Kafka 构建的服务直接为业务运营提供支撑,提升用户体验。
Kafka 与 ETL 工具或其他数据集成工具的共同点都是“搬运数据”。
以数据流为中心的架构
Kafka(流式数据平台)=实时的->消息系统(发布与订阅)+数据存储+hadoop(数据分析)+ETL(数据搬运)
持续数据流构建应用程序->要学会基于持续数据流构建应用程序也着实是一个巨大的思维转变。
如何移动数据,从数据的发生点移动到可以分析和处理的位置上。
花费越少的精力在数据移动上,就越能专注于核心业务。
这就是为什么在一个以数据为驱动的企业里,数据管道会成为关键性组件。
如何移动数据,几乎变得与数据本身一样重要。
4、发布-订阅消息系统
消息、批次、模式、主题、分区
流是一组从生产者移动到消费者的数据。
目标:
使用推送和拉取模型解耦生产者和消费者;
为消息传递系统中的消息提供数据持久化,以便支持多个消费者;
通过系统优化实现高吞吐量;
系统可以随着数据流的增长进行横向扩展。
kafka broker配置、
硬件(磁盘IO能力(磁盘性能影响生产者,写入到磁盘),磁盘存储,内存(内存会影响到消费者,一般从页面缓存中读取数据))
不建议把 Kafka 同其他重要的应用程序部署在一起:它会用很多的页面缓存
网络:网络吞吐量决定了 Kafka 能够处理的最大数据流量。
CPU:影响小,kafka不是CPU密集型的,最大的计算:对消息数据进行压缩,解压后设置偏移量,再进行压缩
磁盘的三个时间元数据:创建时间、最后修改时间和最后访问时间,每次都要更新
避免在创建分区之后再修改分区的数量
我们把更新分区当前位置的操作叫作提交。
自动提交(默认间隔是5s),代码去控制提交:同步(阻塞,还有可能导致偏移量前后覆盖)和异步(回调,在回调里面去控制失败重试的逻辑,避免偏移量覆盖)
在应用程序的业务处理之后提交偏移量;或者自定义提交的条件,比如处理了1000条记录后就要提交一个集群里面只允许一个控制器的存在
控制器使用epoch 来避免“脑裂”。“脑裂”是指两个节点同时认为自己是当前的控制器。
复制功能是 Kafka 架构的核心。在 Kafka 的文档里,Kafka 把自己描述成“一个分布式的、可分区的、可复制的提交日志服务”。复制之所以这么关键,是因为它可以在个别节点失效时仍能保证 Kafka 的可用性和持久性。
Kafka 使用主题来组织数据,每个主题被分为若干个分区,每个分区有多个副本。那些副本被保存在 broker 上,每个 broker 可以保存成百上千个属于不同主题和分区的副本。
Kafka 使用零复制技术向客户端发送消息——也就是说,Kafka 直接把消息从文件(或者更确切地说是 Linux 文件系统缓存)里发送到网络通道,而不需要经过任何中间缓冲区。这是 Kafka 与其他大部分数据库系统不一样的地方,其他数据库在将数据发送给客户端之前会先把它们保存在本地缓存里。这项技术避免了字节复制,也不需要管理内存缓冲区,从而获得更好的性能。
Kafka 的基本存储单元是分区。
Kafka 管理员、Linux 系统管理员、网络和存储管理员以及
应用程序开发者,所有人必须协同作战,才能构建一个可靠的系统。
所以,了解系统的保证机制对于构建可靠的应用程序来说至关重要,这也是能够在不同条件下解释系统行为的前提。那么 Kafka 可以在哪些方面作出保证呢?
1)Kafka 可以保证分区消息的顺序。如果使用同一个生产者往同一个分区写入消息,而且消息 B 在消息 A 之后写入,那么 Kafka 可以保证消息 B 的偏移量比消息 A 的偏移量大,而且消费者会先读取消息 A 再读取消息 B。
2)只有当消息被写入分区的所有同步副本时(但不一定要写入磁盘),它才被认为是“已提交”的。生产者可以选择接收不同类型的确认,比如在消息被完全提交时的确认,或者在消息被写入首领副本时的确认,或者在消息被发送到网络时的确认。
3)只要还有一个副本是活跃的,那么已经提交的消息就不会丢失。
4)消费者只能读取已经提交的消息。
我们的系统需要什么可靠性,针对可靠性有什么保证措施?
这种权衡一般是指消息存储的可靠性和一致性的重要程度与可用性、高吞吐量、低延迟和硬件成本的重要程度之间的权衡。
Kafka 的复制机制和分区的多副本架构是 Kafka 可靠性保证的核心。
broker中的分区副本的“同步性质”,依赖于broker与zookeeper连通性,连通性又依赖于它们之间的心跳,broker和zookeeper都是JVM系的,那不合理的GC可能会导致整个程序停止响应,导致broker与zookeeper断开连接。
broker 有 3 个配置参数会影响 Kafka 消息存储的可靠性。与其他配置参数一样,它们可以应用在 broker 级别,用于控制所有主题的行为,也可以应用在主题级别,用于控制个别主题的行为。在主题级别控制可靠性,意味着 Kafka 集群可以同时拥有可靠的主题和非可靠的主题。
broker(n)一定是≥副本系统数,n如果小于副本系统数的话,那比如有两个相同的副本放到同一个broker,就失去备用的意义了。
从另一个角度讲,就是要保证相同的副本分散到各个不同的broker中。
副本的分布也很重要。默认情况下,Kafka 会确保分区的每个副本被放在不同的 broker 上。不过,有时候这样仍然不够安全。如果这些 broker 处于同一个机架上,一旦机架的交换机发生故障,分区就会不可用,这时候把复制系数设为多少都不管用。为了避免机架级别的故障,我们建议把 broker 分布在多个不同的机架上,并使用 broker.rack 参数来为每个broker 配置所在机架的名字。如果配置了机架名字,Kafka 会保证分区的副本被分布在多个机架上,从而获得更高的可用性。
不完全的首领选择:
如果把 unclean.leader.election.enable 设为 true,就是允许不同步的副本成为首领(也就是“不完全的选举”),那么我们将面临丢失消息的风险。如果把这个参数设为 false,就要等待原先的首领重新上线,从而降低了可用性。我们经常看到一些对数据质量和数据一致性要求较高的系统会禁用这种不完全的首领选举(把这个参数设为 false)。银行系统是这方面最好的例子,大部分银行系统宁愿选择在几分钟甚至几个小时内不处理信用卡支付事务,也不会冒险处理错误的消息。不过在对可用性要求较高的系统里,比如实时点击流分析系统,一般会启用不完全的首领选举。
可靠性,不单单要考虑broker,也要考虑生产者和消费者的可靠性
(生产者和消费者处理不好与broker之间的交互,可能会导致消息丢失,这个问题不是broker导致的,可能是生产者或消费者自己弄丢的)
不仅仅只是考虑消息丢失了,也考虑消息重复了(重复也是不可靠的,要么做到“消息的幂等性,也就是说,即使出现了重复消息,也不会对处理结果的正确性造成负面影响。”)
例如,消息“这个账号里有 110 美元”就是幂等的,因为即使多次发送这样的消息,产生的结果都是一样的。不过消息“往这个账号里增加 10 美元”就不是幂等的。
在 acks=0 模式下的运行速度是非常快的(这就是为什么很多基准测试都是基于这个模式),你可以得到惊人的吞吐量和带宽利用率,不过如果选择了这种模式,一定会丢失一些消息。
如果 broker 返回的错误可以通过重试来解决,那么生产者会自动处理这些错误。还有一类是无法通过重试来解决的错误,称之为“不可重试”错误。
消费者唯一要做的是跟踪哪些消息是已经读取过的,哪些是还没有读取过的。
在遇到可重试错误时,把错误写入一个独立的主题,然后继续。一个独立的消费者群组负责从该主题上读取错误消息,并进行重试,或者使用其中的一个消费者同时从该主题上读取错误消息并进行重试,不过在重试时需要暂停该主题。这种模式有点像其
他消息系统里的 dead-letter-queue。
暂停主题?
Kafka 的跨数据中心复制工具(MirrorMaker)默认会进行无限制的重试(例如 retries=MAX_INT)。作为一个具有高可靠性的复制工具,它决不会丢失消息。
客户端是在轮询的过程中一并向broker发送心跳的,对于在轮询中发送“长处理”消息的情况会影响到发送心跳的频率,被误以为故障了;
所以对于客户端在消费处理消息时出现“长处理”的情况,可以把这个“长处理”交给独立的线程池去处理,让客户端本身保持合理的轮询,避免因为“长处理”而阻塞了轮询发送心跳。
消息“至少一次”和“仅一次”的语义,都应该得到支持。但是kafka不能完全支持“仅一次”的语义——生产者能保证“至少一次”但无法保证“仅一次”,而消费者可以保证“仅一次”——有时候需要消费者程序做些判断,或者保证消息的幂等性;
消费者将消息读取出来写到外部实现“仅一次”的的简单方法:1)找到唯一标识一个 Kafka 记录的ID,把它写到支持唯一键的存储系统支持等性写入)里面。
验证系统可靠性:
1)配置验证:Kafka 的代码库里包含了大量测试用例;
2)应用程序验证以:检查自定义的错误处理代码、偏移量提交的方式、再均衡监听器以及其他使用了 Kafka 客户端的地方
3)生产环境的应用程序监控:测试应用程序是很重要的,不过它无法代替生产环境的持续监控,这些监控是为了确保数据按照期望的方式流动。
监控:
1)对于生产者来说,最重要的两个可靠性指标是消息的 error-rate 和 retry-rate(聚合过的)。如果这两个指标上升,说明系统出现了问题。
2)对于消费者来说,最重要的指标是 consumer-lag,该指标表明了消费者的处理速度与最近提交到分区里的偏移量之间还有多少差距。理想情况下,该指标总是为 0,消费者总能读到最新的消息。不
3)这种端到端的监控系统实现起来很耗费时间,具有一定挑战性。据我们所知,目前还没有开源的实现。Confluent 提供了一个商业的实现版本,它是 Confluent Control Center 的一部分。
数据管道:
在使用 Kafka 构建数据管道时,通常有两种使用场景:第一种,把 Kafka 作为数据管道的两个端点之一,例如,把 Kafka 里的数据移动到 S3 上,或者把 MongoDB 里的数据移动到 Kafka 里;第二种,把 Kafka 作为数据管道两个端点的中间媒介,例如,为了把 Twitter的数据移动到 ElasticSearch 上,需要先把它们移动到 Kafka 里,再将它们从 Kafka 移动到ElasticSearch 上。
数据源----数据管道----数据池,数据管道难免要做一些数据处理,更为灵活的方式是尽量保留原始数据的完整性,让下游的应用自己决定如何处理和聚合数据。
Kafka 为数据管道带来的主要价值在于,它可以作为数据管道各个数据段之间的大型缓冲区,有效地解耦管道数据的生产者和消费者。Kafka 的解耦能力以及在安全和效率方面的可靠性,使它成为构建数据管道的最佳选择。——生产者可以频繁地向 Kafka 写入数据,也可以按需写入;消费者可以在数据到达的第一时间读取它们,也可以每隔一段时间读取一次积压的数据。
可靠性:我们要避免单点故障,并能够自动从各种故障中快速恢复;数据传递保证(满足“至少一次”和“仅一次”的语义)
吞吐量:高吞吐量,动态吞吐量(应对突发的吞吐量变化)
安全性:数据安全?谁能操作管道?生产者与消费者的认证机制?
kafka的安全机制:1)数据加密;2)认证机制;3)审计日志
每个 Kafka 主题对应 ElasticSearch 里的一个索引,主题的名字与索引的名字相同。
kafka监控
告警疲劳
自己不能监控自己:要确保 Kafka 的监控和告警不依赖Kafka 本身。
很多组织使用 Kafka 收集应用程序和系统的度量指标与日志,然后供中心监控系统使用,这样可以很好地解耦应用程序和监控系统。不过,对于Kafka 本身来说却存在一个问题,如果使用这个监控系统来监控 Kafka,那么当 Kafka 崩溃时,我们很可能无法感知到,因为监控系统的数据流也随着消失了。
borker的监控(度量)指标:重要的,非同步分区的数量
主题的监控(度量)指标
JVM监控
在讨论监控时,如果不涉及日志,那么这个监控就是不完整的。
对于 Kafka 来说,磁盘是最重要的子系统。所有的消息都保存在磁盘上,所以 Kafka 的性能严重依赖磁盘的性能。我们需要对磁盘空间和索引节点进行监控,确保磁盘空间不会被用光。对于保存数据的分区来说就更是如此。对磁盘 IO 进行监控也是很有必要的,它们揭示了磁盘的运行效率。我们需要监控磁盘的每秒种读写速度、读写平均队列大小、平均等待时间和磁盘的使用百分比。
什么是流式处理。
数据流:
1)数据流是无边界数据集的抽象表示。无边界意味着无限和持续增长。无边界数据集之所以是无限的,是因为随着时间的推移,新的记录会不断加入进来。
2)数据流是有序的;
3)数据流不能被改变,发生了就发生了,不可能回到过去修改。
4)数据流是可以重播的,但对于大多数业务来说,重播发生在几个月前(甚至几年前)的原始事件流是一个很重要的需求。
流式处理是指实时地处理一个或多个事件流。流式处理是一种编程范式,就像请求与响应范式和批处理范式那样。
编程范式
1)请求与响应,OLTP线上交易系统,特点:延迟最小,稳定,阻塞
2)批处理,按照设定的时间启动某个处理程序,读取所有数据处理之后输出结果,比如汇总生成报表等,特点:有高延迟和高吞吐量
3)流式处理,这种范式介于上述两者之间。大部分的业务不要求亚毫秒级的响应,不过也接受不了要等到第二天才知道结果。大部分业务流程都是持续进行的,只要业务报告保持更新,业务产品线能够持续响应,那么业务流程就可以进行下去,而无需等待特定的响应,也不要求在几毫秒内得到响应。一些业务流程具有持续性和非阻塞的特点,比如针对可疑信用卡交易的警告、网络警告、根据供应关系实时调整价格、跟踪包裹。
流的定义不依赖任何一个特定的框架、API 或特性。只要持续地从一个无边界的数据集读取数据,然后对它们进行处理并生成结果,那就是在进行流式处理。重点是,整个处理过程必须是持续的。一个在每天凌晨两点启动的流程,从流里读取 500 条记录,生成结果,然后结束,这样的流程不是流式处理。
推荐阅读 Justin Sheehy 的论文“There is No Now”。
流式处理系统一般包含如下几个时间概念:
1)事件事件,事件发送的时间和记录的创建时间
2)日志追加时间,指数据(事件)保存到 broker 的时间
3)处理时间,应用程序在收到事件之后要对其进行处理的时间。这个时间可以是在事件发生之后的几毫秒、几小时或几天。或者两个线程处理(两个时间了),所以它不可靠。
注意时区问题
在将表与流进行对比时,可以这么想:流包含了变更——流是一系列事件,每个事件就是一个变更。表包含了当前的状态,是多个变更所产生的结果。所以说,表和流是同一个硬币的两面——世界总是在发生变化,用户有时候关注变更事件,有时候则关注世界的当前状态。如果一个系统允许使用这两种方式来查看数据,那么它就比只支持一种方式的系统强大。
为了将表转化成流,需要捕捉到在表上所发生的变更,将“insert”、“update”和“delete”事件保存到流里。大部分数据库提供了用于捕捉变更的“Change Data Capture”(CDC)解决方案,Kafka 连接器将这些变更发送到 Kafka,用于后续的流式处理。为了将流转化成表,需要“应用”流里所包含的所有变更,这也叫作流的“物化”。首先在内存里、内部状态存储或外部数据库里创建一个表,然后从头到尾遍历流里的所有事件,逐个地改变状态。在完成这个过程之后,得到了一个表,它代表了某个时间点的状态。