Etcd
在本章节我们不止一次提到etcd,因为etcd这个项目在go语言分布式生态里头非常的重要。etcd个高可用的分布式键值(key-value)数据库,采用raft协议作为一致性算法,基于Go语言实现。etcd在go分布式系统生态里头的位置相当于zookeeper在java分布式系统生态的位置。tidb的raft系统是基于etcd的实现然后用rust实现的,tidb的pd系统还直接内嵌了etcd。在微服务生态里头etcd可以做服务发现组件使用,kubernetes也用etcd保存集群所有的网络配置和对象的状态信息。
etcd的特点
- 简单:提供定义明确的面向用户的API(有http和grpc实现,注意http和grpc分别用了两套存储,数据不同)
- 安全:支持client端证书的TLS
- 快速:基准测试支持10,000次write/s
- 可靠:使用raft协议保证数据一致性
对于想研究分布式系统的gopher来说,etcd是一个非常值得好好阅读源码的项目。它的raft实现,mvcc,wal实现都非常典型。
etcd使用
etcd项目有两个go client,一个基于http restful对于源码目录里头的client,另外一个基于grpc对于源码目录的clientv3,需要注意是这两个api操作的数据是不一样的。client支持的功能较小,而且性能也比较有问题,在go生态里头都会用基于grpc的clientv3。
连接etcd
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/etcd-io/etcd/clientv3"
)
var client *clientv3.Client
func init() {
var err error
client, err = clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"}, //集群有多个需要写多台
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatal(err)
}
}
kv操作
etcd 支持来时nosql数据库一样的 kv操作。
//kv操作
func kv() {
_, err := client.Put(context.Background(), "sample_key", "sample_value")
if err != nil {
log.Fatal(err)
}
resp, err := clientv3.NewKV(client).Get(context.Background(), "sample_key")
if n := len(resp.Kvs); n == 0 {
fmt.Println("not found")
return
}
fmt.Println(string(resp.Kvs[0].Value))
client.Delete(context.Background(), "sample_key")
resp, err = clientv3.NewKV(client).Get(context.Background(), "sample_key")
if n := len(resp.Kvs); n == 0 {
fmt.Println("not found")
return
}
fmt.Println(string(resp.Kvs[0].Value))
}
运行一下
$ go run main.go
sample_value
not foun
watch(监听)
监听是指etcd可以对单个key或者摸个前缀的key进行监听,如果这些key的信息有变化则会通知监听的client。利用这个功能etcd可以做配置中心,或者做服务发现。
//监听 watch
func watch() {
go func() {
for {
ch := client.Watch(context.Background(), "sample_key") //监听sample_key
for wresp := range ch {
for _, ev := range wresp.Events { //打印事件
fmt.Println(ev)
}
}
}
}()
_, err := client.Put(context.Background(), "sample_key", "sample_value1")
if err != nil {
log.Fatal(err)
}
client.Delete(context.Background(), "sample_key")
}
运行一下
$ go run main.go
&{PUT key:"sample_key" create_revision:18157 mod_revision:18157 version:1 value:"sample_value1" <nil> {} [] 0} #PUT事件
&{DELETE key:"sample_key" mod_revision:18158 <nil> {} [] 0}#DELETE事件
我看到通过Watch 我们监控到了PUT和DELETE事件,在实际项目中可以根据业务需求针对相关的事件做成处理。
Lease(租约)
我们在上个小节介绍了 全局时间戳的实现,里头用到了etcd Lease来做选主服务,etcd的Lease可以用来做服务状态监控,再配合watch就能做到故障恢复。上个小节的选主服务本质上是我们常见的主备切换过程。接下来我们看下etcd Lease是怎么使用的。
//租约
func lease() {
lease := clientv3.NewLease(client)
//设置10秒租约 (过期时间为10秒)
leaseResp, err := lease.Grant(context.TODO(), 10)
if err != nil {
log.Fatal(err)
}
leaseID := leaseResp.ID
fmt.Printf("ttl %d \n", leaseResp.TTL)
keepRespChan, err := lease.KeepAlive(context.TODO(), leaseID) //启动自动续租服务
if err != nil {
fmt.Println(err)
return
}
go func() { //新开一个协程监控 keepalive结果
outer:
for {
select {
case keepResp := <-keepRespChan:
if keepResp == nil {
fmt.Println("租约失效了")
break outer
} else {
fmt.Printf("续租成功 time:%d id:%d,ttl:%d \n", time.Now().Unix(), keepResp.ID, keepResp.TTL)
}
}
}
}()
kv := clientv3.NewKV(client)
putResp, err := kv.Put(context.TODO(), "sample_key_lease", "", clientv3.WithLease(leaseID))
if err != nil {
fmt.Println(err)
return
} else {
fmt.Println("写入成功", putResp.Header.Revision)
}
}
运行一下
$ go run main.go
ttl 10
续租成功 time:1577158783 id:7587839846421449960,ttl:10
写入成功 18159
续租成功 time:1577158786 id:7587839846421449960,ttl:10
续租成功 time:1577158790 id:7587839846421449960,ttl:10
续租成功 time:1577158793 id:7587839846421449960,ttl:10
续租成功 time:1577158797 id:7587839846421449960,ttl:10
续租成功 time:1577158800 id:7587839846421449960,ttl:10
我们看到超时时间为10秒,续租程序会在租约失效前大概(1/3 TTL)
nextKeepAlive := time.Now().Add((time.Duration(karesp.TTL) * time.Second) / 3.0)
自动续租(修改TTL),从而实现key一直有效。在服务宕机或者异常的时候没有执行续租,那么这个key将会在10秒后失效,如果其他client watch这个key的话就能监听的这个key被删除了。
总结
本小节简要介绍了etcd的一些特性,以及go语言etcd v3版本的api是如何使用的。最后还是推荐想了解分布式系统的gopher去阅读etcd源码。