一.优胜劣汰模式
场景:执行远程访问,远程服务响应不可靠的时候,同时开启go程,只取最快返回的,可以提高程序性能,但是占用资源会高一些
func job() int{
rand.Seed(time.Now().Unix())
ret := rand.Intn(5)
time.Sleep(time.Second * time.Duration(ret)) // 模拟业务访问延迟
return ret
}
func main() {
c := make(chan int)
for i := 0; i < 5; i++ {
go func() {
c <- job()
}()
}
fmt.Printf("最快的用了%d s", <-c)
}
二.扇入和扇出
扇入:多个channel 读取相同数据合并输出到一个总的channel里;
扇出:主要用来收集扇入汇总的数据做进一步处理。
// In 扇入
func in(chs ... <- chan interface{}) <- chan interface{} {
retChan := make(chan interface{})
wg := sync.WaitGroup{}
for _, ch := range chs{
wg.Add(1)
go func(ch <- chan interface{}) {
defer wg.Done()
// 遍历 ch 会出现阻塞
for v := range ch{
retChan <- v
}
}(ch)
}
// 优雅关闭grouting
go func() {
defer close(retChan)
wg.Wait()
}()
return retChan
}
// getUser 模拟一个业务函数
func getUser() <- chan interface{}{
userChan := make(chan interface{})
go func() {
defer close(userChan)
time.Sleep(time.Second * 5) // 模拟业务访问耗时
userChan <- "User:heihei" // 整个扇入和扇出环节, 对channel的写入只在该环节
}()
return userChan
}
// getGoods 模拟一个业务函数
func getGoods() <- chan interface{}{
goodsChan := make(chan interface{})
go func() {
defer close(goodsChan)
time.Sleep(time.Second * 2) // 模拟业务访问耗时
goodsChan <- "goods:haha" // 整个扇入和扇出环节, 对channel的写入只在该环节
}()
return goodsChan
}
func main() {
ret := in(getUser(), getGoods())
for v := range ret{
fmt.Println(v)
}
三.生产者模式
func Producer(out chan int) {
defer close(out)
for i :=0; i < 5; i++ {
out <- i*2
time.Sleep(time.Second *2)
}
}
func Consumer(out chan int)(r chan struct{}){
r = make(chan struct {})
go func() {
defer close(r)
defer func() {
r <- struct{}{}
}()
for item := range out{
fmt.Println(item) // 模拟业务逻辑
}
}()
return r
}
func main() {
c := make(chan int)
go Producer(c)
r := Consumer(c)
<-r
四.goroutine 并发执行顺序的控制
方式一.通过channel控制
var ch = make(chan struct{}, 1)
func job1() {
fmt.Println("job1...")
ch <- struct{}{}
}
func job2() {
<- ch // 阻塞等待 job1 执行完成
fmt.Println("job2...")
}
func do(fns ...func()) *sync.WaitGroup {
wg := &sync.WaitGroup{}
for _, fn := range fns{
wg.Add(1)
go func(f func()) {
defer wg.Done()
f()
}(fn)
}
return wg
}
func main() {
wg := do(job1,job2)
wg.Wait()
}
五、控制协程数量
func myJob() {
time.Sleep(time.Second * 3)
fmt.Println(rand.Intn(100))
}
func setPool(ch chan struct{}) {
for i := 0; i < 5; i++ {
ch <- struct{}{}
}
}
func main() {
// 空struct{} 不占空间
pool := make(chan struct{}, 5)
setPool(pool) // 1.先 set 5个任务到channel中
wg := sync.WaitGroup{}
wg.Add(5) // 2.等待5个任务执行完
go func() {
for {
wg.Wait() // 5.等待上一组5个任务跑完
fmt.Println("发放了5个任务...")
setPool(pool) // 6.再分发5个任务
wg.Add(5)
}
}()
for {
<- pool // 3.通过接收1中set到的任务, 先跑
go func() {
defer wg.Done() // 4.每跑完1个任务就通过Done来减1
myJob()
}()
}