在微服务中,服务注册与发现是必不可少的一环,其中etcd,zookeeper,consul在golang中较为常用。
程序对于这种中间件的依赖,都建议加一层接口,基于接口去实现
下面会分层三大板块去说明
接口定义
因为是上层调用是基于接口调用的,所以需要将discover和register接口化。 在discover发现endpoint有变化时,需要调用callback去更新本地的缓存,所以也需要endpoint的接口。如下
const (
EtcdBackend = "etcd"
ZookeeperBackend = "zookeeper"
ConsulBackend = "consul"
)
// 服务发现接口
type Discover interface {
// Start watch with block, 需要一个callback去更新本地endpoint
Start(callback EndpointCacher)
Stop()
}
// 服务注册接口
type Register interface {
Start() error
Stop() error
}
// endpoint接口
type EndpointCacher interface {
AddOrUpdate(endpoint string, attribute []byte)
Delete(endpoint string)
AddError(err error)
Error(err error)
}
接口定义完了,需要有连接注册中心的信息配置。
// 服务发现端配置
type DiscoverConfig struct {
BackendType string // one of etcd|consul|zookeeper
BackendEndPoints []string // register backend endpoint
DiscoverPrefix string
ServiceName string
HostName string
}
// 注册端配置
type RegisterConfig struct {
BackendType string // one of etcd|consul|zookeeper
BackendEndPoints []string // register backend endpoint
DiscoverPrefix string
ServiceName string
HeartBeatPeriod int64
ServiceEndPoint string // register service endpoint to backend
Attr string // custom attribute. like: {"hostname": "xxx", "weight": 1}
HealthCheckEndPoint string
}
有了配置之后,需要有注册和服务发现实例的创建方法
// NewDiscover 创建一个服务发现实例
func NewDiscover(cfg *DiscoverConfig) (Discover, error) {
switch cfg.BackendType {
case EtcdBackend:
return newEtcdDiscover(cfg)
case ConsulBackend:
return newConsulDiscover(cfg)
case ZookeeperBackend:
return newZookeeperDiscover(cfg)
}
return nil, fmt.Errorf("unknown backend: %s, use etcd|consul|zookeeper", cfg.BackendType)
}
// NewRegister 创建一个注册实例
func NewRegister(cfg *RegisterConfig) (Register, error) {
switch cfg.BackendType {
case EtcdBackend:
return newEtcdRegister(cfg)
case ConsulBackend:
return newConsulRegister(cfg)
case ZookeeperBackend:
return newZookeeperRegister(cfg)
}
return nil, fmt.Errorf("unknown backend: %s, use etcd|consul|zookeeper", cfg.BackendType)
}
这里实现了简易版的endpoint,可供参考,主要是将endpoint存在map中,然后有变化时做变更
// LiteEndpoint EndpointCacher lite impl
type LiteEndpoint struct {
Endpoints map[string][]byte `json:"value"`
lock sync.Mutex
Err error
}
func NewLiteEndpoint() *LiteEndpoint {
return &LiteEndpoint{
Endpoints: map[string][]byte{},
lock: sync.Mutex{},
}
}
func (e *LiteEndpoint) AddOrUpdate(endpoint string, attribute []byte) {
e.lock.Lock()
defer e.lock.Unlock()
e.Endpoints[endpoint] = attribute
}
func (e *LiteEndpoint) Delete(endpoint string) {
e.lock.Lock()
defer e.lock.Unlock()
delete(e.Endpoints, endpoint)
}
func (e *LiteEndpoint) Error(err error) {
e.Err = err
}
func (e *LiteEndpoint) List() []string {
var endpointSlice []string
for k, _ := range e.Endpoints {
endpointSlice = append(endpointSlice, k)
}
return endpointSlice
}
func (e *LiteEndpoint) Attr(endpoint string) []byte {
return e.Endpoints[endpoint]
}
接口的具体实现
etcd
注册服务
服务注册到etcd后,利用etcd租期的特性,每次续租几秒,在续期过期前完成续租。当实例异常时无法续租,则会在etcd端该实例会被过期删除,达到下线异常节点的效果。
func newEtcdRegister(cfg *RegisterConfig) (*etcdRegister, error) {
var err error
etcdClient, err := clientv3.New(clientv3.Config{Endpoints: cfg.BackendEndPoints})
if err != nil {
return nil, err
}
r := &etcdRegister{
etcdEndpoints: cfg.BackendEndPoints,
discoverPrefix: cfg.DiscoverPrefix,
serviceName: cfg.ServiceName,
endpoint: cfg.ServiceEndPoint,
attr: cfg.Attr,
ttl: cfg.HeartBeatPeriod,
stopCh: make(chan struct{}),
etcdClient: etcdClient,
}
return r, nil
}
// Start 开启一个协程
func (r *etcdRegister) Start() error {
go r.keepAlive()
return nil
}
func (r *etcdRegister) Stop() error {
close(r.stopCh)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
_, err := r.etcdClient.Delete(ctx, r.key())
//if r.grpcResolver != nil {
// return r.grpcResolver.Update(ctx, r.key(), grpcnaming.Update{Op: grpcnaming.Delete, Addr: r.endpoint})
//}
return err
}
// 定时续租
func (r *etcdRegister) keepAlive() {
duration := time.Duration(r.ttl) * time.Second
timer := time.NewTimer(duration)
for {
select {
case <-r.stopCh:
return
case <-timer.C:
if r.leaseID > 0 {
if err := r.leaseRenewal(); err != nil {
logrus.Warnf("%s leaseid[%x] keepAlive err: %s, try to reset...", r.endpoint, r.leaseID, err.Error())
r.leaseID = 0
}
} else {
if err := r.register(); err != nil {
logrus.Warnf("register endpoint %s error: %s", r.endpoint, err.Error())
} else {
logrus.Infof("register endppint %s success", r.endpoint)
}
}
timer.Reset(duration)
}
}
}
func (r *etcdRegister) register() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
resp, err := r.etcdClient.Grant(ctx, r.ttl+3)
if err != nil {
return err
}
_, err = r.etcdClient.Put(ctx, r.key(), r.attr, clientv3.WithLease(resp.ID))
r.leaseID = resp.ID
return err
}
func (r *etcdRegister) leaseRenewal() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
_, err := r.etcdClient.KeepAliveOnce(ctx, r.leaseID)
return err
}
func (r *etcdRegister) key() string {
return toEtcdKey(r.discoverPrefix, r.serviceName, r.endpoint)
}
func toEtcdKey(elem ...string) string {
return strings.Join(elem, "/")
}
func tailKey(key []byte) string {
keyStr := string(key)
topicSlice := strings.Split(keyStr, "/")
if len(topicSlice) != 0 {
return topicSlice[len(topicSlice)-1]
}
return keyStr
}
服务发现端
服务发现端需要不断的监听key的变化,所以需要watch,有更新后需要调用callback来更新本地的endpoint列表
// etcd discover impl
type etcdDiscover struct {
ctx context.Context
cancel context.CancelFunc
etcdClient *clientv3.Client
prefix string
}
func newEtcdDiscover(cfg *DiscoverConfig) (*etcdDiscover, error) {
cli, err := clientv3.New(clientv3.Config{Endpoints: cfg.BackendEndPoints})
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(context.Background())
return &etcdDiscover{
ctx: ctx,
cancel: cancel,
etcdClient: cli,
prefix: cfg.DiscoverPrefix,
}, nil
}
func (d *etcdDiscover) Start(callback EndpointCacher) {
d.discover(callback)
}
func (d *etcdDiscover) discover(callback EndpointCacher) {
ctx, cancel := context.WithCancel(d.ctx)
defer cancel()
if err := d.listService(ctx, callback); err != nil {
callback.AddError(err)
}
watch := d.etcdClient.Watch(ctx, d.prefix, clientv3.WithPrefix())
for {
select {
case <-d.ctx.Done():
return
case resp := <-watch:
if err := resp.Err(); err != nil {
callback.AddError(err)
return
}
for _, event := range resp.Events {
if event.Kv == nil {
continue
}
switch event.Type {
case mvccpb.PUT:
callback.AddOrUpdate(tailKey(event.Kv.Key), event.Kv.Value)
case mvccpb.DELETE:
callback.Delete(tailKey(event.Kv.Key))
}
}
}
}
}
func (d *etcdDiscover) Stop() {
d.cancel()
}
func (d *etcdDiscover) listService(ctx context.Context, callback EndpointCacher) error {
resp, err := d.etcdClient.Get(ctx, d.prefix, clientv3.WithPrefix())
if err != nil {
return err
}
for _, kv := range resp.Kvs {
callback.AddOrUpdate(tailKey(kv.Key), kv.Value)
}
return nil
}
zookeeper
zookeeper则利用临时节点的特性,来做异常服务下线功能
服务注册端
type zookeeperRegister struct {
zkEndpoints []string
prefix string
serviceName string
endpoint string
attr string
ttl int64
stopCh chan struct{}
conn *zk.Conn
}
func newZookeeperRegister(cfg *RegisterConfig) (*zookeeperRegister, error) {
r := zookeeperRegister{
zkEndpoints: cfg.BackendEndPoints,
prefix: cfg.DiscoverPrefix,
serviceName: cfg.ServiceName,
endpoint: cfg.ServiceEndPoint,
attr: cfg.Attr,
ttl: cfg.HeartBeatPeriod,
}
return &r, nil
}
func (r *zookeeperRegister) Start() error {
var err error
r.conn, _, err = zk.Connect(r.zkEndpoints, time.Second*5)
if err != nil {
return err
}
return r.register()
}
func (r *zookeeperRegister) Stop() error {
if r.conn != nil {
r.conn.Close()
}
return nil
}
func (r *zookeeperRegister) register() error {
if err := r.createIfNotExist(r.node(), nil, 0); err != nil {
return err
}
return r.createOrUpdateEndpoint(r.key(), []byte(r.attr))
}
func (r *zookeeperRegister) createOrUpdateEndpoint(path string, data []byte) error {
exist, _, err := r.conn.Exists(path)
if err != nil {
return err
}
if !exist {
_, err = r.conn.Create(path, data, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
if err != nil {
return err
}
return nil
}
_, stat, err := r.conn.Get(path)
if err != nil {
return err
}
_, err = r.conn.Set(path, []byte(r.attr), stat.Version)
if err != nil {
return err
}
return nil
}
func (r *zookeeperRegister) createIfNotExist(path string, data []byte, flag int32) error {
exist, _, err := r.conn.Exists(r.node())
if err != nil {
return err
}
if !exist {
_, err = r.conn.Create(path, data, flag, zk.WorldACL(zk.PermAll))
if err != nil {
return err
}
}
return nil
}
func (r *zookeeperRegister) node() string {
return fmt.Sprintf("%s/%s", r.prefix, r.serviceName)
}
func (r *zookeeperRegister) key() string {
return fmt.Sprintf("%s/%s/%s", r.prefix, r.serviceName, r.endpoint)
}
服务发现端
type zookeeperDiscover struct {
ctx context.Context
cancel context.CancelFunc
conn *zk.Conn
prefix string
serviceName string
}
func newZookeeperDiscover(cfg *DiscoverConfig) (*zookeeperDiscover, error) {
conn, _, err := zk.Connect(cfg.BackendEndPoints, 10*time.Second)
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(context.Background())
d := &zookeeperDiscover{
ctx: ctx,
cancel: cancel,
conn: conn,
prefix: cfg.DiscoverPrefix,
serviceName: cfg.ServiceName,
}
return d, nil
}
func (d *zookeeperDiscover) Start(callback EndpointCacher) {
d.discover(callback)
}
func (d *zookeeperDiscover) Stop() {
d.cancel()
if d.conn != nil {
d.conn.Close()
}
}
func (d *zookeeperDiscover) discover(callback EndpointCacher) {
if err := d.listService(callback); err != nil {
callback.AddError(err)
return
}
for {
snapshot, _, ch, err := d.conn.ChildrenW(d.key())
if err != nil {
callback.AddError(err)
return
}
select {
case e := <-ch:
switch e.Type {
case zk.EventNodeCreated, zk.EventNodeChildrenChanged:
for _, v := range snapshot {
callback.Delete(v)
}
if err := d.listService(callback); err != nil {
callback.AddError(err)
}
case zk.EventNodeDeleted:
for _, v := range snapshot {
callback.Delete(v)
}
}
}
}
}
func (d *zookeeperDiscover) getNodeProperty(path string) ([]byte, error) {
value, _, err := d.conn.Get(path)
return value, err
}
func (d *zookeeperDiscover) listService(callback EndpointCacher) error {
childs, _, err := d.conn.Children(d.key())
if err != nil {
return err
}
for _, c := range childs {
value, _, err := d.conn.Get(fmt.Sprintf("%s/%s", d.key(), c))
if err != nil {
return err
}
callback.AddOrUpdate(c, value)
}
return nil
}
func (d *zookeeperDiscover) key() string {
return fmt.Sprintf("%s/%s", d.prefix, d.serviceName)
}
consul
服务注册端
type consulRegister struct {
prefix string
serviceName string
serviceId string
endpoint string
healthCheckEndpoint string
attr string
ttl int64
stopCh chan struct{}
client *consulapi.Client
}
func newConsulRegister(cfg *RegisterConfig) (*consulRegister, error) {
config := consulapi.DefaultConfig()
config.Address = strings.Join(cfg.BackendEndPoints, ",")
client, err := consulapi.NewClient(config)
if err != nil {
return nil, err
}
r := &consulRegister{
prefix: cfg.DiscoverPrefix,
serviceName: cfg.ServiceName,
endpoint: cfg.ServiceEndPoint,
healthCheckEndpoint: cfg.HealthCheckEndPoint,
attr: cfg.Attr,
ttl: cfg.HeartBeatPeriod,
stopCh: make(chan struct{}),
client: client,
}
return r, nil
}
func (s *consulRegister) Start() error {
return s.register()
}
func (s *consulRegister) Stop() error {
if s.serviceId != "" {
return s.client.Agent().ServiceDeregister(s.serviceId)
}
return nil
}
func (s *consulRegister) register() error {
registration := new(consulapi.AgentServiceRegistration)
address, port, err := net.SplitHostPort(s.endpoint)
if err != nil {
return err
}
registration.Address = address
portInt, err := strconv.Atoi(port)
if err != nil {
return err
}
registration.Port = portInt
serviceId := fmt.Sprintf("%s_%s", s.serviceName, address)
s.serviceId = serviceId
registration.Name = s.serviceName
registration.ID = serviceId
serviceCheck := new(consulapi.AgentServiceCheck)
serviceCheck.HTTP = fmt.Sprintf("http://%s", s.healthCheckEndpoint)
serviceCheck.Timeout = "2s"
serviceCheck.Interval = "2s"
serviceCheck.DeregisterCriticalServiceAfter = "30s"
registration.Check = serviceCheck
return s.client.Agent().ServiceRegister(registration)
}
服务发现端
type consulDiscover struct {
ctx context.Context
cancel context.CancelFunc
prefix string
serviceName string
client *consulapi.Client
}
func newConsulDiscover(cfg *DiscoverConfig) (*consulDiscover, error) {
config := consulapi.DefaultConfig()
config.Address = strings.Join(cfg.BackendEndPoints, ",")
client, err := consulapi.NewClient(config)
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(context.Background())
d := &consulDiscover{
ctx: ctx,
cancel: cancel,
prefix: cfg.DiscoverPrefix,
serviceName: cfg.ServiceName,
client: client,
}
return d, nil
}
func (d *consulDiscover) Start(callback EndpointCacher) {
d.discover(callback)
}
func (d *consulDiscover) Stop() {
d.cancel()
}
func (d *consulDiscover) discover(callback EndpointCacher) {
var lastIndex uint64
for {
select {
case <-d.ctx.Done():
return
default:
services, queryMeta, err := d.client.Health().Service(
d.serviceName, "", false, &consulapi.QueryOptions{
WaitIndex: lastIndex,
})
if err != nil {
callback.AddError(err)
}
lastIndex = queryMeta.LastIndex
for _, service := range services {
var attr []byte
endpoint := fmt.Sprintf("%s:%v", service.Service.Address, service.Service.Port)
switch service.Checks.AggregatedStatus() {
case consulapi.HealthPassing:
if service.Service.Meta != nil {
attr, _ = json.Marshal(service.Service.Meta)
}
callback.AddOrUpdate(endpoint, attr)
case consulapi.HealthCritical, consulapi.HealthWarning:
callback.Delete(endpoint)
}
}
}
}
}
func (d *consulDiscover) listService() {
d.client.Agent().Services()
}
示例代码
etcd
服务发现端
package main
import (
"fmt"
"os"
"github.com/goeasya/discox"
)
func main() {
cfg := discox.RegisterConfig{
BackendType: discox.EtcdBackend,
BackendEndPoints: []string{"http://10.1.1.1:23790"},
DiscoverPrefix: "/discox/etcddemo",
ServiceName: "demo",
HeartBeatPeriod: 5,
ServiceEndPoint: "127.0.0.1:8111",
}
service, err := discox.NewRegister(&cfg)
if err != nil {
fmt.Println(err.Error())
os.Exit(1)
}
if err = service.Start(); err != nil {
fmt.Println(err.Error())
os.Exit(1)
}
defer service.Stop()
select {}
}
服务注册端
package main
import (
"fmt"
"os"
"time"
"github.com/goeasya/discox"
)
func main() {
timer := time.NewTimer(time.Second * 5)
cfg := discox.DiscoverConfig{
BackendEndPoints: []string{"http://10.1.1.1:23790"},
BackendType: discox.EtcdBackend,
DiscoverPrefix: "/discox/etcddemo",
ServiceName: "demo",
}
server, err := discox.NewDiscover(&cfg)
if err != nil {
fmt.Println(err.Error())
os.Exit(1)
}
endpointCacher := discox.NewLiteEndpoint()
go server.Start(endpointCacher)
defer server.Stop()
for {
select {
case <-timer.C:
fmt.Println("time 5 seconds")
endpoints := endpointCacher.List()
fmt.Println(endpoints, len(endpoints))
timer.Reset(time.Second * 5)
}
}
}
zookeeper
服务发现端
package main
import (
"fmt"
"github.com/goeasya/discox"
"os"
)
func main() {
cfg := discox.RegisterConfig{
BackendType: discox.ZookeeperBackend,
BackendEndPoints: []string{"10.1.1.1:2181"},
DiscoverPrefix: "/soaservices",
ServiceName: "demo",
HeartBeatPeriod: 5,
ServiceEndPoint: "127.0.0.1:8111",
}
service, err := discox.NewRegister(&cfg)
if err != nil {
fmt.Println(err.Error())
os.Exit(1)
}
if err = service.Start(); err != nil {
fmt.Println(err.Error())
os.Exit(1)
}
defer service.Stop()
select {}
}
服务注册端
package main
import (
"fmt"
"github.com/goeasya/discox"
"os"
"time"
)
func main() {
timer := time.NewTimer(time.Second * 5)
cfg := discox.DiscoverConfig{
BackendEndPoints: []string{"10.1.1.1:2181"},
BackendType: discox.ZookeeperBackend,
DiscoverPrefix: "/soaservices",
ServiceName: "demo",
}
server, err := discox.NewDiscover(&cfg)
if err != nil {
fmt.Println(err.Error())
os.Exit(1)
}
endpointCacher := discox.NewLiteEndpoint()
go server.Start(endpointCacher)
defer server.Stop()
for {
select {
case <-timer.C:
fmt.Println("time 5 seconds")
endpoints := endpointCacher.List()
fmt.Println(endpoints, len(endpoints))
timer.Reset(time.Second * 5)
}
}
}
consul
服务发现端
package main
import (
"fmt"
"net/http"
"os"
"github.com/goeasya/discox"
)
func main() {
cfg := discox.RegisterConfig{
BackendType: discox.ConsulBackend,
BackendEndPoints: []string{"consul.test.com"},
DiscoverPrefix: "/soaservices",
ServiceName: "demo",
HeartBeatPeriod: 5,
ServiceEndPoint: "172.18.1.1:8080",
HealthCheckEndPoint: "172.18.1.1:8080/check",
}
service, err := discox.NewRegister(&cfg)
if err != nil {
fmt.Println(err.Error())
os.Exit(1)
}
if err = service.Start(); err != nil {
fmt.Println(err.Error())
os.Exit(1)
}
http.HandleFunc("/check", consulCheck)
go http.ListenAndServe(":8080", nil)
defer service.Stop()
select {}
}
var count int64
func consulCheck(w http.ResponseWriter, r *http.Request) {
s := "consulCheck" + fmt.Sprint(count) + "remote:" + r.RemoteAddr + " " + r.URL.String()
fmt.Println(s)
fmt.Fprintln(w, s)
count++
}
服务注册端
package main
import (
"fmt"
"os"
"time"
"github.com/goeasya/discox"
)
func main() {
timer := time.NewTimer(time.Second * 5)
cfg := discox.DiscoverConfig{
BackendEndPoints: []string{"consul.test.com"},
BackendType: discox.ConsulBackend,
DiscoverPrefix: "/soaservices",
ServiceName: "demo",
}
server, err := discox.NewDiscover(&cfg)
if err != nil {
fmt.Println(err.Error())
os.Exit(1)
}
endpointCacher := discox.NewLiteEndpoint()
go server.Start(endpointCacher)
defer server.Stop()
for {
select {
case <-timer.C:
fmt.Println("time 5 seconds")
endpoints := endpointCacher.List()
fmt.Println(endpoints, len(endpoints))
timer.Reset(time.Second * 5)
}
}
}