spark读写mysql、hive、kafka数据demo

读取hive库数据

pom.xml依赖配置

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.1.1</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_2.11</artifactId>
  <version>2.1.1</version>
</dependency>

读取hive数据demo

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object Main {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
              .setMaster("spark://master:7077")//申明spark运行模式
              .setAppName("risk")//设置job名称(可不写)
    val spark = SparkSession.builder()//spark-2.0采用SparkSession代替sparkContext
                .config(conf)
                .enableHiveSupport()//添加对HIVE的支持,否则无法访问hive库
                .getOrCreate()
    import spark.implicits._
    spark.sql("use bmkp")
    val df= spark.sql("select * from customer")//在hive中执行sql语句,返回DataSet格式数据
    df.show()
    spark.stop()
  }
}

读取mysql数据

pom.xml配置文件

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.1.1</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_2.11</artifactId>
  <version>2.1.1</version>
</dependency>

读取mysql数据demo

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object Main {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
                  .setMaster("spark://master:7077")//申明spark运行模式
                  .setAppName("risk")//设置job名称(可不写)
    val spark = SparkSession.builder()//spark-2.0采用SparkSession代替sparkContext
                 .config(conf)
                 .enableHiveSupport()//添加对HIVE的支持,否则无法访问hive库
                 .getOrCreate()
//读取mysql中数据,返回数据类型为DataSet
val df = spark.read
        .format("jdbc")
        .options(Map("url" ->   
//配置mysql连接参数,包括mysql ip 端口  数据库名称 登录名和密码
"jdbc:mysql://***.***.***.***:3036/bmkpstress?user=root&password=**********",
//定义驱动程序
         "driver"->"com.mysql.jdbc.Driver",
//编写sql  在mysql中执行该sql并返回数据
         "dbtable" -> "(select * from test group by id) as aaa"))
          .load()
    spark.stop()
  }
}

SPARKSTREAMING读取kafka数据

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>
  <version>2.1.1</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
  <version>2.1.1</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.1.1</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_2.11</artifactId>
  <version>2.1.1</version>
</dependency>

读取kafka数据demo

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object Main {
  def main(args:Array[String]):Unit={
    val conf = new SparkConf().setMaster("spark://master:7077")
      .setAppName("kafka_hive");
    val spark = SparkSession.builder().master("spark://master:7077").config(conf).enableHiveSupport().getOrCreate()
    var ssc = new StreamingContext(conf, Seconds(10));
    var topics = Array("service_cksc","service_ckxc","service_dcyy");//kafka  topic名称
    var group = "bmkp" //定义groupID
    val kafkaParam = Map(   //申明kafka相关配置参数
      "bootstrap.servers" -> "***.104.42.127:19092,***.104.202.222:19092,***.135.73.152:19092", //kafka 集群IP及端口
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> group, //定义groupID
      "auto.offset.reset" -> "earliest",//设置丢数据模式  有 earliest,latest, none
      "enable.auto.commit" -> (false: java.lang.Boolean)//设置是否自动存储offset 这里设置为否
    );
    val offsetRanges = Array()
    var stream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParam))//从kafka读取数据 获取数据流
    stream.foreachRDD { rdd =>
      import spark.implicits._
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges //获取offset
      /*
      这里处理从kafka获取的数据,在确定获取的数据已经存储或者处理后将该RDD的offset存储
       */
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) //存储offset
    }
  }
}

SPARK写数据到HIVE

pom.xml配置信息

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.1.1</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_2.11</artifactId>
  <version>2.1.1</version>
</dependency

写数据到hive库demo

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object Main {
  case class Person(name:String,col1:Int,col2:String)
  def main(args:Array[String]):Unit={
    val conf = new SparkConf()
      .setMaster("spark://master:7077")//申明spark运行模式
      .setAppName("kettle")//设置job名称(可不写)
    val spark = SparkSession.builder()//spark-2.0采用SparkSession代替sparkContext
      .config(conf)
      .enableHiveSupport()//添加对HIVE的支持,否则无法访问hive库
      .getOrCreate()
    import spark.implicits._ //引入隐式转换 否则RDD无法转换成DataSet(DataFrame)
    spark.sql("use DataBaseName"//在hive中执行sql语句
    val data = spark.read.textFile("path")//读取hdfs中的文件,返回的是RDD格式数据,RDD格式数据不能直接写入hive,(这里代表任意的RDD类型数据)
      .map(x=>x.split(","))
      .map(x=>Person(x(0),x(1).toInt,x(2)))//利用用例类将RDD格式居转换成DataSetG格式数据,从而可以写入hive中
    data.toDF().createOrReplaceTempView("table1") //将DataSet格式数据映射到临时表中
/*
***********************************************************************************************************************
特别注意  这里会造成hive中出现大量小文件,需要对小文件进行合并
************************************************************************************************************************
*/
    spark.sql("insert into table2 partition(date='2015-04-02') select name,col1,col2 from table1")//在hive上运行sql语句将临时表中数据抽出并存入hive中
    spark.close()
  }
}

写数据到mysql

pom.xml配置

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.1.1</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.1.1</version>
</dependency>

spark 写数据到mysql库demo 1

  import java.sql.{Connection, DriverManager, PreparedStatement}
  import org.apache.spark.SparkConf
  import org.apache.spark.sql.SparkSession
object Main {
  case class Blog(name: String, count: Int)
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setMaster("spark://master:7077")//申明spark运行模式
      .setAppName("kettle")//设置job名称(可不写)
    val spark = SparkSession.builder()//spark-2.0采用SparkSession代替sparkContext
      .config(conf)
      .enableHiveSupport()//添加对HIVE的支持,否则无法访问hive库
      .getOrCreate()
    //获取RDD数据  这里只是做一个实例 代表spark处理产生的所有RDD类型的数据
    val data = spark.sparkContext.parallelize(List(("www", 10), ("iteblog", 20), ("com", 30)))
    var conn: Connection = null//定义mysql连接
    var ps: PreparedStatement = null
    val sql = "insert into blog(name, count) values (?, ?)"//需要执行的sql语句,两个 “?”代表后面需要替换的数据
    data.foreachPartition(rdd=>
      try {
        //具体定义mysql的驱动管理器,主要设置mysql地址   端口  数据库  用户名  密码
        conn = DriverManager.getConnection("jdbc:mysql://***.***.***.***:3306/test","root", "******")
        rdd.toIterator.foreach(data => {
          ps = conn.prepareStatement(sql)
          ps.setString(1, data._1)//将需要写入mysql的数据进行映射
          ps.setInt(2, data._2)
          ps.executeUpdate()//在mysql上执行sql语句将数据插入到相应的表中
        })
      } catch {
        case e: Exception => println("Mysql Exception")
      } finally {
        if (ps != null) {
          ps.close()
        }
        if (conn != null) {
          conn.close()//关闭mysql连接
        }
      })
  }
}

写数据到mysql库demo 2

  import org.apache.spark.SparkConf
  import org.apache.spark.sql.{SaveMode, SparkSession}

object Main {
  case class Blog(name: String, count: Int)
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setMaster("spark://master:7077") //申明spark运行模式
      .setAppName("kettle")
    //设置job名称(可不写)
    val spark = SparkSession.builder() //spark-2.0采用SparkSession代替sparkContext
      .config(conf)
      .enableHiveSupport() //添加对HIVE的支持,否则无法访问hive库
      .getOrCreate()
    //获取RDD数据  这里只是做一个实例 代表spark处理产生的所有RDD类型的数据
    val data = spark.sparkContext.parallelize(List(("www", 10), ("iteblog", 20), ("com", 30)))
    import spark.implicits._
    val df = data.map(x=>new Blog(x._1,x._2)).toDF()//将RDD类型数据转换成DataSet类型
    df.write.mode(SaveMode.Append).format("jdbc")
      .option("url", "jdbc:mysql://***.***.***.***:3306/test")//定义mysql 地址 端口 数据库
      .option("dbtable", "blog")//定义需要插入的mysql目标表
      .option("user", "****")//定义登录用户名
      .option("password", "************")//定义登录密码
      .save()//保存数据
  }
}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,128评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,316评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,737评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,283评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,384评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,458评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,467评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,251评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,688评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,980评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,155评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,818评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,492评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,142评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,382评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,020评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,044评论 2 352

推荐阅读更多精彩内容