摘要:Spark
,ElasticSearch
依赖准备
注意Scala的版本(2.11),es的版本(6.7,2)和Maven仓库中的jar一致
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-20_2.11</artifactId>
<version>6.7.2</version>
</dependency>
读es生成Sparek DataFrame
scala> val df = spark.read.format("org.elasticsearch.spark.sql")
.options(
Map("es.nodes" -> "192.168.61.240",
"es.port" -> "8200")
).load("ent_label")
只需要指定es的ip和端口号即可,这个读进来是全表的所有字段。
另一种使用spark.esDF结合DSL
查询语句实现条件过滤查询,例如过滤条件为isList等于Y
scala> import org.elasticsearch.spark.sql._
scala> val options = Map("es.nodes" -> "192.168.61.240", "es.port" -> "8200")
scala> val df = spark.esDF("ent_label/_doc", "?q=isList:Y", options)
使用请求体
scala> val df = spark.esDF("ent_label/_doc", """{"query":{"match":{"isList":"Y"}}}""", Map("es.nodes" -> "192.168.61.240", "es.port" -> "8200"))
Spark DataFrame写入es
scala> val df = Seq((1, "d"), (2, "c")).toDF("id", "a")
scala> df.write.format("org.elasticsearch.spark.sql")
.options(Map("es.nodes" -> "192.168.61.240",
"es.port" -> "8200",
"es.mapping.id" -> "id",
"es.index.auto.create" -> "true"))
.mode("append")
.save("my_label/_doc")
写入的时候除了ip和port之外,需要指定es主键字段es.mapping.id
,如果不设置_id字段会自动随机生成,mode存在overwrite
和append
两种模型,overwrite为完全覆盖,即先删除某个索引所有数据再插入,append为只对DataFrame的id在索引中的,删除索引的文档数据,重新插入。最后指定save到哪个es索引,my_label/_doc
为索引名和type名,新版本es的index中只有一个默认的type即_doc
。
append只是文档级别的追加,不是字段级别的,如果要实现字段级别的有则更新无则插入,需要在options中指定参数
"es.write.operation" -> "upsert"
,或者"es.write.operation" -> "update"
,这个和ES本身的update,upsert操作效果是一致的
例如使用upsert,当id不在时更新,id存在是修改(append),修改的方式是字段不存在就插入,存在就修改(upsert)
df.write.format("org.elasticsearch.spark.sql").options(
Map("es.nodes" -> "192.168.61.240",
"es.port" -> "8200",
"es.mapping.id" -> "id",
"es.index.auto.create" -> "true",
"es.write.operation" -> "upsert")).mode("append").save("my_label/_doc")
如果指定es.index.auto.create -> update,则DataFrame中的id列不能出现es索引中不存在的,否则直接报错
注意:如果Spark的DataFrame中列的值为None(null),但是es中现有字段有值,则采用upsert+append模式之后
不会更新,还是采用es的现有值
还要一种写法使用saveToEs
api,这种貌似只能append插入
scala> import org.elasticsearch.spark.sql._
scala> df.saveToEs("my_label/_doc", Map("es.nodes" -> "192.168.61.240","es.port" -> "8200","es.mapping.id" -> "id", "es.index.auto.create" -> "true"))
最后看一下es中插入数据
GET /my_label/_doc/_search
{
"query": {
"match_all": {}
}
}
{
"took" : 0,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : 2,
"max_score" : 1.0,
"hits" : [
{
"_index" : "my_label",
"_type" : "_doc",
"_id" : "2",
"_score" : 1.0,
"_source" : {
"id" : 2,
"a" : "c"
}
},
{
"_index" : "my_label",
"_type" : "_doc",
"_id" : "1",
"_score" : 1.0,
"_source" : {
"id" : 1,
"a" : "d"
}
}
]
}
}