Flume-Kafka-SparkStreaming实时推荐的实现

1.触发条件 接受到用户评分大于3.5时,写入日志 system.log

//获取用户的评分数据
    //电影评星
    @RequestMapping(value = "/getStar", method = RequestMethod.POST)
    @ResponseBody
    public String getStar(HttpServletRequest request) throws Exception {
        HttpSession session = request.getSession();
        int movie_id = Integer.parseInt(request.getParameter("movie_id"));
        Double star = Double.parseDouble(request.getParameter("star"));
        String username = (String) session.getAttribute("username");

        //缓存到数据库(mysql和redis)
        int userId = userService.getUserIdByUsername(username);
        //获取当前时间戳
        int time = (int)new Date().getTime();
        ratingsService.updateRating(new Rating(userId,movie_id,star,time));

        //当前评分大于3.5表示用户对当前评分的电影感兴趣就写入日志进行监听
        if (star>=Constant.USERLIKESTAR){
            logger.info(Constant.MOVIE_RATING_PREFIX+userId+"|"+movie_id+"|"+star);
        }
        return "success";

    }

日志文件配置

log4j.rootLogger=INFO, file, stdout

# write to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}  %5p --- [%50t]  %-80c(line:%5L)  :  %m%n

# write to file
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=D:\\projects\\GraduationProgram\\src\\main\\resources\\logs\\system.log
log4j.appender.file.MaxFileSize=1024KB
log4j.appender.file.MaxBackupIndex=1
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}  %5p --- [%50t]  %-80c(line:%6L)  :  %m%n

2.Fflume监听日志,Flume配置在本地windows 将监听的日志发送到CentOS的Kafka中

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command =  tail -f D:/projects/GraduationProgram/src/main/resources/logs/system.log
a1.sources.r1.fileHeader = true
a1.sources.r1.deserializer.outputCharset=UTF-8

# Describe the sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = recommender
a1.sinks.k1.brokerList = 192.168.100.112:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Spark Streaming处理流程

package org.cuit.recommend.streamingRecommend

import java.io.FileInputStream
import java.util.{Calendar, Date, Properties, TimeZone, Timer}

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.cuit.recommend.offlineRecommend.OfflineRecommend.Ratings
import org.cuit.recommend.offlineStatistics.OfflineStatistics.Ratings
import redis.clients.jedis.Jedis

/**
 * @Author 
 * @Date 2022/3/28 16:39
 * @version 1.0
 * @注释
 */

//连接工具类
object ConnectUtil extends Serializable{
  val properties = new Properties()
  val path = Thread.currentThread().getContextClassLoader.getResource("config.properties").getPath //文件要放到resource文件夹下
  properties.load(new FileInputStream(path))

  //获取相关数据表名字
  val REDIS_HOST = properties.getProperty("redis_host")


  lazy val jedis = new Jedis(REDIS_HOST, 6379)


}

object StreamingRecommend {

  /**
   * 电影相似推荐矩阵
   *
   * @param movie_id       电影id
   * @param movie_sim_info 相似的电影id+ 相似得分
   */
  case class MovieSim(movie_id: Int, movie_sim_info: String)


  /**
   * 用户的推荐列表
   *
   * @param user_id     用户id
   *
   * @param user_sim_info    电影id+推荐评分
   *
   */
  case class UserSim(user_id: Int, user_sim_info:String)

  /**
   * 用户实时推荐列表
   * @param user_id   用户id
   * @param streaming_recom_info      推荐列表
   */
  case class StreamingRecommend(user_id: Int, streaming_recom_info:String)

  //用户评价
  //1,31,2.5,1260759144
  /**
   *
   * @param user_id          用户编码
   * @param movie_id         imdb编码
   * @param rating           评级
   * @param rating_timestamp 时间戳
   */
  case class Ratings(user_id: Int, movie_id: Int, rating: Double, rating_timestamp: Int)

  case class MysqlConfig(driver:String,url:String,username:String,password:String,tableName:String)

  //读取配置文件
  val properties = new Properties()
  val path = Thread.currentThread().getContextClassLoader.getResource("config.properties").getPath //文件要放到resource文件夹下
  properties.load(new FileInputStream(path))

  //获取相关数据表名字
  val MOVIES_TABLE = properties.getProperty("movies_table")
  val RATING_TABLE = properties.getProperty("rating_table")
  val MOVIE_SIM_TABLE = properties.getProperty("movieSim_table")
  val USER_SIM_TABLE = properties.getProperty("userSim_table")
  val STREAMING_RECOMMEND_TABLE = properties.getProperty("streaming_Recommend_table")
  val MAX_RECENT_RATING_NUM = properties.getProperty("max_recent_rating_num")
  val MAX_MOVIE_SIM_NUM =properties.getProperty("max_movie_sim_num")
  val KAFKA_TOPIC =properties.getProperty("Kafka_topic")
  val MYSQL_URI = properties.getProperty("musql_url")
  val KAFKA_BOOTSTRAP_SERVERS = properties.getProperty("kafka_bootstrap_servers")
  val KAFKA_GROUP_ID = properties.getProperty("kafka_group_id")
  val prop = new java.util.Properties
  val USER = properties.getProperty("jdbc_username")
  val PASSWORD = properties.getProperty("jdbc_password")
  val LOG_PARAMETER = properties.getProperty("log_parameter")
  val DRIVER = properties.getProperty("driver")
  val mysqlConfig: MysqlConfig = MysqlConfig(DRIVER, MYSQL_URI, USER, PASSWORD,STREAMING_RECOMMEND_TABLE)

  def streamingRecommend():Unit={

    //设置连接对象
    //拼接连接的url
    var url = s"${MYSQL_URI}?user=${USER}&password=${PASSWORD}"

    prop.setProperty("user", USER)
    prop.setProperty("password", PASSWORD)



    //创建SparkConf

    val conf: SparkConf = new SparkConf().setAppName("StreamingRecommend").setMaster("local[*]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    //创建SparkSession
    val sc = new SparkContext(conf)
    val sparkSession = SparkSession.builder().config(conf).getOrCreate()

    //StreamingContext       2:批处理时间
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(2))
    //隐式转换
    import sparkSession.implicits._

    //获取数据  由于数据规模庞大,性能要求,将其广播出去

    //得到一个评分map 方便后面直接根据user_id key 获取已经评分的movie_id s
    val ratingsMap: Map[Int, Array[Int]] = sparkSession.read
      //url /127.0.0.1:3306/test?user=root&password=root
      .option("url", url)
      //表名
      .option("dbtable", RATING_TABLE)
      .format("jdbc")
      .load()
      //转换为ratings类
      .map{
        row=>(row.getAs[Int]("user_id"),row.getAs[Int]("movie_id"))
      }.collect().groupBy(_._1).map{
          //user_id   (movie_id,Movie_id........)
      row=>(row._1,(row._2.map(_._2)))
    }

    //获取数据
    val movieSimMap: collection.Map[Int, Map[Int, Double]] = sparkSession.read
      //url /127.0.0.1:3306/test?user=root&password=root
      .option("url", url)
      //表名
      .option("dbtable", MOVIE_SIM_TABLE)
      .format("jdbc")
      .load()
      //转换为ratings类
      .as[MovieSim]
      .rdd
      //格式转换
      .map{ movie =>{
        //类型变换     类型转换 + map(map())  movie_Id为key  方便后面根据两个movie_id查询
        //movieRec: (movie_id,movie_id(String):score(String)| ....)  ->
        //              (map(movie_id(key),List(movie_id(Int),score(Double)) ... ))
        //             ->   (map(movie_id(key),List(map(movie_id(Int)(key),score(Double))) ... ))
        //(movie_id,(array))      split:做切分得到数组之后对每个元素进行处理       movie_sim_info:|5319 : 0.742|   27803:0.661|    6370:0.648|  40583:0.643|
        (movie.movie_id,movie.movie_sim_info.split("\\|").map{
          x=>{ //|5319 : 0.742
            val info = x.split(":")
            (info(0).toInt,info(1).toDouble)
          }
        }.toMap)
      }
      }.collectAsMap()

    val movieSimBroadCast: Broadcast[collection.Map[Int, Map[Int, Double]]] = sc.broadcast(movieSimMap)

    //进行流式处理

    //定义Kafka参数
    val kafkaParams = Map[String, Object](
      //连接参数 109.168.100。112:9092
      "bootstrap.servers" -> KAFKA_BOOTSTRAP_SERVERS,
      //序列化
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> KAFKA_GROUP_ID,
      "auto.offset.reset" -> "latest",
      //自动提交
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    //通过Kafka创建一个KafkaStream
    val kafkaStream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Array(KAFKA_TOPIC), kafkaParams)
    )

    //日志数据 user_id|movie_id|rating|rating——timestamp 转换为评分流数据
    val userDataStream: DStream[(Int, Int, Double, Int)] = kafkaStream.map {
      //获取评分值
      data => {
        val user_data = data.value().split("RATING_PREFIX:")(1).split("\\|")

        //        user_id INT (6) not null,
        //        movie_id INT not null,
        //        rating FLOAT,
        //        rating_timestamp INT
        (user_data(0).toInt, user_data(1).toInt, user_data(2).toDouble, new Date().getTime.toInt)
      }
    }

    /**
     *实时计算,根据得到的信息进行实时计算
     */

    //对窗口获取到的一组用户数据进行遍历
    userDataStream.foreachRDD {
      //对每一条数据进行模式匹配
      userData =>userData.foreach{
        case (user_id:Int, movie_id:Int, rating:Double, rating_timestamp:Int) => {
          println(s"data --::--- user_id = ${user_id},movie_id = ${movie_id},rating=${rating},rating_timestamp=${rating_timestamp}")

          //数据更新到redis和mysql

          /**
           * 流程
           * 1.获取用户的k次评分  <-- redis   movie_id,rating
           * 2.根据当前用户评分的电影 取得 与该电影 最相似的电影作为推荐列表     movie_id......
           * 3.计算电影的优先推荐等级
           * 4.根据优先级获得推荐列表 movie_id,score
           * 5.缓存到业务数据库mysql
           */

          //1.获取用户的k次评分  <-- redis
          val recentRating: Array[(Int, Double)] = RecommendService.getUserRecentRatingFromRedis(MAX_RECENT_RATING_NUM.toInt, user_id, ConnectUtil.jedis)

          //2根据当前用户评分的电影 取得 与该电影 最相似的电影作为推荐列表     movie_id......

          //2.1筛选出用户已经评价过的电影

          val userAlreadyRating = ratingsMap(user_id)

//          val userAlreadyRating: Array[Int] = ratingsRdd.filter {
//            rating => rating.user_id == user_id
//            //return rdd[Ratings]
//          }.map(_.movie_id).collect() //将筛选后的返回一个mivie_id列表

          //2.2 根据当前用户评分的电影 取得 与该电影 最相似的电影作为推荐列表
          val movieRecommendList: Array[Int] = RecommendService.getMovieSimListFromMysql(movie_id, userAlreadyRating, MAX_MOVIE_SIM_NUM.toInt, movieSimBroadCast.value)

          //3,4.计算电影的优先推荐等级并得到列表      recentRating最近的评分     movieRecommendList:待推荐的电影列表   movieSimBroadCast:获取近期推荐电影的评分
          val streamingRecs: Array[(Int, Double)] = RecommendService.calculateRecommendPriority(LOG_PARAMETER.toInt, recentRating, movieRecommendList, movieSimBroadCast.value)

          //4.缓存到业务数据库mysql
//          RecommendService.storeResToMysql(streamingRecs, user_id)(mysqlConfig)
          RecommendService.storeResToRedis(streamingRecs, user_id,ConnectUtil.jedis)
        }
      }
    }
    //准备接受数据流
    ssc.start()
    //等待时间
    ssc.awaitTermination()
  }




  def main(args: Array[String]): Unit = {
    streamingRecommend()
  }
}

处理函数

package org.cuit.recommend.streamingRecommend
import java.sql.{Connection, DriverManager, ResultSet}

import com.mysql.jdbc.PreparedStatement
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.cuit.recommend.offlineStatistics.OfflineStatistics.Ratings
import org.cuit.recommend.streamingRecommend.StreamingRecommend.{MAX_MOVIE_SIM_NUM, MAX_RECENT_RATING_NUM, MysqlConfig}
import redis.clients.jedis.Jedis

import scala.collection.mutable.ArrayBuffer

/**
 * @Author 高拴梁
 * @Date 2022/4/25 16:13
 * @version 1.0
 * @注释 实时推荐业务的具体实现
 *     * 1.获取用户的k次评分  <-- redis   movie_id,rating
 *     * 2.根据当前用户评分的电影 取得 与该电影 最相似的电影作为推荐列表     movie_id......
 *     * 3.计算电影的优先推荐等级
 *     * 4.根据优先级获得推荐列表 movie_id,score
 *     * 5.缓存到业务数据库mysql
 */
object RecommendService {

  /**
   *
   * @param recentRatingNum    实时推荐选择的最近k次评分
   * @param user_id       用户id
   * @param jedis         redis连接客户端
   * @return   最近评分结果(movie_id,rating)
   */
  def getUserRecentRatingFromRedis(recentRatingNum:Int,user_id:Int,jedis:Jedis):Array[(Int,Double)]={

    jedis.auth("123456")
    //jedis.lrange 返回Java的Array类型,需要转换才能使用Scala的map
    import scala.collection.JavaConversions._
    //redis获取数据  key:user_id  value: movie_id:rating     "1900:4.5"....
    // 返回列表中指定区间内的元素,区间以偏移量 START 和 END 指定
    jedis.lrange("user_id:"+user_id,0,recentRatingNum).map{
      x=> {
        val userData = x.split(":")
        (userData(0).toInt,userData(0).toDouble)
      }
    }.toArray
  }

  /**
   *  从电影相似度矩阵获取最相似的n个电影 (不包括用户已经浏览评分过的电影)
   * @param movie_id   当前评分的电影id
   * @param alreadyRating   用户已经评价过的电影
   * @param movieSimNum    获取的电影数量
   * @param movieSim     电影相似度矩阵
   * @return  最相似的电影id Array(1,2,3,4.....)
   */
  def getMovieSimListFromMysql(movie_id:Int
                               ,alreadyRating:Array[Int]
                               ,movieSimNum:Int
                               ,movieSim:scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]]):Array[Int]={
    /**
     * 考虑到推荐结果不应该包含用户评分过的电影
     * 流程
     * 1.根据movie_id从movieSimBroadCast获取最相似的n个电影
     * 2.过滤掉movie_id相同的电影
     */
    //movieSim      Map[Int(movie_id), Map[Int(movie_id), Double(sim_score)]]  =>Array(Int(movie_id), Double(sim_score))
    val simMovies = movieSim(movie_id).toArray
    //2.过滤掉看过的电影  排序取
    simMovies.filter(movie=> !alreadyRating.contains(movie._1))
      .sortWith(_._2>_._2)
      .take(movieSimNum)
      //获得电影列表
      .map(_._1)

  }

  /**
   * 计算实时推荐的电影的优先级并返回优先级结果
   * @param logParameter   求log的底数
   * @param recentRating     最近评分结果(movie_id,rating)
   * @param movieRecommendList         最相似的电影id Array(1,2,3,4.....)
   * @param movieSimBroadCast       电影相似度矩阵
   * @return  实施推荐结果列表 Array((movie_id,score)))
   */
  def calculateRecommendPriority(logParameter:Int
                                 ,recentRating: Array[(Int, Double)]
                                 ,movieRecommendList: Array[Int]
                                 ,movieSimBroadCast:scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]]):Array[(Int,Double)]={

    //计算每个推荐电影(推荐电影来自于movieRecommendList 即电影相似度矩阵的topN)的推荐得分 Int, Double movie_id,score
    val movieScore = new ArrayBuffer[(Int, Double)]()
    //推荐优先级  incount     Int ,Int:电影id,用户最近k次评分中大于某个值的个数   个数越多说明该电影越值得推荐
    val inCountMap = scala.collection.mutable.HashMap[Int, Int]()
    //不推荐优先级  decount  同理
    val deCountMap = scala.collection.mutable.HashMap[Int, Int]()

    //遍历待推荐电影和用户近期的k次评分电影   得到基础得分movieScore中的值
    //((1,0.5),(1,0.6),(1,0.7)...)     1:待推荐的电影id    0.5:该待推荐电影与近期评分数据的推荐值
    for (movieRecommend<-movieRecommendList;recentMovie<-recentRating){
      //计算两者的相似度,给定两个mid从相似度矩阵中得到对应的值
      val score = getMovieSimScoreFromMovieSimRecs(movieRecommend, recentMovie._1,movieSimBroadCast)
      if (score>0.5){
        //sim(q,r) * Rating r        (movieRecommend,score * recentMovie._2)作为一个整体元组放入ArrayBuffer
        movieScore += ((movieRecommend,score * recentMovie._2))
        if (recentMovie._2>3){
          //避免因未初始化而报错
          inCountMap(movieRecommend) = inCountMap.getOrElse(movieRecommend,0)+1
        }else deCountMap(movieRecommend) = deCountMap.getOrElse(movieRecommend,0)+1
      }
    }
    //做聚合求最后的推荐值 movie_id聚合  ((1,0.5),(1,0.6),(1,0.7)...)     1:待推荐的电影id    0.5:该待推荐电影与近期评分数据的推荐值
    movieScore.groupBy(_._1).map{
      /**
       * 推荐值 = sim(q,r) * Rating r 做聚合(按照movie_id)求均值 + lg max(incount,1) - lg max(decount,1)
       */
      //模式匹配 返回(movie_id,推荐值)   scoreMapList:groupBy后返回的 数据类型  Map(movie_id,ArrayBuffer(movie_id,score))
      case (movie_id: Int,scoreArray) => (movie_id,scoreArray.map(_._2).sum / scoreArray.length
        + log(inCountMap.getOrElse(movie_id,1))-log(deCountMap.getOrElse(movie_id,1)))
    }.toArray.sortWith(_._2>_._2)
  }

  /**
   *  插入结果数据到mysql
   * @param streamingRecs   该用户推荐的电影(movie_id,score),(),()
   * @param user_id        用户id
   * @param mysqlConfig    mysql连接配置
   */
  def storeResToMysql(streamingRecs:Array[(Int,Double)],user_id:Int)(implicit mysqlConfig: MysqlConfig):Unit={


    val driver = mysqlConfig.driver
    val url = mysqlConfig.url
    val username = mysqlConfig.username
    val password = mysqlConfig.password

    var connection:Connection = null

    //结果处理
    val streamingRecommendinfo: String = streamingRecs.map {
      case (movie_id: Int, score: Double) => s"${movie_id}:${score.formatted("%.3f")}|"
    }.mkString("")

    try {
      Class.forName(driver)
      connection = DriverManager.getConnection(url, username, password)
      val statement = connection.createStatement()

      //判断未更新的原始数据是否存在
      val resultSet = statement.executeQuery( s"select * from ${mysqlConfig.tableName}")
      while ( resultSet.next() ) {
        val deleteSql = s"delete from ${mysqlConfig.tableName} where user_id = ${user_id}"
        connection.prepareStatement(deleteSql).executeUpdate()
      }
      //如果没有就插入数据
      val insertSQL="insert into "+mysqlConfig.tableName+" values("+user_id+",\""+streamingRecommendinfo+"\")"
      connection.prepareStatement(insertSQL).executeUpdate()
    } catch {
      case e => e.printStackTrace
      //case _: Throwable => println("ERROR")
    }
    connection.close()
  }

  /**
   * 推荐结果放入Redis
   * @param streamingRecs 实时推荐矩阵
   * @param user_id   用户id
   * @param jedis   jedis配置连接
   */
  def storeResToRedis(streamingRecs:Array[(Int,Double)],user_id:Int,jedis: Jedis):Unit={

    //结果处理
    val streamingRecommendinfo: String = streamingRecs.map {
      case (movie_id: Int, score: Double) => s"${movie_id}:${score.formatted("%.3f")}|"
    }.mkString("")

    jedis.auth("123456")
    jedis.hset("streaming",user_id.toString,streamingRecommendinfo)

  }



  /**
   * 根据movieA和B的电影id来从movieSimBroadCast中取得两个电影的相似度  ,找不到取默认值
   * @param movieA  电影A的id
   * @param movieB  电影B的id
   * @param movieSimBroadCast      电影相似度矩阵  Map[Int(MovieA_id), Map[Int(movieB_id), Double(相似度)]
   * @return  相似度
   */
  def getMovieSimScoreFromMovieSimRecs(movieA:Int,movieB:Int,movieSimBroadCast:scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]]):Double={
    movieSimBroadCast.get(movieA) match {
      case Some(movieSimmap) => movieSimmap.get(movieB) match {
        case Some(score) => score
        case None => 0.0
      }
      case None => 0.0
    }
  }

  /**
   *
   * @param m   代求数
   * @param logParameter  底数
   * @return  log结果
   */
  def log(m:Int,logParameter:Int = 10):Double={

    //对数的换底公式
    math.log(m) / math.log(logParameter)
  }
}

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,033评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,725评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,473评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,846评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,848评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,691评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,053评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,700评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,856评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,676评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,787评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,430评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,034评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,990评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,218评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,174评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,526评论 2 343

推荐阅读更多精彩内容