ComprehensivePractice


import java.util.Properties

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

import scala.collection.mutable

object Test01 {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = SparkSession.builder()
      .master("local")
      .appName(this.getClass.getSimpleName)
      .getOrCreate()
    //    dataMask(session, "D:\\testLogs\\userinfo.txt", "D:\\testLogs\\userinfo")
    //    datainfoToJson(session, "D:\\testLogs\\userinfo", "D:\\testLogs\\idRules.txt", "D:\\testLogs\\userinfoToJson")
    datainfoToMysql(session,"D:\\testLogs\\userinfoToJson","countprovince")
    session.close()
    session.stop()
  }

  // 1.使用spark读取文本文件,将身份证号脱敏,格式为: 620402********2111,姓名脱敏,为: 黄**,并将脱敏后的身份证和姓名保存为userInfo.parquet文件(只需要脱敏后的身份证号和姓名)
  def dataMask(session: SparkSession, input: String, output: String): Unit = {
    val sc: SparkContext = session.sparkContext
    //  350211199006033016|1|7|黄测佳|10|2|||1|||||2018-10-26|2018-10-26|||
// 1)、读取数据
    val rdd: RDD[String] = sc.textFile(input)
// 2)、引入隐式
    import session.implicits._
    val rdd1: RDD[(String, String)] = rdd.map(t => {
      // 3)、切分数据
      val str: mutable.ArrayOps[String] = t.split("\\|")
      // 4)、过滤脏数据(只需要格式正确的数据)
      if (str.size == 15 && !str(0).isEmpty && !str(3).isEmpty && (str(0).size == 15 || str(0).size == 18)) {
        // 5)、身份证号脱敏(中间八位替换为“*”)
        val id: String = str(0).replace(str(0).substring(6, 14), "********")
        var star = ""
        // 6)、姓名脱敏(保留姓氏,名替换为“*”)
        for (i <- 1 to str(3).trim.size - 1) star += "*"
        val name: String = str(3).trim.substring(0, 1) + star
        // 7)、数据有效输出,无效用(无效信息)替代
        (id, name)
      } else
        ("无效信息", "无效信息")
    })
    // 8)、写出为parquet格式的数据
    rdd1.toDF("idnumber", "uname").write.parquet(output)
  }

  //2.读取userInfo.parquet文件,统计每个人的省份,并将身份证号idnumber,姓名uname和省份信息province保存成userInfo.json格式文件
  def datainfoToJson(session: SparkSession, inputParquet: String, inputIdRules: String, output: String): Unit = {
    //    1)、读取parquet数据:(350211********3016,黄**)
    val userinfo: RDD[(String, String)] = session.read.parquet(inputParquet).rdd.map(t => (t.getString(0), t.getString(1)))
    //  2)、 读取文本数据: (福建350000)
    val idRules: Array[(String, String)] = session.read.textFile(inputIdRules).rdd.map(t => {
      // 3)、过滤如果不是数字为省份,是数字为编号
      (t.filterNot(_.isDigit), t.filter(_.isDigit))
     // 4)、RDD中不能操作RDD,通过collect操作为Array(小型文件)
    }).collect()
    // 5)、引入隐式
    import session.implicits._
    val df: DataFrame = userinfo.map(t => {
      // 6)、根据需求身份证号前三位和编号对比,得到省份信息(相当于二嵌套循环过滤数据)
      val tuples: Array[(String, String)] = idRules.filter(f => f._2.substring(0, 2) == t._1.substring(0, 2))
      // 7)、如果数据无效使用(未知)替代,否则输出数据
      if (tuples.isEmpty) {
        (t._1, t._2, "未知")
      } else {
        (t._1, t._2, tuples(0)._1)
      }
    }).toDF("idnumber", "uname", "province")
    // 8)、写出为json格式数据
    df.write.json(output)
  }

 //    3.统计各省份人次,(同一身份证多次出现则多次计算),按人次降序保存到sql的count_province表中, 使用配置文件载入数据库配置
  def datainfoToMysql(session:SparkSession,inputJson:String,tableName:String): Unit ={
    // 1)、读取json数据
    val frame: DataFrame = session.read.json(inputJson)
    // 2)、创建临时表
    frame.createTempView("count_province")
    // 3)、打印schema信息
    frame.printSchema()
    // 4)、获取mysql连接相关参数
    val param: ConnectionProperties = getPropertiesParams()
    // 5)、通过sql语句查询:省份、人次,并排序
    val df: DataFrame = session.sql("select count(uname) as count,province from count_province group by province order by count desc")
     // 6)、通过jdbc写出
    df.write.mode("append").jdbc(param.url, tableName, param.pro)
    println("Saving data to a JDBC source win!")
  }
  // 获取配置文件中的相关参数信息
  def getPropertiesParams() = {
    val config: Config = ConfigFactory.load("jdbcMysql.properties")
    val url: String = config.getString("MYSQL_URL")
    val user: String = config.getString("MYSQL_USER")
    val password: String = config.getString("MYSQL_PASSWORD")
    val table: String = config.getString("MYSQL_DBTABLE")
    val driver: String = config.getString("MYSQL_DRIVER")
    val pro: Properties = new Properties()
    pro.setProperty("user", user)
    pro.setProperty("password", password)
    pro.setProperty("driver", driver)
    ConnectionProperties(url, user, password, table, driver, pro)
  }
}


case class ConnectionProperties(url: String, user: String, password: String, table: String, driver: String, pro: Properties)

case class coutProvince(province: String, count: Long)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 下文写于两年前,一直未公开,现在也换公司了,公开发布。 (一)需求本质 用户需求本质来源于人的需求本质,根据马斯洛...
    kevin282阅读 15,703评论 0 28
  • 图为女儿为学校母女讲座活动的宣传画。 叶乃天然赐, 壶为匠巧成 杯馨朋友胃, 盏沁母媛情。
    舒己怀_Frank阅读 365评论 21 16
  • 最近因为网络问题,下棋不是很顺,关键时刻会断网! 不过也是棋力问题,现在有点上不去了,最高到业3-1。 最近输得多...
    木林森_阿木阅读 67评论 0 0