目录
go微服务框架kratos学习笔记六(kratos 服务发现 discovery)
http api
register 服务注册
fetch 获取实例
fetchs 批量获取实例
polls 批量获取实例
nodes 批量获取节点
renew 心跳
cancel 下线
应用发现逻辑
服务注册
服务注册demo
服务注册逻辑
服务发现
测试调用
简单看看官方grpc服务发现逻辑
context deadline exceeded
简单看看官方grpc服务发现逻辑
除了上次的warden直连方式外,kratos有另一个服务发现sdk : discovery
discovery 可以先简单理解为一个http服务、
它最简单的发现过程可能是这样的:
1、service 向discovery 服务注册 appid
2、client 通过 appid 从discovery 查询 service 的addr
当然 远不止这么简单,还包含了很多功能在里面的,例如服务自发现、负载均衡等
本节仅先看个最简单的服务发现的demo
首先走一遍discovery的http的api
http api
// innerRouter init local router api path.
func innerRouter(e *bm.Engine) {
group := e.Group("/discovery")
{
group.POST("/register", register)
group.POST("/renew", renew)
group.POST("/cancel", cancel)
group.GET("/fetch/all", initProtect, fetchAll)
group.GET("/fetch", initProtect, fetch)
group.GET("/fetchs", initProtect, fetchs)
group.GET("/poll", initProtect, poll)
group.GET("/polls", initProtect, polls)
//manager
group.POST("/set", set)
group.GET("/nodes", initProtect, nodes)
}
}
discovery里面的bm引擎注册了这些接口, 接着我用postman 测了测。
register 服务注册
fetch 获取实例
fetchs 批量获取实例
polls 批量获取实例
nodes 批量获取节点
renew 心跳
POST http://HOST/discovery/renew
curl 'http://127.0.0.1:7171/discovery/renew' -d "zone=sh1&env=test&appid=provider&hostname=myhostname"
*****成功*****
{
"code":0,
"message":""
}
****失败****
{
"code":-400,
"message":"-400"
}
cancel 下线
POST http://HOST/discovery/cancel
curl 'http://127.0.0.1:7171/discovery/cancel' -d "zone=sh1&env=test&appid=provider&hostname=myhostname"
*****成功*****
{
"code":0,
"message":""
}
****失败****
{
"code":-400,
"message":"-400"
}
应用发现逻辑
官方应用发现实现逻辑
选择可用的节点,将应用appid加入poll的appid列表
如果polls请求返回err,则切换node节点,切换逻辑与自发现错误时切换逻辑一致
如果polls返回-304 ,说明appid无变更,重新发起poll监听变更
polls接口返回appid的instances列表,完成服务发现,根据需要选择不同的负载均衡算法进行节点的调度
服务注册
服务注册demo
直接new一个demo服务然后将demo服务注册到discovery
主函数里面服务注册部分添加类似下面注册代码。
ip := "127.0.0.1"
port := "9000"
hn, _ := os.Hostname()
dis := discovery.New(nil)
ins := &naming.Instance{
Zone: env.Zone,
Env: env.DeployEnv,
AppID: "demo.service",
Hostname: hn,
Addrs: []string{
"grpc://" + ip + ":" + port,
},
}
cancel, err := dis.Register(context.Background(), ins)
if err != nil {
panic(err)
}
defer cancel()
panic 找不到节点,这个是我们discovery的节点地址 可以在环境变量里面添加。
I:\VSProject\kratos-note\kratos-note\warden\discovery\server>kratos run
INFO 01/04-19:32:28.198 I:/VSProject/go/pkg/mod/github.com/bilibili/kratos@v0.3.2-0.20191224125553-6e1180f53a8e/pkg/net/rpc/warden/server.go:329 warden: start grpc listen addr: [::]:9000
panic: invalid discovery config nodes:[] region:region01 zone:zone01 deployEnv:dev host:DESKTOP-NUEKD5O
配置discovery节点后成功注册
I:\VSProject\kratos-note\kratos-note\warden\discovery\server>set DISCOVERY_NODES=127.0.0.1:7171
I:\VSProject\kratos-note\kratos-note\warden\discovery\server>kratos run
INFO 01/04-19:40:25.426 I:/VSProject/kratos-note/kratos-note/warden/discovery/server/cmd/main.go:23 abc start
2020/01/04 19:40:25 start watch filepath: I:\VSProject\kratos-note\kratos-note\warden\discovery\server\configs
INFO 01/04-19:40:25.497 I:/VSProject/go/pkg/mod/github.com/bilibili/kratos@v0.3.2-0.20191224125553-6e1180f53a8e/pkg/net/http/blademaster/server.go:98 blademaster: start http listen addr: 0.0.0.0:8000
[warden] config is Deprecated, argument will be ignored. please use -grpc flag or GRPC env to configure warden server.
INFO 01/04-19:40:25.500 I:/VSProject/go/pkg/mod/github.com/bilibili/kratos@v0.3.2-0.20191224125553-6e1180f53a8e/pkg/net/rpc/warden/server.go:329 warden: start grpc listen addr: [::]:9000
INFO 01/04-19:40:25.501 I:/VSProject/go/pkg/mod/github.com/bilibili/kratos@v0.3.2-0.20191224125553-6e1180f53a8e/pkg/naming/discovery/discovery.go:248 disocvery: AddWatch(infra.discovery) already watch(false)
INFO 01/04-19:40:25.514 I:/VSProject/go/pkg/mod/github.com/bilibili/kratos@v0.3.2-0.20191224125553-6e1180f53a8e/pkg/naming/discovery/discovery.go:631 discovery: successfully polls(http://127.0.0.1:7171/discovery/polls?appid=infra.discovery&env=dev&hostname=DESKTOP-NUEKD5O&latest_timestamp=0) instances ({"infra.discovery":{"instances":{"sh001":[{"region":"sh","zone":"sh001","env":"dev","appid":"infra.discovery","hostname":"test1","addrs":["http://127.0.0.1:7171"],"version":"","latest_timestamp":1578122538945305700,"metadata":null,"status":1}]},"latest_timestamp":1578122538945305700,"scheduler":null}})
INFO 01/04-19:40:25.527 I:/VSProject/go/pkg/mod/github.com/bilibili/kratos@v0.3.2-0.20191224125553-6e1180f53a8e/pkg/naming/discovery/discovery.go:414 discovery: register client.Get(http://127.0.0.1:7171/discovery/register) env(dev) appid(demo.service) addrs([grpc://127.0.0.1:9000]) success
服务注册逻辑
现在我们跟着日志走一遍。
如图理解,服务注册逻辑应该是register -> renew ->cancel 注册 然后 不停给心跳 最后取消注册。
截取一条本地服务注册日志
操作大概是:
1、启动discovery服务
2、启动demo.server 注册demo.server appid 服务
3、过一小会等待心跳,关闭demo.server
接着可以看到整个日志的过程大致上是 :
1、 0 : 启动dicovery服务
2、 2/3/4 : 服务初始化
3、 5 : polls 长轮循 infra.discovery 服务自发现
4、 6/7: 新的连接 & 服务注册、这时候我们起动的demo.server服务注册上来了
5、 9 : polls 长轮循 infra.discovery 服务自发现
6、 10 : renew心跳
7、 12 : 最后我杀掉了注册的服务,出现了cancel请求。
从日志看逻辑理解基本上也没有太多偏差,接着看看服务发现。
0:discovery -conf discovery-example.toml -log.v=0
1:
2:INFO 01/10-10:31:19.575 C:/server/src/go/src/discovery/discovery/syncup.go:160 discovery 3:changed nodes:[127.0.0.1:7171] zones:map[]
4:INFO 01/10-10:31:19.575 C:/server/src/go/pkg/mod/github.com/bilibili/kratos@v0.1.0/pkg/net/http/blademaster/server.go:98 blademaster: start http listen addr: 127.0.0.1:7171
INFO 01/10-10:31:19.575 C:/server/src/go/src/discovery/registry/registry.go:219 Polls from(test1) new connection(1)
5:INFO 01/10-10:31:31.796 http-access-log ts=0 method=GET ip=127.0.0.1 traceid= user=no_user params=appid=infra.discovery&env=dev&hostname=DESKTOP-9NFHKD0&latest_timestamp=0 msg=0 stack=<nil> err= timeout_quota=39.98 path=/discovery/polls ret=0
6:INFO 01/10-10:31:31.798 C:/server/src/go/src/discovery/registry/registry.go:219 Polls from(DESKTOP-9NFHKD0) new connection(1)
7:INFO 01/10-10:31:31.799 http-access-log method=POST user=no_user path=/discovery/register err= ts=0 params=addrs=grpc%3A%2F%2F127.0.0.1%3A9000&appid=demo.service&env=dev&hostname=DESKTOP-9NFHKD0&metadata=®ion=region01&status=1&version=&zone=zone01 stack=<nil> ret=0 timeout_quota=39.98 ip=127.0.0.1 msg=0 traceid=
8:INFO 01/10-10:32:01.799 C:/server/src/go/src/discovery/registry/registry.go:370 DelConns from(DESKTOP-9NFHKD0) delete(1)
9:ERROR 01/10-10:32:01.799 http-access-log method=GET ip=127.0.0.1 err=-304 timeout_quota=39.98 user=no_user path=/discovery/polls params=appid=infra.discovery&env=dev&hostname=DESKTOP-9NFHKD0&latest_timestamp=1578623479566211700 ret=-304 msg=-304 stack=-304 ts=30.0011342 traceid=
10:INFO 01/10-10:32:01.799 http-access-log msg=0 err= timeout_quota=39.98 method=POST ip=127.0.0.1 user=no_user ret=0 path=/discovery/renew traceid= params=appid=demo.service&env=dev&hostname=DESKTOP-9NFHKD0®ion=region01&zone=zone01 stack=<nil> ts=0
11:INFO 01/10-10:32:01.800 C:/server/src/go/src/discovery/registry/registry.go:219 Polls from(DESKTOP-9NFHKD0) new connection(1)
12:INFO 01/10-10:32:08.499 http-access-log timeout_quota=39.98 path=/discovery/cancel ret=0 stack=<nil> ip=127.0.0.1 msg=0 traceid= ts=0 method=POST user=no_user err= params=appid=demo.service&env=dev&hostname=DESKTOP-9NFHKD0®ion=region01&zone=zone01
服务发现
同样先配置discovert节点 set DISCOVERY_NODES=127.0.0.1:7171
NewClient()改成如下方式
package dao
import (
"context"
"github.com/bilibili/kratos/pkg/naming/discovery"
"github.com/bilibili/kratos/pkg/net/rpc/warden"
"github.com/bilibili/kratos/pkg/net/rpc/warden/resolver"
"google.golang.org/grpc"
)
// AppID your appid, ensure unique.
const AppID = "demo.service" // NOTE: example
func init(){
// NOTE: 注意这段代码,表示要使用discovery进行服务发现
// NOTE: 还需注意的是,resolver.Register是全局生效的,所以建议该代码放在进程初始化的时候执行
// NOTE: !!!切记不要在一个进程内进行多个不同中间件的Register!!!
// NOTE: 在启动应用时,可以通过flag(-discovery.nodes) 或者 环境配置(DISCOVERY_NODES)指定discovery节点
resolver.Register(discovery.Builder())
}
// NewClient new member grpc client
func NewClient(cfg *warden.ClientConfig, opts ...grpc.DialOption) (DemoClient, error) {
client := warden.NewClient(cfg, opts...)
conn, err := client.Dial(context.Background(), "discovery://default/"+AppID)
if err != nil {
return nil, err
}
// 注意替换这里:
// NewDemoClient方法是在"api"目录下代码生成的
// 对应proto文件内自定义的service名字,请使用正确方法名替换
return NewDemoClient(conn), nil
}
同时嵌入dao结构里面、和上次warden direct方式一样做SayHello接口测试调用。
// dao dao.
type dao struct {
db *sql.DB
redis *redis.Redis
mc *memcache.Memcache
demoClient demoapi.DemoClient
cache *fanout.Fanout
demoExpire int32
}
// New new a dao and return.
func New(r *redis.Redis, mc *memcache.Memcache, db *sql.DB) (d Dao, err error) {
var cfg struct{
DemoExpire xtime.Duration
}
if err = paladin.Get("application.toml").UnmarshalTOML(&cfg); err != nil {
return
}
grpccfg := &warden.ClientConfig{
Dial: xtime.Duration(time.Second * 10),
Timeout: xtime.Duration(time.Millisecond * 250),
Subset: 50,
KeepAliveInterval: xtime.Duration(time.Second * 60),
KeepAliveTimeout: xtime.Duration(time.Second * 20),
}
//paladin.Get("grpc.toml").UnmarshalTOML(grpccfg)
var grpcClient demoapi.DemoClient
grpcClient, err = NewClient(grpccfg)
d = &dao{
db: db,
redis: r,
mc: mc,
demoClient : grpcClient,
cache: fanout.New("cache"),
demoExpire: int32(time.Duration(cfg.DemoExpire) / time.Second),
}
return
}
测试调用
操作流程
1、启动discovery服务
2、启动demo.server 注册为 demo.server 服务
3、启动demo.client、
4、最后从demo.client的SayHello http接口 调到demo.server的grpc SayHello 接口。
简单看看官方grpc服务发现逻辑
context deadline exceeded
我发现个别时候调用做服务发现,会发现client起不来, context deadline exceeded。
因为我把new client加在了dao里面,超时的话,demo.client就直接pannic了
根据client日志可以发现
warden client: dial discovery://default/demo.service?subset=50 error context deadline exceeded!panic: context deadline exceeded
client : host:127.0.0.1:7171, url:http://127.0.0.1:7171/discovery/polls?appid=infra.discovery&env=dev&hostname=DESKTOP-9NFHKD0&latest_timestamp=1578902420717217500
在调用discovery polls的时候超时了,我配置的grpc dial 期限为10s, 在官方discovery文档介绍中写到discovery在做服务节点自发现的时候,如果server节点实例没有变更,则接口会阻塞直到30s返回-304。(poll(polls) 接口为长轮训接口)
关于服务自发现的话,这里不细看了,本节只关注应用发现逻辑,感兴趣可以去discovery上看看。
INFO 01/10-15:22:34.436 http-access-log method=GET path=/discovery/polls user=no_user params=appid=infra.discovery&env=dev&hostname=CLII&latest_timestamp=0 stack=<nil> err= timeout_quota=39.98 ts=0 msg=0 traceid= ip=127.0.0.1 ret=0
INFO 01/10-15:22:34.438 C:/server/src/go/src/discovery/registry/registry.go:222 Polls from(CLII) new connection(1)
INFO 01/10-15:22:34.440 C:/server/src/go/src/discovery/registry/registry.go:228 Polls from(CLII) reuse connection(2)
INFO 01/10-15:22:44.219 C:/server/src/go/src/discovery/registry/registry.go:373 DelConns from(DESKTOP-9NFHKD0) delete(1)
ERROR 01/10-15:22:44.219 http-access-log path=/discovery/polls ret=-304 msg=-304 timeout_quota=39.98 ip=127.0.0.1 params=appid=infra.discovery&env=dev&hostname=DESKTOP-9NFHKD0&latest_timestamp=1578637331623587200 user=no_user ts=39.9808023 err=-304 traceid= method=GET stack=-304
INFO 01/10-15:22:44.221 C:/server/src/go/src/discovery/registry/registry.go:222 Polls from(DESKTOP-9NFHKD0) new connection(1)
INFO 01/10-15:22:44.525 http-access-log ts=0 method=POST ip=127.0.0.1 user=no_user stack=<nil> path=/discovery/renew err= traceid= ret=0 msg=0 timeout_quota=39.98 params=appid=demo.service&env=dev&hostname=DESKTOP-9NFHKD0®ion=region01&zone=zone01
INFO 01/10-15:23:04.438 C:/server/src/go/src/discovery/registry/registry.go:370 DelConns from(CLII) count decr(2)
ERROR 01/10-15:23:04.438 http-access-log msg=-304 ts=30.0002154 method=GET err=-304 stack=-304 timeout_quota=39.98 ip=127.0.0.1 user=no_user path=/discovery/polls params=appid=infra.discovery&env=dev&hostname=CLII&latest_timestamp=1578637331623587200 ret=-304 traceid=
INFO 01/10-15:23:04.440 C:/server/src/go/src/discovery/registry/registry.go:373 DelConns from(CLII) delete(1)
ERROR 01/10-15:23:04.440 http-access-log ts=30.0013758 traceid= user=no_user path=/discovery/polls ret=-304 err=-304 method=GET ip=127.0.0.1 params=appid=infra.discovery&appid=demo.service&env=dev&hostname=CLII&latest_timestamp=1578637331623587200&latest_timestamp=0 msg=-304 stack=-304 timeout_quota=39.98
结合discovery 日志
15:22:34的client发dial
15:22:45左右client panic
15:23:04dicovery才回复一个-304 (实例信息无变更)
这实际上是因为 client.Dial() 里面封装了grpc官方的服务发现,当然最终走的是kratos warden里面的实现的grpc官方服务发现逻辑。
下面简单看看这层逻辑,很绕,我也没看懂,只能简单看看,有机会接触再补个详细的。
简单看看官方grpc服务发现逻辑
// NewClient new grpc client
func NewClient(cfg *warden.ClientConfig, opts ...grpc.DialOption) (demoapi.DemoClient, error) {
client := warden.NewClient(cfg, opts...)
cc, err := client.Dial(context.Background(), fmt.Sprintf("discovery://default/%s", AppID))
if err != nil {
return nil, err
}
return demoapi.NewDemoClient(cc), nil
}
实际上 client.Dial() 里面会有会有这么一个流程 :
client.Dial() - > grpc里面DialContext() -> parser target 的 scheme 然后获取 (这里是discovery) 对应的Builder
if cc.dopts.resolverBuilder == nil {
// Only try to parse target when resolver builder is not already set.
cc.parsedTarget = parseTarget(cc.target)
grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)
cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
if cc.dopts.resolverBuilder == nil {
// If resolver builder is still nil, the parsed target's scheme is
// not registered. Fallback to default resolver and set Endpoint to
// the original target.
grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
cc.parsedTarget = resolver.Target{
Scheme: resolver.GetDefaultScheme(),
Endpoint: target,
}
cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
}
} else {
cc.parsedTarget = resolver.Target{Endpoint: target}
}
DialContext() 成功会得到 -> 结构体ClientConn -> ClientConn.resolverWrapper 初始化 -> 调用build()
defer ccr.resolverMu.Unlock()
ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo)
// ClientConn represents a virtual connection to a conceptual endpoint, to
// perform RPCs.
//
// A ClientConn is free to have zero or more actual connections to the endpoint
// based on configuration, load, etc. It is also free to determine which actual
// endpoints to use and may change it every RPC, permitting client-side load
// balancing.
//
// A ClientConn encapsulates a range of functionality including name
// resolution, TCP connection establishment (with retries and backoff) and TLS
// handshakes. It also handles errors on established connections by
// re-resolving the name and reconnecting.
type ClientConn struct {
ctx context.Context
cancel context.CancelFunc
target string
parsedTarget resolver.Target
authority string
dopts dialOptions
csMgr *connectivityStateManager
balancerBuildOpts balancer.BuildOptions
blockingpicker *pickerWrapper
mu sync.RWMutex
resolverWrapper *ccResolverWrapper
sc *ServiceConfig
conns map[*addrConn]struct{}
// Keepalive parameter can be updated if a GoAway is received.
mkp keepalive.ClientParameters
curBalancerName string
balancerWrapper *ccBalancerWrapper
retryThrottler atomic.Value
firstResolveEvent *grpcsync.Event
channelzID int64 // channelz unique identification number
czData *channelzData
}
用户Builder的实现进行UpdateState —> ClientConn的updateResolverState -> updateResolverState -> Address初始化等grpc官方逻辑
// Builder creates a resolver that will be used to watch name resolution updates.
type Builder interface {
// Build creates a new resolver for the given target.
//
// gRPC dial calls Build synchronously, and fails if the returned error is
// not nil.
Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
// Scheme returns the scheme supported by this resolver.
// Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md.
Scheme() string
}
// ClientConn contains the callbacks for resolver to notify any updates
// to the gRPC ClientConn.
//
// This interface is to be implemented by gRPC. Users should not need a
// brand new implementation of this interface. For the situations like
// testing, the new implementation should embed this interface. This allows
// gRPC to add new methods to this interface.
type ClientConn interface {
// UpdateState updates the state of the ClientConn appropriately.
UpdateState(State)
// ReportError notifies the ClientConn that the Resolver encountered an
// error. The ClientConn will notify the load balancer and begin calling
// ResolveNow on the Resolver with exponential backoff.
ReportError(error)
// NewAddress is called by resolver to notify ClientConn a new list
// of resolved addresses.
// The address list should be the complete list of resolved addresses.
//
// Deprecated: Use UpdateState instead.
NewAddress(addresses []Address)
// NewServiceConfig is called by resolver to notify ClientConn a new
// service config. The service config should be provided as a json string.
//
// Deprecated: Use UpdateState instead.
NewServiceConfig(serviceConfig string)
// ParseServiceConfig parses the provided service config and returns an
// object that provides the parsed config.
ParseServiceConfig(serviceConfigJSON string) *serviceconfig.ParseResult
}
kratos discovery
warden包装了gRPC的整个服务发现实现逻辑,代码分别位于pkg/naming/naming.go和warden/resolver/resolver.go中
naming.goInstanceRegistryResolver
// Resolver resolve naming service
type Resolver interface {
Fetch(context.Context) (*InstancesInfo, bool)
Watch() <-chan struct{}
Close() error
}
// Registry Register an instance and renew automatically.
type Registry interface {
Register(ctx context.Context, ins *Instance) (cancel context.CancelFunc, err error)
Close() error
}
// InstancesInfo instance info.
type InstancesInfo struct {
Instances map[string][]*Instance `json:"instances"`
LastTs int64 `json:"latest_timestamp"`
Scheduler *Scheduler `json:"scheduler"`
}
resolver.goresolver.Builderresolver.Resolvernaming.Buildernaming.Resolver
// Resolver resolve naming service
type Resolver interface {
Fetch(context.Context) (*InstancesInfo, bool)
Watch() <-chan struct{}
Close() error
}
// Builder resolver builder.
type Builder interface {
Build(id string) Resolver
Scheme() string
}
naming.Buildernaming.Resolverappid服务发现中间件discoveryFetchWatch
在naming/discovery内实现了基于discovery为中间件的服务注册与发现逻辑。大致上也可以在这里面看到做了对discovery服务中间件的polls请求。
// Build disovery resovler builder.
func (d *Discovery) Build(appid string, opts ...naming.BuildOpt) naming.Resolver {
r := &Resolve{
id: appid,
d: d,
event: make(chan struct{}, 1),
opt: new(naming.BuildOptions),
}
for _, opt := range opts {
opt.Apply(r.opt)
}
d.mutex.Lock()
app, ok := d.apps[appid]
if !ok {
app = &appInfo{
resolver: make(map[*Resolve]struct{}),
}
d.apps[appid] = app
cancel := d.cancelPolls
if cancel != nil {
cancel()
}
}
app.resolver[r] = struct{}{}
d.mutex.Unlock()
if ok {
select {
case r.event <- struct{}{}:
default:
}
}
log.Info("disocvery: AddWatch(%s) already watch(%v)", appid, ok)
d.once.Do(func() {
go d.serverproc()
})
return r
}
func (d *Discovery) serverproc() {
var (
retry int
ctx context.Context
cancel context.CancelFunc
)
ticker := time.NewTicker(time.Minute * 30)
defer ticker.Stop()
for {
if ctx == nil {
ctx, cancel = context.WithCancel(d.ctx)
d.mutex.Lock()
d.cancelPolls = cancel
d.mutex.Unlock()
}
select {
case <-d.ctx.Done():
return
case <-ticker.C:
d.switchNode()
default:
}
apps, err := d.polls(ctx)
if err != nil {
d.switchNode()
if ctx.Err() == context.Canceled {
ctx = nil
continue
}
time.Sleep(time.Second)
retry++
continue
}
retry = 0
d.broadcast(apps)
}
}