本文是对Hadoop构建数据仓库实践中订单销售系统案例的学习与总结
- 组件架构:Hadoop2.7.2、Hive2.3.4、Sqoop1.4.6、Mysql5.7、Centos6.8,(组件均为Apache原生版,VMware虚拟机搭建集群)
- 本文按照Kimball数仓思想,按照维度建模思想,对销售订单系统进行数据仓库建模总结。采用星型结构模型。
- 案例来源为 《Haoop构建数据仓库实践》,进行自由发挥和思考。
系列文章
数仓--DW--Hadoop数仓实践Case-01-模型设计及数据准备
数仓--DW--Hadoop数仓实践Case-02-数据抽取
数仓--DW--Hadoop数仓实践Case-03-数据初始化装载
数仓--DW--Hadoop数仓实践Case-04-数据定期装载
数仓--DW--Hadoop数仓实践Case-05-维度表增加列
数仓--DW--Hadoop数仓实践Case-06-维度子集
数仓模型设计
- 数据仓库分层
- ods:原始日志
- source:维度表
- dw:事实表
- test:临时数据及测试数据存放
- 按照Kimball数仓架构思想,进行数据仓库建模"四步走":
- 选择业务过程;
- 2.声明粒度(业务粒度/维度粒度);
- 3.确认维度;
- 4.确认事实。
构建数仓总线架构
- 1.选择业务过程;
- 销售订单
- 2.声明粒度;
- ETL处理时间周期为每天一次,事实表中存储最细粒度的订单事务记录
- 3.确认维度
- 维度为日期维度、产品维度、顾客维度、订单维度(特意设计,后期可做退化维度处理)
- 4.确认事实
- 销售订单是作为事实表
实现步骤:ER模型-->>总线矩阵-->>高层气泡模型-->>维度建模
订单系统ER关系模型如下:
总结出数仓的总线矩阵,如下图所示:
维度模型设计
-
根据总线架构和ER关系模型,转换到维度模型
气泡图如下所示(维度中字段参考下文):
日期维度
-
日期维度,通过Java、Python、Mysql、Shell脚本直接生成即可;
表关系如下:
订单维度
表关系如下:
产品维度
产品维度表关系如下:
顾客维度
表关系如下:
维度建模总结
- 在根据总线架构矩阵进行维度建模的时候,需要根据业务进行指导,在设计维度表的时候,需要对字段、名称、SCD类型、源表等进行一一的定义。这样,在进行数据建表的时候会节省很多精力。
- 采用理论指导实践的原则和指导思想,对Kimball数仓思想进行理解和实践总结。
测试数据初始化
Mysql数据准备
- consumer 表
-- 建表语句
CREATE TABLE `sale_order` (
`order_number` int(11) NOT NULL AUTO_INCREMENT COMMENT '订单号,主键',
`consumer_number` int(11) DEFAULT NULL COMMENT '客户编号',
`product_code` int(11) DEFAULT NULL COMMENT '产品编码',
`order_date` datetime DEFAULT NULL COMMENT '订单时间',
`entry_date` datetime DEFAULT NULL COMMENT '登记时间',
`order_mount` decimal(10,2) DEFAULT NULL COMMENT '销售金额',
PRIMARY KEY (`order_number`)
) ENGINE=InnoDB AUTO_INCREMENT=17 DEFAULT CHARSET=latin1;
数据内容:
- 其中,6-consumer_street_address去掉update为原始数据,7-consumer_name 字段去掉update为原始数据,添加update为下文测试所用。
- product表
-- 建表语句
CREATE TABLE `product` (
`product_code` int(11) NOT NULL AUTO_INCREMENT COMMENT '产品编码,主键',
`product_name` varchar(50) DEFAULT NULL COMMENT '产品名称',
`product_category` varchar(50) DEFAULT NULL COMMENT '产品分类',
PRIMARY KEY (`product_code`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=latin1;
数据准备:
- 其中3-product_name的原始值为mi,redmi为修改后的值
- sale_order表
-- 建表语句
CREATE TABLE `sale_order` (
`order_number` int(11) NOT NULL AUTO_INCREMENT COMMENT '订单号,主键',
`consumer_number` int(11) DEFAULT NULL COMMENT '客户编号',
`product_code` int(11) DEFAULT NULL COMMENT '产品编码',
`order_date` datetime DEFAULT NULL COMMENT '订单时间',
`entry_date` datetime DEFAULT NULL COMMENT '登记时间',
`order_mount` decimal(10,2) DEFAULT NULL COMMENT '销售金额',
PRIMARY KEY (`order_number`)
) ENGINE=InnoDB AUTO_INCREMENT=17 DEFAULT CHARSET=latin1;
数据准备
Hive数据准备
- 在hive建表中,因为需要对维度表进行SCD处理,所以某些表需要创建为ORCFILE格式,需要对hive的配置文件进行修改。
hive-site.xml 基本配置如下,必须配置条件为hive的事务操作和笛卡尔积操作。
<configuration>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://liyahui-02:3306/hive_2?createDatabaseIfNotExist=true&characterEncoding=utf-8&useSSL=false</value>
<description>JDBC connect string for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
<description>Driver class name for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
<description>username to use against metastore database</description>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>liyahui</value>
<description>password to use against metastore database</description>
</property>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive2/warehouse</value>
<description>location of default database for the warehouse</description>
</property>
<property>
<name>hive.cli.print.header</name>
<value>true</value>
</property>
<property>
<name>hive.cli.print.current.db</name>
<!-- 添加如下6个属性以支持事务 -->
<property>
<name>hive.support.concurrency</name>
<value>true</value>
</property>
<property>
<name>hive.exec.dynamic.partition.mode</name>
<value>nonstrict</value>
</property>
<property>
<name>hive.txn.manager</name>
<value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
<name>hive.compactor.initiator.on</name>
<value>true</value>
</property>
<property>
<name>hive.compactor.worker.threads</name>
<value>1</value>
</property>
<property>
<name>hive.enforce.bucketing</name>
<value>true</value>
</property>
<!--支持hive的笛卡尔积操作 -->
<property>
<name>hive.mapred.mode</name>
<value>nonstrict</value>
<description>The mode in which the hive operations are being performed. In strict mode, some risky queries are not allowed to
run</description>
</property>
<property>
<name>hive.strict.checks.cartesian.product</name>
<value>false</value>
</property>
</configuration>
- hive建库建表语句
- 文件命名为1-hive_dw_prepare.hql(包含所有的库和表),采用hive -f 的方式执行。
-- ****************************************************
-- @Author: LiYahui
-- @Date: Created in 2019/04/13 10:32
-- @Description: TODO 初始化hive数据库和脚本
-- @Version: V1.0
-- ****************************************************
-- 创建ods数据库
create database if not exists ods;
-- 创建客户表
create external table if not exists ods.ods_consumer(
customer_number int comment 'number',
customer_name varchar(30) comment 'name',
customer_street_address varchar(30) comment 'address',
customer_zip_code int comment 'zipcode',
customer_city varchar(30) comment 'city',
customer_state varchar(30) comment 'state'
)
row format delimited
fields terminated by '\t'
lines terminated by '\n'
NULL defined as ''
STORED AS textfile
location "/user/root/dw/ods/ods_consumer"
;
-- 创建产品表
create external table if not exists ods.ods_product(
product_code int comment 'code',
product_name varchar (30) comment 'name',
product_category varchar(30) comment 'category'
)
row format delimited
fields terminated by '\t'
lines terminated by '\n'
NULL defined as ''
STORED AS textfile
location "/user/root/dw/ods/ods_product"
;
-- 创建销售订单表
create external table if not exists ods.ods_sale_order(
order_number int comment 'order number',
customer_number int comment 'customer number',
product_code int comment 'product code',
order_date timestamp comment 'order date',
entry_date timestamp comment 'entry date',
order_amount decimal(10 , 2 ) comment 'order amount'
)
row format delimited
fields terminated by '\t'
lines terminated by '\n'
NULL defined as ''
STORED AS textfile
location "/user/root/dw/ods/ods_sale_order"
;
-- 创建source层数据库
create database if not exists source ;
-- 创建日期维度表
create external table if not exists source.source_date_dim(
day_key string comment '日期代理键',
day_id date comment '标准日期格式',
day_of_month_id tinyint comment '月份',
day_of_month_name string comment '月份名称',
day_of_qrt_id string comment '季度',
day_of_year_id smallint comment '年id'
) comment "日期维度表"
row format delimited fields terminated by ','
NULL defined as ''
stored as textfile
location "/user/root/dw/source/source_date_dim"
;
-- 创建客户维度表
create table if not exists source.source_consumer_dim(
consumer_key int comment "代理键",
consumer_number varchar(50) comment "顾客编号",
consumer_name varchar(50) comment "顾客名称",
consumer_street_address varchar(50) comment "顾客地址",
consumer_zip_code varchar(50) comment "邮政编码",
consumer_city varchar(50) comment "城市",
consumer_province varchar(50) comment "省份",
consumer_valid_from date comment "有效期开始日期",
consumer_valid_to date comment "有效期结束日期",
consumer_indicator varchar(50) comment "状态指示器",
consumer_version int comment "顾客变化版本号"
)comment "客户维度表"
clustered by (consumer_key) into
8 buckets
stored as
orc tblproperties ('transactional'='true')
;
-- 创建产品维度
create table if not exists source.source_product_dim(
product_key string comment "代理键",
product_code string comment "商品编码",
product_name string comment "商品名称",
product_category string comment "商品分类",
product_valid_from date comment "有效期开始日期",
product_valid_to date comment "有效期结束日期",
product_indicator string comment "状态指示器",
product_version int comment "产品变化版本号"
)comment "产品维度"
clustered by (product_key) into
8 buckets
stored as
orc tblproperties ('transactional'='true')
;
-- 创建订单维度表
create table if not exists source.source_order_dim(
order_key int comment "代理键",
order_code int comment "商品编码",
order_valid_from date comment "有效期开始日期",
order_valid_to date comment "有效期结束日期",
order_indicator string comment "状态指示器",
order_version int comment "订单变化版本号"
)comment "订单维度"
clustered by (order_key) into
8 buckets
stored as
orc tblproperties ('transactional'='true')
;
--创建dw层数据库
create database if not exists dw;
-- 创建订单事实表
create table if not exists dw.sale_order_fact(
order_sk int comment 'order surrogate key',
customer_sk int comment 'customer surrogate key',
product_sk int comment 'product surrogate key',
order_date_sk string comment 'date surrogate key',
order_amount decimal (10 , 2 ) comment'order amount'
)comment "销售订单事实表"
clustered by (order_sk) into 8 buckets
stored as
orc tblproperties ('transactional'='true')
;
总结步骤
数据抽取、初始化装载、定期装载、定时调度等详见下文。