Spark读写mysql表数据进行统计分析

一、Mysql数据样例

数据表为job_details,数据样例如下:


| job_id | job_name            | job_url                                 | job_location | job_salary | job_company                             | job_experience | job_class | job_given                                    | job_detail                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           | company_type | company_person | search_key | city   |

|      1 | java开发工程师      | https://www.lagou.com/jobs/6713118.html | 南山区       | 9k-12k     | 深圳市长亮科技股份有限公司              | 1-3年          | 本科      | 五险一金 年休假 公司总部技术部               | 岗位职责:1、负责项目开发和信贷系统开发与维护任职要求:1、本科及以上学历,1年以上Java开发经验,编程基础扎实;2. 熟悉java开发语言,熟悉各种主流框架:Spring Boot、Spring Cloud3. 对常用的消息队列,mq、redis等有一定了解; 熟悉常用的设计模式,有一定的分布式的经验;熟悉数据库常用的优化模式4.有金融和电商经验优先,有参与过信贷项目的经验优先。                                                                                                                                                     | 企业服务     | 2000人以上     | java       | 广东   |


需求1:求取每个搜索关键字search_key下的职位数量,并将结果入库mysql,注意:实现高效入库
需求2:求取每个搜索关键字search_key岗位下最高薪资的工作信息

二、Maven依赖

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.7</version>
        </dependency>

三、Spark代码

import java.sql.{Connection, DriverManager}

import org.apache.commons.lang.StringUtils
import org.apache.spark.rdd.{JdbcRDD, RDD}
import org.apache.spark.{SparkConf, SparkContext}

import scala.tools.scalap.scalax.util.StringUtil

//定义数据库记录样例类
case class Job_Detail(job_id:String, job_name:String, job_url:String,job_location:String, job_salary:String,
                      job_company:String,job_experience:String,job_class:String,job_given:String,
                      job_detail:String, company_type:String,company_person:String ,
                      search_key:String, city:String)

/**

 * | job_id | job_name            | job_url                                 | job_location | job_salary | job_company                             | job_experience | job_class | job_given                                    | job_detail                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           | company_type | company_person | search_key | city   |

 * |      1 | java开发工程师      | https://www.lagou.com/jobs/6713118.html | 南山区       | 9k-12k     | 深圳市长亮科技股份有限公司              | 1-3年          | 本科      | 五险一金 年休假 公司总部技术部               | 岗位职责:1、负责项目开发和信贷系统开发与维护任职要求:1、本科及以上学历,1年以上Java开发经验,编程基础扎实;2. 熟悉java开发语言,熟悉各种主流框架:Spring Boot、Spring Cloud3. 对常用的消息队列,mq、redis等有一定了解; 熟悉常用的设计模式,有一定的分布式的经验;熟悉数据库常用的优化模式4.有金融和电商经验优先,有参与过信贷项目的经验优先。                                                                                                                                                     | 企业服务     | 2000人以上     | java       | 广东   |

 */
object JdbcOperate {

  //定义一个函数,无参,返回一个jdbc的连接
  val getConn:() => Connection = () => {
    DriverManager.getConnection("jdbc:mysql://node03:3306/mydb?characterEncoding=UTF-8","root","123456")
  }
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("jdbcOperate").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")


    /**
     * class JdbcRDD[T: ClassTag](
     * sc: SparkContext,
     * getConnection: () => Connection,
     * sql: String,
     * lowerBound: Long,
     * upperBound: Long,
     * numPartitions: Int,
     * mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _)
     */
    //创建RDD,这个RDD会记录以后从Mysql中读取的数据
    val jdbcRDD: JdbcRDD[Job_Detail] = new JdbcRDD(
      sc,
      getConn,
      "select * from jobdetail where job_id >? and job_id <?",
      1,
      75000,
      8,
      rs => {
        val job_id = rs.getString(1)
        val job_name: String = rs.getString(2)
        val job_url = rs.getString(3)
        val job_location: String = rs.getString(4)
        val job_salary = rs.getString(5)
        val job_company: String = rs.getString(6)
        val job_experience = rs.getString(7)
        val job_class: String = rs.getString(8)
        val job_given = rs.getString(9)
        val job_detail: String = rs.getString(10)
        val company_type = rs.getString(11)
        val company_person: String = rs.getString(12)
        val search_key = rs.getString(13)
        val city: String = rs.getString(14)
        Job_Detail(job_id, job_name, job_url,job_location, job_salary, job_company,job_experience,job_class,
          job_given,job_detail, company_type,company_person ,search_key, city)
      }
    )

    //需求一:求取每个搜索关键字`search_key`下的职位数量,并将结果入库mysql,注意:实现高效入库
    //过滤search_key为空的记录
    val jdbcRDD2: RDD[Job_Detail] = jdbcRDD.filter(x => x.search_key != null)
    //获取search_key及对应的记录集合,组装成(String, Iterable[Job_Detail])类型
    val searchKey: RDD[(String, Iterable[Job_Detail])] = jdbcRDD2.groupBy(x => x.search_key)
    //获取search_key对应的集合长度,即岗位数量
    val resultRdd: RDD[(String, Int)] = searchKey.map(x => (x._1, x._2.size))
    //数据量变少,减少分区
    val resultRdd2: RDD[(String, Int)] = resultRdd.coalesce(2)
    //将计算结果写入Mysql
    resultRdd2.foreachPartition(iter => {
      //创建数据库连接
      val conn: Connection = DriverManager.getConnection("jdbc:mysql://node03:3306/mydb?characterEncoding=UTF-8", "root", "123456")
      //关闭自动提交
      conn.setAutoCommit(false)
      val statement = conn.prepareStatement("insert into job_count(search_name,job_num) values (?,?)")
      //遍历
      iter.foreach(record=>{
        //赋值操作
        statement.setString(1,record._1)
        statement.setInt(2,record._2)
        //添加到一个批处中
        statement.addBatch()
      })
      //批量提交该分区所有数据
      statement.executeBatch()
      conn.commit()
      conn.close()
      //关闭资源
      statement.close()
    })
    //需求二,求取每个搜索关键字岗位下最高薪资的工作信息
    val getEachJobs: RDD[(String, Iterable[Job_Detail])] = jdbcRDD.groupBy(x => x.search_key)
    val maxJobDetail: RDD[Job_Detail] = getEachJobs.map(x => {
      val value: Iterable[Job_Detail] = x._2
      val array: Array[Job_Detail] = value.toArray
      array.maxBy(x => {
        val job_salary: String = x.job_salary
        val result = if (StringUtils.isNotEmpty(job_salary) && job_salary.contains("k")
          && job_salary.contains("-") && job_salary.replace("k", "").split("-").length >= 2) {
          val strings: Array[String] = job_salary.replace("k", "").split("-")
          val result2: Int = if (strings.length >= 2) {
            strings(1).toInt
          } else {
            0
          }
          result2
        } else {
          0
        }
        result
      })
    })
//    val details = maxJobDetail.foreach(x=>println(x.job_id+"\t"+x.job_salary+"\t"+x.search_key+"\t"+x.job_company))
    /**
     * 33386    45k-80k php 贝壳找房(北京)科技有限公司
     * 20350    60k-90k 数据分析    青岛硕智博睿管理顾问有限公司
     */
    val details: Array[Job_Detail] = maxJobDetail.collect()
    details.foreach(x =>{
      println(x.job_id + "\t" + x.job_salary + "\t" + x.search_key + "\t" + x.job_company)
    })

    /**
     * 1325 60k-100k    数据仓库    北京世纪好未来教育科技有限公司
     * 65135    120k-200k   人工智能    北京华商天诚投资基金管理有限公司
     */
    sc.stop()
  }

}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容