一、golang常用并发编程的几种模式

1、扇入扇出

1、所谓的扇入是指将多路通道聚合到一条通道中处理,Go语言最简单的扇入就是使用select聚合多条通道服务;
2、所谓的扇出是指将一条通道发散到多条通道中处理。在Go语言里面实现就是使用go关键字启动多个goroutine并发处理。
3、扇入就是合,扇出就是分。
4、当生产者的速度很慢时,需要使用扇入技术聚合多个生产者满足消费者,比如很耗时的加密/解密服务;
5、当消费者的速度很慢时,需要使用扇出技术,比如Web服务器并发请求处理。扇入和扇出是Go并发编程中常用的技术。

扇入扇出和pipeline(管道)的最大区在于,管道是串行的,但是扇入扇出是并行的。并行是指,一个管道可以接收其它多个数据源的输入,前提是管道对于多个数据源的输入顺序是不敏感的。

管道的核心思想在于,每个单独的输入都有一个单独goroutine处理,并写入同一个数据源。

在这里插入图片描述

扇入案例

package main

import (
	"fmt"
	"math/rand"
)

func GenerateIntA() chan int {
	ch := make(chan int, 10)
	go func() {
		for {
			ch <- rand.Int()
		}
	}()
	return ch
}

func GenerateIntB() chan int {
	ch := make(chan int, 10)
	go func() {
		for {
			ch <- rand.Int()
		}
	}()
	return ch
}

func GenerateInt() chan int {
	ch := make(chan int, 20)
	go func() {
		// 使用select的扇入技术(Fan in)增加生成的随机源
		for {
			select {
				case ch <- <-GenerateIntA():
				case ch <- <-GenerateIntB():
			}
		}
	}()
	return ch
}

func main() {
	ch := GenerateInt()
	for i := 0; i < 100; i++ {
		fmt.Println(<-ch)
	}
}

2.优胜劣汰模式

场景:执行远程访问,远程服务响应不可靠的时候,同时开启go程,只取最快返回的,可以提高程序性能,但是占用资源会高一些

func job() int{
   rand.Seed(time.Now().Unix())
   ret := rand.Intn(5)
   time.Sleep(time.Second * time.Duration(ret)) // 模拟业务访问延迟
   return ret
}

func main() {
   c := make(chan int)
   for i := 0; i < 5; i++ {
      go func() {
         c <- job()
      }()
   }
   fmt.Printf("最快的用了%d s", <-c)
}

3.生产者模式

func Producer(out chan int) {
   defer close(out)
   for i :=0; i < 5; i++ {
      out <- i*2
      time.Sleep(time.Second *2)
   }
}

func Consumer(out chan int)(r chan struct{}){
   r = make(chan struct {})
   go func() {
      defer close(r)
      defer func() {
         r <- struct{}{}
      }()
      for item := range out{    
          fmt.Println(item) // 模拟业务逻辑
       }
   }()
   return r
}
func main() {
   c := make(chan int)
   go Producer(c)
   r := Consumer(c)
   <-r

4、goroutine 并发执行顺序的控制

var ch = make(chan struct{}, 1)

func job1() {
   fmt.Println("job1...")
   ch <- struct{}{}
}

func job2() {
   <- ch  // 阻塞等待 job1 执行完成
   fmt.Println("job2...")
}
func do(fns ...func()) *sync.WaitGroup {
   wg := &sync.WaitGroup{}
   for _, fn := range fns{
      wg.Add(1)
      go func(f func()) {
         defer wg.Done()
         f()
      }(fn)
   }
   return  wg
}
func main() {
   wg := do(job1,job2)
   wg.Wait()
}

二、退出通知机制(close channel to broadcast)

1、读取已经关闭的通道不会引起阻塞,也不会导致panic,而是立即返回该通道存储类型的零值。
2、关闭select 监听的某个通道能使select立即感知此种通知,并能够进行相应的处理。

案例

package main

import(
	"runtime"
	"fmt"
	"math/rand"
)
//Generate 是一个随机数生成器
func GenerateInt(done chan struct{}) chan int {
	ch := make(chan int)
	go func(){
	Label:
		for{

			select {
			case ch <- rand.Int():
			//增加一路监听,对退出通知信号done的监听
			case <- done:
				break Label
			}
		}
		//收到通知后,关闭通道ch
		close(ch)
	}()

	return ch
}

func main(){
	done := make(chan struct{})
	ch := GenerateInt(done)

	fmt.Println(<-ch)
	fmt.Println(<-ch)

	//发送通知,告知生产者停止生产
	close(done)

	fmt.Println(<-ch)
	fmt.Println(<-ch)

	//此时,生产者已经退出
	println("NumGoroutinue=",runtime.NumGoroutine())

}

三、channel配合协程的通信

协程之间的通信,需要用到协程计数器:sync.WaitGroup和通信工具:channel。
1、协程计数器:用来判断该协程中函数是否运行完毕,若完毕则-1,直至为0。
2、Channel:是用来负责每个协程的信号,实现协程之间的通信和控制。

func printL(wg *sync.WaitGroup, chanl chan string, chanz chan string) {
	//当前循环结束,说明该协程结束,协程计数器-1
	defer wg.Done()
	//协程结束,关闭该协程channel
	defer close(chanl)
	for i := 0; i < 5; i++ {
		fmt.Println(<-chanl)
		chanz <- "张三"
	}
}

func printZ(wg *sync.WaitGroup, chanz chan string, chanw chan string) {
	//当前循环结束,说明该协程结束,协程计数器-1
	defer wg.Done()
	//协程结束,关闭该协程channel
	defer close(chanz)
	for i := 0; i < 5; i++ {
		fmt.Println(<-chanz)
		chanw <- "王五"
	}
}

func printW(wg *sync.WaitGroup, chanw chan string, chanl chan string) {
	//当前循环结束,说明该协程结束,协程计数器-1
	defer wg.Done()
	//协程结束,关闭该协程channel
	defer close(chanw)
	for i := 0; i < 5; i++ {
		fmt.Println(<-chanw)
		//最后判断是否继续向chanl中塞名字
		switch {
		case i==4:
			fmt.Println("main finished!")
		default:
			chanl <- "李四"
		}
	}
}

func main() {
	//创建计数器
	wg := sync.WaitGroup{}
	//初始化创建协程的个数
	wg.Add(3)
	//申明三种协程所需要的channel
	var chanl = make(chan string, 1)
	var chanz = make(chan string, 1)
	var chanw = make(chan string, 1)
	//初始化第一个channel
	chanl <- "李四"
	//开启三个协程,
	//每个协程中只有当前channel里面的数据用完了
	//再赋值给下一个channel信号
	//以保证三个协程按顺序执行
	go printL(&wg, chanl, chanz)
	go printZ(&wg, chanz, chanw)
	go printW(&wg, chanw, chanl)
	//等待直到协程计数器归0,再结束主程序
	wg.Wait()
}

//输出:
//李四
//张三
//王五
//李四
//张三
//王五
//李四
//张三
//王五
//李四
//张三
//王五
//李四
//张三
//王五
//main finished!

详细的扇入扇出思想案例参考:链接

关于并发编的几种模型参考:链接