一、现状

GogoroutineGogoroutinegoroutine
ch := generate() 
go func() { for range ch { } 
}()
generate()channelgoroutinechannelgoroutinechgenerate()goroutine
goroutineG-P-MGgoroutinegoroutine2-4kgoroutinegoroutine
func main()  {for i := 0; i < math.MaxInt64; i++ {go func(i int) {time.Sleep(5 * time.Second)}(i)}
}

如果不加以控制的话,直接崩溃

goroutine
1、Context
goroutineHTTP/RPCGoroutineRPCgoroutineContextgoroutine
Context
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
func WithValue(parent Context, key, val interface{}) Context
goroutineWithCancel
package mainimport ("context""fmt""time"
)func main() {ctx, cancel := context.WithCancel(context.Background())go Speak(ctx)time.Sleep(10 * time.Second)cancel()time.Sleep(2 * time.Second)fmt.Println("拜拜!")
}func Speak(ctx context.Context) {for range time.Tick(time.Second) {select {case <-ctx.Done():fmt.Println("拜拜!")returndefault:fmt.Println("开始执行")}}
}

运行结果:

PS D:\cpz\go-demo\test13> go run test4.go
开始执行
开始执行
开始执行
开始执行
开始执行
开始执行
开始执行
开始执行
开始执行
拜拜!
拜拜!
withCancelBackgroundctxgoroutine1s10sgoroutinegoroutine
2、channel
channelgoroutineGogoroutine+channelcontextgoroutinecontextchannelchannel+selectchannel+closegoroutine
方式1:channel+select
package mainimport "fmt"func fibonacci(ch chan int, done chan struct{}) {x, y := 0, 1for {select {case ch <- x:x, y = y, x+ycase <-done:fmt.Println("over")return}}
}
func main() {ch := make(chan int)done := make(chan struct{})go func() {for i := 0; i < 10; i++ {fmt.Println(<-ch)}done <- struct{}{}}()fibonacci(ch, done)
}
channelchannelchannelselectchannelselectdefaultselect
ChannelChannelcaseChanneldefault
defaultnil channeldefault casenil channelselect
方式2:channel+close
package mainimport ("fmt""time"
)func main() {ch := make(chan int, 10)go func() {for i := 0; i < 10; i++ {ch <- i}close(ch)}()go func() {for val := range ch {fmt.Println(val)}fmt.Println("receive data over")}()time.Sleep(5 * time.Second)fmt.Println("program over")
}
channelfor-rangechannelrange chchannelgoroutine
goroutine
goroutineGogoroutinegoroutinegoroutinegoroutinegoroutine

0、常用的三方库

  • go-playground/pool
  • nozzle/throttler
  • Jeffail/tunny
  • panjf2000/ants

1、协程池

gogoroutinegoroutinegoroutineantsgo-playground/pooljeffail/tunnyants
package mainimport ("fmt"ants "github.com/panjf2000/ants/v2""sync""sync/atomic""time"
)var sum int32func myFunc(i interface{}) {n := i.(int32)atomic.AddInt32(&sum, n)fmt.Printf("run with %d\n", n)
}func demoFunc() {time.Sleep(10 * time.Millisecond)fmt.Println("Hello World!")
}func main() {defer ants.Release()runTimes := 1000// Use the common pool.var wg sync.WaitGroupsyncCalculateSum := func() {demoFunc()wg.Done()}for i := 0; i < runTimes; i++ {wg.Add(1)_ = ants.Submit(syncCalculateSum)}wg.Wait()fmt.Printf("running goroutines: %d\n", ants.Running())fmt.Printf("finish all tasks.\n")// Use the pool with a function,// set 10 to the capacity of goroutine pool and 1 second for expired duration.p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {myFunc(i)wg.Done()})defer p.Release()// Submit tasks one by one.for i := 0; i < runTimes; i++ {wg.Add(1)_ = p.Invoke(int32(i))}wg.Wait()fmt.Printf("running goroutines: %d\n", p.Running())fmt.Printf("finish all tasks, result is %d\n", sum)
}
ants.NewPoolWithFunc()goroutine10goroutinep.Invoke(data)antsgoroutinetaskFuncdata
Semaphore
Gogoroutine
package mainimport ("context""fmt""golang.org/x/sync/semaphore""runtime""sync""time"
)const (// 同时运行的goroutine上限Limit = 5// 信号量的权重Weight = 1
)func main() {start := time.Now()names := []int{1, 2, 3, 4, 5,}for i := 1; i <= 100000; i++ {names = append(names, i)}sem := semaphore.NewWeighted(Limit)var w sync.WaitGroupfor _, name := range names {w.Add(1)go func(name int) {sem.Acquire(context.Background(), Weight)// ... 具体的业务逻辑fmt.Printf("Items is %v, This NumGoroutine is %v\n", name, runtime.NumGoroutine())sem.Release(Weight)w.Done()}(name)}w.Wait()times := time.Since(start)fmt.Println(times)
}
NewWeighted()goroutine3Acquiregoroutinereleasewaiters
3、channel+sync
waitGroupchannelgoroutinechannelgoroutine
package mainimport ("fmt""runtime""sync""time"
)var (// channel长度poolCount = 5// 复用的goroutine数量goroutineCount = 10
)func main() {start := time.Now()jobsChan := make(chan int, poolCount)// workersvar wg sync.WaitGroupfor i := 0; i < goroutineCount; i++ {wg.Add(1)go func() {defer wg.Done()for item := range jobsChan {fmt.Printf("Items is %v, This NumGoroutine is %v\n", item, runtime.NumGoroutine())}}()}// sendersfor i := 0; i < 100000; i++ {jobsChan <- i}// 关闭channel,上游的goroutine在读完channel的内容,就会通过wg的done退出close(jobsChan)wg.Wait()times := time.Since(start)fmt.Println(times)
}
goroutine

四、总结

goroutinegoroutinegoroutinegoroutine