注意: 这一章还未完成编辑,你阅读的内容中还存在大量机器翻译的结果
第2章 数据集成
我要研究的第一个应用程序是数据集成。首先,让我解释一下数据集成的含义以及为什么我认为它很重要,然后我们将了解它与日志的关系。
数据集成意味着将组织拥有的所有数据提供给需要它的所有服务和系统。
“数据集成”一词并不常见,但我不知道有什么更好的。术语ETL(extract提取,transform转换和load加载)通常更易于理解,它仅涵盖数据集成的一小部分-填充关系数据仓库。但是,我所描述的大部分内容都可以看作是ETL,它被概括为还包含实时系统和处理流程。
围绕大数据的想法而来的所有令人喘不过气的兴趣和炒作,您很少听到有关数据集成的信息;但是,我认为,使数据可用这一平凡的问题是组织可以关注的更有价值的目标之一。
有效使用数据遵循Maslow的一种需求层次结构。图2-1中所示的金字塔的基础涉及捕获所有相关数据并将其放到适用的处理环境中(无论是精美的实时查询系统还是仅文本文件和Python脚本)。这些数据需要以统一的方式建模,以使其易于读取和处理。一旦满足了以统一方式捕获数据的基本需求,便可以在基础架构上以各种方式处理该数据,这是合理的:MapReduce,实时查询系统等。
值得注意的是,显而易见:没有可靠且完整的数据流,Hadoop集群只不过是一个非常昂贵且难以组装的空间加热器。一旦数据和处理可用,您就可以继续处理更完善的问题,例如良好的数据模型和一致且易于理解的语义。最后,注意力可以转移到更复杂的处理上:更好的可视化,报告以及算法处理和预测。
以我的经验,大多数组织在此金字塔的基础上都有巨大的漏洞-他们缺乏可靠,完整的数据流,但希望直接跳至深度学习和高级数据建模技术。这是完全落后的。
问题是,我们如何在组织中的所有数据系统中建立可靠的数据流?
数据集成:两个并发症
两种趋势使数据集成成为一个越来越困难的问题。
数据更多样化
我想,如果您在15年前问一家公司,他们拥有什么数据,他们会描述他们的交易数据,例如用户,产品,订单以及关系数据库中表中保存的其他项目。
但是,我们的定义已经扩大。现在,大多数公司还将包括事件数据。事件数据记录发生的事情而不是发生的事情。在网络系统中,这意味着需要记录用户活动,以及可靠地操作和监视数据中心的机器所需的机器级事件和统计信息。人们通常把这种“日志数据”称为“日志数据”,因为它经常被写入应用程序日志中,但是这会使形式与功能混淆。这些数据是现代Web的核心:毕竟,Google的财富是由建立在点击和印象(即事件)上的相关管道产生的。
这些东西不仅限于网络公司,还只是网络公司已经完全数字化,因此它们更易于测量和度量。财务数据长期以来都是以事件为中心的。RFID将这种跟踪添加到物理对象。我认为这种趋势将随着传统业务和活动的数字化而继续。尽管有点流行,“物联网”试图描述将物理设备连接到数字世界的趋势。这样做的主要动机之一是记录有关这些物理设备正在做什么的事件数据,以便我们可以将数字建模扩展到物理世界,并扩大可以用软件编程和优化的事物的范围。
这种类型的事件数据颠覆了传统的数据集成方法,因为它往往比事务数据大几个数量级。
专用数据系统的爆炸式增长
第二个趋势是专用数据系统的爆炸式增长,在过去的五年中,这种数据系统已经变得很流行并且通常免费提供。存在用于OLAP,搜索,简单的在线存储,批处理,图形分析等的专用系统。
更多种类的更多数据的组合以及将这些数据导入更多系统的愿望导致了巨大的数据集成问题。
日志结构的数据流
我们如何解决这个问题?事实证明,日志是用于处理系统之间数据流的自然数据结构。办法很简单:
获取组织的所有数据,并将其放入中央日志以进行实时订阅。
每个逻辑数据源都可以建模为自己的日志。数据源可以是记录事件(例如单击或页面浏览)的应用程序,也可以是记录修改的数据库表。每个订阅系统都将尽快从该日志中读取日志,将每个新记录应用于其自己的存储,并提升其在日志中的位置。订户可以是任何类型的数据系统:高速缓存,Hadoop,另一个站点中的另一个数据库,搜索系统,等等(见图2-2)。
日志概念为每次更改提供了一个逻辑时钟,由此可以测量所有订户。这使得关于不同订户系统相对于彼此的状态的推论要简单得多,因为每个订户系统都有一个已读取的时间点。
为了更加具体,请考虑一个简单的情况,即有一个数据库和一组缓存服务器。日志提供了一种同步所有这些系统更新的方法,并根据它们在日志中的运行时间来推断其当前状态。假设我们用日志条目X编写了一条记录,然后需要从缓存中进行读取。如果我们要保证不会看到过时的数据,则只需确保我们不会从尚未复制到X的任何缓存中读取数据。
日志还充当缓冲区,使数据生产与数据消耗异步。由于许多原因,这一点很重要,但特别是当有多个用户以不同的速率消费时。这意味着订阅系统可能会崩溃或停机进行维护,并在恢复后赶上来:订户以其控制的速度消费。诸如Hadoop或数据仓库之类的批处理系统可能仅每小时或每天消耗一次,而实时查询系统则可能需要实时更新。原始数据源和日志都不了解各种数据目标系统,因此可以在不更改管道的情况下添加和删除使用者系统。
尤其重要:目标系统仅知道日志,而不知道知道起源系统的任何细节。消费者系统不必关心自己数据是否来自关系数据库,新的键值存储或是由某些应用程序直接生成的。这似乎是一个小问题,但实际上,致命!
我在这里使用“日志”一词,而不是“消息系统”或“ pub sub”,因为它在语义上更加具体,并且在实际实现中为支持数据复制所需的内容更加详尽地描述。我发现“发布订阅”不仅仅意味着间接寻址消息,如果你比较两个都承诺发布-订阅的消息传递系统,你会发现它们保证了截然不同的事物,并且大多数模型在此域。你可以将日志视为一种具有持久性保证和强大排序语义的消息传递系统。在分布式系统中,这种通信模型有时会使用原子广播的名称(有点可怕)。
值得强调的是,日志仍然只是基础架构。这不是精通数据流的故事的结局:故事的其余部分围绕元数据,模式,兼容性以及处理数据结构和演化的细节。除非有可靠的通用方法来处理数据流机制,否则语义细节是次要的。
我在领英的经验
在LinkedIn期间,随着我们从集中式关系数据库转移到分布式系统的集合中,我不得不看到这种数据集成问题迅速出现。我将简要介绍这些想法的经历。
这段时间里,LinkedIn运行的主要数据系统包括:
- Search
- Social graph 社交图
- Voldemort (键值对存储)
- Espresso (文档存储)
- Recommendation engine and ad serving systems 推荐系统引擎和广告投放系统
- OLAP query engine OLAP查询引擎
- Hadoop
- Terradata
- Ingraphs (监视图表和指标服务)
- Newsfeed (在首页上提供更新的系统)
这些都是专业的分布式系统,在其专业领域提供高级功能。
在我到达那之前,使用日志进行数据流的想法就一直在LinkedIn上流传。我们开发的最早的基础架构之一是一项称为数据总线的服务。Databus在我们早期的Oracle表之上提供了日志缓存抽象,以扩展对数据库更改的预订,因此我们可以提供我们的社交图和搜索索引。
在我们交付键值商店之后,我自己对此的参与始于2008年左右。我的下一个项目是尝试使Hadoop正常运行,并将我们的一些推荐流程移到那里。由于在这方面经验很少,我们自然会预算几个星期来获取和获取数据,而其余时间则用于实现花哨的预测算法。于是开始了漫长的log。
我们最初计划只是从现有的Oracle数据仓库中刮取数据。第一个发现是,迅速地从Oracle中获取数据是一件黑手艺。更糟糕的是,数据仓库处理不适合生产批次我们为Hadoop计划的处理-大多数处理是不可逆的,并且特定于所完成的报告。我们最终避免了数据仓库,而直接访问源数据库和日志文件。最后,我们实现了另一个管道以将数据加载到键值存储中以提供结果。
这些管道中的每一个最终都是一个重要的工程项目。它们必须进行扩展才能在多台计算机上运行。必须对其进行监视,测试和维护。这种平凡的数据复制最终成为我们正在开发中的主要项目之一。更糟糕的是,任何时候只要其中一个管道出现问题,就像当年那样,Hadoop系统在很大程度上都是无用的—对不良数据运行奇特的算法只会产生更多的不良数据。
尽管我们以相当通用的方式构建事物,但是每个新数据源都需要自定义配置才能进行设置。它也被证明是大量错误和失败的根源。我们已经在Hadoop上实现的站点功能变得很流行,并且我们发现了一大堆感兴趣的工程师。每个用户都有一个他们想要与之集成的系统列表以及一堆他们想要的新数据源。
我逐渐明白了几件事。
首先,尽管有些混乱,我们建立的管道实际上还是非常有价值的。仅仅在新处理系统(Hadoop)中使数据可用的过程就释放了许多可能性。可以对以前很难做到的数据进行新的计算。许多新产品和分析来自简单地将以前锁定在专用系统中的多个数据组合在一起。
其次,很明显,可靠的数据加载将需要数据管道的深入支持。如果我们捕获了所需的所有结构,则可以使Hadoop数据加载完全自动化,从而在添加新数据源或处理架构变更。数据将神奇地出现在HDFS中,并且将自动为带有相应列的新数据源生成Hive表。
第三,我们的数据覆盖率仍然很低。也就是说,如果你查看LinkedIn在Hadoop中可用的数据的总体百分比,那么它仍然是非常不完整的。鉴于操作每个新数据源需要付出大量的精力,因此要完成工作并非易事。
我们一直在进行的方法-为每个数据源和目标建立自定义数据负载-显然是不可行的。我们有数十个数据系统和数据存储库。连接所有这些将导致在每对系统之间建立自定义管道,如图2-5所示。
请注意,数据经常双向流动,因为许多系统(数据库和Hadoop)都是数据传输的源和目的地。这意味着我们最终将为每个系统构建两个管道:一个用于获取数据,另一个用于获取数据。
显然,这将需要大量人员来建设,而且永远无法运作。当我们接近完全连接时,最终会遇到O(N2)管道。
相反,我们需要一些通用的东西,如图2-6所示。
我们需要尽可能地将每个使用者与数据源隔离开。理想情况下,消费者应该只与单个数据存储库集成,从而可以访问所有内容。
这样做的想法是,添加新的数据系统(无论是数据源还是数据目标)都应该创建集成工作,仅将其连接到单个管道而不是每个数据使用者。
这种经验使我专注于构建Kafka,以将我们在消息传递系统中看到的内容与在数据库和分布式系统内部流行的日志概念相结合。我们希望某些东西首先充当所有活动数据的中央管道,并最终用于许多其他用途,包括Hadoop之外的数据部署,监视数据等等。
长期以来,Kafka作为基础架构产品有点独特(有些人会说奇怪),数据库,日志文件收集系统或传统消息传递系统。但是最近,亚马逊提供的服务帽非常类似于Kafka所说的运动学。相似之处在于处理分区和保留数据的方式,以及高低级使用者之间Kafka API中相当奇怪的划分。我对此感到非常高兴。你创建了良好的基础架构抽象的标志是,亚马逊将其作为服务来提供!他们对此的愿景似乎与我所描述的相似:连接所有分布式系统(DynamoDB,RedShift,S3)的管道以及使用EC2进行分布式流处理的基础。Google遵循了数据流和处理框架,Microsoft已开始向其Azure Service Bus产品迈进。
ETL和数据仓库的关系
结构清晰,集成的数据可支持分析。这是一个好主意。对于那些没有众所周知,数据存储方法涉及从源数据库中提取数据,将其汇总为某种可理解的形式,然后将其加载到中央数据仓库中。拥有一个包含所有数据的干净副本的中央位置,对于数据密集型分析和处理来说是一笔非常宝贵的资产。总体而言,无论你使用传统的数据仓库,这种方法都不会改变太多例如Oracle,Teradata或Hadoop,尽管你可能会更改加载和处理的顺序。
包含干净的集成数据的数据仓库是一项了不起的资产,但是获取这些数据的机制有些过时了。
以数据为中心的组织的关键问题是将干净的集成数据耦合到数据仓库。数据仓库是一个很好的批处理查询基础架构适用于多种报告和临时分析,尤其是当查询涉及简单的计数,汇总和过滤时。但是拥有批处理系统是唯一的干净,完整数据的存储库,意味着该数据对于需要实时提要的系统不可用:实时处理,搜索索引,监视系统等。
ETL实际上是两件事。首先,这是一个提取和数据清理过程,从本质上释放了组织中各种系统中锁定的数据,并消除了任何特定于系统的废话。其次,为数据仓库查询重组数据(即使其适合关系数据库的类型系统,被强制为星型或雪花模式,可能分解为高性能列格式,依此类推)。混淆这两个角色是一个问题。干净,集成的数据存储库还应该实时可用,以进行低延迟处理以及在其他实时存储系统中建立索引。
ETL和组织可伸缩性
我认为这具有使数据仓库ETL在组织上更具可伸缩性的额外好处。数据仓库团队的典型问题是他们负责收集和清理组织中其他每个团队生成的所有数据。激励措施不统一;数据生产者通常对数据仓库中数据的使用不是很了解,最终会创建难以提取的数据,或者需要进行繁重,难以规模转换才能将其转换为可用形式的数据。当然,中央团队从来没有设法按照组织的其他部门的规模进行扩展,因此数据覆盖范围始终参差不齐,数据流脆弱,变化缓慢。
更好的方法是使用中央管道(日志)和定义明确的API来添加数据。与该管道集成并提供干净,结构良好的数据馈送的责任在于该数据馈送的生产者。这意味着作为在他们的系统设计和实现中,他们必须考虑将数据取出并以结构良好的形式传递到中央管道的问题。新存储系统的添加对数据仓库团队没有影响,因为他们拥有集成的中心点。数据仓库团队仅处理以下较简单的问题:从中央日志加载结构化的数据提要,并执行特定于其系统的转换(请参见图2-7)。
当组织考虑采用传统数据仓库之外的其他数据系统时,关于组织可伸缩性的这一点变得尤为重要。例如,假设你希望在组织的整个数据集上提供搜索功能。或者,说你想通过实时趋势图和警报来提供亚秒级的数据流监视。无论哪种情况,传统数据仓库甚至Hadoop集群的基础架构都是不合适的。更糟糕的是,为支持数据库负载而构建的ETL处理管道可能不需要为这些其他系统提供数据,这使得引导这些基础架构的工作像采用数据仓库一样大。这可能不可行,并且可能有助于解释为什么大多数组织都无法轻松地将这些功能用于所有数据。相比之下,如果组织已经建立了统一的,结构良好的数据源,那么让任何新系统完全访问所有数据都只需要一点点集成管道即可。
我们应该在哪里进行数据转换?
此体系结构还针对特定的清理或转换可以驻留的位置提出了一组不同的选项:
- 可以在将数据添加到公司范围的日志之前由数据生产者完成。
- 可以将其作为日志上的实时转换(依次生成新的,转换后的日志)。
- 可以将其作为加载到目标数据系统中的一部分。
最好的模型是让数据发布者在将数据发布到日志之前进行清理。这意味着确保数据采用规范形式,并且不会保留产生该数据的特定代码或可能保存了该数据的存储系统的任何保留。这些细节最好由创建数据的团队处理,因为该团队最了解自己的数据。在此阶段应用的任何逻辑都应该是无损且可逆的。
可以实时进行的任何类型的增值转换都应作为对原始日志提要的后处理。这将包括事件数据的会话化或添加其他普遍感兴趣的字段之类的东西。原始日志仍然可用,但是此实时处理会生成包含增强数据的派生日志。
最后,在加载过程中,仅应执行特定于目标系统的聚合。这可能包括将数据转换为特定的星型或雪花模式,以便在数据仓库中进行分析和报告。因为这个阶段(最自然地映射到传统ETL流程)现在是在一组更干净,更统一的流上完成的,因此应该大大简化。
去耦系统
让我们来谈谈这种架构的附带好处:它可以实现去耦,事件驱动系统。
Web行业中活动数据的典型方法是将其注销到文本文件中,然后可以将其废弃到数据仓库或Hadoop中进行聚合和查询。此问题与所有批处理ETL的问题相同:它将数据流与数据仓库的功能和处理计划耦合在一起。
在LinkedIn,我们以日志为中心的方式构建了事件数据处理。我们将Kafka用作中央的多用户事件日志(见图2-8)。我们定义了数百种事件类型,每种类型都捕获有关特定类型的操作的独特属性。它涵盖了从页面浏览,广告展示,搜索到服务调用和应用程序异常的所有内容。
要了解其优点,请想象一个简单的事件,该事件在作业页面上显示一个作业过帐。作业页面应仅包含显示作业所需的逻辑。但是,在一个动态的站点中,很容易将其与与显示工作无关的其他逻辑缠结在一起。例如,假设我们需要集成以下系统:
- 我们需要将此数据发送到Hadoop和数据仓库以进行脱机处理目的。
- 安全系统需要对视图进行计数,以确保查看者未尝试某种内容抓取。
- 我们需要汇总此视图以显示在职位发布者的分析页面中。
- 工作推荐系统需要记录视图以确保我们正确展示次数上限(针对该用户的任何工作建议)(我们不希望显示一遍又一遍)。
- 监控系统需要跟踪作业的显示率和应用率,以确保系统运行良好。
很快,显示工作的简单动作就变得非常复杂。当我们添加显示作业的其他位置(移动应用程序等)时,必须保留此逻辑,并且复杂性也会增加。更糟糕的是,我们现在需要与之交互的系统有些交织在一起–显示工作的人员需要了解许多其他系统和功能,并确保它们正确集成。这只是问题的玩具版本,任何实际应用程序都将更加复杂,而不是更少。
事件驱动样式提供了一种简化方法。现在,作业显示页面仅显示了一个作业,并记录了已显示该作业的事实以及该作业的相关属性,查看者以及有关该作业显示的任何其他有用事实。其他每个感兴趣的系统-推荐系统,安全系统,职位发布者分析系统和数据仓库-都只订阅提要并进行处理。如果添加了新的数据使用者,则显示代码不需要知道这些其他系统或进行更改。
扩展日志
当然,将发布者与订阅者分开并不是什么新鲜事。但是,如果你想保留一个提交日志,该日志可以充当多用户实时日志,记录消费者规模网站上发生的所有事情,则可伸缩性将是主要挑战。如果我们无法构建出足够快,便宜且可扩展的日志,并且在该领域中不切实际,那么将日志用作通用集成机制绝不会是一种幻想。
分布式系统的人们经常将分布式日志视为一种缓慢的,重量级的抽象(通常仅将其与Zookeeper可能适合的元数据使用类型相关联)。如果考虑周到的实现专注于记录大型数据流,则不必如此。例如,LinkedIn每天向生产Kafka集群写入数千亿条消息。
我们在Kafka中使用了一些技巧来支持这种规模:
- 分区日志
- 通过批量读写来优化吞吐量
- 避免不必要的数据复制
为了允许水平缩放,我们将日志分成多个分区,如图2-9所示。
每个分区都是完全有序的日志,但是分区之间没有全局排序(可能你的消息中可能包含一些挂钟时间)。编写器控制将消息分配给特定分区,大多数用户选择按某种键(例如用户ID)进行分区。分区允许日志追加在分片之间不协调的情况下发生,并允许系统的吞吐量随Kafka群集大小线性扩展,同时仍保持分片密钥内的顺序。
每个分区跨可配置数量的副本进行复制,每个副本都具有分区日志的相同副本。在任何时候,一个分区都将充当领导者;如果领导者失败,其中一个副本将接管领导者。
跨分区缺乏全局顺序是一个限制,但我们尚未发现它是主要问题。确实,与日志的交互通常来自成百上千个不同的进程,因此谈论其行为的总顺序是没有意义的。取而代之的是,我们提供的保证是每个分区都是按顺序保留的,而Kafka保证从单个发件人追加到特定分区的附件将按照其发送顺序进行交付。
类似于文件系统的日志很容易针对线性读取和写入模式进行优化。该日志可以将较小的读取和写入分组到一起,从而进行较大的高吞吐量操作。Kafka积极追求这一优化。在发送数据,写入磁盘,在服务器之间进行复制,将数据传输到使用者以及确认已提交的数据时,将从客户端到服务器进行批处理。
最后,Kafka使用一种简单的二进制格式,该格式在内存日志,磁盘日志和网络内数据传输之间维护。这使我们能够利用多种优化方法,包括零拷贝数据传输。
这些优化的累积效果是,即使维护的数据集大大超出内存,通常也可以以磁盘或网络支持的速率写入和读取数据。例如,单个线程可以每秒大约750k消息的速率写入100字节消息,每条消息均以3倍复制存储。读取速度更快,每秒约90万条消息。提供更多基准。
本文并非主要与Kafka有关,因此我将不进一步讨论。你可以阅读LinkedIn的方法的更详细概述以及Kafka的设计的全面概述。
章节列表: