func (set *threadSafeSet) Iter() <-chan interface{} {
ch := make(chan interface{})
go func() {
set.RLock()
for elem := range set.s {
ch <- elem
}
close(ch)
set.RUnlock()
}()
return ch
}
考点:chan缓存池
解答:
看到这道题,我也在猜想出题者的意图在哪里。 chan?sync.RWMutex?go?chan缓存池?迭代? 所以只能再读一次题目,就从迭代入手看看。 既然是迭代就会要求set.s全部可以遍历一次。但是chan是为缓存的,那就代表这写入一次就会阻塞。 我们把代码恢复为可以运行的方式,看看效果
package main
import (
"fmt"
"sync"
)
//下面的迭代会有什么问题?
type threadSafeSet struct {
sync.RWMutex
s []interface{}
}
func (set *threadSafeSet) Iter() <-chan interface{} {
ch := make(chan interface{}) // 解除注释看看!
//ch := make(chan interface{},len(set.s))
go func() {
set.RLock()
for elem,value := range set.s {
ch <- elem
println("Iter:",elem,value)
}
close(ch)
set.RUnlock()
}()
return ch
}
func main() {
th:=threadSafeSet{
s:[]interface{}{"1","2"},
}
vs := th.Iter()
fmt.Println(<- vs)
}
设置缓冲区输出:
或:
不设置缓冲区输出:
如果不设置缓冲区,写入chan的代码 ch <- elem 将只执行一次就会阻塞。
-------------------------将代码改为循环读取,不设置缓冲区:-------------------------
package main
import (
"fmt"
"sync"
)
//下面的迭代会有什么问题?
type threadSafeSet struct {
sync.RWMutex
s []interface{}
}
func (set *threadSafeSet) Iter() <-chan interface{} {
ch := make(chan interface{}) // 解除注释看看!
//ch := make(chan interface{},len(set.s))
go func() {
set.RLock()
for elem,value := range set.s {
ch <- elem
println("Iter:",elem,value)
}
close(ch)
set.RUnlock()
}()
return ch
}
func main() {
th:=threadSafeSet{
s:[]interface{}{"1","2"},
}
vs := th.Iter()
for true {
v,ok := <- vs
if ok {
fmt.Println(v)
}else {
return
}
}
//fmt.Println(<- vs)
}
结果:
改为不设置缓冲区循环读取后写入chan的代码 ch <- elem 也执行了两次,但是第二次写入操作一定是在第一次读取之后。
如果设置缓冲区,则写入顺序和读取顺序不固定。
-------------------------接下来将close(ch)这行代码注释掉。-------------------------
程序运行结果:
无论是否设置缓冲区都会报fatal error: all goroutines are asleep - deadlock!的错。
-------------------------再将循环读取改为读取一次。-------------------------
func main() {
th:=threadSafeSet{
s:[]interface{}{"1","2"},
}
vs := th.Iter()
time.Sleep(2*time.Second)
//for true {
// v,ok := <- vs
// if ok {
// fmt.Println(v)
// }else {
// return
// }
//}
fmt.Println(<- vs)
}
结果:程序不再报错,如图:
将循环读取操作另开一个线程:
func main() {
th:=threadSafeSet{
s:[]interface{}{"1","2"},
}
vs := th.Iter()
go func() {
for true {
v,ok := <- vs
if ok {
fmt.Println(v)
}else {
return
}
}
}()
fmt.Println(<- vs)
}
执行结果:
确实不再报错,但事实并不是这样:
接下来修改代码:
func main() {
th:=threadSafeSet{
s:[]interface{}{"1","2"},
}
vs := th.Iter()
go func() {
for true {
v,ok := <- vs
if ok {
fmt.Println(v)
}else {
return
}
}
}()
go func() {
//一直阻塞
select {}
}()
fmt.Println(<- vs)
}
执行结果:
我们在代码中增加了新的线程,该线程一直阻塞,但是并没有报死锁。
所以看起来这种方法可以解决报错,是因为新建的go线程中的错误并没有打印。
@欢迎指正。
真正的原因:
分析问题应该是出在用FOR … RANGE 这种方法读取CHAN这一块,因为实际上START到底有多少系统并不知道,所以一直在读取CHAN,而CHAN一直没有数据,就阻塞了(这里并没有显式的close掉chan)。而用上面注释代码指定次数的方法就避免了这种阻塞,所以这里会出错,不知道这样理解对不对?
对于这种次数不确定的goroutine,解决办法是for … select 这种方法?
准确地说,是提示”deadlock”。
当worker都执行完,所在的goroutine都结束以后。只剩下main所在的goroutine在读取start数据,但是这时候没有其他的goroutine给start写入数据,所以main就会一直在等待一个不可能出现的数据,结果就死锁了。
有两种方法解决这个问题:
1.知道channel要收到多少数据,当读取到第10个数据以后就不再读取。
2.当不需要再写入数据时关闭channel,读取channel的数据获知关闭状态则退出循环。
验证:
func main() {
th:=threadSafeSet{
s:[]interface{}{"1","2"},
}
vs := th.Iter()
i := 1
for true {
fmt.Printf("第%v次读取\n",i)
v,ok := <- vs
if ok {
fmt.Println(v)
}else {
return
}
i ++
}
fmt.Println(<- vs)
}
程序第三次读取时会报死锁,也解释了上文单次读取不报错的原因。
go testDeadLock(c) c<- 'A' 只要换下位置就不会报错了,无缓存通道要先接受,后发送。
据了解,chan的缓冲区要么设置0要么设置1。
超过1都不建议使用,不如用专业的消息队列中间件。
缓冲为1,实现异步协程,为0实现同步。