package main import ( "fmt" "math/rand" ) /* 使用空结构体计数的方式获取goroutine结束信号!而非sync.waitgroup控制 */ // 声明空结构体doneCh var doneCh chan struct{} // 随机数结构体 type randomNum struct { id int64 number int64 } // 求和结果结构体 type resultNum struct { randomNum *randomNum // 继承 sum int64 } // producer func producer(ch chan *randomNum) { var id int64 // 初始化序列号 for i := 0; i < 10000; i++ { // 指定数据量 id++ // 序列号自增 num := rand.Int63() // 获取随机数 // 封装randomNum tmp := &randomNum{ id: id, number: num, } // 写入ch1 ch <- tmp // 这里调用的是本地定义的ch1 } close(ch) // 完成指定数据量之后立即关闭通道,释放资源,-- 带缓冲区的channel关闭之后不影响读取,只是不能写入! } // consumer func consumer(ch1 chan *randomNum, ch2 chan *resultNum) { // 生产者不停的生产,消费者也需要不停的拿 for v := range ch1 { // 处理数据 sum := calc(v.number) // 封装result结构体 result := &resultNum{ randomNum: v, // 源数据 sum: sum, // result数据 } // 处理后的数据放入ch2 ch2 <- result } doneCh <- struct{}{} // goroutine完成任务就往doneCh中写入一个空结构体,代表结束信号 } // 打印结果 func print(ch chan *resultNum) { for v := range ch { fmt.Printf("id:%v randomNum:%v sum:%v\n", v.randomNum.id, v.randomNum.number, v.sum) } } // calc -- 整数的每一位相加 func calc(i int64) int64 { var sum int64 for i > 0 { // 每位数字求个 sum += i % 10 i /= 10 // 抹掉末尾的数字(舍去浮点数) } return sum } // 启动指定数量的消费者goroutine func nConsumerGoroutine(n int, ch1 chan *randomNum, ch2 chan *resultNum) { for i := 0; i < n; i++ { go func(chan *randomNum, chan *resultNum) { consumer(ch1, ch2) }(ch1, ch2) } } // 监测goroutine执行完毕的数量 func closeCh2(n int, doneCh chan struct{}, ch2 chan *resultNum) { for i := 0; i < n; i++ { <-doneCh // 没有取完10个空结构体,就代表没执行完! } close(doneCh) // 执行完毕之后关闭通道(doneCh是全局变量,所以不需要传参数!) close(ch2) } func main() { // 初始化ch1,ch2 ch1 := make(chan *randomNum, 10000) ch2 := make(chan *resultNum, 10000) doneCh = make(chan struct{}, 10) // producer go func(chan *randomNum) { producer(ch1) }(ch1) // consumer nConsumerGoroutine(10, ch1, ch2) closeCh2(10, doneCh, ch2) // 打印结果 print(ch2) }