es的使用与原理12 -- shard分布式,es内部乐观锁方案,bulk语法,批处理等等

浅析es分布式架构

1 Elasticsearch是一套分布式的系统,分布式是为了应对大数据量,隐藏了复杂的分布式机制
2 如图
3 es会尽量保证所有shard在es集群中的均衡,如7个shard分配给6个节点时,会有一个节点分配到2个shard,现在加上新加入一个节点,那么es会保证每个节点都是一个shard。
4 master节点负责管理es集群元数据
5 节点平等的分布式架构
每个节点都能接收所有的请求,自动请求路由 ,路由之后的返回数据依然通过这个节点返回给客户端


image.png
浅析shard&replica机制

(1)index包含多个shard
(2)每个shard都是一个最小工作单元,承载部分数据,lucene实例,完整的建立索引和处理请求的能力
(3)增减节点时,shard会自动在nodes中负载均衡
(4)primary shard和replica shard,每个document肯定只存在于某一个primary shard以及其对应的replica shard中,不可能存在于多个primary shard
(5)replica shard是primary shard的副本,负责容错,以及承担读请求负载
(6)primary shard的数量在创建索引的时候就固定了,replica shard的数量可以随时修改
(7)primary shard的默认数量是5,replica默认是1,默认有10个shard,5个primary shard,5个replica shard
(8)primary shard不能和自己的replica shard放在同一个节点上(否则节点宕机,primary shard和副本都丢失,起不到容错的作用),但是可以和其他primary shard的replica shard放在同一个节点上


image.png
index在node节点中的分配情况

单node环境下创建index是什么样子的
(1)单node环境下,创建一个index,有3个primary shard,3个replica shard
(2)集群status是yellow
(3)这个时候,只会将3个primary shard分配到仅有的一个node上去,另外3个replica shard是无法分配的
(4)集群可以正常工作,但是一旦出现节点宕机,数据全部丢失,而且集群不可用,无法承接任何请求
创建索引语句如下

PUT /test_index
{
   "settings" : {
      "number_of_shards" : 3, // primary 的数量
      "number_of_replicas" : 1//每个primary 有1个replicas副本
   }
}
image.png

2个node环境下replica shard是如何分配的


image.png
es扩容过程,如何超出扩容极限,以及如何提升容错性

(1)primary&replica自动负载均衡,6个shard,3 primary,3 replica
(2)每个node有更少的shard,IO/CPU/Memory资源给每个shard分配更多,每个shard性能更好
(3)扩容的极限,6个shard(3 primary,3 replica),最多扩容到6台机器,每个shard可以占用单台服务器的所有资源,性能最好
(4)超出扩容极限,动态修改replica数量,9个shard(3primary,6 replica),扩容到9台机器,比3台机器时,拥有3倍的读吞吐量
(5)3台机器下,9个shard(3 primary,6 replica),资源更少,但是容错性更好,最多容纳2台机器宕机,6个shard只能容纳1台机器宕机


image.png

es 容错机制:master选举,replica容错,数据恢复


image.png
es元数据

1、_index元数据
(1)代表一个document存放在哪个index中
(2)index中包含了很多类似的document:类似是什么意思,其实指的就是说,这些document的fields很大一部分是相同的,你说你放了3个document,每个document的fields都完全不一样,这就不是类似了,就不太适合放到一个index里面去了。
(3)索引名称必须是小写的,不能用下划线开头,不能包含逗号:product,website,blog
2、_type元数据
(1)代表document属于index中的哪个类别(type)
(2)一个索引通常会划分为多个type,逻辑上对index中有些许不同的几类数据进行分类:因为一批相同的数据,可能有很多相同的fields,但是还是可能会有一些轻微的不同,可能会有少数fields是不一样的,举个例子,就比如说,商品,可能划分为电子商品,生鲜商品,日化商品,等等。
(3)type名称可以是大写或者小写,但是同时不能用下划线开头,不能包含逗号
3、_id元数据
(1)代表document的唯一标识,与index和type一起,可以唯一标识和定位一个document
(2)我们可以手动指定document的id(put /index/type/id),也可以不指定,由es自动为我们创建一个id

为什么相似的数据要放到一个index里面图解
es手动生成id与自动生成id

手动
一般来说,是从某些其他的系统中,导入一些数据到es时,会采取这种方式,就是使用系统中已有数据的唯一标识,作为es中document的id。我们之前插入数据的方式都是手动
自动
语法是: put /index/type/id
如果说,我们是在做一个系统,这个系统主要的数据存储就是es一种,也就是说,数据产生出来以后,可能就没有id,直接就放es一个存储,那么这个时候,
可能就不太适合说手动指定document id的形式了,因为你也不知道id应该是什么,此时可以采取下面要讲解的让es自动生成id的方式。
语法是:post /index/type (到这里就够了 不需要指定id) 如:

POST /test_index/test_type 
{
  "test_content": "my test"
}

自动生成的id,长度为20个字符,URL安全,base64编码,GUID,分布式系统并行生成时不可能会发生冲突


image.png
document的全量替换、强制创建以及lazy delete机制

1、document的全量替换
(1)语法与创建文档是一样的,如果document id不存在,那么就是创建;如果document id已经存在,那么就是全量替换操作,替换document的json串内容
(2)document是不可变的,如果要修改document的内容,第一种方式就是全量替换,直接对document重新建立索引,替换里面所有的内容
(3)es会将老的document标记为deleted,然后新增我们给定的一个document,当我们创建越来越多的document的时候,es会在适当的时机在后台自动删除标记为deleted的document
2、document的强制创建
(1)创建文档与全量替换的语法是一样的,有时我们只是想新建文档,不想替换文档,如果强制进行创建呢?
(2)PUT /index/type/id?op_type=create,或者 PUT /index/type/id/_create
3、document的删除
(1)DELETE /index/type/id
(2)不会理解物理删除,只会将其标记为deleted,当数据越来越多的时候,在后台自动删除

es并发冲突问题
image.png
解析悲观锁与乐观锁两种并发控制方案

es用的解决方案是乐观锁方案,其类似于juc中的CAS


image.png
es内部如何基于_version进行乐观锁并发控制,后面还有es的全局锁,悲观锁,乐观锁(读写锁)的方案
image.png

第一次创建一个document的时候,它的_version内部版本号就是1;以后,每次对这个document执行修改或者删除操作,都会对这个_version版本号自动加1;哪怕是删除,也会对这条数据的版本号加1

我们会发现,在删除一个document之后,可以从一个侧面证明,它不是立即物理删除掉的,因为它的一些版本号等信息还是保留着的。先删除一条document,再重新创建这条document,其实会在delete version基础之上,再把version号加1

乐观锁测试
1 先构造一条数据出来

PUT /test_index/test_type/16
{
  "test_field": "test test"
}
//返回数据
{
  "_index": "test_index",
  "_type": "test_type",
  "_id": "16",
  "_version": 1,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "created": true
}

2 两个客户端都获取同一条数据

GET test_index/test_type/16
{
  "_index": "test_index",
  "_type": "test_type",
  "_id": "16",
  "_version": 1,
  "found": true,
  "_source": {
    "test_field": "test test"
  }
}

3 其中一个客户端,先更新了一下这个数据,同时带上数据的版本号,确保说,es中的数据的版本号,跟客户端中的数据的版本号是相同的,才能修改

PUT /test_index/test_type/16?version=1 
{
  "test_field": "test client 1"
}
{
  "_index": "test_index",
  "_type": "test_type",
  "_id": "16",
  "_version": 2,
  "result": "updated",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "created": false
}

4 另外一个客户端,尝试基于version=1的数据去进行修改,同样带上version版本号,进行乐观锁的并发控制

PUT /test_index/test_type/16?version=1 
{
  "test_field": "test client 1"
}

{
  "error": {
    "root_cause": [
      {
        "type": "version_conflict_engine_exception",
        "reason": "[test_type][16]: version conflict, current version [2] is different than the one provided [1]",
        "index_uuid": "MzecFggXT1qW5O9HrzKKJw",
        "shard": "3",
        "index": "test_index"
      }
    ],
    "type": "version_conflict_engine_exception",
    "reason": "[test_type][16]: version conflict, current version [2] is different than the one provided [1]",
    "index_uuid": "MzecFggXT1qW5O9HrzKKJw",
    "shard": "3",
    "index": "test_index"
  },
  "status": 409
}

5 在乐观锁成功阻止并发问题之后,尝试正确的完成更新,重新去获取数据,
基于最新的数据和版本号,去进行修改,修改后,带上最新的版本号。这样才能成功修改数据(自旋)。可能这个步骤会需要反复执行好几次,才能成功,特别是在多线程并发更新同一条数据很频繁的情况下

PUT /test_index/test_type/16?version=2 
{
  "test_field": "test client 1"
}
基于external version进行乐观锁并发控制

es提供了一个feature,就是说,你可以不用它提供的内部_version版本号来进行并发控制,可以基于你自己维护的一个版本号来进行并发控制。举个列子,加入你的数据在mysql里也有一份,然后你的应用系统本身就维护了一个版本号,无论是什么自己生成的,程序控制的。这个时候,你进行乐观锁并发控制的时候,可能并不是想要用es内部的_version来进行控制,而是用你自己维护的那个version来进行控制。

external version语法: ?version=1&version_type=external
唯一的区别在于,_version,只有当你提供的version与es中的_version一模一样的时候,才可以进行修改,只要不一样,就报错;
当version_type=external的时候,只有当你提供的version比es中的_version大的时候,才能完成修改
1 先构造一条数据

PUT /test_index/test_type/17
{
  "test_field": "test"
}
{
  "_index": "test_index",
  "_type": "test_type",
  "_id": "17",
  "_version": 1,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "created": true
}

(2)模拟两个客户端同时查询到这条数据

GET /test_index/test_type/17
{
  "_index": "test_index",
  "_type": "test_type",
  "_id": "17",
  "_version": 1,
  "found": true,
  "_source": {
    "test_field": "test"
  }
}

(3)第一个客户端先进行修改,此时客户端程序是在自己的数据库中获取到了这条数据的最新版本号,比如说是2

PUT /test_index/test_type/17?version=2&version_type=external
{
  "test_field": "test client 1"
}
{
  "_index": "test_index",
  "_type": "test_type",
  "_id": "17",
  "_version": 2,
  "result": "updated",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "created": false
}

然后另外一个客户端也使用version=2去更新数据,这时会更新失败

PUT /test_index/test_type/17?version=2&version_type=external
{
  "test_field": "test client 1"
}

{
  "error": {
    "root_cause": [
      {
        "type": "version_conflict_engine_exception",
        "reason": "[test_type][17]: version conflict, current version [2] is higher or equal to the one provided [2]",
        "index_uuid": "MzecFggXT1qW5O9HrzKKJw",
        "shard": "4",
        "index": "test_index"
      }
    ],
    "type": "version_conflict_engine_exception",
    "reason": "[test_type][17]: version conflict, current version [2] is higher or equal to the one provided [2]",
    "index_uuid": "MzecFggXT1qW5O9HrzKKJw",
    "shard": "4",
    "index": "test_index"
  },
  "status": 409
}

因此我们需要获取更高版本号再来更新

PUT /test_index/test_type/17?version=3&version_type=external
{
  "test_field": "test client 2"
}
{
  "_index": "test_index",
  "_type": "test_type",
  "_id": "17",
  "_version": 3,
  "result": "updated",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "created": false
}
es中的partial update

一般对应到应用程序中,每次的执行流程基本是这样的:

(1)应用程序先发起一个get请求,获取到document,展示到前台界面,供用户查看和修改
(2)用户在前台界面修改数据,发送到后台
(3)后台代码,会将用户修改的数据在内存中进行执行,然后封装好修改后的全量数据
(4)然后发送PUT请求,到es中,进行全量替换
(5)es将老的document标记为deleted,然后重新创建一个新的document

为什么使用partial update?

partial update 语法,看起来,好像就比较方便了,每次就传递少数几个发生修改的field即可,不需要将全量的document数据发送过去

post /index/type/id/_update 
{
   "doc": {
      "要修改的少数几个field即可,不需要全量的数据"
   }
}

操作例子

PUT /test_index/test_type/10
{
  "test_field1": "test1",
  "test_field2": "test2"
}

POST /test_index/test_type/10/_update
{
  "doc": {
    "test_field2": "updated test2"
  }
}
es 批处理

就是一条一条的查询:总花费时间是n次网络时间+n次服务器执行时间
批处理:1次网络时间+n次执行时间
可以说mget是很重要的,一般来说,在进行查询的时候,如果一次性要查询多条数据的话,那么一定要用batch批量操作的api
尽可能减少网络开销次数,可能可以将性能提升数倍,甚至数十倍,非常非常之重要
mget的语法

GET /_mget
{
   "docs" : [
      {
         "_index" : "test_index",
         "_type" :  "test_type",
         "_id" :    1
      },
      {
         "_index" : "test_index",
         "_type" :  "test_type",
         "_id" :    2
      }
   ]
}

{
  "docs": [
    {
      "_index": "test_index",
      "_type": "test_type",
      "_id": "1",
      "_version": 1,
      "found": true,
      "_source": {
        "test_field1": "111",
        "test_field2": "2222"
      }
    },
    {
      "_index": "test_index",
      "_type": "test_type",
      "_id": "2",
      "found": false
    }
  ]
}

mget变种语法
1 如果查询的document是一个index下的不同type种的话
2 如果查询的数据都在同一个index下的同一个type下,最简单了

GET /test_index/_mget
{
   "docs" : [
      {
         "_type" :  "test_type",
         "_id" :    1
      },
      {
         "_type" :  "test_type",
         "_id" :    2
      }
   ]
}

GET /test_index/test_type/_mget
{
   "ids": [1, 2]
}

bulk语法
有哪些类型的操作可以使用bulk语法?
(1)delete:删除一个文档,只要1个json串就可以了
(2)create:PUT /index/type/id/_create,强制创建
(3)index:普通的put操作,可以是创建文档,也可以是全量替换文档
(4)update:执行的partial update操作
bulk api对json的语法,有严格的要求,每个json串不能换行,只能放一行,同时一个json串和一个json串之间,必须有一个换行。bulk操作中,任意一个操作失败,是不会影响其他的操作的,但是在返回结果里,会告诉你异常日志
语法如下

POST /_bulk
{ "delete": { "_index": "test_index", "_type": "test_type", "_id": "3" }} 
{ "create": { "_index": "test_index", "_type": "test_type", "_id": "12" }}
{ "test_field":    "test12" }
{ "index":  { "_index": "test_index", "_type": "test_type", "_id": "2" }}
{ "test_field":    "replaced test2" }
{ "update": { "_index": "test_index", "_type": "test_type", "_id": "1", "_retry_on_conflict" : 3} }
{ "doc" : {"test_field2" : "bulk test1"} }

bulk语法也和mget一样 也有变种 这里就不写了。
bulk size最佳大小:
bulk request会加载到内存里,如果太大的话,性能反而会下降,因此需要反复尝试一个最佳的bulk size。一般从1000到5000条数据开始,尝试逐渐增加。另外,如果看大小的话,最好是在5~15MB之间。

为什么bulk语法格式如此奇怪并且不能允许换行?这里会涉及到es的效率问题,就不详细去说了 具体可以百度

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,335评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,895评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,766评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,918评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,042评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,169评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,219评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,976评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,393评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,711评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,876评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,562评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,193评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,903评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,142评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,699评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,764评论 2 351

推荐阅读更多精彩内容