Spark基于ALS隐语义的离线推荐服务

1.scala读取参数文件(不写),Scala读写文本文件
2.参数调优+模型训练
3.Spark join/笛卡尔积 算子 + 模式匹配
4.Spark sql 缓存到mysql文件

参数调优

package org.cuit.recommand.offlineRecommand

import java.io.{BufferedWriter, File, FileInputStream, FileOutputStream, FileWriter, OutputStream, OutputStreamWriter}
import java.util.Properties

import breeze.numerics.sqrt
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
import org.cuit.recommand.offlineRecommand.OfflineRecommend.Ratings

/**
 * @Author 
 * @Date 2022/3/28 15:10
 * @version 1.0
 * @注释 模型评估    为找到最优参数,采用计算均根方差     计算预测与实际评分的误差
 *
 */
object ALSParameterSelection {


  def ALSTrain(): Unit = {

    //读取配置文件
    val properties = new Properties()
    val path = Thread.currentThread().getContextClassLoader.getResource("config.properties").getPath //文件要放到resource文件夹下
    properties.load(new FileInputStream(path))

    //获取相关数据表名字
    val RATING_TABLE = properties.getProperty("rating_table")


    //创建SparkConf

    val conf: SparkConf = new SparkConf().setAppName("ALSParameterSelection").setMaster("local[*]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    //创建SparkSession
    val sc = new SparkContext(conf)
    val sparkSession = SparkSession.builder().config(conf).getOrCreate()
    //隐式转换
    import sparkSession.implicits._
    val MYSQL_URI = properties.getProperty("musql_url")

    val prop = new java.util.Properties
    val USER = properties.getProperty("jdbc_username")
    val PASSWORD = properties.getProperty("jdbc_password")
    //设置连接对象
    //拼接连接的url
    var url = s"${MYSQL_URI}?user=${USER}&password=${PASSWORD}"

    prop.setProperty("user", USER)
    prop.setProperty("password", PASSWORD)
    //获取数据
    val ratingRdd = sparkSession.read
      //url /127.0.0.1:3306/test?user=root&password=root
      .option("url", url)
      //表名
      .option("dbtable", RATING_TABLE)
      .format("jdbc")
      .load()
      //转换为ratings类
      .as[Ratings]
      .rdd
      //格式转换
      .map(rating =>
        //去掉时间戳,转换为rdd形式 并持久化到内存
        //  ALS的输入数据  转换为Rating类   val user : scala.Int, val product : scala.Int,val rating : scala.Double
        Rating(rating.user_id, rating.movie_id, rating.rating))

    //数据集切分-  训练集和测试集
    val ratingArray: Array[RDD[Rating]] = ratingRdd.randomSplit(Array(0.8, 0.2))
    val trainDataRdd: RDD[Rating] = ratingArray(0)
    val testDataRdd: RDD[Rating] = ratingArray(1)

    //选择最优模型参数
    val bestParameter = parameterSelection(trainDataRdd, testDataRdd)
    //写入参数文件(rank, lambda, rmse)
    val str = s"${bestParameter._1},${bestParameter._2},${bestParameter._3}"
    val file = new File("src\\main\\java\\org\\cuit\\recommand\\offlineRecommand\\bestParameter.txt")
    val writer_name = new BufferedWriter(new FileWriter(file))
    writer_name.write(str)
    writer_name.close()

    sparkSession.close()
  }

  /**
   *  选择最佳参数
   * @param trainDataRdd    训练数据
   * @param testDataRdd    测试数据
   * @return    最佳参数  (rank, lambda, rmse)
   */
  def parameterSelection(trainDataRdd: RDD[Rating], testDataRdd: RDD[Rating]): (Int, Double, Double) = {

    /**
     * 定义结果集
     */
    //迭代次数
    val iteration = 5
    //特征维度
    val rankArray = Array(30,35,40,50,55)
    //正则化系数
    val lambdaArray = Array(0.01,0.001,0.075,0.05)

    //方法二   对系统内存有要求
    val result: Array[(Int, Double, Double)] = for (rank <- rankArray; lambda <- lambdaArray)
    //for 循环中的 yield 会把当前的元素记下来,保存在集合中,循环结束后将返回该集合
      yield{
        val model: MatrixFactorizationModel = ALS.train(trainDataRdd, rank, iteration, lambda)
        val rmse = computeRMSE(model, testDataRdd)

        (rank, lambda, rmse)
      }
    //获取最小的损失数
    println(result.minBy(_._3))
    result.minBy(_._3)

  }

  /**
   * 计算均方根误差
   *
   * @param model       训练得到的模型结果
   * @param testDataRdd 测试数据
   * @return rems误差值
   */
  def computeRMSE(model: MatrixFactorizationModel, testDataRdd: RDD[Rating]): Double = {

    //测试数据筛选 user_id,movie_id ,rating -> user_id,movie_id
    val testUser = testDataRdd.map(data => (data.user, data.product))
    //计算评分
    val predict: RDD[Rating] = model.predict(testUser)
    /**
     * 通过内连接预测的与实际的进行匹配 user_id  与   movie_id  作为key
     */
    //预测值
    val predictValue = predict.map(data => ((data.user, data.product), data.rating))

    //实际值
    //转换格式       ((user_id,movie_id) rating)
    val actualValue = testDataRdd.map(data => ((data.user, data.product), data.rating))
    // inner join  ((user_id,movie_id),(预测值,实际值))
    val err = predictValue.join(actualValue)
      .map {
        //(Int,Int),(Double,Double)
        case ((user_id, movie_Id), (predictVal, actualVal)) =>
          val errVal = (actualVal - predictVal)
          errVal * errVal
      } mean()
    //开平方
    val rmse = sqrt(err)
    rmse
  }


  def main(args: Array[String]): Unit = {
    ALSTrain()
  }
}

模型训练+预测

package org.cuit.recommand.offlineRecommand

import java.io.FileInputStream
import java.util.Properties

import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.cuit.recommand.offlineStatistics.OfflineStatistics.{MovieTopN, storeResToMysql}
import org.jblas.DoubleMatrix

import scala.io.{BufferedSource, Source}

/**
 * @Author 
 * @Date 2022/3/28 15:10
 * @version 1.0
 * @注释
 */
object OfflineRecommend {

  //用户评价
  //1,31,2.5,1260759144
  /**
   *
   * @param user_id          用户编码
   * @param movie_id         imdb编码
   * @param rating           评级
   * @param rating_timestamp 时间戳
   */
  case class Ratings(user_id: Int, movie_id: Int, rating: Double, rating_timestamp: Int)



  /**
   * 用户的推荐列表
   *
   * @param user_id     用户id
   *
   * @param user_sim_info    电影id+推荐评分
   *
   */
  case class UserSim(user_id: Int, user_sim_info:String)


  //载入数据进行训练后结果入库
  def offlineRecommend(): Unit = {

    //读取配置文件
    val properties = new Properties()
    val path = Thread.currentThread().getContextClassLoader.getResource("config.properties").getPath //文件要放到resource文件夹下
    properties.load(new FileInputStream(path))

    //获取相关数据表名字
    val MOVIES_TABLE = properties.getProperty("movies_table")
    val RATING_TABLE = properties.getProperty("rating_table")
    val MOVIE_SIM_TABLE = properties.getProperty("movieSim_table")
    val USER_SIM_TABLE = properties.getProperty("userSim_table")


    val MYSQL_URI = properties.getProperty("musql_url")

    val prop = new java.util.Properties
    val USER = properties.getProperty("jdbc_username")
    val PASSWORD = properties.getProperty("jdbc_password")
    //设置连接对象
    //拼接连接的url
    var url = s"${MYSQL_URI}?user=${USER}&password=${PASSWORD}"

    prop.setProperty("user", USER)
    prop.setProperty("password", PASSWORD)

    //创建SparkConf

    val conf: SparkConf = new SparkConf().setAppName("offlineRecommend").setMaster("local[*]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    //创建SparkSession
    val sc = new SparkContext(conf)
    val sparkSession = SparkSession.builder().config(conf).getOrCreate()
    //隐式转换
    import sparkSession.implicits._

    //获取数据
    val ratingRdd = sparkSession.read
      //url /127.0.0.1:3306/test?user=root&password=root
      .option("url", url)
      //表名
      .option("dbtable", RATING_TABLE)
      .format("jdbc")
      .load()
      //转换为ratings类
      .as[Ratings]
      .rdd
      //格式转换
      .map(rating =>
        //去掉时间戳,转换为rdd形式 并持久化到内存
        //  ALS的输入数据  转换为Rating类   val user : scala.Int, val product : scala.Int,val rating : scala.Double
        (rating.user_id, rating.movie_id, rating.rating))

    //获取uid和mid   去重
    val uidRdd: RDD[Int] = ratingRdd.map(row => row._1).distinct()
    val midRdd: RDD[Int] = ratingRdd.map(row => row._2).distinct()

    /**
     * 模型训练
     */
    //rank   隐特征向量维度      iterations:交替最小二乘法 迭代次数     lambda:正则化系数      seed:随机种子
    val trainData = ratingRdd.map(x => Rating(x._1, x._2, x._3))
    //读取参数文件
    val source: BufferedSource = Source.fromFile("src\\main\\java\\org\\cuit\\recommand\\offlineRecommand\\bestParameter.txt")
    val parameter: Array[String] = source.getLines().toString().split(",")
    val (rank, iterations, lambda) = (parameter(0).toInt, 5, parameter(1).toDouble)

    val model = ALS.train(trainData, rank, iterations, lambda)

    //基于用户和电影的特征计算预测评分,得到每个用户的推荐列表
    //通过生成笛卡尔积得到空矩阵
    val userMovieIdRdd: RDD[(Int, Int)] = uidRdd.cartesian(midRdd)
    val predict: RDD[Rating] = model.predict(userMovieIdRdd)
    //对评分做过滤
    val userRecommendDf: DataFrame = predict
      //评分大于0
      .filter(_.rating > 0.1)
      //map转换为( () ) 方便后面聚合  product:movie_id   rating:预测的评分
      .map(ratingRes => (ratingRes.user, (ratingRes.product, ratingRes.rating)))
      .groupByKey()
      .map {
        //(user_id, (recs)) :   (157,(1234,3.663157894736842)) ......
        case (user_id, recs: Iterable[(Int, Double)]) =>
         StringBuffer buffer =  new StringBuffer()
          recs.toList.sortWith(_._2 > _._2).take(10).map((item: (Int, Double)) => {
            //mysql  存储  movie_id:average_score|movie_id:average_score...
            //item  (1580,3.663157894736842)\
          buffer.append(s"${item._1.toString}:${item._2.formatted("%.3f")}|")
         
          }
          )
          //userid        movie_id:推荐得分    。。。。。。
          //244            1073:7.408|1073:7.408509:6.825|1073:7.408509:6.825899:6.550
          UserSim(user_id, buffer.toString())
      }.toDF()

    //结果入库
    storeResToMysql(userRecommendDf,USER_SIM_TABLE)(prop,MYSQL_URI)

    sparkSession.stop()

  }

  def main(args: Array[String]): Unit = {
    offlineRecommend()
  }

  /**
   * 结果信息写到数据库
   *
   * @param dataFrame 缓存的数据集
   * @param tableName 结果表名
   * @param prop      数据库连接信息 user  password
   * @param mysqlURI  mysql连接url
   */
  def storeResToMysql(dataFrame: DataFrame, tableName: String)(implicit prop: Properties, mysqlURI: String): Unit = {
    dataFrame.write.mode("overwrite").jdbc(mysqlURI, tableName, prop)
  }

}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,033评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,725评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,473评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,846评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,848评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,691评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,053评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,700评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,856评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,676评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,787评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,430评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,034评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,990评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,218评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,174评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,526评论 2 343

推荐阅读更多精彩内容