基础
事务和ACID
何为事务?
事务就是一组单元化操作,这些操作要么都执行,要么都不执行,是一个不可分割的工作单位。
事务(transaction)所应该具有的四个要素:原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)、持久性(Durability)。这四个基本要素通常称为ACID特性。
- 原子性(Atomicity)
一个事务是一个不可再分割的工作单位,事务中的所有操作要么都发生,要么都不发生。 - 一致性(Consistency)
事务开始之前和事务结束以后,数据库的完整性约束没有被破坏。这是说数据库事务不能破坏关系数据的完整性以及业务逻辑上的一致性。 - 隔离性(Isolation)
多个事务并发访问,事务之间是隔离的,一个事务不影响其它事务运行效果。这指的是在并发环境中,当不同的事务同时操作相同的数据时,每个事务都有各自完整的数据空间。事务查看数据更新时,数据所处的状态要么是另一事务修改它之前的状态,要么是另一事务修改后的状态,事务不会查看到中间状态的数据。
事务之间的相应影响,分别为:脏读、不可重复读、幻读、丢失更新。 - 持久性(Durability)
意味着在事务完成以后,该事务锁对数据库所作的更改便持久的保存在数据库之中,并不会被回滚。
ACID的实现原理
事务可以保证ACID原则的操作,那么事务是如何保证这些原则的?解决ACID问题的两大技术点是:
- 预写日志(Write-ahead logging)保证原子性和持久性
- 锁(locking)保证隔离性
这里并没有提到一致性,是因为一致性是应用相关的话题,它的定义一个由业务系统来定义,什么样的状态才是一致?而实现一致性的代码通常在业务逻辑的代码中得以体现。
注:锁是指在并发环境中通过读写锁来保证操作的互斥性。根据隔离程度不同,锁的运用也不同。
Hive的事务
Hive事务应用场景
Hive 0.13以前,原子性,一致性,和持久性都只支持到分区的级别。隔离性可以通过zookeeper或内存的锁机制来实现。在0.13以后,Hive支持行级别的ACID语义,这样一个应用在同一分区下可以添加数据行不会干扰其他应用的数据读取。
Hive的ACID语义可以完成以下使用场景
- 流数据的接入。许多用户都使用 Apache Flume, Apache Storm, or Apache Kafka 将流式数据导入Hadoop集群。 这些工具都是每秒百万行级的数据写入,而Hive只能每十五分钟到一个小时添加一次分区。快速的增加分区会对表中的分区数量形成压力。当然可以事先创建好分区再将数据导入,但这样会引起脏读,而且目录下生成的小文件会对namenode造成很大的压力。而新特性可以很好的解决上述问题
- 减少维度的变化。 减少多维表的变化
- 数据的更新。INSERT, UPDATE
- 通过SQL MERGE 批量的更新。
限制条件
- BEGIN, COMMIT, ROLLBACK 暂时不支持,所有操作自动提交
- 目前只支持ORC 的文件格式。参考:ORC File Format存储格式 和 ORC file format
- 默认事务是关闭的,需要设置开启
- 要是使用这些特性,表必须是分桶的
- 必须这是事务管理器 org.apache.hadoop.hive.ql.lockmgr.DbTxnManager ,否则事务表无法工作
- 目前支持快照级别的隔离。就是当一次数据查询时,会提供一个数据一致性的快照
- 已有的zookeeper和内存的锁管理和Hive的事务不冲突
- LOAD DATA. 语句目前在事务表中暂时不支持
流式APIs
Hive提供数据数据接入和修改的api
- Hive HCatalog Streaming API
- HCatalog Streaming Mutation API (available in Hive 2.0.0 and later)
语法上的修改
- 加入了 INSERT...VALUES, UPDATE, and DELETE 参考: LanguageManual DML
- SHOW TRANSACTIONS ,显示事务信息。Show Transactions
- SHOW COMPACTIONS,显示数据合并信息。 Show Compactions
- SHOW LOCKS,显示锁信息。Show Locks
- ALTER TABLE,可以通过该命令来请求表和分区的的数据合并。一般情况下系统会自动发现并实施合并数据,当合并数据关闭时,可以通过该命令开启。Alter Table/Partition Compact
- ABORT TRANSACTIONS,丢弃事务。Abort Transactions
基础设计
HDFS是不支持文件的修改,并且当有数据追加到文件,HDFS不对读数据的用户提供一致性的。为了在HDFS上支持以上的特性,我们借鉴了其他数据仓库工具的方法。表和分区的数据都被存在base files。 新的记录和更新,删除都存在delta files。一次事务操作创建一系列的delta files。在读取的时候,将基础文件和修改,删除合并,最后返回给查询。
Base and Delta Directories
如表t的base files and delta directory
hive> dfs -ls -R /user/hive/warehouse/t;
drwxr-xr-x - ekoifman staff 0 2016-06-09 17:03 /user/hive/warehouse/t/base_0000022
drwxr-xr-x - ekoifman staff 0 2016-06-09 17:07 /user/hive/warehouse/t/delta_0000024_0000024_0000
-rw-r--r-- 1 ekoifman staff 610 2016-06-09 17:07 /user/hive/warehouse/t/delta_0000024_0000024_0000/bucket_00000Compactor
Compactor是运行Metastore 里以支持ACID的一系列的后台操作。它包括,Initiator, Worker, Cleaner, AcidHouseKeeperService等等。
可以通过SHOW COMPACTIONS的命令查看当前和最近合并的操作Delta File 合并
随着对表的数据的修改,会产生越来越多的delta files. 为了保持性能,需要对这些文件进行合并。分为Minor compaction和Major compaction
Minor compaction :将已有的delta files重写到一个单独的delta file,每个分桶一个。
Major compaction: 将delta文件和base 重写到一个新的base file,每个分桶一个。 这个合并操作的代价更大。
所有的合并操作都是后台进行,不会影响并行的数据读取和写入。合并完成之后,系统会等到所以的读操作完成再删除旧的文件。Initiator(初始化器)
这个模块负责发现表和分区中需要数据合并。这个可以在Metastore 中通过设置hive.compactor.initiator.on=true 进行开启。 在事务表的配置有一些属性可以控制合并任务的数目,一个合并任务负责一个partition。同时也可以配置任务失败时重试的次数。Worker
一个Worker处理一个合并任务,一个合并任务是一个MapReduce job。每个worker提交一个job给集群,并排队处理。hive.compactor.worker.threads 确定每个Metastore中Worker的个数。Cleaner
该进程删除合并之后不再需要的delta filesAcidHouseKeeperService
该进程检查在 hive.txn.timeout 内没有心跳的事务并丢弃。对于一个初始化过事务的client,如果心跳停止了,它所锁住的资源会被释放。事务/锁管理
与之前的库/表/分区的锁管理器(默认是:hive.lock.manager=org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager)不同,hive引入了事务管理器。 事务管理器是为了管理事务中的锁。在旧版本中默认的DummyTxnManager,不支持事务,通过使用hive.lock.manager属性的配置,为库/表/分区创建锁管理器。而新加入的DbTxnManager 通过DbLockManager 在hive的metastore 中管理所有的锁和事务。(锁和事务的信息是持久化存储的,即便server挂掉也不会有影响)这也意味着,一旦开启式了事务,之前在zookeeper中的锁行为也不在支持。为了防止客户端因为挂掉而导致事务和锁无法释放,在锁的持有者和事务的初始化器到server的metastore 间会维持一个周期性的心跳。一旦心跳超时,锁和事务就会被释放。
从hive 1.30起,DbLockManger 持续获取锁的周期可以通过 hive.lock.numretires 和 hive.lock.sleep.between.retries. 两个属性设置。如果DbLockManger 获取锁失败,过一段时间之后会进行重试。为了支持短查询同时不对metastore造成负担,DbLockManger 在每次重试后加倍等待时长。
注意:DbTxnManager 可以获取所有的表的锁,即便那些没有设置transactional=true属性的表。默认对一个非事务表的插入操会获取一个排他锁从而阻止其他的插入和读取。技术上正确的,但是这背离了hive之前的工作方式。为了向后兼容,可以设置hive.txn.strict.locking.mode 属性来使锁管理器在对非事务表的插入操作时,获取共享锁。保留之前的语义,还有一个好处就是能防止表在读取是被删除。而对于事务表,插入总是获取的共享锁。是因为这些表实现了MVCC的架构,在存储的底层实现了很强的读一致性(快照隔离的方式),甚至能应对并发的修改操作。
(共享锁【S锁】
又称读锁,若事务T对数据对象A加上S锁,则事务T可以读A但不能修改A,其他事务只能再对A加S锁,而不能加X锁,直到T释放A上的S锁。这保证了其他事务可以读A,但在T释放A上的S锁之前不能对A做任何修改。
排他锁【X锁】
又称写锁。若事务T对数据对象A加上X锁,事务T可以读A也可以修改A,其他事务不能再对A加任何锁,直到T释放A上的锁。这保证了其他事务在T释放A上的锁之前不能再读取和修改A。)
具体的锁操作细节,参考:Hive事务管理避坑指南
配置
为了开启hive的事务支持,以下是需要开启的最少的hive配置:
- 客户端
hive.support.concurrency – true
hive.enforce.bucketing – true (Hive 2.0 默认)
hive.exec.dynamic.partition.mode – nonstrict
hive.txn.manager – org.apache.hadoop.hive.ql.lockmgr.DbTxnManager - 服务端 (Metastore)
hive.compactor.initiator.on – true
hive.compactor.worker.threads – a positive number
表属性
如果一个表需要使用ACID 的数据操作,表属性一定要设置 "transactional=true" ,一旦被设置为事务表是不可以被撤销的。
如果不需要系统对表进行数据合并,可以设置表属性 "NO_AUTO_COMPACTION"。 通过可以通过Alter Table/Partition Compact 命令手动处理合并
CREATE TABLE table_name ( id int,name string)
CLUSTERED BY (id) INTO 2 BUCKETS STORED AS ORC
TBLPROPERTIES ("transactional"="true",
"compactor.mapreduce.map.memory.mb"="2048", -- MRjob的属性
"compactorthreshold.hive.compactor.delta.num.threshold"="4", -- 超过4个deta目录触发小合并
"compactorthreshold.hive.compactor.delta.pct.threshold"="0.5" -- 如果deta文件的大小超过base文件的50%触发大合并
);
ALTER TABLE table_name COMPACT 'minor'
WITH OVERWRITE TBLPROPERTIES ("compactor.mapreduce.map.memory.mb"="3072");
ALTER TABLE table_name COMPACT 'major'
WITH OVERWRITE TBLPROPERTIES ("tblprops.orc.compress.size"="8192");
参考文章:
Hive事务管理避坑指南
Hive Transactions
Streaming Data Ingest
HCatalog Streaming Mutation API
HCatalog UsingHCat