使用Spark DataFrame实现基于物品的协同过滤算法(ItemCF)

简书不支持Markdown Math语法,请移步https://glassywing.github.io/2018/04/10/spark-itemcf/

简介

当前spark支持的协同过滤算法只有ALS(基于模型的协同过滤算法),但ALS算法对于某些特定的问题,效果并不理想,不像mahout提供了各种推荐算法。为了享受到spark在速度上带来的提升同时为满足一些业务需求,遂使用spark构建ItemCF算法。同时spark提供了新的DataFrame数据类型,使算法开发更加清晰和易于实现,

前提

常用相似度计算公式

协同过滤算法中最重要的部分是要计算物品间的相似度,对于不同的场景,可以应用不同的相似度计算公式来计算相似度,常用的相似度计算公式如下所示:

同现相似度(Co Occurrence)

同现相似度公式

$$ w(x,y)=\frac{|N(x)\cap{N(y)}|}{|N(x)|} $$

公式中分母是喜欢物品x的用户数,而分子则是同时对物品x和物品y感兴趣的用户数。因此,上述公式可用理解为对物品x感兴趣的用户有多大概率也对y感兴趣 (和关联规则类似)

但上述的公式存在一个问题,如果物品y是热门物品,有很多人都喜欢,则会导致W(x, y)很大,接近于1。因此会造成任何物品都和热门物品交有很大的相似度。为此我们用如下公式进行修正:

改进的同现相似度公式

$$ w(x,y)=\frac{|N(x)\cap{N(y)}|}{\sqrt{|N(x)||N(y)|}} $$

这个格式惩罚了物品y的权重,因此减轻了热门物品和很多物品相似的可能性。(也归一化了)

欧几里得相似度(Eucledian Similarity)

欧几里得相似度根据欧几里得距离计算而来,距离越近相似度越高,反之相反。

欧几里得距离定义

在数学中,欧几里得距离或欧几里得度量是欧几里得空间中两点间“普通”(即直线)距离。使用这个距离,欧氏空间成为度量空间。相关联的范数称为欧几里得范数。较早的文献称之为毕达哥拉斯度量。

欧几里得距离公式

$$ \ d_{X,Y}=\sqrt{ \sum_{i=1}n(x_i-y_i)2} $$

皮尔逊相似度

皮尔逊相关系数,即概率论中的相关系数,取值范围[-1,+1]。当大于零时,两个变量正相关,当小于零时表示两个向量负相关。

皮尔逊积矩相关系数定义

两个变量之间的皮尔逊相关系数定义为两个变量之间的协方差和标准差的商:

皮尔逊积矩相关系数公式

$$ \rho_{X,Y}=\frac{cov(X,Y)}{\sigma_{x}\sigma_{y}}=\frac{E((X-\mu_x)(Y-\mu_y))}{\sigma_{x}\sigma_{y}}=\frac{E(XY)-E(X)E(Y)}{\sqrt{E(X2)-E2(X)}\sqrt{E(Y2)-E2(Y)}} $$

余弦相似度(Cosine Similarity)

利用多维空间两点与所设定的点形成夹角的余弦值范围为[-1,1],值越大,说明夹角越大,两点相距就越远,相似度就越小。

向量间余弦定义

多维空间两点与所设定的点形成夹角的余弦值

余弦计算公式

$$ sim_{X,Y}=\frac{XY}{||X||||Y||}=\frac{ \sum_{i=1}n(x_iy_i)}{\sqrt{\sum_{i=1}n(x_i)2}*\sqrt{\sum_{i=1}n(y_i)^2}} $$

公式中$ x_i $表示第i个用户对物品x的评分,$ y_i $同理。
该公式只考虑到了用户的评分,很可能评分较高的物品会排在前面而不管物品的其它信息,改进版的余弦相似度计算公式如下:

改进的余弦相似度计算公式

$$ sim_{X,Y}=\frac{XYnum_{X\cap{Y}}}{||X||||Y||num_{X}log10(10+num_{Y})} $$

改进公式考虑到了两个向量相同个体个数、X向量大小、Y向量大小,注意:
$$ \ sim_{X,Y}\neq sim_{Y,X} $$

Tanimoto 相似度(Jaccard 系数)

Tanimoto相似度也称为Jaccard系数,是Cosine相似度扩展,多用于文档相似度就算。此相似度不考虑评价值,只考虑两个集合共同个体数量。

Jaccard 系数公式

$$ sim(x,y)=\frac{X\cap{Y}}{||X||+||Y||-||X\cap{Y}||} $$

预测用户评分公式

$$ pred_{u,p}=\frac{\sum_{i\in{ratedItems(u)}}{sim(i,p)r_{u,i}}}{\sum_{i\in{ratedItems(u)}}{sim(i,p)}} $$

公式中u指用户,p值物品,ratedItems(u)指用户u评价过的物品,sim指相似度(item之间的),r指用户对物品评分。

构建ItemCFModel

类定义

//  物品信息
case class Item(itemId: Int, itemName: String)

//  用户-物品-评分
case class Rating(userId: Int, itemId: Int, rating: Float)

//  用户信息
case class User(userId: Int, userName: String)

相似度度量

/**
  * SIMILARITY MEASURES
  */
object SimilarityMeasures {


  /**
    * The Co-occurrence similarity between two vectors A, B is
    * |N(i) ∩ N(j)| / sqrt(|N(i)||N(j)|)
    */
  def cooccurrence(numOfRatersForAAndB: Long, numOfRatersForA: Long, numOfRatersForB: Long): Double = {
    numOfRatersForAAndB / math.sqrt(numOfRatersForA * numOfRatersForB)
  }

  /**
    * The correlation between two vectors A, B is
    * cov(A, B) / (stdDev(A) * stdDev(B))
    *
    * This is equivalent to
    * [n * dotProduct(A, B) - sum(A) * sum(B)] /
    * sqrt{ [n * norm(A)^2 - sum(A)^2] [n * norm(B)^2 - sum(B)^2] }
    */
  def correlation(size: Double, dotProduct: Double, ratingSum: Double,
                  rating2Sum: Double, ratingNormSq: Double, rating2NormSq: Double): Double = {

    val numerator = size * dotProduct - ratingSum * rating2Sum
    val denominator = scala.math.sqrt(size * ratingNormSq - ratingSum * ratingSum) *
      scala.math.sqrt(size * rating2NormSq - rating2Sum * rating2Sum)

    numerator / denominator
  }

  /**
    * Regularize correlation by adding virtual pseudocounts over a prior:
    * RegularizedCorrelation = w * ActualCorrelation + (1 - w) * PriorCorrelation
    * where w = # actualPairs / (# actualPairs + # virtualPairs).
    */
  def regularizedCorrelation(size: Double, dotProduct: Double, ratingSum: Double,
                             rating2Sum: Double, ratingNormSq: Double, rating2NormSq: Double,
                             virtualCount: Double, priorCorrelation: Double): Double = {

    val unregularizedCorrelation = correlation(size, dotProduct, ratingSum, rating2Sum, ratingNormSq, rating2NormSq)
    val w = size / (size + virtualCount)

    w * unregularizedCorrelation + (1 - w) * priorCorrelation
  }

  /**
    * The cosine similarity between two vectors A, B is
    * dotProduct(A, B) / (norm(A) * norm(B))
    */
  def cosineSimilarity(dotProduct: Double, ratingNorm: Double, rating2Norm: Double): Double = {
    dotProduct / (ratingNorm * rating2Norm)
  }

  /**
    * The improved cosine similarity between two vectors A, B is
    * dotProduct(A, B) * num(A ∩ B) / (norm(A) * norm(B) * num(A) * log10(10 + num(B)))
    */
  def improvedCosineSimilarity(dotProduct: Double, ratingNorm: Double, rating2Norm: Double
                               , numAjoinB: Long, numA: Long, numB: Long): Double = {
    dotProduct * numAjoinB / (ratingNorm * rating2Norm * numA * math.log10(10 + numB))
  }

  /**
    * The Jaccard Similarity between two sets A, B is
    * |Intersection(A, B)| / |Union(A, B)|
    */
  def jaccardSimilarity(usersInCommon: Double, totalUsers1: Double, totalUsers2: Double): Double = {
    val union = totalUsers1 + totalUsers2 - usersInCommon
    usersInCommon / union
  }
}

计算物品相似度

def fit(ratings: Dataset[Rating]): ItemCFModel = {
    this.ratings = Option(ratings)
    val numRatersPerItem = ratings.groupBy("itemId").count().alias("nor")
      .coalesce(defaultParallelism)

    // 在原记录基础上加上item的打分者的数量
    val ratingsWithSize = ratings.join(numRatersPerItem, "itemId")
      .coalesce(defaultParallelism)

    //  执行内联操作
    ratingsWithSize.join(ratingsWithSize, "userId")
      .toDF("userId", "item1", "rating1", "nor1", "item2", "rating2", "nor2")
      .selectExpr("userId"
        , "item1", "rating1", "nor1"
        , "item2", "rating2", "nor2"
        , "rating1 * rating2 as product"
        , "pow(rating1, 2) as rating1Pow"
        , "pow(rating2, 2) as rating2Pow")
      .coalesce(defaultParallelism)
      .createOrReplaceTempView("joined")


    //  计算必要的中间数据,注意此处有WHERE限定,只计算了一半的数据量
    val sparseMatrix = spark.sql(
      """
        |SELECT item1
        |, item2
        |, count(userId) as size
        |, sum(product) as dotProduct
        |, sum(rating1) as ratingSum1
        |, sum(rating2) as ratingSum2
        |, sum(rating1Pow)  as  ratingSumOfSq1
        |, sum(rating2Pow)  as  ratingSumOfSq2
        |, first(nor1)  as nor1
        |, first(nor2)  as nor2
        |FROM joined
        |WHERE item1 < item2
        |GROUP BY item1, item2
      """.stripMargin)
      .coalesce(defaultParallelism)
      .cache()

    //  计算物品相似度
    var sim = sparseMatrix.map(row => {
      val size = row.getAs[Long](2)
      val dotProduct = row.getAs[Double](3)
      val ratingSum1 = row.getAs[Double](4)
      val ratingSum2 = row.getAs[Double](5)
      val ratingSumOfSq1 = row.getAs[Double](6)
      val ratingSumOfSq2 = row.getAs[Double](7)
      val numRaters1 = row.getAs[Long](8)
      val numRaters2 = row.getAs[Long](9)

      val cooc = cooccurrence(size, numRaters1, numRaters2)
      val corr = correlation(size, dotProduct, ratingSum1, ratingSum2, ratingSumOfSq1, ratingSumOfSq2)
      val regCorr = regularizedCorrelation(size, dotProduct, ratingSum1, ratingSum2,
        ratingSumOfSq1, ratingSumOfSq2, PRIOR_COUNT, PRIOR_CORRELATION)
      val cosSim = cosineSimilarity(dotProduct, scala.math.sqrt(ratingSumOfSq1), scala.math.sqrt(ratingSumOfSq2))
      val impCosSim = improvedCosineSimilarity(dotProduct, math.sqrt(ratingSumOfSq1), math.sqrt(ratingSum2), size, numRaters1, numRaters2)
      val jaccard = jaccardSimilarity(size, numRaters1, numRaters2)
      (row.getInt(0), row.getInt(1), cooc, corr, regCorr, cosSim, impCosSim, jaccard)
    }).toDF("itemId_01", "itemId_02", "cooc", "corr", "regCorr", "cosSim", "impCosSim", "jaccard")

    //  最终的物品相似度
    sim.withColumnRenamed("itemId_01", "itemId_02")
      .withColumnRenamed("itemId_02", "itemId_01")
      .union(sim)
      .repartition(defaultParallelism) //  重新分区,以便数据均匀分布,方便下游用户使用
      .cache()
    similarities = Option(sim)
    this
  }

用户推荐

/**
    * 为指定的用户推荐num个物品
    *
    * @param users 用户集
    * @param num   为每个用户推荐的物品数量
    * @return 推荐表
    */
  def recommendForUsers(users: Dataset[User], num: Int): DataFrame = {
    //  similarityMeasure为相似度算法名
    var sim = similarities.get.select("itemId_01", "itemId_02", similarityMeasure)
    //  获得评分表
    val rits = ratings.get

    val project: DataFrame = users
      .selectExpr("userId as user", "userName")
      //  进行子投影,此处左表数量远小于右表,执行左连接
      .join(rits, $"user" <=> rits("userId"), "left")
      .drop($"user")
      //  选择与用户相关的物品以及评分
      .select("userId", "itemId", "rating")

    // 获得用户感兴趣的物品与其它物品的相似度
    project.join(sim, $"itemId" <=> sim("itemId_01"))
      .selectExpr("userId"
        , "itemId_01 as relatedItem"
        , "itemId_02 as otherItem"
        , similarityMeasure
        , s"$similarityMeasure * rating as simProduct")
      .coalesce(defaultParallelism)
      .createOrReplaceTempView("tempTable")

    spark.sql(
      s"""
         |SELECT userId
         |,  otherItem
         |,  sum(simProduct) / sum($similarityMeasure) as rating
         |FROM tempTable
         |GROUP BY userId, otherItem
         |ORDER BY userId asc, rating desc
      """.stripMargin)
      //  过滤结果
      .rdd
      .map(row => (row.getInt(0), (row.getInt(1), row.getDouble(2))))
      .groupByKey()
      .mapValues(xs => {
        var sequence = Seq[(Int, Double)]()
        val iter = xs.iterator
        var count = 0
        while (iter.hasNext && count < num) {
          val rat = iter.next()
          if (rat._2 != Double.NaN)
            sequence :+= (rat._1, rat._2)
          count += 1
        }
        sequence
      })
      .toDF("userId", "recommended")
  }

相似度计算结果展示

数据来源

数据来自MovieLens,MovieLens数据集是一个关于电影评分的数据集,里面包含了从IMDB, The Movie DataBase上面得到的用户对电影的评分信息。

计算出的物品间相似度

以下展示了使用同现相似度,余弦相似度以及改进版进行相似度计算后(其它相似度请自行测试)的电影间的相似度,并以《星球大战(1977)》进行测试的结果(只显示了前20个结果)。

令人惊讶的是余弦相似度的结果似乎不太令人满意,这似乎是因为余弦相似度只和用户评分有关(更适用于推荐5星电影,不关心电影的类型等),也可能是我的算法出现了差错,欢迎指正。

同现相似度结果展示

movie1 movie2 coocurrence
星球大战(1977) 绝地归来(1983) 0.8828826458931883
星球大战(1977) 迷失方舟攻略(1981) 0.7679353753201742
星球大战(1977) 帝国反击,(1980) 0.7458505006229118
星球大战(1977) 教父,The(1972) 0.7275434127191666
星球大战(1977) 法戈(1996) 0.7239858668831711
星球大战(1977) 独立日(ID4)(1996) 0.723845113716724
星球大战(1977) 沉默的羔羊,The(1991) 0.7025515983155468
星球大战(1977) 印第安纳琼斯和最后的十字军东征(1989) 0.6920306174608959
星球大战(1977) 低俗小说(1994) 0.6885437675802282
星球大战(1977) 星际迷航:第一次接触(1996) 0.6850249237265413
星球大战(1977) 回到未来(1985) 0.6840536741086217
星球大战(1977) 逃亡者,The(1993) 0.6710463728397225
星球大战(1977) 摇滚,The(1996) 0.6646215466055597
星球大战(1977) 终结者,The(1984) 0.6636319257721421
星球大战(1977) 阿甘正传(1994) 0.6564951869930893
星球大战(1977) 终结者2:审判日(1991) 0.653467518885383
星球大战(1977) Princess Bride,The(1987) 0.6534487891771482
星球大战(1977) 异形(1979) 0.648232034779792
星球大战(1977) E.T。外星(1982) 0.6479990753086882
星球大战(1977) 巨蟒和圣杯(1974) 0.6476896799641126

余弦相似度结果展示

余弦相似度

movie1 movie2 cosSim
星球大战(1977) Infinity(1996) 1.0
星球大战(1977) Mostro,Il(1994) 1.0
星球大战(1977) Boys,Les(1997) 1.0
星球大战(1977) 陌生人,(1994) 1.0
星球大战(1977) 爱是一切(1996) 1.0
星球大战(1977) 巴黎是女人(1995) 1.0
星球大战(1977) 遇难者,A(1937) 1.0
星球大战(1977) 馅饼在天空(1995) 1.0
星球大战(1977) 世纪(1993) 1.0
星球大战(1977) 天使在我的肩膀(1946) 1.0
星球大战(1977) 这里来曲奇(1935) 1.0
星球大战(1977) 力量98(1995) 1.0
星球大战(1977) 滑稽女郎(1943) 1.0
星球大战(1977) 火山(1996) 1.0
星球大战(1977) 难忘的夏天(1994) 1.0
星球大战(1977) Innocents,The(1961) 1.0
星球大战(1977) Sleepover(1995) 1.0
星球大战(1977) 木星的妻子(1994) 1.0
星球大战(1977) 我的生活与时代与安东宁·阿托(En compagnie d'Antonin Artaud)(1993) 1.0
星球大战(1977) Bent(1997) 1.0

改进余弦相似度结果展示

改进余弦相似度

movie1 movie2 impCosSim
星球大战(1977) 绝地归来(1983) 0.6151374130038775
星球大战(1977) 失落方舟攻略(1981) 0.5139215764696529
星球大战(1977) 法戈(1996) 0.4978221397190352
星球大战(1977) 帝国反击,The(1980) 0.47719131109655355
星球大战(1977) 教父,The(1972) 0.4769568086870377
星球大战(1977) 沉默的羔羊,The(1991) 0.449096021012343
星球大战(1977) 独立日(ID4)(1996) 0.4334888029282058
星球大战(1977) 低俗小说(1994) 0.43054394420596026
星球大战(1977) 联系(1997) 0.4093441266211224
星球大战(1977) 印第安纳琼斯和最后的十字军东征(1989) 0.4080635382244593
星球大战(1977) 回到未来(1985) 0.4045977014813726
星球大战(1977) 星际迷航:第一次接触(1996) 0.40036290288050874
星球大战(1977) 逃亡者,The(1993) 0.3987919640908379
星球大战(1977) Princess Bride,The(1987) 0.39490206690864144
星球大战(1977) 摇滚,The(1996) 0.39100622194841916
星球大战(1977) 巨蟒与圣杯(1974) 0.3799595474408077
星球大战(1977) 终结者,The(1984) 0.37881311350029406
星球大战(1977) 阿甘正传(1994) 0.3755685058241706
星球大战(1977) 终结者2:审判日(1991) 0.37184317281514295
星球大战(1977) 杰瑞马奎尔(1996) 0.370478212770262
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,504评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,434评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,089评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,378评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,472评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,506评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,519评论 3 413
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,292评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,738评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,022评论 2 329
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,194评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,873评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,536评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,162评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,413评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,075评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,080评论 2 352

推荐阅读更多精彩内容