在文章之前,我先讲个故事,我有个朋友以前是java程序员,后来转过来做go,当我问他为什么选择go的时候,他跟我说,因为写go就没人说他写的代码low了 ??? 这只是个段子不要当真。。。
好回归正题 golang 的网络轮循器是如何实现的那,先说明一下 golang 的网络轮循器是做什么的,你的go程序启动的时候会创建一个M去跑我们的系统监测任务代码如下(专栏下面的所有文章都是以go 1.8版本为准):
systemstack(func() {
newm(sysmon, nil)
})
sysmon方法就是我们说的监控任务,它没有和任何的P(逻辑处理器)进行绑定,而是通过自身改变睡眠时间和时间间隔来一直循环下去(代码位于runtime/proc.go)。 golang中所有文件描述符都被设置成非阻塞的,某个goroutine进行网络io操作,读或者写文件描述符,如果此刻网络io还没准备好,则这个goroutine会被放到系统的等待队列中,这个goroutine失去了运行权,但并不是真正的整个系统“阻塞”于系统调用,后台还有一个poller会不停地进行poll,所有的文件描述符都被添加到了这个poller中的,当某个时刻一个文件描述符准备好了,poller就会唤醒之前因它而阻塞的goroutine,于是goroutine重新运行起来。
网络轮循器就在这个for循环之中,从epoll 的epollwait 接口获取准备就绪的 *g (结构指针) 最后注入到当前调度器下的可获取的G队列,代码如下:
atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
gp := netpoll(false)
if gp != nil {
injectglist(gp)
}
这里的netpoll就是今天的主角,以下是追踪器的部分代码:
for {
if idle == 0 { //20us后开始睡眠
delay = 20
} else if idle > 50 { //睡眠1毫秒后翻倍
delay *= 2
}
if delay > 10*1000 { //10ms
delay = 10 * 1000
}
//以上是调整时间间隔
usleep(delay)
//睡眠若干毫秒后,判断是否进行调度追踪,
//并且 是否进行垃圾回收或所有的P都处在空闲状态
if debug.schedtrace <= 0 && (sched.gcwaiting != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs)) {
lock(&sched.lock)
if atomic.Load(&sched.gcwaiting) != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs) {
atomic.Store(&sched.sysmonwait, 1)
unlock(&sched.lock)
//唤醒任务
maxsleep := forcegcperiod / 2
if scavengelimit < forcegcperiod {
maxsleep = scavengelimit / 2
}
notetsleep(&sched.sysmonnote, maxsleep)
lock(&sched.lock)
atomic.Store(&sched.sysmonwait, 0)
noteclear(&sched.sysmonnote)
//重置时间间隔
idle = 0
delay = 20
}
unlock(&sched.lock)
}
//距离上次拉去是否超过10ms
lastpoll := int64(atomic.Load64(&sched.lastpoll))
now := nanotime()
unixnow := unixnanotime()
//判断获取最后一次从网络I/O轮循查找G的时间
if lastpoll != 0 && lastpoll+10*1000*1000 < now {
//更新最后一次查询G时间,为了下一次做判断。
atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
//这行代码是今天的主角,从网络I/O(我喜欢叫网络轮询器)
//查找已经就绪的G,注意不是阻塞的
gp := netpoll(false)
if gp != nil {
incidlelocked(-1)
//找到后注入到调度器下面的可获取的G队列
injectglist(gp)
incidlelocked(1)
}
}
// 再次夺取P、 阻塞的系统调用
// 再次夺取长时间运行的G
...
...
...
}
上面简单的介绍了追踪器的大概流程,接下来有请我们的主角show time.
网络轮轮循器使用I/O多路复用的技术,可以非常高效的处理数以百万计的socket描述符,这里有linux下非阻塞io库 epoll - 知乎专栏的具体介绍。。。
我先列出golang三个封装的系统调用
1、创建epoll
func epollcreate(size int32) int32
func epollcreate1(flags int32) int32
TEXT runtime·epollcreate1(SB),NOSPLIT,$0
MOVL $329, AX
MOVL flags+0(FP), BX
INVOKE_SYSCALL
MOVL AX, ret+4(FP)
RET
epollcreate() 可以创建一个epoll实例。在linux 内核版本大于2.6.8 后,这个size 参数就被弃用了,但是传入的值必须大于0。这里引用了互联网上的一句话
在 epollcreate() 的最初实现版本时, size参数的作用是创建epoll实例时候告诉内核需要使用多少个文件描述符。内核会使用 size 的大小去申请对应的内存(如果在使用的时候超过了给定的size, 内核会申请更多的空间)。现在,这个size参数不再使用了(内核会动态的申请需要的内存)。但要注意的是,这个size必须要大于0,为了兼容旧版的linux 内核的代码。
epollcreate1() 如果flags的值是0,epollcreate1()等同于epollcreate()除了过时的size被遗弃了。当然flasg可以使用_EPOLL_CLOEXEC = 0x80000。
2、设置epoll事件
func epollctl(epfd, op, fd int32, ev *epollevent) int32
// sys_linux_386.s
TEXT runtime·epollctl(SB),NOSPLIT,$0
MOVL $255, AX
MOVL epfd+0(FP), BX
MOVL op+4(FP), CX
MOVL fd+8(FP), DX
MOVL ev+12(FP), SI
INVOKE_SYSCALL
MOVL AX, ret+16(FP)
RET
第一个参数epfd指向epoll的实例,op 添加事件的类型 fd是要注册的目标文件描述符,ev 是关联指定的描述符
op 的枚举值:
_EPOLL_CTL_ADD = 0x1 //在epfd中注册指定的fd文件描述符并能把event和fd关联起来。
_EPOLL_CTL_MOD = 0x3 //改变 fd和evetn之间的联系。
_EPOLL_CTL_DEL = 0x2 //从指定的epfd中删除fd文件描述符。在这种模式中event是被忽略的,并且为可以等于nil。
event 结构
type epollevent struct {
events uint32
data [8]byte // to match amd64
}
3、等待epoll事件
func epollwait(epfd int32, ev *epollevent, nev, timeout int32) int32
TEXT runtime·epollwait(SB),NOSPLIT,$0
MOVL $256, AX
MOVL epfd+0(FP), BX
MOVL ev+4(FP), CX
MOVL nev+8(FP), DX
MOVL timeout+12(FP), SI
INVOKE_SYSCALL
MOVL AX, ret+16(FP)
RET
epollwait 这个系统调用是用来返回epfd中的就绪的G。events指向调用者可以使用的事件的内存区域。nev告知内核有多少个events,必须要大于0,timeout 指定超时时间。
golang 网络轮循器的代码实现
func netpoll(block bool) *g {
if epfd == -1 {
return nil
}
waitms := int32(-1)
if !block {
waitms = 0
}
var events [128]epollevent
retry:
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
if n < 0 {
if n != -_EINTR {
println("runtime: epollwait on fd", epfd, "failed with", -n)
throw("epollwait failed")
}
goto retry
}
var gp guintptr
for i := int32(0); i < n; i++ {
ev := &events[i]
if ev.events == 0 {
continue
}
var mode int32
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'w'
}
if mode != 0 {
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
netpollready(&gp, pd, mode)
}
}
if block && gp == 0 {
goto retry
}
return gp.ptr()
}