协程 Coroutine
- 轻量级“线程”
- 非抢占式多任务处理,由协程主动交出控制权
- 编译器/解释器/虚拟机层面的多任务
- 多个协程可能在一个或多个线程上运行
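Subroutines are special cases of more general program components, called coroutines. In contrast to the unsymmetric relationship between a main routine and a subroutine, there is complete symmetry between coroutines, which call on each other. (Donald Knuth, The Art of Computer Programming, Vol. 1)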
– 子程序是协程的一个特例
其它语言中的协程
- C++: Boost.Coroutine
- Java: 不支持
- python
使用yield关键字实现协程
Python3.5加入了async def对协程原生支持
goroutine
- 任何函数只需加上go就能送给调度器运行
- 不需要在定义时区分是否是异步函数
- 调度器在合适的点进行切换
- 使用-race来检测数据访问冲突
goroutine可能的切换点
- I/O, select
- channel
- 等待锁
- 函数调用(有时)
- runtime.Gosched()
只是参考,不能保证在其它地方不切换
channel
package main
import (
"fmt"
"time"
)
// worker receives values from c until the channel is closed and prints each
// one together with the worker id.
//
// The explicit two-value receive is equivalent to ranging over the channel:
// ok becomes false once c is closed and drained, which ends the loop.
func worker(id int, c chan int) {
	for {
		n, ok := <-c
		if !ok {
			return
		}
		fmt.Printf("Worker %d received %d\n", id, n)
	}
}
// createWorker starts a goroutine running worker on a fresh unbuffered
// channel and hands that channel back as send-only, so callers can feed the
// worker but cannot receive from its input.
func createWorker(id int) chan<- int {
	in := make(chan int)
	go worker(id, in)
	return in
}
// chanDemo starts ten workers and sends each of them two values: first the
// lowercase letter code 'a'+i, then the uppercase letter code 'A'+i.
func chanDemo() {
	var channels [10]chan<- int
	for i := range channels {
		channels[i] = createWorker(i)
	}
	for i, ch := range channels {
		ch <- 'a' + i
	}
	for i, ch := range channels {
		ch <- 'A' + i
	}
	// Nothing waits for the workers explicitly; the short sleep only gives
	// them a chance to print before this function returns.
	time.Sleep(time.Millisecond)
}
// bufferedChannel feeds four values to a single worker through a channel
// with a buffer of three.
func bufferedChannel() {
	// Buffered so that sends need not wait for the receiver each time
	// (noted in the original as a performance improvement).
	c := make(chan int, 3)
	go worker(0, c)
	for _, v := range [...]int{'a', 'b', 'c', 'd'} {
		c <- v
	}
	// Give the worker a moment to print before returning.
	time.Sleep(time.Millisecond)
}
// channelClose sends four values to a single worker over a buffered channel
// and then closes the channel, which lets the worker's receive loop end.
func channelClose() {
	// Buffer of three (noted in the original as a performance improvement).
	c := make(chan int, 3)
	go worker(0, c)
	for _, r := range "abcd" {
		c <- int(r)
	}
	close(c)
	// Give the worker a moment to print the remaining values.
	time.Sleep(time.Millisecond)
}
// main runs the three channel demos one after another.
func main() {
	demos := []func(){
		chanDemo,        // channel as first-class citizen
		bufferedChannel, // buffered channel
		channelClose,    // channel close and range
	}
	for _, demo := range demos {
		demo()
	}
}
理论基础:Communicating Sequential Processes (CSP)
Don’t communicate by sharing memory; share memory by communicating.
– 不要通过共享内存来通信;通过通信来共享内存
WaitGroup 使用Channel等待任务结束
package main
import (
"fmt"
"sync"
)
// doWorker prints every value received on w.in as a character and calls
// w.done once per value. It returns when w.in is closed.
func doWorker(id int, w worker) {
	for {
		n, ok := <-w.in
		if !ok {
			return
		}
		fmt.Printf("Worker %d received %c\n", id, n)
		w.done()
	}
}
// worker bundles a worker goroutine's input channel with the callback it
// invokes after handling each value.
type worker struct {
	in   chan int // values for the worker to process
	done func()   // called once per processed value
}
// createWorker builds a worker with an unbuffered input channel whose done
// callback is wg.Done, starts doWorker for it in a new goroutine, and
// returns the worker so the caller can send it values.
func createWorker(id int, wg *sync.WaitGroup) worker {
	w := worker{
		in:   make(chan int),
		done: wg.Done,
	}
	go doWorker(id, w)
	return w
}
// chanDemo starts ten workers, sends each of them 'a'+i and then 'A'+i, and
// blocks until every value has been reported as processed via the WaitGroup.
func chanDemo() {
	var (
		wg      sync.WaitGroup
		workers [10]worker
	)
	// Each worker receives two values and signals done once per value.
	wg.Add(2 * len(workers))
	for i := range workers {
		workers[i] = createWorker(i, &wg)
	}
	for i, w := range workers {
		w.in <- 'a' + i
	}
	for i, w := range workers {
		w.in <- 'A' + i
	}
	wg.Wait()
}
// main runs the channel demo (channels used as first-class values).
func main() {
	chanDemo()
}
使用Channel进行树的遍历
// Node is a binary-tree node holding an int value.
type Node struct {
	Value       int
	Left, Right *Node
}

// TraverseFunc visits the subtree rooted at node in order (left subtree,
// node, right subtree), calling f on each node. A nil receiver is treated as
// an empty tree and nothing is visited.
func (node *Node) TraverseFunc(f func(*Node)) {
	if node == nil {
		return
	}
	node.Left.TraverseFunc(f)
	f(node)
	node.Right.TraverseFunc(f)
}

// TraverseWithChannel walks the tree in order in a new goroutine and sends
// each node on the returned channel, closing it when the walk is finished.
// The channel is unbuffered, so the walking goroutine blocks on every send
// until the caller receives.
func (node *Node) TraverseWithChannel() chan *Node {
	out := make(chan *Node)
	go func() {
		node.TraverseFunc(func(n *Node) {
			out <- n
		})
		close(out)
	}()
	return out
}

// maxNodeValue returns the largest Value in the tree rooted at root. The
// boolean is false when the tree is empty. The maximum is seeded from the
// first node received rather than from 0, so trees containing only negative
// values are handled correctly.
func maxNodeValue(root *Node) (int, bool) {
	maxNode, found := 0, false
	for node := range root.TraverseWithChannel() {
		if !found || node.Value > maxNode {
			maxNode, found = node.Value, true
		}
	}
	return maxNode, found
}

// main builds a small sample tree and prints its largest value, consuming
// the nodes through TraverseWithChannel.
func main() {
	// TraverseWithChannel is a method, so it needs a tree to be called on.
	root := &Node{
		Value: 3,
		Left:  &Node{Value: 0, Right: &Node{Value: 2}},
		Right: &Node{Value: 5, Left: &Node{Value: 4}},
	}
	if maxNode, ok := maxNodeValue(root); ok {
		fmt.Println("Max node value:", maxNode)
	}
}
使用select来进行调度
- select 的使用
- 定时器的使用
- 在select中使用nil channel(nil channel上的收发永远阻塞,因此对应的case不会被选中)
package main
import (
"fmt"
"math/rand"
"time"
)
// generator starts a goroutine that sends 0, 1, 2, ... on the returned
// unbuffered channel forever, sleeping a random 0-1499 ms before each send.
func generator() chan int {
	out := make(chan int)
	go func() {
		for i := 0; ; i++ {
			pause := time.Duration(rand.Intn(1500)) * time.Millisecond
			time.Sleep(pause)
			out <- i
		}
	}()
	return out
}
// worker receives values from c until it is closed. For each value it sleeps
// one second, standing in for slow processing, and then prints the value
// with the worker id.
func worker(id int, c chan int) {
	for {
		n, ok := <-c
		if !ok {
			return
		}
		time.Sleep(time.Second)
		fmt.Printf("Worker %d received %d\n", id, n)
	}
}
// createWorker launches worker in its own goroutine on a new unbuffered
// channel and returns the send-only end of that channel to the caller.
func createWorker(id int) chan<- int {
	input := make(chan int)
	go worker(id, input)
	return input
}
// main queues values from two generators and feeds them to one slow worker,
// using a single select loop to multiplex producers, the consumer, an idle
// timeout, a once-per-second queue report and a ten-second overall deadline.
func main() {
	src1, src2 := generator(), generator()
	sink := createWorker(0)

	var pending []int
	deadline := time.After(10 * time.Second)
	report := time.Tick(time.Second)
	for {
		// While nothing is queued, out stays nil. A send on a nil channel
		// blocks forever, so the send case below is never selected.
		var (
			out  chan<- int
			head int
		)
		if len(pending) != 0 {
			out, head = sink, pending[0]
		}

		select {
		case v := <-src1:
			pending = append(pending, v)
		case v := <-src2:
			pending = append(pending, v)
		case out <- head:
			pending = pending[1:]
		case <-time.After(800 * time.Millisecond):
			// A fresh timer per iteration: fires only if no other case
			// became ready within 800 ms of entering this select.
			fmt.Println("timeout")
		case <-report:
			fmt.Println("queue len =", len(pending))
		case <-deadline:
			fmt.Println("bye")
			return
		}
	}
}
传统同步机制(尽量少用)
- WaitGroup
- Mutex(互斥量)
- Cond
package main
import (
"fmt"
"sync"
"time"
)
// atomicInt is an int counter whose reads and writes are serialized by a
// mutex.
type atomicInt struct {
	value int
	lock  sync.Mutex
}

// increment prints "safe increment" and then adds one to the counter while
// holding the lock.
func (a *atomicInt) increment() {
	fmt.Println("safe increment")
	a.lock.Lock()
	a.value++
	a.lock.Unlock()
}

// get returns the current counter value, read while holding the lock.
func (a *atomicInt) get() int {
	a.lock.Lock()
	v := a.value
	a.lock.Unlock()
	return v
}
// main increments the counter once directly and once from a second
// goroutine, then prints the value.
func main() {
	var a atomicInt
	a.increment()
	go a.increment()
	// The sleep only gives the goroutine time to run; it is not a guarantee
	// that the second increment has happened before the value is read.
	time.Sleep(time.Millisecond)
	fmt.Println(a.get())
}