一、实现
package com.atguigu.sparkmall.offline
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.UUID
import com.atguigu.sparkmall.common.model.CategoryTop10
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import com.atguigu.sparkmall.common.util.StringUtil
/**
 * Requirement 1: find the top 10 categories ranked by click count, then order
 * count, then pay count, and store them in the MySQL table `category_top10`.
 *
 * Input is the CSV file `input/user_visit_action.csv`. Column 6 holds the clicked
 * category id ("-1" when the record is not a click), column 8 holds the ordered
 * category ids joined by "-", and column 10 holds the paid category ids joined
 * by "-".
 */
object Req1CategoryTop10Application {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("Req1CategoryTop10Application")
    val sc = new SparkContext(conf)
    try {
      // Read the raw records.
      val lineRDD: RDD[String] = sc.textFile("input/user_visit_action.csv")

      // ("<category>_click", 1), ("<category>_order", 1), ("<category>_pay", 1)
      val actionRDD: RDD[(String, Long)] = lineRDD.flatMap(line => toActionPairs(line))

      // ("<category>_<action>", sum)
      val reduceRDD: RDD[(String, Long)] = actionRDD.reduceByKey(_ + _)

      // (category, (action, sum))
      val byCategoryRDD: RDD[(String, (String, Long))] = reduceRDD.map {
        case (key, sum) =>
          val keys: Array[String] = key.split("_")
          (keys(0), (keys(1), sum))
      }

      // Gather the (up to three) per-action sums of every category.
      val groupRDD: RDD[(String, Iterable[(String, Long)])] = byCategoryRDD.groupByKey()

      // One task id is shared by all rows written by this run.
      val taskId: String = UUID.randomUUID().toString
      val classRDD: RDD[CategoryTop10] = groupRDD.map {
        case (categoryId, iter) =>
          val counts: Map[String, Long] = iter.toMap
          CategoryTop10(
            taskId,
            categoryId,
            counts.getOrElse("click", 0L),
            counts.getOrElse("order", 0L),
            counts.getOrElse("pay", 0L)
          )
      }

      // Sort on the driver so the three-level ordering can be expressed directly.
      val top10Array: Array[CategoryTop10] = classRDD.collect().sortWith(ranksBefore).take(10)

      saveToMysql(top10Array)
    } finally {
      // Release Spark resources even when the job or the database write fails.
      sc.stop()
    }
  }

  /**
   * Turns one CSV record into ("<categoryId>_<action>", 1) pairs.
   *
   * A record is treated as a click, else an order, else a payment; a record that
   * is none of these yields an empty array.
   */
  private def toActionPairs(line: String): Array[(String, Long)] = {
    val datas: Array[String] = line.split(",")
    if (datas(6) != "-1") {
      Array((datas(6) + "_click", 1L))
    } else if (StringUtil.isNotEmpty(datas(8))) {
      datas(8).split("-").map(id => (id + "_order", 1L))
    } else if (StringUtil.isNotEmpty(datas(10))) {
      datas(10).split("-").map(id => (id + "_pay", 1L))
    } else {
      Array.empty[(String, Long)]
    }
  }

  /** Descending order by click count, ties broken by order count, then pay count. */
  private def ranksBefore(left: CategoryTop10, right: CategoryTop10): Boolean = {
    if (left.clickCount != right.clickCount) {
      left.clickCount > right.clickCount
    } else if (left.orderCount != right.orderCount) {
      left.orderCount > right.orderCount
    } else {
      left.payCount > right.payCount
    }
  }

  /**
   * Inserts the given rows into `category_top10`, one INSERT per row.
   *
   * The statement and the connection are closed in `finally` blocks so that a
   * failing insert does not leak them.
   */
  private def saveToMysql(rows: Array[CategoryTop10]): Unit = {
    // NOTE(review): connection settings are hard-coded; consider moving them to configuration.
    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://linux1:3306/sparkmall-190311"
    val userName = "root"
    val passWd = "000000"
    Class.forName(driver)
    val connection: Connection = DriverManager.getConnection(url, userName, passWd)
    try {
      val sql = "insert into category_top10 ( taskId, category_id, click_count, order_count, pay_count ) values (?, ?, ?, ?, ?)"
      val statement: PreparedStatement = connection.prepareStatement(sql)
      try {
        rows.foreach { obj =>
          statement.setObject(1, obj.taskId)
          statement.setObject(2, obj.categoryId)
          statement.setObject(3, obj.clickCount)
          statement.setObject(4, obj.orderCount)
          statement.setObject(5, obj.payCount)
          statement.executeUpdate()
        }
      } finally {
        statement.close()
      }
    } finally {
      connection.close()
    }
  }
}
二、使用累加器进行优化
package com.atguigu.sparkmall.offline
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.UUID
import com.atguigu.sparkmall.common.model.CategoryTop10
import com.atguigu.sparkmall.common.util.StringUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.{immutable, mutable}
import scala.collection.parallel.immutable
/**
 * Requirement 1, accumulator variant: find the top 10 categories ranked by click
 * count, then order count, then pay count, counting with a [[CategoryAccumulator]]
 * instead of shuffle-based aggregation, and print the result.
 *
 * Input is the CSV file `input/user_visit_action.csv`. Column 6 holds the clicked
 * category id ("-1" when the record is not a click), column 8 holds the ordered
 * category ids joined by "-", and column 10 holds the paid category ids joined
 * by "-".
 */
object Req1CategoryTop10Application1 {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("Req1CategoryTop10Application")
    val sc = new SparkContext(conf)
    try {
      // Read the raw records.
      val lineRDD: RDD[String] = sc.textFile("input/user_visit_action.csv")

      // Debug output: prints every input line (this is a separate action over the input).
      lineRDD.foreach(println)
      println("********")

      // Create and register the accumulator before it is used inside an action.
      val accumulator = new CategoryAccumulator
      sc.register(accumulator, "accumulator")

      // Count one "<categoryId>_<action>" key per category touched by a record.
      lineRDD.foreach { line =>
        val datas: Array[String] = line.split(",")
        // The field is a String, so it must be compared with the String "-1";
        // comparing with the Int -1 is always unequal and would count every
        // record as a click.
        if (datas(6) != "-1") {
          // Click record.
          accumulator.add(datas(6) + "_click")
        } else if (StringUtil.isNotEmpty(datas(8))) {
          // Order record.
          datas(8).split("-").foreach(id => accumulator.add(id + "_order"))
        } else if (StringUtil.isNotEmpty(datas(10))) {
          // Payment record.
          datas(10).split("-").foreach(id => accumulator.add(id + "_pay"))
        }
      }

      // ("<category>_<action>", sum)
      val accumulatorVal: mutable.HashMap[String, Long] = accumulator.value

      // category -> all of its "<category>_<action>" entries
      val categoryToMap: Map[String, mutable.HashMap[String, Long]] = accumulatorVal.groupBy {
        case (key, _) => key.split("_")(0)
      }

      // One task id is shared by all rows produced by this run.
      val taskId: String = UUID.randomUUID().toString
      val categoryTop10: Iterable[CategoryTop10] = categoryToMap.map {
        case (category, counts) =>
          CategoryTop10(
            taskId,
            category,
            counts.getOrElse(category + "_click", 0L),
            counts.getOrElse(category + "_order", 0L),
            counts.getOrElse(category + "_pay", 0L)
          )
      }

      val result: List[CategoryTop10] = categoryTop10.toList.sortWith(ranksBefore).take(10)
      result.foreach(println)

      /*
      // Disabled: store the result in MySQL.
      val driver = "com.mysql.jdbc.Driver"
      val url = "jdbc:mysql://hadoop102:3306/sparkmall190311"
      val userName = "root"
      val passWd = "111111"
      Class.forName(driver)
      val connection: Connection = DriverManager.getConnection(url, userName, passWd)
      val sql = "insert into category_top10 ( taskId, category_id, click_count, order_count, pay_count ) values (?, ?, ?, ?, ?)"
      val statement: PreparedStatement = connection.prepareStatement(sql)
      result.foreach{
      obj=>{
      statement.setObject(1, obj.taskId)
      statement.setObject(2, obj.categoryId)
      statement.setObject(3, obj.clickCount)
      statement.setObject(4, obj.orderCount)
      statement.setObject(5, obj.payCount)
      statement.executeUpdate()
      }
      }
      statement.close()
      connection.close()*/
    } finally {
      // Release Spark resources even when the job fails.
      sc.stop()
    }
  }

  /** Descending order by click count, ties broken by order count, then pay count. */
  private def ranksBefore(left: CategoryTop10, right: CategoryTop10): Boolean = {
    if (left.clickCount != right.clickCount) {
      left.clickCount > right.clickCount
    } else if (left.orderCount != right.orderCount) {
      left.orderCount > right.orderCount
    } else {
      left.payCount > right.payCount
    }
  }
}
/**
 * Custom accumulator that counts how many times each String key was added.
 *
 * In this file the keys have the form "<categoryId>_click", "<categoryId>_order"
 * and "<categoryId>_pay", so the value looks like:
 * {{{
 *   category_click -> 100
 *   category_order -> 200
 *   category_pay   -> 300
 * }}}
 */
class CategoryAccumulator extends AccumulatorV2[String, mutable.HashMap[String, Long]] {

  // Kept as a public var so existing code that reads or assigns `map` still compiles.
  var map = new mutable.HashMap[String, Long]()

  /** True while nothing has been counted. */
  override def isZero: Boolean = {
    map.isEmpty
  }

  /** Returns a new accumulator holding an independent copy of the current counts. */
  override def copy(): AccumulatorV2[String, mutable.HashMap[String, Long]] = {
    val duplicate = new CategoryAccumulator
    duplicate.map ++= map
    duplicate
  }

  /** Drops all counts. */
  override def reset(): Unit = {
    map.clear()
  }

  /** Increments the count of `in` by one. */
  override def add(in: String): Unit = {
    map(in) = map.getOrElse(in, 0L) + 1
  }

  /**
   * Adds the counts of `other` to this accumulator.
   *
   * Only this accumulator's map is modified; `other` is left untouched and the
   * two accumulators never end up sharing one map instance.
   */
  override def merge(other: AccumulatorV2[String, mutable.HashMap[String, Long]]): Unit = {
    other.value.foreach {
      case (key, count) =>
        map(key) = map.getOrElse(key, 0L) + count
    }
  }

  /** The current counts, keyed by the added strings. */
  override def value: mutable.HashMap[String, Long] = {
    map
  }
}