4.2 需求1实现

一、实现

package com.atguigu.sparkmall.offline

import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.UUID

import com.atguigu.sparkmall.common.model.CategoryTop10
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import com.atguigu.sparkmall.common.util.StringUtil


object Req1CategoryTop10Application {
  def main(args: Array[String]): Unit = {
    //需求一:获取点击、下单和支付数量排名前10的品类

    val conf = new SparkConf().setMaster("local[*]").setAppName("Req1CategoryTop10Application")
    val sc = new SparkContext(conf)

    //从文件中获取原始数据
    val lineRDD: RDD[String] = sc.textFile("input/user_visit_action.csv")

    //将数据转换结构(category_click,1),(category_order,1),(category_pay,1)
    /** category有点击品类、下单品类、付款品类*/
    val mapRDD: RDD[Array[(String, Long)]] = lineRDD.map(line => {
      val datas: Array[String] = line.split(",")
      if (datas(6) != "-1") { //-1表示无效数据
        //点击数据
        Array((datas(6) + "_click", 1L))
      } else if (StringUtil.isNotEmpty(datas(8))) {
        //下单数据
        val categoryIds: Array[String] = datas(8).split("-")
        categoryIds.map {
          id => (id + "_order", 1L)
        }
      } else if (StringUtil.isNotEmpty(datas(10))) {
        val categoryIds: Array[String] = datas(10).split("-")
        categoryIds.map {
          id => (id + "_pay", 1L)
        }
      } else {
        Array(("", 0L)) //此处是为了在前面所有条件都不满足的情况下返回和前面返回数据类型相同的结构
      }
    })

    //因为拿到的是一个数组整体mapRDD: RDD[Array[(String, Long)]],所以需要扁平化打散成单个的tuple
    val flatmapRDD: RDD[(String, Long)] = mapRDD.flatMap(array => array)

    //因为前面当所有条件都不满足的情况下会产生("", 0L)的数据,需要过滤
    val filterRDD: RDD[(String, Long)] = flatmapRDD.filter {
      case (key, v) => {
        StringUtil.isNotEmpty(key)
      }
    }

    //将数据分组聚合(category_click,sum),(category_order,sum),(category_pay,sum)
    val reduceRDD: RDD[(String, Long)] = filterRDD.reduceByKey(_+_)

    //将数据转换结构(category,(click,sum)),(category,(order,sum)),(category,(pay,sum)),
    val mappRDD1: RDD[(String, (String, Long))] = reduceRDD.map {
      case (key, sum) => {
        val keys: Array[String] = key.split("_")
        (keys(0), (keys(1), sum))
      }
    }

    //分组,排序: 根据品类分组
    val groupRDD: RDD[(String, Iterable[(String, Long)])] = mappRDD1.groupByKey()

    //因为操作数据不方便,将数据转换程样例类RDD[(String, Iterable[(String, Long)])] --> UserVisitAction
    val taskId: String = UUID.randomUUID().toString

    val classRDD: RDD[CategoryTop10] = groupRDD.map {
      case (categoryId, iter) => {
        val map: Map[String, Long] = iter.toMap //转换成map
        CategoryTop10(taskId, categoryId, map.getOrElse("click", 0L), map.getOrElse("order", 0L), map.getOrElse("pay", 0L))
      }
    }

    //取top10
    val collectArray: Array[CategoryTop10] = classRDD.collect() //收集为Array用集合的方式进行排序

    //sortWith方便自定义排序规则
    val top10Array: Array[CategoryTop10] = collectArray.sortWith {
      (left, right) => {
        if (left.clickCount > right.clickCount) {
          true
        } else if (left.clickCount == right.clickCount) {
          if (left.orderCount > right.orderCount) {
            true
          } else if (left.orderCount == right.orderCount) {
            if (left.payCount > right.payCount) {
              true
            } else {
              false
            }
          } else {
            false
          }
        } else {
          false
        }
      }
    }.take(10)

    //将结果保存到Mysql中
    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://linux1:3306/sparkmall-190311"
    val userName = "root"
    val passWd = "000000"

    Class.forName(driver)
    val connection: Connection = DriverManager.getConnection(url, userName, passWd)
    val sql = "insert into category_top10 ( taskId, category_id, click_count, order_count, pay_count ) values (?, ?, ?, ?, ?)"
    val statement: PreparedStatement = connection.prepareStatement(sql)

    top10Array.foreach{
      obj=>{
        statement.setObject(1, obj.taskId)
        statement.setObject(2, obj.categoryId)
        statement.setObject(3, obj.clickCount)
        statement.setObject(4, obj.orderCount)
        statement.setObject(5, obj.payCount)
        statement.executeUpdate()
      }
    }
    statement.close()
    connection.close()

    // 释放资源
    sc.stop()
  }
}

二、使用累加器进行优化

package com.atguigu.sparkmall.offline

import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.UUID

import com.atguigu.sparkmall.common.model.CategoryTop10
import com.atguigu.sparkmall.common.util.StringUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.{immutable, mutable}
import scala.collection.parallel.immutable


object Req1CategoryTop10Application1 {
  def main(args: Array[String]): Unit = {
    //需求一:获取点击、下单和支付数量排名前10的品类

    val conf = new SparkConf().setMaster("local[*]").setAppName("Req1CategoryTop10Application")
    val sc = new SparkContext(conf)

    //从文件中获取原始数据
    val lineRDD: RDD[String] = sc.textFile("input/user_visit_action.csv")

    lineRDD.foreach(println)
    println("********")

    //创建累加器
    val accumulator = new CategoryAccumulator

    //注册累加器
    sc.register(accumulator,"accumulator")

    //使用累加器
    lineRDD.foreach{
      line => {
        val datas: Array[String] = line.split(",")
        if(datas(6) != -1) {
          //点击数据
          accumulator.add(datas(6) + "_click")
        } else if (StringUtil.isNotEmpty(datas(8))) {
          //下单数据
          val categoryIds: Array[String] = datas(8).split("-")
          categoryIds.map{
            id => {
              accumulator.add(id + "_order")
            }
          }
        } else if(StringUtil.isNotEmpty(datas(10))) {
          //支付数据
          val categoryIds: Array[String] = datas(10).split("-")
          categoryIds.map{
            id => {
              accumulator.add(id + "_pay")
            }
          }
        }
      }
    }

    //获取累加器的值(category_click,sum)
    val accumulatorVal: mutable.HashMap[String, Long] = accumulator.value

    //将累加器的值根据品类进行分组 (category,(category_click,sum))
    val categoryToMap: Map[String, mutable.HashMap[String, Long]] = accumulatorVal.groupBy {
      case (k, sum) => {
        k.split("_")(0)
      }
    }

    val taskId: String = UUID.randomUUID().toString

    //将分组后的数据转换为样例类:
    val categoryTop10: Iterable[CategoryTop10] = categoryToMap.map {
      case (category, map) => {
        CategoryTop10(
          taskId,
          category,
          map.getOrElse(category + "_click", 0L),
          map.getOrElse(category + "_order", 0L),
          map.getOrElse(category + "_pay", 0L)
        )
      }
    }

    val result: List[CategoryTop10] = categoryTop10.toList.sortWith {
      (left, right) => {
        if (left.clickCount > right.clickCount) {
          true
        } else if (left.clickCount == right.clickCount) {
          if (left.orderCount > right.orderCount) {
            true
          } else if (left.orderCount == right.orderCount) {
            left.payCount > right.payCount
          } else {
            false
          }
        } else {
          false
        }
      }
    }.take(10)


    result.foreach(println)


/*
    //将结果保存到Mysql中
    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://hadoop102:3306/sparkmall190311"
    val userName = "root"
    val passWd = "111111"

    Class.forName(driver)
    val connection: Connection = DriverManager.getConnection(url, userName, passWd)
    val sql = "insert into category_top10 ( taskId, category_id, click_count, order_count, pay_count ) values (?, ?, ?, ?, ?)"
    val statement: PreparedStatement = connection.prepareStatement(sql)

    top10Array.foreach{
      obj=>{
        statement.setObject(1, obj.taskId)
        statement.setObject(2, obj.categoryId)
        statement.setObject(3, obj.clickCount)
        statement.setObject(4, obj.orderCount)
        statement.setObject(5, obj.payCount)
        statement.executeUpdate()
      }
    }
    statement.close()
    connection.close()*/
    // 释放资源
    sc.stop()
  }
}


//自定义累加器
//category-click,100
//categoru-order,200
//category-pay,300
class CategoryAccumulator extends AccumulatorV2[String,mutable.HashMap[String,Long]] {
  var map = new mutable.HashMap[String,Long]()

  override def isZero: Boolean = {
    map.isEmpty
  }

  override def copy(): AccumulatorV2[String, mutable.HashMap[String, Long]] = {
    new CategoryAccumulator
  }

  override def reset(): Unit = {
    map.clear()
  }

  override def add(in: String): Unit = {
    map(in) = map.getOrElse(in,0L) + 1
  }

  override def merge(other: AccumulatorV2[String, mutable.HashMap[String, Long]]): Unit = {
    var map1 = map
    var map2 = other.value

    map = map1.foldLeft(map2) {
      (tempMap, kv) =>{
        val k: String = kv._1
        val v: Long = kv._2

        tempMap(k) = tempMap.getOrElse(k,0L) + v

        tempMap
      }
    }
  }

  override def value: mutable.HashMap[String, Long] = {
    map
  }
}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,723评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,003评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,512评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,825评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,874评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,841评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,812评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,582评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,033评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,309评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,450评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,158评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,789评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,409评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,609评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,440评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,357评论 2 352

推荐阅读更多精彩内容