一、现状
GogoroutineGogoroutinegoroutine
ch := generate() go func() { for range ch { } }()
generate()channelgoroutinechannelgoroutinechgenerate()goroutine
goroutineG-P-MGgoroutinegoroutine2-4kgoroutinegoroutine
func main() {for i := 0; i < math.MaxInt64; i++ {go func(i int) {time.Sleep(5 * time.Second)}(i)} }
如果不加以控制的话,直接崩溃
goroutine
1、Context
goroutineHTTP/RPCGoroutineRPCgoroutineContextgoroutine
Context
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc) func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) func WithValue(parent Context, key, val interface{}) Context
goroutineWithCancel
package mainimport ("context""fmt""time" )func main() {ctx, cancel := context.WithCancel(context.Background())go Speak(ctx)time.Sleep(10 * time.Second)cancel()time.Sleep(2 * time.Second)fmt.Println("拜拜!") }func Speak(ctx context.Context) {for range time.Tick(time.Second) {select {case <-ctx.Done():fmt.Println("拜拜!")returndefault:fmt.Println("开始执行")}} }
运行结果:
PS D:\cpz\go-demo\test13> go run test4.go 开始执行 开始执行 开始执行 开始执行 开始执行 开始执行 开始执行 开始执行 开始执行 拜拜! 拜拜!
withCancelBackgroundctxgoroutine1s10sgoroutinegoroutine
2、channel
channelgoroutineGogoroutine+channelcontextgoroutinecontextchannelchannel+selectchannel+closegoroutine
方式1:channel+select
package mainimport "fmt"func fibonacci(ch chan int, done chan struct{}) {x, y := 0, 1for {select {case ch <- x:x, y = y, x+ycase <-done:fmt.Println("over")return}} } func main() {ch := make(chan int)done := make(chan struct{})go func() {for i := 0; i < 10; i++ {fmt.Println(<-ch)}done <- struct{}{}}()fibonacci(ch, done) }
channelchannelchannelselectchannelselectdefaultselect
ChannelChannelcaseChanneldefault
defaultnil channeldefault casenil channelselect
方式2:channel+close
package mainimport ("fmt""time" )func main() {ch := make(chan int, 10)go func() {for i := 0; i < 10; i++ {ch <- i}close(ch)}()go func() {for val := range ch {fmt.Println(val)}fmt.Println("receive data over")}()time.Sleep(5 * time.Second)fmt.Println("program over") }
channelfor-rangechannelrange chchannelgoroutine
goroutine
goroutineGogoroutinegoroutinegoroutinegoroutinegoroutine
0、常用的三方库
- go-playground/pool
- nozzle/throttler
- Jeffail/tunny
- panjf2000/ants
1、协程池
gogoroutinegoroutinegoroutineantsgo-playground/pooljeffail/tunnyants
package mainimport ("fmt"ants "github.com/panjf2000/ants/v2""sync""sync/atomic""time" )var sum int32func myFunc(i interface{}) {n := i.(int32)atomic.AddInt32(&sum, n)fmt.Printf("run with %d\n", n) }func demoFunc() {time.Sleep(10 * time.Millisecond)fmt.Println("Hello World!") }func main() {defer ants.Release()runTimes := 1000// Use the common pool.var wg sync.WaitGroupsyncCalculateSum := func() {demoFunc()wg.Done()}for i := 0; i < runTimes; i++ {wg.Add(1)_ = ants.Submit(syncCalculateSum)}wg.Wait()fmt.Printf("running goroutines: %d\n", ants.Running())fmt.Printf("finish all tasks.\n")// Use the pool with a function,// set 10 to the capacity of goroutine pool and 1 second for expired duration.p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {myFunc(i)wg.Done()})defer p.Release()// Submit tasks one by one.for i := 0; i < runTimes; i++ {wg.Add(1)_ = p.Invoke(int32(i))}wg.Wait()fmt.Printf("running goroutines: %d\n", p.Running())fmt.Printf("finish all tasks, result is %d\n", sum) }
ants.NewPoolWithFunc()goroutine10goroutinep.Invoke(data)antsgoroutinetaskFuncdata
Semaphore
Gogoroutine
package mainimport ("context""fmt""golang.org/x/sync/semaphore""runtime""sync""time" )const (// 同时运行的goroutine上限Limit = 5// 信号量的权重Weight = 1 )func main() {start := time.Now()names := []int{1, 2, 3, 4, 5,}for i := 1; i <= 100000; i++ {names = append(names, i)}sem := semaphore.NewWeighted(Limit)var w sync.WaitGroupfor _, name := range names {w.Add(1)go func(name int) {sem.Acquire(context.Background(), Weight)// ... 具体的业务逻辑fmt.Printf("Items is %v, This NumGoroutine is %v\n", name, runtime.NumGoroutine())sem.Release(Weight)w.Done()}(name)}w.Wait()times := time.Since(start)fmt.Println(times) }
NewWeighted()goroutine3Acquiregoroutinereleasewaiters
3、channel+sync
waitGroupchannelgoroutinechannelgoroutine
package mainimport ("fmt""runtime""sync""time" )var (// channel长度poolCount = 5// 复用的goroutine数量goroutineCount = 10 )func main() {start := time.Now()jobsChan := make(chan int, poolCount)// workersvar wg sync.WaitGroupfor i := 0; i < goroutineCount; i++ {wg.Add(1)go func() {defer wg.Done()for item := range jobsChan {fmt.Printf("Items is %v, This NumGoroutine is %v\n", item, runtime.NumGoroutine())}}()}// sendersfor i := 0; i < 100000; i++ {jobsChan <- i}// 关闭channel,上游的goroutine在读完channel的内容,就会通过wg的done退出close(jobsChan)wg.Wait()times := time.Since(start)fmt.Println(times) }
goroutine
四、总结
goroutinegoroutinegoroutinegoroutine