本文借助 endless 源码来阐述。重点看 fork() 之后的逻辑。
// fork launches a fresh copy of the current executable and hands it the
// listening sockets of every registered server through cmd.ExtraFiles.
//
// It runs under the runningServerReg write lock and refuses to run twice:
// once runningServersForked is set, later calls return an error. Each
// server's socket file is stored at the index that socketPtrOffsetMap assigns
// to its address. When more than one server is registered, the address order
// is also exported via ENDLESS_SOCKET_ORDER. The child is told it is a
// continuation through ENDLESS_CONTINUE=1.
//
// If the child cannot be started the process terminates via log.Fatalf.
func (srv *endlessServer) fork() (err error) {
	runningServerReg.Lock()
	defer runningServerReg.Unlock()

	// only one server instance should fork!
	if runningServersForked {
		return errors.New("Another process already forked. Ignoring this one.")
	}
	runningServersForked = true

	serverCount := len(runningServers)
	inherited := make([]*os.File, serverCount)
	addrOrder := make([]string, serverCount)

	// Collect the socket file of every running server, not just srv.
	for _, running := range runningServers {
		slot := socketPtrOffsetMap[running.Server.Addr]
		if plain, isPlain := running.EndlessListener.(*endlessListener); isPlain {
			// normal listener
			inherited[slot] = plain.File()
		} else {
			// tls listener: use the wrapped inner listener
			inherited[slot] = running.tlsInnerListener.File()
		}
		addrOrder[slot] = running.Server.Addr
	}

	childEnv := append(os.Environ(), "ENDLESS_CONTINUE=1")
	if serverCount > 1 {
		childEnv = append(childEnv, fmt.Sprintf(`ENDLESS_SOCKET_ORDER=%s`, strings.Join(addrOrder, ",")))
	}

	// Re-run the same binary with the same arguments.
	binary := os.Args[0]
	var childArgs []string
	if len(os.Args) > 1 {
		childArgs = os.Args[1:]
	}

	child := exec.Command(binary, childArgs...)
	child.Stdout, child.Stderr = os.Stdout, os.Stderr
	child.ExtraFiles = inherited
	child.Env = childEnv

	if err = child.Start(); err != nil {
		log.Fatalf("Restart: Failed to launch, error: %v", err)
	}
	return
}
// File returns an *os.File referring to the same listening socket as el.
//
// The file is obtained from (*net.TCPListener).File, so it wraps a duplicate
// of the listener's descriptor rather than the descriptor itself. The
// previous comment here said the duplicate has FD_CLOEXEC *not* set; that is
// wrong. The net package duplicates through DupCloseOnExec (see netFD.dup and
// poll.FD.Dup), so the duplicate DOES carry close-on-exec. The socket still
// reaches the child because fork() passes the file in cmd.ExtraFiles.
//
// File returns nil, after logging the reason, when el does not wrap a
// *net.TCPListener or when duplicating the descriptor fails. The signature
// has no error result, so logging is the only way to surface the failure.
func (el *endlessListener) File() *os.File {
	tl, ok := el.Listener.(*net.TCPListener)
	if !ok {
		// A bare type assertion would panic here and take the server down.
		log.Printf("endlessListener.File: listener is %T, not *net.TCPListener", el.Listener)
		return nil
	}

	fl, err := tl.File()
	if err != nil {
		log.Printf("endlessListener.File: %v", err)
		return nil
	}
	return fl
}
// File hands back a duplicate of the listener's underlying os.File.
//
// The duplicate is independent of l: the caller owns it and must close it,
// closing l leaves the duplicate open, and closing the duplicate leaves l
// open. Because the duplicate has its own descriptor number, changing
// properties of the original through it may or may not take effect.
//
// It returns syscall.EINVAL when l.ok() reports false, and wraps any
// duplication failure in an *OpError with Op "file".
func (l *TCPListener) File() (f *os.File, err error) {
	if !l.ok() {
		return nil, syscall.EINVAL
	}

	dupFile, dupErr := l.file()
	if dupErr == nil {
		return dupFile, nil
	}
	return nil, &OpError{Op: "file", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: dupErr}
}
// file returns an *os.File built from a duplicate of the listener's
// descriptor (ln.fd.dup), i.e. a copy of the fd rather than the fd itself.
func (ln *TCPListener) file() (*os.File, error) {
	dupFile, dupErr := ln.fd.dup()
	if dupErr == nil {
		return dupFile, nil
	}
	return nil, dupErr
}
// Network file descriptor.
//
// netFD pairs the poll-layer descriptor with the socket metadata that the
// net package reports in errors. In this excerpt it is reached as
// TCPListener.fd and duplicated by netFD.dup.
type netFD struct {
// pfd is the poll-layer descriptor; netFD.dup duplicates it via pfd.Dup().
pfd poll.FD
// immutable until Close
// NOTE(review): family and sotype are presumably the address family and
// socket type (e.g. SOCK_STREAM) chosen at creation; their assignment is
// not shown in this excerpt.
family int
sotype int
isConnected bool // handshake completed or use of association with peer
// net and laddr are what TCPListener.File puts into OpError.Net and
// OpError.Addr when duplication fails.
net string
laddr Addr
raddr Addr
}
// net/fd_unix.go
// dup duplicates the descriptor through the poll layer (fd.pfd.Dup) and wraps
// the new descriptor number in an *os.File named fd.name().
//
// On failure it returns a nil file. If the poll layer reported which call
// failed, the error is wrapped with os.NewSyscallError under that name;
// otherwise the error is returned unchanged.
func (fd *netFD) dup() (f *os.File, err error) {
	newFD, failedCall, dupErr := fd.pfd.Dup()
	switch {
	case dupErr == nil:
		return os.NewFile(uintptr(newFD), fd.name()), nil
	case failedCall != "":
		return nil, os.NewSyscallError(failedCall, dupErr)
	default:
		return nil, dupErr
	}
}
// internal/poll/fd_mutex.go
// incref adds a reference to mu.
// It reports whether mu is available for reading or writing.
// decref removes a reference from mu.
// It reports whether there is no remaining reference.
// internal/poll/fd_unix.go
// Dup duplicates the file descriptor.
//
// A reference is held on fd for the duration of the call (incref/decref).
// The duplication itself is delegated to DupCloseOnExec, whose three results
// are returned as-is. If the reference cannot be taken, Dup returns -1, an
// empty call name, and that error.
func (fd *FD) Dup() (int, string, error) {
	err := fd.incref()
	if err != nil {
		return -1, "", err
	}
	defer fd.decref()

	return DupCloseOnExec(fd.Sysfd)
}
// dupCloseOnExecOld duplicates fd the traditional way, with two system calls:
// dup, followed by marking the new descriptor close-on-exec.
//
// syscall.ForkLock is held for reading across both calls; see the
// syscall.ForkLock documentation for how this guards the window between
// creating a descriptor and setting its close-on-exec flag.
//
// On success it returns the new descriptor, an empty call name and a nil
// error. If dup fails it returns -1, the call name "dup" and the error.
func dupCloseOnExecOld(fd int) (int, string, error) {
	syscall.ForkLock.RLock()
	defer syscall.ForkLock.RUnlock()

	dupFD, dupErr := syscall.Dup(fd)
	if dupErr == nil {
		// Set the close-on-exec flag on the new descriptor.
		syscall.CloseOnExec(dupFD)
		return dupFD, "", nil
	}
	return -1, "dup", dupErr
}
小结:fork() 通过 File() 拿到的并不是 listener 原来的 fd,而是由 dup 得到的副本(下文记作 fd-1)。fd 与 fd-1 指向同一个打开的 socket,并且 fd-1 被设置了 O_CLOEXEC 标志。
1、关于 dup(对应源码中的 syscall.Dup(fd))
- dup系统调用会创建文件描述符的一个拷贝。
- 新生成的文件描述符是进程当前可用的文件描述符编号的最小值。
- 如果拷贝成功,两者都指向同一个打开的文件,因此共享所有的锁定,读写指针,和各项权限或标志位,因此可能存在交叉使用。
- 如果拷贝错误则返回-1,错误代码存入errno中。
- 调用dup族类函数得到的新文件描述符将清除O_CLOEXEC模式,这也是为什么源码中在dup之后重新设置O_CLOEXEC
2、关于 O_CLOEXEC
- 在一个进程中通过exec调用启动一个子进程,这个子进程会继承父进程打开的fd以及进行操作,出于安全考虑,有些fd,我们要求在子进程中无法继承,就需要设置这些fd的 O_CLOEXEC ,那么在启动子进程成功之后,内核会关闭子进程中的这些fd。
- 调用open函数 O_CLOEXEC 模式打开的文件描述符在执行exec调用得到的新程序中关闭,且为原子操作。
- 调用open函数不使用 O_CLOEXEC 模式打开的文件描述符,然后调用 fcntl 函数设置 FD_CLOEXEC 选项,效果和使用 O_CLOEXEC 选项open函数相同,但分别调用 open、fcntl 两个函数,不是原子操作,多线程环境中存在竞态条件,故用open函数O_CLOEXEC选项代替之。
3、关于 exec 与 Cmd.ExtraFiles:Go 标准库打开文件时默认带上 O_CLOEXEC(见下面的 openFile),因此想让子进程继承某个 fd,需要把它显式放进 Cmd.ExtraFiles。
// openFile opens name with the given flags and permissions and wraps the
// result in a *File of kind "file".
//
// syscall.O_CLOEXEC is always OR-ed into flag, so every file opened through
// this path is close-on-exec regardless of what the caller asked for. On
// failure the syscall error is returned unchanged with a nil file.
func openFile(name string, flag int, perm FileMode) (file *File, err error) {
	handle, openErr := syscall.Open(fixLongPath(name), flag|syscall.O_CLOEXEC, syscallMode(perm))
	if openErr == nil {
		return newFile(handle, name, "file"), nil
	}
	return nil, openErr
}
fork() 中没有设置 cmd.Stdin,因此子进程的标准输入是 /dev/null。
Cmd.ExtraFiles 中下标为 i 的文件,在子进程里的 fd 编号是 3+i;0-stdin、1-stdout、2-stderr 不包含在 Cmd.ExtraFiles 中。
至此,fd-1 文件描述符就被成功传入子进程中,它是 fd 的副本,文件描述符都会对应一个内核中已经打开的文件,只不过 socket fd 比较特殊,它不是指向文件系统,而是保存在内存中的结构体SOCKET,在这个结构体里面有一个发送队列和一个接收队列。fd, fd-1 目前同时指向了这个SOCKET。
getListener()
// getListener returns the listener that srv should serve on for laddr.
//
// In the original (parent) process it simply calls net.Listen("tcp", laddr).
//
// In a child process (srv.isChild) the socket is inherited instead. The
// descriptor number is 3 plus the offset recorded for laddr in
// socketPtrOffsetMap, or 3 when the map is empty. That matches the position
// the parent gave the socket in cmd.ExtraFiles. The descriptor is wrapped in
// an *os.File and converted to a net.Listener with net.FileListener.
//
// net.FileListener works on its own duplicate of the descriptor, so the
// wrapping *os.File is closed before returning. The original code never
// closed it, which left the inherited descriptor open until a finalizer
// happened to run.
//
// Errors are returned with a "net.FileListener error: " or
// "net.Listen error: " prefix.
func (srv *endlessServer) getListener(laddr string) (l net.Listener, err error) {
	if !srv.isChild {
		l, err = net.Listen("tcp", laddr)
		if err != nil {
			err = fmt.Errorf("net.Listen error: %v", err)
		}
		return
	}

	var ptrOffset uint
	runningServerReg.RLock()
	defer runningServerReg.RUnlock()
	if len(socketPtrOffsetMap) > 0 {
		ptrOffset = socketPtrOffsetMap[laddr]
	}

	// Build an *os.File from the inherited descriptor number.
	f := os.NewFile(uintptr(3+ptrOffset), "")
	// The listener below owns a duplicate; release the inherited descriptor.
	defer f.Close()

	// Build the net.Listener from that *os.File.
	l, err = net.FileListener(f)
	if err != nil {
		err = fmt.Errorf("net.FileListener error: %v", err)
	}
	return
}
也就是说,子进程是用继承来的 fd 构建 *os.File,再由它得到 net.Listener,而不是像父进程那样调用 net.Listen() 重新创建 socket(net.Listen() 的实现见下文)。
至此,就变成了两个进程来消费SOCKET中的同一个接收队列,这两个进程可以同时存在,新连接会交给其中一个正在 accept 的进程来处理。
// Listen announces on the local network address.
//
// See func Listen for a description of the network and address
// parameters.
//
// The address is resolved first, and the entry picked by
// addrs.first(isIPv4) decides whether a TCP or a Unix listener is created.
// Every failure is reported as an *OpError with Op "listen".
func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
	addrs, err := DefaultResolver.resolveAddrList(ctx, "listen", network, address, nil)
	if err != nil {
		return nil, &OpError{Op: "listen", Net: network, Source: nil, Addr: nil, Err: err}
	}

	sl := &sysListener{ListenConfig: *lc, network: network, address: address}
	la := addrs.first(isIPv4)

	// Each case returns on its own, so a nil concrete listener is never
	// handed back inside a non-nil interface on the error path.
	switch addr := la.(type) {
	case *TCPAddr:
		// TCP path: this is the branch the article follows into listenTCP.
		tcpLn, lnErr := sl.listenTCP(ctx, addr)
		if lnErr != nil {
			return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: lnErr}
		}
		return tcpLn, nil
	case *UnixAddr:
		unixLn, lnErr := sl.listenUnix(ctx, addr)
		if lnErr != nil {
			return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: lnErr}
		}
		return unixLn, nil
	default:
		return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: &AddrError{Err: "unexpected address type", Addr: address}}
	}
}
// listenTCP creates the listening socket for laddr via internetSocket
// (SOCK_STREAM, mode "listen", with the ListenConfig's Control hook) and
// wraps the resulting descriptor in a *TCPListener that also carries the
// ListenConfig. A socket error is returned unchanged with a nil listener.
func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
	sockFD, sockErr := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control)
	if sockErr == nil {
		return &TCPListener{fd: sockFD, lc: sl.ListenConfig}, nil
	}
	return nil, sockErr
}
// internetSocket picks the address family for the given addresses and
// delegates to socket to build the descriptor.
//
// Article author's note: this is where the basic socket create / bind /
// listen work gets done. NOTE(review): that work would happen inside socket,
// which is not shown in this excerpt — confirm there.
//
// On aix, windows and openbsd, a wildcard remote address in "dial" mode is
// first rewritten to its local form with raddr.toLocal.
func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
	switch runtime.GOOS {
	case "aix", "windows", "openbsd":
		// raddr is only inspected in dial mode; listeners pass a nil raddr.
		if mode == "dial" && raddr.isWildcard() {
			raddr = raddr.toLocal(net)
		}
	}

	family, ipv6only := favoriteAddrFamily(net, laddr, raddr, mode)
	return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr, ctrlFn)
}
然后去发信号关闭父进程。
// Close stops the listener. The first call marks el as stopped and closes the
// wrapped Listener, returning its result; any later call returns
// syscall.EINVAL without touching the wrapped Listener again.
func (el *endlessListener) Close() error {
	if !el.stopped {
		el.stopped = true
		return el.Listener.Close()
	}
	return syscall.EINVAL
}
// internal/poll/fd_unix.go
// Close closes the FD. The underlying file descriptor is closed by the
// destroy method when there are no remaining references.
//
// Order of operations, as written below:
//  1. increfAndClose takes a reference on fd.fdmu; if it reports false,
//     Close returns errClosing(fd.isFile) and does nothing else.
//  2. fd.pd.evict() unblocks pending I/O.
//  3. decref drops the reference taken in step 1; its error is what Close
//     returns.
//  4. Unless the FD is in blocking mode, Close waits on fd.csema until the
//     descriptor has actually been closed.
func (fd *FD) Close() error {
if !fd.fdmu.increfAndClose() {
return errClosing(fd.isFile)
}
// Unblock any I/O. Once it all unblocks and returns,
// so that it cannot be referring to fd.sysfd anymore,
// the final decref will close fd.sysfd. This should happen
// fairly quickly, since all the I/O is non-blocking, and any
// attempts to block in the pollDesc will return errClosing(fd.isFile).
fd.pd.evict()
// The call to decref will call destroy if there are no other
// references.
err := fd.decref()
// Wait until the descriptor is closed. If this was the only
// reference, it is already closed. Only wait if the file has
// not been set to blocking mode, as otherwise any current I/O
// may be blocking, and that would block the Close.
// No need for an atomic read of isBlocking, increfAndClose means
// we have exclusive access to fd.
if fd.isBlocking == 0 {
runtime_Semacquire(&fd.csema)
}
return err
}
父进程执行 Listener.Close() 时,关闭的只是它自己持有的 fd;子进程手里的副本仍然指向同一个 SOCKET,所以监听不会中断。
关于查看进程的fd
lsof -p 1299
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
testproc 1299 root cwd DIR 0,58 4096 415 /data/www/golang/project/hashcompare
testproc 1299 root rtd DIR 253,0 4096 128 /
testproc 1299 root txt REG 0,58 6223068 433 /data/www/golang/project/hashcompare/testproc
testproc 1299 root mem REG 253,0 2156240 33632456 /usr/lib64/libc-2.17.so
testproc 1299 root mem REG 253,0 142144 34128089 /usr/lib64/libpthread-2.17.so
testproc 1299 root mem REG 253,0 163312 34127918 /usr/lib64/ld-2.17.so
testproc 1299 root 0u CHR 136,0 0t0 3 /dev/pts/0
testproc 1299 root 1u CHR 136,0 0t0 3 /dev/pts/0
testproc 1299 root 2u CHR 136,0 0t0 3 /dev/pts/0
testproc 1299 root 3u REG 0,58 166 435 /data/www/golang/project/hashcompare/testproc_f1.txt
testproc 1299 root 4u a_inode 0,9 0 4554 [eventpoll]
testproc 1299 root 5r FIFO 0,8 0t0 936660 pipe
testproc 1299 root 6w FIFO 0,8 0t0 936660 pipe
testproc 1299 root 7u IPv6 936664 0t0 TCP *:6060 (LISTEN)
或者
ls -l /proc/1299/fd
总用量 0
lrwx------ 1 root root 64 11月 16 09:53 0 -> /dev/pts/0
lrwx------ 1 root root 64 11月 16 09:53 1 -> /dev/pts/0
lrwx------ 1 root root 64 11月 16 09:53 2 -> /dev/pts/0
lrwx------ 1 root root 64 11月 16 09:53 3 -> /data/www/golang/project/hashcompare/testproc_f1.txt
lrwx------ 1 root root 64 11月 16 09:53 4 -> anon_inode:[eventpoll]
lr-x------ 1 root root 64 11月 16 09:53 5 -> pipe:[936660]
l-wx------ 1 root root 64 11月 16 09:53 6 -> pipe:[936660]
lrwx------ 1 root root 64 11月 16 09:53 7 -> socket:[936664]