golang微服务之注册与发现

在微服务中,服务注册与发现是必不可少的一环,其中etcd,zookeeper,consul在golang中较为常用。

对于这类中间件的依赖,建议在程序中加一层接口,上层基于接口去调用,这样可以在不同注册中心之间自由切换。

下面分三大板块来说明:接口定义、接口的具体实现、示例代码。

接口定义

因为上层是基于接口调用的,所以需要将 discover 和 register 接口化。discover 发现 endpoint 有变化时,需要调用 callback 去更新本地缓存,所以还需要定义 endpoint 缓存的接口(EndpointCacher)。如下:

// Supported values for DiscoverConfig.BackendType and RegisterConfig.BackendType.
const (
    EtcdBackend      = "etcd"
    ZookeeperBackend = "zookeeper"
    ConsulBackend    = "consul"
)

// Discover is the service-discovery interface implemented by each backend.
type Discover interface {
    // Start watches the backend and blocks; every change it observes is pushed
    // into callback so the caller's local endpoint cache stays up to date.
    Start(callback EndpointCacher)
    // Stop ends discovery.
    Stop()
}

// Register is the service-registration interface implemented by each backend.
type Register interface {
    // Start registers this service instance with the backend.
    Start() error
    // Stop removes (or stops maintaining) the registration.
    Stop() error
}

// EndpointCacher receives endpoint changes from a Discover implementation.
// endpoint is the instance's address string; attribute is the raw data the
// backend stores for that instance.
type EndpointCacher interface {
    AddOrUpdate(endpoint string, attribute []byte)
    Delete(endpoint string)
    // AddError is how the Discover implementations report listing/watch failures.
    AddError(err error)
    // NOTE(review): Error is not called by any implementation shown here and
    // looks redundant with AddError — confirm the intended difference.
    Error(err error)
}

接口定义完成后,还需要连接注册中心的配置信息:
// DiscoverConfig configures the discovery (consumer) side.
type DiscoverConfig struct {
	BackendType      string   // one of etcd|consul|zookeeper
	BackendEndPoints []string // register backend endpoint
	DiscoverPrefix   string   // key/path prefix shared with the registering side
	ServiceName      string   // name of the service to discover
	HostName         string   // NOTE(review): not read by any implementation shown here
}

// RegisterConfig configures the registering (provider) side.
type RegisterConfig struct {
	BackendType         string   // one of etcd|consul|zookeeper
	BackendEndPoints    []string // register backend endpoint
	DiscoverPrefix      string   // key/path prefix (used by etcd and zookeeper)
	ServiceName         string
	HeartBeatPeriod     int64  // seconds; etcd renews its lease at this interval (lease TTL is this + 3)
	ServiceEndPoint     string // register service endpoint to backend
	Attr                string // custom attribute. like: {"hostname": "xxx", "weight": 1}; stored by etcd/zookeeper, not sent to consul
	HealthCheckEndPoint string // consul only: polled as "http://<HealthCheckEndPoint>"
}

有了配置之后,还需要提供创建注册实例和服务发现实例的方法:

// NewDiscover creates a service-discovery instance for cfg.BackendType.
//
// It returns an error for a nil cfg, for an unknown backend type, or when the
// backend client cannot be created. On error the returned Discover is a true
// nil interface, so `d == nil` checks behave as callers expect.
func NewDiscover(cfg *DiscoverConfig) (Discover, error) {
	if cfg == nil {
		return nil, fmt.Errorf("nil discover config")
	}

	var (
		d   Discover
		err error
	)
	switch cfg.BackendType {
	case EtcdBackend:
		d, err = newEtcdDiscover(cfg)
	case ConsulBackend:
		d, err = newConsulDiscover(cfg)
	case ZookeeperBackend:
		d, err = newZookeeperDiscover(cfg)
	default:
		return nil, fmt.Errorf("unknown backend: %s, use etcd|consul|zookeeper", cfg.BackendType)
	}
	if err != nil {
		// d may hold a typed nil pointer here, which would make the interface
		// compare non-nil; return a literal nil instead.
		return nil, err
	}
	return d, nil
}



// NewRegister creates a service-registration instance for cfg.BackendType.
//
// It returns an error for a nil cfg, for an unknown backend type, or when the
// backend client cannot be created. On error the returned Register is a true
// nil interface, so `r == nil` checks behave as callers expect.
func NewRegister(cfg *RegisterConfig) (Register, error) {
	if cfg == nil {
		return nil, fmt.Errorf("nil register config")
	}

	var (
		r   Register
		err error
	)
	switch cfg.BackendType {
	case EtcdBackend:
		r, err = newEtcdRegister(cfg)
	case ConsulBackend:
		r, err = newConsulRegister(cfg)
	case ZookeeperBackend:
		r, err = newZookeeperRegister(cfg)
	default:
		return nil, fmt.Errorf("unknown backend: %s, use etcd|consul|zookeeper", cfg.BackendType)
	}
	if err != nil {
		// r may hold a typed nil pointer here, which would make the interface
		// compare non-nil; return a literal nil instead.
		return nil, err
	}
	return r, nil
}

下面实现了一个简易版的 EndpointCacher,可供参考:它把 endpoint 存放在 map 中,注册中心有变化时再做相应的增删改。

// LiteEndpoint is a minimal in-memory EndpointCacher. It maps endpoint ->
// attribute and guards all access with a mutex, so a discover goroutine can
// update it while other goroutines call List/Attr.
type LiteEndpoint struct {
	Endpoints map[string][]byte `json:"value"`
	lock      sync.Mutex
	// Err holds the most recent error reported through Error or AddError.
	// Reading it directly is unsynchronized; prefer LastError.
	Err error
}

// NewLiteEndpoint returns an empty, ready-to-use LiteEndpoint.
func NewLiteEndpoint() *LiteEndpoint {
	return &LiteEndpoint{
		Endpoints: map[string][]byte{},
	}
}

// AddOrUpdate stores attribute for endpoint, replacing any previous value.
func (e *LiteEndpoint) AddOrUpdate(endpoint string, attribute []byte) {
	e.lock.Lock()
	defer e.lock.Unlock()
	e.Endpoints[endpoint] = attribute
}

// Delete removes endpoint; deleting an unknown endpoint is a no-op.
func (e *LiteEndpoint) Delete(endpoint string) {
	e.lock.Lock()
	defer e.lock.Unlock()
	delete(e.Endpoints, endpoint)
}

// AddError records err as the latest error. The EndpointCacher interface
// requires this method and the discover implementations report failures
// through it; without it *LiteEndpoint did not satisfy the interface.
func (e *LiteEndpoint) AddError(err error) {
	e.Error(err)
}

// Error records err as the latest error.
func (e *LiteEndpoint) Error(err error) {
	e.lock.Lock()
	defer e.lock.Unlock()
	e.Err = err
}

// LastError returns the most recently recorded error, or nil.
func (e *LiteEndpoint) LastError() error {
	e.lock.Lock()
	defer e.lock.Unlock()
	return e.Err
}

// List returns the known endpoints in unspecified order (nil when empty).
func (e *LiteEndpoint) List() []string {
	e.lock.Lock()
	defer e.lock.Unlock()
	var endpointSlice []string
	for k := range e.Endpoints {
		endpointSlice = append(endpointSlice, k)
	}
	return endpointSlice
}

// Attr returns the attribute stored for endpoint, or nil if it is unknown.
func (e *LiteEndpoint) Attr(endpoint string) []byte {
	e.lock.Lock()
	defer e.lock.Unlock()
	return e.Endpoints[endpoint]
}

接口的具体实现

etcd

注册服务
服务注册到 etcd 时会绑定一个租约(lease),注册端每隔 HeartBeatPeriod 秒续租一次,租约时长比续租间隔多 3 秒,保证在租约过期前完成续租。当实例异常、无法续租时,租约到期后 etcd 会自动删除该实例的 key,从而实现异常节点的自动下线。

// newEtcdRegister builds an etcd-backed Register from cfg. It only creates the
// etcd client; nothing is written to etcd until Start is called.
func newEtcdRegister(cfg *RegisterConfig) (*etcdRegister, error) {
	// keepAlive uses the heartbeat period (seconds) as its timer interval; a
	// zero or negative period would make it re-fire immediately in a tight
	// loop, so fall back to a sane default.
	const defaultHeartBeatPeriod int64 = 5
	ttl := cfg.HeartBeatPeriod
	if ttl <= 0 {
		ttl = defaultHeartBeatPeriod
	}

	etcdClient, err := clientv3.New(clientv3.Config{Endpoints: cfg.BackendEndPoints})
	if err != nil {
		return nil, err
	}

	return &etcdRegister{
		etcdEndpoints:  cfg.BackendEndPoints,
		discoverPrefix: cfg.DiscoverPrefix,
		serviceName:    cfg.ServiceName,
		endpoint:       cfg.ServiceEndPoint,
		attr:           cfg.Attr,
		ttl:            ttl,
		stopCh:         make(chan struct{}),
		etcdClient:     etcdClient,
	}, nil
}

// Start launches the background keep-alive loop and returns at once. It never
// fails; registration errors are only logged by that loop.
func (r *etcdRegister) Start() error {
	go r.keepAlive()
	return nil
}

// Stop ends the keep-alive loop, deletes this instance's key from etcd and
// closes the etcd client. A second (sequential) call is a no-op instead of
// panicking on a double close of stopCh.
func (r *etcdRegister) Stop() error {
	select {
	case <-r.stopCh:
		return nil
	default:
		close(r.stopCh)
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()
	_, err := r.etcdClient.Delete(ctx, r.key())

	// Release the client's connections and background goroutines. This also
	// stops a keep-alive tick that is still in flight from re-writing the key
	// that was just deleted.
	closeErr := r.etcdClient.Close()
	if err != nil {
		return err
	}
	return closeErr
}

// keepAlive is the registration loop started by Start. It registers the
// instance immediately, then every r.ttl seconds renews the current lease.
// When there is no lease, or renewing it fails (for example because it already
// expired), it registers again with a fresh lease in the same tick. It returns
// once stopCh is closed.
func (r *etcdRegister) keepAlive() {
	period := time.Duration(r.ttl) * time.Second
	// Zero delay: register right away instead of leaving the instance
	// undiscoverable for a whole heartbeat period after Start.
	timer := time.NewTimer(0)
	defer timer.Stop()

	for {
		select {
		case <-r.stopCh:
			return
		case <-timer.C:
		}

		if r.leaseID > 0 {
			if err := r.leaseRenewal(); err != nil {
				logrus.Warnf("%s leaseid[%x] keepAlive err: %s, try to reset...", r.endpoint, r.leaseID, err.Error())
				r.leaseID = 0
			}
		}
		// Re-register in the same tick as a failed renewal; waiting another
		// period would leave the instance unregistered for that long.
		if r.leaseID <= 0 {
			if err := r.register(); err != nil {
				logrus.Warnf("register endpoint %s error: %s", r.endpoint, err.Error())
			} else {
				logrus.Infof("register endppint %s success", r.endpoint)
			}
		}
		timer.Reset(period)
	}
}

// register grants a new lease of ttl+3 seconds and writes this instance's key
// (value r.attr) bound to it, within a single 5-second timeout.
//
// r.leaseID is only updated when the Put succeeds. Previously it was set even
// when the Put failed, so later renewals kept "succeeding" on a lease with no
// key attached while the instance stayed invisible to discoverers. An orphaned
// lease from a failed Put simply expires on its own.
func (r *etcdRegister) register() error {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()

	// The extra 3 seconds give the renewal made every ttl seconds some slack.
	lease, err := r.etcdClient.Grant(ctx, r.ttl+3)
	if err != nil {
		return err
	}

	if _, err = r.etcdClient.Put(ctx, r.key(), r.attr, clientv3.WithLease(lease.ID)); err != nil {
		return err
	}
	r.leaseID = lease.ID
	return nil
}

// leaseRenewal refreshes r.leaseID once, bounded by a 5-second timeout.
func (r *etcdRegister) leaseRenewal() error {
	renewCtx, stop := context.WithTimeout(context.Background(), 5*time.Second)
	defer stop()

	if _, err := r.etcdClient.KeepAliveOnce(renewCtx, r.leaseID); err != nil {
		return err
	}
	return nil
}

// key is this instance's etcd key: "<discoverPrefix>/<serviceName>/<endpoint>".
func (r *etcdRegister) key() string {
	segments := []string{r.discoverPrefix, r.serviceName, r.endpoint}
	return toEtcdKey(segments...)
}

// toEtcdKey joins key segments with "/" without trimming or escaping them,
// e.g. ("/p", "svc", "1.2.3.4:80") -> "/p/svc/1.2.3.4:80".
func toEtcdKey(elem ...string) string {
	var b strings.Builder
	for i, part := range elem {
		if i > 0 {
			b.WriteByte('/')
		}
		b.WriteString(part)
	}
	return b.String()
}

// tailKey returns the last "/"-separated segment of key (the endpoint part of
// a key built by toEtcdKey). A key without any "/" is returned whole.
func tailKey(key []byte) string {
	s := string(key)
	if i := strings.LastIndexByte(s, '/'); i >= 0 {
		return s[i+1:]
	}
	return s
}

服务发现端
服务发现端需要持续监听 key 的变化,因此要使用 watch;有更新时调用 callback 来更新本地的 endpoint 列表。

// etcdDiscover is the etcd implementation of Discover.
type etcdDiscover struct {
    ctx        context.Context    // cancelled by Stop; ends the discover loop
    cancel     context.CancelFunc // cancels ctx
    etcdClient *clientv3.Client
    prefix     string // key prefix that is listed and watched
}

// newEtcdDiscover creates an etcd-backed Discover.
//
// An etcd register writes "<DiscoverPrefix>/<ServiceName>/<endpoint>". When
// cfg.ServiceName is set, discovery is scoped to that service's keys. The
// trailing "/" keeps a service named "demo" from also matching "demo2".
// Previously ServiceName was ignored and every service under DiscoverPrefix
// was returned. With an empty ServiceName the bare DiscoverPrefix is watched,
// as before.
func newEtcdDiscover(cfg *DiscoverConfig) (*etcdDiscover, error) {
	cli, err := clientv3.New(clientv3.Config{Endpoints: cfg.BackendEndPoints})
	if err != nil {
		return nil, err
	}

	prefix := cfg.DiscoverPrefix
	if cfg.ServiceName != "" {
		prefix = toEtcdKey(cfg.DiscoverPrefix, cfg.ServiceName) + "/"
	}

	ctx, cancel := context.WithCancel(context.Background())
	return &etcdDiscover{
		ctx:        ctx,
		cancel:     cancel,
		etcdClient: cli,
		prefix:     prefix,
	}, nil
}

// Start runs discovery on the calling goroutine and blocks until Stop is
// called or the watch fails, so callers typically run it with `go`.
func (d *etcdDiscover) Start(callback EndpointCacher) {
	d.discover(callback)
}

// discover lists every key under d.prefix into callback, then watches the
// prefix and forwards PUT/DELETE events until Stop is called or the watch
// fails. The last path segment of each key (tailKey) is used as the endpoint.
// A watch failure is reported through callback.AddError and ends discovery.
func (d *etcdDiscover) discover(callback EndpointCacher) {
	// Derived context so the watch is torn down whenever discover returns.
	ctx, cancel := context.WithCancel(d.ctx)
	defer cancel()

	watchOpts := []clientv3.OpOption{clientv3.WithPrefix()}
	rev, err := d.listService(ctx, callback)
	if err != nil {
		callback.AddError(err)
	} else {
		// Watch from just after the snapshot's revision so changes made
		// between the Get and the Watch call are replayed instead of lost.
		watchOpts = append(watchOpts, clientv3.WithRev(rev+1))
	}

	watch := d.etcdClient.Watch(ctx, d.prefix, watchOpts...)
	for {
		select {
		case <-d.ctx.Done():
			return
		case resp, ok := <-watch:
			if !ok {
				// The channel is closed when the watch context is cancelled or
				// the client is closed; receiving again would spin forever.
				return
			}
			if err := resp.Err(); err != nil {
				callback.AddError(err)
				return
			}
			for _, event := range resp.Events {
				if event.Kv == nil {
					continue
				}
				switch event.Type {
				case mvccpb.PUT:
					callback.AddOrUpdate(tailKey(event.Kv.Key), event.Kv.Value)
				case mvccpb.DELETE:
					callback.Delete(tailKey(event.Kv.Key))
				}
			}
		}
	}
}

// Stop cancels discovery and closes the etcd client, which nothing else in
// etcdDiscover releases.
func (d *etcdDiscover) Stop() {
	d.cancel()
	// Best effort during shutdown; there is nowhere to report the error.
	_ = d.etcdClient.Close()
}

// listService loads every key under d.prefix into callback and returns the
// store revision of that snapshot, so the caller can watch from the next one.
func (d *etcdDiscover) listService(ctx context.Context, callback EndpointCacher) (int64, error) {
	resp, err := d.etcdClient.Get(ctx, d.prefix, clientv3.WithPrefix())
	if err != nil {
		return 0, err
	}
	for _, kv := range resp.Kvs {
		callback.AddOrUpdate(tailKey(kv.Key), kv.Value)
	}
	return resp.Header.Revision, nil
}

zookeeper

zookeeper 则利用临时节点(ephemeral node)的特性来实现异常服务下线:实例的会话结束后,它创建的临时节点会被自动删除。
服务注册端

// zookeeperRegister is the ZooKeeper implementation of Register. The instance
// is registered as an ephemeral node "<prefix>/<serviceName>/<endpoint>".
type zookeeperRegister struct {
    zkEndpoints []string
    prefix      string
    serviceName string
    endpoint    string
    attr        string        // written as the endpoint node's data
    ttl         int64         // NOTE(review): copied from HeartBeatPeriod but not read by the code shown
    stopCh      chan struct{} // NOTE(review): never initialized or used by the code shown
    conn        *zk.Conn      // opened by Start, closed by Stop
}

// newZookeeperRegister captures cfg for a ZooKeeper-backed Register. It opens
// no connection; Start connects and registers.
func newZookeeperRegister(cfg *RegisterConfig) (*zookeeperRegister, error) {
	return &zookeeperRegister{
		zkEndpoints: cfg.BackendEndPoints,
		prefix:      cfg.DiscoverPrefix,
		serviceName: cfg.ServiceName,
		endpoint:    cfg.ServiceEndPoint,
		attr:        cfg.Attr,
		ttl:         cfg.HeartBeatPeriod,
	}, nil
}

// Start connects to ZooKeeper (5s session timeout) and registers this
// instance. If registration fails, the connection is closed again, because a
// caller that treats a failed Start as fatal never reaches Stop.
//
// NOTE(review): the session event channel from zk.Connect is discarded. If the
// session expires, its ephemeral node is gone and nothing re-registers it.
func (r *zookeeperRegister) Start() error {
	conn, _, err := zk.Connect(r.zkEndpoints, time.Second*5)
	if err != nil {
		return err
	}
	r.conn = conn

	if err := r.register(); err != nil {
		conn.Close()
		r.conn = nil
		return err
	}
	return nil
}

// Stop closes the ZooKeeper session (if one was opened), which lets ZooKeeper
// drop this instance's ephemeral node. It always returns nil.
func (r *zookeeperRegister) Stop() error {
	conn := r.conn
	if conn == nil {
		return nil
	}
	conn.Close()
	return nil
}

// register makes sure the persistent service node "<prefix>/<serviceName>" and
// all its ancestors exist, then creates or refreshes this instance's ephemeral
// endpoint node with r.attr as its data.
func (r *zookeeperRegister) register() error {
	node := r.node()
	// ZooKeeper refuses to create a node whose parent is missing, so create
	// each ancestor ("/a", "/a/b", ...) before the service node itself.
	for i := 1; i < len(node); i++ {
		if node[i] == '/' {
			if err := r.createIfNotExist(node[:i], nil, 0); err != nil {
				return err
			}
		}
	}
	if err := r.createIfNotExist(node, nil, 0); err != nil {
		return err
	}

	return r.createOrUpdateEndpoint(r.key(), []byte(r.attr))
}

// createOrUpdateEndpoint ensures path exists as an ephemeral node owned by the
// current session and holding data.
//
// A node already owned by this session just has its data replaced. A node left
// behind by another session (e.g. a previous run of this process whose session
// has not expired yet) is deleted and recreated. Otherwise ZooKeeper would
// remove it when that old session expires, and this instance would silently
// disappear.
func (r *zookeeperRegister) createOrUpdateEndpoint(path string, data []byte) error {
	exist, stat, err := r.conn.Exists(path)
	if err != nil {
		return err
	}

	if exist {
		if stat.EphemeralOwner == r.conn.SessionID() {
			_, err = r.conn.Set(path, data, stat.Version)
			return err
		}
		if err = r.conn.Delete(path, stat.Version); err != nil && err != zk.ErrNoNode {
			return err
		}
	}

	_, err = r.conn.Create(path, data, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
	return err
}

// createIfNotExist creates path with data and flag unless it already exists.
// Losing a creation race to another instance (ErrNodeExists) is not an error.
func (r *zookeeperRegister) createIfNotExist(path string, data []byte, flag int32) error {
	// Check the requested path; this previously checked r.node() regardless of path.
	exist, _, err := r.conn.Exists(path)
	if err != nil {
		return err
	}
	if exist {
		return nil
	}

	_, err = r.conn.Create(path, data, flag, zk.WorldACL(zk.PermAll))
	if err == zk.ErrNodeExists {
		return nil
	}
	return err
}

// node returns the persistent service path "<prefix>/<serviceName>".
func (r *zookeeperRegister) node() string {
	return r.prefix + "/" + r.serviceName
}

// key returns this instance's endpoint path "<prefix>/<serviceName>/<endpoint>".
func (r *zookeeperRegister) key() string {
	return r.node() + "/" + r.endpoint
}

服务发现端

// zookeeperDiscover is the ZooKeeper implementation of Discover. It tracks the
// children of "<prefix>/<serviceName>".
type zookeeperDiscover struct {
    ctx         context.Context    // cancelled by Stop
    cancel      context.CancelFunc // cancels ctx
    conn        *zk.Conn
    prefix      string
    serviceName string
}

// newZookeeperDiscover connects to ZooKeeper (10s session timeout) and returns
// a Discover for cfg.ServiceName under cfg.DiscoverPrefix.
func newZookeeperDiscover(cfg *DiscoverConfig) (*zookeeperDiscover, error) {
	conn, _, err := zk.Connect(cfg.BackendEndPoints, 10*time.Second)
	if err != nil {
		return nil, err
	}

	ctx, cancel := context.WithCancel(context.Background())
	return &zookeeperDiscover{
		ctx:         ctx,
		cancel:      cancel,
		conn:        conn,
		prefix:      cfg.DiscoverPrefix,
		serviceName: cfg.ServiceName,
	}, nil
}

// Start blocks, keeping callback in sync with the service's endpoints.
func (d *zookeeperDiscover) Start(callback EndpointCacher) {
	d.discover(callback)
}

// Stop cancels discovery and closes the ZooKeeper connection, if any.
func (d *zookeeperDiscover) Stop() {
	d.cancel()
	if d.conn == nil {
		return
	}
	d.conn.Close()
}

// discover keeps callback in sync with the children of d.key().
//
// Each round works in three steps:
//   - list the children with a one-shot watch (ChildrenW) and report each
//     child's data as its attribute;
//   - delete endpoints that disappeared since the previous round;
//   - block until the watch fires or Stop is called.
//
// The cache is updated in place rather than cleared and refilled, so readers
// never observe a transiently empty list. Only membership changes are watched.
// Discovery ends when listing fails; that failure is reported through
// callback.AddError unless it was caused by Stop.
func (d *zookeeperDiscover) discover(callback EndpointCacher) {
	base := d.key()
	known := map[string]struct{}{}
	for {
		children, _, ch, err := d.conn.ChildrenW(base)
		if err != nil {
			if d.ctx.Err() == nil {
				callback.AddError(err)
			}
			return
		}

		current := make(map[string]struct{}, len(children))
		for _, child := range children {
			current[child] = struct{}{}
			value, err := d.getNodeProperty(base + "/" + child)
			switch {
			case err == nil:
				callback.AddOrUpdate(child, value)
			case err == zk.ErrNoNode:
				// Vanished between listing and reading; the pending watch
				// reports that change.
				delete(current, child)
			default:
				// Keep whatever the cache already holds for this child.
				callback.AddError(err)
			}
		}
		for child := range known {
			if _, ok := current[child]; !ok {
				callback.Delete(child)
			}
		}
		known = current

		select {
		case <-d.ctx.Done():
			return
		case e := <-ch:
			if e.Type == zk.EventNodeDeleted {
				// The service node itself was removed: drop every endpoint.
				for child := range known {
					callback.Delete(child)
				}
				known = map[string]struct{}{}
			}
		}
	}
}

// getNodeProperty returns the data stored at path, discarding its Stat.
func (d *zookeeperDiscover) getNodeProperty(path string) (data []byte, err error) {
	data, _, err = d.conn.Get(path)
	return data, err
}

// listService reports every current child of d.key() to callback, using the
// child's data as its attribute. A child that disappears between listing and
// reading (its session ended) is skipped instead of failing the whole listing.
func (d *zookeeperDiscover) listService(callback EndpointCacher) error {
	base := d.key()
	children, _, err := d.conn.Children(base)
	if err != nil {
		return err
	}

	for _, child := range children {
		value, err := d.getNodeProperty(base + "/" + child)
		if err == zk.ErrNoNode {
			continue
		}
		if err != nil {
			return err
		}
		callback.AddOrUpdate(child, value)
	}
	return nil
}

// key returns the watched service path "<prefix>/<serviceName>".
func (d *zookeeperDiscover) key() string {
	return d.prefix + "/" + d.serviceName
}

consul

服务注册端

// consulRegister is the Consul implementation of Register. It registers the
// instance with the Consul agent together with an HTTP health check.
type consulRegister struct {
    prefix              string // NOTE(review): stored but not read by the code shown
    serviceName         string
    serviceId           string // set by register; used by Stop to deregister
    endpoint            string // "host:port" of the service
    healthCheckEndpoint string // polled as "http://<healthCheckEndpoint>"
    attr                string // NOTE(review): not sent to Consul by the code shown
    ttl                 int64  // NOTE(review): not read by the code shown
    stopCh              chan struct{}
    client              *consulapi.Client
}

// newConsulRegister creates a Consul API client for cfg and returns a Register
// that is not yet registered (see Start).
func newConsulRegister(cfg *RegisterConfig) (*consulRegister, error) {
	config := consulapi.DefaultConfig()
	// The Consul client talks to a single agent address. Joining several
	// endpoints with "," produced an invalid address, so use the first one and
	// keep DefaultConfig's address when none is given.
	if len(cfg.BackendEndPoints) > 0 {
		config.Address = cfg.BackendEndPoints[0]
	}
	client, err := consulapi.NewClient(config)
	if err != nil {
		return nil, err
	}

	return &consulRegister{
		prefix:              cfg.DiscoverPrefix,
		serviceName:         cfg.ServiceName,
		endpoint:            cfg.ServiceEndPoint,
		healthCheckEndpoint: cfg.HealthCheckEndPoint,
		attr:                cfg.Attr,
		ttl:                 cfg.HeartBeatPeriod,
		stopCh:              make(chan struct{}),
		client:              client,
	}, nil
}

// Start registers the service and its HTTP health check with the Consul agent.
func (s *consulRegister) Start() error {
	return s.register()
}

// Stop deregisters the service from the Consul agent. It does nothing if
// register never got far enough to assign a service ID.
func (s *consulRegister) Stop() error {
	if s.serviceId == "" {
		return nil
	}
	return s.client.Agent().ServiceDeregister(s.serviceId)
}

// register registers s.endpoint ("host:port") as service s.serviceName with an
// HTTP check against s.healthCheckEndpoint. The check runs every 2s with a 2s
// timeout, and the service is deregistered after 30s in critical state.
//
// The service ID is "<name>_<host>_<port>". Previously it omitted the port, so
// two instances on the same host overwrote each other's registration, and
// stopping one deregistered the other.
func (s *consulRegister) register() error {
	host, port, err := net.SplitHostPort(s.endpoint)
	if err != nil {
		return err
	}
	portNum, err := strconv.Atoi(port)
	if err != nil {
		return err
	}

	s.serviceId = fmt.Sprintf("%s_%s_%s", s.serviceName, host, port)

	registration := &consulapi.AgentServiceRegistration{
		ID:      s.serviceId,
		Name:    s.serviceName,
		Address: host,
		Port:    portNum,
		Check: &consulapi.AgentServiceCheck{
			HTTP:                           fmt.Sprintf("http://%s", s.healthCheckEndpoint),
			Timeout:                        "2s",
			Interval:                       "2s",
			DeregisterCriticalServiceAfter: "30s",
		},
	}
	return s.client.Agent().ServiceRegister(registration)
}

服务发现端

// consulDiscover is the Consul implementation of Discover. It polls the health
// of serviceName with blocking queries.
type consulDiscover struct {
    ctx         context.Context    // cancelled by Stop
    cancel      context.CancelFunc // cancels ctx
    prefix      string             // NOTE(review): stored but not read by the code shown
    serviceName string
    client      *consulapi.Client
}

// newConsulDiscover creates a Consul API client for cfg and returns a Discover
// for cfg.ServiceName.
func newConsulDiscover(cfg *DiscoverConfig) (*consulDiscover, error) {
	config := consulapi.DefaultConfig()
	// The Consul client talks to a single agent address. Joining several
	// endpoints with "," produced an invalid address, so use the first one and
	// keep DefaultConfig's address when none is given.
	if len(cfg.BackendEndPoints) > 0 {
		config.Address = cfg.BackendEndPoints[0]
	}
	client, err := consulapi.NewClient(config)
	if err != nil {
		return nil, err
	}

	ctx, cancel := context.WithCancel(context.Background())
	return &consulDiscover{
		ctx:         ctx,
		cancel:      cancel,
		prefix:      cfg.DiscoverPrefix,
		serviceName: cfg.ServiceName,
		client:      client,
	}, nil
}

// Start blocks, polling Consul and updating callback until Stop is called.
func (d *consulDiscover) Start(callback EndpointCacher) {
	d.discover(callback)
}

// Stop cancels the discovery context.
func (d *consulDiscover) Stop() {
	d.cancel()
}

// discover runs Consul blocking queries on the health endpoint of
// d.serviceName until Stop is called. After each response it does three things:
//   - adds/updates instances whose aggregated check status is passing
//     (attribute = service Meta as JSON);
//   - deletes instances that are critical or warning;
//   - deletes instances missing from the response altogether (deregistered).
//
// Endpoints are formatted with net.JoinHostPort, so IPv6 addresses match the
// "[host]:port" form used on the register side.
func (d *consulDiscover) discover(callback EndpointCacher) {
	var lastIndex uint64
	known := map[string]struct{}{}
	for {
		if d.ctx.Err() != nil {
			return
		}

		// Binding the blocking query to d.ctx lets Stop interrupt it.
		opts := (&consulapi.QueryOptions{WaitIndex: lastIndex}).WithContext(d.ctx)
		entries, meta, err := d.client.Health().Service(d.serviceName, "", false, opts)
		if err != nil {
			if d.ctx.Err() != nil {
				return
			}
			callback.AddError(err)
			// meta is nil on error (dereferencing it used to panic). Back off
			// so an unreachable agent does not cause a hot retry loop.
			select {
			case <-d.ctx.Done():
				return
			case <-time.After(time.Second):
			}
			continue
		}

		// Per Consul's blocking-query guidance, restart from 0 if the index
		// goes backwards.
		if meta.LastIndex < lastIndex {
			lastIndex = 0
		} else {
			lastIndex = meta.LastIndex
		}

		seen := make(map[string]struct{}, len(entries))
		for _, entry := range entries {
			endpoint := net.JoinHostPort(entry.Service.Address, strconv.Itoa(entry.Service.Port))
			seen[endpoint] = struct{}{}
			switch entry.Checks.AggregatedStatus() {
			case consulapi.HealthPassing:
				var attr []byte
				if entry.Service.Meta != nil {
					// Marshalling a map[string]string cannot fail.
					attr, _ = json.Marshal(entry.Service.Meta)
				}
				callback.AddOrUpdate(endpoint, attr)
			case consulapi.HealthCritical, consulapi.HealthWarning:
				callback.Delete(endpoint)
			}
		}
		for endpoint := range known {
			if _, ok := seen[endpoint]; !ok {
				callback.Delete(endpoint)
			}
		}
		known = seen
	}
}

// listService queries the local agent's registered services.
// NOTE(review): both the result and the error are discarded, and nothing in
// the code shown calls this method, so it is effectively a no-op.
func (d *consulDiscover) listService() {
	agent := d.client.Agent()
	_, _ = agent.Services()
}

示例代码

etcd

服务注册端

package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"

	"github.com/goeasya/discox"
)

// main registers 127.0.0.1:8111 as service "demo" in etcd and keeps the
// registration alive until SIGINT/SIGTERM. Returning from main (instead of
// blocking forever in `select {}`) lets the deferred Stop delete the key.
func main() {
	cfg := discox.RegisterConfig{
		BackendType:      discox.EtcdBackend,
		BackendEndPoints: []string{"http://10.1.1.1:23790"},
		DiscoverPrefix:   "/discox/etcddemo",
		ServiceName:      "demo",
		HeartBeatPeriod:  5,
		ServiceEndPoint:  "127.0.0.1:8111",
	}
	service, err := discox.NewRegister(&cfg)
	if err != nil {
		fmt.Println(err.Error())
		os.Exit(1)
	}

	if err = service.Start(); err != nil {
		fmt.Println(err.Error())
		os.Exit(1)
	}
	defer service.Stop()

	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
	<-sigCh
}

服务发现端

package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/goeasya/discox"
)

// main discovers service "demo" from etcd and prints the known endpoints every
// 5 seconds until SIGINT/SIGTERM, then returns so the deferred Stop runs.
func main() {
	cfg := discox.DiscoverConfig{
		BackendEndPoints: []string{"http://10.1.1.1:23790"},
		BackendType:      discox.EtcdBackend,
		DiscoverPrefix:   "/discox/etcddemo",
		ServiceName:      "demo",
	}
	server, err := discox.NewDiscover(&cfg)
	if err != nil {
		fmt.Println(err.Error())
		os.Exit(1)
	}

	endpointCacher := discox.NewLiteEndpoint()
	go server.Start(endpointCacher)
	defer server.Stop()

	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)

	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			fmt.Println("time 5 seconds")
			endpoints := endpointCacher.List()
			fmt.Println(endpoints, len(endpoints))
		case <-sigCh:
			return
		}
	}
}

zookeeper

服务注册端

package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"

	"github.com/goeasya/discox"
)

// main registers 127.0.0.1:8111 as service "demo" in ZooKeeper until
// SIGINT/SIGTERM. Returning from main (instead of blocking forever in
// `select {}`) lets the deferred Stop close the session.
func main() {
	cfg := discox.RegisterConfig{
		BackendType:      discox.ZookeeperBackend,
		BackendEndPoints: []string{"10.1.1.1:2181"},
		DiscoverPrefix:   "/soaservices",
		ServiceName:      "demo",
		HeartBeatPeriod:  5,
		ServiceEndPoint:  "127.0.0.1:8111",
	}
	service, err := discox.NewRegister(&cfg)
	if err != nil {
		fmt.Println(err.Error())
		os.Exit(1)
	}

	if err = service.Start(); err != nil {
		fmt.Println(err.Error())
		os.Exit(1)
	}
	defer service.Stop()

	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
	<-sigCh
}

服务发现端

package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/goeasya/discox"
)

// main discovers service "demo" from ZooKeeper and prints the known endpoints
// every 5 seconds until SIGINT/SIGTERM, then returns so the deferred Stop runs.
func main() {
	cfg := discox.DiscoverConfig{
		BackendEndPoints: []string{"10.1.1.1:2181"},
		BackendType:      discox.ZookeeperBackend,
		DiscoverPrefix:   "/soaservices",
		ServiceName:      "demo",
	}
	server, err := discox.NewDiscover(&cfg)
	if err != nil {
		fmt.Println(err.Error())
		os.Exit(1)
	}

	endpointCacher := discox.NewLiteEndpoint()
	go server.Start(endpointCacher)
	defer server.Stop()

	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)

	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			fmt.Println("time 5 seconds")
			endpoints := endpointCacher.List()
			fmt.Println(endpoints, len(endpoints))
		case <-sigCh:
			return
		}
	}
}

consul

服务注册端

package main

import (
	"fmt"
	"net/http"
	"os"
	"os/signal"
	"sync/atomic"
	"syscall"

	"github.com/goeasya/discox"
)

// main serves a /check health endpoint on :8080, registers 172.18.1.1:8080 as
// service "demo" in Consul, and stays registered until SIGINT/SIGTERM. Returning
// from main lets the deferred Stop deregister the service.
func main() {
	cfg := discox.RegisterConfig{
		BackendType:         discox.ConsulBackend,
		BackendEndPoints:    []string{"consul.test.com"},
		DiscoverPrefix:      "/soaservices",
		ServiceName:         "demo",
		HeartBeatPeriod:     5,
		ServiceEndPoint:     "172.18.1.1:8080",
		HealthCheckEndPoint: "172.18.1.1:8080/check",
	}
	service, err := discox.NewRegister(&cfg)
	if err != nil {
		fmt.Println(err.Error())
		os.Exit(1)
	}

	// Start the health-check server before registering so Consul's first
	// probe can already succeed.
	http.HandleFunc("/check", consulCheck)
	go func() {
		if err := http.ListenAndServe(":8080", nil); err != nil {
			fmt.Println(err.Error())
			os.Exit(1)
		}
	}()

	if err = service.Start(); err != nil {
		fmt.Println(err.Error())
		os.Exit(1)
	}
	defer service.Stop()

	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
	<-sigCh
}

// count is the number of health checks served; handlers run concurrently, so
// it is only accessed atomically.
var count int64

// consulCheck answers Consul's HTTP health check and logs each probe.
func consulCheck(w http.ResponseWriter, r *http.Request) {
	n := atomic.AddInt64(&count, 1) - 1
	s := "consulCheck" + fmt.Sprint(n) + "remote:" + r.RemoteAddr + " " + r.URL.String()
	fmt.Println(s)
	fmt.Fprintln(w, s)
}

服务发现端

package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/goeasya/discox"
)

// main discovers service "demo" from Consul and prints the known endpoints
// every 5 seconds until SIGINT/SIGTERM, then returns so the deferred Stop runs.
func main() {
	cfg := discox.DiscoverConfig{
		BackendEndPoints: []string{"consul.test.com"},
		BackendType:      discox.ConsulBackend,
		DiscoverPrefix:   "/soaservices",
		ServiceName:      "demo",
	}
	server, err := discox.NewDiscover(&cfg)
	if err != nil {
		fmt.Println(err.Error())
		os.Exit(1)
	}

	endpointCacher := discox.NewLiteEndpoint()
	go server.Start(endpointCacher)
	defer server.Stop()

	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)

	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			fmt.Println("time 5 seconds")
			endpoints := endpointCacher.List()
			fmt.Println(endpoints, len(endpoints))
		case <-sigCh:
			return
		}
	}
}

代码 GitHub 地址:https://github.com/goeasya/discox