工作池

在这个例子中,我们将看到如何使用 Go 协程和通道实现一个工作池 。

func worker(id int, jobs <-chan int, results chan<- int) {
	for j := range jobs {
		fmt.Println("worker", id, "processing job", j)
		time.Sleep(time.Second)
		results <- j * 2
	}
}

// 这是我们将要在多个并发实例中支持的任务了。
// 这些执行者将从 jobs 通道接收任务,并且通过 results 发送对应的结果。
// 我们将让每个任务间隔 1s 来模仿一个耗时的任务
func main() {
	// 为了使用 worker 工作池并且收集他们的结果,我们需要2 个通道。
	jobs := make(chan int, 100)
	results := make(chan int, 100)

	// 这里启动了 3 个 worker,初始是阻塞的,因为还没有传递任务。
	for w := 1; w <= 3; w++ {
		go worker(w, jobs, results)
	}

	// 这里我们发送 9 个 jobs,然后 close 这些通道来表示这些就是所有的任务了。
	for j := 1; j <= 9; j++ {
		jobs <- j
	}
	close(jobs)

	// 最后,我们收集所有这些任务的返回值。
	for a := 1; a <= 9; a++ {
		<-results
	}
}

输出:

worker 2 processing job 2
worker 3 processing job 3
worker 1 processing job 1
worker 1 processing job 4
worker 3 processing job 5
worker 2 processing job 6
worker 1 processing job 7
worker 3 processing job 8
worker 2 processing job 9

执行这个程序,显示 9 个任务被多个 worker 执行。
整个程序处理所有的任务仅执行了 3s 而不是 9s,是因为 3 个 worker是并行的。

速率限制

速率限制(英) 是一个重要的控制服务资源利用和质量的途径。Go 通过 Go 协程、通道和打点器优美的支持了速率限制。

func main() {
	// 首先我们将看一下基本的速率限制。
	// 假设我们想限制我们接收请求的处理,我们将这些请求发送给一个相同的通道。
	requests := make(chan int, 5)
	for i := 1; i <= 5; i++ {
		requests <- i
	}
	close(requests)

	// 这个 limiter 通道将每 200ms 接收一个值。
	// 这个是速率限制任务中的管理器。
	limiter := time.Tick(time.Millisecond * 200)

	// 通过在每次请求前阻塞 limiter 通道的一个接收,我们限制自己每 200ms 执行一次请求。
	for req := range requests {
		<-limiter
		fmt.Println("request-1", req, time.Now())
	}

	// 有时候我们想临时进行速率限制,并且不影响整体的速率控制我们可以通过通道缓冲来实现。
	// 这个 burstyLimiter 通道用来进行 3 次临时的脉冲型速率限制。
	burstyLimiter := make(chan time.Time, 3)

	// 想将通道填充需要临时改变3次的值,做好准备。这里写入相同的3个时间
	for i := 0; i < 3; i++ {
		burstyLimiter <- time.Now()
	}

	// 每 200 ms 我们将添加一个新的值到 burstyLimiter中,直到达到 3 个的限制。
	go func() {
		for t := range time.Tick(time.Millisecond * 200) {
			burstyLimiter <- t
		}
	}()

	// 现在模拟超过 5 个的接入请求。它们中刚开始的 3 个将由于受 burstyLimiter 的“脉冲”影响。
	burstyRequests := make(chan int, 5)
	for i := 1; i <= 5; i++ {
		burstyRequests <- i * 10
	}
	close(burstyRequests)
	for req := range burstyRequests {
		<-burstyLimiter
		fmt.Println("request-2", req, time.Now())
	}
}

输出:

request-1 1 2020-12-17 10:15:54.4409286 +0800 CST m=+0.206155901
request-1 2 2020-12-17 10:15:54.6419619 +0800 CST m=+0.407189201
request-1 3 2020-12-17 10:15:54.8420742 +0800 CST m=+0.607301501
request-1 4 2020-12-17 10:15:55.0410305 +0800 CST m=+0.806257801
request-1 5 2020-12-17 10:15:55.2409456 +0800 CST m=+1.006172901
request-2 10 2020-12-17 10:15:55.2409456 +0800 CST m=+1.006172901
request-2 20 2020-12-17 10:15:55.2409456 +0800 CST m=+1.006172901
request-2 30 2020-12-17 10:15:55.2409456 +0800 CST m=+1.006172901
request-2 40 2020-12-17 10:15:55.4417999 +0800 CST m=+1.207027201
request-2 50 2020-12-17 10:15:55.6415683 +0800 CST m=+1.406795601

运行程序,我们看到第一批请求意料之中的大约每 200ms 处理一次。
第二批请求,我们直接连续处理了 3 次,这是由于这个“脉冲”速率控制,然后大约每 200ms 处理其余的 2 个。

原子计数器

Go 中最主要的状态管理方式是通过通道间的沟通来完成的,前面的工作池的例子中碰到过,但是还是有一些其他的方法来管理状态的。
这里我们将看看如何使用 sync/atomic包在多个 Go 协程中进行 原子计数 。

import (
	"fmt"
	"runtime"
	"sync/atomic"
	"time"
)

func main() {
	// 我们将使用一个无符号整型数来表示(永远是正整数)这个计数器。
	var ops uint64 = 0

	// 为了模拟并发更新,我们启动 50 个 Go 协程,对计数器每隔 1ms (译者注:应为非准确时间)进行一次加一操作。
	for i := 0; i < 50; i++ {
		go func() {
			for {
				// 使用 AddUint64 来让计数器自动增加,使用& 语法来给出 ops 的内存地址。
				atomic.AddUint64(&ops, 1)

				// 允许其它 Go 协程的执行
				// 用于让出CPU时间片,让出当前goroutine的执行权限,调度器安排其它等待的任务运行,并在下次某个时候从该位置恢复执行。
				runtime.Gosched()
			}
		}()
	}

	// 等待一秒,让 ops 的自加操作执行一会。
	time.Sleep(time.Second)

	// 为了在计数器还在被其它 Go 协程更新时,安全的使用它,我们通过 LoadUint64 将当前值的拷贝提取到 opsFinal中。
	// 和上面一样,我们需要给这个函数所取值的内存地址 &ops
	opsFinal := atomic.LoadUint64(&ops)
	fmt.Println("ops:", opsFinal)
}

输出:

ops: 5071946

执行这个程序,显示我们执行了大约 5071946 次操作
操作次数和当前计算机的CPU性能有关

互斥锁

在前面的例子中,我们看到了如何使用原子操作来管理简单的计数器。
对于更加复杂的情况,我们可以使用一个互斥锁来在 Go 协程间安全的访问数据。

import (
	"fmt"
	"math/rand"
	"runtime"
	"sync"
	"sync/atomic"
	"time"
)

func main() {
	// 在我们的例子中,state 是一个 map。
	var state = make(map[int]int)

	// 这里的 mutex 将同步对 state 的访问。
	var mutex = &sync.Mutex{}

	// we'll see later, ops will count how manyoperations we perform against the state.
	// 为了比较基于互斥锁的处理方式和我们后面将要看到的其他方式,ops 将记录我们对 state 的操作次数。
	var ops int64 = 0

	// 这里我们运行 100 个 Go 协程来重复读取 state。
	for r := 0; r < 100; r++ {
		go func() {
			total := 0
			for {
				// 每次循环读取,我们使用一个键来进行访问,Lock() 这个 mutex 来确保对 state 的独占访问,读取选定的键的值,Unlock() 这个mutex,并且 ops 值加 1。
				key := rand.Intn(5)
				mutex.Lock()
				total += state[key]
				mutex.Unlock()
				atomic.AddInt64(&ops, 1)

				// 为了确保这个 Go 协程不会在调度中饿死,我们在每次操作后明确的使用 runtime.Gosched()进行释放。
				// 这个释放一般是自动处理的,像例如每个通道操作后或者 time.Sleep 的阻塞调用后相似,但是在这个例子中我们需要手动的处理。
				runtime.Gosched()
			}
		}()
	}

	// 同样的,我们运行 10 个 Go 协程来模拟写入操作,使用和读取相同的模式。
	for w := 0; w < 10; w++ {
		go func() {
			for {
				key := rand.Intn(5)
				val := rand.Intn(100)
				mutex.Lock()
				state[key] = val
				mutex.Unlock()
				atomic.AddInt64(&ops, 1)
				runtime.Gosched()
			}
		}()
	}

	// 让这 10 个 Go 协程对 state 和 mutex 的操作运行 1 s。
	time.Sleep(time.Second)

	// 获取并输出最终的操作计数。
	opsFinal := atomic.LoadInt64(&ops)
	fmt.Println("ops:", opsFinal)

	// 对 state 使用一个最终的锁,显示它是如何结束的。
	mutex.Lock()
	fmt.Println("state:", state)
	mutex.Unlock()
}

输出:

ops: 3476188
state: map[0:64 1:20 2:47 3:61 4:89]

运行这个程序,显示我们对已进行了同步的 state 执行了3,500,000 次操作。