请求者向均衡服务发送请求

type Request struct {
  fn func() int  // The operation to perform.
  c  chan int    // The channel to return the result.
}

注意这返回的通道是放在请求内部的。通道是first-class值

能很好的模拟一个请求者,一个负载产生者

func requester(work chan<- Request) {
  c := make(chan int)
  for {
      // Kill some time (fake load).
      Sleep(rand.Int63n(nWorker * 2 * Second))
      work <- Request{workFn, c} // send request
      result := <-c              // wait for answer
      furtherProcess(result)  
  }    
}

请求通道,加上一些负载记录数据

type Worker struct {
  requests chan Request // work to do (buffered channel)
  pending  int          // count of pending tasks
  index     int         // index in the heap
}

均衡服务将请求发送给压力最小的worker

func (w *Worker) work(done chan *Worker) {
  for {
      req := <-w.requests // get Request from balancer
      req.c <- req.fn()   // call fn and send result
      done <- w           // we've finished this request
  }
}

请求通道(w.requests)将请求提交给各个worker。均衡服务跟踪请求待处理的数量来判断负载情况。

每个响应直接反馈给它的请求者。

定义负载均衡器

// 负载均衡器需要一个装很多worker的池子和一个通道来让请求者报告任务完成情况。
type Pool []*Worker
type Balancer struct {
  pool Pool
  done chan *Worker
}

负载均衡函数

func (b *Balancer) balance(work chan Request) {
  for {
      select {
      case req := <-work: // received a Request...
          b.dispatch(req) // ...so send it to a Worker
      case w := <-b.done: // a worker has finished ...
          b.completed(w)  // ...so update its info
      }
  }
}

将负载均衡的池子用一个Heap接口实现

// 使用堆来跟踪负载情况
func (p Pool) Less(i, j int) bool {
  return p[i].pending < p[j].pending
}

Dispatch

// Send Request to worker
func (b *Balancer) dispatch(req Request) {
  // Grab the least loaded worker...
  w := heap.Pop(&b.pool).(*Worker)
  // ...send it the task.
  w.requests <- req
  // One more in its work queue.
  w.pending++
  // Put it into its place on the heap.
  heap.Push(&b.pool, w)
}

Completed

// Job is complete; update heap
func (b *Balancer) completed(w *Worker) {
  // One fewer in the queue.
  w.pending--
  // Remove it from heap.                  
  heap.Remove(&b.pool, w.index)
  // Put it into its place on the heap.
  heap.Push(&b.pool, w)
}

一个复杂的问题可以被拆分成容易理解的组件。它们可以被并发的处理。结果就是容易理解,高效,可扩展,好用。或许更加并行。