算法训练和模型部署如何避免多次重写数据预处理代码

前言

前段时间,我们对接算法的工程师哭丧的和我说,模型生成后一般都要部署成API的形态对外提供服务,但是算法工程师并没有提供如何将一条数据转化特征向量的方法,他能拿到的是代码逻辑以及一些“中间元数据”。数据预处理本来就复杂,翻译也是一件极其困难的事情。我解释了这件事情难以解决的原因,但是显然他还是有些失望。

今天的目标就是谈谈如何尝试改善这件事情。

解决方案

在我看来,之前业界已经给出解决方案了,就是pipeline,pipeline不仅仅包括数据特征化,还包括模型。Spark也是这么做的,但是其实应用并不广泛。原因有如下几个:

  1. 不同的框架pipeline模式不一样。比如Sklearn的pipeline并没不太容易用在工程团队上,毕竟大部分研发工程师都是用java/c++系的。
  2. Spark 的pipeline 不适合API服务,因为他是为了批处理设计的,而不是为了响应时间。

知道原因后,解决方案就变得相对直观了:

  1. 用一种统一的语言描述pipeline,横跨数据处理框架和算法框架。
  2. pipeline对单条数据处理必须能够在毫秒级,同时需要保持数据预处理离线训练和online预测/流预测的一致性。

用一种统一的语言描述pipeline

能做统一描述的其实是SQL,我们会将数据特征化流程也抽象成了算法训练,通过调参的方式干预特征处理,另外,我们尽可能的根据任务进行高级抽象而不是小功能点。这点很重要。比如:

-- 把文本字段转化为tf/idf向量,可以自定义词典
train orginal_text_corpus as TfIdfInPlace.`/tmp/tfidfinplace`
where inputCol="content"
-- 分词相关配置
and ignoreNature="true"
-- 停用词路径
and stopWordPath="/tmp/tfidf/stopwords"
-- 高权重词路径
and priorityDicPath="/tmp/tfidf/prioritywords"
-- 高权重词加权倍数
and priority="5.0"
and nGram="2,3"
;

我们将tf/idf特征化抽象成了一个算法,通过参数配置覆盖了如下的一些诉求:

  1. 分词(可以指定自定义词典)
  2. 过滤停用词
  3. ngram特征组合
  4. 字符转化为数字
  5. 计算idf/tf值
  6. 记在权重高的词汇,并且给对应特征加权。

如果是在训练阶段,我们直接加载模型的数据部分,从而让后续的算法可以继续进行处理。
使用方式如下:

load parquet.`/tmp/tfidfinplace/data` 
as trainningdata;

我们也可能在其他的批处理或者流程序里去使用它预处理新的数据,这个时候我们需要
先注册一下:

register TfIdfInPlace.`/tmp/tfidfinplace` as tfidf;

然后通过UDF函数的方式去使用即可:

 select tfidf(content) from hivetable;

如果你是部署成API服务,那么通过接口注册后,可以使用类似下面的http请求:

curl -XPOST 'http://127.0.0.1:9003/model/predict' -d '
 data=["你好,世界"] & 
 dataType=string &
pipeline= tfidf,bayes
'

我这里额外添加了一个贝叶斯贝叶斯模型,这里pipeline的的调用相当于 bayes(tfidf("你好,世界")) 最后返回的是一个预测结果。

pipeline对单条数据处理必须能够在毫秒级

这个如何能做到呢?这就需要我们保存每个“数据处理模型”中间的元数据以及计算规则。比如以前面的TfIdfInPlace为例,他训练完成后会保存所有的训练参数,词空间,词和数字的映射等等。这样我们下次使用时就可以加载这些元数据,并且按特定的规则对新数据进行处理。

因为训练时的数据预处理和预测时的数据预处理本质是不同的,训练时的数据预处理只能针对批量数据,从中学习特征化的方式,而预测时的数据预处理更偏向于“利用训练时学到的经验仅仅进行计算”,这种天然不匹配带来的成本在于,你需要针对pipeline里的每个模型的预测部分(包括数据预处理和算法模型)进行重新的实现,而无法复用之前批训练时的逻辑。

对于MLSQL而言,它重新实现了大部分Spark mllib算法/数据处理模型的预测逻辑,增加了更多高阶的数据预处理模型,并且提供对tensorflow,sklearn,dl4j等框架的预测支持。

实际案例

目前StreamingPro已经实现了一个案例,比如下面的代码通过SVM和随机深林实现了一个文本分类,特征工程用的是TfIdfInPlace算法:

set traning_dir = "/tmp/lwys_corpus";

-- 加载数据
load csv.`/Users/allwefantasy/Downloads/lwys_corpus` options header="true" and delimiter="\t" and quote="'"
as lwys_corpus;

select cut as features,cast(sid as int) as label from lwys_corpus
as orginal_text_corpus;

-- 把文本字段转化为tf/idf向量,可以自定义词典
train orginal_text_corpus as TfIdfInPlace.`${traning_dir}/tfidf` 
where inputCol="features" 
-- 分词的字典路径,支持多个
and `dic.paths`="....."
-- 停用词路径
and stopWordPath="..."
-- 高权重词路径
and priorityDicPath="...."
-- 高权重词加权倍数
and priority="5.0"
;

load parquet.`${traning_dir}/tfidf/data` 
as lwys_corpus_with_featurize;

-- 把label转化为递增数字
train lwys_corpus_with_featurize StringIndex.`${traning_dir}/si` 
where inputCol="label";

register StringIndex.`${traning_dir}/si` as predict;

select predict(label) as label,features as features from lwys_corpus_with_featurize 
as lwys_corpus_final_format;

-- 切分训练集、验证集,该算法会保证每个分类都是按比例切分。
train lwys_corpus_final_format as RateSampler.`${traning_dir}/ratesampler` 
where labelCol="label"
and sampleRate="0.9,0.1";

load parquet.`${traning_dir}/ratesampler` as data2;

select * from data2 where __split__=1
as validateTable;

select * from data2 where __split__=0
as trainingTable;

-- 训练,可以配置多个模型同时进行训练

train trainingTable as SKLearn.`${traning_dir}/model`  
where `kafkaParam.bootstrap.servers`="127.0.0.1:9092"
and `kafkaParam.topic`="test"
and `kafkaParam.group_id`="g_test-1"
and  `fitParam.0.batchSize`="300"
and  `fitParam.0.labelSize`="41"
and  `fitParam.0.alg`="RandomForestClassifier"
and  `fitParam.1.batchSize`="300"
and  `fitParam.1.labelSize`="41"
and  `fitParam.1.alg`="SVM"
and validateTable="validateTable"
and `systemParam.pythonPath`="python"
and `systemParam.pythonVer`="2.7"
;

训练完成后,如果我想在流式/批处理里用,那么应该是这样的:

-- 注册特征处理模型
register TfIdfInPlace.`${traning_dir}/tfidf`  as 
tfidf_compute;
register StringIndex.`${traning_dir}/si` as label_convert;

-- 注册算法模型
register SKLearn.`${traning_dir}/model`   as predict_label;

-- 对数据进行特征处理
select  tfidf_compute(content) as feature  from  some-hive-table
as newdata;

--对数据进行预测:
select label_convert_reverse(predict_label(feature)) as predict_catogory   from  newdata;

如果我们希望部署成一个API服务,首先启动一个MLSQL服务:

./bin/spark-submit   --class streaming.core.StreamingApp \
--master local[2] \
--name predict_service \
streamingpro-spark-2.0-1.0.0.jar    \
-streaming.name predict_service    \
-streaming.job.file.path file:///tmp/query.json \
-streaming.platform spark   \
-streaming.rest true   \
-streaming.driver.port 9003   \
-streaming.spark.service true \
-streaming.thrift false \
-streaming.enableHiveSupport true

访问 http://127.0.0.1:9003/run/script 接口动态注册已经生成的模型:

-- 注册特征处理模型
register TfIdfInPlace.`${traning_dir}/tfidf`  as 
tfidf_compute;
register StringIndex.`${traning_dir}/si` as label_convert;

-- 注册算法模型
register SKLearn.`${traning_dir}/model`   as predict_label;

MLSQL可以注册MLSQL自身的一些数据处理模型,对于算法模型,则包含了spark mllib, tensorflow, sklearn,dl4j等流行框架。

访问http://127.0.0.1:9003/model/predict进行预测请求:

curl -XPOST 'http://127.0.0.1:9003/model/predict' -d '
 data=["你好,世界"] & 
 dataType=string &
pipeline= tfidf_compute, predict_label, label_convert
'

这个时候,会分别调用tfidf_compute, predict_label, label_convert 三个模型处理传递过来的数据,完成最后的预测。可以简单理解为一个嵌套函数调用(实际上就是,😁):

label_convert(predict_label(tfidf_compute(feature)))

对于API服务,我们的接口设计思路是,用户传进来的有三部分东西: 一个是待处理的数据(json),一个是你会用到算子(比如模型,比如数据预处理,比如自定义函数,比如SQL里的常见函数),以及如何组合这些算子(简单的比如是一个pipeline,复杂的可能多层嵌套)。

组合这些算子,我们支持这种格式 select predict(concat_vector(tfidf(field1),(tfidfield2)))。

比如前面的例子,你可以这么调用:

curl -XPOST 'http://127.0.0.1:9003/model/predict' -d '
 data=[{"content":"你好世界"},{"content":"地球美好"}] & 
 dataType=row &
 sql = select label_convert (predict_label(tfidf_compute(content))) as predict_clalss
'

data 是待处理的内容,sql是组织形式,里面的算子则是函数比如 tfidf_compute等

额外的话

通过上面描述的方式,我们可以有效的通过pipeline机制,在保证Online predict的响应时间的情况下,复用在训练时的数据预处理和算法模型,避免了重复开发,减少了研发的负担,并且基于统一的pipline dsl成功的无缝融合了数据处理框架和多个流行的机器学习框架。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,753评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,668评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,090评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,010评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,054评论 6 395
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,806评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,484评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,380评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,873评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,021评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,158评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,838评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,499评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,044评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,159评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,449评论 3 374
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,136评论 2 356