目标
1.spark从hive获取数据对用户特征进行处理写入hbase
2.保留30天用户特征数据,用于模型训练(每天训练一次,离线预测)
用户画像设计
用户画像通俗说则是根据用户的行为给用户打标签,我们将用户行为划分为以下几个维度,分别是:
基础信息: 用户年龄,性别,等级等相关信息
环境信息: 用户使用app时的地域,设备,网络情况(可分为统计日,最近7日,最近30日,累计四个时间维度对特征进行统计)
行为偏好: 用户常玩的app类型, 登录时间,在线时长等(可分为统计日,最近7日,最近30日,累计四个时间维度对特征进行统计)
付费偏好: 用户充值额度, 付费次数, 付费商品类型, 付费时间段等等(可分为统计日,最近7日,最近30日,累计四个时间维度对特征进行统计)
社交特征: 如果存在社交系统, 可以统计一下好友聊天, 好友开局等信息(可分为统计日,最近7日,最近30日,累计四个时间维度对特征进行统计)
策略玩法特征: 具体游戏(或其他)应用核心内容的玩法, 使用等信息(可分为统计日,最近7日,最近30日,累计四个时间维度对特征进行统计)
调整数值特征: 可以通过该一系列特征来调整付费,留存的字段
用户画像Hbase表结构: user_profile
列簇 | 字段 | |||
---|---|---|---|---|
key (rowKey关键字段) | time 统计日 | game_id 游戏ID | uid用户id | |
info (基础信息) | $game_sex 游戏性别[man, woman] | $vip vip等级 | $level 等级 | |
env_tag (环境偏好特征) | $address 地址分布[.*] | $device 设备分布 [huawei, oppo, apple, ...] | $os 系统分布[android, ios] | $sim 运营商分布 [yidong, liantong, ...] |
favor_tag (行为偏好特征) | $game 常玩游戏类型[tcg, rpg, mmo, ...] | $login_time 登录时段分布[00~23] | $online_time 在线时长分布[d+] | |
pay_tag (付费偏好特征) | $pay_amount 充值额度分布[count、max、min、avg] | $pay_frequency 付费次数分布[count、max、min、avg] | $pay_money 支付方式分布(基于金额) [wechat, applepay, ...] | $pay_count 支付方式分布(基于次数) [wechat, applepay, ...] |
social_tag (社交偏好特征) | $new_friend_count 新增好友[count、max、min、avg] | $chat_count 好友聊天次数[count、max、min、avg] | $chat_count_friend 游戏好友聊天次数[d+] | $chat_count_wechat 微信好友聊天次数[d+] |
strategy_tag (策略偏好特征) | $pvp_count 竞技次数[count、max、min、avg] | $pve_count 副本次数[count、max、min、avg] | $role_type 角色类型使用 [ap, ad, other] | $fight_type 战斗类型 [pvp, pve] |
adjust_tag (调整数值特征) | $allowance 破产补贴 |
[图片上传失败...(image-5e7d98-1643355969865)]
Hive数据结构
hive
show create table action (此为表名, 我这边是action表)
输入输出格式为ORC, Presto针对这种格式的数据做了优化查询, 如果是impala查询则使用parquet格式。
CREATE TABLE `action`(
`uid` string,
`uid_type` string,
`agent` string,
`ip` string,
`timestamp` timestamp,
`time` timestamp,
`year` string,
`month` string,
`week` string,
`hour` string,
`minute` string,
`properties` map<string,string>)
PARTITIONED BY (
`game_id` int,
`timezone` string,
`event` string,
`day` date)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
WITH SERDEPROPERTIES (
'colelction.delim'=',',
'field.delim'='\t',
'mapkey.delim'=':',
'serialization.format'='\t')
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION
'hdfs://slaves01:8020/warehouse/tablespace/managed/hive/event.db/action'
TBLPROPERTIES (
'auto-compaction'='true',
'bucketing_version'='2',
'compaction.file-size'='128MB',
'sink.partition-commit.delay'='0s',
'sink.partition-commit.policy.kind'='metastore,success-file',
'sink.partition-commit.trigger'='process-time',
'sink.shuffle-by-partition.enable'='true',
'transient_lastDdlTime'='1642571371')
Spark2Hbase逻辑处理
1.获取游戏列表和对应时区(或者传入字符串数组)
2.创建用户画像表user_profile(设置TTL为30天,压缩方式为snappy)
3.计算不同特征tag, 写入用户画像表
├── pom.xml
└── src
└── main
├── resources
│ ├── application.conf # 默认配置
│ ├── code2area.csv
│ ├── hive-site.xml # 将/usr/hdp/current/spark2-client/conf/hive-site.xml拷贝过来
│ └── reference.conf
└── scala
├── com
│ └── carol
│ └── bigdata
│ ├── App.scala # 程序主入口
│ ├── Config.scala # 配置参数
│ ├── Task.scala # 程序主要任务
│ ├── constant
│ │ └── KVConstant.scala
│ ├── task # 具体任务包
│ │ └── feature
│ │ ├── CalEnvTag.scala # 计算环境偏好特征
│ │ ├── CalFavorTag.scala # 计算行为偏好特征
│ │ ├── CalPayTag.scala # 计算付费特征
│ │ └── cag
│ │ ├── calAdjustTag.scala # 计算调整特征(此处由用户自行填写)
│ │ ├── calSocialTag.scala # 计算社交特征
│ │ └── calStrategyTag.scala # 计算策略特征
│ │ └── label
│ │ └── calRetentionLabel.scala # 计算留存标签
│ │ └── model
│ │ ├── algo # 算法模型
│ │ │ ├── DT.scala
│ │ │ ├── LR.scala
│ │ │ ├── ModelTrait.scala # 定义算法模型接口
│ │ │ ├── ModelUtil.scala
│ │ │ ├── RF.scala
│ │ │ └── SVM.scala
│ │ ├── feature
│ │ │ └── FeatureUtil.scala # 特征工程
│ │ └── train
│ │ └── TrainRetention.scala # 留存预测训练Demo
│ └── utils
│ ├── FeatureUtil.scala # 计算特征公共函数
│ ├── Flag.scala # 命令行参数
│ ├── FuncUtil.scala
│ ├── HBaseFilter.scala # hbase建表,读过滤操作
│ ├── HBaseUtil.scala # hbase建表,读写操作
│ ├── RddReader.scala # 读取hive/hdfs转换为RDD
│ └── TimeUtil.scala # 时间处理
└── org
└── apache
└── hadoop
└── hive
└── shims
└── ShimLoader.java # hadoop3不支持,在源码93行,加入对case 3 version的支持
└── spark
└── ml
└── feature
└── VectorDisassembler.scala # 将合并列拆分
样本设计
标签设计
假设此处需要预测活跃留存, 以便调整相关特征提升用户留存率。key为hbase表的rowkey相关字段, retention为标签字段, active_r1代表用户在次日仍然活跃则为1, 否则为0, 因此我们将此模型抽象为一个二分类模型。(TTL 30days)
列簇 | 字段 | |||||
---|---|---|---|---|---|---|
key | time 统计日 | game_id 游戏ID | uid 用户ID | |||
retention | active_r1 次日活跃留存[0,1] | active_r2 2日活跃留存[0,1] | active_r3 3日活跃留存[0,1] | active_r7 7日活跃留存[0,1] | active_r15 15日活跃留存[0,1] | active_r30 30日活跃留存[0,1] |
prediction | pred_active_r1 次日活跃留存[0,1] | pred_active_r2 2日活跃留存[0,1] | pred_active_r3 3日活跃留存[0,1] | pred_active_r7 7日活跃留存[0,1] | pred_active_r15 15日活跃留存[0,1] | pred_active_r30 30日活跃留存[0,1] |
样本标签表
根据用户画像和标签表, 合并成样本标签表,进行特征工程处理, 其中合并原则为以每天标签表为左表join用户画像表。(在特征工程中合并训练)
time统计, game_id 游戏ID, uid 用户ID, 用户特征(env_tag, favor_tag,...), 实际标签(active_r1 0/1)
算法模型
特征工程
将特征列类型划分为int, double, map,string列, 分别对不同的列标记tag或分段, 也可以对数值列进行标准化。如图所示, 最后一列是次日留存的标签, 前面是随意摘取的特征列, 这里使用的是spark的hash特征向量化。
模型封装
定义模型初始化, 训练, 预测, 保存接口, 具体的算法实现接口即可。
package com.carol.bigdata.task.model.algo
import java.io.File
import org.apache.spark.ml.classification.{Classifier, OneVsRest}
import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, MulticlassClassificationEvaluator}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.tuning.{CrossValidator, CrossValidatorModel, ParamGridBuilder}
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.sql.DataFrame
trait ModelTrait {
// 未实现函数,子类集成必须实现,已实现函数子类可以直接使用
def init(params: Map[String, Any]): Unit
// 构建pipeline模型
def buildPipeline(featuresCol: String = "features",
labelCol: String = "label",
rawPredictionCol: String = "rawPrediction",
predictionCol: String = "prediction",
objective: String = "binary",
numClass: Int = 2): Pipeline
// 交叉验证
def buildValidator(pipeline: Pipeline,
seed: Int = 1,
numFolds: Int = 2,
parallelNum: Int = 2,
objective: String = "binary"): CrossValidator
// 基于二分类器构建任意分类的PipeLine
def buildOvrTree(binaryModel: Classifier[_, _, _],
featuresCol: String = "features",
labelCol: String = "label",
rawPredictionCol: String = "rawPrediction",
predictionCol: String = "prediction",
objective: String = "binary"): OneVsRest = {
// 构造多分类器
val ovrTree: OneVsRest = new OneVsRest()
.setClassifier(binaryModel)
.setFeaturesCol(featuresCol)
.setLabelCol(labelCol)
.setPredictionCol(predictionCol)
ovrTree
}
// 基于pipeline和gridBuilder构建交叉验证器
def buildValidatorFromGrid(pipeline: Pipeline,
gridBuilder: ParamGridBuilder,
seed: Int = 1,
numFolds: Int = 2,
parallelNum: Int = 2,
objective: String = "binary"): CrossValidator = {
// 交叉验证
val paramGrid: Array[ParamMap] = gridBuilder.build()
// 评估器
val evaluator = {
if (objective == "binary")
new BinaryClassificationEvaluator
else new MulticlassClassificationEvaluator
}
// 交叉验证模型
val cv: CrossValidator = new CrossValidator()
.setEstimator(pipeline)
.setEstimatorParamMaps(paramGrid)
.setEvaluator(evaluator)
.setSeed(seed)
.setNumFolds(numFolds) // Use 3+ in practice
.setParallelism(parallelNum) // Evaluate up to 2 parameter settings in parallel
//.setCollectSubModels(true) // specified to collect all validated models
cv
}
// 获取最优超参
def getBestParams(crossModel: CrossValidatorModel): Map[String, Any] = {
val bestParamsMap = crossModel.getEstimatorParamMaps.zip(crossModel.avgMetrics).maxBy(_._2)._1
bestParamsMap.toSeq.map(pair => (pair.param.name, pair.value)).toMap
}
// 保存模型
def saveModel(pipeline: PipelineModel, modelPath: String = "model"): Unit = {
// save到本地货HDFS,供PipelineModel加载
println(s"pipeline model saving...")
pipeline.write.overwrite.save(modelPath)
println(s"pipeline model save success to $modelPath")
}
// 评估模型
def evalModel(evalData: DataFrame, objective: String = "binary"): Unit = {
val evaluator = {
if (objective == "binary")
new BinaryClassificationEvaluator
else new MulticlassClassificationEvaluator
}
val accuracy: Double = evaluator.evaluate(evalData)
println("accuracy:", accuracy)
}
// 更新微调参数
def updateTuneParams(bestParamsMap: ParamMap): Unit
def updateTuneParamsFromCV(crossModel: CrossValidatorModel,
maxBy: Boolean = true,
objective: String = "binary"): Unit
}
模型应用
将调整数值特征划分为一系列段位, 将当日活跃用户左连用户画像特征并按段位遍历组成新的特征标签表, 使用训练好的模型进行预测, 选择用户留下来的分段区间, 从小到大进行排列, 写入离线预测表。
系列文章
第一篇: Ambari自动化部署
第二篇: 数据埋点设计和SDK源码
第三篇: 数据采集和验证方案
第四篇: ETL实时方案: Kafka->Flink->Hive
第五篇: ETL用户数据处理: kafka->spark->kudu
第六篇: Presto分析模型SQL和UDF函数
第七篇: 用户画像和留存预测