Golang Channel 实战技巧和说明

Channel 的一些实战说明

关于 close Channel

在这里插入图片描述

close Channel 的一些说明

channel 不需要通过 close 来释放资源,这个是它与 socket、file 等不一样的地方,对于 channel 而言,唯一需要 close 的就是我们想通过 close 触发 channel 读事件。

send on closed channel

v, ok := <-ch 判断是否 close

v, ok := <-ch
_,ok := <-chfunc chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
  • block为false,即执行select时,如果channel为空,返回(false,false),代表select操作失败,没接收到值。

  • 否则,如果channel已经关闭,并且没有数据,ep即接收数据的变量设置为零值,返回(true,false),代表select操作成功,但channel已关闭,没读到有效值。

  • 否则,其他读到有效数据的情况,返回(true,ture)。

优雅判断是否 close 的封装

package main

import "fmt"

type T int

func IsClosed(ch <-chan T) bool {
	select {
	case <-ch:
		return true
	default:
	}

	return false
}

func main() {
	c := make(chan T)
	fmt.Println(IsClosed(c)) // false
	close(c)
	fmt.Println(IsClosed(c)) // true
}

请允许我打个小广告:这篇文章首发在我微信公众号【后端系统和架构】中,点击这里可以去往公众号查看原文链接,如果对你有帮助,欢迎前往关注,更加方便快捷的接收最新优质文章

for-range 读取 Channel 数据

不管是有缓冲还是无缓冲,都可以使用 for-range 从 channel 中读取数据,并且这个是一直循环读取的。

for-range 中的 range 产生的迭代值为 Channel 中发送的值,如果已经这个 channel 已经 close 了,那么首先还会继续执行,直到所有值被读取完,然后才会跳出 for 循环,因此,通过 for-range 读取 chann 数据会比较方便,因为我们只需要读取数据就行了,不需管他的退出,在 close 之后如果数据读取完了会自动帮我们退出。如果既没有 close 也没有数据可读,那么就会阻塞到 range 这里,除非有数据产生或者 chan 被关闭了。但是如果 channel 是 nil,读取会被阻塞,也就是会一直阻塞在 range 位置。

一个示例如下:

   ch := make(chan int)

   // 一直循环读取 range 中的迭代值
   for v := range ch {
        // 得到了 v 这个 chann 中的值
        fmt.Println("读取数据:",v)
   }

select 读写 Channel 数据

  • select 的 case 分支里面,可以读数据,也可以写数据。最多只允许有一个 default case,它可以放在 case 列表的任何位置,并且没有任何影响。

  • select 可以同时处理多个 channel,如果有同时多个 case 分支可以去处理,比如同时有多个 channel 可以接收数据,那么 Go 会伪随机(pseudo-random)的选择一个 case 处理。如果没有 case 需要处理,则会选择 default 分支去处理。如果没有 default case,则 select 语句会阻塞,直到某个 case 分支可以处理了。

  • 每次 select 语句的执行,是会扫描完所有的 case 后才确定如何执行,而不是说遇到合适的 case 就直接执行了。

  • 对于 nil channel 上的操作会一直被阻塞,如果没有 default case,只有 nil channel 的 select 会一直被阻塞。

  • select 语句和 switch 语句一样,它不是循环,它只会选择一个 case 来处理,如果想一直处理channel,你可以在外面加一个无限的 for 循环

for {
    select {
    case c <- x:
        x, y = y, x+y
    case <-quit:
        fmt.Println("quit")
        return
    }
}

Channel 的读写超时机制【select + timeout】

我们的一般常见场景就是,

当我们从 chann 中进行读取数据,或者写入数据的时候,想要快速返回得到是否成功的结果,如果被 chann 阻塞后,需要指定一定的超时时间,然后如果在超时时间内还没有返回,那么就超时退出,不能一直阻塞在读写 chann 的流程中。

Go 的 time 库里面,提供了 time.NewTimer()、time.After()、time.NewTicker() 等方法,最终都可以通过这些方法来返回或者得到一个 channel,然后向这个 channel 中发送数据,就可以实现定时器的功能。

channel 可以通过 select + timeout 来实现阻塞超时的使用姿势,超时读写的姿势如下:

// 通过 select 实现读超时,如果读 chann 阻塞 timeout 的时间后就会返回
func ReadWithSelect(ch chan int) (x int, err error) {
	timeout := time.NewTimer(time.Microsecond * 500)

	select {
	case x = <-ch:
		return x, nil
	case <-timeout.C:
		return 0, errors.New("read time out")
	}
}

// 通过 select 实现写超时,如果写 chann 阻塞 timeout 的时间后就会返回
func WriteChWithSelect(ch chan int) error {
	timeout := time.NewTimer(time.Microsecond * 500)

	select {
	case ch <- 1:
		return nil
	case <-timeout.C:
		return errors.New("write time out")
	}
}

一个简单的实操代码示例

package main

import (
	"fmt"
	"runtime"
	"time"
)

func DoWorker() {
	c := make(chan bool, 1)

	go func() {

		time.Sleep(100 * time.Millisecond) // 等待 100ms 后写入,和后面的读超时配合,看超时判断结果

		c <- true
	}()

	go func() {
		timeout := time.NewTimer(time.Millisecond * 105) // 设置 105 ms 超时,如果超时没有读取到则 timeout
		select {
		case x := <-c:
			fmt.Printf("read chann:%v\n", x)

		case <-timeout.C:
			fmt.Println("read timeout")

		}
		fmt.Printf("over select\n\n")

	}()

}

func main() {
	fmt.Printf("start main num:%v\n", runtime.NumGoroutine())

	go func() {

		for {
			time.Sleep(1 * time.Second)

			fmt.Printf("start go DoWorker\n")

			go DoWorker()
		}

	}()

	for {
		time.Sleep(4 * time.Second)

		fmt.Printf("now main num:%v\n", runtime.NumGoroutine())
	}
}


输出:
start main num:1
start go DoWorker
read chann:true
over select

start go DoWorker
read chann:true
over select

start go DoWorker
read chann:true
over select

TryEnqueue 无阻塞写 Channel 数据

有些场景,我们期望往缓冲队列中写入数据的时候,如果队列已满,那么不要进行写阻塞,而是写完发现队列已满就抛错,那么我们可以通过如下机制的封装来实现,原理是通过一个 select 和 一个 default 语句去实现,有一个 default 就不会阻塞了:

var jobChan = make(chan int, 3)

func TryEnqueue(job int) bool {
	select {
	case jobChan <- job:
		fmt.Printf("true\n")  // 队列未满
		return true
	default:
		fmt.Printf("false\n") // 队列已满
		return false
	}
}

Channel 常见错误和根因分析

fatal error: all goroutines are asleep - deadlock 问题解决和优化

参考 go-language-fatal-error-all-goroutines-are-asleep-deadlock,在 main 函数里面,如果要 通过 chann 等待其他子协程的往 chann 中写入数据,但是并没有其他子协程写入或者其他协程没有写入就提前退出或者结束了,此时,main goroutine 协程就会等一个永远不会来的数据,那整个程序就永远等下去了,这个时候就会报上述错误。

fatal error: all goroutines are asleep - deadlock! 异常的示例,在 main 里面,往 chann 中写超过缓冲数量的数据,这个时候,main 是要期望能够从有其他协程读取这些数据的,但是 main 里面并没有,因此就会报错:

package main
import "fmt"
func main() { 
    channel := make(chan string, 2)
    fmt.Println("1") 
    channel <- "h1" 
    fmt.Println("2") 
    channel <- "w2" 
    fmt.Println("3") 
    channel <- "c3"    // 执行到这一步,直接报 error 
    fmt.Println("...") 
    msg1 := <-channel 
    fmt.Println(msg1) 
}

优化处理:

package main
import "fmt"
func main() { 
    channel := make(chan string, 2)
    fmt.Println("1") 
    channel <- "h1" 
    fmt.Println("2") 
    channel <- "w2"
    fmt.Println("3") 
    select {
    case channel <- "c3": 
        fmt.Println("ok") 
    default: 
        fmt.Println("channel is full !") 
    }
    fmt.Println("...") 
    msg1 := <-channel 
    fmt.Println(msg1) 
}

最后

请允许我打个小广告:这篇文章首发在我微信公众号【后端系统和架构】中,点击这里可以去往公众号查看原文链接,如果对你有帮助,欢迎前往关注,更加方便快捷的接收最新优质文章