很久以前分析过一款断路器golang 源码分析(39)hystrix-go,最近看了一款类似的断路器sony/gobreaker,https://github.com/sony/gobreaker分析下它的源码,感觉理解又一步加深了,它的源码很简单就一个gobreaker.go,并且它携带了一个例子example/http_breaker.go,我们先从例子入手分析,它主要包含了两个函数,首先是初始化函数:

    func init()
      var st gobreaker.Settings
        st.ReadyToTrip = func(counts gobreaker.Counts) bool {}
      cb = gobreaker.NewCircuitBreaker(st)

它通过ReadyToTrip设置短路规则,用Settings来初始化一个断路器。

第二个函数是cb.Execute函数,它的参数是一个函数,我们可以在函数内部定义封装我们的业务请求,比如做成http client的middleware

    func Get(url string) ([]byte, error) 
      body, err := cb.Execute(func() (interface{}, error) {}

通过上面两步,我们可以很快实现我们的breaker接入,很简单有没有。

下面我们分析下gobreaker的源码gobreaker.go

首先定义了断路器的状态

type State int

它分为三种:关闭,半开,打开

const (
  StateClosed State = iota
  StateHalfOpen
  StateOpen
)

然后是断路器特有的两个错误码

    ErrTooManyRequests = errors.New("too many requests")
     // 请求数量超出半开的限制
    ErrOpenState = errors.New("circuit breaker is open")
     // 断路器打开

state有一个方法,用于序列化

  func (s State) String() string

接着我们看下计数器,它保存了请求的成功和失败计数,以及连续成功数和连续失败数,是决策断路器打开关闭的依据。断路器会定时会清理这些计数,每清理一次产生一个新的generation。

type Counts struct {
  Requests             uint32
  TotalSuccesses       uint32
  TotalFailures        uint32
  ConsecutiveSuccesses uint32
  ConsecutiveFailures  uint32
}

对应的,有四个修改计数器的函数,比如onSucess成功一次就会把连续失败次数清0,成功数和连续成功数加一

     func (c *Counts) onRequest() 
      func (c *Counts) onSuccess() 
      func (c *Counts) onFailure() 
      func (c *Counts) clear()

接着我们看下Setting数,它是我们使用的时候可控参数的集合:

type Settings struct {
  Name          string
  MaxRequests   uint32
  Interval      time.Duration
  Timeout       time.Duration
  ReadyToTrip   func(counts Counts) bool
  OnStateChange func(name string, from State, to State)
  IsSuccessful  func(err error) bool
}

断路器的设置参数含义分析如下:

  • MaxRequests:半开状态下允许通过的最大请求数,超过这个数会失败报错
  • Interval:闭合状态下清理计数器的时间间隔;闭合状态下如果间隔<=0计数器不清零
  • Timeout:断路器打开状态的时间,超过这个时间后变成半开,如果设置的值小于等于0,默认会设置成60s
  • ReadyToTrip:闭合状态下,请求失败一次它就会调用,如果返回true,就会变成打开状态,如果是nil会调用默认的,它的定义是连续失败5次,返回true。我们可以在这个函数里定义我们期望的断路器打开策略。
  • OnStateChange:状态变化的时候调用
  • IsSuccessful:请求是否成功,根据这个值来修改计数器;如果是nil会调用默认函数,当返回的错误不是nil就认为是false

接下来就是非常重要的断路器结构,维护了请求过程中三个状态切换的状态机:

type CircuitBreaker struct {
  name          string
  maxRequests   uint32
  interval      time.Duration
  timeout       time.Duration
  readyToTrip   func(counts Counts) bool
  isSuccessful  func(err error) bool
  onStateChange func(name string, from State, to State)


  mutex      sync.Mutex
  state      State
  generation uint64
  counts     Counts
  expiry     time.Time
}

两阶段断路器提供了另外一种使用方法,不是通过Execute函数来包裹整个请求,而是通过单独一步回调的方式来检查请求结果是否成功。

type TwoStepCircuitBreaker struct {
  cb *CircuitBreaker
}

我们看下断路器的构造函数做了啥:

func NewCircuitBreaker(st Settings) *CircuitBreaker 
      defaultReadyToTrip
        counts.ConsecutiveFailures > 5
      defaultIsSuccessful
        err == nil
      cb.toNewGeneration(time.Now())

它通过Settings来初始化了断路器的参数,判断两个函数ReadyToTrip,IsSuccessful 是否有指定,没有指定的话使用默认函数,最后用当前时间戳,初始化了我们状态机。

两阶段断路器类似:

func NewTwoStepCircuitBreaker(st Settings) *TwoStepCircuitBreaker
    func (cb *CircuitBreaker) Name() string

获取当前状态:

    func (cb *CircuitBreaker) State() State 
      cb.mutex.Lock()
      now := time.Now()
      state, _ := cb.currentState(now)
func (cb *CircuitBreaker) Counts() Counts 

执行传入的请求,如果成功更新成功计数,panic也当作失败处理:

func (cb *CircuitBreaker) Execute(req func() (interface{}, error)) (interface{}, error)
      generation, err := cb.beforeRequest()
      cb.afterRequest(generation, false)
      result, err := req()
      cb.afterRequest(generation, cb.isSuccessful(err))
func (tscb *TwoStepCircuitBreaker) Allow() (done func(success bool), err error)
      generation, err := tscb.cb.beforeRequest()
      return func(success bool) {
        tscb.cb.afterRequest(generation, success)
      }, nil

每次执行的时候都会调用两个函数beforeRequest和afterRequest,前者获取当前的状态和generation,增加访问计数,返回对应的错误,如果断路器打开,或者半开但是访问计数达到了最大值,不发送请求:

func (cb *CircuitBreaker) beforeRequest() (uint64, error)
  state, generation := cb.currentState(now)
  if state == StateOpen {
    return generation, ErrOpenState
  } else if state == StateHalfOpen && cb.counts.Requests >= cb.maxRequests {
    return generation, ErrTooManyRequests
  }
   cb.counts.onRequest()

后者根据请求结果的成功失败来更新我们的计数器,成功失败由settings里面的IsSuccess函数的返回值来决定。

func (cb *CircuitBreaker) afterRequest(before uint64, success bool)
    state, generation := cb.currentState(now)
   if generation != before {
    return
  }
   if success {
    cb.onSuccess(state, now)
  } else {
    cb.onFailure(state, now)
  }

计数器的更新是通过onSuccess和onFailure两个函数来实现的,如果连续成功的请求数达到了设置里面的最大请求数,断路器从半开状态变为关闭状态

func (cb *CircuitBreaker) onSuccess(state State, now time.Time) 
        switch state {
  case StateClosed:
    cb.counts.onSuccess()
  case StateHalfOpen:
    cb.counts.onSuccess()
    if cb.counts.ConsecutiveSuccesses >= cb.maxRequests {
      cb.setState(StateClosed, now)
    }
  }

onFailure根据settings里面的readyToTrip函数来决策是否由关闭状态到达打开状态,如果是半开状态,本次请求又失败了,它会变回打开状态。

func (cb *CircuitBreaker) onFailure(state State, now time.Time)
        switch state {
  case StateClosed:
    cb.counts.onFailure()
    if cb.readyToTrip(cb.counts) {
      cb.setState(StateOpen, now)
    }
  case StateHalfOpen:
    cb.setState(StateOpen, now)
  }

currentState获取当前状态和分代信息,如果当前是关闭状态,到了过期时间,会把计数器清零,并且产生一个新的分代。如果是打开状态,并且已经过期,会把状态改为半开状态:

func (cb *CircuitBreaker) currentState(now time.Time) (State, uint64) 
  switch cb.state {
  case StateClosed:
    if !cb.expiry.IsZero() && cb.expiry.Before(now) {
      cb.toNewGeneration(now)
    }
  case StateOpen:
    if cb.expiry.Before(now) {
      cb.setState(StateHalfOpen, now)
    }
  }

每次修改状态都会根据当前时间戳产生一个新的分代,并且调用onStateChange函数

func (cb *CircuitBreaker) setState(state State, now time.Time) {
      cb.toNewGeneration(now)
      cb.onStateChange(cb.name, prev, state)

产生新的分代的时候,会清零计数器,更新过期时间,过期时间只和打开和关闭两个状态有关,半开没有过期时间

func (cb *CircuitBreaker) toNewGeneration(now time.Time) 
      cb.generation++
      cb.counts.clear()
      cb.expiry = now.Add(cb.interval)
      switch cb.state {
  case StateClosed:
    if cb.interval == 0 {
      cb.expiry = zero
    } else {
      cb.expiry = now.Add(cb.interval)
    }
  case StateOpen:
    cb.expiry = now.Add(cb.timeout)
  default: // StateHalfOpen
    cb.expiry = zero
  }

整体代码就分析完了,是不是很简洁明了?当然其中修改状态,使用了锁,分析过程中略去了,其实这里还是有优化空间的,比如换成更轻量级的原子操作。