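Go's TCP reads look like synchronous calls, but under the hood they are turned into asynchronous operations. Why is that such a big deal?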

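Start with conn, the unexported type that *net.TCPConn embeds and that implements the net.Conn interface. Its Read method: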

// Read implements the Conn Read method.
func (c *conn) Read(b []byte) (int, error) {
   if !c.ok() {
      return 0, syscall.EINVAL
   }
   n, err := c.fd.Read(b)
   if err != nil && err != io.EOF {
      err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
   }
   return n, err
}
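From the caller's side none of the machinery below is visible. The following sketch writes to a TCP connection and reads the reply with what looks like an ordinary blocking Read. The helper name echoOnce is made up for illustration, and it assumes a loopback listener on 127.0.0.1 can be opened:

```go
package main

import (
	"fmt"
	"io"
	"net"
)

// echoOnce starts a loopback TCP echo server, dials it, writes msg,
// and reads the reply back using plain, blocking-looking reads.
func echoOnce(msg string) (string, error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0") // port 0: let the OS pick a free port
	if err != nil {
		return "", err
	}
	defer ln.Close()

	go func() {
		c, err := ln.Accept()
		if err != nil {
			return
		}
		defer c.Close()
		io.Copy(c, c) // echo everything back until the client closes
	}()

	conn, err := net.Dial("tcp", ln.Addr().String())
	if err != nil {
		return "", err
	}
	defer conn.Close()

	if _, err := conn.Write([]byte(msg)); err != nil {
		return "", err
	}
	buf := make([]byte, len(msg))
	// io.ReadFull just calls conn.Read in a loop; each call appears to "wait" for data.
	if _, err := io.ReadFull(conn, buf); err != nil {
		return "", err
	}
	return string(buf), nil
}

func main() {
	got, err := echoOnce("hello")
	fmt.Println(got, err) // hello <nil>
}
```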

The fd field is filled in when the connection is created. So where does it come from?

net.DialTimeout->Dial->DialContext->dialSerial->dialSingle->dialTCP->doDialTCP->internetSocket->socket->newFD

That's a long chain, and tiring to follow. On Linux, the fd that ends up stored is a netFD, defined in net/fd_unix.go.
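One way to confirm that the socket behind netFD is non-blocking is to read its file status flags. This is a minimal sketch assuming Linux (or another Unix where fcntl F_GETFL behaves the same); isNonBlocking is a hypothetical helper that reaches the raw fd through (*net.TCPConn).SyscallConn:

```go
package main

import (
	"fmt"
	"net"
	"syscall"
)

// isNonBlocking dials a loopback TCP connection and checks whether
// O_NONBLOCK is set on the underlying socket (Unix-only sketch).
func isNonBlocking() (bool, error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return false, err
	}
	defer ln.Close()

	conn, err := net.Dial("tcp", ln.Addr().String())
	if err != nil {
		return false, err
	}
	defer conn.Close()

	tcp, ok := conn.(*net.TCPConn) // Dial("tcp", ...) hands back a *net.TCPConn
	if !ok {
		return false, fmt.Errorf("unexpected conn type %T", conn)
	}
	raw, err := tcp.SyscallConn()
	if err != nil {
		return false, err
	}

	var flags uintptr
	var errno syscall.Errno
	// Control runs the callback with the raw fd owned by the netFD.
	cerr := raw.Control(func(fd uintptr) {
		flags, _, errno = syscall.Syscall(syscall.SYS_FCNTL, fd, syscall.F_GETFL, 0)
	})
	if cerr != nil {
		return false, cerr
	}
	if errno != 0 {
		return false, errno
	}
	return flags&syscall.O_NONBLOCK != 0, nil
}

func main() {
	nb, err := isNonBlocking()
	fmt.Println("non-blocking:", nb, "err:", err)
}
```

On Linux this should report true, because the net package creates its sockets with SOCK_NONBLOCK.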

Here is its Read implementation:

func (fd *netFD) Read(p []byte) (n int, err error) {
   n, err = fd.pfd.Read(p)
   runtime.KeepAlive(fd)
   return n, wrapSyscallError("read", err) 
}

Its pfd field is a poll.FD, defined in internal/poll/fd_unix.go. Stepping into its Read:

func (fd *FD) Read(p []byte) (int, error) {
   if err := fd.readLock(); err != nil {
      return 0, err
   }
   defer fd.readUnlock()
   if len(p) == 0 {
      // If the caller wanted a zero byte read, return immediately
      // without trying (but after acquiring the readLock).
      // Otherwise syscall.Read returns 0, nil which looks like
      // io.EOF.
      // TODO(bradfitz): make it wait for readability? (Issue 15735)
      return 0, nil
   }
   if err := fd.pd.prepareRead(fd.isFile); err != nil {
      return 0, err
   }
   if fd.IsStream && len(p) > maxRW {
      p = p[:maxRW]
   }
   for {
      n, err := syscall.Read(fd.Sysfd, p)
      if err != nil {
         n = 0
         if err == syscall.EAGAIN && fd.pd.pollable() {
            if err = fd.pd.waitRead(fd.isFile); err == nil {
               continue
            }
         }
      }
      err = fd.eofError(n, err)
      return n, err
   }
}
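The shape of that loop (try a non-blocking read, wait on EAGAIN, retry) can be mimicked outside the runtime with raw syscalls. This is a Linux-only, hypothetical sketch (readWithWait and demo are made-up names) that uses a non-blocking socketpair and its own epoll instance instead of the runtime's netpoller:

```go
package main

import (
	"fmt"
	"syscall"
	"time"
)

// readWithWait mirrors the FD.Read loop: try a non-blocking read; on EAGAIN,
// wait until epoll reports the fd readable, then retry. Unlike the Go runtime,
// it blocks the calling OS thread inside epoll_wait. It also counts the waits.
func readWithWait(epfd, fd int, p []byte) (n, waits int, err error) {
	events := make([]syscall.EpollEvent, 1)
	for {
		n, err = syscall.Read(fd, p)
		if err == syscall.EAGAIN {
			waits++
			// msec = -1: no timeout. EINTR just means "retry".
			if _, werr := syscall.EpollWait(epfd, events, -1); werr != nil && werr != syscall.EINTR {
				return 0, waits, werr
			}
			continue
		}
		if err != nil {
			return 0, waits, err
		}
		return n, waits, nil
	}
}

// demo registers one end of a non-blocking socketpair with epoll and has a
// second goroutine write to the other end after a short delay.
func demo() (string, int, error) {
	fds, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_STREAM|syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC, 0)
	if err != nil {
		return "", 0, err
	}
	defer syscall.Close(fds[0])
	defer syscall.Close(fds[1])

	epfd, err := syscall.EpollCreate1(syscall.EPOLL_CLOEXEC)
	if err != nil {
		return "", 0, err
	}
	defer syscall.Close(epfd)

	ev := syscall.EpollEvent{Events: syscall.EPOLLIN, Fd: int32(fds[0])}
	if err := syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, fds[0], &ev); err != nil {
		return "", 0, err
	}

	go func() {
		time.Sleep(50 * time.Millisecond)
		syscall.Write(fds[1], []byte("hello"))
	}()

	buf := make([]byte, 16)
	n, waits, err := readWithWait(epfd, fds[0], buf)
	if err != nil {
		return "", waits, err
	}
	return string(buf[:n]), waits, nil
}

func main() {
	msg, waits, err := demo()
	fmt.Println(msg, waits, err)
}
```

The difference from the real thing is the waiting step: here the OS thread itself sleeps in epoll_wait, whereas FD.Read parks only the goroutine and leaves the thread free to run other goroutines.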

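FD.Read first calls syscall.Read directly. That is a plain read(2), not an epoll call, but the socket was created in non-blocking mode (and registered with the runtime's epoll instance when the netFD was initialized), so when no data is available it returns EAGAIN immediately instead of blocking the thread. The key to turning synchronous into asynchronous is that EAGAIN branch: waitRead calls wait in internal/poll/fd_poll_runtime.go. Let's step in: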

func (pd *pollDesc) wait(mode int, isFile bool) error {
   if pd.runtimeCtx == 0 {
      return errors.New("waiting for unsupported file type")
   }
   res := runtime_pollWait(pd.runtimeCtx, mode)
   return convertErr(res, isFile)
}

func runtime_pollWait(ctx uintptr, mode int) int

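runtime_pollWait is only a declaration with no body. The real implementation is poll_runtime_pollWait in runtime/netpoll.go, which the runtime binds to this name with go:linkname: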

func poll_runtime_pollWait(pd *pollDesc, mode int) int {
   err := netpollcheckerr(pd, int32(mode))
   if err != 0 {
      return err
   }
   // As for now only Solaris uses level-triggered IO.
   if GOOS == "solaris" {
      netpollarm(pd, mode)
   }
   for !netpollblock(pd, int32(mode), false) {
      err = netpollcheckerr(pd, int32(mode))
      if err != 0 {
         return err
      }
      // Can happen if timeout has fired and unblocked us,
      // but before we had a chance to run, timeout has been reset.
      // Pretend it has not happened and retry.
   }
   return 0
}

Now netpollblock:

func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
   gpp := &pd.rg
   if mode == 'w' {
      gpp = &pd.wg
   }

   // set the gpp semaphore to WAIT
   for {
      old := *gpp
      if old == pdReady {
         *gpp = 0
         return true
      }
      if old != 0 {
         throw("runtime: double wait")
      }
      if atomic.Casuintptr(gpp, 0, pdWait) {
         break
      }
   }

   // need to recheck error states after setting gpp to WAIT
   // this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
   // do the opposite: store to closing/rd/wd, membarrier, load of rg/wg
   if waitio || netpollcheckerr(pd, mode) == 0 {
      gopark(netpollblockcommit, unsafe.Pointer(gpp), "IO wait", traceEvGoBlockNet, 5)
   }
   // be careful to not lose concurrent READY notification
   old := atomic.Xchguintptr(gpp, 0)
   if old > pdWait {
      throw("runtime: corrupted polldesc")
   }
   return old == pdReady
}
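To make the rg/wg handshake concrete, here is a small user-space model. It is a hypothetical simplification, not runtime code: a channel stands in for gopark/goready, slotParked stands in for "rg holds a goroutine pointer", and the error and timeout paths are left out. block mirrors netpollblock plus its netpollblockcommit step; unblock mirrors what the poller does for a ready fd (netpollunblock with ioready set). It needs Go 1.19+ for atomic.Uint32:

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// Hypothetical states mirroring the values stored in pollDesc.rg.
const (
	slotNil    uint32 = iota // 0: nobody waiting, no pending notification
	slotReady                // like pdReady: readiness arrived before anyone waited
	slotWait                 // like pdWait: a goroutine is about to park
	slotParked               // stands in for "rg holds a *g": the goroutine is parked
)

// pollSlot models a single rg (or wg) semaphore.
type pollSlot struct {
	state atomic.Uint32
	wake  chan struct{} // stands in for goready; buffered so unblock never blocks
}

func newPollSlot() *pollSlot { return &pollSlot{wake: make(chan struct{}, 1)} }

// block models netpollblock and reports whether I/O became ready.
func (s *pollSlot) block() bool {
	for {
		old := s.state.Load()
		if old == slotReady {
			s.state.Store(slotNil) // consume the pending notification
			return true
		}
		if old != slotNil {
			panic("double wait")
		}
		if s.state.CompareAndSwap(slotNil, slotWait) {
			break
		}
	}
	// Models netpollblockcommit: park only if nobody flipped WAIT to READY meanwhile.
	if s.state.CompareAndSwap(slotWait, slotParked) {
		<-s.wake // "gopark": sleep until unblock signals
	}
	// As in the runtime, swap back to nil and report whether READY was seen.
	return s.state.Swap(slotNil) == slotReady
}

// unblock models the poller marking the fd ready and waking any parked waiter.
func (s *pollSlot) unblock() {
	for {
		old := s.state.Load()
		if old == slotReady {
			return // already signaled
		}
		if s.state.CompareAndSwap(old, slotReady) {
			if old == slotParked {
				s.wake <- struct{}{} // "goready": resume the parked waiter
			}
			return
		}
	}
}

// demo exercises both orderings: ready-before-wait and wait-before-ready.
func demo() (early, late bool) {
	s := newPollSlot()
	s.unblock() // readiness arrives first, so block returns without parking
	early = s.block()

	done := make(chan bool)
	go func() { done <- s.block() }()
	time.Sleep(20 * time.Millisecond)
	s.unblock()
	late = <-done
	return early, late
}

func main() {
	fmt.Println(demo()) // true true
}
```

Either way block returns true: if readiness comes first, block consumes the READY token without parking; if it comes later, the WAIT-to-PARKED CAS either wins (the waiter sleeps until woken) or loses to unblock (the waiter never sleeps). That race is what the "don't lose concurrent READY notification" comment guards against.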

That's the end of the code trace.

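Putting it together: when the read would block, netpollblock marks pd.rg as pdWait and calls gopark, which takes the current goroutine off its OS thread; the thread goes on running other goroutines. netpollblockcommit is not a completion callback: it is the commit function gopark runs at the moment of parking, atomically replacing pdWait with the goroutine's pointer so the poller knows whom to wake. Later, when the scheduler polls epoll (netpoll) and the socket is reported readable (epoll only signals readiness, it does not read anything), the runtime sets rg to pdReady and makes the parked goroutine runnable again. It resumes inside netpollblock, which returns true, and the for loop in FD.Read continues and calls syscall.Read again, which now returns the data.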
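The practical payoff is that many goroutines blocked in Read do not pin many OS threads. A rough experiment under stated assumptions: parkedReaders (a hypothetical helper) opens n loopback connections whose server side never writes, parks one reader goroutine per connection, and reads the runtime/pprof threadcreate count. GOMAXPROCS is capped at 4 inside the helper so the result is not dominated by the machine's core count; exact numbers will vary by machine and Go version:

```go
package main

import (
	"fmt"
	"net"
	"runtime"
	"runtime/pprof"
	"time"
)

// parkedReaders opens n loopback connections and starts one goroutine per
// connection that blocks in conn.Read. It reports the goroutine count and the
// number of OS threads created so far while those reads are pending.
func parkedReaders(n int) (goroutines, threads int, err error) {
	// Cap Ps so the thread count reflects blocking behavior, not core count.
	prev := runtime.GOMAXPROCS(4)
	defer runtime.GOMAXPROCS(prev)

	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return 0, 0, err
	}
	accepted := make(chan net.Conn, n)
	go func() {
		for {
			c, err := ln.Accept()
			if err != nil {
				close(accepted)
				return
			}
			accepted <- c // server side is held open but never written to
		}
	}()

	var conns []net.Conn
	defer func() {
		for _, c := range conns {
			c.Close() // wakes each parked Read with an error
		}
		ln.Close() // stops the accept loop, which then closes the channel
		for c := range accepted {
			c.Close()
		}
	}()

	for i := 0; i < n; i++ {
		c, err := net.Dial("tcp", ln.Addr().String())
		if err != nil {
			return 0, 0, err
		}
		conns = append(conns, c)
		go func(c net.Conn) {
			c.Read(make([]byte, 1)) // no data ever arrives: parks in the netpoller
		}(c)
	}

	time.Sleep(100 * time.Millisecond) // give the readers time to park
	return runtime.NumGoroutine(), pprof.Lookup("threadcreate").Count(), nil
}

func main() {
	g, t, err := parkedReaders(200)
	fmt.Println("goroutines:", g, "threads created:", t, "err:", err)
}
```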

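So a call that looks like an ordinary synchronous socket Read is asynchronous underneath. Only the goroutine waits, not the OS thread: no thread sits blocked in the kernel, and switching between goroutines happens in user space, which is far cheaper than an OS thread context switch. When using Go sockets, then, don't think of them as blocking I/O in the traditional sense: once a read returns data, you can pass it over a channel straight to the backend processing logic and keep the whole system non-blocking.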
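That advice can be sketched as one goroutine that owns the socket and forwards what it reads over a channel to a consumer. readLoop and pipeline are illustrative names, not library APIs, and errors other than EOF are simply dropped in this sketch:

```go
package main

import (
	"fmt"
	"io"
	"net"
	"strings"
)

// readLoop owns the socket: it reads chunks and forwards copies over out,
// closing out once Read returns an error (io.EOF included).
func readLoop(conn net.Conn, out chan<- []byte) {
	defer close(out)
	buf := make([]byte, 4096)
	for {
		n, err := conn.Read(buf)
		if n > 0 {
			chunk := make([]byte, n)
			copy(chunk, buf[:n]) // buf is reused by the next Read, so hand off a copy
			out <- chunk
		}
		if err != nil {
			return // a production version would report non-EOF errors
		}
	}
}

// pipeline sends msg over a loopback connection and returns what the
// consumer side assembled from readLoop's channel.
func pipeline(msg string) (string, error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return "", err
	}
	defer ln.Close()

	go func() {
		c, err := ln.Accept()
		if err != nil {
			return
		}
		io.WriteString(c, msg)
		c.Close() // the client's Read then sees io.EOF
	}()

	conn, err := net.Dial("tcp", ln.Addr().String())
	if err != nil {
		return "", err
	}
	defer conn.Close()

	chunks := make(chan []byte, 16)
	go readLoop(conn, chunks)

	var sb strings.Builder
	for chunk := range chunks { // stands in for the backend processing logic
		sb.Write(chunk)
	}
	return sb.String(), nil
}

func main() {
	got, err := pipeline("hello from the netpoller")
	fmt.Println(got, err)
}
```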