Today I'd like to introduce how to write data to Hive with the new Spark SQL 2.x API.
One of Spark SQL's capabilities is executing SQL queries, and Spark SQL can also be used to read data from an existing Hive installation.
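As a minimal illustration, here is a sketch of querying an existing Hive table with the 2.x API (the table name some_table is just a placeholder, and a reachable Hive metastore is assumed):
import org.apache.spark.sql.SparkSession
object HiveReadSketch {
  def main(args: Array[String]): Unit = {
    // enableHiveSupport() connects the session to the Hive metastore
    // configured through hive-site.xml / spark.sql.warehouse.dir
    val spark = SparkSession.builder()
      .appName("Hive read sketch")
      .enableHiveSupport()
      .getOrCreate()
    // any table the metastore knows about can be queried through spark.sql
    spark.sql("select * from some_table limit 10").show()
    spark.stop()
  }
}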
Without further ado, straight to the code:
package spark
import java.io.File
import java.util
import kafka.{PropertiesScalaUtils, RedisKeysListUtils}
import kafka.SparkStreamingKafka.{dbIndex, kafkaStreams}
import net.sf.json.JSONObject
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
import redis.RedisPool
/**
 * Write data to Hive with Spark SQL 2.0;
 */
object SparkSqlDemo {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.INFO)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.INFO)
    Logger.getLogger("org.apache.kafka.clients.consumer").setLevel(Level.INFO)
    // use the HDFS path directly: wrapping it in java.io.File.getAbsolutePath would
    // turn the hdfs:// URI into a local filesystem path
    val warehouseLocation = "hdfs://cluster/hive/warehouse"
    val spark = SparkSession.builder()
      .appName("Spark SQL Jason")
      .config("spark.sql.warehouse.dir", warehouseLocation)
      .enableHiveSupport()
      .getOrCreate()
    spark.conf.set("spark.streaming.kafka.maxRatePerPartition", "2000")
    spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    spark.conf.set("spark.streaming.concurrentJobs", "10")
    spark.conf.set("spark.streaming.kafka.maxRetries", "50")
    @transient
    val sc = spark.sparkContext
    val scc = new StreamingContext(sc, Seconds(2))
val topic = "jason_20180511"
val topicSet: Set[String] = Set(topic) //设置kafka的topic;
val kafkaParams = Map[String, Object](
"auto.offset.reset" -> "latest",
"value.deserializer" -> classOf[StringDeserializer]
, "key.deserializer" -> classOf[StringDeserializer]
, "bootstrap.servers" -> PropertiesScalaUtils.loadProperties("broker")
, "group.id" -> PropertiesScalaUtils.loadProperties("groupId")
, "enable.auto.commit" -> (false: java.lang.Boolean)
)
    val maxTotal = 200
    val maxIdle = 100
    val minIdle = 10
    val testOnBorrow = false
    val testOnReturn = false
    val maxWaitMillis = 500
    RedisPool.makePool(PropertiesScalaUtils.loadProperties("redisHost"), PropertiesScalaUtils.loadProperties("redisPort").toInt,
      maxTotal, maxIdle, minIdle, testOnBorrow, testOnReturn, maxWaitMillis)
    val jedis = RedisPool.getPool.getResource
    jedis.select(dbIndex)
    val keys: util.Set[String] = jedis.keys(topic + "*")
    if (keys.size() == 0) {
      // no offsets saved in Redis yet: start from the latest offsets
      kafkaStreams = KafkaUtils.createDirectStream[String, String](
        scc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams))
    } else {
      // resume from the offsets previously stored in Redis
      val fromOffsets: Map[TopicPartition, Long] = RedisKeysListUtils.getKeysList(PropertiesScalaUtils.loadProperties("redisHost"),
        PropertiesScalaUtils.loadProperties("redisPort").toInt, topic)
      kafkaStreams = KafkaUtils.createDirectStream[String, String](
        scc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams, fromOffsets))
    }
    RedisPool.getPool.returnResource(jedis)
    kafkaStreams.foreachRDD(rdd => {
      val jedis_jason = RedisPool.getPool.getResource
      jedis_jason.select(dbIndex)
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      import spark.sql
      if (!rdd.isEmpty()) {
        val rowRDD: RDD[Row] = rdd.map(x => {
          val json = JSONObject.fromObject(x.value().toString)
          // getString keeps the values consistent with the StringType schema below
          val a = json.getString("name")
          val b = json.getString("addr")
          Row(a, b)
        })
        val schemaString = "name addr"
        val field = schemaString.split(" ").map(x => StructField(x, StringType, nullable = true))
        val schema = StructType(field)
        val df = spark.createDataFrame(rowRDD, schema)
        df.show()
        df.createOrReplaceTempView("tempTable")
        val sq = "insert into test_2 select * from tempTable"
        sql(sq)
        println("Inserted into Hive successfully")
      }
      offsetRanges.foreach { offsetRange =>
        println("partition : " + offsetRange.partition + " fromOffset: " + offsetRange.fromOffset + " untilOffset: " + offsetRange.untilOffset)
        val topic_partition_key_new = offsetRange.topic + "_" + offsetRange.partition
        jedis_jason.set(topic_partition_key_new, offsetRange.untilOffset + "")
      }
      // return the connection to the pool so each batch does not leak one
      RedisPool.getPool.returnResource(jedis_jason)
    })
    scc.start()
    scc.awaitTermination()
  }
}
One thing to note is the directory configured by spark.sql.warehouse.dir: it defaults to the spark-warehouse directory under the current directory of the Spark application. Also note that since 2.0.0, the hive.metastore.warehouse.dir property in hive-site.xml has been deprecated; use spark.sql.warehouse.dir instead to specify the default location of databases in the warehouse.
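For example, here is a small sketch that sets the warehouse directory on the session and reads back the value it actually uses (the HDFS path is the one from the code above):
import org.apache.spark.sql.SparkSession
object WarehouseDirCheck {
  def main(args: Array[String]): Unit = {
    // since 2.0.0, spark.sql.warehouse.dir replaces hive.metastore.warehouse.dir
    val spark = SparkSession.builder()
      .appName("warehouse dir check")
      .config("spark.sql.warehouse.dir", "hdfs://cluster/hive/warehouse")
      .enableHiveSupport()
      .getOrCreate()
    // print the warehouse directory the session actually picked up
    println(spark.conf.get("spark.sql.warehouse.dir"))
    spark.stop()
  }
}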
Another issue is that writing this way produces a lot of small files under the table's directory on HDFS. When you later run statistics or other computations on that table in Hive, those small files turn into a large number of map tasks and seriously slow the job down, so it is worth thinking about up front.
To avoid piling up small files under the table directory, we can create the Hive table as a partitioned table (how to create a partitioned table is covered in another post on my blog), or merge the small files of a partition directly with something like: insert overwrite table combine_data partition (day_time='2018-08-01') select data,enter_time from combine_data where day_time = '2018-08-01';
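If you go the partitioned-table route, here is a sketch of what that could look like from the Spark side; the table test_2_partitioned, the sample rows, and the day_time value are made up for illustration:
import org.apache.spark.sql.SparkSession
object PartitionedInsertSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("partitioned insert sketch")
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._
    // a partitioned variant of the target table used in the streaming job above
    spark.sql(
      """create table if not exists test_2_partitioned (name string, addr string)
        |partitioned by (day_time string)
        |stored as orc""".stripMargin)
    // stand-in for the micro-batch DataFrame built from Kafka
    val df = Seq(("jason", "beijing"), ("tom", "shanghai")).toDF("name", "addr")
    df.createOrReplaceTempView("tempTable")
    // write the batch into a single day's partition so later merges can run per partition
    spark.sql(
      """insert into table test_2_partitioned partition (day_time = '2018-08-01')
        |select name, addr from tempTable""".stripMargin)
    spark.stop()
  }
}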
Alternatively, you can use repartition (or coalesce) to reduce the number of partitions before writing, but doing so lowers the RDD's parallelism and hurts performance, so weigh it against your own workload.
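For reference, a sketch of that variant, replacing the corresponding lines inside the foreachRDD block above; the partition count of 1 is only an example and should be tuned against the batch size:
// shrink the DataFrame to fewer partitions before writing so each batch
// produces fewer output files; coalesce avoids a shuffle but lowers write parallelism
val df = spark.createDataFrame(rowRDD, schema).coalesce(1)
df.createOrReplaceTempView("tempTable")
sql("insert into test_2 select * from tempTable")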
If anything here is wrong, corrections are welcome. If you have any questions, you can join QQ group 340297350; for more Flink and Spark material, you can join the 'planet' community linked below.