Spark实时离线电影推荐系统

1 项目介绍
2 涉及的技术
3 推荐流程图
4 收获
5 问题

1 项目介绍

  1. 使用Spark框架实现电影推荐系统;
  2. 运用数据挖掘的算法产生模型,为用户精准推荐喜好的电影;
  3. 分别通过离线和实时两种方式实现电影推荐系统;

2 涉及技术

  1. Spark:基于内存的分布式计算框架

  2. Hadoop:分布式离线计算框架

  3. Hive:基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供简单的sql查询功能,可以将sql语句转换为MapReduce任务进行运行

  4. Kafka:分布式高并发消息队列,负责缓存Flume采集的数据并为下游的各种计算提供高并发的数据处理

  5. Hbase:亿级行百万列并可毫秒级查询的数据库,可快速查询我们的计算数据

  6. Phoenix:是构建在HBase上的SQL中间层,Phoenix查询引擎会将SQL查询转换为一个或者多个HBase Scan,并行执行以生成标准的JDBC结果集。

3 推荐流程图

image.png

解释如下:

  1. 加载HDFS数据,处理之后存储到Hive中;
  2. 离线推荐部分技术处理思路;
  • 从Hive中加载训练数据和测试数据
  • 使用SparkMLlib的ALS交替最小二乘法训练模型
  • 使用模型产生推荐结果
  • 将推荐结果写入到Mysql、Hive、Phoenix+Hbase中
  1. 实时推荐部分技术处理思路;
  • 从Hive中拿出数据
  • 取出测试数据集中数据,send到Kafka中。
  • 通过SparkStreaming主动Kafka消息队列获取数据,并根据用户是否为新用户制定推荐策略
  • 新用户,从训练数据集中取出浏览人数最多的电影的前5部作为推荐结果
  • 老用户,使用推荐模型为用户推荐5部电影
image.png

4 收获

1 大数据环境搭建

(1)单机版Hadoop、Spark、Hive、Mysql的搭建
Spark处理HDFS数据,并将结果存储在Hive中
配置一台Hive + Mysql元数据库

2 数据初始预处理

object ETL {
  def main(args: Array[String]): Unit = {
    val localClusterURL = "local[2]"
    val clusterMasterURL = "spark://s1:7077"
    val conf = new SparkConf().setAppName("ETL").setMaster(clusterMasterURL)
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val hc = new HiveContext(sc)
    import sqlContext.implicits._
    hc.sql("use moive_recommend")

    // 设置RDD的partition的数量一般以集群分配给应用的CPU核数的整数倍为宜。
    val minPartitions = 8
    // 通过case class来定义Links的数据结构,数据的schema,适用于schama已知的数据
    // 也可以通过StructType的方式,适用于schema未知的数据,具体参考文档:
    //http://spark.apache.org/docs/1.6.2/sql-programming-guide.html#programmatically-specifying-the-schema
    val links = sc.textFile("hdfs://localhost/home/spark/temp/moiveRec/links.txt",minPartitions).filter{ !_.endsWith(",")}.map(_.split(","))
      .map(x =>Links(x(0).trim.toInt,x(1).trim().toInt,x(2).trim().toInt)).toDF()

    val movies = sc.textFile("hdfs://localhost/home/spark/temp/moiveRec/movies.txt",minPartitions).filter{ !_.endsWith(",")}.map(_.split(","))
      .map(x =>Movies(x(0).trim.toInt,x(1).trim(),x(2).trim())).toDF()

    val ratings = sc.textFile("hdfs://localhost/home/spark/temp/moiveRec/ratings.txt",minPartitions).filter{ !_.endsWith(",")}.map(_.split(","))
      .map(x =>Ratings(x(0).trim.toInt,x(1).trim().toInt,x(2).trim().toDouble,x(3).trim().toInt)).toDF()

    val tags = sc.textFile("hdfs://localhost/home/spark/temp/moiveRec/tags.txt", minPartitions).filter { !_.endsWith(",") }.map(x=>rebuild(x))
      .map(_.split(",")).map(x => Tags(x(0).trim().toInt, x(1).trim().toInt, x(2).trim(), x(3).trim().toInt)).toDF()


    links.write.mode(SaveMode.Overwrite).parquet("/home/spark/temp/moiveRec/links")
    hc.sql("drop table if exists links")
    hc.sql("create table if not exists links(movieId int,imdbId int,tmdbId int) stored as parquet" )
    hc.sql("load data inpath '/home/spark/temp/moiveRec/links' overwrite into table links")


    movies.write.mode(SaveMode.Overwrite).parquet("/home/spark/temp/moiveRec/movies")
    hc.sql("drop table if exists movies")
    hc.sql("create table if not exists movies(movieId int,title string,genres string) stored as parquet" )
    hc.sql("load data inpath '/home/spark/temp/moiveRec/movies' overwrite into table movies")


    ratings.write.mode(SaveMode.Overwrite).parquet("/home/spark/temp/moiveRec/ratings")
    hc.sql("drop table if exists ratings")
    hc.sql("create table if not exists ratings(userId int,movieId int,rating double,timestamp int) stored as parquet" )
    hc.sql("load data inpath '/home/spark/temp/moiveRec/ratings' overwrite into table ratings")


    tags.write.mode(SaveMode.Overwrite).parquet("/home/spark/temp/moiveRec/tags")
    hc.sql("drop table if exists tags")
    hc.sql("create table if not exists tags(userId int,movieId int,tag string,timestamp int) stored as parquet")
    hc.sql("load data inpath '/home/spark/temp/moiveRec/tags' overwrite into table tags")
  }

  // tags中大部分数据格式如下:
  //    4208,260,Action-packed,1438012536
  // 但会出现如下的数据:
  //    4208,260,"Family,Action-packed",1438012562
  // 这样对数据split后插入hive中就会出错,需清洗数据:
  //    4208,260,"Family,Action-packed",1438012562 => 4208,260,FamilyAction-packed,1438012562
  private def rebuild(input:String):String = {
    val a = input.split(",")
    val head = a.take(2).mkString(",")//提取列表的前2个元素
    val tail = a.takeRight(1).mkString//提取列表的最后1个元素
    val b = a.drop(2).dropRight(1).mkString.replace("\"", "")
    val output = head + "," + b + "," + tail
    output
  }
}

3 Hive的使用

配置一台Hive + Mysql元数据库

4 SparkMLlib机器学习算法库的使用

/**
  * KafkaProducer从测试数据集中取出数据
  */
object Spark_MovieTraining extends AppConf {
  def main(args: Array[String]): Unit = {
    hc.sql("use moive_recommend")
    // 训练集,总数据集的60%
    val trainingData = hc.sql("select * from trainingData")
    val ratingRDD = hc.sql("select * from trainingData")
      .rdd.map(x => Rating(x.getInt(0), x.getInt(1), x.getDouble(2)))

    // Build the recommendation model using ALS
    val rank = 10
    val numIterations = 10
    val model = ALS.train(ratingRDD, rank, numIterations, 0.01)

    // Evaluate the model on rating data
    val training = ratingRDD.map {
      case Rating(userid, movieid, rating) => (userid, movieid)
    }

    ratingRDD.persist()
    training.persist()

    val predictions =
      model.predict(training).map {
        case Rating(userid, movieid, rating) => ((userid, movieid), rating)
      }

    val ratesAndPreds = ratingRDD.map { case Rating(userid, movieid, rating) =>
      ((userid, movieid), rating)
    }.join(predictions)


    val MSE = ratesAndPreds.map { case ((userid, movieid), (r1, r2)) =>
      val err = (r1 - r2)
      err * err
    }.mean()

    println(s"Mean Squared Error = $MSE")

    // Save and load model
    model.save(sc, s"/home/spark/temp/moiveRec/BestModel1/$MSE")
    //val sameModel = MatrixFactorizationModel.load(sc, "/home/spark/temp/moiveRec/BestModel/")
  }
}

5 实时推荐部分Kafka + Streaming + Phoenix+Hbase流处理

object KafkaProducer extends AppConf {

  def main(args: Array[String]): Unit = {
    hc.sql("use moive_recommend")
    val testDF = hc.sql("select * from testData limit 10000")
    val prop = new Properties()
    // 指定kafka的 ip地址:端口号
    prop.put("bootstrap.servers", "s1:9092")
    // 设定ProducerRecord发送的key值为String类型
    prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // 设定ProducerRecord发送的value值为String类型
    prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val topic = "movie"
    val testData = testDF.map(
      x => (topic, x.getInt(0).toString() + "," + x.getInt(1).toString + "," + x.getDouble(2).toString())
    )
    val producer = new KafkaProducer[String, String](prop)
    // 如果服务器内存不够,会出现OOM错误
    val messages = testData.toLocalIterator
    while (messages.hasNext) {
      val message = messages.next()
      val record = new ProducerRecord[String, String](topic, message._1, message._2)
      println(record)
      producer.send(record)
      // 延迟10毫秒
      Thread.sleep(10)
    }
    producer.close()
  }
}
/**
  * 接收kafka产生的数据,进行处理
  */
object SparkDirectStream {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkDirectStream").setMaster("spark://s1:7077")
    // Duration对象中封装了时间的一个对象,它的单位是ms.
    val batchDuration = new Duration(5000)
    // batchDuration为时间间隔
    val ssc = new StreamingContext(conf, batchDuration)
    val hc = new HiveContext(ssc.sparkContext)

    // 训练数据中是否有该用户
    val validusers = hc.sql("select * from trainingData")
    val userlist = validusers.select("userId")

    val modelpath = "/home/spark/temp/moiveRec/BestModel1/0.5366434001808432"
    val broker = "s1:9092"

    // val topics = "movie".split(",").toSet
    val topics = Set("movie")
    // val kafkaParams = Map("bootstrap.servers" -> "spark1:9092")
    val kafkaParams = Map("metadata.broker.list" -> "s1:9092")

    def exist(u: Int): Boolean = {
      val userlist = hc.sql("select distinct(userid) from trainingdata").rdd.map(x => x.getInt(0)).toArray()
      userlist.contains(u)
    }

    // 为没有登录的用户推荐电影的策略:
    // 1.推荐观看人数较多的电影,采用这种策略
    // 2.推荐最新的电影
    val defaultrecresult = hc.sql("select * from pop5result").rdd.toLocalIterator

    // 创建SparkStreaming接收kafka消息队列数据的2种方式
    // 一种是Direct approache,通过SparkStreaming自己主动去Kafka消息队
    // 列中查询还没有接收进来的数据,并把他们拿到sparkstreaming中。
    val kafkaDirectStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

    val model = MatrixFactorizationModel.load(ssc.sparkContext, modelpath)

    val messages = kafkaDirectStream.foreachRDD { rdd =>
//      println(rdd)
      val userrdd = rdd.map(x => x._2.split(",")).map(x => x(1)).map(_.toInt)
      val validusers = userrdd.filter(user => exist(user))
      val newusers = userrdd.filter(user => !exist(user))
      // 采用迭代器的方式来避开对象不能序列化的问题。
      // 通过对RDD中的每个元素实时产生推荐结果,将结果写入到redis,或者其他高速缓存中,来达到一定的实时性。
      // 2个流的处理分成2个sparkstreaming的应用来处理。
      val validusersIter = validusers.toLocalIterator
      val newusersIter = newusers.toLocalIterator
      while (validusersIter.hasNext) {
        val recresult = model.recommendProducts(validusersIter.next, 5)
        println("below movies are recommended for you :")
        println(recresult)
      }
      while (newusersIter.hasNext) {
        println("below movies are recommended for you :")
        for (i <- defaultrecresult) {
          println(i.getString(0))
        }
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }
}

5 问题

依旧不熟悉scala语言,在使用Spark时很多东西依旧不知道

参考文献

http://www.dajiangtai.com/course/56.do


end

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,406评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,732评论 3 393
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,711评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,380评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,432评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,301评论 1 301
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,145评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,008评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,443评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,649评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,795评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,501评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,119评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,731评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,865评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,899评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,724评论 2 354