golang使用channel实现生产者消费者模型
package main
import (
"fmt"
"math/rand"
)
/*
使用空结构体计数的方式获取goroutine结束信号!而非sync.waitgroup控制
*/
// 声明空结构体doneCh
var doneCh chan struct{}
// 随机数结构体
type randomNum struct {
id int64
number int64
}
// 求和结果结构体
type resultNum struct {
randomNum *randomNum // 继承
sum int64
}
// producer
func producer(ch chan *randomNum) {
var id int64 // 初始化序列号
for i := 0; i < 10000; i++ { // 指定数据量
id++ // 序列号自增
num := rand.Int63() // 获取随机数
// 封装randomNum
tmp := &randomNum{
id: id,
number: num,
}
// 写入ch1
ch <- tmp // 这里调用的是本地定义的ch1
}
close(ch) // 完成指定数据量之后立即关闭通道,释放资源,-- 带缓冲区的channel关闭之后不影响读取,只是不能写入!
}
// consumer
func consumer(ch1 chan *randomNum, ch2 chan *resultNum) {
// 生产者不停的生产,消费者也需要不停的拿
for v := range ch1 {
// 处理数据
sum := calc(v.number)
// 封装result结构体
result := &resultNum{
randomNum: v, // 源数据
sum: sum, // result数据
}
// 处理后的数据放入ch2
ch2 <- result
}
doneCh <- struct{}{} // goroutine完成任务就往doneCh中写入一个空结构体,代表结束信号
}
// 打印结果
func print(ch chan *resultNum) {
for v := range ch {
fmt.Printf("id:%v randomNum:%v sum:%v\n", v.randomNum.id, v.randomNum.number, v.sum)
}
}
// calc -- 整数的每一位相加
func calc(i int64) int64 {
var sum int64
for i > 0 {
// 每位数字求个
sum += i % 10
i /= 10 // 抹掉末尾的数字(舍去浮点数)
}
return sum
}
// 启动指定数量的消费者goroutine
func nConsumerGoroutine(n int, ch1 chan *randomNum, ch2 chan *resultNum) {
for i := 0; i < n; i++ {
go func(chan *randomNum, chan *resultNum) {
consumer(ch1, ch2)
}(ch1, ch2)
}
}
// 监测goroutine执行完毕的数量
func closeCh2(n int, doneCh chan struct{}, ch2 chan *resultNum) {
for i := 0; i < n; i++ {
<-doneCh // 没有取完10个空结构体,就代表没执行完!
}
close(doneCh) // 执行完毕之后关闭通道(doneCh是全局变量,所以不需要传参数!)
close(ch2)
}
func main() {
// 初始化ch1,ch2
ch1 := make(chan *randomNum, 10000)
ch2 := make(chan *resultNum, 10000)
doneCh = make(chan struct{}, 10)
// producer
go func(chan *randomNum) {
producer(ch1)
}(ch1)
// consumer
nConsumerGoroutine(10, ch1, ch2)
closeCh2(10, doneCh, ch2)
// 打印结果
print(ch2)
}