推荐阅读:
文章推荐系统 | 一、推荐流程设计
文章推荐系统 | 二、同步业务数据
文章推荐系统 | 三、收集用户行为数据
在上述步骤中,我们已经将业务数据和用户行为数据同步到了推荐系统数据库当中,接下来,我们就要对文章数据和用户数据进行分析,构建文章画像和用户画像,本文我们主要讲解如何构建文章画像。文章画像由关键词和主题词组成,我们将每个词的 TF-IDF 权重和 TextRank 权重的乘积作为关键词权重,筛选出权重最高的 K 个词作为关键词,将 TextRank 权重最高的 K 个词与 TF-IDF 权重最高的 K 个词的共现词作为主题词。
首先,在 Hive 中创建文章数据库 article 及相关表,其中表 article_data 用于存储完整的文章信息,表 idf_keywords_values 用于存储关键词和索引信息,表 tfidf_keywords_values 用于存储关键词和 TF-IDF 权重信息,表 textrank_keywords_values 用于存储关键词和 TextRank 权重信息,表 article_profile 用于存储文章画像信息。
-- 创建文章数据库
create database if not exists article comment "artcile information" location '/user/hive/warehouse/article.db/';
-- 创建文章信息表
CREATE TABLE article_data
(
article_id BIGINT comment "article_id",
channel_id INT comment "channel_id",
channel_name STRING comment "channel_name",
title STRING comment "title",
content STRING comment "content",
sentence STRING comment "sentence"
)
COMMENT "toutiao news_channel"
LOCATION '/user/hive/warehouse/article.db/article_data';
-- 创建关键词索引信息表
CREATE TABLE idf_keywords_values
(
keyword STRING comment "article_id",
idf DOUBLE comment "idf",
index INT comment "index"
);
-- 创建关键词TF-IDF权重信息表
CREATE TABLE tfidf_keywords_values
(
article_id INT comment "article_id",
channel_id INT comment "channel_id",
keyword STRING comment "keyword",
tfidf DOUBLE comment "tfidf"
);
-- 创建关键词TextRank权重信息表
CREATE TABLE textrank_keywords_values
(
article_id INT comment "article_id",
channel_id INT comment "channel_id",
keyword STRING comment "keyword",
textrank DOUBLE comment "textrank"
);
-- 创建文章画像信息表
CREATE TABLE article_profile
(
article_id INT comment "article_id",
channel_id INT comment "channel_id",
keyword MAP<STRING, DOUBLE> comment "keyword",
topics ARRAY<STRING> comment "topics"
);
计算文章完整信息
为了计算文章画像,需要将文章信息表(news_article_basic)、文章内容表(news_article_content)及频道表(news_channel)进行合并,从而得到完整的文章信息,通常使用 Spark SQL 进行处理。
通过关联表 news_article_basic, news_article_content 和 news_channel 获得文章完整信息,包括 article_id, channel_id, channel_name, title, content,这里获取一个小时内的文章信息。
spark.sql("use toutiao")
_now = datetime.today().replace(minute=0, second=0, microsecond=0)
start = datetime.strftime(_now + timedelta(days=0, hours=-1, minutes=0), "%Y-%m-%d %H:%M:%S")
end = datetime.strftime(_now, "%Y-%m-%d %H:%M:%S")
basic_content = spark.sql(
"select a.article_id, a.channel_id, a.title, b.content from news_article_basic a "
"inner join news_article_content b on a.article_id=b.article_id where a.review_time >= '{}' "
"and a.review_time < '{}' and a.status = 2".format(start, end))
basic_content.registerTempTable("temp_article")
channel_basic_content = spark.sql(
"select t.*, n.channel_name from temp_article t left join news_channel n on t.channel_id=n.channel_id")
channel_basic_content
结果如下所示
利用 concat_ws()
方法,将 channel_name, title, content 这 3 列数据合并为一列 sentence,并将结果写入文章完整信息表 article_data 中
import pyspark.sql.functions as F
spark.sql("use article")
sentence_df = channel_basic_content.select("article_id", "channel_id", "channel_name", "title", "content", \
F.concat_ws(
",",
channel_basic_content.channel_name,
channel_basic_content.title,
channel_basic_content.content
).alias("sentence")
)
del basic_content
del channel_basic_content
gc.collect() # 垃圾回收
sentence_df.write.insertInto("article_data")
sentence_df
结果如下所示,文章完整信息包括 article_id, channel_id, channel_name, title, content, sentence,其中 sentence 为 channel_name, title, content 合并而成的长文本内容
计算 TF-IDF
前面我们得到了文章的完整内容信息,接下来,我们要先对文章进行分词,然后计算每个词的 TF-IDF 权重,将 TF-IDF 权重最高的 K 个词作为文章的关键词。首先,读取文章信息
spark.sql("use article")
article_dataframe = spark.sql("select * from article_data")
利用 mapPartitions()
方法,对每篇文章进行分词,这里使用的是 jieba
分词器
words_df = article_dataframe.rdd.mapPartitions(segmentation).toDF(["article_id", "channel_id", "words"])
def segmentation(partition):
import os
import re
import jieba
import jieba.analyse
import jieba.posseg as pseg
import codecs
abspath = "/root/words"
# 结巴加载用户词典
userdict_path = os.path.join(abspath, "ITKeywords.txt")
jieba.load_userdict(userdict_path)
# 停用词文本
stopwords_path = os.path.join(abspath, "stopwords.txt")
def get_stopwords_list():
"""返回stopwords列表"""
stopwords_list = [i.strip() for i in codecs.open(stopwords_path).readlines()]
return stopwords_list
# 所有的停用词列表
stopwords_list = get_stopwords_list()
# 分词
def cut_sentence(sentence):
"""对切割之后的词语进行过滤,去除停用词,保留名词,英文和自定义词库中的词,长度大于2的词"""
seg_list = pseg.lcut(sentence)
seg_list = [i for i in seg_list if i.flag not in stopwords_list]
filtered_words_list = []
for seg in seg_list:
if len(seg.word) <= 1:
continue
elif seg.flag == "eng":
if len(seg.word) <= 2:
continue
else:
filtered_words_list.append(seg.word)
elif seg.flag.startswith("n"):
filtered_words_list.append(seg.word)
elif seg.flag in ["x", "eng"]: # 是自定一个词语或者是英文单词
filtered_words_list.append(seg.word)
return filtered_words_list
for row in partition:
sentence = re.sub("<.*?>", "", row.sentence) # 替换掉标签数据
words = cut_sentence(sentence)
yield row.article_id, row.channel_id, words
words_df
结果如下所示,words 为将 sentence 分词后的单词列表
使用分词结果对词频统计模型(CountVectorizer)进行词频统计训练,并将 CountVectorizer 模型保存到 HDFS 中
from pyspark.ml.feature import CountVectorizer
# vocabSize是总词汇的大小,minDF是文本中出现的最少次数
cv = CountVectorizer(inputCol="words", outputCol="countFeatures", vocabSize=200*10000, minDF=1.0)
# 训练词频统计模型
cv_model = cv.fit(words_df)
cv_model.write().overwrite().save("hdfs://hadoop-master:9000/headlines/models/CV.model")
加载 CountVectorizer 模型,计算词频向量
from pyspark.ml.feature import CountVectorizerModel
cv_model = CountVectorizerModel.load("hdfs://hadoop-master:9000/headlines/models/CV.model")
# 得出词频向量结果
cv_result = cv_model.transform(words_df)
cv_result
结果如下所示,countFeatures 为词频向量,如 (986, [2, 4, ...], [3.0, 5.0, ...]) 表示总词汇的大小为 986 个,索引为 2 和 4 的词在某篇文章中分别出现 3 次和 5 次,
得到词频向量后,再利用逆文本频率模型( IDF ),根据词频向量进行 IDF 统计训练,并将 IDF 模型保存到 HDFS
from pyspark.ml.feature import IDF
idf = IDF(inputCol="countFeatures", outputCol="idfFeatures")
idf_model = idf.fit(cv_result)
idf_model.write().overwrite().save("hdfs://hadoop-master:9000/headlines/models/IDF.model")
我们已经分别计算了文章信息中每个词的 TF 和 IDF,这时就可以加载 CountVectorizer 模型和 IDF 模型,计算每个词的 TF-IDF
from pyspark.ml.feature import CountVectorizerModel
cv_model = CountVectorizerModel.load("hdfs://hadoop-master:9000/headlines/models/countVectorizerOfArticleWords.model")
from pyspark.ml.feature import IDFModel
idf_model = IDFModel.load("hdfs://hadoop-master:9000/headlines/models/IDFOfArticleWords.model")
cv_result = cv_model.transform(words_df)
tfidf_result = idf_model.transform(cv_result)
tfidf_result
结果如下所示,idfFeatures 为 TF-IDF 权重向量,如 (986, [2, 4, ...], [0.3, 0.5, ...]) 表示总词汇的大小为 986 个,索引为 2 和 4 的词在某篇文章中的 TF-IDF 值分别为 0.3 和 0.5
对文章的每个词都根据 TF-IDF 权重排序,保留 TF-IDF 权重最高的前 K 个词作为关键词
def sort_by_tfidf(partition):
TOPK = 20
for row in partition:
# 找到索引与IDF值并进行排序
_dict = list(zip(row.idfFeatures.indices, row.idfFeatures.values))
_dict = sorted(_dict, key=lambda x: x[1], reverse=True)
result = _dict[:TOPK]
for word_index, tfidf in result:
yield row.article_id, row.channel_id, int(word_index), round(float(tfidf), 4)
keywords_by_tfidf = tfidf_result.rdd.mapPartitions(sort_by_tfidf).toDF(["article_id", "channel_id", "index", "weights"])
keywords_by_tfidf
结果如下所示,每篇文章保留了权重最高的 K 个单词,index 为单词索引,weights 为对应单词的 TF-IDF 权重
接下来,我们需要知道每个词的对应的 TF-IDF 值,可以利用 zip()
方法,将所有文章中的每个词及其 TF-IDF 权重组成字典,再加入索引列,由此得到每个词对应的 TF-IDF 值,将该结果保存到 idf_keywords_values 表
keywords_list_with_idf = list(zip(cv_model.vocabulary, idf_model.idf.toArray()))
def append_index(data):
for index in range(len(data)):
data[index] = list(data[index]) # 将元组转为list
data[index].append(index) # 加入索引
data[index][1] = float(data[index][1])
append_index(keywords_list_with_idf)
sc = spark.sparkContext
rdd = sc.parallelize(keywords_list_with_idf) # 创建rdd
idf_keywords = rdd.toDF(["keywords", "idf", "index"])
idf_keywords.write.insertInto('idf_keywords_values')
idf_keywords
结果如下所示,包含了所有单词的名称、TF-IDF 权重及索引
通过 index 列,将 keywords_by_tfidf
与表 idf_keywords_values 进行连接,选取文章 ID、频道 ID、关键词、TF-IDF 权重作为结果,并保存到 TF-IDF 关键词表 tfidf_keywords_values
keywords_index = spark.sql("select keyword, index idx from idf_keywords_values")
keywords_result = keywords_by_tfidf.join(keywords_index, keywords_index.idx == keywords_by_tfidf.index).select(["article_id", "channel_id", "keyword", "weights"])
keywords_result.write.insertInto("tfidf_keywords_values")
keywords_result
结果如下所示,keyword 和 weights 即为所有词在每个文章中的 TF-IDF 权重
计算 TextRank
前面我们已经计算好了每个词的 TF-IDF 权重,为了计算关键词,还需要得到每个词的 TextRank 权重,接下来,还是先读取文章完整信息
spark.sql("use article")
article_dataframe = spark.sql("select * from article_data")
对文章 sentence 列的内容进行分词,计算每个词的 TextRank 权重,并将每篇文章 TextRank 权重最高的 K 个词保存到 TextRank 结果表 textrank_keywords_values
textrank_keywords_df = article_dataframe.rdd.mapPartitions(textrank).toDF(["article_id", "channel_id", "keyword", "textrank"])
textrank_keywords_df.write.insertInto("textrank_keywords_values")
TextRank 计算细节:分词后只保留指定词性的词,滑动截取长度为 K 的窗口,计算窗口内的各个词的投票数
def textrank(partition):
import os
import jieba
import jieba.analyse
import jieba.posseg as pseg
import codecs
abspath = "/root/words"
# 结巴加载用户词典
userDict_path = os.path.join(abspath, "ITKeywords.txt")
jieba.load_userdict(userDict_path)
# 停用词文本
stopwords_path = os.path.join(abspath, "stopwords.txt")
def get_stopwords_list():
"""返回stopwords列表"""
stopwords_list = [i.strip() for i in codecs.open(stopwords_path).readlines()]
return stopwords_list
# 所有的停用词列表
stopwords_list = get_stopwords_list()
class TextRank(jieba.analyse.TextRank):
def __init__(self, window=20, word_min_len=2):
super(TextRank, self).__init__()
self.span = window # 窗口大小
self.word_min_len = word_min_len # 单词的最小长度
# 要保留的词性,根据jieba github ,具体参见https://github.com/baidu/lac
self.pos_filt = frozenset(
('n', 'x', 'eng', 'f', 's', 't', 'nr', 'ns', 'nt', "nw", "nz", "PER", "LOC", "ORG"))
def pairfilter(self, wp):
"""过滤条件,返回True或者False"""
if wp.flag == "eng":
if len(wp.word) <= 2:
return False
if wp.flag in self.pos_filt and len(wp.word.strip()) >= self.word_min_len \
and wp.word.lower() not in stopwords_list:
return True
# TextRank过滤窗口大小为5,单词最小为2
textrank_model = TextRank(window=5, word_min_len=2)
allowPOS = ('n', "x", 'eng', 'nr', 'ns', 'nt', "nw", "nz", "c")
for row in partition:
tags = textrank_model.textrank(row.sentence, topK=20, withWeight=True, allowPOS=allowPOS, withFlag=False)
for tag in tags:
yield row.article_id, row.channel_id, tag[0], tag[1]
textrank_keywords_df
结果如下所示,keyword 和 textrank 即为每个单词在文章中的 TextRank 权重
画像计算
我们计算出 TF-IDF 和 TextRank 后,就可以计算关键词和主题词了,读取 TF-IDF 权重
idf_keywords_values = oa.spark.sql("select * from idf_keywords_values")
读取 TextRank 权重
textrank_keywords_values = oa.spark.sql("select * from textrank_keywords_values")
通过 keyword
关联 TF-IDF 权重和 TextRank 权重
keywords_res = textrank_keywords_values.join(idf_keywords_values, on=['keyword'], how='left')
计算 TF-IDF 权重和 TextRank 权重的乘积作为关键词权重
keywords_weights = keywords_res.withColumn('weights', keywords_res.textrank * keywords_res.idf).select(["article_id", "channel_id", "keyword", "weights"])
keywords_weights
结果如下所示
这里,我们需要将相同文章的词都合并到一条记录中,将 keywords_weights
按照 article_id 分组,并利用 collect_list()
方法,分别将关键词和权重合并为列表
keywords_weights.registerTempTable('temp')
keywords_weights = spark.sql("select article_id, min(channel_id) channel_id, collect_list(keyword) keywords, collect_list(weights) weights from temp group by article_id")`
keywords_weights
结果如下所示,keywords 为每篇文章的关键词列表,weights 为关键词对应的权重列表
为了方便查询,我们需要将关键词和权重合并为一列,并存储为 map 类型,这里利用 dict()
和 zip()
方法,将每个关键词及其权重组合成字典
def to_map(row):
return row.article_id, row.channel_id, dict(zip(row.keywords, row.weights))
article_keywords = keywords_weights.rdd.map(to_map).toDF(['article_id', 'channel_id', 'keywords'])
article_keywords
结果如下所示,keywords 即为每篇文章的关键词和对应权重
前面我们计算完了关键词,接下来我们将 TF-IDF 和 TextRank 的共现词作为主题词,将 TF-IDF 权重表 tfidf_keywords_values 和 TextRank 权重表 textrank_keywords_values 进行关联,并利用 collect_set()
对结果进行去重,即可得到 TF-IDF 和 TextRank 的共现词,即主题词
topic_sql = """
select t.article_id article_id2, collect_set(t.keyword) topics from tfidf_keywords_values t
inner join
textrank_keywords_values r
where t.keyword=r.keyword
group by article_id2
"""
article_topics = spark.sql(topic_sql)
article_topics
结果如下所示,topics 即为每篇文章的主题词列表
最后,将主题词结果和关键词结果合并,即为文章画像,保存到表 article_profile
article_profile = article_keywords.join(article_topics, article_keywords.article_id==article_topics.article_id2).select(["article_id", "channel_id", "keywords", "topics"])
article_profile.write.insertInto("article_profile")
文章画像数据查询测试
hive> select * from article_profile limit 1;
OK
26 17 {"策略":0.3973770571351729,"jpg":0.9806348975390871,"用户":1.2794959063944176,"strong":1.6488457985625076,"文件":0.28144603583387057,"逻辑":0.45256526469610714,"形式":0.4123994242601279,"全自":0.9594604850547191,"h2":0.6244481634710125,"版本":0.44280276959510817,"Adobe":0.8553618185108718,"安装":0.8305037437573172,"检查更新":1.8088946300014435,"产品":0.774842382276899,"下载页":1.4256311032544344,"过程":0.19827163395829256,"json":0.6423301791599972,"方式":0.582762869780791,"退出应用":1.2338671268242603,"Setup":1.004399549339134} ["Electron","全自动","产品","版本号","安装包","检查更新","方案","版本","退出应用","逻辑","安装过程","方式","定性","新版本","Setup","静默","用户"]
Time taken: 0.322 seconds, Fetched: 1 row(s)
Apscheduler 定时更新
定义离线更新文章画像的方法,首先合并最近一个小时的文章信息,接着计算每个词的 TF-IDF 和 TextRank 权重,并根据 TF-IDF 和 TextRank 权重计算得出文章关键词和主题词,最后将文章画像信息保存到 Hive
def update_article_profile():
"""
定时更新文章画像
:return:
"""
ua = UpdateArticle()
# 合并文章信息
sentence_df = ua.merge_article_data()
if sentence_df.rdd.collect():
textrank_keywords_df, keywordsIndex = ua.generate_article_label()
ua.get_article_profile(textrank_keywords_df, keywordsIndex)
利用 Apscheduler 添加定时更新文章画像任务,设定每隔 1 个小时更新一次
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ProcessPoolExecutor
# 创建scheduler,多进程执行
executors = {
'default': ProcessPoolExecutor(3)
}
scheduler = BlockingScheduler(executors=executors)
# 添加一个定时更新文章画像的任务,每隔1个小时运行一次
scheduler.add_job(update_article_profile, trigger='interval', hours=1)
scheduler.start()
利用 Supervisor 进行进程管理,配置文件如下
[program:offline]
environment=JAVA_HOME=/root/bigdata/jdk,SPARK_HOME=/root/bigdata/spark,HADOOP_HOME=/root/bigdata/hadoop,PYSPARK_PYTHON=/miniconda2/envs/reco_sys/bin/python,PYSPARK_DRIVER_PYTHON=/miniconda2/envs/reco_sys/bin/python
command=/miniconda2/envs/reco_sys/bin/python /root/toutiao_project/scheduler/main.py
directory=/root/toutiao_project/scheduler
user=root
autorestart=true
redirect_stderr=true
stdout_logfile=/root/logs/offlinesuper.log
loglevel=info
stopsignal=KILL
stopasgroup=true
killasgroup=true
参考
https://www.bilibili.com/video/av68356229
https://pan.baidu.com/s/1-uvGJ-mEskjhtaial0Xmgw(学习资源已保存至网盘, 提取码:eakp)