官网地址:https://trino.io/docs/current/
参阅书目《Trino: The definitive guide》
开源社区博客地址:https://blog.starburstdata.com/
Trino博客地址:https://trino.io/blog/
作者均是presto创始人三位,因此这两份材料+2019年的presto论文(见我另一篇博客),是最权威的presto技术文档。然而经过编者阅读,这些还不足够精细,更多的技术文档,在开源社区的博客中有描述,编者会将这些信息补充都相应位置,以使读者获得最新的技术理解。
trino就是presto的别名,历史原因不细说,为了叙述方便,以下统称presto。
本文写作时,是trino356版本。写作日期:2021.5.3
presto的基本术语和概念、安装教程、用途,本文不再赘述,相关文档/教程/博客在中文互联网上有很多,可移步查阅。本文的技术深度在user和contributer之间,不涉及源码阅读,只对软件设计的idea和trade-off做重点解读。之后如有机会,会对源码部分进行解读,更新博客。
第四章 Presto的架构
Presto是典型的主从式架构,总结构图如下。
4.3 节点发现服务
节点启动时会向coordinator discovery service报告/通信,之后定期向coordinator发送心跳以证明存活。
这个节点发现服务专门用于追踪存活节点,和worker节点列表维护的,这个服务被嵌入到presto内部,所以worker节点直接和coordinator的http服务通信即可。
4.4 基于连接器的架构
connector用于连接外部数据源。只要外部数据可以用Presto支持的数据类型表示成行、列、表,就可以创建connector与外部数据源相连接。
Presto提供了SPI(service provider interface),用于实现一个连接器,只要连接器实现了SPI,就可以将Presto引擎和外部数据连接起来。SPI需要实现三个部分:
- 获取表/视图/schema元数据的操作;
- 产生数据分区的逻辑单元的操作,基于这些逻辑单元,presto能够并行读写;
- 在源数据和Presto内存数据格式之间进行转换的操作。
Coordinator和worker需要SPI提供的不同接口,如下图:
Presto是以插件形式,动态加载这些Connector。插件的架构,在Presto中得到广泛使用,比如事件监听器、数据类型、函数类型、访问控制等。
4.7 查询执行模型
在物理计划生成之前,会用到多个SPI接口辅助指定计划:
- Metadata SPI在Parse过程中可以进行语义校验、类型检查、安全检查等;
- Data Statistics SPI在plan指定时提供一些表的统计信息,基于此制定基于代价的查询优化;
- Data Location SPI在制定物理计划时提供数据位置,方便scheduler切片。
源数据以Page形式产生数据,Page就是按照列式存储格式的多个行。
task实际上是在具体一个worker node上,给定具体split的一个stage。task创建时,会为每个split初始化一个Driver,每个driver是一个operator流水线的一个实例,负责处理split中的数据。一个task可能会创建多个driver。
oprertor,即算子,流水线上的基本组成单位,表示一个计算处理过程,常见的包括TableScan、Filter、Join、Aggregator等。
Coordinator首先根据MetaData SPI创建一个splits列表,基于该列表进行任务分配,在查询执行期间,coordinator跟踪所有的task和splits执行进度,一些worker完成了任务,产生了更多的splits待下游处理,那么coordinator继续进行分配任务。
4.8 查询优化
这里我们基于一个具体的SQL例子,对查询优化进行讲解。
如下图,这是一个以地区、国家为维度统计销量的SQL语句,基于国家表、地区表、订单表、用户表来完成,是TPC-H上的一个SQL,我们以它作为例子。
4.8.2 初始查询计划
我们根据字面含义,直接对上面这条SQL进行查询计划指定,得到如下的树状图。这样的树状图是可以执行出结果的,但是简单评估下,两次CrossJoin的复杂度就是级别的操作,这样的查询计划不能在人类有生之年执行完毕。而查询优化器的任务,就是从这样的初始查询计划转化成一个等价的查询计划,能更高效的完成任务。理论上来说,等价的查询计划有指数级别个,optimizer的任务,是在一定的时间内,选取尽量最优的查询计划。
4.9-4.10 优化规则/启发式方法
optimizer是根据一些优化规则对初始查询计划进行优化的。我们可以称之为启发式的方法。
4.9.1 谓词下推
谓词下推很容易理解,尽可能地把谓词下推到靠近叶子节点的位置,使得不会在结果中产生的Records不会过多地在执行过程中被处理。
以刚才的例子,我们可以让crossjoin算子和filter算子部分进行合并,变成inner join,进行谓词下推。
谓词下推之后,查询计划的复杂度降低到了。
4.9.2 Cross Join消除
Cross Join一般没有什么业务含义,只会是一些SQL新手写出来的SQL语句,而Cross Join一般代价极高,很难完成,所以要对Cross join进行特定消除。
Cross join的消除,主要是利用filter的筛选条件,对join的表顺序进行重新排序,力图将所有Cross join都变成非cross join。
以刚才的例子,通过表顺序重排,我们可以将两个谓词全部下推,消除Cross join。
cross join之后,查询计划的复杂度不再主要由join支配,join的复杂度为
4.9.3 TopN
对于order by + limit的组合在SQL使用中是非常常见的。naive的想法是先排序,再取前N个Record,排序很重因而复杂度较高。而利用堆进行TopN的分布式查询算法,已经比较成熟了,所以ORDER BY + LIMIT的操作,一般会被简化为TopN算子。
TopN简化之后,排序不再是限速步,排序的复杂度是。
By the way,在SQL中,ORDER BY 后面只有跟LIMIT或者FETCH FIRST才有用,其他情况下,Presto直接忽视ORDER BY。
4.9.4 局部聚合
一般来说,参与Join的表不会用到它所有的明细信息,尤其是在有GROUP BY查询进行聚合时,通常只会用到join表的有限几列的统计(聚合)信息,那么我们可以在join之前先对join表的相关列进行预聚合,然后再join,减少数据传递过程中的流量。
为了提高并行性,预聚合通常采用局部聚合的方式来实现。当局部聚合能显著减少数据传递量时,能够显著提升性能,但是如果不能有效减少数据量,有可能造成反面效应。(因此presto默认关闭了这条规则,可以手动开启)
4.10.1 Leteral Join去关联化
Leteral Join就是在SELECT位置的关联子查询,对于这种简化写法,Presto Parser会首先把它改写成标准的Left Join形式,如下图:
这种改写在语义上并不一致,尤其在空值和重复值上,所以为了保证语义一致,在查询计划上,要为原查询结果分配unique id进行区分,join结果要进行重复性检测,一旦有重复就要报错。
4.10.2 Semi-Join(IN) 去关联化
这里指的是关联子查询,就是在IN子查询内部,需要用到外部表的信息,即产生了关联。
Presto会去除这种关联化,首先让子查询去除与外部的关联条件(本例子中是
WHERE p.partkey=l.partkey
)之后,计算一次结果,然后利用该结果和外部关联表进行inner join,最后根据IN的key进行filter,最终需要手动去重得到唯一结果。
By the way, 对于不关联的Semi-Join,Presto采用哈希表来完成。
4.11 基于代价的优化器
cost-based优化器(CBO)的一个重要特征:不仅考虑query语句的“形状”,还要考虑输入数据的特征。
Presto的代价由三个部分组成:CPU时间+内存需求+网络带宽占用。
4.11.2 Join的代价
Presto可以基于一个扩展的Hash Join算法进行分布式join。主要思路是,将join的一对表分成build side和probe side。首先基于build side的join key构建哈希表,然后probe side去和hash table进行探测,匹配成功的emit。具体来讲,分成三阶段,构成三层哈希以支持并行:
- 首先根据join key的hash范围将build side 和 probe side两张表的数据发送到各个worker node上去,这是第一层哈希f(x);
- 在每个worker node内部,将build side按照join key再次哈希分散到各个线程上去,这是第二层哈希g(x);
- 在每个线程内部,对build side的join key构建哈希表,这是第三层哈希h(x)。第三层哈希表构建结束之后,交给worker node进行合并;合并完成之后,probe side将join key按照h(x)哈希并在本地哈希表中查找,返回结果,这个探测过程可以直接将probe side数据等分给各个线程查找即可。
可能会有疑问,第三层的哈希为什么要合并?不合并也同样可以分布式查询,即将probe side数据按照g(x)分散给各个线程,再按照h(x)探测各个线程的哈希表,这样就不必合并了。
这确实是一种选择,但是考虑到这种方案probe side首先要按照g(x)在本地分散数据,这是一个额外的负担,而presto的方案直接将probe side数据等分即可,考虑到probe side一般是个更大的表,所以presto的选择是更佳的。
另外,为了合并的方便,可以直接取g(x)=h(x),这样合并就是个O(1)的操作。
接下来对分布式hash join进行代价分析:build side的数据必须全部放置于内存以快速构建和emit,这个内存开销是很大的;join两侧的表都必须通过网络shuffle广播,这个代价也非常大。
为了控制内存开销,CBO会选择小表来当build side。这依赖于表的统计信息。
Join的优化:动态过滤
一般来说,小表的join key值域范围比大表的join key值域范围要小,对于这些超出值域范围的大表Record,1)成段/分区的数据都没有必要读到Presto中来;2)散装的数据没有必要通过网络shuffle。而build side在构建hash table时统计下值域范围,几乎是0代价的,只需要告诉下上游(一般还是本节点,所以是被一个driver控制的进程)分发大表数据的节点即可。
由于这种筛选条件,ORC和Parquet能更好地对动态过滤进行利用。目前仅广播式join才有这种优化,分布式join汇总数据范围会很麻烦(不是本节点控制大表读取)。
Join的优化:动态分区剪裁
在数据仓库(比如Hive)中,举一个例子,事实表(订单表)和维度表(时间表)相join的例子,下面这个SQL。
SELECT COUNT(*) FROM
store_sales JOIN date_dim ON store_sales.ss_sold_date_sk = date_dim.d_date_sk
WHERE d_following_holiday='Y' AND d_year = 2000;
维度表的谓词会下推到Hive ORC,进行分区剪裁,惰性读取等各种优化(后面讲Hive Connector会讲到),会提取出来一张小表,很高效,没问题。但是对于庞大的事实表呢,会全量读取,这太可怕了。
这时候大家会想到,刚才有一个优化动态过滤技术,正好用的上。确实,但是只能用在广播式join和ORC/Parquet上,还是有限制,而且即使使用了,也要逐个分区逐个文件扫描一下,才能确定剪裁,我们想从分区层面直接剪裁掉。
具体实现:对动态过滤进行扩展,首先collect小表的join key range,将其传递给coordiantor,然后让coordinator根据HMS的信息去裁剪,然后再进行splits分配。这样的话,完全没有对join方式和存储格式的限制。
4.11.3 表统计信息
SPI Statistics包括以下信息:
- 行的数量;
- 列中唯一值的数量(COUNT(DISTINCT ));
- 列中空值比例;
- 列中极值;
- 列的平均数据大小。
CBO可以利用这些统计信息进行代价度量。
举一个例子,对于刚才的join过程,CBO可以利用这些统计信息中的行数量、列平均数据大小,判断表的大小,以此来决定join的顺序。
比如,如下的join顺序就是纯基于内存考虑的,从最大的lineitem开始进行join,build side依次用更小的表:
然而,如果从总的代价上来考虑,我们开启join顺序重排参数,则会选择小表先join,然后是大表,以避免大表造成多次网络shuffle传输:
对于不同的表和不同的集群,就会有不同的查询方案,这就是CBO的价值。
4.11.4 过滤统计信息
刚才我们用的两条统计信息来进行join顺序重排是不够的,举个例子,如果最大的lineitem表有一个等值谓词限定条件,他会从最大表,立即变成最小表(谓词下推),那么join顺序就完全不一样了,如下图(编者注:图中的join条件应该是写错了,图来自书中原图)
我们可以利用lineitem partkey的空值比例,极值,唯一值个数,算出平均一个值有几行(假设均匀分布):
当然,当数据明显偏斜时,这个方案就会产生问题,如果数据源能提供histogram,那么CBO就可以更精准,避免skew的问题。
对于有分区表和分区统计信息的外部源,CBO也会直接利用分区统计。
4.11.6 join顺序枚举
- 分区枚举:在多个表进行join的场景,顺序至关重要,而n个表join的顺序有种。Presto默认会每9个表,优化一次顺序,否则代价太高了。
- 动态规划:join顺序枚举问题,存在最优子结构,Presto采用自顶向下(递归式)的动态规划来解决这个问题。
- 忽略Cross-join:几乎所有的optimizer都会跳过cross-join,包括Presto,所以cross-join的表不在这里参与排序。
-
两种search space:一般的优化器只考虑left deep模式的join方案,Presto发现Bushy tree的join方案在分布式场景下可能会更好,而且枚举代价也不大,所以Presto枚举这两种搜索空间。
了解了上述基本思路,我们来描述枚举流程:
-
把可以重新定序的nodes和它们之间的谓词限定关系,放到一个multi-join node上去;也就是在一个multi-join node中,所有node是可以自由乱序的,任两个表之间都可以进行join(不包括cross-join)。注意到不是所有的joins都能自由乱序,那他们就不在multi-join nodes里。再注意下,任意两个表之间的join谓词关系,实际上是通过谓词同等推断(equality inference)算法导出的,这个算法就不在这里详述了。
- 接下来,对于每个multi-join node,我们都采用带备忘录的分治策略,将node中的tables和谓词partition,同时保留每个子过程返回的结果,写进内存备忘录,之后会用到;注意到,这里的partition共有种方案(Cn1+Cn2+...+Cn (n/2)),然而由于我们只搜索两种树形,所以只会搜索杨辉三角一行中的末端(Cn1)和中间部分区域(Cn n/2),由于中间部分最大,所以方案数仍然是,但是由于过渡段的大部分剪枝,所以前面的常量因子会很小,仍然在接受的范围内;另一方面,由于动态规划的备忘录,会有大量的剪枝;
- 对于每一种partition方案,我们都要将两个partition合并join,合并join有两种物理执行方式:分布式和广播式(下一节讲到),再次做个选择,选好了之后,记录在备忘录里;
-
备忘录需要的部分填充好了之后,我们就可以选择最好的方案实施join,实施过程中,仍要不断查看备忘录。
4.11.7 广播式Join和分布式join策略
这里是说Hash Join的两种具体策略:
-
广播式:每个worker node都保存build side的一个完整副本和一个完整的哈希表,worker node直接从外部数据源取数据进行join,无需在网络中广播分区probe side。
- 分布式:和之前我们说的hash join流程一致,首先按照f(x)对两侧的表分区,分到不同的worker node,在各个worker node完全独立并行地执行join。
优劣比较:对于build side非常小或者probe side非常大的情况下,广播式一般会更好,因为避免了probe side在网络中全量传输一次;但是如果两侧表都很大,必须分区入驻内存的话,就要分布式join了。
可以看到,这种选择必须用CBO,还要把filter信息考虑进去。
Presto对外部数据源的ANALYZE,可以对统计信息进行收集,对于通过Presto ETL来写入的数据,可以在写入时直接ANALYZE,更方便。
第六章-第七章 连接器
6.2 RDBMS-PostgreSQL Connector
一般来说,任务下发给一个worker node,worker node与postgres server通过JDBC相连接,一个表就会有一个JDBC连接,对于涉及多个表的SQL,可能就会有多个worker noder和多条JDBC连接。这也是RDBMS connector提供的最多的并行性,无论底层的RDBMS是不是分布式的,并行读取数据是不存在的。
6.2.1 查询下推
Presto可以将一条SQL或SQL的部分下推到数据源,让数据源先做一部分查询的处理,减少传输给Presto的数据,减少开销。
举个例子,对于Presto连接Postgres,Presto收到这样一条SQL:
Presto的Worker node会让postgres执行下面这个SQL,告诉自己结果:
可以看到,下推了两个部分:投影列和筛选条件。
Trino新版本中,聚合、Join、LIMIT、TopN也可以下推。
具体某个Connector是否支持,要看具体说明了。
6.4 DFS-Hive Connector
- Hive Connector不仅用于Hive,而且可以用于各种分布式存储平台。非Hadoop分布式存储平台要想办法搞一个HMS的替代品,要不然就不能用Hive Connector。
- presto仅仅查看Hive MetaStore中的信息,而对Hive引擎却从不使用,当然也不会有任何查询下推,只是读取数据而已。
- Presto可以利用到Hive的分区特性和SQL中的分区列,这是Hive最重要的Feature。
- Presto为Hive中的ORC Parquet RCBinary RCText几种文件格式做了大量的优化,保证读取效率。
虽然Hive引擎不会参与到Presto的SQL计算中,但是Hive中特定的文件格式可以参与进来,因为很多流行的格式如ORC RCBinary Parquet等提供了很多统计信息可以利用。
我们以最常用的ORC为例,ORC提供向量化列式读取的接口,这正好presto的需求;ORC Reader还做了两个重要的优化,一个是谓词下推,一个是惰性读取。
- 谓词下推:ORC File中从文件到每10000行在三个级别上都会保存极值,谓词下推可以让ORC Reader跳过很多无关的数据段;
-
惰性读取:对于随机性强的谓词列,下推没有太大意义,为了保证效率,ORC Reader会现在谓词列进行读取然后计算谓词,计算完之后找到相关的Segments,去其他列对应位置补充信息。
这两条性质,让Presto for Hive Connector有了3~8倍的性能提升。
-
谓词过滤顺序:不同的谓词有不同的筛选性,先选择筛选性强的,后选择筛选性弱的,能获得更好的性能,ORC Reader还需要对谓词的筛选性进行评估。另外,是否对筛选通过的数据缓存,也是需要考量的。
- 复杂类型查询下推:通常对于一个复杂类型,我们只需要它的某些子字段,那么就可以把这种投影操作直接下推到TableScan算子的直接下游去;对于复杂类型的查询,也可以进行同样的谓词下推。
7.2 Nosql-Accumulo
Accumulo在架构上和Hbase几乎是一模一样。但是Hbase已经有Phoneix了,所以presto直接连Phoneix连接Hbase即可。所以这里挑了个替代品Accumulo来说明些问题。
- Presto将Nosql中的rowkey拆分以打平一条key-value record成多条关系型数据,以此对应关系型模型。
- 对于NoSQL,coordiator首先和数据库连接获取元数据,然后根据查询范围确定region,对于不同的region server,可以并行读取。
- Presto可以利用Nosql的二级索引进行查询下推;
7.6 联邦查询
Presto可以在一条SQL内同时对多个数据源的表进行查询,对用户来讲非常透明。
联邦查询也有查询下推,原则仍然是,数仓类型的不下推;Nosql只下推投影和过滤;RDBMS部分可以下推join和聚合。
7.7 ETL
Presto也支持ETL,不过不如只做ETL的一些商业化工具做得好。但是Presto也有自己的优势,第一就是通过联邦查询你有可能不需要ETL了,第二就是Presto的ETL全部都是SQL,在多个数据源上切换很方便。
第十二章 生产环境中的Presto
12.2 Presto SQL查询调优
这里主要讲了几个调优的抓手点:
- EXPLAIN Join的顺序
- Join预聚合
- GROUP BY key预聚合
- Join散列表大小
12.3 内存管理
Presto的内存管理都是基于worker node JVM来说的。
用户内存:客户端查询中的操作,如聚合、排序等占用的内存;
系统内存:查询引擎用的内存,比如input/output buffer, table scan buffer等。
Presto有个初始散列分区数的概念,如果设为8的话,假设query.max-memory(用户内存)=50GB,平均每个worker node用户内存占用为50GB/8=6.25GB,如果query.max-memory-per-node设为13GB的话,那么这个配置可允许超过平均值两倍的数据倾斜。
Presto的所有Worker node应该同等配置,否则无法妥善利用。
12.4 任务并发性
- task work threads:默认是CPU核数的2倍,也可以手动增加,不过要考虑上下文切换的代价是否能被cover;
- operator concurrecy:这里的并发实际上说的是并行度,算子(比如join,aggregation)在本地的一个并行度,并行度越高,上下文切换时代价也越大。
12.5 工作节点调度
- 每个工作节点处理split的数量上限、每个task的splits排队上限都可以适当提高以增加单节点负载;
- 如果有数据缓存(RubiX),或者存储和计算节点不是完全隔离的话,可以让Presto调度器的队列中保存更多空间给本地数据,减少网络传播,通常这个模式都会被打开。
12.6 网络数据交换
- 下游向上游拉数据的线程数,可能会增强吞吐量,但是注意内存使用;
- input/output buffer大小,默认32MB,确实太小了,可以放大一些,避免back pressure机制生效,降低查询效率。
性能评估(Benchmark)
对于CBO性能提升的评估:
实验和数据来自starburst blog:https://blog.starburstdata.com/technical-blog/presto-cost-based-optimizer-rocks-the-tpc-benchmarks
实验在8个HDFS数据节点上部署presto worker,1个coordiantor和NameNode、HMS共享节点。
每个节点24核、256GB内存,160GB堆内存;ORC格式,Zlib压缩。
80%以上的查询都在1TB数据集(TPC-DS)以上,包括10TB。