在文章之前,我先讲个故事,我有个朋友以前是java程序员,后来转过来做go,当我问他为什么选择go的时候,他跟我说,因为写go就没人说他写的代码low了 ??? 这只是个段子不要当真。。。

好回归正题 golang 的网络轮循器是如何实现的那,先说明一下 golang 的网络轮循器是做什么的,你的go程序启动的时候会创建一个M去跑我们的系统监测任务代码如下(专栏下面的所有文章都是以go 1.8版本为准):

systemstack(func() {
     newm(sysmon, nil)
})

sysmon方法就是我们说的监控任务,它没有和任何的P(逻辑处理器)进行绑定,而是通过自身改变睡眠时间和时间间隔来一直循环下去(代码位于runtime/proc.go)。 golang中所有文件描述符都被设置成非阻塞的,某个goroutine进行网络io操作,读或者写文件描述符,如果此刻网络io还没准备好,则这个goroutine会被放到系统的等待队列中,这个goroutine失去了运行权,但并不是真正的整个系统“阻塞”于系统调用,后台还有一个poller会不停地进行poll,所有的文件描述符都被添加到了这个poller中的,当某个时刻一个文件描述符准备好了,poller就会唤醒之前因它而阻塞的goroutine,于是goroutine重新运行起来。

网络轮循器就在这个for循环之中,从epoll 的epollwait 接口获取准备就绪的 *g (结构指针) 最后注入到当前调度器下的可获取的G队列,代码如下:

atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
gp := netpoll(false)
if gp != nil {
       injectglist(gp)
}

这里的netpoll就是今天的主角,以下是追踪器的部分代码:

for {
		if idle == 0 { //20us后开始睡眠
			delay = 20
		} else if idle > 50 { //睡眠1毫秒后翻倍
			delay *= 2
		}
		if delay > 10*1000 { //10ms
			delay = 10 * 1000
		}
                //以上是调整时间间隔
		usleep(delay)
                //睡眠若干毫秒后,判断是否进行调度追踪,
                //并且 是否进行垃圾回收或所有的P都处在空闲状态
		if debug.schedtrace <= 0 && (sched.gcwaiting != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs)) {
			lock(&sched.lock)
			if atomic.Load(&sched.gcwaiting) != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs) {
				atomic.Store(&sched.sysmonwait, 1)
				unlock(&sched.lock)
				//唤醒任务
				maxsleep := forcegcperiod / 2
				if scavengelimit < forcegcperiod {
					maxsleep = scavengelimit / 2
				}
				notetsleep(&sched.sysmonnote, maxsleep)
				lock(&sched.lock)

				atomic.Store(&sched.sysmonwait, 0)
				noteclear(&sched.sysmonnote)
                                //重置时间间隔
				idle = 0
				delay = 20
			}
			unlock(&sched.lock)
		}
		//距离上次拉去是否超过10ms
		lastpoll := int64(atomic.Load64(&sched.lastpoll))
		now := nanotime()
		unixnow := unixnanotime()
                //判断获取最后一次从网络I/O轮循查找G的时间
		if lastpoll != 0 && lastpoll+10*1000*1000 < now {
                        //更新最后一次查询G时间,为了下一次做判断。
			atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
                        //这行代码是今天的主角,从网络I/O(我喜欢叫网络轮询器)
                        //查找已经就绪的G,注意不是阻塞的
			gp := netpoll(false)

			if gp != nil {
				incidlelocked(-1)
                                //找到后注入到调度器下面的可获取的G队列
				injectglist(gp)
				incidlelocked(1)
			}
		}
		// 再次夺取P、 阻塞的系统调用
		// 再次夺取长时间运行的G
		...
                ...
                ...
	}

上面简单的介绍了追踪器的大概流程,接下来有请我们的主角show time.

网络轮轮循器使用I/O多路复用的技术,可以非常高效的处理数以百万计的socket描述符,这里有linux下非阻塞io库 epoll - 知乎专栏的具体介绍。。。

我先列出golang三个封装的系统调用

1、创建epoll

func epollcreate(size int32) int32
func epollcreate1(flags int32) int32

TEXT runtime·epollcreate1(SB),NOSPLIT,$0
	MOVL    $329, AX
	MOVL	flags+0(FP), BX
	INVOKE_SYSCALL
	MOVL	AX, ret+4(FP)
	RET

epollcreate() 可以创建一个epoll实例。在linux 内核版本大于2.6.8 后,这个size 参数就被弃用了,但是传入的值必须大于0。这里引用了互联网上的一句话

在 epollcreate() 的最初实现版本时, size参数的作用是创建epoll实例时候告诉内核需要使用多少个文件描述符。内核会使用 size 的大小去申请对应的内存(如果在使用的时候超过了给定的size, 内核会申请更多的空间)。现在,这个size参数不再使用了(内核会动态的申请需要的内存)。但要注意的是,这个size必须要大于0,为了兼容旧版的linux 内核的代码。

epollcreate1() 如果flags的值是0,epollcreate1()等同于epollcreate()除了过时的size被遗弃了。当然flasg可以使用_EPOLL_CLOEXEC = 0x80000。

2、设置epoll事件

func epollctl(epfd, op, fd int32, ev *epollevent) int32

// sys_linux_386.s
TEXT runtime·epollctl(SB),NOSPLIT,$0
	MOVL	$255, AX
	MOVL	epfd+0(FP), BX
	MOVL	op+4(FP), CX
	MOVL	fd+8(FP), DX
	MOVL	ev+12(FP), SI
	INVOKE_SYSCALL
	MOVL	AX, ret+16(FP)
	RET

第一个参数epfd指向epoll的实例,op 添加事件的类型 fd是要注册的目标文件描述符,ev 是关联指定的描述符

op 的枚举值:

_EPOLL_CTL_ADD = 0x1 //在epfd中注册指定的fd文件描述符并能把event和fd关联起来。
_EPOLL_CTL_MOD = 0x3 //改变 fd和evetn之间的联系。
_EPOLL_CTL_DEL = 0x2 //从指定的epfd中删除fd文件描述符。在这种模式中event是被忽略的,并且为可以等于nil。

event 结构

type epollevent struct {
	events uint32
	data   [8]byte // to match amd64
}

3、等待epoll事件

func epollwait(epfd int32, ev *epollevent, nev, timeout int32) int32

TEXT runtime·epollwait(SB),NOSPLIT,$0
	MOVL	$256, AX
	MOVL	epfd+0(FP), BX
	MOVL	ev+4(FP), CX
	MOVL	nev+8(FP), DX
	MOVL	timeout+12(FP), SI
	INVOKE_SYSCALL
	MOVL	AX, ret+16(FP)
	RET

epollwait 这个系统调用是用来返回epfd中的就绪的G。events指向调用者可以使用的事件的内存区域。nev告知内核有多少个events,必须要大于0,timeout 指定超时时间。

golang 网络轮循器的代码实现

func netpoll(block bool) *g {
	if epfd == -1 {
		return nil
	}
	waitms := int32(-1)
	if !block {
		waitms = 0
	}
	var events [128]epollevent
retry:
	n := epollwait(epfd, &events[0], int32(len(events)), waitms)
	if n < 0 {
		if n != -_EINTR {
			println("runtime: epollwait on fd", epfd, "failed with", -n)
			throw("epollwait failed")
		}
		goto retry
	}
	var gp guintptr
	for i := int32(0); i < n; i++ {
		ev := &events[i]
		if ev.events == 0 {
			continue
		}
		var mode int32
		if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
			mode += 'r'
		}
		if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
			mode += 'w'
		}
		if mode != 0 {
			pd := *(**pollDesc)(unsafe.Pointer(&ev.data))

			netpollready(&gp, pd, mode)
		}
	}
	if block && gp == 0 {
		goto retry
	}
	return gp.ptr()
}