Scala之一二

SELECT:

val df = trainData.select("ruid", "log_date","fans1","watch_num1","danmu_cnt1","gap_days1","money_num1")

JOIN:

val result2=kmeansData.join(result1,Seq("prediction"))

GROUP BY:

kmeansData.groupBy("prediction").count().show()
kmeansData.groupBy("prediction").mean("fans","gap_days")

主播聚类code示例:

package com.bilibili.live

import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.feature.{StandardScaler, VectorAssembler}
import org.apache.spark.sql.SparkSession

object AnchorCluster {

  def main(args: Array[String]): Unit = {
    println("anchor cluster label process start!")

        val saveMode = args(0)
        val log_dt = args(1)

        println(s"log_date: ${log_dt}")

    //
        val spark = SparkSession
          .builder()
          .appName("AnchorCluster")
//          .master("local[4]")
          .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .config("spark.sql.warehouse.dir", "spark-warehouse")
          .config("spark.sql.broadcastTimeout", "36000")
          .enableHiveSupport()
          .getOrCreate()
        // spark.conf.set("hive.security.authorization.enabled", false)
        println("spark version:%s".format(spark.version))




    //    // 开始聚类
    //    val numClusters = 4
    //    val numIterations = 500
    //    val clusters = KMeans.train(parsedData, numClusters, numIterations)
    //
    //    // Evaluate clustering by computing Within Set Sum of Squared Errors
    //    // val WSSSE = clusters.computeCost(parsedData)
    //    // println(s"Within Set Sum of Squared Errors = $WSSSE")
    //
    //    // Save and load model
    //    clusters.save(sc, "target/org/apache/spark/KMeansExample/KMeansModel")
    //    val sameModel = KMeansModel.load(sc, "target/org/apache/spark/KMeansExample/KMeansModel")
    //
    //    trainingData.collect().foreach(
    //      sample =>{
    //      val predictedCluster=model.predict(sample)
    //      println(sample.toString + "belongs to cluster" + predictedCluster)}
    //
    //    // 返回数据集和结果
    //    val result = data.map {
    //      line =>
    //        val linevectore = Vectors.dense(line.split(" ").map(_.toDouble))
    //        val prediction = model.predict(linevectore)
    //        line + " " + prediction
    //    }.collect.foreach(println)

//-----------------------------
//         读取sql:
        val trainDataSql = "select *,log10(fans+1) as fans1,log10(watch_num+1) as watch_num1," +
          "log10(danmu_cnt+1) as danmu_cnt1,log10(gap_days+1) as gap_days1,log10(money_num+1) as money_num1 " +
          "from ai_live.ruid_cluster_daily_train" +
          s" where log_date = '${log_dt}' "

         println("-----------------train data sql----------------")
         println(trainDataSql)

        val trainData = spark.sql(trainDataSql).limit(500000)


         println("-----------------train feature----------------")
         trainData.show(3)



        // 读取本地数据:
//          val trainData: DataFrame = spark.read.format("csv")
//            .option("header", "true")
//            .option("inferSchema", "true")
//            .load("data/anchor_cluster_test.csv")
//            .na.fill(-99)
//
//           trainData.show(3)

        // 处理数据,toarry+归一化
          val df = trainData.select("ruid", "log_date","fans1","watch_num1","danmu_cnt1","gap_days1","money_num1",
                                                        "fans","watch_num","danmu_cnt","gap_days","money_num")
           df.show(numRows = 3)

          val assembler = new VectorAssembler()
           .setInputCols(df.drop("ruid","log_date","fans","watch_num","danmu_cnt","gap_days","money_num").columns.toArray)
           .setOutputCol("features")

          val df1=assembler.transform(df)

          val scaler = new StandardScaler()
            .setInputCol("features")
            .setOutputCol("new_feature")
            .setWithStd(true)
            .setWithMean(false)

          val scalerModel = scaler.fit(df1)
          val scaledData = scalerModel.transform(df1)
            scaledData.show

        //聚类
         val kmeans=new KMeans()
           .setK(4)
           .setMaxIter(500)//最大迭代次数
           .setFeaturesCol("new_feature")
           .setPredictionCol("prediction")

         val kmeansModel = kmeans.fit(scaledData)

//    查看聚类结果
    val kmeansData = kmeansModel.transform(scaledData)
    kmeansData.show()

    kmeansData.groupBy("prediction").count().show()
    val result1=kmeansData
      .groupBy("prediction").mean("fans","gap_days")
      .withColumnRenamed("avg(fans)","avg_fans")
      .withColumnRenamed("avg(gap_days)","avg_gap_days")


    val result2=kmeansData.join(result1,Seq("prediction"))
    result2.show()

    result2.createOrReplaceTempView("result_final")

    val result_final = spark.sql("select ruid,fans,watch_num,danmu_cnt,gap_days,money_num,prediction,"+
      "case when avg_fans>10000 then '成熟'" +
      " when avg_fans>1000 then '成长' " +
      " when avg_gap_days<10 then '新' else '尾部主播' end as label from result_final")

    result_final.show()

//写入数据库
    spark.catalog.setCurrentDatabase("bili_live")
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

    if (saveMode == "overwrite") {
      result_final.write.mode("overwrite")
        .partitionBy("log_date")
        .format("parquet")
        .saveAsTable("ai_live.data_mining_anchor_cluster_predict_d")
    } else {
      result_final.write.mode("overwrite")
        .format("parquet")
        .insertInto("ai_live.data_mining_anchor_cluster_predict_d")
    }

    println("Write table success!")

    println("Done!!!")

    spark.stop()

  }
}


最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,539评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,594评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,871评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,963评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,984评论 6 393
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,763评论 1 307
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,468评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,357评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,850评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,002评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,144评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,823评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,483评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,026评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,150评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,415评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,092评论 2 355

推荐阅读更多精彩内容