ES使用
分布式全文搜索引擎,文档数据库
- ES是一个分布式文档数据库,其中每个字段均可被索引,而且每个字段的数据均可被搜索。
- ES能够横向扩展至数以百计的服务器存储以及处理PB级的数据。可以在极短的时间内存储、搜索和分析大量的数据。
- GitHub的搜索引擎是基于ES构建的
基本概念
- Near Realtime(NRT) 几乎实时
- Cluster集群
- Node节点
- Index索引 ---------------- Database(数据库)
- Type类型 ---------------- Table(表)
- Document文档 ---------------- Row(数据行)
- Field字段 ---------------- Column(数据列)
- Mapping映射 ---------------- Schema(模式)
- Shards & Replicas分片与副本
curl命令操作ES
- _cat系列 curl -XGET localhost:9200/_cat/***
- _cluster系列
- _nodes系列
索引的增删
# 创建索引www
curl -X PUT 127.0.0.1:9200/www
# 删除索引www
curl -X DELETE 127.0.0.1:9200/www
节点状态查询
curl -XGET http://localhost:9200/_nodes/stats?pretty=true
curl -XGET http://localhost:9200/_nodes/192.168.1.2/stats?pretty=true
curl -XGET http://localhost:9200/_nodes/process
curl -XGET http://localhost:9200/_nodes/_all/process
curl -XGET http://localhost:9200/_nodes/192.168.1.2,192.168.1.3/jvm,process
curl -XGET http://localhost:9200/_nodes/192.168.1.2,192.168.1.3/info/jvm,process
curl -XGET http://localhost:9200/_nodes/192.168.1.2,192.168.1.3/_all
curl -XGET http://localhost:9200/_nodes/hot_threads
cluster操作
# 1、查询设置集群状态
curl -XGET localhost:9200/_cluster/health?pretty=true
pretty=true表示格式化输出
level=indices 表示显示索引状态
level=shards 表示显示分片信息
# 2、显示集群系统信息,包括CPU JVM等等
curl -XGET localhost:9200/_cluster/stats?pretty=true
# 3、集群的详细信息。包括节点、分片
curl -XGET localhost:9200/_cluster/state?pretty=true
# 4、获取集群堆积的任务
curl -XGET localhost:9200/_cluster/pending_tasks?pretty=true
# 3、修改集群配置
curl -XPUT localhost:9200/_cluster/settings -d '{
"persistent" : {
"discovery.zen.minimum_master_nodes" : 2
}
}'
transient 表示临时的,persistent表示永久的
# 4、对shard的手动控制,参考http://zhaoyanblog.com/archives/687.html
curl -XPOST localhost:9200/_cluster/reroute -d `xxxxxx`
# 5、关闭节点, 关闭指定192.168.1.1节点
curl -XPOST http://192.168.1.1:9200/_cluster/nodes/_local/_shutdown
curl -XPOST http://localhost:9200/_cluster/nodes/192.168.1.1/_shutdown
关闭主节点
curl -XPOST http://localhost:9200/_cluster/nodes/_master/_shutdown
关闭整个集群
$ curl -XPOST http://localhost:9200/_shutdown?delay=10s
$ curl -XPOST http://localhost:9200/_cluster/nodes/_shutdown
$ curl -XPOST http://localhost:9200/_cluster/nodes/_all/_shutdown
delay=10s表示延迟10秒关闭
索引操作
# 1、获取索引
curl -XGET http://localhost:9200/{index}/{type}/{id}
# 2、索引数据
curl -XPOST http://localhost:9200/{index}/{type}/{id} -d'{"a":"avalue","b":"bvalue"}'
# 3、删除索引
curl -XDELETE http://localhost:9200/{index}/{type}/{id}
# 4、设置mapping
curl -XPUT http://localhost:9200/{index}/{type}/_mapping -d '{
"{type}" : {
"properties" : {
"date" : {
"type" : "long"
},
"name" : {
"type" : "string",
"index" : "not_analyzed"
},
"status" : {
"type" : "integer"
},
"type" : {
"type" : "integer"
}
}
}
}'
# 5、获取mapping
curl -XGET http://localhost:9200/{index}/{type}/_mapping
# 6、搜索
curl -XGET 'http://localhost:9200/{index}/{type}/_search' -d '{
"query" : {
"term" : { "user" : "kimchy" } //查所有 "match_all": {}
},
"sort" : [{ "age" : {"order" : "asc"}},{ "name" : "desc" } ],
"from":0,
"size":100
}'
curl -XGET 'http://localhost:9200/{index}/{type}/_search' -d '{
"filter": {"and":{"filters":[{"term":{"age":"123"}},{"term":{"name":"张三"}}]},
"sort" : [{ "age" : {"order" : "asc"}},{ "name" : "desc" } ],
"from":0,
"size":100
}
golang使用ES
连接ES
package main
import (
"github.com/elastic/go-elasticsearch"
"log"
)
func main() {
// NewDefaultClient 默认客户端 127.0.0.1:9200
cfg := elasticsearch.Config{
Addresses: []string{
"http://localhost:9200",
"http://localhost:9201",
},
// ...
}
es, err := elasticsearch.NewClient(cfg)
if err != nil {
log.Fatalf("Error creating the client: %s", err)
}
// 获取 当前es的详细信息
res, err := es.Info()
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
defer res.Body.Close()
log.Printf("%#v\n",res)
log.Printf("%v\n",res)
}
新增
package main
import (
"context"
"encoding/json"
"github.com/elastic/go-elasticsearch"
"github.com/elastic/go-elasticsearch/esapi"
"log"
"strconv"
"strings"
)
func main() {
// NewDefaultClient 默认客户端 127.0.0.1:9200
cfg := elasticsearch.Config{
Addresses: []string{
"http://localhost:9200",
},
// ...
}
es, err := elasticsearch.NewClient(cfg)
if err != nil {
log.Fatalf("Error creating the client: %s", err)
}
// 获取 当前es的详细信息
res, err := es.Info()
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
defer res.Body.Close()
for i, title := range []string{"Test One", "Test Two"} {
var b strings.Builder
b.WriteString(`{"title" : "`)
b.WriteString(title)
b.WriteString(`"}`)
req := esapi.IndexRequest{
Index: "test",
DocumentType: "",
DocumentID: strconv.Itoa(i+1),
Body: strings.NewReader(b.String()),
Refresh: "true",
}
res, err := req.Do(context.Background(), es)
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
defer res.Body.Close()
if res.IsError() {
log.Printf("[%s] Error indexing document ID=%d", res.Status(), i+1)
} else {
// Deserialize the response into a map.
var r map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
log.Printf("Error parsing the response body: %s", err)
}else {
// 打印响应状态和索引文档版本.
log.Printf("[%s] %s; version=%d", res.Status(), r["result"], int(r["_version"].(float64)))
}
}
}
}
查询
package main
import (
"bytes"
"context"
"encoding/json"
"github.com/elastic/go-elasticsearch"
"log"
"strings"
)
func main() {
// NewDefaultClient 默认客户端 127.0.0.1:9200
cfg := elasticsearch.Config{
Addresses: []string{
"http://localhost:9200",
},
// ...
}
es, err := elasticsearch.NewClient(cfg)
if err != nil {
log.Fatalf("Error creating the client: %s", err)
}
// 获取 当前es的详细信息
res, err := es.Info()
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
defer res.Body.Close()
var buf bytes.Buffer
query :=map[string]interface{}{
"query":map[string]interface{}{
"match":map[string]interface{}{
"title":"test",
},
},
}
if err := json.NewEncoder(&buf).Encode(query); err != nil {
log.Fatalf("Error encoding query: %s", err)
}
// 执行搜索请求.
res, err = es.Search(
es.Search.WithContext(context.Background()),
es.Search.WithIndex("test"),
es.Search.WithBody(&buf),
es.Search.WithTrackTotalHits(true),
es.Search.WithPretty(),
)
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
defer res.Body.Close()
if res.IsError() {
var e map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&e); err != nil {
log.Fatalf("Error parsing the response body: %s", err)
}else {
// Print the response status and error information.
log.Fatalf("[%s] %s: %s",
res.Status(),
e["error"].(map[string]interface{})["type"],
e["error"].(map[string]interface{})["reason"],
)
}
}
var r map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
log.Fatalf("Error parsing the response body: %s", err)
}
// 打印响应状态,结果数和请求持续时间.
log.Printf(
"[%s] %d hits; took: %dms",
res.Status(),
int(r["hits"].(map[string]interface{})["total"].(map[string]interface{})["value"].(float64)),
int(r["took"].(float64)),
)
// 打印每次匹配的ID和文档来源.
for _, hit :=range r["hits"].(map[string]interface{})["hits"].([]interface{}) {
log.Printf(" * ID=%s, %s", hit.(map[string]interface{})["_id"], hit.(map[string]interface{})["_source"])
}
log.Println(strings.Repeat("=", 37))
}