Writing to Hive, HBase, and MySQL with Spark Structured Streaming

Component Versions

  • Spark 2.3.1 (HDP)
  • Hadoop 3.1.1 (HDP)
  • Hive 3.1.2 (HDP)
  • HBase 2.0.0
  • MySQL 5.x

Spark has no native support for reading data from Kafka with Structured Streaming and writing it to Hive, HBase, or MySQL. This post collects approaches that have been tested in practice.

  • pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.insight.spark</groupId>
    <artifactId>SparkDemo</artifactId>
    <version>1.1</version>

    <properties>
        <encoding>UTF-8</encoding>
        <spark.version>2.3.1</spark.version>
    </properties>

    <dependencies>

        <!-- https://mvnrepository.com/artifact/com.sun.jersey/jersey-core -->
        <dependency>
            <groupId>com.sun.jersey</groupId>
            <artifactId>jersey-client</artifactId>
            <version>1.19</version>
        </dependency>

        <!-- Spark core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>jersey-client</artifactId>
                    <groupId>org.glassfish.jersey.core</groupId>
                </exclusion>
            </exclusions>
            <!-- <scope>provided</scope>-->
        </dependency>
        <!-- Spark SQL: provides the DataFrame/Dataset API -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.0.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- HBase dependencies -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-mapreduce</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.htrace</groupId>
            <artifactId>htrace-core</artifactId>
            <version>3.1.0-incubating</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/joda-time/joda-time -->
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>2.9.9</version>
        </dependency>
        
        <!-- Spark/Hive integration -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.27</version>
        </dependency>



    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <plugins>

            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>


            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.10</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <!-- If you have classpath issues like NoClassDefFoundError, ... -->
                    <!-- useManifestOnlyJar>false</useManifestOnlyJar -->
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                </transformer>
                            </transformers>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

        </plugins>
    </build>
</project>

Main Code and Usage

Usage: StructuredKafkaWordCount <bootstrap-servers> <topics> <number 0/1/2/3> [<checkpoint-location> eg:/tmp/temp-spark-1] ...
The program takes several arguments: the first is the Kafka broker address, the second is the topic to consume, the third is the output type (one of four, indicated by 0, 1, 2, or 3), and the fourth is the checkpoint path; any further arguments are passed through for the MySQL connection. The program consumes messages from Kafka, runs a word count on them, and writes out the result.
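For reference, a full submit command for the MySQL case (output type 3) might look like the sketch below; the jar name, broker address, topic, checkpoint path, and JDBC settings are all placeholders to adapt to your environment:

spark-submit \
  --class com.insight.spark.streaming.StructuredStreamingTest \
  SparkDemo-1.1.jar \
  broker1:9092 test_topic 3 /tmp/temp-spark-1 \
  "jdbc:mysql://mysql-host:3306/spark" db_user db_password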

package com.insight.spark.streaming

import com.insight.spark.util.ConfigLoader
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{Connection, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}

import java.sql
import java.sql.{DriverManager, PreparedStatement}
import java.util.UUID

object StructuredStreamingTest {
  System.setProperty("HADOOP_USER_NAME","hdfs")
  val conf: Configuration = HBaseConfiguration.create()

  def main(args: Array[String]): Unit = {
    SetLogLevel.setStreamingLogLevels()
    if (args.length < 3) {
      System.err.println("Usage: StructuredKafkaWordCount <bootstrap-servers> " +
        " <topics> <number 0/1/2/3> [<checkpoint-location> eg:/tmp/temp-spark-1]")
      System.exit(1)
    }

    val Array(bootstrapServers, topics, number, _*) = args
    val checkpointLocation =
      if (args.length > 3) args(3) else "/tmp/temp-spark-" + UUID.randomUUID.toString

    val spark = SparkSession
      .builder
      .appName("StructuredKafkaWordCount")
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()

    import spark.implicits._

    // Create DataSet representing the stream of input lines from kafka
    val lines = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topics)
      .option("startingOffsets", "latest")
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]

    // Generate running word count
    val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()

    /**
      * Start running the query according to the user parameter: 0 1 2 3
      * 1: write the results to Hive
      * 2: write the results to HBase
      * 3: write the results to MySQL
      * 0/other: print the results to the console
      */
    val dsw = number match {
      // write to Hive
      case "1" =>
        wordCounts.writeStream
          .outputMode("complete")
          .trigger(Trigger.ProcessingTime("10 seconds")) // trigger (batch) interval
          .format("com.insight.spark.streaming.HiveSinkProvider") // custom HiveSinkProvider
          .option("checkpointLocation", checkpointLocation)
          .queryName("write hive")

      case "2" =>
        wordCounts.writeStream
          .outputMode("update")
          .foreach(new ForeachWriter[Row] {
            var connection: Connection = _

            def open(partitionId: Long, version: Long): Boolean = {
              conf.set("hbase.zookeeper.quorum", ConfigLoader.getString("hbase.zookeeper.list"))
              conf.set("hbase.zookeeper.property.clientPort", ConfigLoader.getString("hbase.zookeeper.port"))
              conf.set("zookeeper.znode.parent", ConfigLoader.getString("zookeeper.znode.parent"))
              import org.apache.hadoop.hbase.client.ConnectionFactory
              connection = ConnectionFactory.createConnection(conf)
              true
            }

            def process(record: Row): Unit = {
              val tableName = TableName.valueOf(ConfigLoader.getString("hbase.table.name")) // table name
              val table = connection.getTable(tableName)
              val put = new Put(Bytes.toBytes(record.mkString))
              put.addColumn("info".getBytes(), Bytes.toBytes("word"), Bytes.toBytes(record(0).toString))
              put.addColumn("info".getBytes(), Bytes.toBytes("count"), Bytes.toBytes(record(1).toString))
              table.put(put)
            }

            def close(errorOrNull: Throwable): Unit = {
              connection.close()
            }
          })
          .option("checkpointLocation", checkpointLocation)
          .trigger(Trigger.ProcessingTime("10 seconds"))
          .queryName("write hbase")

      case "3" =>

        /** DDL for the MySQL target table (create the spark database first):
          * CREATE TABLE `words` (
          * `id` int(11) NOT NULL AUTO_INCREMENT,
          * `word` varchar(255) NOT NULL,
          * `count` int(11) DEFAULT 0,
          * PRIMARY KEY (`id`),
          * UNIQUE KEY `word` (`word`)
          * ) ENGINE=InnoDB AUTO_INCREMENT=26 DEFAULT CHARSET=utf8;
          */
        val (url, user, pwd) = (args(4), args(5), args(6))
        wordCounts.writeStream
          .outputMode("complete")
          .foreach(new ForeachWriter[Row] {
            var conn: sql.Connection = _
            var p: PreparedStatement = _
            def open(partitionId: Long, version: Long): Boolean = {
              Class.forName("com.mysql.jdbc.Driver")
              conn = DriverManager.getConnection(url, user, pwd)
              p = conn.prepareStatement("replace into spark.words(word,count) values(?,?)")
              true
            }

            def process(record: Row): Unit = {
              p.setString(1, record(0).toString)
              p.setInt(2, record(1).toString.toInt)
              p.execute()
            }

            def close(errorOrNull: Throwable): Unit = {
              p.close()
              conn.close()
            }
          })
          .option("checkpointLocation", checkpointLocation)
          .trigger(Trigger.ProcessingTime("10 seconds"))
          .queryName("write mysql")

      case _ =>
        wordCounts.writeStream
          .outputMode("update")
          .format("console")
          .trigger(Trigger.ProcessingTime("10 seconds"))
          .option("checkpointLocation", checkpointLocation)
          .queryName("print it")

    }

    dsw.start().awaitTermination()

  }
}
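
The HBase branch above reads its connection settings through a ConfigLoader helper that the post does not show. A minimal sketch, assuming the values live in a config.properties file on the classpath (with keys hbase.zookeeper.list, hbase.zookeeper.port, zookeeper.znode.parent, and hbase.table.name), could look like this:

package com.insight.spark.util

import java.util.Properties

// Minimal stand-in for the ConfigLoader referenced above (an assumption: the
// original helper is not shown and may differ). It loads key/value pairs from
// a config.properties file on the classpath.
object ConfigLoader {
  private val props = new Properties()
  private val in = getClass.getClassLoader.getResourceAsStream("config.properties")
  if (in != null) {
    try props.load(in) finally in.close()
  }

  def getString(key: String): String = props.getProperty(key)
}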

HiveSinkProvider Source Code

The HiveSinkProvider used above is implemented as follows:

package com.insight.spark.streaming

import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
import org.slf4j.LoggerFactory


case class HiveSink(sqlContext: SQLContext,
                    parameters: Map[String, String],
                    partitionColumns: Seq[String],
                    outputMode: OutputMode) extends Sink {

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    val logger = LoggerFactory.getLogger(this.getClass)

    val schema = StructType(Array(
      StructField("word", StringType),
      StructField("count", IntegerType)
    ))
    val res = data.queryExecution.toRdd.mapPartitions { rows =>
      val converter = CatalystTypeConverters.createToScalaConverter(schema)
      rows.map(converter(_).asInstanceOf[Row])
    }
    // convert back to a regular DataFrame
    val df = data.sparkSession.createDataFrame(res, schema)
    df.write.mode(SaveMode.Append).format("hive").saveAsTable("words")

  }
}

class HiveSinkProvider extends StreamSinkProvider with DataSourceRegister {
  override def createSink(sqlContext: SQLContext, parameters: Map[String, String], partitionColumns: Seq[String], outputMode: OutputMode): Sink = {
    HiveSink(sqlContext, parameters, partitionColumns, outputMode)
  }

  override def shortName(): String = "HiveSinkProvider"
}
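
Because the main program hands .format() the fully-qualified class name, no extra registration is required. If you would rather use the short name returned by shortName(), Spark looks up DataSourceRegister implementations through Java's ServiceLoader, so (as a sketch) the jar would need a service file:

src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
  com.insight.spark.streaming.HiveSinkProvider

after which .format("HiveSinkProvider") should resolve to the same sink.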

Package the project and run it with spark-submit --xxx this.jar ... and that's it.

Point: structured streaming, Spark Structured Streaming, Hive, HBase, MySQL
Line: Spark
Plane: in-memory computing
