一、golang常用并发编程的几种模式
1、扇入扇出
1、所谓的扇入是指将多路通道聚合到一条通道中处理,Go语言最简单的扇入就是使用select聚合多条通道服务;
2、所谓的扇出是指将一条通道发散到多条通道中处理。在Go语言里面实现就是使用go关键字启动多个goroutine并发处理。
3、扇入就是合,扇出就是分。
4、当生产者的速度很慢时,需要使用扇入技术聚合多个生产者满足消费者,比如很耗时的加密/解密服务;
5、当消费者的速度很慢时,需要使用扇出技术,比如Web服务器并发请求处理。扇入和扇出是Go并发编程中常用的技术。
扇入扇出和pipeline(管道)的最大区在于,管道是串行的,但是扇入扇出是并行的。并行是指,一个管道可以接收其它多个数据源的输入,前提是管道对于多个数据源的输入顺序是不敏感的。
管道的核心思想在于,每个单独的输入都有一个单独goroutine处理,并写入同一个数据源。
扇入案例
package main
import (
"fmt"
"math/rand"
)
func GenerateIntA() chan int {
ch := make(chan int, 10)
go func() {
for {
ch <- rand.Int()
}
}()
return ch
}
func GenerateIntB() chan int {
ch := make(chan int, 10)
go func() {
for {
ch <- rand.Int()
}
}()
return ch
}
func GenerateInt() chan int {
ch := make(chan int, 20)
go func() {
// 使用select的扇入技术(Fan in)增加生成的随机源
for {
select {
case ch <- <-GenerateIntA():
case ch <- <-GenerateIntB():
}
}
}()
return ch
}
func main() {
ch := GenerateInt()
for i := 0; i < 100; i++ {
fmt.Println(<-ch)
}
}
2.优胜劣汰模式
场景:执行远程访问,远程服务响应不可靠的时候,同时开启go程,只取最快返回的,可以提高程序性能,但是占用资源会高一些
func job() int{
rand.Seed(time.Now().Unix())
ret := rand.Intn(5)
time.Sleep(time.Second * time.Duration(ret)) // 模拟业务访问延迟
return ret
}
func main() {
c := make(chan int)
for i := 0; i < 5; i++ {
go func() {
c <- job()
}()
}
fmt.Printf("最快的用了%d s", <-c)
}
3.生产者模式
func Producer(out chan int) {
defer close(out)
for i :=0; i < 5; i++ {
out <- i*2
time.Sleep(time.Second *2)
}
}
func Consumer(out chan int)(r chan struct{}){
r = make(chan struct {})
go func() {
defer close(r)
defer func() {
r <- struct{}{}
}()
for item := range out{
fmt.Println(item) // 模拟业务逻辑
}
}()
return r
}
func main() {
c := make(chan int)
go Producer(c)
r := Consumer(c)
<-r
4、goroutine 并发执行顺序的控制
var ch = make(chan struct{}, 1)
func job1() {
fmt.Println("job1...")
ch <- struct{}{}
}
func job2() {
<- ch // 阻塞等待 job1 执行完成
fmt.Println("job2...")
}
func do(fns ...func()) *sync.WaitGroup {
wg := &sync.WaitGroup{}
for _, fn := range fns{
wg.Add(1)
go func(f func()) {
defer wg.Done()
f()
}(fn)
}
return wg
}
func main() {
wg := do(job1,job2)
wg.Wait()
}
二、退出通知机制(close channel to broadcast)
1、读取已经关闭的通道不会引起阻塞,也不会导致panic,而是立即返回该通道存储类型的零值。
2、关闭select 监听的某个通道能使select立即感知此种通知,并能够进行相应的处理。
案例
package main
import(
"runtime"
"fmt"
"math/rand"
)
//Generate 是一个随机数生成器
func GenerateInt(done chan struct{}) chan int {
ch := make(chan int)
go func(){
Label:
for{
select {
case ch <- rand.Int():
//增加一路监听,对退出通知信号done的监听
case <- done:
break Label
}
}
//收到通知后,关闭通道ch
close(ch)
}()
return ch
}
func main(){
done := make(chan struct{})
ch := GenerateInt(done)
fmt.Println(<-ch)
fmt.Println(<-ch)
//发送通知,告知生产者停止生产
close(done)
fmt.Println(<-ch)
fmt.Println(<-ch)
//此时,生产者已经退出
println("NumGoroutinue=",runtime.NumGoroutine())
}
三、channel配合协程的通信
协程之间的通信,需要用到协程计数器:sync.WaitGroup和通信工具:channel。
1、协程计数器:用来判断该协程中函数是否运行完毕,若完毕则-1,直至为0。
2、Channel:是用来负责每个协程的信号,实现协程之间的通信和控制。
func printL(wg *sync.WaitGroup, chanl chan string, chanz chan string) {
//当前循环结束,说明该协程结束,协程计数器-1
defer wg.Done()
//协程结束,关闭该协程channel
defer close(chanl)
for i := 0; i < 5; i++ {
fmt.Println(<-chanl)
chanz <- "张三"
}
}
func printZ(wg *sync.WaitGroup, chanz chan string, chanw chan string) {
//当前循环结束,说明该协程结束,协程计数器-1
defer wg.Done()
//协程结束,关闭该协程channel
defer close(chanz)
for i := 0; i < 5; i++ {
fmt.Println(<-chanz)
chanw <- "王五"
}
}
func printW(wg *sync.WaitGroup, chanw chan string, chanl chan string) {
//当前循环结束,说明该协程结束,协程计数器-1
defer wg.Done()
//协程结束,关闭该协程channel
defer close(chanw)
for i := 0; i < 5; i++ {
fmt.Println(<-chanw)
//最后判断是否继续向chanl中塞名字
switch {
case i==4:
fmt.Println("main finished!")
default:
chanl <- "李四"
}
}
}
func main() {
//创建计数器
wg := sync.WaitGroup{}
//初始化创建协程的个数
wg.Add(3)
//申明三种协程所需要的channel
var chanl = make(chan string, 1)
var chanz = make(chan string, 1)
var chanw = make(chan string, 1)
//初始化第一个channel
chanl <- "李四"
//开启三个协程,
//每个协程中只有当前channel里面的数据用完了
//再赋值给下一个channel信号
//以保证三个协程按顺序执行
go printL(&wg, chanl, chanz)
go printZ(&wg, chanz, chanw)
go printW(&wg, chanw, chanl)
//等待直到协程计数器归0,再结束主程序
wg.Wait()
}
//输出:
//李四
//张三
//王五
//李四
//张三
//王五
//李四
//张三
//王五
//李四
//张三
//王五
//李四
//张三
//王五
//main finished!
详细的扇入扇出思想案例参考:链接
关于并发编的几种模型参考:链接