Dataset基本使用

package com.kun.core

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Dataset, SparkSession}

import scala.collection.mutable.ArrayBuffer


/**
  * Dataset开发实战企业人员管理系统应用案例代码
  */
object DatasetOps {

  /**
    * Case Class的特别之处在于:
    *
    * 编译器会为Case Class自动生成以下方法:
    * equals & hashCode
    * toString
    * copy
    * 编译器会为Case Class自动生成伴生对象
    *
    * 编译器会为伴生对象自动生成以下方法
    * apply
    * unapply
    * 这意味着你可以不必使用new关键字来实例化一个case class.
    *
    * case class的类参数在不指定val/var修饰时,会自动编译为val,即对外只读,如果需要case class的字段外部可写,可以显式地指定var关键字!
    */
  case class Person(name: String, age: Long)
  case class Score(n: String, score: Long)

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val spark = SparkSession.builder().appName("DatasetOps").master("local[4]").
      config("spark.sql.warehouse", "/home/ghost/IdeaProjects/SparkBook/spark-warehouse").getOrCreate()

    import org.apache.spark.sql.functions._
    import spark.implicits._

    /**
      * Dataset中的tranformation和Action操作,Action类型的操作有:
      * show collect first reduce take count等
      * 这些操作都会产生结果,也就是说会执行逻辑计算的过程
      */
    val personsDF = spark.read.json("data/peoplemanagedata/people.json")
    val personScoresDF = spark.read.json("data/peoplemanagedata/peopleScores.json")
    // 生成Dataset
    val personsDS = personsDF.as[Person]
    val personScoresDS = personScoresDF.as[Score]

    println("使用groupBy算子进行分组:")
    /**
      * +-------+---+-----+
      * |   name|age|count|
      * +-------+---+-----+
      * | Justin| 29|    1|
      * |   Andy| 30|    1|
      * |Michael| 16|    1|
      * |Michael| 46|    1|
      * | Justin| 19|    1|
      * +-------+---+-----+
      */
    // $"columnName"  Scala short hand for a named column
    val personsDSGrouped = personsDS.groupBy($"name", $"age").count()
    personsDSGrouped.show()

    println("使用agg算子concat内置函数将姓名、年龄连接在一起成为单个字符串列 :")

    /**
      * +-------+---+-----------------+
      * |   name|age|concat(name, age)|
      * +-------+---+-----------------+
      * | Justin| 29|         Justin29|
      * |   Andy| 30|           Andy30|
      * |Michael| 16|        Michael16|
      * |Michael| 46|        Michael46|
      * | Justin| 19|         Justin19|
      * +-------+---+-----------------+
      */
    personsDS.groupBy($"name", $"age").agg(concat($"name", $"age")).show()

    println("使用col算子选择列 :")

    /**
      * +------------+------------+
      * |          _1|          _2|
      * +------------+------------+
      * |[16,Michael]|[Michael,88]|
      * |   [30,Andy]|  [Andy,100]|
      * | [19,Justin]| [Justin,89]|
      * | [29,Justin]| [Justin,89]|
      * |[46,Michael]|[Michael,88]|
      * +------------+------------+
      */
    personsDS.joinWith(personScoresDS, personsDS.col("name")===personScoresDS.col("n")).show()

    println("使用sum、avg等函数计算年龄总和、平均年龄、最大年龄、最小年龄、唯一年龄计数、平均年龄、当前时间等数据 :")

    /**
      * +-------+--------+--------+--------+--------+----------+-------------------+--------+--------------+
      * |   name|sum(age)|avg(age)|max(age)|min(age)|count(age)|count(DISTINCT age)|avg(age)|current_date()|
      * +-------+--------+--------+--------+--------+----------+-------------------+--------+--------------+
      * |Michael|      62|    31.0|      46|      16|         2|                  2|    31.0|    2019-02-26|
      * |   Andy|      30|    30.0|      30|      30|         1|                  1|    30.0|    2019-02-26|
      * | Justin|      48|    24.0|      29|      19|         2|                  2|    24.0|    2019-02-26|
      * +-------+--------+--------+--------+--------+----------+-------------------+--------+--------------+
      */
    personsDS.groupBy($"name").agg(sum($"age"), avg($"age"), max($"age"), min($"age"), count($"age"),
      countDistinct($"age"), mean($"age"), current_date()).show()

    println("函数collect_list、collect_set比较,collect_list函数结果中包含重复元素;collect_set函数结果中无重复元素:")

    /**
      * +-------+------------------+-----------------+
      * |   name|collect_list(name)|collect_set(name)|
      * +-------+------------------+-----------------+
      * |Michael|[Michael, Michael]|        [Michael]|
      * |   Andy|            [Andy]|           [Andy]|
      * | Justin|  [Justin, Justin]|         [Justin]|
      * +-------+------------------+-----------------+
      */
    personsDS.groupBy($"name").agg(collect_list($"name"), collect_set($"name")).show()

    println("使用sample算子进行随机采样:")

    /**
      * +---+-------+
      * |age|   name|
      * +---+-------+
      * | 16|Michael|
      * | 30|   Andy|
      * | 19| Justin|
      * | 46|Michael|
      * +---+-------+
      * 不保证刚好0.5
      */
    personsDS.sample(false, 0.5).show()

    println("使用randomSplit算子进行随机切分:")

    /**
      * +---+------+
      * |age|  name|
      * +---+------+
      * | 29|Justin|
      * +---+------+
      *
      * +---+-------+
      * |age|   name|
      * +---+-------+
      * | 16|Michael|
      * | 19| Justin|
      * | 30|   Andy|
      * | 46|Michael|
      * +---+-------+
      *
      * Array里面两个数为权重
      */
    personsDS.randomSplit(Array(10, 20)).foreach(dataset => dataset.show())

    println("使用select算子选择列:")

    /**
      * +-------+
      * |   name|
      * +-------+
      * |Michael|
      * |   Andy|
      * | Justin|
      * | Justin|
      * |Michael|
      * +-------+
      */
    personsDS.select("name").show()

    println("使用joinWith算子关联企业人员信息、企业人员分数评分信息:")

    /**
      * +------------+------------+
      * |          _1|          _2|
      * +------------+------------+
      * |[16,Michael]|[Michael,88]|
      * |   [30,Andy]|  [Andy,100]|
      * | [19,Justin]| [Justin,89]|
      * | [29,Justin]| [Justin,89]|
      * |[46,Michael]|[Michael,88]|
      * +------------+------------+
      */
    personsDS.joinWith(personScoresDS, $"name"===$"n").show()

    println("使用join算子关联企业人员信息、企业人员分数评分信息:")

    /**
      * +---+-------+-------+-----+
      * |age|   name|      n|score|
      * +---+-------+-------+-----+
      * | 16|Michael|Michael|   88|
      * | 30|   Andy|   Andy|  100|
      * | 19| Justin| Justin|   89|
      * | 29| Justin| Justin|   89|
      * | 46|Michael|Michael|   88|
      * +---+-------+-------+-----+
      */
    personsDS.join(personScoresDS, $"name"===$"n").show()

    println("使用sort算子对年龄进行降序排序:")

    /**
      * +---+-------+
      * |age|   name|
      * +---+-------+
      * | 46|Michael|
      * | 30|   Andy|
      * | 29| Justin|
      * | 19| Justin|
      * | 16|Michael|
      * +---+-------+
      */
    personsDS.sort($"age".desc).show()

    def myFlatMapFunction(myPerson: Person, myEncoder: Person): Dataset[Person] = {
      personsDS
    }

    /**
      * +-------+---+
      * |     _1| _2|
      * +-------+---+
      * |Michael| 46|
      * |   Andy|100|
      * | Justin| 49|
      * | Justin| 59|
      * |Michael| 76|
      * +-------+---+
      */
    personsDS.flatMap(persons => persons match {
      case Person(name, age) if name=="Andy" => List((name, age+70))
      case Person(name, age) => List((name, age+30))
    }).show()

    /**
      * +-------+----+
      * |     _1|  _2|
      * +-------+----+
      * |Michael|1016|
      * |   Andy|1030|
      * | Justin|1019|
      * | Justin|1029|
      * |Michael|1046|
      * +-------+----+
      */
    personsDS.mapPartitions{ persons =>
      val result = ArrayBuffer[(String, Long)]()
      while (persons.hasNext){
        val person = persons.next()
        result += ((person.name, person.age+1000))
      }
      result.iterator
    }.show()

    println("使用dropDuplicates算子统计企业人员管理系统姓名无重复员工的记录:")

    /**
      * +---+-------+
      * |age|   name|
      * +---+-------+
      * | 16|Michael|
      * | 30|   Andy|
      * | 19| Justin|
      * +---+-------+
      */
    personsDS.dropDuplicates("name").show()

    /**
      * +---+-------+
      * |age|   name|
      * +---+-------+
      * | 46|Michael|
      * | 30|   Andy|
      * | 19| Justin|
      * | 16|Michael|
      * | 29| Justin|
      * +---+-------+
      */
    personsDS.distinct().show()

    println("使用repartition算子设置分区:")

    /**
      * 原分区数:1
      */
    println("原分区数:" + personsDS.rdd.partitions.size)
    val repartitionDs = personsDS.repartition(4)

    /**
      * repartition设置分区数:4
      */
    println("repartition设置分区数:" + repartitionDs.rdd.partitions.size)

    println("使用coalesce算子设置分区:")

    /**
      * repartition(numPartitions:Int):RDD[T]和coalesce(numPartitions:Int,shuffle:Boolean=false):RDD[T]
      *
      * 他们两个都是RDD的分区进行重新划分,repartition只是coalesce接口中shuffle为true的简易实现,(假设RDD有N个分区,需要重新划分成M个分区)
      *
      * 1)、N<M。一般情况下N个分区有数据分布不均匀的状况,利用HashPartitioner函数将数据重新分区为M个,这时需要将shuffle设置为true。
      *
      * 2)如果N>M并且N和M相差不多,(假如N是1000,M是100)那么就可以将N个分区中的若干个分区合并成一个新的分区,最终合并为M个分区,这时可以将shuff设置为false,在shuffl为false的情况下,如果M>N时,coalesce为无效的,不进行shuffle过程,父RDD和子RDD之间是窄依赖关系。
      *
      * 3)如果N>M并且两者相差悬殊,这时如果将shuffle设置为false,父子RDD是窄依赖关系,他们同处在一个Stage中,就可能造成spark程序的并行度不够,从而影响性能,如果在M为1的时候,为了使coalesce之前的操作有更好的并行度,可以讲shuffle设置为true。
      *
      * 总之:如果shuff为false时,如果传入的参数大于现有的分区数目,RDD的分区数不变,也就是说不经过shuffle,是无法将RDDde分区数变多的。
      *
      * coalesce 合并
       */
    val coalesced: Dataset[Person] = repartitionDs.coalesce(2)

    /**
      * coalesce设置分区数:2
      */
    println("coalesce设置分区数:" + coalesced.rdd.partitions.size)

    /**
      * +---+-------+
      * |age|   name|
      * +---+-------+
      * | 30|   Andy|
      * | 19| Justin|
      * | 29| Justin|
      * | 16|Michael|
      * | 46|Michael|
      * +---+-------+
      */
    coalesced.show

    spark.stop()

  }
}

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,744评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,505评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,105评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,242评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,269评论 6 389
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,215评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,096评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,939评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,354评论 1 311
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,573评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,745评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,448评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,048评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,683评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,838评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,776评论 2 369
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,652评论 2 354

推荐阅读更多精彩内容