ETCD 的理解和简单使用
1. etcd介绍
etcd是使用Go语言开发的一个开源、高可用的分布式key-value存储系统,可以用于配置共享
和服务注册和发现
特点:
完全复制
:集群中的每个节点都可以使用完整的存档。
高可用性
:etcd可用于避免硬件的单点故障和网络问题。
一致性
:每次读取都会返回跨多主机的最新写入。
简单
:保罗一个定义良好、面向用户的API。
安全
:实现了带有可选的客户端证书身份验证的自动化TLS。
快速
:每秒1W次写入的基准速度。
可靠
: RAFT算法实现了强一致、高可用的服务存储目录。
2. etcd的应用场景
2.1 服务发现
服务发现:即在同一个分布式集群中的进程或服务,要如何才能找到对方并建立了解?本质上来说,服务发现就是想要了解进群众是否有进程监听UDP和TCP端口,并且通过域名就可以查找和连接。
配置中心
将一些配置信息放到etcd上进行集中管理。
应用在启动的时候主动从etcd获取一次配置信息,同事,在etcd节点上注册一个WATCHER并等待,以后每次配置有更新的时候,etcd都会实时通知订阅者,一次达到获取配置信息的目的。
分布式锁
详细请参考:
https://www.cnblogs.com/jiujuan/p/12147809.html
https://segmentfault.com/a/1190000021603215?utm_source=tag-newest
因为etcd使用Raft算法保持了数据的强一致性,其次操作存储到集群中的值必然是全局一直的,所以很容易实现分布式锁。锁服务有两种使用方式。
-
保持独占
即所有获取锁的用户最终之后一个用户可以得到。etcd为此提供了一套实现分布式锁算子操作CAS(CompareAndswap)的API. 通过设置prevExist值,可以保证在多个节点同事创建某个目录时,只有一个成功。而创建成功的用户就可以认为是获得了锁。 -
控制时序
: 即所有想要获得所的用户都会被安排执行,但是获得所的顺序也是全局唯一的,同事决定了执行顺序。etcd也提供了一套API(自动创建有序键),对一个目录建值同时指定为POST动作,这样etcd会自动在目录下生一个当前最大值为键,存储这个新的值(客户端编号)。同时还可以使用API按顺序列出所有当前目录下的键值。此时这些建的值就是客户端的时序,而这些键中存储的值可以代表客户端的编号。
etcd集群
etcd作为一个高可用键值存储系统,天生就是为集群哈而设计的。由于Raft算法在做决策时需要多数节点的投票,所以etcd一般部署集群推荐奇数个节点,推荐数量为3、5或者7个节点构成一个集群。
为什么用etcd而不用zookeeper?
- etcd简单,使用Go语言编写部署简单,支持HTTP/JSON API,使用简单:使用Raft算法保证强一致性,让用户易于理解。
- etcd默认数据一更新就进行持久化。
- etcd支持SSL客户端安全认证。
- zookeeper部署维护复杂,其使用的PAXOS强一致性算法难懂。官方只提供了JAVA和C两种语言的接口。
- zookeeper 使用JAVA编写引入大量依赖。运维人员维护起来比较麻烦。
拓展:Raft
etcd下载与安装
- 源码码下载地址
https://github.com/etcd-io/etcd/releases
3. etcd命令简单使用
1. 启动etcd
`./etcd`
2. etcdclt 交互
2.1 put
通过put将key和value存储到etcd集群中。 每个存储的key都通过Raft协议复制到所有etcd集群成员,以实现一致性和可靠性。
# etcdctl put name dong
OK
2.2 get
通过get可以从一个etcd集群中读取key的值。
现有K-V对:
age = 18
age2 = 19
name = dong
name2 = zhang
name3 = zhao
name4 = gan
- 1. 通过key来直接读取valu
$ etcdctl get name
name # key
dong # value
- 2. 通过key来直接读取value,只显示value
$ etcdctl get name --print-value-only
dong # value
- 3. 通过key来直接读取value,只显示key
$ etcdctl get name --keys-only
name # key
- 4. 读取指定范围的key,从name~name3 左闭右开区间
$ etcdctl get name name3
name # k1
dong # v1
name2 # k2
zhang # v2
- 5. 按前缀读取
$ etcdctl get n --prefix --keys-only
name
name2
name3
name4
- 6. 读取数量限制
$ etcdctl get n --prefix --keys-only --limit 3
name
name2
name3
- 7. 读取大于或等于指定键的字节值的键(从字母b开始,包含b)
$ etcdctl get --from-key b --keys-only
name
name2
name3
name4
- 8.
2.3 del
- 1. 删除指定的 key
$ etcdctl del name
1 # 返回值,影响的个数
- 2. 删除指定的键值对
$ etcdctl del --prev-kv name
1 # 返回值,影响的个数
name # k
dong # v
- 3. 删除指定范围的key
$ etcdctl del name name3
2 # 返回受影响的个数
- 4. 删除具有前缀的key
$ etcdctl del --prefix a
2
-5. 删除大于或等于键的字节值的键的命令
$ etcdctl del --from-key b
4
2.4 watch
Watch 用于监测一个 key-value 的变化,一旦 key-value 发生更新,就会输出最新的值。
- 在新的终端输入
etcdctl watch key
监听对应的key。
$ etcdctl put key 001 OK $ etcdctl put key 002 OK $ etcdctl del key 1 ###### 以下是etcdctl watch key的输出 $ etcdctl watch key PUT # TYPE key # K 001 # V PUT key 002 DELETE key
2.5 lock(分布式锁)具体可参考:
etcd 的 lock 指令对指定的 key 进行加锁。注意,只有当正常退出且释放锁后,lock 命令的退出码是 0,否则这个锁会一直被占用直到过期(默认 60 秒)。
- 在第一个终端输入如下命令:
$ etcdctl lock mutex1
mutex1/694d7a4c4cf36947
- 在第二个终端输入同样的命令:
$ etcdctl lock mutex1
在此可以发现第二个终端发生了阻塞,并未返回类似 mutex1/694d7a4c4cf36947
的输出。此时,如果我们使用 Ctrl+C 结束了第一个终端的 lock,然后第二个终端的显示如下:
mutex1/694d7a4c4cf3694b
可见,这就是一个分布式锁的实现。
2.6 transactions(事务)
txn支持从标准输入中读取多个请求,并将他们看作一个原子性的事务执行。 事务是由条件列表,条件判断成功时的执行列表(条件列表中全部条件为真表示成功)和条件判断失败时的执行列表(条件列表中有一个为假即为失败)组成的。
$ etcdctl txn -i # 交互
compares:
value("name") = "dong" # 条件,可以写多个
success requests (get, put, del): # 条件为true,执行的命令
put result ok
failure requests (get, put, del): # 条件为fales,执行的命令
put result failed
SUCCESS
OK
####### 因为value("name") = "dong" 为TRUE,所以命令put result ok 执行
$ etcdctl get result
result
ok
2.7 compact(压缩)
etcd 会保存数据的修订版本,以便用户可以读取旧版本的 key。但是为了避免累积无尽头的版本历史,就需要压缩过去的修订版本。压缩后,etcd 会删除历史版本并释放资源。
$ etcdctl compact 5
compacted revision 5
$ etcdctl get --rev=4 foo
Error: etcdserver: mvcc: required revision has been compacted
2.8 lease(租约)
KEY的TTL(time to live 生存时间)是etcd的重要特征之一,即设置KEY的超时时间。 与Redis不同,etcd需要先闯进lease(租约),通过put --lease=
设置。 而lease又有TTL管理,以此来实现key的超时时间。
-
创建lease
$ etcdctl lease grant 60 # 创建lease,注意时间是60s lease 694d7a4c4cf36969 granted with TTL(60s) $ etcdctl put --lease=694d7a4c4cf36969 name soo # 将lease设置到指定的key中 OK $ etcdctl get name # 获取对应的key,以及返回值 name soo $ etcdctl get name name soo $ etcdctl get name # 获取对应的key,超过存活时间,没有返回值
-
废除指定的lease
$ etcdctl lease grant 60 # 创建lease lease 694d7a4c4cf36970 granted with TTL(60s) $ etcdctl put --lease=694d7a4c4cf36970 name soo # 将lease设置到指定的key中 OK $ etcdctl lease revoke 694d7a4c4cf36970 # 撤销对应的lease lease 694d7a4c4cf36970 revoked $ etcdctl get name # 无法获取key对应的value
-
查看租约期内对应的key,以及租约的状态
$ etcdctl lease grant 240 # 创建lease lease 694d7a4c4cf3697b granted with TTL(240s) $ etcdctl put --lease=694d7a4c4cf3697b name liu # 设置key OK $ etcdctl put --lease=694d7a4c4cf3697b name2 song # 设置key OK $ etcdctl lease timetolive 694d7a4c4cf3697b # 获取租约信息 lease 694d7a4c4cf3697b granted with TTL(240s), remaining(184s) $ etcdctl lease timetolive --keys 694d7a4c4cf3697b # 获取租约信息以及对应的key lease 694d7a4c4cf3697b granted with TTL(240s), remaining(165s), attached keys([name name2]) $ etcdctl lease timetolive --keys 694d7a4c4cf3697b #租约已经过期所对应的返回值 lease 694d7a4c4cf3697b already expired
-
续约
$ etcdctl lease grant 30 lease 694d7a4c4cf36984 granted with TTL(30s) $ etcdctl put --lease=694d7a4c4cf36984 name aaa OK $ etcdctl lease keep-alive 694d7a4c4cf36984 # 该命令不会退出,一直会续约 lease 694d7a4c4cf36984 keepalived with TTL(30) lease 694d7a4c4cf36984 keepalived with TTL(30) lease 694d7a4c4cf36984 keepalived with TTL(30) lease 694d7a4c4cf36984 keepalived with TTL(30)
4. GO Client SDK 交互
下载对应的库:go get go get go.etcd.io/etcd/clientv3
注: 这里开启mod可能会有error: undefined: balancer.PickOptions, 需要在mod文件中:replace google.golang.org/grpc => google.golang.org/grpc v1.26.0`
1. put、get 和 del
package main
import (
"context"
"fmt"
"time"
"go.etcd.io/etcd/clientv3"
)
func main() {
// 创建链接
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"},
DialTimeout: 10 * time.Second,
})
if err != nil {
// handle error!
fmt.Printf("connect to etcd failed, err:%v\n", err)
return
}
fmt.Println("connect to etcd success")
defer cli.Close()
// put
ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
_, err = cli.Put(ctx, "name", "dong")
cancel()
if err != nil {
fmt.Printf("put to etcd failed, err:%v\n", err)
return
}
// get
ctx, cancel = context.WithTimeout(context.TODO(), time.Second)
resp, err := cli.Get(ctx, "name")
cancel()
if err != nil {
fmt.Printf("get from etcd failed, err:%v\n", err)
return
}
for _, ev := range resp.Kvs {
fmt.Printf("%s:%s\n", ev.Key, ev.Value)
}
// del
delresp, err := cli.Delete(context.TODO(), "name")
if err != nil {
fmt.Printf("del key failed, err:%v\n", err)
return
}
if delresp.Deleted == 1{
fmt.Println("del key success!")
}
}
2. wathc
package main
import (
"context"
"fmt"
"time"
"go.etcd.io/etcd/clientv3"
)
func main() {
// 创建连接
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"},
DialTimeout: 10 * time.Second,
})
if err != nil {
fmt.Printf("connect to etcd failed, err:%v\n", err)
return
}
fmt.Println("connect to etcd success")
defer cli.Close()
// watch
rch := cli.Watch(context.TODO(), "name") // 返回一个channel
// 一直监听key的变化
for wresp := range rch {
for _, ev := range wresp.Events {
fmt.Printf("Type: %s Key:%s Value:%s\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
}
}
}
3. lease
package main
import (
"fmt"
"time"
)
// etcd lease
import (
"context"
"log"
"go.etcd.io/etcd/clientv3"
)
func main() {
// 创建连接
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"},
DialTimeout: 10 * time.Second,
})
if err != nil {
log.Fatal(err)
}
fmt.Println("connect to etcd success.")
defer cli.Close()
// 创建一个5秒的租约
resp, err := cli.Grant(context.TODO(), 5)
if err != nil {
log.Fatal(err)
}
// 5秒钟之后, name 这个key就会被移除
_, err = cli.Put(context.TODO(), "name", "dong", clientv3.WithLease(resp.ID))
if err != nil {
log.Fatal(err)
}
// 设置 keep-alive
ch, err := cli.KeepAlive(context.TODO(), resp.ID)
if err != nil {
log.Fatal(err)
}
for {
ka := <-ch
fmt.Println("ttl:", ka.TTL)
}
4. 事务
package main
import (
"fmt"
"time"
)
// etcd lease
import (
"context"
"log"
"go.etcd.io/etcd/clientv3"
)
func main() {
// 创建连接
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"},
DialTimeout: 10 * time.Second,
})
if err != nil {
log.Fatal(err)
}
fmt.Println("connect to etcd success.")
defer cli.Close()
// 先设置了一个key
_, _ = cli.Put(context.TODO(), "name", "dong")
// 获取事务对象
txn := cli.Txn(context.TODO())
// 这里的if是不成立的,执行else,会将name的值设置为liu
txnResp, err := txn.If(clientv3.Compare(clientv3.Value("name"), "=", "liu")).
Else(clientv3.OpPut("name", "liu")).Commit()
if err != nil{
fmt.Printf("commit failed, err:%v\n", err)
return
}
if !txnResp.Succeeded {
getResp, _ := cli.Get(context.TODO(), "name")
fmt.Println(string(getResp.Kvs[0].Value)) // 打印出liu
}
}
5. 分布式锁(待研究)
参考: