目录
- 消息系统(3种消息溢出的选择,直发,消息代理(负载均衡和扇出),分区日志)
- 流和数据库(变更数据捕获 和 event sourcing)
- 流处理(5种应用)
- 时间推理(3个时间,4种窗口)
- 流连接(流流,流表,表表)
- 容错(微批量和存档点,事务,幂等,失败后重建)
文件是批处理作业的输入和输出,而在流处理之中,作业的输入输出等价物是什么呢?
在批处理之中,当输入是文件时,第一个处理步骤通常是将其解析为一连串的记录。在流处理之中,记录通常被称为事件,每个事件都是一个小的、独立的、不可变的对象,通常每个事件包含一个时间戳,表明事件产生的时间。 在流处理之中,事件由生产者产生,然后可能由多个对应消费者,相关的事件通常被分组到同一个主题之中。
可以由数据库来串联生产者与消费者:生产者可以将事件写入数据库,之后每一个消费者定期轮询数据库检查新出现的事件。但是数据库是不适合这种频繁轮询的操作的,因为轮询的次数越多,返回新事件的百分比越低,由此产生额外的开销也就越高。 (其实可以通过触发器的方式实现,但是数据库触发器也是基于数据库内部的关联的表进行操作的),所以引入了消息系统来处理流处理的需求。
1.消息系统
消息系统的运行逻辑很简单:由生产者发送包含事件的消息,然后将消息推送给消费者,可以由多个生产者节点发送消息到同一个主题,并允许多个消费节点在一个主题中接收消息。 但是消息系统会有这样几个问题:
如果生产者发送消息的速度比消费者处理的速度快,系统会怎么样处理呢 ?
- 删除消息
- 在队列中缓存消息
- 负反馈(也称为流量控制,阻止生产者发送更多消息)
如果节点崩溃或暂时离线,会出现消息丢失吗?消息系统与数据库相似,需要实现消息持久化需要一些进行磁盘读写或消息复制,这显然是有代价的。如果可以容忍消息丢失,那么可以在同一硬件上获得更高的吞吐量和更低的延迟。
消息的传递机制
直接传递
许多消息系统使用生产者和消费者之间的直接网络通信,而无需通过中间节点,如ZeroMQ 采取了TCP/IP组播的形式。所以如果消费者在网络上公开服务,生产者可以直接通过HTTP或RPC请求将消息推送给消费者。这就是webhooks背后的想法。
虽然直接消息传递的系统在通常情况下在协议检测和消息重传的机制下工作的很好,但是应用程序通常需要能够容忍消息丢失的情况,因为有一个问题很明显生产者和消费者不一定时刻在线。 而如果消费者离线,它可能错过消息。有些协议允许生产者重试失败的消息,但一旦生产者崩溃,这种方法可能失效,因为重试的消息的缓冲区会丢失。
消息队列
而另一种广泛使用方案是通过消息队列来发送消息,它作为与生产者和消费者的中间连接而存在,生产者将消息写入消息队列,而消费者从消息队列读取需要接收的消息。 通过消息队列传输的数据,系统容忍消费者和生产者的在线问题,消息持久性选择被交给了消息队列。这时我们可以更加灵活的处理消息,有些消息可以仅仅保存在内存中,而某些消息将写入磁盘,以便在消息队列崩溃时不会丢失这些消息。 面对处理速度缓慢的消费者,消息队列通常允许无界的排队规则,而不是丢弃消息或负反馈调整,这些机制都成为可以定制的选项。 但是消息队列的消息传递是异步的:当生产者发送消息时,它通常只等待消息队列的确认,而不会等到消费者处理消息。
消息的分发与确认
当多个消费者读取消息时,消息系统存在两种分发模型:
- 负载均衡
每个消息传递给所有消费者中的一个,由所有消费者共享处理主题中的消息的工作。消息队列可以任意的向消费者分配消息,来实现负载均衡。
- 消息广播
每条消息都传递给所有的消费者。消息广播使所有消费者收到同样的消息,而不影响彼此流,相当于有几个不同的批处理作业读取相同的输入文件。
这两种模式可以进行合并:例如,两个独立的消费者组可以各自订阅一个主题,使得每个组集体接收所有消息,但在每个组中,只有一个节点接收每个消息。
消费者可能在任意时刻崩溃,所以向消费者传递的消息未必会被处理或者只是在崩溃前部分处理它。 为了保证消息不丢失,消息代理使用确认机制:消费者需要明确反馈给消息队列,对应的消息得到了处理,消息队列会在队列之中移除对应的消息。 如果消费者的连接关闭或超时,而消息队列没有收到确认,则它假定消息没有被处理,因此它将消息再次发送给另一个消费者。(注意,可能会出现消息完全被处理的情况,但是确认在网络中丢失了,再次处理消息时需要确保消息的处理是幂等的。)所以如下图所示,这种情况会导致消息的交付顺序与生产者的发送的顺序不一致:
通常来说如果消息是完全独立的,那么消息的重新排序不会产生问题,但是如果消息之间有因果依赖关系,这回导致因果的不一致性,为了避免这个问题,可以为每个消费者使用单独的队列,但是这样就失去了负载均衡的优势。
分区日志
对于有持久化需求的消息队列,则考虑通过日志来实现持久化存储,来满足消息队列低延迟的要求。在前文之中我们讨论过日志的模式,同样相同的日志模型可以用来实现消息队列的持久化:生产者将消息追加到日志的末尾,而消费者通过依次读取日志来接收消息。如下图所示:为了比单个磁盘所能提供更高的吞吐量,可以对日志进行分区操作。在不同的代理节点上托管不同的分区,使每个分区保存独立的日志:
在每个分区之中,每个消息都会有一个单调递增的序列号,这样能够保证分区之中所有的消息是完全有序的,而不同分区之间的消息则没有顺序保证。通过这种方式可以很容易地分辨出哪些消息已被处理,比当前偏移量小的消息已经被处理,而后面的消息还没有被处理。因此,消息队列不需要追踪每一个消息,它只需要定期记录消费者偏移。这样有助于提高基于日志系统的吞吐量。而一旦消费者节点失效,则消费者组中的另一个节点被分配到日志分区,并开始在最后记录的偏移量上消费消息。 但如果之前的消息处理了偏移量之后的消息,但没有记录新的偏移量,则这些消息会被二次处理。
如果消费者无法跟上生产者发送消息的速率,则日志记录消息可以作为一种缓冲机制 。 当一个消费者所需要的消息比比日志上保留的信息要老,它将丢失过旧消息。所以需要监视消费者的消费速率,如果它显著落后,则发出警报。由于基于日志的磁盘缓冲区很大,有足够的时间让管理员介入。即使消费者落后太多,开始出现丢失消息的情况,也只有单个消费者受到影响,它不会破坏其他消费者的运行。 前文提到的消息确认是一种破坏性的操作,因为它会导致消息被消息队列删除。而在基于日志的消息队列中,消息的读取时只读的操作,不会改变日志。除了消费者的任何输出之外,处理的唯一副作用是消费者偏移量的前进。但偏移量是在消费者的控制之下的,所以如果需要的话可以很容易地操纵:例如你可以用昨天的偏移量跑一个消费者副本,并将输出写到不同的位置,以便重新处理最近一天的消息。你可以使用各种不同的处理代码重复任意次。这使得基于日志的消息队列更像是前文提及的批处理过程。
2. 流和数据库
上文已经提到过,没有一个系统能够满足所有的数据存储、查询和处理需求。在实践中,应用需要结合不同的技术来满足要求,所以本节我们来看看消息队列与数据库是怎么样并肩作战的。
保持系统同步
由于相同或相关的数据出现在了不同的地方,因此相互间需要保持同步:如果某个项目在数据库中被更新,它也应当在缓存,搜索索引和数据仓库中被更新。
如果周期性的完整数据库转储过于缓慢,有时会使用的替代方法是双写(dual write),其中应用代码在数据变更时明确写入每个系统:例如,首先写入数据库,然后更新搜索索引,然后使缓存项失效
但是双写会有竞争条件,和部分成功部分失败的问题。
确保它们要么都成功要么都失败,是原子提交问题的一个例子,解决这个问题的代价是昂贵的(2pc)
如果实际上只有一个领导者 —— 例如,数据库 —— 而且我们能让搜索索引成为数据库的追随者,情况要好得多。但这在实践中可能吗?
变化数据捕获(CDC) 是常常被使用到的技术,通过观察所有写入数据库的数据变化并将它们转换成可复制到其他系统数据的过程。如下图所示,通过捕获到数据库中的更改,并继续对搜索索引等应用更改,通过以相同的顺序应用更改日志,搜索索引中的数据与数据库中的数据相匹配。
变化数据捕获的实现
变化数据捕获是一种机制,用于确保对记录系统的所有更改也反映在派生数据系统中,以便派生系统具有准确的数据副本。 从本质上讲,更改数据捕获使一个数据库成为Leader,并将其他数据系统变成Follower。基于日志的消息队列很适合从源数据库接受消息的变化,并且保留的消息的顺序。 数据库的触发器同样可用于实现变化数据捕获,通过观察数据表的所有变化并将变化添加到记录表之中,但是触发器会带显著的性能开销。变化数据捕获通常是异步的:记录数据库系统在提交之后不会等待更改应用于消费者。
快照与日志压缩
如果拥有对数据库所做的所有更改的日志,那么可以通过日志来重建数据库的整个状态。但是将所有更改保存在内存中,会耗费大量的磁盘空间,并且载入并应用日志将耗费太长的时间,因此需要截断日志并配合快照来使用。构建一个新的全文索引需要整个数据库的完整副本,这里可以通过快照开始,并且载入快照后生成的日志便可以将索引恢复到最新的状态。所以数据库快照必须与日志中的偏移量相对应,以便确定在处理完快照后,在哪一点开始应用日志更改。
因为只能保留有限的日志记录,所以每次需要添加新的派生数据系统时,都需要经历快照的过程。这增加了系统的复杂性,而日志压缩提供了一个很好的替代方案,日志压缩的原理很简单:存储引擎周期性地查找具有相同Key的日志记录,丢弃重复的记录,并且只保存每个Key的最新值。 日志的压缩和合并过程在后台运行,如果需要重建派生数据系统(如:搜索索引)时,可以从压缩日志中启动一个新的用户,并依次扫描日志中的所有消息,就可以获取数据库内容的完整副本,而不必通过额外的快照。
event sourcing
与变更数据捕获类似,事件溯源涉及到将所有对应用状态的变更存储为变更事件日志。最大的区别是事件溯源将这一想法应用到了几个不同的抽象层次上:
- 在变更数据捕获中,应用以可变方式(mutable way)使用数据库,任意更新和删除记录。变更日志是从数据库的底层提取的(例如,通过解析复制日志),从而确保从数据库中提取的写入顺序与实际写入的顺序相匹配,从而避免图11-4中的竞态条件。
- 在事件溯源中,应用逻辑显式构建在写入事件日志的不可变事件之上。在这种情况下,事件存储是仅追加写入的,更新与删除是不鼓励的或禁止的。
不可变事件的优点
不可变的事件也包含了比当前状态更多的信息。例如在购物网站上,顾客可以将物品添加到他们的购物车,然后再将其移除。虽然从履行订单的角度,第二个事件取消了第一个事件,但对分析目的而言,知道客户考虑过某个特定项而之后又反悔,可能是很有用的。也许他们会选择在未来购买,或者他们已经找到了替代品。这个信息被记录在事件日志中,但对于移出购物车就删除记录的数据库而言,这个信息在移出购物车时可能就丢失
3. 流处理
你可以用流做什么 —— 也就是说,你可以处理它。一般来说,有三种选项:
你可以将事件中的数据写入数据库,缓存,搜索索引或类似的存储系统,然后能被其他客户端查询。
你能以某种方式将事件推送给用户,例如发送报警邮件或推送通知,或将事件流式传输到可实时显示的仪表板上。在这种情况下,人是流的最终消费者。
你可以处理一个或多个输入流,并产生一个或多个输出流。流可能会经过由几个这样的处理阶段组成的流水线,最后再输出(选项1或2)。
流处理的应用
流分析
使用流处理的另一个领域是对流进行分析。 CEP与流分析之间的边界是模糊的,但一般来说,分析往往对找出特定事件序列并不关心,而更关注大量事件上的聚合与统计指标 —— 例如:
测量某种类型事件的速率(每个时间间隔内发生的频率)
滚动计算一段时间窗口内某个值的平均值
将当前的统计值与先前的时间区间的值对比(例如,检测趋势,当指标与上周同比异常偏高或偏低时报警)
维护物化视图
数据库的变更流可以用于维护衍生数据系统(如缓存,搜索索引和数据仓库),使其与源数据库保持最新。
在流上搜索
例如,媒体监测服务可以订阅新闻文章Feed与来自媒体的播客,搜索任何关于公司,产品或感兴趣的话题的新闻。这是通过预先构建一个搜索查询来完成的,然后不断地将新闻项的流与该查询进行匹配。
消息传递和RPC
消息传递系统可以作为RPC的替代方案,即作为一种服务间通信的机制,比如在Actor模型中所使用的那样。
4. 时间推理
流处理与数据库相比最核心的差别是:查询和数据之间的关系是相反的。通常,数据库会持久地存储数据,而查询是一个临时的操作。而流处理反转两者的角色:查询是长期存储的,输入流的事件不断地流过,并寻找查询模式匹配的数据。所以,二者的应用场景也差距很大,流处理擅长监控变化的数据并且给予反馈。一旦涉及到变化,则是一个时间敏感问题,数据是随着时间的推移而变化的,流处理通常需要处理时间,特别是用于分析的数据变化时,需要使用时间窗口。例如 “过去五分钟的平均时间”。许多流处理框架使用了本地系统时钟来确定时间窗口。如果事件的发生和事件的处理之间的延迟很小,这个模型就十分简单易行。然而,前文我们提到了,事件很有可能会产生延迟,事件的处理可能明显晚于事件的发生。
事件时间与处理时间
有许多原因会导致处理的延迟如:排队、网络故障,消息队列处理缓慢,代码的bug等。消息延迟会导致事件的不可预知排序。例如,假设用户首先创建一个Web请求(由Web服务器A处理),然后再进行第二个请求(由服务器B处理)。a和b发出描述它们所处理请求的事件,但b事件在事件发生前到达消息代理。现在流处理器将首先看到b事件,然后才是a事件,尽管它们实际上是以相反的顺序发生的。
事件发生的时间和事件的处理时间是两个完全不同的概念,混淆他们会导致数据的损坏。如下图所示,Web服务器上事件发生的频率是稳定的,但是流处理器需要定期处理事件,可能这时会停下来一分钟,处理挤压的事件,如果这时以事件的处理事件来测量数据,会导致异常的波动结果。
如何确定时间戳
确定事件的时间戳是一件很困难的事,按理来说,事件上的时间戳应该是与用户交互发生的时间,但是,用户控制的设备上的时钟通常不能被信任,因为它可能是偶然或故意设置到错误的时间。服务器接收到事件的时间(根据服务器的时钟)更可能是准确的,但在描述用户交互方面没有什么意义。所以这里有三个时间戳的法则:
1 .事件发生的时间 (设备时钟)
2 设备将事件发送到服务器的时间 (设备时钟)
3 服务器接收事件的时间 (服务器时钟)
由第三个时间戳减去第二个时间戳,可以估计设备时钟和服务器时钟之间的偏移量,通过这样的方式来估计事件实际发生的真实时间。
窗口的类型
当你知道如何确定一个事件的时间戳后,下一步就是如何定义时间段的窗口。然后窗口就可以用于聚合,例如事件计数,或计算窗口内值的平均值。有几种窗口很常用
- 滚动窗口(Tumbling Window)
滚动窗口有着固定的长度,每个事件都仅能属于一个窗口。例如,假设你有一个1分钟的滚动窗口,则所有时间戳在10:03:00和10:03:59之间的事件会被分组到一个窗口中,10:04:00和10:04:59之间的事件被分组到下一个窗口,依此类推。通过将每个事件时间戳四舍五入至最近的分钟来确定它所属的窗口,可以实现1分钟的滚动窗口。
- 跳动窗口(Hopping Window)
跳动窗口也有着固定的长度,但允许窗口重叠以提供一些平滑。例如,一个带有1分钟跳跃步长的5分钟窗口将包含10:03:00至10:07:59之间的事件,而下一个窗口将覆盖10:04:00至10:08之间的事件: 59,等等。通过首先计算1分钟的滚动窗口,然后在几个相邻窗口上进行聚合,可以实现这种跳动窗口。
- 滑动窗口(Sliding Window)
滑动窗口包含了彼此间距在特定时长内的所有事件。例如,一个5分钟的滑动窗口应当覆盖10:03:39和10:08:12的事件,因为它们相距不超过5分钟(注意滚动窗口与步长5分钟的跳动窗口可能不会把这两个事件分组到同一个窗口中,因为它们使用固定的边界)。通过维护一个按时间排序的事件缓冲区,并不断从窗口中移除过期的旧事件,可以实现滑动窗口。
- 会话窗口(Session window)
与其他窗口类型不同,会话窗口没有固定的持续时间,而定义为:将同一用户出现时间相近的所有事件分组在一起,而当用户一段时间没有活动时(例如,如果30分钟内没有事件)窗口结束。
5. 流连接
流流连接(窗口连接)
流处理器需要维护状态:例如,按会话ID索引最近一小时内发生的所有事件。无论何时发生搜索事件或点击事件,都会被添加到合适的索引中,而流处理器也会检查另一个索引是否有具有相同会话ID的事件到达。如果有匹配事件就会发出一个表示搜索结果被点击的事件;如果搜索事件直到过期都没看见有匹配的点击事件,就会发出一个表示搜索结果未被点击的事件。
流表连接(流扩展)
数据库副本加载到流处理器中,以便在本地进行查询而无需网络往返。这种技术与我们在“Map端连接”中讨论的散列连接非常相似:如果数据库的本地副本足够小,则可以是内存中的散列表,比较大的话也可以是本地磁盘上的索引。
批处理作业使用数据库的时间点快照作为输入,而流处理器是长时间运行的,且数据库的内容可能随时间而改变,所以流处理器数据库的本地副本需要保持更新。这个问题可以通过变更数据捕获来解决
表表连接
推特时间线例子时说过,当用户想要查看他们的主页时间线时,迭代用户所关注人群的推文并合并它们是一个开销巨大的操作。
相反,我们需要一个时间线缓存:一种每个用户的“收件箱”,在发送推文的时候写入这些信息,因而读取时间线时只需要简单地查询即可。物化与维护这个缓存需要处理以下事件:
当用户u发送新的推文时,它将被添加到每个关注用户u的时间线上。
用户删除推文时,推文将从所有用户的时间表中删除。
当用户开始关注用户时,最近的推文将被添加到的时间线上。
当用户取消关注用户时,的推文将从的时间线中移除。
要在流处理器中实现这种缓存维护,你需要推文事件流(发送与删除)和关注关系事件流(关注与取消关注)。流处理需要为维护一个数据库,包含每个用户的粉丝集合。以便知道当一条新推文到达时,需要更新哪些时间线。
6. 容错
批处理框架可以很容易地容错:如果MapReduce作业中的任务失败,可以简单地在另一台机器上再次启动,并且丢弃失败任务的输出。
在流处理中也出现了同样的容错问题,但是处理起来没有那么直观:等待某个任务完成之后再使其输出可见并不是一个可行选项,因为你永远无法处理完一个无限的流。
微批量与存档点
一个解决方案是将流分解成小块,并像微型批处理一样处理每个块。这种方法被称为微批次(microbatching),它被用于Spark Streaming 。批次的大小通常约为1秒,这是对性能妥协的结果:较小的批次会导致更大的调度与协调开销,而较大的批次意味着流处理器结果可见之前的延迟要更长。
微批次也隐式提供了一个与批次大小相等的滚动窗口(按处理时间而不是事件时间戳分窗)。任何需要更大窗口的作业都需要显式地将状态从一个微批次转移到下一个微批次。
Apache Flink则使用不同的方法,它会定期生成状态的滚动存档点并将其写入持久存储。如果流算子崩溃,它可以从最近的存档点重启,并丢弃从最近检查点到崩溃之间的所有输出。存档点会由消息流中的壁障(barrier)触发,类似于微批次之间的边界,但不会强制一个特定的窗口大小。
在流处理框架的范围内,微批次与存档点方法提供了与批处理一样的恰好一次语义。但是,只要输出离开流处理器(例如,写入数据库,向外部消息代理发送消息,或发送电子邮件),框架就无法抛弃失败批次的输出了。在这种情况下,重启失败任务会导致外部副作用发生两次,只有微批次或存档点不足以阻止这一问题。
分布式事务
在第8节,我们讨论了分布式事务传统实现中的问题(如XA)。
幂等性
幂等操作是多次重复执行与单次执行效果相同的操作。例如,将键值存储中的某个键设置为某个特定值是幂等的(再次写入该值,只是用同样的值替代),而递增一个计数器不是幂等的(再次执行递增意味着该值递增两次)。
即使一个操作不是天生幂等的,往往可以通过一些额外的元数据做成幂等的。例如,在使用来自Kafka的消息时,每条消息都有一个持久的,单调递增的偏移量。将值写入外部数据库时可以将这个偏移量带上,这样你就可以判断一条更新是不是已经执行过了,因而避免重复执行。
失败后重建状态
一种选择是将状态保存在远程数据存储中,并进行复制,每个消息都要查询远程数据库可能会很慢。
另一种方法是在流处理器本地保存状态,并定期复制。