elasticsearch spark

试用场景:TB级历史数据导入(hdfs2es)

1.添加maven依赖

   <dependency>
     <groupId>org.elasticsearch</groupId>
     <artifactId>elasticsearch-spark-20_2.11</artifactId>
     <version>5.1.1</version>
   </dependency>

   <dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-core_2.11</artifactId>
     <version>2.1.1</version>
   </dependency>

注:①5.1.1添加elasticsearch-hadoop会报错
  ②es-spark版本与es版本一致

2.编写spark程序

package com.hualala.bi

import com.alibaba.fastjson.JSON
import org.apache.commons.lang3.StringUtils
import org.apache.spark.{SparkConf, SparkContext}
//隐式转换  rdd savetoes
import org.elasticsearch.spark._

object esSparkApp {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
          .setMaster("local").setAppName("es-spark-test")

    val inputPath = args.apply(0)
    val nodes = args.apply(1)
    
    //配置es参数 包括id routing
    conf.set("es.nodes", nodes)
    conf.set("es.index.auto.create", "true")
    conf.set("es.mapping.id", "id")
    conf.set("es.mapping.routing", "rout")
    conf.set("es.input.json", "yes")

    val sc = new SparkContext(conf)

    val dataRdd = sc.textFile(inputPath)

    //处理字段  id routing
    val billRDD = dataRdd.map(...)

    billRDD.saveToEs("{index}/{type}")

    sc.stop()
  }

}

3.es优化设置

①关闭动态索引

PUT  {index}/{type}/_mapping -d'{"dynamic":false}'

注:id rout 会保存到source 但是不会被索引
②优化gc算法
默认cms 更改为g1 大内存cms stop word会引起节点脱离
③增加refresh_interval、translog flush size、将durability同步改为异步
④加大zen.discover相关设置
⑤一次程序不建议写入过多的索引(100+) 要合理设计索引

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

友情链接更多精彩内容