一、Mysql数据样例
数据表为job_details,数据样例如下:
+--------+---------------------+-----------------------------------------+--------------+------------+-----------------------------------------+----------------+-----------+----------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------+----------------+------------+--------+
| job_id | job_name | job_url | job_location | job_salary | job_company | job_experience | job_class | job_given | job_detail | company_type | company_person | search_key | city |
+--------+---------------------+-----------------------------------------+--------------+------------+-----------------------------------------+----------------+-----------+----------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------+----------------+------------+--------+
| 1 | java开发工程师 | https://www.lagou.com/jobs/6713118.html | 南山区 | 9k-12k | 深圳市长亮科技股份有限公司 | 1-3年 | 本科 | 五险一金 年休假 公司总部技术部 | 岗位职责:1、负责项目开发和信贷系统开发与维护任职要求:1、本科及以上学历,1年以上Java开发经验,编程基础扎实;2. 熟悉java开发语言,熟悉各种主流框架:Spring Boot、Spring Cloud3. 对常用的消息队列,mq、redis等有一定了解; 熟悉常用的设计模式,有一定的分布式的经验;熟悉数据库常用的优化模式4.有金融和电商经验优先,有参与过信贷项目的经验优先。 | 企业服务 | 2000人以上 | java | 广东 |
+--------+---------------------+-----------------------------------------+--------------+------------+-----------------------------------------+----------------+-----------+----------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------+----------------+------------+--------+
需求1:求取每个搜索关键字search_key
下的职位数量,并将结果入库mysql,注意:实现高效入库
需求2:求取每个搜索关键字search_key
岗位下最高薪资的工作信息
二、Maven依赖
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.7</version>
</dependency>
三、Spark代码
import java.sql.{Connection, DriverManager}
import org.apache.commons.lang.StringUtils
import org.apache.spark.rdd.{JdbcRDD, RDD}
import org.apache.spark.{SparkConf, SparkContext}
import scala.tools.scalap.scalax.util.StringUtil
//定义数据库记录样例类
case class Job_Detail(job_id:String, job_name:String, job_url:String,job_location:String, job_salary:String,
job_company:String,job_experience:String,job_class:String,job_given:String,
job_detail:String, company_type:String,company_person:String ,
search_key:String, city:String)
/**
* +--------+---------------------+-----------------------------------------+--------------+------------+-----------------------------------------+----------------+-----------+----------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------+----------------+------------+--------+
* | job_id | job_name | job_url | job_location | job_salary | job_company | job_experience | job_class | job_given | job_detail | company_type | company_person | search_key | city |
* +--------+---------------------+-----------------------------------------+--------------+------------+-----------------------------------------+----------------+-----------+----------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------+----------------+------------+--------+
* | 1 | java开发工程师 | https://www.lagou.com/jobs/6713118.html | 南山区 | 9k-12k | 深圳市长亮科技股份有限公司 | 1-3年 | 本科 | 五险一金 年休假 公司总部技术部 | 岗位职责:1、负责项目开发和信贷系统开发与维护任职要求:1、本科及以上学历,1年以上Java开发经验,编程基础扎实;2. 熟悉java开发语言,熟悉各种主流框架:Spring Boot、Spring Cloud3. 对常用的消息队列,mq、redis等有一定了解; 熟悉常用的设计模式,有一定的分布式的经验;熟悉数据库常用的优化模式4.有金融和电商经验优先,有参与过信贷项目的经验优先。 | 企业服务 | 2000人以上 | java | 广东 |
* +--------+---------------------+-----------------------------------------+--------------+------------+-----------------------------------------+----------------+-----------+----------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------+----------------+------------+--------+
*/
object JdbcOperate {
//定义一个函数,无参,返回一个jdbc的连接
val getConn:() => Connection = () => {
DriverManager.getConnection("jdbc:mysql://node03:3306/mydb?characterEncoding=UTF-8","root","123456")
}
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("jdbcOperate").setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
/**
* class JdbcRDD[T: ClassTag](
* sc: SparkContext,
* getConnection: () => Connection,
* sql: String,
* lowerBound: Long,
* upperBound: Long,
* numPartitions: Int,
* mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _)
*/
//创建RDD,这个RDD会记录以后从Mysql中读取的数据
val jdbcRDD: JdbcRDD[Job_Detail] = new JdbcRDD(
sc,
getConn,
"select * from jobdetail where job_id >? and job_id <?",
1,
75000,
8,
rs => {
val job_id = rs.getString(1)
val job_name: String = rs.getString(2)
val job_url = rs.getString(3)
val job_location: String = rs.getString(4)
val job_salary = rs.getString(5)
val job_company: String = rs.getString(6)
val job_experience = rs.getString(7)
val job_class: String = rs.getString(8)
val job_given = rs.getString(9)
val job_detail: String = rs.getString(10)
val company_type = rs.getString(11)
val company_person: String = rs.getString(12)
val search_key = rs.getString(13)
val city: String = rs.getString(14)
Job_Detail(job_id, job_name, job_url,job_location, job_salary, job_company,job_experience,job_class,
job_given,job_detail, company_type,company_person ,search_key, city)
}
)
//需求一:求取每个搜索关键字`search_key`下的职位数量,并将结果入库mysql,注意:实现高效入库
//过滤search_key为空的记录
val jdbcRDD2: RDD[Job_Detail] = jdbcRDD.filter(x => x.search_key != null)
//获取search_key及对应的记录集合,组装成(String, Iterable[Job_Detail])类型
val searchKey: RDD[(String, Iterable[Job_Detail])] = jdbcRDD2.groupBy(x => x.search_key)
//获取search_key对应的集合长度,即岗位数量
val resultRdd: RDD[(String, Int)] = searchKey.map(x => (x._1, x._2.size))
//数据量变少,减少分区
val resultRdd2: RDD[(String, Int)] = resultRdd.coalesce(2)
//将计算结果写入Mysql
resultRdd2.foreachPartition(iter => {
//创建数据库连接
val conn: Connection = DriverManager.getConnection("jdbc:mysql://node03:3306/mydb?characterEncoding=UTF-8", "root", "123456")
//关闭自动提交
conn.setAutoCommit(false)
val statement = conn.prepareStatement("insert into job_count(search_name,job_num) values (?,?)")
//遍历
iter.foreach(record=>{
//赋值操作
statement.setString(1,record._1)
statement.setInt(2,record._2)
//添加到一个批处中
statement.addBatch()
})
//批量提交该分区所有数据
statement.executeBatch()
conn.commit()
conn.close()
//关闭资源
statement.close()
})
//需求二,求取每个搜索关键字岗位下最高薪资的工作信息
val getEachJobs: RDD[(String, Iterable[Job_Detail])] = jdbcRDD.groupBy(x => x.search_key)
val maxJobDetail: RDD[Job_Detail] = getEachJobs.map(x => {
val value: Iterable[Job_Detail] = x._2
val array: Array[Job_Detail] = value.toArray
array.maxBy(x => {
val job_salary: String = x.job_salary
val result = if (StringUtils.isNotEmpty(job_salary) && job_salary.contains("k")
&& job_salary.contains("-") && job_salary.replace("k", "").split("-").length >= 2) {
val strings: Array[String] = job_salary.replace("k", "").split("-")
val result2: Int = if (strings.length >= 2) {
strings(1).toInt
} else {
0
}
result2
} else {
0
}
result
})
})
// val details = maxJobDetail.foreach(x=>println(x.job_id+"\t"+x.job_salary+"\t"+x.search_key+"\t"+x.job_company))
/**
* 33386 45k-80k php 贝壳找房(北京)科技有限公司
* 20350 60k-90k 数据分析 青岛硕智博睿管理顾问有限公司
*/
val details: Array[Job_Detail] = maxJobDetail.collect()
details.foreach(x =>{
println(x.job_id + "\t" + x.job_salary + "\t" + x.search_key + "\t" + x.job_company)
})
/**
* 1325 60k-100k 数据仓库 北京世纪好未来教育科技有限公司
* 65135 120k-200k 人工智能 北京华商天诚投资基金管理有限公司
*/
sc.stop()
}
}