1) Parse the user behavior data.
2) Filter out records whose core fields are null.
3) Remodel the business data with a dimensional model, i.e., dimension degeneration.
1. Parsing the user behavior start-log table
1.1 Create the start-log table
1) Table creation statement
hive (gmall) > DROP TABLE
IF EXISTS dwd_start_log;
CREATE EXTERNAL TABLE dwd_start_log (
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`entry` string,
`open_ad_type` string,
`action` string,
`loading_time` string,
`detail` string,
`extend1` string
) PARTITIONED BY (dt string) stored AS parquet location '/warehouse/gmall/dwd/dwd_start_log/' TBLPROPERTIES (
'parquet.compression' = 'lzo'
);
Note: the data is stored in Parquet format, which is splittable, so there is no need to build an extra index on the data.
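For contrast, if the table were stored as LZO-compressed text rather than Parquet, a split index would normally be built for each partition. A minimal sketch, assuming the hadoop-lzo jar is available (the jar path and target partition below are illustrative only):
[atguigu@hadoop102 module]$ hadoop jar /path/to/hadoop-lzo.jar com.hadoop.compression.lzo.DistributedLzoIndexer /warehouse/gmall/ods/ods_start_log/dt=2020-03-10
With Parquet this indexing step is unnecessary, which is the point of the note above.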
1.2 Using the get_json_object function
1) Input data xjson
xjson=[{"name":"大郎","sex":"男","age":"25"},{"name":"西门庆","sex":"男","age":"47"}]
2) Take the first JSON object
SELECT get_json_object(xjson,"$.[0]") FROM person;
Result: {"name":"大郎","sex":"男","age":"25"}
3) Take the value of the age field of the first JSON object
SELECT get_json_object(xjson,"$.[0].age") FROM person;
Result: 25
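To reproduce these two queries, the sample string can be put into a throwaway table first. A minimal sketch (the person table and its xjson column are only implied by the queries above, so this DDL is an assumption):
hive (gmall)> create table if not exists person(xjson string);
hive (gmall)> insert into table person values('[{"name":"大郎","sex":"男","age":"25"},{"name":"西门庆","sex":"男","age":"47"}]');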
1.3 Load data into the start-log table
hive (gmall) > INSERT overwrite TABLE dwd_start_log PARTITION (dt = '2020-03-10') SELECT
get_json_object (line, '$.mid') mid_id,
get_json_object (line, '$.uid') user_id,
get_json_object (line, '$.vc') version_code,
get_json_object (line, '$.vn') version_name,
get_json_object (line, '$.l') lang,
get_json_object (line, '$.sr') source,
get_json_object (line, '$.os') os,
get_json_object (line, '$.ar') area,
get_json_object (line, '$.md') model,
get_json_object (line, '$.ba') brand,
get_json_object (line, '$.sv') sdk_version,
get_json_object (line, '$.g') gmail,
get_json_object (line, '$.hw') height_width,
get_json_object (line, '$.t') app_time,
get_json_object (line, '$.nw') network,
get_json_object (line, '$.ln') lng,
get_json_object (line, '$.la') lat,
get_json_object (line, '$.entry') entry,
get_json_object (line, '$.open_ad_type') open_ad_type,
get_json_object (line, '$.action') action,
get_json_object (line, '$.loading_time') loading_time,
get_json_object (line, '$.detail') detail,
get_json_object (line, '$.extend1') extend1
from ods_start_log
WHERE
dt = '2020-03-10';
3) Test
hive (gmall)> select * from dwd_start_log where dt='2020-03-10' limit 2;
1.4 DWD-layer start-log loading script
1) Create the script in the /home/atguigu/bin directory on hadoop102
[atguigu@hadoop102 bin]$ vim ods_to_dwd_log.sh
Write the following content into the script
#!/bin/bash
# Define variables for easy modification
APP=gmall
hive=/opt/module/hive/bin/hive
# If a date argument is given, use it; otherwise use the day before today
if [ -n "$1" ] ; then
    do_date=$1
else
    do_date=`date -d "-1 day" +%F`
fi

sql="
set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table "$APP".dwd_start_log
PARTITION (dt='$do_date')
select get_json_object(line,'$.mid') mid_id,
get_json_object(line,'$.uid') user_id,
get_json_object(line,'$.vc') version_code,
get_json_object(line,'$.vn') version_name,
get_json_object(line,'$.l') lang,
get_json_object(line,'$.sr') source,
get_json_object(line,'$.os') os,
get_json_object(line,'$.ar') area,
get_json_object(line,'$.md') model,
get_json_object(line,'$.ba') brand,
get_json_object(line,'$.sv') sdk_version,
get_json_object(line,'$.g') gmail,
get_json_object(line,'$.hw') height_width,
get_json_object(line,'$.t') app_time,
get_json_object(line,'$.nw') network,
get_json_object(line,'$.ln') lng,
get_json_object(line,'$.la') lat,
get_json_object(line,'$.entry') entry,
get_json_object(line,'$.open_ad_type') open_ad_type,
get_json_object(line,'$.action') action,
get_json_object(line,'$.loading_time') loading_time,
get_json_object(line,'$.detail') detail,
get_json_object(line,'$.extend1') extend1 from "$APP".ods_start_log where dt='$do_date';
"$hive -e "$sql"
2) Grant execute permission to the script
[atguigu@hadoop102 bin]$ chmod 777 ods_to_dwd_log.sh
3) Run the script
[atguigu@hadoop102 module]$ ods_to_dwd_log.sh 2020-03-11
4) Query the loaded result
hive (gmall)> select * from dwd_start_log where dt='2020-03-11' limit 2;
5) Script schedule
In production, this script usually runs between 00:30 and 01:00 each day.
2. Parsing the user behavior event tables
A UDTF is used to split one large raw log record into multiple rows, one per event.
2.1 Create the base detail table
The base detail table stores the detail data converted from the raw ODS-layer table.
1) Create the event-log base detail table
hive (gmall) > DROP TABLE
IF EXISTS dwd_base_event_log;
CREATE EXTERNAL TABLE dwd_base_event_log (
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`event_name` string,
`event_json` string,
`server_time` string
) PARTITIONED BY (`dt` string)
stored AS parquet
location '/warehouse/gmall/dwd/dwd_base_event_log/'
TBLPROPERTIES (
'parquet.compression' = 'lzo'
);
2) Note: event_name and event_json hold the event name and the whole event body respectively. This is where the one-to-many structure of the raw log is split apart. To do that, the raw log has to be flattened, which requires a UDF and a UDTF; the flattening pattern itself is sketched right below.
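The one-to-many flattening can be previewed with Hive's built-in explode UDTF before writing a custom one. A minimal sketch (the literal array stands in for the parsed et field):
hive (gmall)> select explode(array('display','loading','ad')) as event_name;
Each element of the array becomes its own output row; the custom UDTF in 2.3 does the same thing but emits two columns (event_name, event_json) per event.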
2.2 Custom UDF (parsing the common fields)
A UDF takes one row in and produces one row out (one-in, one-out).
1) Create a Maven project: hivefunction
2) Create the package: com.atguigu.udf
3) Add the following to the pom.xml file
<properties>
<hive.version>2.3.0</hive.version>
</properties>
<dependencies>
<!-- hive dependency -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
Note 1: if the Hive jars fail to download, add the following parameters to IDEA's Maven settings
-Dmaven.wagon.http.ssl.insecure=true -Dmaven.wagon.http.ssl.allowall=true -Dmaven.wagon.http.ssl.ignore.validity.dates=true
See: https://blog.csdn.net/qq_22041375/article/details/103491941
Note 2: if pentaho-aggdesigner-algorithm.jar fails to download, add the following repository to the Maven pom
<repositories>
<repository>
<id>spring-plugin</id>
<url>https://repo.spring.io/plugins-release/</url>
</repository>
</repositories>
Note 3: an error like the following means IDEA has run out of memory
Exception in thread "main" java.lang.StackOverflowError at sun.nio.cs.UTF_8$Encoder.encodeLoop(UTF_8.java:691) at java.nio.charset.CharsetEncoder.encode(CharsetEncoder.java:579)
Fix: add -Xmx512m -Xms128m -Xss2m to the location shown in the original screenshot (IDEA's Maven importer VM options).
4) The UDF parses the common fields
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.json.JSONException;
import org.json.JSONObject;
public class BaseFieldUDF extends UDF {
public String evaluate(String line, String key) throws JSONException {
// the raw line is "server_time|json"; the '|' must be escaped because split() takes a regex
String[] log = line.split("\\|");
if (log.length != 2 || StringUtils.isBlank(log[1])) {
return "";
}
JSONObject baseJson = new JSONObject(log[1].trim());
String result = "";
// get the server time
if ("st".equals(key)) {
result = log[0].trim();
} else if ("et".equals(key)) {
// get the event array
if (baseJson.has("et")) {
result = baseJson.getString("et");
}
} else {
JSONObject cm = baseJson.getJSONObject("cm");
// get the value of the requested common field from cm
if (cm.has(key)) {
result = cm.getString(key);
}
}
return result;
}
public static void main(String[] args) throws JSONException {
String line = "1583776223469|{\"cm\":{\"ln\":\"-48.5\",\"sv\":\"V2.5.7\",\"os\":\"8.0.9\",\"g\":\"6F76AVD5@gmail.com\",\"mid\":\"0\",\"nw\":\"4G\",\"l\":\"pt\",\"vc\":\"3\",\"hw\":\"750*1134\",\"ar\":\"MX\",\"uid\":\"0\",\"t\":\"1583707297317\",\"la\":\"-52.9\",\"md\":\"sumsung-18\",\"vn\":\"1.2.4\",\"ba\":\"Sumsung\",\"sr\":\"V\"},\"ap\":\"app\",\"et\":[{\"ett\":\"1583705574227\",\"en\":\"display\",\"kv\":{\"goodsid\":\"0\",\"action\":\"1\",\"extend1\":\"1\",\"place\":\"0\",\"category\":\"63\"}},{\"ett\":\"1583760986259\",\"en\":\"loading\",\"kv\":{\"extend2\":\"\",\"loading_time\":\"4\",\"action\":\"3\",\"extend1\":\"\",\"type\":\"3\",\"type1\":\"\",\"loading_way\":\"1\"}},{\"ett\":\"1583746639124\",\"en\":\"ad\",\"kv\":{\"activityId\":\"1\",\"displayMills\":\"111839\",\"entry\":\"1\",\"action\":\"5\",\"contentType\":\"0\"}},{\"ett\":\"1583758016208\",\"en\":\"notification\",\"kv\":{\"ap_time\":\"1583694079866\",\"action\":\"1\",\"type\":\"3\",\"content\":\"\"}},{\"ett\":\"1583699890760\",\"en\":\"favorites\",\"kv\":{\"course_id\":4,\"id\":0,\"add_time\":\"1583730648134\",\"userid\":7}}]}";
String mid = new BaseFieldUDF().evaluate(line, "mid");
System.out.println(mid);
}
}
Note: the main method is only used to test the function against sample data.
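Before the permanent functions are registered in 2.3, the packaged jar can also be tried out as a temporary function inside a Hive session. A minimal sketch, assuming the jar has already been built and copied to /opt/module:
hive (gmall)> add jar /opt/module/hivefunction-1.0-SNAPSHOT.jar;
hive (gmall)> create temporary function base_analizer as 'com.atguigu.udf.BaseFieldUDF';
hive (gmall)> select base_analizer(line,'mid') from ods_event_log where dt='2020-03-10' limit 5;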
2.3 Custom UDTF (parsing the event fields)
A UDTF takes one row in and produces multiple rows out (one-to-many).
1) Create the package: com.atguigu.udtf
2) Create the class EventJsonUDTF under the com.atguigu.udtf package
3) The UDTF expands the event fields
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.json.JSONArray;
import org.json.JSONException;
import java.util.ArrayList;
public class EventJsonUDTF extends GenericUDTF {
// declare the names and types of the output columns
public StructObjectInspector initialize(StructObjectInspector argOIs) {
ArrayList<String> fieldNames = new ArrayList<String>();
ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
fieldNames.add("event_name");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
fieldNames.add("event_json");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
}
// one input record produces several output rows
@Override public void process(Object[] objects) throws HiveException {
// get the incoming et (event array) string
String input = objects[0].toString();
// if the input is blank, return directly and drop the record
if (StringUtils.isBlank(input)) {
return;
} else {
try {
// parse the array to see how many events it contains (ad/favorites/...)
JSONArray ja = new JSONArray(input);
if (ja == null) return;
// iterate over every event
for (int i = 0; i < ja.length(); i++) {
String[] result = new String[2];
try {
// take the event name (ad/favorites/...)
result[0] = ja.getJSONObject(i).getString("en");
// take the whole event JSON object
result[1] = ja.getString(i);
} catch (JSONException e) {
continue;
}
// emit the result row
forward(result);
}
} catch (JSONException e) {
e.printStackTrace();
}
}
}
// called when there are no more records to process; used for cleanup or extra output
@Override public void close() throws HiveException { }
}
4) Package the project
5) Upload hivefunction-1.0-SNAPSHOT.jar to /opt/module on hadoop102, then upload the jar to the /user/hive/jars path on HDFS
[atguigu@hadoop102 module]$ hadoop fs -mkdir -p /user/hive/jars
[atguigu@hadoop102 module]$ hadoop fs -put hivefunction-1.0-SNAPSHOT.jar /user/hive/jars
6) Create permanent functions associated with the compiled Java classes
hive (gmall)> create function base_analizer as 'com.atguigu.udf.BaseFieldUDF'
using jar 'hdfs://hadoop102:9000/user/hive/jars/hivefunction-1.0-SNAPSHOT.jar';
create function flat_analizer as 'com.atguigu.udtf.EventJsonUDTF'
using jar 'hdfs://hadoop102:9000/user/hive/jars/hivefunction-1.0-SNAPSHOT.jar';
7) Note: what if the custom functions are modified and the jar is rebuilt? Just replace the old jar at the HDFS path and restart the Hive client; a sketch of the replacement follows.
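A minimal sketch of that replacement, using the same paths as above:
[atguigu@hadoop102 module]$ hadoop fs -put -f hivefunction-1.0-SNAPSHOT.jar /user/hive/jars
Then exit and reopen the Hive CLI so the functions pick up the new jar.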
2.4 Populate the event-log base detail table
1) Parse the logs into the event-log base detail table
hive (gmall)> insert overwrite table dwd_base_event_log partition(dt='2020-03-10')
SELECT
base_analizer (line, 'mid') AS mid_id,
base_analizer (line, 'uid') AS user_id,
base_analizer (line, 'vc') AS version_code,
base_analizer (line, 'vn') AS version_name,
base_analizer (line, 'l') AS lang,
base_analizer (line, 'sr') AS source,
base_analizer (line, 'os') AS os,
base_analizer (line, 'ar') AS area,
base_analizer (line, 'md') AS model,
base_analizer (line, 'ba') AS brand,
base_analizer (line, 'sv') AS sdk_version,
base_analizer (line, 'g') AS gmail,
base_analizer (line, 'hw') AS height_width,
base_analizer (line, 't') AS app_time,
base_analizer (line, 'nw') AS network,
base_analizer (line, 'ln') AS lng,
base_analizer (line, 'la') AS lat,
event_name,
event_json,
base_analizer (line, 'st') AS server_time
FROM
ods_event_log lateral VIEW flat_analizer (base_analizer(line, 'et')) tmp_flat AS event_name,
event_json
WHERE
dt = '2020-03-10'
AND base_analizer (line, 'et') <> '';
2) Test
hive (gmall)> select * from dwd_base_event_log where dt='2020-03-10' limit 2;
2.5 DWD-layer event-log parsing script
1) Create the script in the /home/atguigu/bin directory on hadoop102
[atguigu@hadoop102 bin]$ vim ods_to_dwd_base_log.sh
Write the following content into the script
#!/bin/bash
# Define variables for easy modification
APP=gmall
hive=/opt/module/hive/bin/hive
# If a date argument is given, use it; otherwise use the day before today
if [ -n "$1" ] ; then
    do_date=$1
else
    do_date=`date -d "-1 day" +%F`
fi

sql="
use gmall;
insert overwrite table "$APP".dwd_base_event_log partition(dt='$do_date')
select
base_analizer(line,'mid') as mid_id,
base_analizer(line,'uid') as user_id,
base_analizer(line,'vc') as version_code,
base_analizer(line,'vn') as version_name,
base_analizer(line,'l') as lang,
base_analizer(line,'sr') as source,
base_analizer(line,'os') as os,
base_analizer(line,'ar') as area,
base_analizer(line,'md') as model,
base_analizer(line,'ba') as brand,
base_analizer(line,'sv') as sdk_version,
base_analizer(line,'g') as gmail,
base_analizer(line,'hw') as height_width,
base_analizer(line,'t') as app_time,
base_analizer(line,'nw') as network,
base_analizer(line,'ln') as lng,
base_analizer(line,'la') as lat,
event_name, event_json,
base_analizer(line,'st') as server_time from "$APP".ods_event_log lateral view flat_analizer(base_analizer(line,'et')) tem_flat as event_name,event_json
where dt='$do_date'
and base_analizer(line,'et')<>'';
"

$hive -e "$sql"
Note: when the custom functions are used, the database that holds them must be selected before the statements run, e.g. use gmall;
2) Grant execute permission to the script
[atguigu@hadoop102 bin]$ chmod 777 ods_to_dwd_base_log.sh
3) Run the script
[atguigu@hadoop102 module]$ ods_to_dwd_base_log.sh 2020-03-11
4) Query the loaded result
hive (gmall)> select * from dwd_base_event_log where dt='2020-03-11' limit 2;
5) Script schedule: in production, usually between 00:30 and 01:00 each day.
3. Deriving the user behavior event tables
3.1 Product display table
1) Table creation statement
hive (gmall) > DROP TABLE
IF EXISTS dwd_display_log;
CREATE EXTERNAL TABLE dwd_display_log (
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`action` string,
`goodsid` string,
`place` string,
`extend1` string,
`category` string,
`server_time` string
) PARTITIONED BY (dt string) stored AS parquet location '/warehouse/gmall/dwd/dwd_display_log/' TBLPROPERTIES (
'parquet.compression' = 'lzo'
);
2) Load data
hive (gmall)>
INSERT overwrite TABLE dwd_display_log PARTITION (dt = '2020-03-10') SELECT
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object (event_json, '$.kv.action') action,
get_json_object (event_json, '$.kv.goodsid') goodsid,
get_json_object (event_json, '$.kv.place') place,
get_json_object (event_json, '$.kv.extend1') extend1,
get_json_object (event_json, '$.kv.category') category,
server_time
FROM
dwd_base_event_log
WHERE
dt = '2020-03-10'
AND event_name = 'display';
3) Test
hive (gmall)> select * from dwd_display_log where dt='2020-03-10' limit 2;
3.2 Product detail page table
1) Table creation statement
hive (gmall) > DROP TABLE
IF EXISTS dwd_newsdetail_log;
CREATE EXTERNAL TABLE dwd_newsdetail_log (
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`entry` string,
`action` string,
`goodsid` string,
`showtype` string,
`news_staytime` string,
`loading_time` string,
`type1` string,
`category` string,
`server_time` string
) PARTITIONED BY (dt string) stored AS parquet location '/warehouse/gmall/dwd/dwd_newsdetail_log/' TBLPROPERTIES (
'parquet.compression' = 'lzo'
);
2) Load data
hive (gmall) > INSERT overwrite TABLE dwd_newsdetail_log PARTITION (dt = '2020-03-10') SELECT
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object (event_json, '$.kv.entry') entry,
get_json_object (event_json, '$.kv.action') action,
get_json_object (event_json, '$.kv.goodsid') goodsid,
get_json_object (event_json, '$.kv.showtype') showtype,
get_json_object (
event_json,
'$.kv.news_staytime'
) news_staytime,
get_json_object (
event_json,
'$.kv.loading_time'
) loading_time,
get_json_object (event_json, '$.kv.type1') type1,
get_json_object (event_json, '$.kv.category') category,
server_time
FROM
dwd_base_event_log
WHERE
dt = '2020-03-10'
AND event_name = 'newsdetail';
3) Test
hive (gmall)> select * from dwd_newsdetail_log where dt='2020-03-10' limit 2;
3.3 Product list page table
1) Table creation statement
hive (gmall) > DROP TABLE
IF EXISTS dwd_loading_log;
CREATE EXTERNAL TABLE dwd_loading_log (
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`action` string,
`loading_time` string,
`loading_way` string,
`extend1` string,
`extend2` string,
`type` string,
`type1` string,
`server_time` string
) PARTITIONED BY (dt string) stored AS parquet location '/warehouse/gmall/dwd/dwd_loading_log/' TBLPROPERTIES (
'parquet.compression' = 'lzo'
);
2) Load data
hive (gmall) > INSERT overwrite TABLE dwd_loading_log PARTITION (dt = '2020-03-10') SELECT
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object (event_json, '$.kv.action') action,
get_json_object (
event_json,
'$.kv.loading_time'
) loading_time,
get_json_object (
event_json,
'$.kv.loading_way'
) loading_way,
get_json_object (event_json, '$.kv.extend1') extend1,
get_json_object (event_json, '$.kv.extend2') extend2,
get_json_object (event_json, '$.kv.type') type,
get_json_object (event_json, '$.kv.type1') type1,
server_time
FROM
dwd_base_event_log
WHERE
dt = '2020-03-10'
AND event_name = 'loading';
3) Test
hive (gmall)> select * from dwd_loading_log where dt='2020-03-10' limit 2;
3.4 Ad table
1) Table creation statement
hive (gmall) > DROP TABLE
IF EXISTS dwd_ad_log;
CREATE EXTERNAL TABLE dwd_ad_log (
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`entry` string,
`action` string,
`contentType` string,
`displayMills` string,
`itemId` string,
`activityId` string,
`server_time` string
) PARTITIONED BY (dt string) stored AS parquet location '/warehouse/gmall/dwd/dwd_ad_log/' TBLPROPERTIES (
'parquet.compression' = 'lzo'
);
2) Load data
hive (gmall) > INSERT overwrite TABLE dwd_ad_log PARTITION (dt = '2020-03-10') SELECT
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object (event_json, '$.kv.entry') entry,
get_json_object (event_json, '$.kv.action') action,
get_json_object (
event_json,
'$.kv.contentType'
) contentType,
get_json_object (
event_json,
'$.kv.displayMills'
) displayMills,
get_json_object (event_json, '$.kv.itemId') itemId,
get_json_object (
event_json,
'$.kv.activityId'
) activityId,
server_time
FROM
dwd_base_event_log
WHERE
dt = '2020-03-10'
AND event_name = 'ad';
3) Test
hive (gmall)> select * from dwd_ad_log where dt='2020-03-10' limit 2;
3.5 Notification table
1) Table creation statement
hive (gmall) > DROP TABLE
IF EXISTS dwd_notification_log;
CREATE EXTERNAL TABLE dwd_notification_log (
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`action` string,
`noti_type` string,
`ap_time` string,
`content` string,
`server_time` string
) PARTITIONED BY (dt string) stored AS parquet location '/warehouse/gmall/dwd/dwd_notification_log/' TBLPROPERTIES (
'parquet.compression' = 'lzo'
);
2) Load data
hive (gmall) > INSERT overwrite TABLE dwd_notification_log PARTITION (dt = '2020-03-10') SELECT
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object (event_json, '$.kv.action') action,
get_json_object (
event_json,
'$.kv.noti_type'
) noti_type,
get_json_object (event_json, '$.kv.ap_time') ap_time,
get_json_object (event_json, '$.kv.content') content,
server_time
FROM
dwd_base_event_log
WHERE
dt = '2020-03-10'
AND event_name = 'notification';
3) Test
hive (gmall)> select * from dwd_notification_log where dt='2020-03-10' limit 2;
3.6 User background-active table
1) Table creation statement
hive (gmall) > DROP TABLE
IF EXISTS dwd_active_background_log;
CREATE EXTERNAL TABLE dwd_active_background_log (
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`active_source` string,
`server_time` string
) PARTITIONED BY (dt string) stored AS parquet location '/warehouse/gmall/dwd/dwd_background_log/' TBLPROPERTIES (
'parquet.compression' = 'lzo'
);
2) Load data
hive (gmall) > INSERT overwrite TABLE dwd_active_background_log PARTITION (dt = '2020-03-10') SELECT
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object (
event_json,
'$.kv.active_source'
) active_source,
server_time
FROM
dwd_base_event_log
WHERE
dt = '2020-03-10'
AND event_name = 'active_background';
3) Test
hive (gmall)> select * from dwd_active_background_log where dt='2020-03-10' limit 2;
3.7 Comment table
1) Table creation statement
hive (gmall) > DROP TABLE
IF EXISTS dwd_comment_log;
CREATE EXTERNAL TABLE dwd_comment_log (
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`comment_id` INT,
`userid` INT,
`p_comment_id` INT,
`content` string,
`addtime` string,
`other_id` INT,
`praise_count` INT,
`reply_count` INT,
`server_time` string
) PARTITIONED BY (dt string) stored AS parquet location '/warehouse/gmall/dwd/dwd_comment_log/' TBLPROPERTIES (
'parquet.compression' = 'lzo'
);
2) Load data
hive (gmall) > INSERT overwrite TABLE dwd_comment_log PARTITION (dt = '2020-03-10') SELECT
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object (
event_json,
'$.kv.comment_id'
) comment_id,
get_json_object (event_json, '$.kv.userid') userid,
get_json_object (
event_json,
'$.kv.p_comment_id'
) p_comment_id,
get_json_object (event_json, '$.kv.content') content,
get_json_object (event_json, '$.kv.addtime') addtime,
get_json_object (event_json, '$.kv.other_id') other_id,
get_json_object (
event_json,
'$.kv.praise_count'
) praise_count,
get_json_object (
event_json,
'$.kv.reply_count'
) reply_count,
server_time
FROM
dwd_base_event_log
WHERE
dt = '2020-03-10'
AND event_name = 'comment';
3) Test
hive (gmall)> select * from dwd_comment_log where dt='2020-03-10' limit 2;
3.8 Favorites table
1) Table creation statement
hive (gmall) > DROP TABLE
IF EXISTS dwd_favorites_log;
CREATE EXTERNAL TABLE dwd_favorites_log (
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`id` INT,
`course_id` INT,
`userid` INT,
`add_time` string,
`server_time` string
) PARTITIONED BY (dt string) stored AS parquet location '/warehouse/gmall/dwd/dwd_favorites_log/' TBLPROPERTIES (
'parquet.compression' = 'lzo'
);
2) Load data
hive (gmall) > INSERT overwrite TABLE dwd_favorites_log PARTITION (dt = '2020-03-10') SELECT
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object (event_json, '$.kv.id') id,
get_json_object (
event_json,
'$.kv.course_id'
) course_id,
get_json_object (event_json, '$.kv.userid') userid,
get_json_object (event_json, '$.kv.add_time') add_time,
server_time
FROM
dwd_base_event_log
WHERE
dt = '2020-03-10'
AND event_name = 'favorites';
3) Test
hive (gmall)> select * from dwd_favorites_log where dt='2020-03-10' limit 2;
3.9 Praise (like) table
1) Table creation statement
hive (gmall) > DROP TABLE
IF EXISTS dwd_praise_log;
CREATE EXTERNAL TABLE dwd_praise_log (
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`id` string,
`userid` string,
`target_id` string,
`type` string,
`add_time` string,
`server_time` string
) PARTITIONED BY (dt string) stored AS parquet location '/warehouse/gmall/dwd/dwd_praise_log/' TBLPROPERTIES (
'parquet.compression' = 'lzo'
);
2) Load data
hive (gmall) > INSERT overwrite TABLE dwd_praise_log PARTITION (dt = '2020-03-10') SELECT
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object (event_json, '$.kv.id') id,
get_json_object (event_json, '$.kv.userid') userid,
get_json_object (
event_json,
'$.kv.target_id'
) target_id,
get_json_object (event_json, '$.kv.type') type,
get_json_object (event_json, '$.kv.add_time') add_time,
server_time
FROM
dwd_base_event_log
WHERE
dt = '2020-03-10'
AND event_name = 'praise';
3) Test
hive (gmall)> select * from dwd_praise_log where dt='2020-03-10' limit 2;
3.10 Error log table
1) Table creation statement
hive (gmall) > DROP TABLE
IF EXISTS dwd_error_log;
CREATE EXTERNAL TABLE dwd_error_log (
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`errorBrief` string,
`errorDetail` string,
`server_time` string
) PARTITIONED BY (dt string) stored AS parquet location '/warehouse/gmall/dwd/dwd_error_log/' TBLPROPERTIES (
'parquet.compression' = 'lzo'
);
2) Load data
hive (gmall) > INSERT overwrite TABLE dwd_error_log PARTITION (dt = '2020-03-10') SELECT
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object (
event_json,
'$.kv.errorBrief'
) errorBrief,
get_json_object (
event_json,
'$.kv.errorDetail'
) errorDetail,
server_time
FROM
dwd_base_event_log
WHERE
dt = '2020-03-10'
AND event_name = 'error';
3) Test
hive (gmall)> select * from dwd_error_log where dt='2020-03-10' limit 2;
3.11 DWD-layer event table loading script
1) Create the script in the /home/atguigu/bin directory on hadoop102
[atguigu@hadoop102 bin]$ vim ods_to_dwd_event_log.sh
Write the following content into the script
#!/bin/bash
# Define variables for easy modification
APP=gmall
hive=/opt/module/hive/bin/hive
# If a date argument is given, use it; otherwise use the day before today
if [ -n "$1" ] ; then
    do_date=$1
else
    do_date=`date -d "-1 day" +%F`
fi
sql="
INSERT overwrite TABLE "$APP".dwd_display_log PARTITION (dt = '$do_date') SELECT
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.action') action,
get_json_object(event_json,'$.kv.goodsid') goodsid,
get_json_object(event_json,'$.kv.place') place,
get_json_object(event_json,'$.kv.extend1') extend1,
get_json_object(event_json,'$.kv.category') category,
server_time
FROM
"$APP".dwd_base_event_log
WHERE
dt = '$do_date'
AND event_name = 'display';
INSERT overwrite TABLE "$APP".dwd_newsdetail_log PARTITION (dt = '$do_date') SELECT
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.entry') entry, get_json_object(event_json,'$.kv.action') action, get_json_object(event_json,'$.kv.goodsid') goodsid, get_json_object(event_json,'$.kv.showtype') showtype, get_json_object(event_json,'$.kv.news_staytime') news_staytime, get_json_object(event_json,'$.kv.loading_time') loading_time, get_json_object(event_json,'$.kv.type1') type1, get_json_object(event_json,'$.kv.category') category,
server_time
FROM
"$APP".dwd_base_event_log
WHERE
dt = '$do_date'
AND event_name = 'newsdetail';
INSERT overwrite TABLE "$APP".dwd_loading_log PARTITION (dt = '$do_date') SELECT
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.action') action, get_json_object(event_json,'$.kv.loading_time') loading_time, get_json_object(event_json,'$.kv.loading_way') loading_way, get_json_object(event_json,'$.kv.extend1') extend1, get_json_object(event_json,'$.kv.extend2') extend2, get_json_object(event_json,'$.kv.type') type, get_json_object(event_json,'$.kv.type1') type1,
server_time
FROM
"$APP".dwd_base_event_log
WHERE
dt = '$do_date'
AND event_name = 'loading';
INSERT overwrite TABLE "$APP".dwd_ad_log PARTITION (dt = '$do_date') SELECT
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.entry') entry, get_json_object(event_json,'$.kv.action') action, get_json_object(event_json,'$.kv.contentType') contentType, get_json_object(event_json,'$.kv.displayMills') displayMills, get_json_object(event_json,'$.kv.itemId') itemId, get_json_object(event_json,'$.kv.activityId') activityId,
server_time
FROM
"$APP".dwd_base_event_log
WHERE
dt = '$do_date'
AND event_name = 'ad';
INSERT overwrite TABLE "$APP".dwd_notification_log PARTITION (dt = '$do_date') SELECT
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.action') action, get_json_object(event_json,'$.kv.noti_type') noti_type, get_json_object(event_json,'$.kv.ap_time') ap_time, get_json_object(event_json,'$.kv.content') content,
server_time
FROM
"$APP".dwd_base_event_log
WHERE
dt = '$do_date'
AND event_name = 'notification';
INSERT overwrite TABLE "$APP".dwd_active_background_log PARTITION (dt = '$do_date') SELECT
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.active_source') active_source,
server_time
FROM
"$APP".dwd_base_event_log
WHERE
dt = '$do_date'
AND event_name = 'active_background';
INSERT overwrite TABLE "$APP".dwd_comment_log PARTITION (dt = '$do_date') SELECT
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.comment_id') comment_id, get_json_object(event_json,'$.kv.userid') userid, get_json_object(event_json,'$.kv.p_comment_id') p_comment_id, get_json_object(event_json,'$.kv.content') content, get_json_object(event_json,'$.kv.addtime') addtime, get_json_object(event_json,'$.kv.other_id') other_id, get_json_object(event_json,'$.kv.praise_count') praise_count, get_json_object(event_json,'$.kv.reply_count') reply_count,
server_time
FROM
"$APP".dwd_base_event_log
WHERE
dt = '$do_date'
AND event_name = 'comment';
INSERT overwrite TABLE "$APP".dwd_favorites_log PARTITION (dt = '$do_date') SELECT
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.id') id, get_json_object(event_json,'$.kv.course_id') course_id, get_json_object(event_json,'$.kv.userid') userid, get_json_object(event_json,'$.kv.add_time') add_time,
server_time
FROM
"$APP".dwd_base_event_log
WHERE
dt = '$do_date'
AND event_name = 'favorites';
INSERT overwrite TABLE "$APP".dwd_praise_log PARTITION (dt = '$do_date') SELECT
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.id') id, get_json_object(event_json,'$.kv.userid') userid, get_json_object(event_json,'$.kv.target_id') target_id, get_json_object(event_json,'$.kv.type') type, get_json_object(event_json,'$.kv.add_time') add_time,
server_time
FROM
"$APP".dwd_base_event_log
WHERE
dt = '$do_date'
AND event_name = 'praise';
INSERT overwrite TABLE "$APP".dwd_error_log PARTITION (dt = '$do_date') SELECT
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.errorBrief') errorBrief, get_json_object(event_json,'$.kv.errorDetail') errorDetail,
server_time
FROM
"$APP".dwd_base_event_log
WHERE
dt = '$do_date'
AND event_name = 'error';
"
$hive -e "$sql"
2) Grant execute permission to the script
[atguigu@hadoop102 bin]$ chmod 777 ods_to_dwd_event_log.sh
3) Run the script
[atguigu@hadoop102 module]$ ods_to_dwd_event_log.sh 2020-03-11
4) Query the loaded result
hive (gmall)> select * from dwd_comment_log where dt='2020-03-11' limit 2;
5) Script schedule
In production, usually between 00:30 and 01:00 each day.
4. Business data
4.1 Product (SKU) dimension table (full table)
1) Table creation statement
hive (gmall) > DROP TABLE
IF EXISTS `dwd_dim_sku_info`;
CREATE EXTERNAL TABLE `dwd_dim_sku_info` (
`id` string COMMENT '商品 id',
`spu_id` string COMMENT 'spuid',
`price` DOUBLE COMMENT '商品价格',
`sku_name` string COMMENT '商品名称',
`sku_desc` string COMMENT '商品描述',
`weight` DOUBLE COMMENT '重量',
`tm_id` string COMMENT '品牌 id',
`tm_name` string COMMENT '品牌名称',
`category3_id` string COMMENT '三级分类 id',
`category2_id` string COMMENT '二级分类 id',
`category1_id` string COMMENT '一级分类 id',
`category3_name` string COMMENT '三级分类名称',
`category2_name` string COMMENT '二级分类名称',
`category1_name` string COMMENT '一级分类名称',
`spu_name` string COMMENT 'spu 名称',
`create_time` string COMMENT '创建时间'
) COMMENT '商品维度表' PARTITIONED BY (`dt` string) stored AS parquet location '/warehouse/gmall/dwd/dwd_dim_sku_info/' tblproperties (
"parquet.compression" = "lzo"
);
2) Load data
hive (gmall) > INSERT overwrite TABLE dwd_dim_sku_info PARTITION (dt = '2020-03-10') SELECT
sku.id,
sku.spu_id,
sku.price,
sku.sku_name,
sku.sku_desc,
sku.weight,
sku.tm_id,
ob.tm_name,
sku.category3_id,
c2.id category2_id,
c1.id category1_id,
c3.name category3_name,
c2.name category2_name,
c1.name category1_name,
spu.spu_name,
sku.create_time
FROM
(
SELECT
*
FROM
ods_sku_info
WHERE
dt = '2020-03-10'
) sku
JOIN (
SELECT
*
FROM
ods_base_trademark
WHERE
dt = '2020-03-10'
) ob ON sku.tm_id = ob.tm_id
JOIN (
SELECT
*
FROM
ods_spu_info
WHERE
dt = '2020-03-10'
) spu ON spu.id = sku.spu_id
JOIN (
SELECT
*
FROM
ods_base_category3
WHERE
dt = '2020-03-10'
) c3 ON sku.category3_id = c3.id
JOIN (
SELECT
*
FROM
ods_base_category2
WHERE
dt = '2020-03-10'
) c2 ON c3.category2_id = c2.id
JOIN (
SELECT
*
FROM
ods_base_category1
WHERE
dt = '2020-03-10'
) c1 ON c2.category1_id = c1.id;
3) Query the loaded result
hive (gmall)> select * from dwd_dim_sku_info where dt='2020-03-10';
4.2 Coupon information table (full load)
Load the ODS-layer ods_coupon_info data into this DWD-layer coupon information table; appropriate cleaning can be done during the load.
1) Table creation statement
hive (gmall) > DROP TABLE
IF EXISTS dwd_dim_coupon_info;
CREATE external TABLE dwd_dim_coupon_info (
`id` string COMMENT '购物券编号',
`coupon_name` string COMMENT '购物券名称',
`coupon_type` string COMMENT '购物券类型 1 现金券 2 折扣券 3 满减券 4 满件打折券',
`condition_amount` string COMMENT '满额数',
`condition_num` string COMMENT '满件数',
`activity_id` string COMMENT '活动编号',
`benefit_amount` string COMMENT '减金额',
`benefit_discount` string COMMENT '折扣',
`create_time` string COMMENT '创建时间',
`range_type` string COMMENT '范围类型 1、商品 2、品类 3、品牌',
`spu_id` string COMMENT '商品 id',
`tm_id` string COMMENT '品牌 id',
`category3_id` string COMMENT '品类 id',
`limit_num` string COMMENT '最多领用次数',
`operate_time` string COMMENT '修改时间',
`expire_time` string COMMENT '过期时间'
) COMMENT '优惠券信息表' PARTITIONED BY (`dt` string) ROW format delimited FIELDS TERMINATED BY '\t' stored AS parquet location '/warehouse/gmall/dwd/dwd_dim_coupon_info/'
tblproperties (
"parquet.compression" = "lzo"
);
2) Load data
hive (gmall) > INSERT overwrite TABLE dwd_dim_coupon_info PARTITION (dt = '2020-03-10') SELECT
id,
coupon_name,
coupon_type,
condition_amount,
condition_num,
activity_id,
benefit_amount,
benefit_discount,
create_time,
range_type,
spu_id,
tm_id,
category3_id,
limit_num,
operate_time,
expire_time
FROM
ods_coupon_info
WHERE
dt = '2020-03-10';
3) Query the loaded result
hive (gmall)> select * from dwd_dim_coupon_info where dt='2020-03-10';
4.3 Activity dimension table (full load)
1) Table creation statement
hive (gmall) > DROP TABLE
IF EXISTS dwd_dim_activity_info;
CREATE external TABLE dwd_dim_activity_info (
`id` string COMMENT '编号',
`activity_name` string COMMENT '活动名称',
`activity_type` string COMMENT '活动类型',
`condition_amount` string COMMENT '满减金额',
`condition_num` string COMMENT '满减件数',
`benefit_amount` string COMMENT '优惠金额',
`benefit_discount` string COMMENT '优惠折扣',
`benefit_level` string COMMENT '优惠级别',
`start_time` string COMMENT '开始时间',
`end_time` string COMMENT '结束时间',
`create_time` string COMMENT '创建时间'
) COMMENT '活动信息表' PARTITIONED BY (`dt` string) ROW format delimited FIELDS TERMINATED BY '\t' stored AS parquet location '/warehouse/gmall/dwd/dwd_dim_activity_info/'
tblproperties (
"parquet.compression" = "lzo"
);
2) Load data
hive (gmall) > INSERT overwrite TABLE dwd_dim_activity_info PARTITION (dt = '2020-03-10') SELECT
info.id,
info.activity_name,
info.activity_type,
rule.condition_amount,
rule.condition_num,
rule.benefit_amount,
rule.benefit_discount,
rule.benefit_level,
info.start_time,
info.end_time,
info.create_time
FROM
(
SELECT
*
FROM
ods_activity_info
WHERE
dt = '2020-03-10'
) info
LEFT JOIN (
SELECT
*
FROM
ods_activity_rule
WHERE
dt = '2020-03-10'
) rule ON info.id = rule.activity_id;
3) Query the loaded result
hive (gmall)> select * from dwd_dim_activity_info where dt='2020-03-10';
4.4 Region dimension table (special)
1) Table creation statement
hive (gmall) > DROP TABLE
IF EXISTS `dwd_dim_base_province`;
CREATE EXTERNAL TABLE `dwd_dim_base_province` (
`id` string COMMENT 'id',
`province_name` string COMMENT '省市名称',
`area_code` string COMMENT '地区编码',
`iso_code` string COMMENT 'ISO 编码',
`region_id` string COMMENT '地区id',
`region_name` string COMMENT '地区名称'
) COMMENT '地区省市表'
location '/warehouse/gmall/dwd/dwd_dim_base_province/';
2) Load data
hive (gmall) > INSERT overwrite TABLE dwd_dim_base_province SELECT
bp.id,
bp.name,
bp.area_code,
bp.iso_code,
bp.region_id,
br.region_name
FROM
ods_base_province bp
JOIN ods_base_region br ON bp.region_id = br.id;
3) Query the loaded result
hive (gmall)> select * from dwd_dim_base_province;
4.5 Date dimension table (special, reserved)
1) Table creation statement
hive (gmall) > DROP TABLE
IF EXISTS `dwd_dim_date_info`;
CREATE EXTERNAL TABLE `dwd_dim_date_info` (
`date_id` string COMMENT '日',
`week_id` INT COMMENT '周',
`week_day` INT COMMENT '周的第几天',
`day` INT COMMENT '每月的第几天',
`month` INT COMMENT '第几月',
`quarter` INT COMMENT '第几季度',
`year` INT COMMENT '年',
`is_workday` INT COMMENT '是否是周末',
`holiday_id` INT COMMENT '是否是节假日'
)
ROW format delimited FIELDS TERMINATED BY '\t'
location '/warehouse/gmall/dwd/dwd_dim_date_info/';
2) Upload the date_info.txt file to the /opt/module/db_log/ path on hadoop102
3) Load data
hive (gmall)> load data local inpath '/opt/module/db_log/date_info.txt' into table dwd_dim_date_info;
4) Query the loaded result
hive (gmall)> select * from dwd_dim_date_info;
4.6 Order detail fact table (transactional fact table)
1) Table creation statement
hive (gmall) > DROP TABLE
IF EXISTS dwd_fact_order_detail;
CREATE external TABLE dwd_fact_order_detail (
`id` string COMMENT '订单编号',
`order_id` string COMMENT '订单号',
`user_id` string COMMENT '用户 id',
`sku_id` string COMMENT 'sku 商品 id',
`sku_name` string COMMENT '商品名称',
`order_price` DECIMAL (10, 2) COMMENT '商品价格',
`sku_num` BIGINT COMMENT '商品数量',
`create_time` string COMMENT '创建时间',
`province_id` string COMMENT '省份 ID',
`total_amount` DECIMAL (20, 2) COMMENT '订单总金额'
) PARTITIONED BY (`dt` string) stored AS parquet location '/warehouse/gmall/dwd/dwd_fact_order_detail/' tblproperties (
"parquet.compression" = "lzo"
);
2) Load data
hive (gmall) > INSERT overwrite TABLE dwd_fact_order_detail PARTITION (dt = '2020-03-10') SELECT
od.id,
od.order_id,
od.user_id,
od.sku_id,
od.sku_name,
od.order_price,
od.sku_num,
od.create_time,
oi.province_id,
od.order_price * od.sku_num
FROM
(
SELECT
*
FROM
ods_order_detail
WHERE
dt = '2020-03-10'
) od
JOIN (
SELECT
*
FROM
ods_order_info
WHERE
dt = '2020-03-10'
) oi ON od.order_id = oi.id;
3) Query the loaded result
hive (gmall)> select * from dwd_fact_order_detail where dt='2020-03-10';
4.7 Payment fact table (transactional fact table)
1) Table creation statement
hive (gmall) > DROP TABLE
IF EXISTS dwd_fact_payment_info;
CREATE external TABLE dwd_fact_payment_info (
`id` string COMMENT '',
`out_trade_no` string COMMENT '对外业务编号',
`order_id` string COMMENT '订单编号',
`user_id` string COMMENT '用户编号',
`alipay_trade_no` string COMMENT '支付宝交易流水编号',
`payment_amount` DECIMAL (16, 2) COMMENT '支付金额',
`subject` string COMMENT '交易内容',
`payment_type` string COMMENT '支付类型',
`payment_time` string COMMENT '支付时间',
`province_id` string COMMENT '省份 ID'
) PARTITIONED BY (`dt` string) stored AS parquet location '/warehouse/gmall/dwd/dwd_fact_payment_info/' tblproperties (
"parquet.compression" = "lzo"
);
2) Load data
hive (gmall) > INSERT overwrite TABLE dwd_fact_payment_info PARTITION (dt = '2020-03-10') SELECT
pi.id,
pi.out_trade_no,
pi.order_id,
pi.user_id,
pi.alipay_trade_no,
pi.total_amount,
pi.subject,
pi.payment_type,
pi.payment_time,
oi.province_id
FROM
(
SELECT
*
FROM
ods_payment_info
WHERE
dt = '2020-03-10'
) pi
JOIN (
SELECT
id,
province_id
FROM
ods_order_info
WHERE
dt = '2020-03-10'
) oi ON pi.order_id = oi.id;
3) Query the loaded result
hive (gmall)> select * from dwd_fact_payment_info where dt='2020-03-10';
4.8 Refund fact table (transactional fact table)
Load the ODS-layer ods_order_refund_info data into this DWD-layer refund fact table; appropriate cleaning can be done during the load.
1) Table creation statement
hive (gmall) > DROP TABLE
IF EXISTS dwd_fact_order_refund_info;
CREATE external TABLE dwd_fact_order_refund_info (
`id` string COMMENT '编号',
`user_id` string COMMENT '用户 ID',
`order_id` string COMMENT '订单 ID',
`sku_id` string COMMENT '商品 ID',
`refund_type` string COMMENT '退款类型',
`refund_num` BIGINT COMMENT '退款件数',
`refund_amount` DECIMAL (16, 2) COMMENT '退款金额',
`refund_reason_type` string COMMENT '退款原因类型',
`create_time` string COMMENT '退款时间'
) COMMENT '退款事实表' PARTITIONED BY (`dt` string) ROW format delimited FIELDS TERMINATED BY '\t' location '/warehouse/gmall/dwd/dwd_fact_order_refund_info/';
2) Load data
hive (gmall) > INSERT overwrite TABLE dwd_fact_order_refund_info PARTITION (dt = '2020-03-10') SELECT
id,
user_id,
order_id,
sku_id,
refund_type,
refund_num,
refund_amount,
refund_reason_type,
create_time
FROM
ods_order_refund_info
WHERE
dt = '2020-03-10';
3) Query the loaded result
hive (gmall)> select * from dwd_fact_order_refund_info where dt='2020-03-10';
4.9 Appraise fact table (transactional fact table)
Load the ODS-layer ods_comment_info data into this DWD-layer appraise fact table; appropriate cleaning can be done during the load.
1) Table creation statement
hive (gmall) > DROP TABLE
IF EXISTS dwd_fact_comment_info;
CREATE external TABLE dwd_fact_comment_info (
`id` string COMMENT '编号',
`user_id` string COMMENT '用户 ID',
`sku_id` string COMMENT '商品 sku',
`spu_id` string COMMENT '商品 spu',
`order_id` string COMMENT '订单 ID',
`appraise` string COMMENT '评价',
`create_time` string COMMENT '评价时间'
) COMMENT '评价事实表' PARTITIONED BY (`dt` string) ROW format delimited FIELDS TERMINATED BY '\t' location '/warehouse/gmall/dwd/dwd_fact_comment_info/';
2) Load data
hive (gmall) > INSERT overwrite TABLE dwd_fact_comment_info PARTITION (dt = '2020-03-10') SELECT
id,
user_id,
sku_id,
spu_id,
order_id,
appraise,
create_time
FROM
ods_comment_info
WHERE
dt = '2020-03-10';
3) Query the loaded result
hive (gmall)> select * from dwd_fact_comment_info where dt='2020-03-10';
4.10 Cart fact table (periodic snapshot fact table, daily snapshot)
Because the cart quantities keep changing, an incremental load is not suitable. Instead a snapshot is taken every day and the full data is loaded; this differs from a transactional fact table, which only loads each day's new records. The downside of a periodic snapshot fact table is the large storage volume. Mitigation: the value of a periodic snapshot decays over time, so very old data can be deleted (a partition cleanup sketch follows the load below).
1) Table creation statement
hive (gmall) > DROP TABLE
IF EXISTS dwd_fact_cart_info;
CREATE external TABLE dwd_fact_cart_info (
`id` string COMMENT '编号',
`user_id` string COMMENT '用户 id',
`sku_id` string COMMENT 'skuid',
`cart_price` string COMMENT '放入购物车时价格',
`sku_num` string COMMENT '数量',
`sku_name` string COMMENT 'sku 名称 (冗余)',
`create_time` string COMMENT '创建时间',
`operate_time` string COMMENT '修改时间',
`is_ordered` string COMMENT '是否已经下单。1 为已下单;0 为未下单',
`order_time` string COMMENT '下单时间'
) COMMENT '加购事实表' PARTITIONED BY (`dt` string) ROW format delimited FIELDS TERMINATED BY '\t' location '/warehouse/gmall/dwd/dwd_fact_cart_info/';
2) Load data
hive (gmall) > INSERT overwrite TABLE dwd_fact_cart_info PARTITION (dt = '2020-03-10') SELECT
id,
user_id,
sku_id,
cart_price,
sku_num,
sku_name,
create_time,
operate_time,
is_ordered,
order_time
FROM
ods_cart_info
WHERE
dt = '2020-03-10';
3) Query the loaded result
hive (gmall)> select * from dwd_fact_cart_info where dt='2020-03-10';
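A minimal sketch of the cleanup mentioned above, dropping an expired snapshot partition (the cut-off date is only an example):
hive (gmall)> alter table dwd_fact_cart_info drop if exists partition (dt='2020-02-10');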
4.11 Favor fact table (periodic snapshot fact table, daily snapshot)
The favor flag (whether the favorite was cancelled) changes over time, so an incremental load is not suitable. A snapshot is taken every day and the full data is loaded; this differs from a transactional fact table, which only loads each day's new records.
1) Table creation statement
hive (gmall) > DROP TABLE
IF EXISTS dwd_fact_favor_info;
CREATE external TABLE dwd_fact_favor_info (
`id` string COMMENT '编号',
`user_id` string COMMENT '用户 id',
`sku_id` string COMMENT 'skuid',
`spu_id` string COMMENT 'spuid',
`is_cancel` string COMMENT '是否取消',
`create_time` string COMMENT '收藏时间',
`cancel_time` string COMMENT '取消时间'
) COMMENT '收藏事实表' PARTITIONED BY (`dt` string)
ROW format delimited FIELDS TERMINATED BY '\t' location '/warehouse/gmall/dwd/dwd_fact_favor_info/';
2) Load data
hive (gmall) > INSERT overwrite TABLE dwd_fact_favor_info PARTITION (dt = '2020-03-10') SELECT
id,
user_id,
sku_id,
spu_id,
is_cancel,
create_time,
cancel_time
FROM
ods_favor_info
WHERE
dt = '2020-03-10';
3) Query the loaded result
hive (gmall)> select * from dwd_fact_favor_info where dt='2020-03-10';
4.12 Coupon usage fact table (accumulating snapshot fact table)
Coupon lifecycle: receive the coupon -> place an order with the coupon -> pay with the coupon.
Typical use of the accumulating snapshot fact table: counting how many coupons were received, used in orders, and used in payments (a query sketch is given after the load below).
1) Table creation statement
hive (gmall) > DROP TABLE
IF EXISTS dwd_fact_coupon_use;
CREATE external TABLE dwd_fact_coupon_use (
`id` string COMMENT '编号',
`coupon_id` string COMMENT '优惠券 ID',
`user_id` string COMMENT 'userid',
`order_id` string COMMENT '订单 id',
`coupon_status` string COMMENT '优惠券状态',
`get_time` string COMMENT '领取时间',
`using_time` string COMMENT '使用时间(下单)',
`used_time` string COMMENT '使用时间(支付)'
) COMMENT '优惠券领用事实表' PARTITIONED BY (`dt` string) ROW format delimited FIELDS TERMINATED BY '\t' location '/warehouse/gmall/dwd/dwd_fact_coupon_use/';
2) Load data
hive (gmall) >
set hive.exec.dynamic.partition.mode=nonstrict;
INSERT overwrite TABLE dwd_fact_coupon_use PARTITION (dt) SELECT
IF (new.id IS NULL, old.id, new.id),
IF (
new.coupon_id IS NULL,
old.coupon_id,
new.coupon_id
),
IF (
new.user_id IS NULL,
old.user_id,
new.user_id
),
IF (
new.order_id IS NULL,
old.order_id,
new.order_id
),
IF (
new.coupon_status IS NULL,
old.coupon_status,
new.coupon_status
),
IF (
new.get_time IS NULL,
old.get_time,
new.get_time
),
IF (
new.using_time IS NULL,
old.using_time,
new.using_time
),
IF (
new.used_time IS NULL,
old.used_time,
new.used_time
),
date_format(
IF (
new.get_time IS NULL,
old.get_time,
new.get_time
),
'yyyy-MM-dd'
)
FROM
(
SELECT
id,
coupon_id,
user_id,
order_id,
coupon_status,
get_time,
using_time,
used_time
FROM
dwd_fact_coupon_use
WHERE
dt IN (
SELECT
date_format(get_time, 'yyyy-MM-dd')
FROM
ods_coupon_use
WHERE
dt = '2020-03-10'
)
) old
FULL OUTER JOIN (
SELECT
id,
coupon_id,
user_id,
order_id,
coupon_status,
get_time,
using_time,
used_time
FROM
ods_coupon_use
WHERE
dt = '2020-03-10'
) new ON old.id = new.id;
3) Query the loaded result
hive (gmall)> select * from dwd_fact_coupon_use where dt='2020-03-10';
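A minimal sketch of the funnel counts this accumulating snapshot supports, based on which milestone timestamps have been filled in:
hive (gmall) > SELECT
count(*) get_count,
sum(IF(using_time IS NOT NULL, 1, 0)) using_count,
sum(IF(used_time IS NOT NULL, 1, 0)) used_count
FROM
dwd_fact_coupon_use
WHERE
dt = '2020-03-10';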
4.13 Order fact table (accumulating snapshot fact table)
1) The concat function
When concatenating strings, concat returns NULL as soon as any argument is NULL.
hive> select concat('a','b');
ab
hive> select concat('a','b',null);
NULL
2) The concat_ws function
When concatenating strings, concat_ws does not return NULL as long as at least one of the strings is not NULL.
concat_ws requires a separator to be specified.
hive> select concat_ws('-','a','b');
a-b
hive> select concat_ws('-','a','b',null);
a-b
hive> select concat_ws('','a','b',null);
ab
3) The str_to_map function
(1) Syntax
STR_TO_MAP(VARCHAR text, VARCHAR listDelimiter, VARCHAR keyValueDelimiter)
(2) Description
Splits text into K-V pairs with listDelimiter, then splits each K-V pair with keyValueDelimiter, and returns the assembled MAP. The default listDelimiter is ',' and the default keyValueDelimiter is ':' (the examples here pass both delimiters explicitly).
(3) Example
str_to_map('1001=2020-03-10,1002=2020-03-10', ',' , '=')
Output: {"1001":"2020-03-10","1002":"2020-03-10"}
4) Table creation statement
Order lifecycle: create time -> payment time -> cancel time -> finish time -> refund time -> refund finish time.
Because the ODS-layer order table only carries a create time and an operate time, it cannot express all of these timestamps, so the order status table has to be joined in. The order fact table also adds an activity id, so the activity-order table has to be joined as well.
hive (gmall) > DROP TABLE
IF EXISTS dwd_fact_order_info;
CREATE external TABLE dwd_fact_order_info (
`id` string COMMENT '订单编号',
`order_status` string COMMENT '订单状态',
`user_id` string COMMENT '用户 id',
`out_trade_no` string COMMENT '支付流水号',
`create_time` string COMMENT '创建时间(未支付状态)',
`payment_time` string COMMENT '支付时间(已支付状态)',
`cancel_time` string COMMENT '取消时间(已取消状态)',
`finish_time` string COMMENT '完成时间(已完成状态)',
`refund_time` string COMMENT '退款时间(退款中状态)',
`refund_finish_time` string COMMENT '退款完成时间(退款完成状态)',
`province_id` string COMMENT '省份 ID',
`activity_id` string COMMENT '活动 ID',
`original_total_amount` string COMMENT '原价金额',
`benefit_reduce_amount` string COMMENT '优惠金额',
`feight_fee` string COMMENT '运费',
`final_total_amount` DECIMAL (10, 2) COMMENT '订单金额'
) PARTITIONED BY (`dt` string) stored AS parquet location '/warehouse/gmall/dwd/dwd_fact_order_info/' tblproperties (
"parquet.compression" = "lzo"
);
5) Load data
Commonly used functions
The concat function
hive (gmall) > SELECT
order_id,
concat(
order_status,
'=',
operate_time
)
FROM
ods_order_status_log
WHERE
dt = '2020-03-10';
Output:
3210 1001=2020-03-10 00:00:00.0
3211 1001=2020-03-10 00:00:00.0
3212 1001=2020-03-10 00:00:00.0
3210 1002=2020-03-10 00:00:00.0
3211 1002=2020-03-10 00:00:00.0
3212 1002=2020-03-10 00:00:00.0
3210 1005=2020-03-10 00:00:00.0
3211 1004=2020-03-10 00:00:00.0
3212 1004=2020-03-10 00:00:00.0
The collect_set function
hive (gmall) > SELECT
order_id,
collect_set (
concat(
order_status,
'=',
operate_time
)
)
FROM
ods_order_status_log
WHERE
dt = '2020-03-10'
GROUP BY
order_id;
Output:
3210 ["1001=2020-03-10 00:00:00.0","1002=2020-03-10 00:00:00.0","1005=2020-03-10 00:00:00.0"]
3211 ["1001=2020-03-10 00:00:00.0","1002=2020-03-10 00:00:00.0","1004=2020-03-10 00:00:00.0"]
3212 ["1001=2020-03-10 00:00:00.0","1002=2020-03-10 00:00:00.0","1004=2020-03-10 00:00:00.0"]
The concat_ws function
hive (gmall) > SELECT
order_id,
concat_ws(
',',
collect_set (
concat(
order_status,
'=',
operate_time
)
)
)
FROM
ods_order_status_log
WHERE
dt = '2020-03-10'
GROUP BY
order_id;
Output:
3210 1001=2020-03-10 00:00:00.0,1002=2020-03-10 00:00:00.0,1005=2020-03-10 00:00:00.0
3211 1001=2020-03-10 00:00:00.0,1002=2020-03-10 00:00:00.0,1004=2020-03-10 00:00:00.0
3212 1001=2020-03-10 00:00:00.0,1002=2020-03-10 00:00:00.0,1004=2020-03-10 00:00:00.0
The str_to_map function
hive (gmall) > SELECT
order_id,
str_to_map (
concat_ws(
',',
collect_set (
concat(
order_status,
'=',
operate_time
)
)
),
',',
'='
)
FROM
ods_order_status_log
WHERE
dt = '2020-03-10'
GROUP BY
order_id;
Output:
3210 {"1001":"2020-03-10 00:00:00.0","1002":"2020-03-10 00:00:00.0","1005":"2020-03-10 00:00:00.0"}
3211 {"1001":"2020-03-10 00:00:00.0","1002":"2020-03-10 00:00:00.0","1004":"2020-03-10 00:00:00.0"}
3212 {"1001":"2020-03-10 00:00:00.0","1002":"2020-03-10 00:00:00.0","1004":"2020-03-10 00:00:00.0"}
6) Load data
hive (gmall) >
set hive.exec.dynamic.partition.mode=nonstrict;
INSERT overwrite TABLE dwd_fact_order_info PARTITION (dt) SELECT
IF (new.id IS NULL, old.id, new.id),
IF (
new.order_status IS NULL,
old.order_status,
new.order_status
),
IF (
new.user_id IS NULL,
old.user_id,
new.user_id
),
IF (
new.out_trade_no IS NULL,
old.out_trade_no,
new.out_trade_no
),
IF (
new.tms [ '1001' ] IS NULL,
old.create_time,
new.tms [ '1001' ]
), -- 1001 corresponds to the unpaid status
IF (
new.tms [ '1002' ] IS NULL,
old.payment_time,
new.tms [ '1002' ]
),
IF (
new.tms [ '1003' ] IS NULL,
old.cancel_time,
new.tms [ '1003' ]
),
IF (
new.tms [ '1004' ] IS NULL,
old.finish_time,
new.tms [ '1004' ]
),
IF (
new.tms [ '1005' ] IS NULL,
old.refund_time,
new.tms [ '1005' ]
),
IF (
new.tms [ '1006' ] IS NULL,
old.refund_finish_time,
new.tms [ '1006' ]
),
IF (
new.province_id IS NULL,
old.province_id,
new.province_id
),
IF (
new.activity_id IS NULL,
old.activity_id,
new.activity_id
),
IF (
new.original_total_amount IS NULL,
old.original_total_amount,
new.original_total_amount
),
IF (
new.benefit_reduce_amount IS NULL,
old.benefit_reduce_amount,
new.benefit_reduce_amount
),
IF (
new.feight_fee IS NULL,
old.feight_fee,
new.feight_fee
),
IF (
new.final_total_amount IS NULL,
old.final_total_amount,
new.final_total_amount
),
date_format(
IF (
new.tms [ '1001' ] IS NULL,
old.create_time,
new.tms [ '1001' ]
),
'yyyy-MM-dd'
)
FROM
(
SELECT
id,
order_status,
user_id,
out_trade_no,
create_time,
payment_time,
cancel_time,
finish_time,
refund_time,
refund_finish_time,
province_id,
activity_id,
original_total_amount,
benefit_reduce_amount,
feight_fee,
final_total_amount
FROM
dwd_fact_order_info
WHERE
dt IN (
SELECT
date_format(create_time, 'yyyy-MM-dd')
FROM
ods_order_info
WHERE
dt = '2020-03-10'
)
) old
FULL OUTER JOIN (
SELECT
info.id,
info.order_status,
info.user_id,
info.out_trade_no,
info.province_id,
act.activity_id,
log.tms,
info.original_total_amount,
info.benefit_reduce_amount,
info.feight_fee,
info.final_total_amount
FROM
(
SELECT
order_id,
str_to_map (
concat_ws(
',',
collect_set (
concat(
order_status,
'=',
operate_time
)
)
),
',',
'='
) tms
FROM
ods_order_status_log
WHERE
dt = '2020-03-10'
GROUP BY
order_id
) log
JOIN (
SELECT
*
FROM
ods_order_info
WHERE
dt = '2020-03-10'
) info ON log.order_id = info.id
LEFT JOIN (
SELECT
*
FROM
ods_activity_order
WHERE
dt = '2020-03-10'
) act ON log.order_id = act.order_id
) new ON old.id = new.id;
7) Query the loaded result
hive (gmall)> select * from dwd_fact_order_info where dt='2020-03-10';
4.14 User dimension table (zipper table)
The data in the user table may be both inserted and updated each day, but the update frequency is not high, so it is a slowly changing dimension. A zipper table is used here to store the user dimension data.
1) What is a zipper table
2) Why use a zipper table
3) How to use a zipper table (see the query sketch after this list)
4) How a zipper table is built up over time
5) Diagram of the zipper-table build process
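A minimal query sketch for item 3), using the start_date/end_date convention defined in (1) below: the current snapshot is the set of rows whose end_date is still the open sentinel, and a historical snapshot as of some day is the set of rows whose validity range covers that day.
hive (gmall)> -- latest data
hive (gmall)> select * from dwd_dim_user_info_his where end_date = '9999-99-99';
hive (gmall)> -- snapshot as of 2020-03-10
hive (gmall)> select * from dwd_dim_user_info_his where start_date <= '2020-03-10' and end_date >= '2020-03-10';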
6) Building the zipper table in this project
(1) Create the zipper table
hive (gmall) > DROP TABLE
IF EXISTS dwd_dim_user_info_his;
CREATE external TABLE dwd_dim_user_info_his (
`id` string COMMENT '用户 id',
`name` string COMMENT '姓名',
`birthday` string COMMENT '生日',
`gender` string COMMENT '性别',
`email` string COMMENT '邮箱',
`user_level` string COMMENT '用户等级',
`create_time` string COMMENT '创建时间',
`operate_time` string COMMENT '操作时间',
`start_date` string COMMENT '有效开始日期',
`end_date` string COMMENT '有效结束日期'
) COMMENT '订单拉链表' stored AS parquet location '/warehouse/gmall/dwd/dwd_dim_user_info_his/' tblproperties (
"parquet.compression" = "lzo"
);
(2) Initialize the zipper table
hive (gmall) > INSERT overwrite TABLE dwd_dim_user_info_his SELECT
id,
NAME,
birthday,
gender,
email,
user_level,
create_time,
operate_time,
'2020-03-10',
'9999-99-99'
FROM
ods_user_info oi
WHERE
oi.dt = '2020-03-10';
Step 1: build the day's changed data (inserts and updates); executed every day
(1) How to obtain the daily changed data
a. Ideally the table itself has a create time and an operate time (lucky!)
b. If not, a third-party tool such as canal can watch MySQL in real time and record the changes (cumbersome).
c. Compare the two days' data row by row and check whether md5(concat(all columns that may change)) is identical (crude); a query sketch of this approach follows below.
d. Ask the business database to provide a change log.
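A minimal sketch of approach c, hashing the mutable columns for two adjacent days; it assumes the table being compared is imported as a full daily dump, the column list is illustrative, and concat_ws is used so a NULL column does not wipe out the whole hash:
hive (gmall) > SELECT
new.id
FROM
(SELECT id, md5(concat_ws('|', name, birthday, gender, email, user_level)) fp FROM ods_user_info WHERE dt = '2020-03-11') new
LEFT JOIN
(SELECT id, md5(concat_ws('|', name, birthday, gender, email, user_level)) fp FROM ods_user_info WHERE dt = '2020-03-10') old
ON new.id = old.id
WHERE old.id IS NULL OR new.fp <> old.fp;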
(2) Because ods_order_info is itself imported as a new-and-changed detail table, no extra processing is needed
a) Generate one more day (2020-03-11) of data in the business database
b) Import all of the 2020-03-11 data with Sqoop: mysqlTohdfs.sh all 2020-03-11
c) Load the ODS layer: hdfs_to_ods_db.sh all 2020-03-11
Step 2: merge the changed records first, then append the new records, and insert the result into a temporary table
1) Create the temporary table
hive (gmall) > DROP TABLE
IF EXISTS dwd_dim_user_info_his_tmp;
CREATE external TABLE dwd_dim_user_info_his_tmp (
`id` string COMMENT '用户 id',
`name` string COMMENT '姓名',
`birthday` string COMMENT '生日',
`gender` string COMMENT '性别',
`email` string COMMENT '邮箱',
`user_level` string COMMENT '用户等级',
`create_time` string COMMENT '创建时间',
`operate_time` string COMMENT '操作时间',
`start_date` string COMMENT '有效开始日期',
`end_date` string COMMENT '有效结束日期'
) COMMENT '订单拉链临时表' stored AS parquet location '/warehouse/gmall/dwd/dwd_dim_user_info_his_tmp/' tblproperties (
"parquet.compression" = "lzo"
);
2) Load statement
hive (gmall) > INSERT overwrite TABLE dwd_dim_user_info_his_tmp SELECT
*
FROM
(
SELECT
id,
NAME,
birthday,
gender,
email,
user_level,
create_time,
operate_time,
'2020-03-11' start_date,
'9999-99-99' end_date
FROM
ods_user_info
WHERE
dt = '2020-03-11'
UNION ALL
SELECT
uh.id,
uh.name,
uh.birthday,
uh.gender,
uh.email,
uh.user_level,
uh.create_time,
uh.operate_time,
uh.start_date,
IF (
ui.id IS NOT NULL
AND uh.end_date = '9999-99-99',
date_add(ui.dt, -1),
uh.end_date
) end_date
FROM
dwd_dim_user_info_his uh
LEFT JOIN (
SELECT
*
FROM
ods_user_info
WHERE
dt = '2020-03-11'
) ui ON uh.id = ui.id
) his
ORDER BY
his.id,
start_date;
Step 3: overwrite the zipper table with the temporary table
1) Load data
hive (gmall) > INSERT overwrite TABLE dwd_dim_user_info_his SELECT
*
FROM
dwd_dim_user_info_his_tmp;
2) Query the loaded data
hive (gmall)> select id, start_date, end_date from dwd_dim_user_info_his;
4.15 DWD Layer Data Load Script
1)Create the script ods_to_dwd_db.sh in the /home/atguigu/bin directory
[atguigu@hadoop102 bin]$ vim ods_to_dwd_db.sh
Write the following content into the script
#!/bin/bash
APP=gmall
hive=/opt/module/hive/bin/hive

# Use the date passed as the second argument; if none is given, default to the day before today
if [ -n "$2" ]; then
    do_date=$2
else
    do_date=`date -d "-1 day" +%F`
fi

sql1="
set hive.exec.dynamic.partition.mode=nonstrict;

-- SKU dimension table (dwd_dim_sku_info)
INSERT overwrite TABLE ${APP}.dwd_dim_sku_info PARTITION (dt = '$do_date') SELECT
sku.id,
sku.spu_id,
sku.price,
sku.sku_name,
sku.sku_desc,
sku.weight,
sku.tm_id,
ob.tm_name,
sku.category3_id,
c2.id category2_id,
c1.id category1_id,
c3.name category3_name,
c2.name category2_name,
c1.name category1_name,
spu.spu_name,
sku.create_time
FROM
(
SELECT
*
FROM
${APP}.ods_sku_info
WHERE
dt = '$do_date'
) sku
JOIN (
SELECT
*
FROM
${APP}.ods_base_trademark
WHERE
dt = '$do_date'
) ob ON sku.tm_id = ob.tm_id
JOIN (
SELECT
*
FROM
${APP}.ods_spu_info
WHERE
dt = '$do_date'
) spu ON spu.id = sku.spu_id
JOIN (
SELECT
*
FROM
${APP}.ods_base_category3
WHERE
dt = '$do_date'
) c3 ON sku.category3_id = c3.id
JOIN (
SELECT
*
FROM
${APP}.ods_base_category2
WHERE
dt = '$do_date'
) c2 ON c3.category2_id = c2.id
JOIN (
SELECT
*
FROM
${APP}.ods_base_category1
WHERE
dt = '$do_date'
) c1 ON c2.category1_id = c1.id;
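-- Coupon dimension table (dwd_dim_coupon_info)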
INSERT overwrite TABLE ${APP}.dwd_dim_coupon_info PARTITION (dt = '$do_date') SELECT
id,
coupon_name,
coupon_type,
condition_amount,
condition_num,
activity_id,
benefit_amount,
benefit_discount,
create_time,
range_type,
spu_id,
tm_id,
category3_id,
limit_num,
operate_time,
expire_time
FROM
${APP}.ods_coupon_info
WHERE
dt = '$do_date';
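-- Activity dimension table (dwd_dim_activity_info)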
INSERT overwrite TABLE ${APP}.dwd_dim_activity_info PARTITION (dt = '$do_date') SELECT
info.id,
info.activity_name,
info.activity_type,
rule.condition_amount,
rule.condition_num,
rule.benefit_amount,
rule.benefit_discount,
rule.benefit_level,
info.start_time,
info.end_time,
info.create_time
FROM
(
SELECT
*
FROM
${APP}.ods_activity_info
WHERE
dt = '$do_date'
) info
LEFT JOIN (
SELECT
*
FROM
${APP}.ods_activity_rule
WHERE
dt = '$do_date'
) rule ON info.id = rule.activity_id;
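-- Order-detail fact table (dwd_fact_order_detail)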
INSERT overwrite TABLE ${APP}.dwd_fact_order_detail PARTITION (dt = '$do_date') SELECT
od.id,
od.order_id,
od.user_id,
od.sku_id,
od.sku_name,
od.order_price,
od.sku_num,
od.create_time,
oi.province_id,
od.order_price * od.sku_num
FROM
(
SELECT
*
FROM
${APP}.ods_order_detail
WHERE
dt = '$do_date'
) od
JOIN (
SELECT
*
FROM
${APP}.ods_order_info
WHERE
dt = '$do_date'
) oi ON od.order_id = oi.id;
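-- Payment fact table (dwd_fact_payment_info)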
INSERT overwrite TABLE ${APP}.dwd_fact_payment_info PARTITION (dt = '$do_date') SELECT
pi.id,
pi.out_trade_no,
pi.order_id,
pi.user_id,
pi.alipay_trade_no,
pi.total_amount,
pi.subject,
pi.payment_type,
pi.payment_time,
oi.province_id
FROM
(
SELECT
*
FROM
${APP}.ods_payment_info
WHERE
dt = '$do_date'
) pi
JOIN (
SELECT
id,
province_id
FROM
${APP}.ods_order_info
WHERE
dt = '$do_date'
) oi ON pi.order_id = oi.id;
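-- Order-refund fact table (dwd_fact_order_refund_info)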
INSERT overwrite TABLE ${APP}.dwd_fact_order_refund_info PARTITION (dt = '$do_date') SELECT
id,
user_id,
order_id,
sku_id,
refund_type,
refund_num,
refund_amount,
refund_reason_type,
create_time
FROM
${APP}.ods_order_refund_info
WHERE
dt = '$do_date';
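-- Comment fact table (dwd_fact_comment_info)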
INSERT overwrite TABLE ${APP}.dwd_fact_comment_info PARTITION (dt = '$do_date') SELECT
id,
user_id,
sku_id,
spu_id,
order_id,
appraise,
create_time
FROM
${APP}.ods_comment_info
WHERE
dt = '$do_date';
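-- Cart fact table (dwd_fact_cart_info)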
INSERT overwrite TABLE ${APP}.dwd_fact_cart_info PARTITION (dt = '$do_date') SELECT
id,
user_id,
sku_id,
cart_price,
sku_num,
sku_name,
create_time,
operate_time,
is_ordered,
order_time
FROM
${APP}.ods_cart_info
WHERE
dt = '$do_date';
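-- Favor fact table (dwd_fact_favor_info)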
INSERT overwrite TABLE ${APP}.dwd_fact_favor_info PARTITION (dt = '$do_date') SELECT
id,
user_id,
sku_id,
spu_id,
is_cancel,
create_time,
cancel_time
FROM
${APP}.ods_favor_info
WHERE
dt = '$do_date';
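-- Coupon-use fact table (dwd_fact_coupon_use): merge existing rows with the day's changes, dynamic partition by dt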
INSERT overwrite TABLE ${APP}.dwd_fact_coupon_use PARTITION (dt) SELECT
IF (new.id IS NULL, old.id, new.id),
IF (
new.coupon_id IS NULL,
old.coupon_id,
new.coupon_id
),
IF (
new.user_id IS NULL,
old.user_id,
new.user_id
),
IF (
new.order_id IS NULL,
old.order_id,
new.order_id
),
IF (
new.coupon_status IS NULL,
old.coupon_status,
new.coupon_status
),
IF (
new.get_time IS NULL,
old.get_time,
new.get_time
),
IF (
new.using_time IS NULL,
old.using_time,
new.using_time
),
IF (
new.used_time IS NULL,
old.used_time,
new.used_time
),
date_format(
IF (
new.get_time IS NULL,
old.get_time,
new.get_time
),
'yyyy-MM-dd'
)
FROM
(
SELECT
id,
coupon_id,
user_id,
order_id,
coupon_status,
get_time,
using_time,
used_time
FROM
${APP}.dwd_fact_coupon_use
WHERE
dt IN (
SELECT
date_format(get_time, 'yyyy-MM-dd')
FROM
${APP}.ods_coupon_use
WHERE
dt = '$do_date'
)
) old
FULL OUTER JOIN (
SELECT
id,
coupon_id,
user_id,
order_id,
coupon_status,
get_time,
using_time,
used_time
FROM
${APP}.ods_coupon_use
WHERE
dt = '$do_date'
) new ON old.id = new.id;
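-- Order fact table (dwd_fact_order_info): merge existing rows with the day's changes, dynamic partition by dt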
INSERT overwrite TABLE ${APP}.dwd_fact_order_info PARTITION (dt) SELECT
IF (new.id IS NULL, old.id, new.id),
IF (
new.order_status IS NULL,
old.order_status,
new.order_status
),
IF (
new.user_id IS NULL,
old.user_id,
new.user_id
),
IF (
new.out_trade_no IS NULL,
old.out_trade_no,
new.out_trade_no
),
IF (
new.tms [ '1001' ] IS NULL,
old.create_time,
new.tms [ '1001' ]
), -- status code 1001 corresponds to the unpaid state (used as the create time)
IF (
new.tms [ '1002' ] IS NULL,
old.payment_time,
new.tms [ '1002' ]
),
IF (
new.tms [ '1003' ] IS NULL,
old.cancel_time,
new.tms [ '1003' ]
),
IF (
new.tms [ '1004' ] IS NULL,
old.finish_time,
new.tms [ '1004' ]
),
IF (
new.tms [ '1005' ] IS NULL,
old.refund_time,
new.tms [ '1005' ]
),
IF (
new.tms [ '1006' ] IS NULL,
old.refund_finish_time,
new.tms [ '1006' ]
),
IF (
new.province_id IS NULL,
old.province_id,
new.province_id
),
IF (
new.activity_id IS NULL,
old.activity_id,
new.activity_id
),
IF (
new.original_total_amount IS NULL,
old.original_total_amount,
new.original_total_amount
),
IF (
new.benefit_reduce_amount IS NULL,
old.benefit_reduce_amount,
new.benefit_reduce_amount
),
IF (
new.feight_fee IS NULL,
old.feight_fee,
new.feight_fee
),
IF (
new.final_total_amount IS NULL,
old.final_total_amount,
new.final_total_amount
),
date_format(
IF (
new.tms [ '1001' ] IS NULL,
old.create_time,
new.tms [ '1001' ]
),
'yyyy-MM-dd'
)
FROM
(
SELECT
id,
order_status,
user_id,
out_trade_no,
create_time,
payment_time,
cancel_time,
finish_time,
refund_time,
refund_finish_time,
province_id,
activity_id,
original_total_amount,
benefit_reduce_amount,
feight_fee,
final_total_amount
FROM
${APP}.dwd_fact_order_info
WHERE
dt IN (
SELECT
date_format(create_time, 'yyyy-MM-dd')
FROM
${APP}.ods_order_info
WHERE
dt = '$do_date'
)
) old
FULL OUTER JOIN (
SELECT
info.id,
info.order_status,
info.user_id,
info.out_trade_no,
info.province_id,
act.activity_id,
log.tms,
info.original_total_amount,
info.benefit_reduce_amount,
info.feight_fee,
info.final_total_amount
FROM
(
SELECT
order_id,
str_to_map (
concat_ws(
',',
collect_set (
concat(
order_status,
'=',
operate_time
)
)
),
',',
'='
) tms
FROM
${APP}.ods_order_status_log
WHERE
dt = '$do_date'
GROUP BY
order_id
) log
JOIN (
SELECT
*
FROM
${APP}.ods_order_info
WHERE
dt = '$do_date'
) info ON log.order_id = info.id
LEFT JOIN (
SELECT
*
FROM
${APP}.ods_activity_order
WHERE
dt = '$do_date'
) act ON log.order_id = act.order_id
) new ON old.id = new.id;
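-- User dimension zipper table: merge changed users and append new ones into the temporary table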
INSERT overwrite TABLE ${APP}.dwd_dim_user_info_his_tmp SELECT
*
FROM
(
SELECT
id,
NAME,
birthday,
gender,
email,
user_level,
create_time,
operate_time,
'$do_date' start_date,
'9999-99-99' end_date
FROM
${APP}.ods_user_info
WHERE
dt = '$do_date'
UNION ALL
SELECT
uh.id,
uh.name,
uh.birthday,
uh.gender,
uh.email,
uh.user_level,
uh.create_time,
uh.operate_time,
uh.start_date,
IF (
ui.id IS NOT NULL
AND uh.end_date = '9999-99-99',
date_add(ui.dt, -1),
uh.end_date
) end_date
FROM
${APP}.dwd_dim_user_info_his uh
LEFT JOIN (
SELECT
*
FROM
${APP}.ods_user_info
WHERE
dt = '$do_date'
) ui ON uh.id = ui.id
) his
ORDER BY
his.id,
start_date;
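-- Overwrite the zipper table with the contents of the temporary table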
INSERT overwrite TABLE ${APP}.dwd_dim_user_info_his SELECT
*
FROM
${APP}.dwd_dim_user_info_his_tmp;
"sql2=" INSERT overwrite TABLE $ { APP }.dwd_dim_base_province SELECT
bp.id,
bp.name,
bp.area_code,
bp.iso_code,
bp.region_id,
br.region_name
FROM
${APP}.ods_base_province bp
JOIN ${APP}.ods_base_region br ON bp.region_id = br.id;
"
case $1 in
"first"){
$hive -e "$sql1"
$hive -e "$sql2"
};; "all"){
$hive -e "$sql1"
};; esac
2)Grant the script execute permission
[atguigu@hadoop102 bin]$ chmod 777 ods_to_dwd_db.sh
3)Run the script to load the data
[atguigu@hadoop102 bin]$ ods_to_dwd_db.sh all 2020-03-11
4)Check the loaded data
hive (gmall)>
select * from dwd_fact_order_info where dt='2020-03-11';
select * from dwd_fact_order_detail where dt='2020-03-11';
select * from dwd_fact_comment_info where dt='2020-03-11';
select * from dwd_fact_order_refund_info where dt='2020-03-11';
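The zipper table itself has no dt partition, so check it through its validity columns; any user changed on 2020-03-11 should show one row closed on 2020-03-10 and one row still open:
hive (gmall)> select id, start_date, end_date from dwd_dim_user_info_his order by id, start_date limit 10;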