1. Reading a parameter file in Scala (writing not covered separately); reading and writing text files in Scala (see the sketch after this list)
2. Parameter tuning + model training
3. Spark join / Cartesian-product operators + pattern matching
4. Persisting Spark SQL results to MySQL
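Before diving in, here is a minimal, self-contained sketch of the text-file I/O pattern the code below relies on (the file name and contents are placeholders):

import java.io.{BufferedWriter, File, FileWriter}
import scala.io.Source

object TextFileIODemo {
  def main(args: Array[String]): Unit = {
    val file = new File("bestParameter.txt")
    // Write: wrap a FileWriter in a BufferedWriter and close it explicitly
    val writer = new BufferedWriter(new FileWriter(file))
    writer.write("30,0.01,0.85")
    writer.close()
    // Read: Source.fromFile yields an iterator of lines; close the source when done
    val source = Source.fromFile(file)
    val firstLine = source.getLines().next() // "30,0.01,0.85"
    println(firstLine.split(",").mkString(" | "))
    source.close()
  }
}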
Parameter tuning
package org.cuit.recommand.offlineRecommand
import java.io.{BufferedWriter, File, FileInputStream, FileWriter}
import java.util.Properties
import breeze.numerics.sqrt
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf
import org.cuit.recommand.offlineRecommand.OfflineRecommend.Ratings
/**
* @Author
* @Date 2022/3/28 15:10
* @version 1.0
* @note Model evaluation: to find the best parameters, compute the root-mean-square error (RMSE) between predicted and actual ratings.
*/
object ALSParameterSelection {
def ALSTrain(): Unit = {
// Load the configuration file
val properties = new Properties()
val path = Thread.currentThread().getContextClassLoader.getResource("config.properties").getPath // the file must be under the resources folder
properties.load(new FileInputStream(path))
// Name of the ratings table
val RATING_TABLE = properties.getProperty("rating_table")
// Create the SparkConf
val conf: SparkConf = new SparkConf().setAppName("ALSParameterSelection").setMaster("local[*]")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// Create the SparkSession (this also creates the underlying SparkContext)
val sparkSession = SparkSession.builder().config(conf).getOrCreate()
// Implicit conversions for Dataset encoders
import sparkSession.implicits._
val MYSQL_URI = properties.getProperty("musql_url") // note: the property key is spelled "musql_url" in config.properties
val prop = new java.util.Properties
val USER = properties.getProperty("jdbc_username")
val PASSWORD = properties.getProperty("jdbc_password")
// Build the JDBC connection settings
// Assemble the connection URL
val url = s"${MYSQL_URI}?user=${USER}&password=${PASSWORD}"
prop.setProperty("user", USER)
prop.setProperty("password", PASSWORD)
// Load the ratings over JDBC
val ratingRdd = sparkSession.read
// e.g. jdbc:mysql://127.0.0.1:3306/test?user=root&password=root
.option("url", url)
// table name
.option("dbtable", RATING_TABLE)
.format("jdbc")
.load()
// map rows to the Ratings case class
.as[Ratings]
.rdd
// drop the timestamp and convert to ALS input:
// Rating(user: Int, product: Int, rating: Double)
.map(rating =>
Rating(rating.user_id, rating.movie_id, rating.rating))
// Split the data into a training set and a test set (80/20)
val ratingArray: Array[RDD[Rating]] = ratingRdd.randomSplit(Array(0.8, 0.2))
val trainDataRdd: RDD[Rating] = ratingArray(0)
val testDataRdd: RDD[Rating] = ratingArray(1)
// Search for the best model parameters
val bestParameter = parameterSelection(trainDataRdd, testDataRdd)
// Write the parameters to a file as "rank,lambda,rmse"
val str = s"${bestParameter._1},${bestParameter._2},${bestParameter._3}"
val file = new File("src\\main\\java\\org\\cuit\\recommand\\offlineRecommand\\bestParameter.txt")
val writer_name = new BufferedWriter(new FileWriter(file))
writer_name.write(str)
writer_name.close()
sparkSession.close()
}
/**
* Select the best parameters.
* @param trainDataRdd training data
* @param testDataRdd test data
* @return the best parameters as (rank, lambda, rmse)
*/
def parameterSelection(trainDataRdd: RDD[Rating], testDataRdd: RDD[Rating]): (Int, Double, Double) = {
// Number of ALS iterations (fixed)
val iteration = 5
// Candidate latent-factor dimensions
val rankArray = Array(30, 35, 40, 50, 55)
// Candidate regularization coefficients
val lambdaArray = Array(0.01, 0.001, 0.075, 0.05)
// Grid search over (rank, lambda); training a model per combination is memory-intensive
val result: Array[(Int, Double, Double)] = for (rank <- rankArray; lambda <- lambdaArray)
// `yield` collects the value produced by each iteration and returns them as a collection
yield {
val model: MatrixFactorizationModel = ALS.train(trainDataRdd, rank, iteration, lambda)
val rmse = computeRMSE(model, testDataRdd)
(rank, lambda, rmse)
}
// Return the combination with the smallest RMSE
val best = result.minBy(_._3)
println(best)
best
}
/**
* Compute the root-mean-square error (RMSE).
*
* @param model the trained model
* @param testDataRdd test data
* @return the RMSE
*/
def computeRMSE(model: MatrixFactorizationModel, testDataRdd: RDD[Rating]): Double = {
// Project the test data down to (user_id, movie_id) pairs
val testUser = testDataRdd.map(data => (data.user, data.product))
// Predict ratings for those pairs
val predict: RDD[Rating] = model.predict(testUser)
/**
* Match predictions with actuals via an inner join, keyed by (user_id, movie_id)
*/
// predicted values
val predictValue = predict.map(data => ((data.user, data.product), data.rating))
// actual values, in the same shape: ((user_id, movie_id), rating)
val actualValue = testDataRdd.map(data => ((data.user, data.product), data.rating))
// inner join -> ((user_id, movie_id), (predicted, actual))
val err = predictValue.join(actualValue)
.map {
// ((Int, Int), (Double, Double))
case ((user_id, movie_id), (predictVal, actualVal)) =>
val errVal = actualVal - predictVal
errVal * errVal
}
.mean()
// take the square root
sqrt(err)
}
def main(args: Array[String]): Unit = {
ALSTrain()
}
}
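For reference, the error computed by computeRMSE over the joined (predicted, actual) pairs is the standard root-mean-square error:

$$\mathrm{RMSE} = \sqrt{\frac{1}{|T|} \sum_{(u,m) \in T} \left(\hat{r}_{u,m} - r_{u,m}\right)^{2}}$$

where $T$ is the set of (user, movie) pairs in the test split, $r_{u,m}$ is the actual rating, and $\hat{r}_{u,m}$ is the model's prediction.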
Model training + prediction
package org.cuit.recommand.offlineRecommand
import java.io.FileInputStream
import java.util.Properties
import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.io.{BufferedSource, Source}
/**
* @Author
* @Date 2022/3/28 15:10
* @version 1.0
* @note Offline recommendation: model training + prediction
*/
object OfflineRecommend {
// A rating row, e.g. 1,31,2.5,1260759144
/**
*
* @param user_id user id
* @param movie_id movie id (IMDB code)
* @param rating rating value
* @param rating_timestamp timestamp
*/
case class Ratings(user_id: Int, movie_id: Int, rating: Double, rating_timestamp: Int)
/**
* A user's recommendation list.
*
* @param user_id user id
* @param user_sim_info movie ids with their recommendation scores
*/
case class UserSim(user_id: Int, user_sim_info: String)
// Load the data, train the model, and store the results in the database
def offlineRecommend(): Unit = {
// Load the configuration file
val properties = new Properties()
val path = Thread.currentThread().getContextClassLoader.getResource("config.properties").getPath // the file must be under the resources folder
properties.load(new FileInputStream(path))
// Names of the relevant tables
val MOVIES_TABLE = properties.getProperty("movies_table")
val RATING_TABLE = properties.getProperty("rating_table")
val MOVIE_SIM_TABLE = properties.getProperty("movieSim_table")
val USER_SIM_TABLE = properties.getProperty("userSim_table")
val MYSQL_URI = properties.getProperty("musql_url") // note: the property key is spelled "musql_url" in config.properties
val prop = new java.util.Properties
val USER = properties.getProperty("jdbc_username")
val PASSWORD = properties.getProperty("jdbc_password")
// Build the JDBC connection settings
// Assemble the connection URL
val url = s"${MYSQL_URI}?user=${USER}&password=${PASSWORD}"
prop.setProperty("user", USER)
prop.setProperty("password", PASSWORD)
// Create the SparkConf
val conf: SparkConf = new SparkConf().setAppName("offlineRecommend").setMaster("local[*]")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// Create the SparkSession (this also creates the underlying SparkContext)
val sparkSession = SparkSession.builder().config(conf).getOrCreate()
// Implicit conversions for Dataset encoders
import sparkSession.implicits._
// Load the ratings over JDBC
val ratingRdd = sparkSession.read
// e.g. jdbc:mysql://127.0.0.1:3306/test?user=root&password=root
.option("url", url)
// table name
.option("dbtable", RATING_TABLE)
.format("jdbc")
.load()
// map rows to the Ratings case class
.as[Ratings]
.rdd
// drop the timestamp and keep (user_id, movie_id, rating)
.map(rating =>
(rating.user_id, rating.movie_id, rating.rating))
// Collect the distinct user ids and movie ids
val uidRdd: RDD[Int] = ratingRdd.map(row => row._1).distinct()
val midRdd: RDD[Int] = ratingRdd.map(row => row._2).distinct()
/**
* Model training
*/
// rank: latent-factor dimension; iterations: ALS iteration count; lambda: regularization coefficient; seed: random seed
val trainData = ratingRdd.map(x => Rating(x._1, x._2, x._3))
// Read the tuned parameters file written by ALSParameterSelection ("rank,lambda,rmse" on one line)
val source: BufferedSource = Source.fromFile("src\\main\\java\\org\\cuit\\recommand\\offlineRecommand\\bestParameter.txt")
val parameter: Array[String] = source.getLines().next().split(",")
source.close()
val (rank, iterations, lambda) = (parameter(0).toInt, 5, parameter(1).toDouble)
val model = ALS.train(trainData, rank, iterations, lambda)
// Predict a rating for every (user, movie) pair to build each user's recommendation list
// The Cartesian product enumerates the full user-movie matrix
val userMovieIdRdd: RDD[(Int, Int)] = uidRdd.cartesian(midRdd)
val predict: RDD[Rating] = model.predict(userMovieIdRdd)
// Filter and aggregate the predictions
val userRecommendDf: DataFrame = predict
// keep only scores above the threshold
.filter(_.rating > 0.1)
// key by user for aggregation; product is the movie_id, rating is the predicted score
.map(ratingRes => (ratingRes.user, (ratingRes.product, ratingRes.rating)))
.groupByKey()
.map {
// e.g. (157, Iterable((1234, 3.663157894736842), ...))
case (user_id, recs: Iterable[(Int, Double)]) =>
// keep the top 10 by score and serialize as movie_id:score|movie_id:score|...
val info = recs.toList.sortWith(_._2 > _._2).take(10)
.map(item => s"${item._1}:${item._2.formatted("%.3f")}")
.mkString("|")
// e.g. UserSim(244, "1073:7.408|509:6.825|899:6.550")
UserSim(user_id, info)
}.toDF()
// Store the result in MySQL
storeResToMysql(userRecommendDf, USER_SIM_TABLE)(prop, MYSQL_URI)
sparkSession.stop()
}
def main(args: Array[String]): Unit = {
offlineRecommend()
}
/**
* Write the result DataFrame to the database.
*
* @param dataFrame the data to persist
* @param tableName target table name
* @param prop connection properties (user, password)
* @param mysqlURI MySQL JDBC URL
*/
def storeResToMysql(dataFrame: DataFrame, tableName: String)(implicit prop: Properties, mysqlURI: String): Unit = {
dataFrame.write.mode("overwrite").jdbc(mysqlURI, tableName, prop)
}
}
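Since storeResToMysql takes its connection settings as implicit parameters, declaring them as implicit vals lets the second parameter list be filled in automatically. A minimal usage sketch, assuming a local MySQL instance, placeholder credentials, and a hypothetical user_sim table:

import java.util.Properties
import org.apache.spark.sql.SparkSession

object StoreResDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("storeResDemo").master("local[*]").getOrCreate()
    import spark.implicits._
    // A one-row stand-in for the real recommendation DataFrame
    val df = Seq((244, "1073:7.408|509:6.825")).toDF("user_id", "user_sim_info")
    // Implicit connection settings (placeholder values)
    implicit val prop: Properties = new Properties()
    prop.setProperty("user", "root")
    prop.setProperty("password", "root")
    implicit val mysqlURI: String = "jdbc:mysql://127.0.0.1:3306/test"
    // Equivalent to storeResToMysql(df, "user_sim")(prop, mysqlURI)
    OfflineRecommend.storeResToMysql(df, "user_sim")
    spark.stop()
  }
}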