spark读取Hive

导入依赖

导入关键的依赖包

    // Scala toolchain (library, reflection, compiler), pinned to $scalaVersion.
    compile("org.scala-lang:scala-library:$scalaVersion")
    compile("org.scala-lang:scala-reflect:$scalaVersion")
    compile("org.scala-lang:scala-compiler:$scalaVersion")

    // Spark modules built for Scala 2.11: SQL, streaming, and Hive integration
    // (spark-hive enables enableHiveSupport(); thriftserver adds the JDBC/ODBC server).
    // NOTE(review): `compile` is the legacy Gradle configuration, deprecated in favour
    // of `implementation`/`api` since Gradle 3.4 — left unchanged here.
    compile("org.apache.spark:spark-sql_2.11:$sparkVersion")
    compile("org.apache.spark:spark-streaming_2.11:$sparkVersion")
    compile("org.apache.spark:spark-hive_2.11:$sparkVersion")
    compile("org.apache.spark:spark-hive-thriftserver_2.11:$sparkVersion")

启动hive支持

// Local warehouse directory for Spark-managed tables (tables not owned by the
// remote Hive metastore land here).
val warehouseLocation = new File("spark-warehouse").getAbsolutePath

// Build a SparkSession with Hive support so spark.sql(...) can query Hive tables
// through the remote metastore at hdp2.nsrc.com:9083.
val spark = SparkSession
  .builder()
  .appName("Spark Hive Example")
  .master("local[2]")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .config("hive.metastore.uris", "thrift://hdp2.nsrc.com:9083")
  // Allow Hive/MapReduce inputs that live in nested sub-directories.
  .config("mapreduce.input.fileinputformat.input.dir.recursive", "true")
  .config("hive.input.dir.recursive", "true")
  .config("hive.mapred.supports.subdirectories", "true")
  .config("hive.supports.subdirectories", "true")
  // Large collect() results are expected; raise the driver-side cap.
  .config("spark.driver.maxResultSize", "5g")
  // Enable Hive support (HiveQL, metastore access, Hive SerDes).
  .enableHiveSupport()
  .getOrCreate()

// Query window and source IP for the HTTP-traffic lookup.
val startDay = "2019-03-08 00:00:00"
val endDay   = "2019-03-10 23:59:59"
val srcIp    = "10.28.137.84"

// Query Hive directly through Spark SQL; the result is a DataFrame.
// FIX: the original concatenation was missing a space before "and",
// producing "...'and date_format(..." — invalid SQL.
// NOTE(review): values are interpolated straight into the SQL text; fine for
// these literals, but injection-prone if they ever come from untrusted input —
// prefer DataFrame-API filters in that case.
val resultDf = spark.sql(
  s"""select * from http_origin
     | where date_format(http_origin.day, 'yyyy-MM-dd HH:mm:ss') >= '$startDay'
     |   and date_format(http_origin.day, 'yyyy-MM-dd HH:mm:ss') <= '$endDay'
     |   and http_origin.srcip = '$srcIp'""".stripMargin)

hive与hbase关联,可以作为一种查询hbase的方式

创建hive对应的映射语句

-- External Hive table mapped onto the HBase table `httpsystem_dev` via the
-- HBaseStorageHandler, so HBase data can be queried with HiveQL/Spark SQL.
-- `:key` maps the HBase row key to the `id` column; every other column lives
-- in HBase column family `0`.
-- FIX: the original had no space between STORED BY and the handler string
-- literal (`STORED BY'org...'`).
-- NOTE(review): the `hbase.columns.mapping` value contains spaces after each
-- comma, copied verbatim from the original; Hive versions differ in how they
-- tolerate whitespace inside the mapping — verify against the target cluster.
CREATE EXTERNAL TABLE IF NOT EXISTS httpsystem_dev(
  id String, srcIp String, srcPort Int, distIp String, distPort Int,
  requestURL String, requestMethod String, requestUserAgent String,
  requestCookie String, responseServer String, responseCode Int,
  requestHeader String, requestContType String, responseCharset String,
  httpVersion String, requestHost String, requestBodyString String,
  requestParameterString String, responseContentType String,
  responseHeader String, responseBodyReference String,
  ML_rule_juge String, ML_rule_juge_id String, ML_type String,
  ML_juge_mal String, ML_juge_type String,
  DLCNN_rule_juge String, DLCNN_type String, DLCNN_juge_mal String,
  DLCNN_juge_type String)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES(
  'serialization.format'='\t',
  'hbase.columns.mapping'=':key,0:srcIp, 0:srcPort, 0:distIp, 0:distPort, 0:requestURL, 0:requestMethod, 0:requestUserAgent, 0:requestCookie, 0:responseServer, 0:responseCode, 0:requestHeader, 0:requestContType, 0:responseCharset, 0:httpVersion, 0:requestHost, 0:requestBodyString, 0:requestParameterString, 0:responseContentType, 0:responseHeader, 0:responseBodyReference, 0:ML_rule_juge, 0:ML_rule_juge_id, 0:ML_type, 0:ML_juge_mal, 0:ML_juge_type, 0:DLCNN_rule_juge, 0:DLCNN_type, 0:DLCNN_juge_mal, 0:DLCNN_juge_type',
  'field.delim'='\t')
TBLPROPERTIES ('hbase.table.name'='httpsystem_dev')

将结果保存csv到HDFS

// Write the query result to HDFS as headerless CSV, overwriting any previous
// output at the same path ("123" is the job/run identifier appended to the dir).
var url: String = "hdfs://hdp1.nsrc.com:8020/user/http_system/offline_file/" + "123"
val csvWriter = resultDf.write
  .format("com.databricks.spark.csv")
  .mode(SaveMode.Overwrite)
  .option("header", "false")
csvWriter.save(url)
// Next step: build a small sample of rows for inspection.

创建视图返回局部结果

       // Expose the result DataFrame to SQL as a temporary view.
       // FIX: createOrReplaceTempView replaces registerTempTable, which is
       // deprecated since Spark 2.0 (this code already uses SparkSession).
       resultDf.createOrReplaceTempView("offlineResult")

       // Pull a 10-row sample of selected columns for a quick sanity check.
       // (An earlier commented-out draft misspelled "requestheader" as
       // "requsetheader"; the dead code has been removed.)
       val samples = spark.sql(
         "select srcip,distip,requesthost,requesturl,requestheader," +
           "requestbodystring,requestmethod,responsecode,responsebody from offlineResult limit 10")
       samples.show()

       // Print (distip, srcip, responsecode) for each sampled row. The tuple is
       // built explicitly: the original println(a, b, c) relied on deprecated
       // auto-tupling, which prints the same "(a,b,c)" text.
       for (row <- samples.collect()) {
         println((row(1), row(0), row(7)))
       }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

友情链接更多精彩内容