Presto SQL On Everything

原文地址:https://research.fb.com/publications/presto-sql-on-everything/

presto是一个开源的分布式查询引擎,目前Facebook的大部分SQL分析工作由它支持。
Presto被设计成自适应的、灵活的和可扩展的。它支持具有不同特征的多重查询场景
这些包括:

  • 具有亚秒级延迟要求的面向用户的应用程序;
  • 统计或join tb级数据的多小时ETL作业

Presto Connector API 已为数十个数据源提供高性能的I/O接口,包括Hadoop数据仓库、rdbms、NoSQL系统和流处理系统,在这个文章里, 我将列出一些Presto在Facebook上支持的用例。然后描述它的架构和实现,并给出使它能够支持这些用例的特性和性能优化点。最后,展示了性能测试结果,以展示了我们的主要设计决策及其影响

I. I NTRODUCTION

快速、轻松地从大量数据中提取见解的能力对于以技术为基础的组织来说越来越重要, 因为收集和存储大量数据的成本越来越低, 同时查询这些数据的工具必须变得更快、更容易使用和更灵活。 使用流行的查询语言(如SQL)可以使组织中的更多人能够访问数据并进行分析。 然而,当组织被迫部署多个不兼容的类似sql的系统来解决不同类型的分析问题时,易用性就会受到影响;

Presto是一个开源的分布式SQL查询引擎,从2013年开始在Facebook上运行,现在已经被一些大公司使用,包括Uber, Netflix, Airbnb,Bloomberg,LinkedIn,一些组织例如: Qubole,Treasure Data, and Starburst Data便有一些基于Presto的商用产品;

Presto被设计成自适应的、灵活的和可扩展的系统, 它提供了一个ANSI SQL接口来查询数据,数据可以存储在Hadoop环境,开源和专有的rdbms中,流式系统例如Kafka ,NoSQL系统等等,通过Generic RPC链接器 使得将SQL接口添加到专用系统就像实现6个RPC端点一样简单。 Presto开放一个HTTP API,附带JDBC支持,并与其他行业标准的商业智能(BI)和查询工具兼容,内置的HiveConnectors天然的支持从HDFS和S3等分布式文件系统中读取数据,并且支持几个流行的开源文件格式如ORC,Parquet以及Avro。

截止2018年底, Presto负责支持Facebook的大部分SQL分析工作,包括BI查询,大的ETL job;此外, Presto支持多个面向终端用户的分析工具,提供高性能的Dashboards,为多个NOSQL系统提供SQL接口。并且支持FaceBook的A/B测试基础设施,总的来说, 在Facebook上,Presto每天要处理数百pb的数据和千万亿行数据;

Presto有几个显著的特点

  1. 它是一个自适应的多租户系统,能够同时运行数百个内存、I/O和cpu密集型查询,并扩展到数千个工作节点,同时有效地利用集群资源。

  2. 其可扩展的联邦设计允许管理员设置集群以处理来自许多不同数据源的数据,甚至在一个查询中进行。这减少了集成多个系统的复杂性。

  3. 它是灵活的,可以配置为支持具有不同特征的各种用例

  4. 它是为高性能而构建的,具有几个关键的特性和优化,包括代码生成。多个正在运行的查询共享一个长期存在的Java虚拟机,Worker节点上(JVM)进程,这减少了响应时间,但需要集成调度、资源管理和隔离

本文的主要是描述Presto引擎的设计,讨论实现上述特性所需的具体优化和权衡, 次要是一些关键设计决策和优化的性能结果,以及在开发和维护Presto时获得的经验教训。

II. USE CASES

在这一节中,我将描四个 不同的使用场景,并描述他们的需求;

A : Interactive Analytics(交互分析)

Facebook将一个庞大的多租户数据仓库作为内部服务来运营, 几个业务功能和组织单元共享一组更小的托管集群, 数据存储在分布式文件系统中,元数据存储在单独的服务中, 这些系统分别具有与HDFS和Hive metastore服务类似的api。 我们将其称为“Facebook数据仓库”,并使用Presto“Hive”Connectors的一个变体来读写它。

Facebook的工程师和数据科学家通常会检查少量数据(∼50GB-3TB压缩),测试假设,并构建可视化或指示板; 用户通常依赖于查询创作工具、BI工具或 Jupyter notebooks。 单个集群需要并发支持50-100个具有不同查询类型的查询,并在几秒或几分钟内返回结果。 用户对端到端挂间非常敏感;同时用户可能对查询资源需求没有很好的了解(用户只想获取自己需要的数据对SQL执行需要加载多少数据以及消耗多少资源并不了解); 在执行探索性分析时,用户可能不需要返回整个结果集, 在返回初始结果之后,查询通常会被取消,或者使用LIMIT子句来限制系统应该产生的结果数据的数量。

B Batch ETL

我们在上面描述的数据仓库使用ETL查询定期填充新数据, 查询由工作流管理系统调度,该系统确定任务之间的依赖关系并相应地调度它们。 Presto支持用户从遗留的批处理系统迁移,ETL查询现在占了Facebook上Presto工作负载的很大一部分。 这些查询通常由数据工程师编写和优化。相较于交互式分析用例,这些查询往往是资源密集型的,经常涉及到执行cpu密集型转换和内存密集型(多次)分布式内存的TBs)与其他大表的聚合或连接。 与资源效率和总体集群吞吐量相比,查询延迟的重要性要低一些。

C A/B Testing

Facebook使用A/B Test通过统计测试来评估产品变化的影响, Facebook的A/B Test基础架构是基于Presto构建的。 用户希望在数小时(而不是数天)内获得测试结果,并希望数据完整和准确。 对于用户来说,希望能够在交互延迟约5-30秒内对结果执行任意的切片操作,以便获得更深入的了解。通过预先聚合数据很难满足这一要求,因此必须动态地计算结果, 产生结果需要连接多个大型数据集,包括用户、设备、测试和事件属性。 由于查询是通过编程生成的,所以Query Shape被限制在一个小的集合中。

D Developer/Advertiser Analytics

为外部开发人员和广告客户提供的一些自定义报告工具是基于Presto构建的。此用例的一个示例部署是Facebook Analytics ,它为使用Facebook平台构建应用程序的开发人员提供高级分析工具; 这些部署通常公开一个web接口,该接口可以生成一组受限制的Query Shape, 数据量很大,但查询是高度选择性的,因为用户只能为自己的应用程序或广告访问数据, 大多数Query Shape包含连接、聚合或窗口函数; 数据摄入延迟以分钟为数量级, 有非常严格的查询延迟要求(∼50ms-5),因为工具应该是交互活动的。集群必须具有99.999%的可用性,并且在给定用户数量的情况下支持数百个并发查询;

III. A RCHITECTURE O VERVIEW

Presto集群由一个Coordinator节点和一个或多个Worker节点组成, Coordinator负责接收、解析、规划和优化查询以及查询编排;Worker节点负责执行查询;图一展示了presto 架构的简化视图;


Presto_m1.png

客户端向Coordinator发送一个包含SQL状态的HTTP请求。Coordinator 通过评估队列策略、解析和分析SQL文本、创建和优化分布式执行计划

Coordinator分发计划到Worker节点; 开始执行 tasks,然后开始列举splits:splits是外部存储系统中可寻址数据块的不透明句柄。splits 被分配给负责读取数据的Task;Worker 节点 运行这些任务,并通过从外部系统获取数据来处理这些Splits,或者处理其他Workder产生的中间结果; Worder使用协作多任务并行处理来自多个查询的任务; 尽可能多地使用Pipline执行,数据在任务之间流动; 对于某些Query Shape,Presto能够在处理所有数据之前返回结果, 中间数据和状态尽可能地存储在内存中。在节点之间变换数据时,缓冲被调到最小延迟。

Presto的设计是可扩展的;并提供了一个通用的插件接口; 插件可以提供自定义数据类型、函数、访问控制实现、事件消费者、队列策略和配置属性。更重要的是,插件还提供connectors, 它使Presto能够通过 Connector API与外部数据存储进行通信,主要有四个部分组成:

  1. the Metadata API,

  2. Data Location API,

  3. Data Source API,

  4. and Data Sink API

使用这些api可以的在物理分布式执行引擎中实现高性能Connectors; 开发人员已经向主Presto存储库提供了十几种connectors,我们知道有几种专用Connectors

IV. SYSTEM DESIGN

在本节中,我们将介绍Presto引擎的一些关键设计决策和特性。我们描述了Presto支持的SQL方言,然后介绍从客户端提交查询到分布式执行的整个查询生命周期。同事会介绍一些在Presto中用以支持多租户的资源管理机制。最后,简要讨论下容错处理;

A. SQL Dialect

Presto严格遵循ANSI SQL规范[2]。虽然该引擎没有实现该规范所描述的所有特性,但所提供的特性尽可能符合规范, 为了提高可用性,我们对该语言进行了一些精心选择的扩展, 例如,在ANSI SQL中很难操作复杂的数据类型,比如映射和数组。

为了简化对这些常见数据类型的操作,Presto语法支持匿名函数(lambda表达式)和内置的高阶函数(例如,transform、filter、reduce)。

B. Client Interfaces, Parsing, and Planning

1) Client Interfaces

Presto coordinator主要向客户端提供一个RESTful HTTP接口,并附带一个一流的命令行接口, Presto还提供了一个JDBC客户端,它支持与各种BI工具,包括Tableau和Microstrategy。

2)Parsing

Presto使用基于Antlr的解析器进行转换将SQL语句转换成语法树。分析器使用这棵树来确定类型、强制转换、解析函数和作用域,并提取逻辑组件,如子查询、聚合和窗口函数;

3) Logical Planning

逻辑规划器使用语法树来分析信息以生成计划节点树,每个节点表示一个物理或逻辑操作,而plan节点的子节点是其输入。Planner产生的节点是纯逻辑的,也就是说,它们不包含有关计划如何执行的任何信息。

考虑下面这个简单的查询:

SELECT
    orders.orderkey, SUM(tax)
FROM orders
LEFT JOIN lineitem
    ON orders.orderkey = lineitem.orderkey
WHERE discount = 0
GROUP BY orders.orderkey

此查询的逻辑 计划如图2所示。

Presto_2.png

C. Query Optimization

plan optimizer将逻辑计划转换为表示更加物理的执行策略。 这个过程通过贪婪地评估一组转换规则来完成,直到达到一个固定点,每个规则都有一个模式,该模式可以匹配查询计划的子树,并确定是否应该应用转换。 结果就是用一个一个逻辑上等价的子计划替换匹配的目标。Presto包含几个规则,例如众所周知的优化:如谓词和限制下推、列修剪和去关联等;

我们正在不断的增强优化器,以便使用Cascades框架[13]引入的技术:一种基于成本的评估来对搜索空间进行更全面的探索。然而,Presto已经支持两个基于成本的优化,它们考虑了表和列的统计信息——*** join strategy selectionjoin re-ordering***. 我们将只讨论优化器的几个功能;详细的论述超出了本文的范围。

1) Data Layouts
Optimizer可以利用Connectors Data Layout API提供的数据物理布局。Connectors报告位置和其他数据属性,如分区、排序和分组以及索引。 Connectors 可以为单个表返回多个Layouts,每个Layouts具有不同的属性,Optimizer可以为查询[15][19]以选择最有效的Layouts; 此功能可以用于管理和优化Developer/Advertiser Analytics集群,使能够简单的通过添加物理Layouts来优化新的query shapes。在后面几节中,我们将看到引擎利用这些属性的一些方法

2) Predicate Pushdown
Optimizer可以使用Connectors来决定何时将范围和等式谓词下推到Connectors,从而提高过滤效率。 例如,Developer/Advertiser Analytics利用了构建在Sharded MySQL之上的专用Connectors:Connectors将数据划分为存储在单个MySQL实例中的碎片,并可以将范围或谓词一直推到单个碎片,从而确保只从MySQL读取匹配的数据; 如果存在多个Layouts,引擎将选择在谓词列上建立索引的layouts。高效的基于索引的过滤对于Developer/Advertiser Analytics工具中使用的高选择性过滤非常重要。在Interactive Analytics和Batch ETL用例中,HiveConnectors 利用了分区修剪和文件格式特性(Section V-C)以提高性能

3) Inter-node Parallelism
优化过程的一部分包括确定计划中可以跨Workder并行执行的部分,这些部分被称为“stages”, 每个stages被分配成一个或多个Task,每个Task在不同的输入数据集上执行相同的计算; 引擎将用于在Stage间交换的数据插入缓冲的内存,以完成Stage间的数据交换(Shuffle); Shuffle增加了延迟,消耗了缓冲内存,并且有很高的CPU开销; 因此,Optimizer必须仔细考虑计划中引入的Shuffle的总数。图3展示了如何实现将Plan拆分成Stage,并且使用Shuffle连接它们

Presto_3.png

Data Layout Properties
Physical data layout可以用来优化Plan中的Shuffle数;这是非常有用的尤其是**A/B Testing **用例中,因为这里面有大量的Join,以便获取实验明细和汇总信息;该引擎可以利用一个条件:即两个参与JOIN的表是使用的同一个分区字段,那么可以使用一个co-located join策略去消除资源密集型的Shuffle;

如果Connection显示这样一个Data Layout,其中JOIN字段是有索引的,优化器就可以确定使用Index Nested Loop Join是一个恰当的策略; 当连接生产数据存储系统(key-value类型或者其他),这可以非常高效地操作存储在数据仓库中的规范化数据,这也是Interactive Analytics中常用的特性;

Node Properties
与Connection类似,在Plan Tree中的节点也可以表示输出的属性(例如数据的partitioning,sorting,buckerting,以及数据分组特征),这些节点有required prefered的参数,在介绍Shuuffle时会提到这些属性参数;在有些情况下这些参数可以减少所需的Shuffle数量;Presto会尽可能多的展示所需的属性参数,以尽可能的减少 Shuffle,这就意味着优化器可能选择在较少的列上进行分区,这有时候会导致严重的数据倾斜,在造成类似图三中单个数据处理 Stage的 崩溃;

4) Intra-node Parallelism

优化器使用类似的机制来标识 Plan stage中的部分,这些部分可以从跨单个节点上的线程并行化中获益; 节点内的并行处理比节点间的并行处理效率高得多, 因为几乎没有延迟开销和状态 可以在线程之间有效地共享。添加节点内并行会得到显著的速度提升,特别是对于哪些因并发性而限制下游Stages吞吐量的query shapes;

  • Interactive Analytics包括运行许多短期的一次性查询,用户通常不会花时间去优化这些查询, 因此,分区倾斜是常见的,或者是由于数据的固有属性,或者是由于常见的查询模式(例如,按用户国家分组,同时也过滤到一小组国家)也会导致数据倾斜。这通常表现为在少量节点上对大量数据进行哈希分区。

  • Batch ETL作业通常只需要很少或根本不需要过滤就可以转换大型数据集。在这些场景中,树的较高级别所涉及的节点数量越少,可能不足以快速处理由叶子阶段生成的数据量。任务调度将在IV-D2节中讨论。

在这两种情况下哎,在每个几点多线程并发计算在一定程度上缓解了这个问题。该引擎可以在多线程中运行单个Operators(或者Pipeline), 图4显示了优化器如何并行化连接的一个部分。


Presto_4.png

如图3所示,实现优化方案,显示Tasks、pipelines,operators。pipelines1和pipelines2跨多个线程并行化,以加速散列连接的构建端

D. Scheduling

Coordinator将Stage Plan以可执行Task的形式分发到Workers节点,这些Task可以认为是单个执行单元;之后Coordinator将Tasks从一个Stage连接到另一个Stage,组成一执行树,彼此通过Shuffle连接。数据流从一个Stage到另一个Stage,只要他是可用的;

一个Task可能有多个Pipeline。一个Pipeline包含一连串的Operators,每一个Operatior都执行一个定义好的数据计算。例如,一个Task要执行一个Hash-Join必须包含至少两个Pipeline:一个pipeline创建hash table(build pipeline),另一个从probe获取数据并且执行Join(probe pipeline)。当优化器确定某个pipeline可用通过增加本地并行度来进行优化时,它将拆分这个Pipeline且将拆分的部分独立并行执行;图4展示了Build pipeline是如何拆封成两个Pipeline的,其中一个用于扫描数据,另一个用于构建Hash table的Partition;Pipelines通过本地内存Shuffle完成Join;

当执行一个查询时,引擎将做出两组Scheduling策略);第一个决定各个Stage的执行顺序,另一个决定多少个Task将被调度,以及他们应该运行在哪个节点上;

1. Stage Scheduling
Presto执行两种关于Stage的调度策略:All-At-Once和Phased;All-at-once通过同时并行调度所有的Stage以最小化Wall Clock Time,数据一旦可用就会被处理;这种调度策略适合于延迟敏感型的场景,例如交互查询,Developer/Advertiser分析以及A/B Testing;Phased按照有向数据流标识并执行所有的组件,以避免死锁。例如,如果一个Hash-Join正在以Phased模式执行, 在构建哈希表之前,不会对左侧的流调度任务进行调度。 这极大地提高了批处理场景下的内存效率;

当调度器确定某个阶段应该根据策略进行调度时,它开始将该阶段的任务分配给工作节点。

2. Task Scheduling
Task Scheduler将检查Plan tree并且将Stage划分为Leaf Stage和Intermediate Stage;Leaf Stage从Connectors读取数据;Intermediate Stage仅仅处理来自其他Stage的中间结果;

Leaf Stage:对于Leaf Stage, Task Scheduler在将Tasks分配给Worker节点时会考虑网络和connector施加的约束(takes into account the constraints imposed by the network and connector),例如Shared-Nothing架构要求Worker节点与Storage 节点共存,Scheduler使用Connector Data Layout API决定Task在这些情况下的安排;A/B Testing场景要求高吞吐量低延迟的,这可以通过Raptor Connector满足,Raptor是一个存储引擎,他针对Presto进行了优化,使用Shared-nothing架构,可以在磁盘上存储ORC文件,元数据存储在MySQL中;

分析显示,在我们的生产集群中,大部分CPU时间都花在了对Connectors读取的数据进行解压缩、解码、过滤和应用转换上,这些任务都是高度并行的并且在尽可能多的节点上运行这些Stage将缩短Wall Time:因此, 如果没有约束,并且数据可以被划分为足够的Splits,那么在集群中的每个工作节点上都安排一个Leaf Stage; 对于在共享存储模式下运行的Facebook数据仓库部署(即所有数据都是远程的),集群中的每个节点通常都参与处理Leaf Stage, 这种执行策略可能是网络密集型的。

调度器还可以推断网络拓扑,使用插件提供的层次结构优化读取。 Facebook的Network-constrained部署可以使用这种机制向引擎表达对rack-local reads而不是rack-remote reads的偏好。

Intermediate Stage:Intermediate Stage 的Task有可能在任何Worker节点上执行。然而,引擎依旧需要决定每一Stage需要有多少个Task被调度;这取决于Connector的配置,Plan的参数,数据层的要求,以及其他开发配置,在一些情况下,引擎可以动态的改变运行期间的Task个数。IV-E3部分将详细讨论;

3. Split Scheduling
当一个Leaf Stage的Task开始在Worker节点上执行是,这个节点将可以接受一个多多个Splits(在第三部分讨论)。Splits 包含的信息因Connector而异,当从分布式文件系统读取数据时,一个Split可能包含文件路径和这个文件区域的Offset;对于Redis key-valuse存储,一个Split包含表信息,key和value的格式,要查询的主机列表等内容;

每一个LeafStage的Task必须被分配一个或多个Split才有资格运行;Intermediate Stage 的Task总有资格运行,并且只有当他们失败或者上游Task全部执行完成时他们才算完成;

Split Assignment:当Task分配到Worker节点时,Connector开始为这些Task分配Split。Presto要求Connector列举小批次的Split并且懒惰的将其分配给Task。这是Presto的一个重要特性,这样做有一些好处

  • 将查询响应时间和枚举大量Splits的时间拆分开,例如,可能需要几分钟从HiveConnector枚举partition并列出每个分区中的文件;

  • 那些不需要处理所有数据就可以产生结果的查询(例如通过滤简单查询数据)当满足Limit条件时可以快速的取消或者提前完成;在交互查询场景中,在所有Splits被枚举完成之前查询就已经完成了;

  • Worker维护一个分配该他处理的Split队列。Connector建档将新的Split分配给具有最短队列的Task, 保持这些队列较小可以使系统适应处理不同Splits的CPU成本差异和Worker之间的性能差异;

  • 允许查询执行而不是将所有元数据保存在内存中,这对于Hive Connector非常重要,因为他的查询可能方位数百万个Split并且轻松的消耗掉connector的所有可用内存;

这些特性对于那些运行在FaceBook内基于Hive的数据仓库的交互查询和Batch ETL场景非常有用;但是值得注意的是惰性的Split枚举将使得准确评估报表查询进展非常困难;

E Query Execution

1. Local Data Flow
一旦一个Split被分配给一个线程,他将由Driver loop执行。Presto Driver Loop比流行的Volcano(pull)模型更加复杂【1】,但是提供了更重要的功能,她更适合多任务协作,因为operators可以在释放线程之前快速得到已知状态,而不是无限期阻塞; 此外,通过在那些不需要额外输入的operators间移动数据( 例如,恢复资源密集型或爆发性转换的计算),驱动程序可以最大化每个quanta中的执行效率; 循环的每次迭代都在所有能够取得进展的operators之间移动数据;

Driver loop操作的数据单元称为page,它是对一系列行进行的列编码,Connector Data Source API返回Pages,当他传递一个Split时,并且Operators通常消费输入的pages,执行计算并生成输出Pages;

图5展示Page在内存中的结构;Driver Loop 不断地在操作符之间移动 Pages,直到调度单元完成(在第四节- f1中讨论),或者直到Operators无法取得进展;

2. Shuffles
Presto旨在最小化端到端的延迟并且最大化资源利用,我们的节点间数据流机制反应了这种设计; Presto使用基于HTTP的内存缓冲来交换临时结果。Task生成的数据存储在缓冲区中,供其他Worker使用。 Worker使用HTTP长轮询来请求来自其他Worker的中间结果,直到客户端使用在前一个响应中发送的令牌请求下一个 Segment之前服务会一致保留数据。这使得确认隐含在传输协议中,长轮询机制使得响应时间最小,尤其是在传输少量数据时, 这种机制提供更低的延迟,相较于其他将Shuffle数据持久化到磁盘的系统要低得多,并允许Prest支持延迟敏感的场景,例如交互分析;

引擎对并行性进行调优,以保持Input buffer和 Output buffer的利用率。满的output buffer,将导致splits的执行减缓,并且耗尽宝贵的内存,同时未充分利用的 input buffer将会增加不必要的处理开销;

引擎将持续监控output buffer 的利用率。当利用率一直很高时他通过减少可执行split的数量来降低有效的并发;在共享网络资源的场景下这有效的提升了公平性;这同时也是一个重要的效率优化策略,尤其是当与客户端(终端或者Worker)无法以数据生成的速率消费数据时; 如果没有这个功能,运行复杂多阶段查询的慢客户机可能会在很长一段时间内保存数十gb的缓冲内存;这个方案是常见的,甚至在交互分析中,当少量的结果数据(10-50M)通过BI或者查询编辑工具,基于一个慢的连接被下载,;

在接受方, 该引擎监视每个请求传输的数据,以计算目标HTTP请求并发度,从而在不超过Input buffer容量的情况下填充Input buffer, 这种反压力会导致上游任务在其缓冲区填满时变慢

3. Write
ETL 任务生产的数据通常要写入到其他表中;在远程存储环境中写性能的一个重要驱动是执行写操作的并发性(例如通过Connector Data Sink API 写数据的线程总数)。

考虑使用AmazonS3作为存储的HiveConnector。每一个写S3的并发都会创建一个文件,并且对少量数据的数百次写入可能会创造小文件;除非这些小文件可以稍后被合并掉,否则这些小文件可能会在读取时造成无法接受的高的开销(许多慢的元数据操作和延迟限制读取性能)。然而,使用太小的并发将会把总的写的吞吐降低到不可接受的程度。Presto再次采用了一种自适应的方案,通过在更多节点上添加Tasks动态的增加写并发,当引擎确定当前Stage产生的写的数据超过了Buffer利用率的阈值( 可配置的每个写入器的数据写入阈值),这对于写操作比较重的批处理ETL任务有着重要的性能优化;

F. Resource Management

Presto可以很好的支持多租户的一个很重要的原因就是他包含一个完善的细粒度的资源管理系统。一个集群可以并发执行几百个查询,并且最大化使用CPU,IO和内存资源;

1. CPU Scheduling
Presto主要针对集群整体的吞吐量进行优化,例如处理数据现成的总CPU使用。对于计算比较轻的查询Local scheduler 额外的优化了 low turnaround time,并且在具有类似CPU需求的查询之间公平的共享CPU资源。一个Task的资源使用是其每一个Split的线程CPU时间的总和。为了最小化Coordination负载, Presto跟踪Task级别的CPU资源使用,并且基于本地进行调度;

Presto在每一个Worker节点上并发调度许多Task,实现多租户的同事使用并发任务模型。任何给定Split只允许使用最大的quanta在一个线程上运行一秒,之后,他必须放弃这个线程并返回队列。当output Buffer 满时(下游Stage不能及时消费掉数据),Input buffer是空的(上游stage不能足够快的生产数据),或者系统内存不足,Local Scheduler简单的切换去处理其他的task,甚至在Split的quanta完成之前。这样就释放了一些线程,可以用于执行splits,帮助Presto最大化CPU使用并且最大的适应不同的查询。我们的所有查询场景都得利于这种粒度的资源效率;

当一个Split放弃一个线程时,引擎需要决定哪个Task(关联一个或多个Splits)将会执行。Presto建档的使用Tasks的总CPU时间将其划分为多级反馈队列(multi-level feedback queue)的五个级别[8],而不是提前预测(predict)完成一个 查询需要的资源。当Task积累的CPU时间越多他的级别越高。每一个级别都分配了可用的CPU时间。实时上,在任意工作负载下实现多任务协作的公平都是一个挑战;对于split, IO 以及CPU特性 相差很大(有时甚至同一个Task中的Split),复杂的功能(例如正则表达式) 与其他线程相比,会消耗过多的线程时间。一些Connector不提供异步API,因此Worker的线程可能工作几分钟。

在处理这些约束时,Sheduler必须自动适应; 该系统提供了低成本的成品率信号,因此Operator内长时间运行的计算可以被停止。如果一个Operator执行一个Quanta,Shceduler向Task收取实际的执行时间,并暂时的减少未来的执行次数。这种自适应的操作允许我们在交互分析和ETL处理场景下执行更多样的查询,Presto为查询消耗资源最低的提供更高的优先级。这种选择基于一种理解:用户更希望低成本的查询更快的完成,并不太关心高计算成本的任务的执行时间。同时运行更多的查询以较少聚合队列的时间,即使这是以牺牲更多的上下文切换为代价,因为短的查询会很快的退出系统;

2. Memory Management
在类似Presto这种多租户系统中内存是资源管理的主要挑战之一;在这节中将描述引擎控制整个集群内存分配的机制;
MemoryPools : 所有的分配给Presto的非零散的内存都必须被分类作为User Memery 和System Memeory内存 ,并且将内存保存在相应的内存池中。User Memory是一种用户在使用时只需要考虑系统和输入数据的内存(例如聚合内存的使用量和他关联的基数Cardinality).另一方面,SystemMomory是这样一种内存:很大程度是实现执行策略的副产品(Shuffle Buffer),并且与查询类型和输入数据量无关。

引擎会对User和Total(User+System)施加单独的限制,超过全局限制的查询(在Worker中的聚合)或者超过每个节点限制的查询将会被Kill掉,当一个节点内存不足时,通过停止对任务处理来阻塞查询保留的内存。Total内存的限制通常比User内存的限制高的多,并且在生产环境中只有少数的查询会超出Total 内存限制。

针对查询的每一个节点以及全局用户内存限制通常是不同的;这最大程度的允许使用倾斜。考虑一500个节点的集群,每个节点100G可用的查询内存,并且要求单个查询全局最大可用5TB。在这种情况下,10个查询可以并发分配相同数量的总内存。然而,如果你想允许2:1的倾斜(例如其中一个查询消耗2倍的内存),每一个节点的查询内存限制必须设置为20G,这就意味着只有5个查询肯定可以运行,前提是没有耗尽节点的可用内存。

重要的是我们可以执行多于5个的并发查询,在一个500节点的交互分析或者ETL批处理的集群上。考虑到集群中的这些查询在内存特性方面差异很大(倾斜,分配率,分配暂时的局部性),在任何同一个时间点,5个查询分配在同一个节点上且同时分配到他们的上限的情况基本不可能。因此,只要有在节点内存不足时保持集群健康的机制,那么过度使用集群内存通常是安全的,在Presto中有两种机制:Spilling以及 Reserved Pools

Spilling:当一个节点内存不足时,引擎在满足要求的(eligible Tasks)任务上启用内存回收程序,并且按照执行时间的先后顺序排序进行,当有足够的内存满足最后一个请求时停止回收操作。撤销程序将状态 Spilling到磁盘,Presto支持针对Hash Join和汇总的Spilling ,然而,我们没有将所有的FaceBook部署为Spill,集群大小通常足以支持几个TB的分布式内存,用户倾向于全内存执行的可预期的延迟,而本地磁盘将增加硬件成本(尤其是在FaceBook的共享存储架构的部署中)

Reserved Pool:如果一个节点内存不足,并且集群没有配置Spill,或者已经没有剩余的可撤销( revocable)的内存,Reserved Memory机制用于解锁集群,每一个节点上的查询内存池进一步划分为两个池:General 和 Reserved.。当Worker节点上的General Pool耗尽时,该节点上使用内存最大的查询将‘Promoted’到位于所有节点上的Reserved Pool,在这种情况下,分配给该查询的内存将计入Reserved Pool而不是GeneralPool,为了防止死锁(此时不同的Worker是分摊不同的查询)整个集群中,只有一个查询可以进入Reserved Pool。如果节点上的General已经耗尽,此时Reserved Pool被占用,在该节点上的其他Tasks的所有内存请求都会被禁止。查询会运行在Reserved Pool中直至完成,且只有此时集群才会将所有未完成的查询的内存解锁。这有点浪费,因为必须调整每个节点上Reserved Pool的大小去适应那些适合在本地内存限制下运行的查询。集群可以设置成杀掉查询已解锁大多数节点。

G. Fault Tolerance

Presto可以使用低级别的重试从暂时的错误中恢复。然而,在2018年,Presto没有任何有重大意义的关于Coordinator和Worker节点的崩溃的容错构建。Coordinator失败将导致整个集群的不可用,Worker的崩溃将导致运行在该节点上的所有的查询失败;

在Facebook的生产环境中,我们使用额外的编排机制运行集群在基于不同的查询用的可用模型。在交互分析和批处理ETL的查询运行在单节点的Coordinator,然而,A/B testing和 Developer/Advertiser Analytics运行在多活集群上。外部监控系统识别异常失败的节点,并将其从集群中移除,同时被修复的节点自动加入到集群中。 这些机制一定程度上减少了集群不可用性,但是无法完全避免;

标准检查点或部分恢复技术在计算上非常昂贵, 而且很难在一个系统中实现,这个系统的设计目的是在客户端可以得到结果的时候就将结果流回客户端。基于复制的容错机制会消耗大量的资源,考虑到成本,这种技术的预期价值上不清楚,尤其是考虑到节点的平均故障时间时,1000个节点的集群的测试数据显示大多数查询在几小时内完成,包括批出里的ETL。其他研究人员也得出类似的结论

然而我们正在致力于提升长运行时间查询的容错,我们正在评估是否可以添加可选的检查点和限制不能以流水线方式运行的Plan的子树的重启

V. Q UERY P ROCESSING O PTIMIZATIONS

这一节中我们将描述几个重要的查询处理优化,这些优化可以是大多数的场景受益;

A. Working with the JVM

Presto是基于Java实现的并且运行在JVM中, 想要从实现中获得最佳性能需要充分利用底层平台的优势和限制。例如压缩和检验算法等性能敏感的代码可以通过特定的优化例如对CPU指令进行优化。然而并没有应用级的机制去控制JIT(JAVM JUST-In_Time)如何生成代码,但是构造代码却是可行的,这样它就可以利用JIT编译器提供的优化,例如内联方法(method inLining),循环展开( loop unrolling),以及函数(intrinsics)。我们正在探索Graal[22]在JVM无法生成最优机器码的情况下的使用,比如128位的数学运算。

垃圾收集(GC)算法的选择会对应用程序性能产生显著影响,甚至会影响应用程序实现的选择,Presto使用G1收集器他对于那些大于一定尺寸的Object的处理能力很差, 为了限制这些对象的数量,Presto避免分配大于“humongous”阈值的对象或缓冲区,并在必要时使用分段数组。 由于在G1中维护已记住的集合结构,大型且高度链接的对象图也可能会出现问题【10】。 查询执行的关键路径中的数据结构是在平面内存数组上进行的,以减少引用和对象数量,并使GC的工作更容易。例如HISTOGRAM 聚合存储桶键,并在一组平面数组和哈希表中count所有组,而不是使用单独的Object维护每一个histogram(直方图)。

B. Code Generation

该引擎的主要性能特性之一是代码生成,它以JVM字节码为目标。这有两种形式:

  1. Expression Evaluation:查询的性能部分取决于他计算复杂表达式的速度。Presto包含一个表达式解析器,他可以计算我们用户测试的任意复杂表达式, 但是对于评估数十亿行数据的生产应用来说,这个速度太慢了。 为了加快速度,Presto生成字节码来本地处理常量、函数调用、对变量的引用以及延迟或短路操作

  2. Targeting JIT Optimizer Heuristics: Presto为几个键operators和operators合生成字节码, 生成器利用引擎在计算语义方面的优势来生成更适合于JIT优化的字节码(相比于通常的循环处理)。 生成器主要针对三种行为:

    由于引擎需要在来自不同Task piplines的不同Split的每个Quanta间切换,JIT将无法对一个公共的基于循环的实现进行优化。因为为紧密处理循环收集的分析信息将被其他任务或查询污染

    甚至在单个Task piplines的处理循环中, 引擎知道每个计算所涉及的类型,并可以在列上生成展开的循环。 消除在循环体重的目标类型方差将推到出一个结论, 调用位置是单态的。 允许它内联虚拟方法。

    因为为每个Task生成的字节码被编译到一个单独的Java类中, JIT优化器可以独立地进行优化分析,事实上, JIT优化器进一步调整 对于实际处理的数据的查询的定制程序, 这种分析在每个任务中独立地发生, 在每个任务处理数据的不同分区的环境中,这可以提高性能。 此外,随着数据的更改,性能概要文件可以在任务的生命周期内更改( 例如,时间序列数据或日志), 导致生成的代码被更新。

生成的字节码还受益于内联的二阶效应, JVM能够扩大优化的范围, 自动向量化较大的计算部分, 并且可以利用基于频率的基本块布局来最小化分支, CPU分支预测也变得更加有效【7】。 字节码生成提高了引擎将中间结果存储在寄存器或缓存中的能力,而不是存储在内存[16]中。

C. File Format Features

Scan operators调用Connector API,他包含了leaf Split信息以及接受以Pages 形式存在的列数据。一个Page包含一些列Blocks ,每一个Block都是一个列,表现为 Flat in-memory。使用 Flat Memory 数据结构对于性能是非常重要的。 特别是对于复杂数据类型。指针跟踪,开箱,以及虚方法调用会给紧凑的循环增加大量的开销。

Hive和Raptor的Connector 尽可能利用特定的文件格式特性【20】。

Presto附带了定制的文件格式阅读器,可以通过使用文件头/页脚中的统计信息有效地跳过数据部分( 例如,最小-最大范围头和Bloom Filter)。 阅读器可以将某些形式的压缩数据直接转换成Blocks这些Block是引擎可以支持操作的( (Section V-E))。

图5展示了每个列都压缩的Page层。字典编码Blocks对于压缩低基数的数据和RLE Block(run-length Encoded)是非常有效的。几个Page可能共享一个字典(Dictionary),这大大提高了内存效率, 在ORC文件中整个“Stripe”(多达百万行)可使用一个字典。


Presto_5.png

D. Lazy Data Loading

Presto支持延迟物化数据, 该功能可以利用ORC、Parquet和RCFile等文件格式的柱状压缩特性。

Connector可以生成懒加载的blocks(lazy blocks) 只有在实际访问时才读取、解压和解码数据, 假设大部分CPU时间都花在了解压和解码上,并且过滤器通常具有很高的选择性,当列很少访问时这种优化非常高效,

一个来生产环境的自批处理ETL用例的测试 延迟加载减少了78%的数据获取,单元加载22%,总CPU时间14%。

E. Operating on Compressed Data

Presto对来自Connector的压缩数据进行操作(例如字段和run-length -encoded blocks)。图5展示了这些Block在Page中的结构,当一个Page处理器评估转化或者过滤一个字段Block时,他处理字典中的所有值(或者rle block中单独的一个值), 这允许引擎在一个快速无条件循环中处理整个字典,在一些情况下,字典中的值比Block中的行还要多,在这种情况下Page处理器进行推测,未引用的值将在子Block中使用,Page 处理器, 记录实际产生的行数和字典的大小, 与处理所有索引相比,这有助于度量处理字典的效率, 如果行数大于字典的大小,则可能更有效地处理字典, 相反,处理字典可能更有效。当页面处理器遇到按块顺序排列的新字典时,它使用这种启发式方法来确定是否继续推测。

Presto在构建哈希表时还利用了字典块结构(例如Join和统计), 随着索引的处理, operator记录数组中每个字典条目的哈希表位置, 如果对后续索引重复该条目, 它只是重用位置,而不是重新计算它。当连续的block共享一个字典, page processor保留数组以进一步减少必要的计算。

Presto还会在执行期间生成中间压缩结果, 例如,当连接处理器更高效时,它会生成以dictionary或run-length编码的块。 对于散列连接,当连接的探测端在散列表中查找键时 它将值索引记录到数组中,而不是复制实际数据, 操作符只是生成一个dictionary块,其中索引列表是该数组,而dictionary是对哈希表中的块的引用。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,874评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,102评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,676评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,911评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,937评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,935评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,860评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,660评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,113评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,363评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,506评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,238评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,861评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,486评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,674评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,513评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,426评论 2 352