epoll 加连接池

前几天看了epoll 使用,今天写了一个测试脚本,测试一下epoll加连接池的性能

50万个请求,连接池使用2000连接,发送 "test" 服务端接受后 转成大写返回,处理完所有的请求耗时3.731506996s,性能很强大(注意:需要在linux环境下测试)

为什么要使用连接池?

拿数据库举例,频繁的建立、关闭连接,会极大的降低mysql的性能,因为建立连接,释放连接引起的大量性能开销。

连接池技术带来的优势:

1、资源重用

由于tcp得到重用,避免了频繁创建、释放连接引起的大量性能开销。在减少系统消耗的基础上,另一方面也增进了系统运行环境的平稳性(减少内存碎片以及数据库临时进程/线程的数量)。

2、更快的系统响应速度

连接池在初始化后运行中。对于业务请求处理而言,大部分请求可以直接利用现有可用连接,避免了数据库连接初始化和释放过程的时间开销,从而缩减了系统整体响应时间。

3、连接数量的控制

过多的连接数量会拖垮整个服务,连接池可以设定activeConne连接数量,从客户端阻塞过多的连接,保证系统服务的平稳。

4、统一的连接管理,避免数据库连接泄漏

根据预先的连接占用超时设定,强制收回被占用连接。从而避免了常规数据库连接操作中可能出现的资源泄漏。

为什么使用epoll

首先对于一个tcp连接,操作系统会为每一个连接分配一定的内存空间外(主要是内部网络数据结构sk_buff的大小、连接的读写缓存,sof),虽然这些可以进行调优,但是如果想使用正常的操作系统的TCP/IP栈的话,这些是硬性的需求。刨去这些,不同的编程语言不同的框架的设计,甚至是不同的需求场景,都会极大的影响TCP服务器内存的占用和处理。

一般Go语言的TCP(和HTTP)的处理都是每一个连接启动一个goroutine去处理,因为我们被教导goroutine的不像thread, 它是很便宜的,可以在服务器上启动成千上万的goroutine。但是对于一百万的连接,这种goroutine-per-connection的模式就至少要启动一百万个goroutine,这对资源的消耗也是极大的。针对不同的操作系统和不同的Go版本,一个goroutine锁使用的最小的栈大小是2KB ~ 8 KB (go stack),如果在每个goroutine中在分配byte buffer用以从连接中读写数据,几十G的内存轻轻松松就分配出去了。

在linux测试过epoll性能,5万个tcp连接

1、client端

func main()  {
    connections:=50000
    addr:="127.0.0.1:8972"
    var conns []net.Conn
    for i := 0; i < connections; i++ {
        c, err := net.DialTimeout("tcp", addr, 10*time.Second)
        if err != nil {
            fmt.Println("failed to connect", i, err)
            i--
            continue
        }
        conns = append(conns, c)
        time.Sleep(time.Millisecond)
    }
    defer func() {
        for _, c := range conns {
            c.Close()
        }
    }()
    log.Printf("完成初始化 %d 连接", len(conns))
    tts := time.Millisecond * 5

    for {
        for i := 0; i < len(conns); i++ {
            time.Sleep(tts)
            conn := conns[i]
            conn.Write([]byte("hello world\r\n"))
        }
    }
}

2、普通的tcp连接

server.go

func main() {
    ln, err := net.Listen("tcp", "127.0.0.1:8972")
    if err != nil {
        panic(err)
    }
    var connections []net.Conn
    defer func() {
        for _, conn := range connections {
            conn.Close()
        }
    }()
    for {
        conn, e := ln.Accept()
        if e != nil {
            if ne, ok := e.(net.Error); ok && ne.Temporary() {
                log.Printf("accept temp err: %v", ne)
                continue
            }
            log.Printf("accept err: %v", e)
            return
        }
        go handleConn(conn)
        connections = append(connections, conn)
        if len(connections)%100 == 0 {
            log.Printf("total number of connections: %v", len(connections))
        }
    }
}
func handleConn(conn net.Conn) {
    io.Copy(ioutil.Discard, conn)
}

5万个tcp消耗的内存情况
clipboard.png

3、epoll 使用的是【百万 Go TCP 连接的思考: epoll方式减少资源占用】该博客的epoll的代码

5万个tcp消耗的内存情况
clipboard.png

下面使用连接池加epoll测试 qps 15万+

使用连接池,server使用epoll,使用2000个连接,处理完50万个请求,发送test ,返回TEST大写,耗时3.7s,处理完所有的请求,qps 15万+
github上有详细代码,地址:https://github.com/shanlongpa...

1、testPool.go是client端代码

   pool := &pools.Pool{
       MaxIdle:     100,
       MaxActive:   2000,
       IdleTimeout: 20 * time.Second,
       MaxConnLifetime: 100 * time.Second,
       Dial: func() (net.Conn, error) {
           c, err := net.Dial("tcp", "127.0.0.1:8972")
           if err != nil {
               return nil, err
           }
           return c, err
       },
   }
   defer pool.Close()

   t := time.Now()

   worklist := make(chan int)
   var wg sync.WaitGroup
   for i := 0; i < 2000; i++ {
       go func() {
           for range worklist {
               wg.Done()
               cli,err:=pool.Get()
               if err!=nil{
                   log.Println(err)
                   return
               }

               str:="test"

               err=pools.Write(cli.C,[]byte(str))

               if err!=nil{
                   log.Println(err)
                   pool.Put(cli,true)
                   return
               }
               _,err=pools.Read(cli.C)
               if err!=nil{
                   log.Println(err)
               }else{
                   //if i%500==0{
                   //    fmt.Println(string(receByte))
                   //}
               }
               pool.Put(cli,false)
           }
       }()
   }

   for i := 0; i < 500000; i++ {
       wg.Add(1)
       worklist <- i
   }

   fmt.Println("pool建立,连接数:",pool.Active)

   close(worklist)
   wg.Wait()
   // 调用服务
   fmt.Println(time.Since(t))

-连接池结构

type Pool struct {
    // 建立tcp连接
    Dial func() (net.Conn, error)

    // 健康检测,判断连接是否断开
    TestOnBorrow func(c net.Conn, t time.Time) error

    // 连接池中最大空闲连接数
    MaxIdle int

    // 打开最大的连接数
    MaxActive int

    // Idle多久断开连接,小于服务器超时时间
    IdleTimeout time.Duration

    // 配置最大连接数的时候,并且wait是true的时候,超过最大的连接,get的时候会阻塞,知道有连接放回到连接池
    Wait bool

    // 超过多久时间 链接关闭
    MaxConnLifetime time.Duration

    chInitialized uint32 // set to 1 when field ch is initialized 原子锁ch初始化一次

    mu     sync.Mutex    // 锁
    closed bool          // set to true when the pool is closed.
    Active int           // 连接池中打开的连接数
    ch     chan struct{} // limits open connections when p.Wait is true
    Idle   idleList      // idle 连接
}

// 空闲连,记录poolConn的头和尾
type idleList struct {
    count       int
    front, back *poolConn
}

// 连接的双向链表
type poolConn struct {
    C          net.Conn
    t          time.Time // idle 时间,即放会pool的时间
    created    time.Time //创建时间
    next, prev *poolConn
}

主要有两个方法Get(),获取一个可用的连接。 Put() 把连接放回到连接池

func (p *Pool) Get() (*poolConn, error) {

    // p.Wait == true. 的时候限制最大连接数
    if p.Wait && p.MaxActive > 0 {
        p.lazyInit()
        <-p.ch
    }

    p.mu.Lock()

    // 删除idle超时的连接,删除掉
    if p.IdleTimeout > 0 {
        n := p.Idle.count
        for i := 0; i < n && p.Idle.back != nil && p.Idle.back.t.Add(p.IdleTimeout).Before(nowFunc()); i++ {
            pc := p.Idle.back
            p.Idle.popBack()
            p.mu.Unlock()
            pc.C.Close()
            p.mu.Lock()
            p.Active--
        }
    }

    //从Idle list 获取一个可用的空闲链接.
    for p.Idle.front != nil {
        pc := p.Idle.front
        p.Idle.popFront()
        p.mu.Unlock()
        if (p.TestOnBorrow == nil || p.TestOnBorrow(pc.C, pc.t) == nil) &&
            (p.MaxConnLifetime == 0 || nowFunc().Sub(pc.created) < p.MaxConnLifetime) {
            return pc, nil
        }
        pc.C.Close()
        p.mu.Lock()
        p.Active--
    }

    //pool关闭后直接return error
    if p.closed {
        p.mu.Unlock()
        return nil, errors.New("get on closed pool")
    }

    // Handle limit for p.Wait == false.
    if !p.Wait && p.MaxActive > 0 && p.Active >= p.MaxActive {
        p.mu.Unlock()
        return nil, errors.New("pool 耗尽了")
    }

    p.Active++
    p.mu.Unlock()
    c, err := p.Dial()
    if err != nil {
        c = nil
        p.mu.Lock()
        p.Active--
        if p.ch != nil && !p.closed {
            p.ch <- struct{}{}
        }
        p.mu.Unlock()
    }
    return &poolConn{C: c, created: nowFunc()}, err
}

func (p *Pool) Put(pc *poolConn, forceClose bool) error {
    p.mu.Lock()
    if !p.closed && !forceClose {
        pc.t = nowFunc()
        p.Idle.pushFront(pc)
        if p.Idle.count > p.MaxIdle {
            pc = p.Idle.back
            p.Idle.popBack()
        } else {
            pc = nil
        }
    }

    if pc != nil {
        p.mu.Unlock()
        pc.C.Close()
        p.mu.Lock()
        p.Active--
    }

    if p.ch != nil && !p.closed {
        p.ch <- struct{}{}
    }
    p.mu.Unlock()
    return nil
}

2、epollServer.go 是服务端代码

epoll 使用主要分为三部,第一步创建epoll,第二部,添加事件 EPOLL_CTL_ADD,第三步,等待EpollEvent.

func main() {
    setLimit()

    ln, err := net.Listen("tcp", "127.0.0.1:8972")
    if err != nil {
        panic(err)
    }
    epoller, err = MkEpoll()
    if err != nil {
        panic(err)
    }

    go start()

    for {
        conn, e := ln.Accept()
        if e != nil {
            if ne, ok := e.(net.Error); ok && ne.Temporary() {
                log.Printf("accept temp err: %v", ne)
                continue
            }

            log.Printf("accept err: %v", e)
            return
        }

        if err := epoller.Add(conn); err != nil {
            log.Printf("failed to add connection %v", err)
            conn.Close()
        }
    }
}
    
    
    //返回接受的信息,小写转成大写字母
 func replyConn(c net.Conn) error {
    data,err:= pools.Read(c)
    if err!=nil{
        return err
    }
    err= pools.Write(c,[]byte(strings.ToUpper(string(data))))
    return err
 }