问题

最近搞压测,写了一个压测的工具,就是想通过go来实现一秒发多少个请求,然后我写了一段这样的代码,如下,当然压测的代码我就没有贴了,我贴了主要核心代码,主要是起一个定时器,然后通过但仅此去读定时器的通道,然后for循环起goroutine,goroutine里面是进行并发的逻辑。

package main

import (
  "fmt"
  "time"
)

func main() {
  if err := recover(); err != nil {
    fmt.Println(err)
  }
  tick1 := time.NewTicker(1 * time.Second)
  for {
    select {
    case <-tick1.C:
      for i := 0; i < 100000; i++ {
        go func() {
          if err := recover(); err != nil {
            fmt.Println(err)
          }
          fmt.Println("xxx")
          time.Sleep(10 * time.Second)
        }()
      }
    }
  }
  time.Sleep(2 * time.Hour)
}

上面的代码执行完了之后发现会报错:

panic: too many concurrent operations on a single file or socket (max 1048575)

goroutine 1287935 [running]:
internal/poll.(*fdMutex).rwlock(0xc0000b6060, 0xc13e5a2e00, 0x4a1527)
        /usr/local/go/src/internal/poll/fd_mutex.go:147 +0x146
internal/poll.(*FD).writeLock(...)
        /usr/local/go/src/internal/poll/fd_mutex.go:239
internal/poll.(*FD).Write(0xc0000b6060, 0xc08cd342c8, 0x4, 0x8, 0x0, 0x0, 0x0)
        /usr/local/go/src/internal/poll/fd_unix.go:254 +0x6e
os.(*File).write(...)
        /usr/local/go/src/os/file_posix.go:48
os.(*File).Write(0xc0000b4008, 0xc08cd342c8, 0x4, 0x8, 0x0, 0x0, 0x0)
        /usr/local/go/src/os/file.go:173 +0x77
fmt.Fprintln(0x4e9300, 0xc0000b4008, 0xc13e5a2fb0, 0x1, 0x1, 0x0, 0x0, 0x0)
        /usr/local/go/src/fmt/print.go:265 +0x8b
fmt.Println(...)
        /usr/local/go/src/fmt/print.go:274
main.main.func1.1()
        /root/test_go/t5.go:25 +0x96
created by main.main.func1
        /root/test_go/t5.go:21 +0x5f

关键的报错信息是:

panic: too many concurrent operations on a single file or socket (max 1048575)

对单个 file/socket的并发操作个数超过了系统上限,这个是标准输出造成的。

解决方案

1:不同的应用程序,消耗的资源是不一样的。比较推荐的方式的是:应用程序来主动限制并发的协程数量。

关于上面的问题代码我们进行优化,通过channel来控制并发数。

  • make(chan struct{}, 300) 创建缓冲区大小为 300 的 channel,在没有被接收的情况下,至多发送 300 个消息则被阻塞。
  • 开启协程前,调用 ch <- struct{}{},若缓存区满,则阻塞。
  • 协程任务结束,调用 <-ch 释放缓冲区。
package main

import (
  "fmt"
  "time"
)

func main() {
  if err := recover(); err != nil {
    fmt.Println(err)
  }
  tick1 := time.NewTicker(1 * time.Second)
  //初始化channel个数
  ch := make(chan struct{}, 300)
  for {
    select {
    case <-tick1.C:
      for i := 0; i < 100000; i++ {
        //这里往channel写数据
        ch <- struct{}{}
        go func() {
          if err := recover(); err != nil {
            fmt.Println(err)
          }
          fmt.Println("xxx")
          time.Sleep(10 * time.Second)
          //读取channel数据
          <-ch
        }()
      }
    }
  }
  time.Sleep(2 * time.Hour)
}

执行后,从日志中可以很容易看到,每秒钟只并发执行了 300 个任务,达到了协程并发控制的目的。

2:调整系统资源的上限

可以使用 ulimit -n 999999,将同时打开的文件句柄数量调整为 999999 来解决这个问题