Hive使用者都有的一个共识,即Update真不是Hive的菜(笑)
至于原因呢,就是Hive默认底层使用hadoop存储,而HDFS不支持修改文件,这是无法绕过的一个天堑。
A Hive的行级Update
来源
参考
官方资料:
Update资料 <=> https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML
Join资料 <=> https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Joins
网上参考资料:
Update资料 <=> http://www.aboutyun.com/thread-12155-1-1.html
为Hive配置Update功能
1.编辑hive-site.xml文件:
<property>
<name>hive.optimize.sort.dynamic.partition</name>
<value>false</value>
</property>
<property>
<name>hive.support.concurrency</name>
<value>true</value>
</property>
<property>
<name>hive.enforce.bucketing</name>
<value>true</value>
</property>
<property>
<name>hive.exec.dynamic.partition.mode</name>
<value>nonstrict</value>
</property>
<property>
<name>hive.txn.manager</name>
<value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
<name>hive.compactor.initiator.on</name>
<value>true</value>
</property>
<property>
<name>hive.compactor.worker.threads</name>
<value>1</value>
</property>
<property>
<name>hive.in.test</name>
<value>true</value>
</property>
二.Update语法
1.创表语句
Hive对使用Update功能的表有特定的语法要求, 语法要求如下:
要执行Update的表中, 建表时必须带有buckets(分桶)属性
要执行Update的表中, 需要指定格式,其余格式目前赞不支持, 如:parquet格式, 目前只支持ORCFileformat和AcidOutputFormat
要执行Update的表中, 建表时必须指定参数('transactional' = true);
eg:
create table student (id bigint,name string) clustered by (name) into 2 buckets stored as orc TBLPROPERTIES('transactional'='true');
B 拉链表
一.什么是拉链表
拉链表是针对数据仓库设计中表存储数据的方式而定义的,顾名思义,所谓拉链,就是记录历史。记录一个事物从开始,一直到当前状态的所有变化的信息。
我们先看一个示例,这就是一张拉链表,存储的是用户的最基本信息以及每条记录的生命周期。我们可以使用这张表拿到最新的当天的最新数据以及之前的历史数据。
我们暂且不对这张表做细致的讲解,后文会专门来阐述怎么来设计、实现和使用它。
1.1 拉链表的使用场景
在数据仓库的数据模型设计过程中,经常会遇到下面这种表的设计:
有一些表的数据量很大,比如一张用户表,大约10亿条记录,50个字段,这种表,即使使用ORC压缩,单张表的存储也会超过100G,在HDFS使用双备份或者三备份的话就更大一些。
表中的部分字段会被update更新操作,如用户联系方式,产品的描述信息,订单的状态等等。
需要查看某一个时间点或者时间段的历史快照信息,比如,查看某一个订单在历史某一个时间点的状态。
表中的记录变化的比例和频率不是很大,比如,总共有10亿的用户,每天新增和发生变化的有200万左右,变化的比例占的很小。
那么对于这种表该如何设计呢?下面有几种方案可选:
方案一:每天只留最新的一份,比如我们每天用Sqoop抽取最新的一份全量数据到Hive中。
方案二:每天保留一份全量的切片数据。
方案三:使用拉链表。
1.2 为什么使用拉链表
现在我们对前面提到的三种进行逐个的分析。
方案一
这种方案就不用多说了,实现起来很简单,每天drop掉前一天的数据,重新抽一份最新的。
优点很明显,节省空间,一些普通的使用也很方便,不用在选择表的时候加一个时间分区什么的。
缺点同样明显,没有历史数据,先翻翻旧账只能通过其它方式,比如从流水表里面抽。
方案二
每天一份全量的切片是一种比较稳妥的方案,而且历史数据也在。
缺点就是存储空间占用量太大太大了,如果对这边表每天都保留一份全量,那么每次全量中会保存很多不变的信息,对存储是极大的浪费。
当然我们也可以做一些取舍,比如只保留近一个月的数据。但是,需求是无耻的,数据的生命周期不是我们能完全左右的。
方案三
拉链表在使用上基本兼顾了我们的需求。
首先它在空间上做了一个取舍,虽说不像方案一那样占用量那么小,但是它每日的增量可能只有方案二的千分之一甚至是万分之一。
其实它能满足方案二所能满足的需求,既能获取最新的数据,也能添加筛选条件也获取历史的数据。
所以我们还是很有必要来使用拉链表的。
二、拉链表的设计和实现
下面我们来举个例子详细看一下拉链表。
我们先看一下在Mysql关系型数据库里的user表中信息变化。
在2017-01-01这一天表中的数据是:
在2017-01-02这一天表中的数据是, 用户002和004资料进行了修改,005是新增用户:
在2017-01-03这一天表中的数据是, 用户004和005资料进行了修改,006是新增用户:
说明
- t_start_date表示该条记录的生命周期开始时间,t_end_date表示该条记录的生命周期结束时间。
t_end_date = ‘9999-12-31’表示该条记录目前处于有效状态。
如果查询当前所有有效的记录,则select * from user where t_end_date = ‘9999-12-31’。
如果查询2017-01-02的历史快照,则select * from user where t_start_date <=
‘2017-01-02’ and t_end_date >= ‘2017-01-02’。(此处要好好理解,是拉链表比较重要的一块。)
三、在Hive中实现拉链表
在现在的大数据场景下,大部分的公司都会选择以Hdfs和Hive为主的数据仓库架构。目前的Hdfs版本来讲,其文件系统中的文件是不能做改变的,也就是说Hive的表只能进行删除和添加操作,而不能进行update。基于这个前提,我们来实现拉链表。
还是以上面的用户表为例,我们要实现用户的拉链表。在实现它之前,我们需要先确定一下我们有哪些数据源可以用。
我们需要一张用户全量表。至少需要用它来初始化。
每日的用户更新表。
而且我们要确定拉链表的时间粒度,比如说拉链表每天只取一个状态,也就是说如果一天有3个状态变更,我们只取最后一个状态,这种天粒度的表其实已经能解决大部分的问题了。
另外,补充一下每日的用户更新表该怎么获取,有3种方式拿到或者间接拿到每日的用户增量,因为它比较重要,所以详细说明:
我们可以监听Mysql数据的变化,比如说用Canal,最后合并每日的变化,获取到最后的一个状态。
假设我们每天都会获得一份切片数据,我们可以通过取两天切片数据的不同来作为每日更新表,这种情况下我们可以对所有的字段先进行concat,再取md5,这样就ok了。
可以使用每日的变更流水表。
3.1 创建表结构
现在我们来看一下我们用户资料切片表的结构:
CREATE EXTERNAL TABLE ods.user (
user_num STRING COMMENT '用户编号',
mobile STRING COMMENT '手机号码',
reg_date STRING COMMENT '注册日期'
COMMENT '用户资料表'
PARTITIONED BY (dt string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
STORED AS ORC
LOCATION '/ods/user';
)
然后我们还需要一张用户每日更新表,前面已经分析过该如果得到这张表,现在我们假设它已经存在。
CREATE EXTERNAL TABLE ods.user_update (
user_num STRING COMMENT '用户编号',
mobile STRING COMMENT '手机号码',
reg_date STRING COMMENT '注册日期'
COMMENT '每日用户资料更新表'
PARTITIONED BY (dt string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
STORED AS ORC
LOCATION '/ods/user_update';
)
现在我们创建一张拉链表,这张就是我们最后希望的表。
CREATE EXTERNAL TABLE dws.user_his (
user_num STRING COMMENT '用户编号',
mobile STRING COMMENT '手机号码',
reg_date STRING COMMENT '用户编号',
t_start_date ,
t_end_date
COMMENT '用户资料拉链表'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
STORED AS ORC
LOCATION '/dws/user_his';
)
3.2 实现sql语句
首先全量更新,我们先到2017-01-01为止的数据。
初始化,先把2017-01-01的数据初始化进去。这个比较简单,可以参看博客。
我们这里重点说下怎么样更新拉链表。
现在我们假设我们已经初始化了2017-01-01的日期,然后需要更新2017-01-02那一天的数据,我们有了下面的Sql。
然后把两个日期设置为变量就可以了。
INSERT OVERWRITE TABLE dws.user_his
SELECT * FROM
(
SELECT A.user_num,
A.mobile,
A.reg_date,
A.t_start_time,
CASE
WHEN A.t_end_time = '9999-12-31' AND B.user_num IS NOT NULL THEN '2017-01-01'
ELSE A.t_end_time
END AS t_end_time
FROM dws.user_his AS A
LEFT JOIN ods.user_update AS B
ON A.user_num = B.user_num
UNION
SELECT C.user_num,
C.mobile,
C.reg_date,
'2017-01-02' AS t_start_time,
'9999-12-31' AS t_end_time
FROM ods.user_update AS C
) AS T
我在这里解释一下上述代码的含义。结合图1,图2和图4来理解:
UNION下面代码的含义就是新增数据的插入,即图2的005.
UNION上面代码的含义有2点需要注意: 第一,是左连接,更新表B表里有用户资料表A用户编号的数据完全接入;是对A表进行更新的。 第二,CASE WHEN这里,如果用户资料表A里的用户在1月2号的更新表B里(即1月2号对该用户进行了更新),该用户为有效记录且B表的用户编号不为空,那么说明A表的该数据应该设为历史数据,设定其t_end_time为1月1日;否则,说明没有对该用户进行更新,保持原来的t_end_time不变。
四、总结
我们分析了拉链表的原理、设计思路、并且在Hive环境下实现了一份拉链表,下面对拉链表做一些小的补充。
4.1 拉链表和流水表
流水表存放的是一个用户的变更记录,比如在一张流水表中,一天的数据中,会存放一个用户的每条修改记录,但是在拉链表中只有一条记录。
这是拉链表设计时需要注意的一个粒度问题。我们当然也可以设置的粒度更小一些,一般按天就足够。
4.2 查询性能
拉链表当然也会遇到查询性能的问题,比如说我们存放了5年的拉链数据,那么这张表势必会比较大,当查询的时候性能就比较低了,个人认为两个思路来解决:
在一些查询引擎中,我们对start_date和end_date做索引,这样能提高不少性能。
保留部分历史数据,比如说我们一张表里面存放全量的拉链表数据,然后再对外暴露一张只提供近3个月数据的拉链表。
4.3 心得
使用拉链表的时候可以不加t_end_date,即失效日期,但是加上之后,能优化很多查询。
可以加上当前行状态标识,能快速定位到当前状态。
在拉链表的设计中可以加一些内容,因为我们每天保存一个状态,如果我们在这个状态里面加一个字段,比如如当天修改次数,那么拉链表的作用就会更大。
参考文献
C SQL改写
然而,今天一位前辈教会我,不能以貌识人,既然不让本地修改,那么把数据读出来,改完之后再写到新表里头,不也就另一种的实现Update了么(计划通:)。
INSERT OVERWRITE TABLE t_court_info
SELECT
REGIONID,
DISTRICTID,
B.court_id,
COURT_TYPE,
COURT_NAME,
GRID_NAME,
GRID_ID,
LONGITUDE,
LATITUDE,
AVG_PRICE,
B.BUILDING_COUNT,
B.room_count,
FAMILY_COUNT,
PROPERTY_COMPANY,
DEVELOPER,
CONTACT,
PROPERTY_FEE,
PLOT_RATIO,
GREEN_SPACE,
PARK,
STATUS,
COMMENTS,
GRID_TYPE,
COURT_ADDRESS,
EDITDATE,
AREA_ID,
AREA_NAME,
OBU_ID,
OBU_NAME,
B.block_id,
BLOCK_NAME,
YX_GRID_ID,
YX_GRID_NAME,
IF_CLEAN,
CONTACT_TEL,
YD_WLAN_COUNT,
LT_WLAN_COUNT,
GD_WLAN_COUNT,
QT_WLAN_COUNT,
YD_ACCESS,
LT_ACCESS,
GD_ACCESS,
QT_ACCESS,
STATE,
CREATE_PERSON,
REGION_NAME,
COMPTT_STTID,
COMPTT_STT,
BLOCK_TYPE_1NAME,
BLOCK_TYPE_2NAME,
BLOCK_TYPE_3NAME,
IF_MATURE_COURT,
STATUS_INFO,
DX_WLAN_COUNT,
DX_ACCESS
FROM
t_court_info a
JOIN
(SELECT
t1.block_id,
t1.court_id,
SUM(t1.user_cnt) room_count,
COUNT(DISTINCT t1.building_id) building_count
FROM
tb_sand_building_info t1
GROUP BY t1.block_id,
t1.court_id) b
ON a.block_id = b.block_id
AND a.court_eid = b.court_id
UNION
ALL
SELECT
REGIONID,
DISTRICTID,
COURT_EID,
COURT_TYPE,
COURT_NAME,
GRID_NAME,
GRID_ID,
LONGITUDE,
LATITUDE,
AVG_PRICE,
BUILDING_COUNT,
ROOM_COUNT,
FAMILY_COUNT,
PROPERTY_COMPANY,
DEVELOPER,
CONTACT,
PROPERTY_FEE,
PLOT_RATIO,
GREEN_SPACE,
PARK,
STATUS,
COMMENTS,
GRID_TYPE,
COURT_ADDRESS,
EDITDATE,
AREA_ID,
AREA_NAME,
OBU_ID,
OBU_NAME,
BLOCK_ID,
BLOCK_NAME,
YX_GRID_ID,
YX_GRID_NAME,
IF_CLEAN,
CONTACT_TEL,
YD_WLAN_COUNT,
LT_WLAN_COUNT,
GD_WLAN_COUNT,
QT_WLAN_COUNT,
YD_ACCESS,
LT_ACCESS,
GD_ACCESS,
QT_ACCESS,
STATE,
CREATE_PERSON,
REGION_NAME,
COMPTT_STTID,
COMPTT_STT,
BLOCK_TYPE_1NAME,
BLOCK_TYPE_2NAME,
BLOCK_TYPE_3NAME,
IF_MATURE_COURT,
STATUS_INFO,
DX_WLAN_COUNT,
DX_ACCESS
FROM
t_court_info a
WHERE NOT EXISTS
(SELECT
1
FROM
(SELECT
t1.block_id,
t1.court_id
FROM
tb_sand_building_info t1
GROUP BY t1.block_id,
t1.court_id) b
WHERE a.block_id = b.block_id
AND a.court_eid = b.court_id)