This post describes how to use LDA in Spark and adds a method for inferring the topic distribution of documents under a model trained with EMLDAOptimizer.
Outline
- A brief introduction to LDA
- A Spark LDA code example
- Finding similar documents
- Computing the topic distribution of a new document
1. A brief introduction to LDA
LDA is a form of topic modeling: as the name suggests, it lets us extract the topics of a collection of documents. LDA assumes that each document is a mixture of several topics, and that each topic can be represented by a probability distribution over words.
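Concretely, that mixture assumption means the probability of seeing word $w$ in document $d$ decomposes as below (a standard identity, written with the notation used in section 4 rather than anything specific to this post):

$$p(w \mid d) = \sum_{k=1}^{K} p(w \mid z = k)\, p(z = k \mid d) = \sum_{k=1}^{K} \phi_{wk}\, \theta_{kd}$$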
LDA defines the following generative process for each document $j$, with the hyperparameters $\alpha$ and $\eta$ fixed in advance:
(a) sample the document's topic distribution $\theta_j \sim \mathrm{Dirichlet}(\alpha)$;
(b) for each word position $n$ in document $j$:
- sample the topic of the $n$-th word, $z_{j,n} \sim \mathrm{Multinomial}(\theta_j)$;
- the topic $z_{j,n}$ has a word distribution $\phi_{z_{j,n}} \sim \mathrm{Dirichlet}(\eta)$ (each topic's word distribution is drawn once for the whole corpus);
- sample the word $w_{j,n} \sim \mathrm{Multinomial}(\phi_{z_{j,n}})$.
The LDA algorithm at a glance:

Algorithm input | a collection of documents, the number of topics K, the hyperparameters |
---|---|
Algorithm output | 1. the word probability distribution of each topic |
 | 2. the topic probability distribution of each document |
 | 3. the vocabulary |
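To make the input shape concrete, here is a tiny hedged sketch in spark.mllib terms (the vocabulary size and the counts are made up for illustration):

import org.apache.spark.mllib.linalg.Vectors

// Two toy "documents" over a 5-term vocabulary, as (docId, term-count vector) pairs.
// This is the shape the mllib LDA consumes; the real outputs are shown in section 2.
val tinyCorpus = Seq(
  (0L, Vectors.dense(2.0, 1.0, 0.0, 0.0, 3.0)),
  (1L, Vectors.dense(0.0, 0.0, 4.0, 1.0, 0.0))
)
// sc.parallelize(tinyCorpus) would be a valid `documents` argument for LDA.run.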
2. A Spark LDA code example
First, the setting: the data set consists of a number of documents like the ones below.
Each document has already been segmented into words and assigned an index; the index is what lets us look up the corresponding document vector later.
1 文化,阅读,视频,燃文,平台,总会计师,阅读网,文书,协会,鲸鱼,领先,小说网,小说,新华
2 建设工程,税务局,welcome,平台,确认,电子,选择,nginx,招标网,发票,国税,增值税
3 数字化,人力,会议,行健,导航,在线,powered,系统,美味,星空网,discuz,上网,支持,远程,管家,访问,商控,teamviewer,餐饮
……
The work then breaks down into the following steps:
- text preprocessing (each text needs to be turned into a vector)
- training the LDA model
- obtaining the per-topic word distributions and the per-document topic distributions
2.1 Text preprocessing
Preprocessing consists of tokenization, filtering out some words, and vectorization. Spark ships classes for each of these steps: RegexTokenizer/Tokenizer, StopWordsRemover, and CountVectorizer/CountVectorizerModel.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.feature.{CountVectorizer, RegexTokenizer, StopWordsRemover}
import org.apache.spark.mllib.clustering.{DistributedLDAModel, EMLDAOptimizer, LDA}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
// One record per input line: document id, user name, raw text.
private case class User(
    user_id: Long,
    user: String,
    text: String)

private def preprocess(
    sc: SparkContext,
    documentPaths: String,
    vocabSize: Int,
    stopwordFile: String): (RDD[(Long, Vector)], Array[String], Long) = {
  val sqlContext = SQLContext.getOrCreate(sc)
  import sqlContext.implicits._

  // One document per line in each text file: "doc_id \t user \t comma-separated tokens".
  println(s"convert to dataframe")
  val srcDF = sc.textFile(documentPaths).repartition(800)
    .map(_.split("\t"))
    .map { line =>
      val _text = line(2).split(",").mkString(" ")
      User(line(0).toLong, line(1), _text)
    }.toDF()

  // Split the text column on whitespace.
  val tokenizer = new RegexTokenizer().setInputCol("text").setOutputCol("words")
  val wordsData = tokenizer.transform(srcDF)

  // Filter out the stop words loaded from stopwordFile.
  println(s"stopWordsRemover")
  val stopword_list = sc.textFile(stopwordFile).collect()
  val stopWordsRemover = new StopWordsRemover()
    .setInputCol("words")
    .setOutputCol("tokens")
    .setStopWords(stopword_list)
  val removed_data = stopWordsRemover.transform(wordsData)

  // Fit a CountVectorizer to build the vocabulary and turn each document into a term-count vector.
  println(s"countVectorizer fit")
  val cvm = new CountVectorizer()
    .setVocabSize(vocabSize)
    .setInputCol("tokens")
    .setOutputCol("features")
    .fit(removed_data)
  val raw_documents = cvm.transform(removed_data)
  raw_documents.show()

  val documents = raw_documents.select("user_id", "features")
    .repartition(800)
    .map { case Row(user_id: Long, features: Vector) => (user_id, features) }

  println(s"return vectors")
  (documents,
    cvm.vocabulary,                                // index -> term
    documents.map(_._2.numActives).sum().toLong)   // total token count
}
Because RegexTokenizer/Tokenizer, StopWordsRemover, etc. operate on DataFrames, the RDD has to be converted to a DataFrame first. There are two ways to do this; see Interoperating with RDDs in the Spark SQL guide for details. Here we use the first one, Inferring the Schema Using Reflection.
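For reference, a minimal sketch of the reflection approach on its own; the Doc case class and the input path are illustrative, not part of the post's pipeline:

import org.apache.spark.sql.SQLContext

// The case class defines the schema; toDF() turns an RDD of it into a DataFrame.
case class Doc(id: Long, text: String)

val sqlContext = SQLContext.getOrCreate(sc)
import sqlContext.implicits._

val docDF = sc.textFile("docs.txt")   // hypothetical input path
  .map(_.split("\t"))
  .map(fields => Doc(fields(0).toLong, fields(1)))
  .toDF()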
2.2 Training the LDA model
Spark's machine learning (ML) library is split into two packages, spark.ml and spark.mllib. The main difference between them is that spark.ml provides a DataFrame-based API while spark.mllib provides an RDD-based API. Here we use spark.mllib.
It divides into two packages:
spark.mllib contains the original API built on top of RDDs.
spark.ml provides higher-level API built on top of DataFrames for constructing ML pipelines.
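To make the split concrete, the two LDA entry points live in different packages; only the mllib one is used in this post:

import org.apache.spark.mllib.clustering.LDA   // RDD-based API: LDA.run(RDD[(Long, Vector)])
// import org.apache.spark.ml.clustering.LDA   // DataFrame-based pipeline API from spark.ml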
LDA is an unsupervised learning method, so before training we need to set a few parameters:
LDA takes in a collection of documents as vectors of word counts and the following parameters (set using the builder pattern):
- k: Number of topics (i.e., cluster centers)
- optimizer: Optimizer to use for learning the LDA model, either EMLDAOptimizer or OnlineLDAOptimizer
- docConcentration: Dirichlet parameter for prior over documents’ distributions over topics. Larger values encourage smoother inferred distributions.
- topicConcentration: Dirichlet parameter for prior over topics’ distributions over terms (words). Larger values encourage smoother inferred distributions.
- maxIterations: Limit on the number of iterations.
- checkpointInterval: If using checkpointing (set in the Spark configuration), this parameter specifies the frequency with which checkpoints will be created. If maxIterations is large, using checkpointing can help reduce shuffle file sizes on disk and help with failure recovery.
Because the online optimizer is fairly heavy on client-side memory, we use EMLDAOptimizer instead.
Training goes through the following steps:
- set the parameters
- run the training
- print the topics and save the model
private case class Params(
    input: String = "hdfs_path_to_corpus/part-00[0-2]*",
    k: Int = 200,                      // number of topics
    maxIterations: Int = 100,
    docConcentration: Double = -1,     // -1 lets the optimizer choose its default alpha
    topicConcentration: Double = -1,   // -1 lets the optimizer choose its default beta/eta
    vocabSize: Int = 140000,
    stopwordFile: String = "",
    checkpointDir: String = "hdfs_path_to_save_model/",
    checkpointInterval: Int = 10)
def main(args:Array[String]) {
println(s"defaultParams")
val defaultParams = Params()
println(s"run")
run(defaultParams)
}
private def run(params: Params) {
println(s"create context")
val conf = new SparkConf().setAppName(s"LDA_zsm")
conf.set("spark.driver.maxResultSize", "4g")
conf.set("spark.default.parallelism","800")
conf.set("spark.ui.retainedJobs", "10")
conf.set("spark.ui.retainedStages", "10")
conf.set("spark.shuffle.consolidateFiles", "true")
val sc = new SparkContext(conf)
// Load documents, and prepare them for LDA.
val preprocessStart = System.nanoTime()
println(s"preprocess data")
val (corpus, vocabArray, actualNumTokens) =
preprocess(sc, params.input, params.vocabSize, params.stopwordFile)
println(s"get count num")
val actualCorpusSize = corpus.count()
val actualVocabSize = vocabArray.size
val preprocessElapsed = (System.nanoTime() - preprocessStart) / 1e9
println()
println(s"Corpus summary:")
println(s"\t Training set size: $actualCorpusSize documents")
println(s"\t Vocabulary size: $actualVocabSize terms")
println(s"\t Preprocessing time: $preprocessElapsed sec")
println()
// Run LDA.
val lda = new LDA()
println(s"choose optimizer")
val optimizer = new EMLDAOptimizer
println(s"set params")
lda.setOptimizer(optimizer)
.setK(params.k)
.setMaxIterations(params.maxIterations)
.setDocConcentration(params.docConcentration)
.setTopicConcentration(params.topicConcentration)
.setCheckpointInterval(params.checkpointInterval)
if (params.checkpointDir.nonEmpty) {
sc.setCheckpointDir(params.checkpointDir)
}
val startTime = System.nanoTime()
println(s"training")
val ldaModel = lda.run(corpus)
val elapsed = (System.nanoTime() - startTime) / 1e9
println(s"Finished training LDA model. Summary:")
println(s"\t Training time: $elapsed sec")
// Print topics and save model
val topicIndices = ldaModel.describeTopics(maxTermsPerTopic = 100)
val topics = topicIndices.map { case (terms, termWeights) =>
terms.zip(termWeights).map { case (term, weight) => (vocabArray(term.toInt), weight) }
}
println(s"${params.k} topics:")
topics.zipWithIndex.foreach { case (topic, i) =>
println(s"TOPIC $i")
topic.foreach { case (term, weight) =>
println(s"$term\t$weight")
}
println()
}
val voca_path = params.checkpointDir.concat("vocab")
println(s"save vocab model")
sc.parallelize(vocabArray).saveAsTextFile(voca_path)
val model_path = params.checkpointDir.concat("model")
println(s"save lda model")
ldaModel.save(sc, model_path)
// Load the model back; the EM optimizer produces a DistributedLDAModel.
val distLDAModel = DistributedLDAModel.load(sc, model_path)
val topicDistributionMatrix = distLDAModel.topicDistributions
val topicMatrix = distLDAModel.topicsMatrix
sc.stop()
}
Notes:
- lda.run(corpus) // corpus has type RDD[(doc_id, countVector)]
- ldaModel.describeTopics(maxTermsPerTopic = 100) // returns [(term_id, term_weight), ...] for every topic
- distLDAModel.topicDistributions // the topic probability distribution of each document, returned as (doc_id, vector)
- distLDAModel.topicsMatrix // the word probability distribution of each topic, returned as a W x K matrix
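As a hedged illustration of how these outputs can be consumed (assuming the distLDAModel and vocabArray from the training code above):

// Top-3 topics per document, from the per-document topic distributions.
val docTopTopics = distLDAModel.topicDistributions.mapValues { dist =>
  dist.toArray.zipWithIndex.sortBy(-_._1).take(3)   // (weight, topicIndex) pairs
}

// Topic 0 over the vocabulary: column 0 of the W x K topicsMatrix.
val topicsMatrix = distLDAModel.topicsMatrix
val topic0 = (0 until topicsMatrix.numRows).map(w => (vocabArray(w), topicsMatrix(w, 0)))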
3. Finding similar documents
The LDA model ultimately gives us each document's topic distribution; seen from another angle, that distribution is a vector representation of the document, so on top of it we can find similar documents.
Here we measure similarity between documents with the cosine.
In the computation we use ElementwiseProduct to multiply two vectors element by element.
import org.apache.spark.mllib.clustering.DistributedLDAModel
import org.apache.spark.mllib.feature.ElementwiseProduct
import org.apache.spark.mllib.linalg.Vectors

val distLDAModel = DistributedLDAModel.load(sc, model_path)
val topicDistributionMatrix = distLDAModel.topicDistributions.sortBy(_._1)

// Sample some test documents by id.
val nums: List[Int] = List(2304096, 14334195, 2110749, 114645, 16623381, 21556887, 5709227, 21360395, 6431072, 13329823, 2657615, 333413, 11031474)
val testSamples = topicDistributionMatrix.filter { case (doc_id, vector) => nums.exists(_ == doc_id) }.collect()

for (_sample <- testSamples) {
  val testId = _sample._1
  val testVector = _sample._2
  // ElementwiseProduct multiplies every vector by testVector element-wise; summing the result
  // gives the dot product, which we then normalize by the two norms to get the cosine.
  val transformer = new ElementwiseProduct(testVector)
  topicDistributionMatrix
    .map { case (doc_id, vector) =>
      val value = transformer.transform(vector).toArray.sum /
        (Vectors.norm(vector, 2) * Vectors.norm(testVector, 2))
      (testId, doc_id, value)
    }
    .sortBy(_._3, ascending = false)
    .take(20)
    .foreach(println)
}
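For what it's worth, the elementwise product followed by a sum is just a dot product; a plain-Scala cosine over the same mllib vectors is only a few lines (a sketch, equivalent to the transformer-based version above):

import org.apache.spark.mllib.linalg.{Vector, Vectors}

def cosine(a: Vector, b: Vector): Double = {
  val dot = a.toArray.zip(b.toArray).map { case (x, y) => x * y }.sum
  dot / (Vectors.norm(a, 2) * Vectors.norm(b, 2))
}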
It is worth mentioning that Spark also provides a way to get a new document's topic distribution, as follows:
val distLDAModel = DistributedLDAModel.load(sc, model_path)
val vector = distLDAModel.toLocal.topicDistributions(new_document) // new_document is a term-count vector
There is a catch, though: this method uses the online variational Bayes approach, whereas our model was trained with the MAP (EM) approach. In theory both converge to the optimum, but in practice the topic distributions it infers for new documents look quite different from those of the training set.
4. Computing the topic distribution of a new document
Before solving for a new document's topic distribution, it is worth understanding the training procedure; Spark's EMLDAOptimizer is what we used. Formulas follow, but you do not need the full derivation, only the meaning of each variable.
The variables are:
- $N_{wj}$: the number of times word $w$ occurs in document $j$
- $\phi_{wk}$ (the topic-term matrix): the probability of each word under each topic
- $\theta_{kj}$: the probability of each topic in each document
- $\gamma_{wjk}$: the responsibility of topic $k$ for word $w$ in document $j$
Training proceeds as follows:
- Initialize $\gamma_{wjk}$ and compute the expected counts $N_{kj} = \sum_w N_{wj}\gamma_{wjk}$, $N_{wk} = \sum_j N_{wj}\gamma_{wjk}$, $N_k = \sum_w N_{wk}$, $N_j = \sum_w N_{wj}$
- for i in maxIterations:
  - update $\gamma_{wjk}$ with formula (1)
  - recompute $N_{kj}$, $N_{wk}$, $N_k$, $N_j$
  - update $\phi_{wk}$ and $\theta_{kj}$ with formula (2)

(1) $\gamma_{wjk} = \dfrac{\phi_{wk}\,\theta_{kj}}{\sum_{k'} \phi_{wk'}\,\theta_{k'j}}$

(2) $\phi_{wk} = \dfrac{N_{wk} + \eta - 1}{N_k + W(\eta - 1)}, \qquad \theta_{kj} = \dfrac{N_{kj} + \alpha - 1}{N_j + K(\alpha - 1)}$

After training we obtain $\phi$ and $\theta$; in addition we can recover $N_{wk}$, $N_k$ and the hyperparameters $\alpha$, $\eta$.
So which of these do we need to compute $\theta_{k,j+1}$ for a new document? When a new document $j+1$ arrives we know its word counts $N_{w,j+1}$, and we can:
- initialize $\gamma_{w,j+1,k}$ and compute $N_{k,j+1}$, $N_{j+1}$
- iterate:
  i. update $\gamma_{w,j+1,k}$ with formula (1), using the $\phi_{wk}$ obtained during training (held fixed)
  ii. recompute $N_{k,j+1}$ and $N_{j+1}$
  iii. update $\theta_{k,j+1}$ with formula (2)
Main code (Breeze supplies the dense linear algebra):
import java.util.Random
import breeze.linalg.{normalize, sum, DenseMatrix => BDM, DenseVector => BDV}
import breeze.numerics.{abs => BNabs}
import org.apache.spark.mllib.linalg.{DenseVector, Vector}
// Infer the topic distribution theta of a single (new) document by iterating the updates above,
// holding the trained topic-term matrix fixed.
private def docTopicDistribution(
docId: Long,
termCounts: Vector,                 // term-count vector of the new document
W: Int,                             // vocabulary size
K: Int,                             // number of topics
alpha: BDV[Double],                 // symmetric Dirichlet prior over topics
topicMatrixToArray: Array[Double]): DenseVector = {  // trained topic-term matrix, K values per term
// (alpha - 1) and K * (alpha - 1): the MAP correction terms from formula (2).
val alpha1 = alpha :- 1.0
val Kalpha1 = alpha1 :* K.toDouble
// Initialize the parameters.
val randomSeed = docId
val N_j = BDV.zeros[Double](K)        // expected topic counts N_kj for this document
var meanThetaChange = 1D
val gamma = BDM.zeros[Double](K, W)   // gamma(k, w): responsibility of topic k for term w
var w = 0
while (w < W){
val random = new Random(randomSeed + w*K)
val gamma_w = normalize(BDV.fill[Double](K)(random.nextDouble()), 1.0)
gamma(::, w) := gamma_w
w += 1
}
var k = 0
while (k < K){
val gamma_k = gamma(k, ::).t
N_j(k) = gamma_k dot BDV(termCounts.toArray)
k += 1
}
val random_theta = new Random(randomSeed)
val theta = normalize(BDV.fill[Double](K)(random_theta.nextDouble()), 1.0)
var t = 0
while (t < 20 && meanThetaChange > 1e-7){
// E step, gamma
val _mu = N_j + alpha1
w = 0
while (w < W){
val beta_w = BDV(topicMatrixToArray.slice(w*K, (w+1)*K))
val gamma_w = beta_w :* _mu
val sum_gamma = sum(gamma_w)
gamma_w :/= sum_gamma
gamma(::, w) := gamma_w
w += 1
}
// N_kj, N_j
k = 0
while (k < K){
N_j(k) = gamma(k, ::).t dot BDV(termCounts.toArray)
k += 1
}
val sum_j = sum(N_j)
// M step, theta
val lasttheta = theta.copy
val sumKalpha1 = Kalpha1 :+ sum_j
theta := (N_j + alpha1) :/ sumKalpha1
meanThetaChange = sum(BNabs(theta - lasttheta)) / K.toDouble
println("======================================")
println(docId, t, meanThetaChange)
t += 1
}
//val inds = argtopk(theta, 20).toArray
//val values = inds.map(i => i.toString + "," + theta(i).toString)
//values.mkString("|") + "|||" + t.toString
new DenseVector(theta.toArray) // return theta as an mllib DenseVector (Vectors.fromBreeze is private[spark])
}
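A hedged usage sketch, not from the original post: calling the helper for one new document, assuming the trained DistributedLDAModel from section 2 and a term-count vector built against the same vocabulary. The transpose makes the K topic values for each term contiguous, which is the layout docTopicDistribution slices with w*K, and the alpha value assumes the EM default of 50/k + 1 when docConcentration is left at -1.

import breeze.linalg.{DenseVector => BDV}
import org.apache.spark.mllib.linalg.Vectors

val K = distLDAModel.k
val W = distLDAModel.vocabSize
val alphaVec = BDV.fill[Double](K)(50.0 / K + 1.0)   // assumed EM default for docConcentration = -1

// K values per term, term-major, as expected by docTopicDistribution.
val betaArray = distLDAModel.topicsMatrix.transpose.toArray

// Illustrative new document: term counts over the same vocabulary (indices are made up).
val newDocCounts = Vectors.sparse(W, Seq((3, 2.0), (17, 1.0), (42, 5.0)))

val theta = docTopicDistribution(9999999L, newDocCounts, W, K, alphaVec, betaArray)
println(theta.toArray.zipWithIndex.sortBy(-_._1).take(10).mkString("\n"))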
References
- On Smoothing and Inference for Topic Models
- Online Learning for Latent Dirichlet Allocation
- Spark LDAOptimizer
- Spark LDAModel
- DistributedLDAModel API