1.触发条件 接收到用户评分大于等于3.5时,写入日志 system.log
//Receive a user's movie rating, persist it, and emit a log line for the
//real-time recommendation pipeline when the rating signals interest.
@RequestMapping(value = "/getStar", method = RequestMethod.POST)
@ResponseBody
public String getStar(HttpServletRequest request) throws Exception {
    HttpSession session = request.getSession();
    int movie_id = Integer.parseInt(request.getParameter("movie_id"));
    Double star = Double.parseDouble(request.getParameter("star"));
    String username = (String) session.getAttribute("username");
    // Persist the rating to the business stores (MySQL and Redis).
    int userId = userService.getUserIdByUsername(username);
    // Epoch time in SECONDS. getTime() returns milliseconds as a long;
    // casting the raw millisecond value to int overflows and wraps, so
    // divide by 1000 first to match the seconds-based rating_timestamp
    // schema used elsewhere (e.g. 1260759144).
    int time = (int) (new Date().getTime() / 1000L);
    ratingsService.updateRating(new Rating(userId, movie_id, star, time));
    // Ratings at or above the "like" threshold (3.5) are written to
    // system.log, which Flume tails and forwards to Kafka for the
    // streaming recommender to consume.
    if (star >= Constant.USERLIKESTAR) {
        logger.info(Constant.MOVIE_RATING_PREFIX + userId + "|" + movie_id + "|" + star);
    }
    return "success";
}
日志文件配置
# Log4j configuration: INFO-level logging to both stdout and a rolling file.
# The file appender writes system.log, which Flume tails for the real-time pipeline.
log4j.rootLogger=INFO, file, stdout
# write to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%n
# write to file
# RollingFileAppender rolls over at MaxFileSize and keeps MaxBackupIndex backup files.
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=D:\\projects\\GraduationProgram\\src\\main\\resources\\logs\\system.log
log4j.appender.file.MaxFileSize=1024KB
log4j.appender.file.MaxBackupIndex=1
log4j.appender.file.layout=org.apache.log4j.PatternLayout
# NOTE(review): this pattern uses %6L but the stdout pattern above uses %5L -
# presumably unintentional; confirm the intended line-number field width.
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%6L) : %m%n
2.Flume监听日志,Flume配置在本地Windows,将监听到的日志发送到CentOS的Kafka中
# Flume agent a1: tails system.log on the local Windows host and ships each
# log line as an event to the "recommender" Kafka topic on the CentOS broker.
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
# NOTE(review): an exec source running tail -f does not checkpoint its file
# position, so events are lost if the agent restarts; the TAILDIR source is
# the usual replacement - confirm the Flume version in use supports it.
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f D:/projects/GraduationProgram/src/main/resources/logs/system.log
a1.sources.r1.fileHeader = true
a1.sources.r1.deserializer.outputCharset=UTF-8
# Describe the sink
# NOTE(review): this channel binding is duplicated at the bottom of the file;
# one of the two identical lines can be removed.
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# NOTE(review): "topic" and "brokerList" are the legacy KafkaSink property
# names; newer Flume releases prefer kafka.topic / kafka.bootstrap.servers -
# confirm against the deployed Flume version before changing.
a1.sinks.k1.topic = recommender
a1.sinks.k1.brokerList = 192.168.100.112:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
# acks=1: wait for the partition leader's acknowledgement only
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Spark Streaming处理流程
package org.cuit.recommend.streamingRecommend
import java.io.FileInputStream
import java.util.{Calendar, Date, Properties, TimeZone, Timer}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.cuit.recommend.offlineRecommend.OfflineRecommend.Ratings
import org.cuit.recommend.offlineStatistics.OfflineStatistics.Ratings
import redis.clients.jedis.Jedis
/**
* @Author
* @Date 2022/3/28 16:39
* @version 1.0
* @注释
*/
//Connection utility: loads config.properties from the classpath and exposes a
//shared Jedis client. Serializable so it can travel inside Spark closures.
object ConnectUtil extends Serializable{
val properties = new Properties()
//Resolve config.properties via the context class loader.
val path = Thread.currentThread().getContextClassLoader.getResource("config.properties").getPath //the file must be placed under the resources folder
properties.load(new FileInputStream(path))
//Redis connection info read from configuration.
val REDIS_HOST = properties.getProperty("redis_host")
//lazy: the Jedis connection is opened on first use (i.e. on the executor that
//needs it), not at class-initialization/serialization time.
lazy val jedis = new Jedis(REDIS_HOST, 6379)
}
object StreamingRecommend {
/**
* A row of the movie similarity matrix.
*
* @param movie_id movie id
* @param movie_sim_info similar movie ids + similarity scores, "id:score|..." encoded
*/
case class MovieSim(movie_id: Int, movie_sim_info: String)
/**
* A user's (offline) recommendation list.
*
* @param user_id user id
*
* @param user_sim_info movie id + recommendation score
*
*/
case class UserSim(user_id: Int, user_sim_info:String)
/**
* A user's real-time recommendation list.
* @param user_id user id
* @param streaming_recom_info recommendation list, "movie_id:score|..." encoded
*/
case class StreamingRecommend(user_id: Int, streaming_recom_info:String)
//A user rating row, e.g.:
//1,31,2.5,1260759144
/**
*
* @param user_id user id
* @param movie_id imdb movie id
* @param rating rating value
* @param rating_timestamp epoch timestamp in seconds
*/
case class Ratings(user_id: Int, movie_id: Int, rating: Double, rating_timestamp: Int)
case class MysqlConfig(driver:String,url:String,username:String,password:String,tableName:String)
//Load the configuration file.
val properties = new Properties()
val path = Thread.currentThread().getContextClassLoader.getResource("config.properties").getPath //the file must be placed under the resources folder
properties.load(new FileInputStream(path))
//Table names and tuning parameters from config.properties.
val MOVIES_TABLE = properties.getProperty("movies_table")
val RATING_TABLE = properties.getProperty("rating_table")
val MOVIE_SIM_TABLE = properties.getProperty("movieSim_table")
val USER_SIM_TABLE = properties.getProperty("userSim_table")
val STREAMING_RECOMMEND_TABLE = properties.getProperty("streaming_Recommend_table")
val MAX_RECENT_RATING_NUM = properties.getProperty("max_recent_rating_num")
val MAX_MOVIE_SIM_NUM =properties.getProperty("max_movie_sim_num")
val KAFKA_TOPIC =properties.getProperty("Kafka_topic")
// NOTE(review): property key "musql_url" looks like a typo for "mysql_url";
// it must match the key actually present in config.properties - confirm.
val MYSQL_URI = properties.getProperty("musql_url")
val KAFKA_BOOTSTRAP_SERVERS = properties.getProperty("kafka_bootstrap_servers")
val KAFKA_GROUP_ID = properties.getProperty("kafka_group_id")
val prop = new java.util.Properties
val USER = properties.getProperty("jdbc_username")
val PASSWORD = properties.getProperty("jdbc_password")
val LOG_PARAMETER = properties.getProperty("log_parameter")
val DRIVER = properties.getProperty("driver")
val mysqlConfig: MysqlConfig = MysqlConfig(DRIVER, MYSQL_URI, USER, PASSWORD,STREAMING_RECOMMEND_TABLE)
/**
* Main streaming job: loads the rating table and the movie similarity matrix
* from MySQL, broadcasts the matrix, then consumes rating events from Kafka
* and computes/stores a real-time recommendation list for each event.
*/
def streamingRecommend():Unit={
//Build the JDBC connection url with credentials appended.
var url = s"${MYSQL_URI}?user=${USER}&password=${PASSWORD}"
prop.setProperty("user", USER)
prop.setProperty("password", PASSWORD)
//SparkConf: local mode, Kryo serialization.
val conf: SparkConf = new SparkConf().setAppName("StreamingRecommend").setMaster("local[*]")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
//Create the SparkContext and SparkSession.
val sc = new SparkContext(conf)
val sparkSession = SparkSession.builder().config(conf).getOrCreate()
//StreamingContext with a 2-second batch interval.
val ssc: StreamingContext = new StreamingContext(sc, Seconds(2))
//Implicit conversions for Dataset encoders.
import sparkSession.implicits._
//Load the ratings once into a driver-side map:
//user_id -> movie ids the user has already rated (used later to filter recommendations).
val ratingsMap: Map[Int, Array[Int]] = sparkSession.read
//url like jdbc:mysql://127.0.0.1:3306/test?user=root&password=root
.option("url", url)
//table name
.option("dbtable", RATING_TABLE)
.format("jdbc")
.load()
//project each row to a (user_id, movie_id) pair
.map{
row=>(row.getAs[Int]("user_id"),row.getAs[Int]("movie_id"))
}.collect().groupBy(_._1).map{
//user_id -> Array(movie_id, movie_id, ...)
row=>(row._1,(row._2.map(_._2)))
}
//Load the movie similarity matrix.
val movieSimMap: collection.Map[Int, Map[Int, Double]] = sparkSession.read
//url like jdbc:mysql://127.0.0.1:3306/test?user=root&password=root
.option("url", url)
//table name
.option("dbtable", MOVIE_SIM_TABLE)
.format("jdbc")
.load()
//parse rows into MovieSim instances
.as[MovieSim]
.rdd
//reshape for fast lookup by a pair of movie ids:
//MovieSim: (movie_id, "movie_id:score|movie_id:score|...") ->
// (movie_id -> Map(movie_id -> score, ...))
//so that the similarity of any two movies is two map lookups
//movie_sim_info example: "5319:0.742|27803:0.661|6370:0.648|40583:0.643|"
.map{ movie =>{
(movie.movie_id,movie.movie_sim_info.split("\\|").map{
x=>{ //one entry, e.g. "5319:0.742"
val info = x.split(":")
(info(0).toInt,info(1).toDouble)
}
}.toMap)
}
}.collectAsMap()
//Broadcast the similarity matrix so every executor has a local copy.
val movieSimBroadCast: Broadcast[collection.Map[Int, Map[Int, Double]]] = sc.broadcast(movieSimMap)
//Streaming part: Kafka consumer parameters.
val kafkaParams = Map[String, Object](
//broker list, e.g. 192.168.100.112:9092
"bootstrap.servers" -> KAFKA_BOOTSTRAP_SERVERS,
//key/value deserializers
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> KAFKA_GROUP_ID,
"auto.offset.reset" -> "latest",
//disable auto-commit of offsets
"enable.auto.commit" -> (false: java.lang.Boolean)
)
//Create a direct Kafka stream subscribed to the recommendation topic.
val kafkaStream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Array(KAFKA_TOPIC), kafkaParams)
)
//Each log line carries "...RATING_PREFIX:user_id|movie_id|rating"; parse it into a rating tuple.
val userDataStream: DStream[(Int, Int, Double, Int)] = kafkaStream.map {
//extract the rating fields from the event payload
data => {
val user_data = data.value().split("RATING_PREFIX:")(1).split("\\|")
// user_id INT (6) not null,
// movie_id INT not null,
// rating FLOAT,
// rating_timestamp INT
// NOTE(review): getTime returns milliseconds as a Long, so .toInt
// overflows/wraps; given the seconds-based Ratings schema this likely
// should be (new Date().getTime / 1000).toInt - confirm and fix.
(user_data(0).toInt, user_data(1).toInt, user_data(2).toDouble, new Date().getTime.toInt)
}
}
/**
*Real-time computation: process each incoming rating event.
*/
//Iterate over each micro-batch RDD of the stream.
userDataStream.foreachRDD {
//pattern-match every record in the batch
userData =>userData.foreach{
case (user_id:Int, movie_id:Int, rating:Double, rating_timestamp:Int) => {
println(s"data --::--- user_id = ${user_id},movie_id = ${movie_id},rating=${rating},rating_timestamp=${rating_timestamp}")
//update redis and mysql
/**
* Per-event pipeline:
* 1. fetch the user's last k ratings from Redis (movie_id, rating)
* 2. build a candidate list of movies most similar to the rated movie
* 3. compute a recommendation priority for each candidate
* 4. order candidates by priority (movie_id, score)
* 5. cache the result in the serving store
*/
//1. last k ratings from Redis
val recentRating: Array[(Int, Double)] = RecommendService.getUserRecentRatingFromRedis(MAX_RECENT_RATING_NUM.toInt, user_id, ConnectUtil.jedis)
//2. candidate list from the similarity matrix
//2.1 movies the user has already rated
// NOTE(review): ratingsMap(user_id) throws NoSuchElementException for a
// user with no ratings at load time; consider ratingsMap.getOrElse(user_id, Array.empty).
val userAlreadyRating = ratingsMap(user_id)
// val userAlreadyRating: Array[Int] = ratingsRdd.filter {
// rating => rating.user_id == user_id
// //return rdd[Ratings]
// }.map(_.movie_id).collect() //将筛选后的返回一个mivie_id列表
//2.2 most similar movies to the one just rated, minus already-rated ones
val movieRecommendList: Array[Int] = RecommendService.getMovieSimListFromMysql(movie_id, userAlreadyRating, MAX_MOVIE_SIM_NUM.toInt, movieSimBroadCast.value)
//3,4. compute priorities: recentRating = latest ratings, movieRecommendList = candidates, broadcast = similarity lookups
val streamingRecs: Array[(Int, Double)] = RecommendService.calculateRecommendPriority(LOG_PARAMETER.toInt, recentRating, movieRecommendList, movieSimBroadCast.value)
//5. cache the result (MySQL path currently disabled)
// RecommendService.storeResToMysql(streamingRecs, user_id)(mysqlConfig)
RecommendService.storeResToRedis(streamingRecs, user_id,ConnectUtil.jedis)
}
}
}
//Start receiving data.
ssc.start()
//Block until the streaming context is terminated.
ssc.awaitTermination()
}
//Entry point: run the streaming recommendation job.
def main(args: Array[String]): Unit = {
streamingRecommend()
}
}
处理函数
package org.cuit.recommend.streamingRecommend
import java.sql.{Connection, DriverManager, ResultSet}
import com.mysql.jdbc.PreparedStatement
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.cuit.recommend.offlineStatistics.OfflineStatistics.Ratings
import org.cuit.recommend.streamingRecommend.StreamingRecommend.{MAX_MOVIE_SIM_NUM, MAX_RECENT_RATING_NUM, MysqlConfig}
import redis.clients.jedis.Jedis
import scala.collection.mutable.ArrayBuffer
/**
 * @Author 高拴梁
 * @Date 2022/4/25 16:13
 * @version 1.0
 * Concrete implementation of the real-time recommendation business logic:
 * 1. fetch the user's last k ratings from Redis (movie_id, rating)
 * 2. build a candidate list of movies most similar to the movie just rated
 * 3. compute a recommendation priority for each candidate
 * 4. order candidates by priority (movie_id, score)
 * 5. cache the result in the serving store (MySQL / Redis)
 */
object RecommendService {
  /**
   * Fetch the user's most recent ratings from Redis.
   *
   * @param recentRatingNum number of recent ratings (k) to read
   * @param user_id user id
   * @param jedis Redis client
   * @return recent ratings as (movie_id, rating)
   */
  def getUserRecentRatingFromRedis(recentRatingNum: Int, user_id: Int, jedis: Jedis): Array[(Int, Double)] = {
    // NOTE(review): hard-coded Redis password; should be read from configuration.
    jedis.auth("123456")
    // jedis.lrange returns a Java list; the converters make Scala's map usable.
    import scala.collection.JavaConversions._
    // Key "user_id:<id>" holds entries of the form "movie_id:rating", e.g. "1900:4.5".
    // LRANGE's end index is INCLUSIVE, so the range 0..k-1 yields exactly k
    // entries (the original 0..recentRatingNum returned k + 1).
    jedis.lrange("user_id:" + user_id, 0, recentRatingNum - 1).map { x =>
      val userData = x.split(":")
      // Fix: rating is the SECOND field. The original parsed userData(0)
      // twice, so every "rating" silently equaled the movie id.
      (userData(0).toInt, userData(1).toDouble)
    }.toArray
  }
  /**
   * From the similarity matrix, take the n movies most similar to movie_id,
   * excluding movies the user has already rated.
   *
   * @param movie_id movie the user just rated
   * @param alreadyRating movie ids the user has already rated
   * @param movieSimNum maximum number of movies to return
   * @param movieSim similarity matrix: movie_id -> (movie_id -> score)
   * @return ids of the most similar movies, best first
   */
  def getMovieSimListFromMysql(movie_id: Int
                               , alreadyRating: Array[Int]
                               , movieSimNum: Int
                               , movieSim: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]]): Array[Int] = {
    // Robustness fix: a movie with no row in the similarity matrix now yields
    // an empty candidate list instead of throwing NoSuchElementException.
    val simMovies = movieSim.getOrElse(movie_id, Map.empty[Int, Double]).toArray
    // Drop already-seen movies, sort by similarity descending, keep the top n ids.
    simMovies.filter(movie => !alreadyRating.contains(movie._1))
      .sortWith(_._2 > _._2)
      .take(movieSimNum)
      .map(_._1)
  }
  /**
   * Compute the recommendation priority of each candidate movie.
   *
   * priority = mean(sim(candidate, recent) * recentRating)
   *            + log(incount, base) - log(decount, base)
   * where incount/decount count the liked (rating > 3) / disliked recent
   * movies that are similar (sim > 0.5) to the candidate.
   *
   * @param logParameter logarithm base for the incount/decount terms
   * @param recentRating recent ratings (movie_id, rating)
   * @param movieRecommendList candidate movie ids
   * @param movieSimBroadCast similarity matrix: movie_id -> (movie_id -> score)
   * @return candidates with priorities, best first: Array((movie_id, score))
   */
  def calculateRecommendPriority(logParameter: Int
                                 , recentRating: Array[(Int, Double)]
                                 , movieRecommendList: Array[Int]
                                 , movieSimBroadCast: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]]): Array[(Int, Double)] = {
    // (candidate movie_id, sim(candidate, recent) * recent rating)
    val movieScore = new ArrayBuffer[(Int, Double)]()
    // candidate -> how many similar recent movies the user liked (rating > 3)
    val inCountMap = scala.collection.mutable.HashMap[Int, Int]()
    // candidate -> how many similar recent movies the user disliked
    val deCountMap = scala.collection.mutable.HashMap[Int, Int]()
    // Cross every candidate with every recent rating to collect base scores.
    for (movieRecommend <- movieRecommendList; recentMovie <- recentRating) {
      // similarity of the candidate and the recently rated movie
      val score = getMovieSimScoreFromMovieSimRecs(movieRecommend, recentMovie._1, movieSimBroadCast)
      if (score > 0.5) {
        // sim(q, r) * rating(r) contributes to the candidate's base score
        movieScore += ((movieRecommend, score * recentMovie._2))
        if (recentMovie._2 > 3) {
          // getOrElse avoids a miss on first increment
          inCountMap(movieRecommend) = inCountMap.getOrElse(movieRecommend, 0) + 1
        } else deCountMap(movieRecommend) = deCountMap.getOrElse(movieRecommend, 0) + 1
      }
    }
    // Aggregate by candidate id and apply the log correction terms.
    // Fix: the configured base (logParameter) is now actually passed to log();
    // the original ignored it and always used the default base 10.
    movieScore.groupBy(_._1).map {
      case (movie_id: Int, scoreArray) => (movie_id, scoreArray.map(_._2).sum / scoreArray.length
        + log(inCountMap.getOrElse(movie_id, 1), logParameter) - log(deCountMap.getOrElse(movie_id, 1), logParameter))
    }.toArray.sortWith(_._2 > _._2)
  }
  /**
   * Replace the user's streaming recommendation row in MySQL.
   *
   * @param streamingRecs recommendations for the user, (movie_id, score) pairs
   * @param user_id user id
   * @param mysqlConfig MySQL connection configuration
   */
  def storeResToMysql(streamingRecs: Array[(Int, Double)], user_id: Int)(implicit mysqlConfig: MysqlConfig): Unit = {
    // Encode as "movie_id:score|movie_id:score|..." (3-decimal scores).
    val streamingRecommendinfo: String = streamingRecs.map {
      case (movie_id: Int, score: Double) => s"${movie_id}:${score.formatted("%.3f")}|"
    }.mkString("")
    var connection: Connection = null
    try {
      Class.forName(mysqlConfig.driver)
      connection = DriverManager.getConnection(mysqlConfig.url, mysqlConfig.username, mysqlConfig.password)
      // Delete this user's previous row directly. The original scanned the
      // whole table and re-issued the same DELETE once per row of ANY user.
      val deleteStmt = connection.prepareStatement(s"delete from ${mysqlConfig.tableName} where user_id = ?")
      deleteStmt.setInt(1, user_id)
      deleteStmt.executeUpdate()
      deleteStmt.close()
      // Parameterized insert: values no longer concatenated into the SQL text.
      // (The table name still comes from trusted configuration.)
      val insertStmt = connection.prepareStatement(s"insert into ${mysqlConfig.tableName} values(?, ?)")
      insertStmt.setInt(1, user_id)
      insertStmt.setString(2, streamingRecommendinfo)
      insertStmt.executeUpdate()
      insertStmt.close()
    } catch {
      // Catch Exception, not Throwable: fatal errors (OOM etc.) must propagate.
      case e: Exception => e.printStackTrace()
    } finally {
      // Fix: close in finally so the connection is released even on failure
      // (the original leaked the connection whenever an exception was thrown).
      if (connection != null) connection.close()
    }
  }
  /**
   * Store the recommendation list in the Redis hash "streaming", keyed by user id.
   *
   * @param streamingRecs recommendations for the user, (movie_id, score) pairs
   * @param user_id user id
   * @param jedis Redis client
   */
  def storeResToRedis(streamingRecs: Array[(Int, Double)], user_id: Int, jedis: Jedis): Unit = {
    // Encode as "movie_id:score|movie_id:score|..." (3-decimal scores).
    val streamingRecommendinfo: String = streamingRecs.map {
      case (movie_id: Int, score: Double) => s"${movie_id}:${score.formatted("%.3f")}|"
    }.mkString("")
    // NOTE(review): hard-coded Redis password; should be read from configuration.
    jedis.auth("123456")
    jedis.hset("streaming", user_id.toString, streamingRecommendinfo)
  }
  /**
   * Look up the similarity of two movies in the broadcast matrix.
   *
   * @param movieA id of movie A (outer key)
   * @param movieB id of movie B (inner key)
   * @param movieSimBroadCast similarity matrix: movie_id -> (movie_id -> score)
   * @return the similarity, or 0.0 when either movie is missing
   */
  def getMovieSimScoreFromMovieSimRecs(movieA: Int, movieB: Int, movieSimBroadCast: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]]): Double = {
    // flatMap across the two Option layers; missing entries default to 0.0.
    movieSimBroadCast.get(movieA).flatMap(_.get(movieB)).getOrElse(0.0)
  }
  /**
   * Logarithm of m in an arbitrary base (change-of-base formula).
   *
   * @param m value to take the logarithm of
   * @param logParameter base (default 10)
   * @return log_base(m)
   */
  def log(m: Int, logParameter: Int = 10): Double = {
    math.log(m) / math.log(logParameter)
  }
}