golang http client 连接池

net/httpclient

net/http client工作流程

client
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func DoRequest(req *http.Request) (MyResponse, error) {
    client := &http.Client{}
    resp, err := client.Do(req)
    if resp != nil {
        defer resp.Body.Close()
    }
    if err != nil {
        return nil, err
    }
    body, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        return nil, err
    }

    response := MyResponse{}
    response.Header = resp.Header
    ...
    response.Body = body

    return response, nil
}
首先创建一个 http.Client,然后调用 client.Do(req) 发送请求;client.Get、client.Post 等方法最终也是调用 client.Do。client.Do 的实现位于 net/http 包的 go/src/net/http/client.go 中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
...
if resp, didTimeout, err = c.send(req, deadline); err != nil {
            // c.send() always closes req.Body
            reqBodyClosed = true
            if !deadline.IsZero() && didTimeout() {
                err = &httpError{
                    err:     err.Error() + " (Client.Timeout exceeded while awaiting headers)",
                    timeout: true,
                }
            }
            return nil, uerr(err)
        }

var shouldRedirect bool
redirectMethod, shouldRedirect, includeBody = redirectBehavior(req.Method, resp, reqs[0])
if !shouldRedirect {
    return resp, nil
}
...

 

接着看 c.send,它内部又调用了包级函数 send:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// send attaches any cookies from the client's Jar to req, hands the request
// to the package-level send using the client's transport, and stores cookies
// from the response back into the Jar.
//
// The returned didTimeout is only non-nil when err is non-nil.
func (c *Client) send(req *Request, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
    if c.Jar != nil {
        stored := c.Jar.Cookies(req.URL)
        for i := range stored {
            req.AddCookie(stored[i])
        }
    }

    if resp, didTimeout, err = send(req, c.transport(), deadline); err != nil {
        return nil, didTimeout, err
    }

    // Without a Jar there is nowhere to record response cookies.
    if c.Jar == nil {
        return resp, nil, nil
    }
    if received := resp.Cookies(); len(received) != 0 {
        c.Jar.SetCookies(req.URL, received)
    }
    return resp, nil, nil
}
1
2
3
4
5
6
7
8
9
10
// send issues an HTTP request.
// Caller should close resp.Body when done reading from it.
//
// NOTE: this listing is abridged; each "..." marks statements omitted from
// the excerpt.
func send(ireq *Request, rt RoundTripper, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
    ...
        // Arrange cancellation for the given deadline; this yields the
        // stopTimer and didTimeout functions.
        stopTimer, didTimeout := setRequestCancel(req, rt, deadline)
    ...
        // Hand the request to the supplied RoundTripper.
        resp, err = rt.RoundTrip(req)
    ...
        return resp, nil, nil
}
rt.RoundTrip 是 RoundTripper interface 中定义的方法:
1
2
// RoundTrip executes a single HTTP transaction, returning
// a Response for the provided Request.

 

该 interface 的具体实现取决于调用 send 时传入的 c.transport():
1
2
3
4
5
6
// transport returns the RoundTripper this client sends requests through:
// the client's own Transport when one is set, otherwise DefaultTransport.
func (c *Client) transport() RoundTripper {
    if rt := c.Transport; rt != nil {
        return rt
    }
    return DefaultTransport
}

 

如果设置了 c.Transport 就使用它,否则使用 DefaultTransport。一般 client 不会单独设置 c.Transport,因此下面来看 DefaultTransport 对 RoundTripper 的实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// RoundTrip implements the RoundTripper interface.
//
// For higher-level HTTP client support (such as handling of cookies
// and redirects), see Get, Post, and the Client type.
//
// NOTE: this listing is abridged; each "..." marks statements omitted from
// the excerpt.
func (t *Transport) RoundTrip(req *Request) (*Response, error) {
    ...
        for {
            ...
                // Obtain a persistConn for this request via getConn.
                pconn, err := t.getConn(treq, cm)
                ...
                if pconn.alt != nil {
                    // HTTP/2 path.
                    t.setReqCanceler(req, nil) // not cancelable with CancelRequest
                        resp, err = pconn.alt.RoundTrip(req)
                } else {
                    // No alternate RoundTripper: send on the persistConn itself.
                    resp, err = pconn.roundTrip(treq)
                }
        }
    ...
}

 

这里暂不考虑 HTTP/2,重点关注 t.getConn。t.getConn 的实现如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
// getConn dials and creates a new persistConn to the target as
// specified in the connectMethod. This includes doing a proxy CONNECT
// and/or setting up TLS.  If this doesn't return an error, the persistConn
// is ready to write requests to.
//
// NOTE: this listing is abridged; "..." marks statements omitted from the
// excerpt (for example the declaration of dialc).
func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (*persistConn, error) {
req := treq.Request
         trace := treq.trace
         ctx := req.Context()
         if trace != nil && trace.GetConn != nil {
             trace.GetConn(cm.addr())
         }
     // First choice: an idle connection already available for this
     // connectMethod.
     if pc, idleSince := t.getIdleConn(cm); pc != nil {
         if trace != nil && trace.GotConn != nil {
             trace.GotConn(pc.gotIdleConnTrace(idleSince))
         }
         // set request canceler to some non-nil function so we
         // can detect whether it was cleared between now and when
         // we enter roundTrip
         t.setReqCanceler(req, func(error) {})
             return pc, nil
     }
     ...
         // handlePendingDial is called when this request stops waiting for its
         // own dial: a goroutine waits for the dial result and, if it
         // succeeded, offers the connection to the idle pool (or closes it).
         handlePendingDial := func() {
             testHookPrePendingDial()
                 go func() {
                     if v := <-dialc; v.err == nil {
                         t.putOrCloseIdleConn(v.pc)
                     }
                     testHookPostPendingDial()
                 }()
         }

cancelc := make(chan error, 1)
             t.setReqCanceler(req, func(err error) { cancelc <- err })

             // Start dialing a new connection in the background.
             go func() {
                 pc, err := t.dialConn(ctx, cm)
                     dialc <- dialRes{pc, err}
             }()
idleConnCh := t.getIdleConnCh(cm)
                // Wait for whichever happens first: our dial finishing, an
                // idle connection being handed over, or cancellation.
                select {
                    case v := <-dialc:
                            // Our dial finished.
                            if v.pc != nil {
                                if trace != nil && trace.GotConn != nil && v.pc.alt == nil {
                                    trace.GotConn(httptrace.GotConnInfo{Conn: v.pc.conn})
                                }
                                return v.pc, nil
                            }
                            // Our dial failed. See why to return a nicer error
                            // value.
                            select {
                                case <-req.Cancel:
                                    // It was an error due to cancelation, so prioritize that
                                    // error value. (Issue 16049)
                                    return nil, errRequestCanceledConn
                                case <-req.Context().Done():
                                        return nil, req.Context().Err()
                                case err := <-cancelc:
                                          if err == errRequestCanceled {
                                              err = errRequestCanceledConn
                                          }
                                          return nil, err
                                default:
                                              // It wasn't an error due to cancelation, so
                                              // return the original error message:
                                              return nil, v.err
                            }
                    case pc := <-idleConnCh:
                             // Another request finished first and its net.Conn
                             // became available before our dial. Or somebody
                             // else's dial that they didn't use.
                             // But our dial is still going, so give it away
                             // when it finishes:
                             handlePendingDial()
                                 if trace != nil && trace.GotConn != nil {
                                     trace.GotConn(httptrace.GotConnInfo{Conn: pc.conn, Reused: pc.isReused()})
                                 }
                             return pc, nil
                    case <-req.Cancel:
                                 handlePendingDial()
                                     return nil, errRequestCanceledConn
                    case <-req.Context().Done():
                                     handlePendingDial()
                                         return nil, req.Context().Err()
                    case err := <-cancelc:
                              handlePendingDial()
                                  if err == errRequestCanceled {
                                      err = errRequestCanceledConn
                                  }
                              return nil, err
                }
}

 

下面是这个过程的流程图:

从上面可以看到,获取连接会优先从连接池中获取;如果连接池中没有可用的连接,则会创建一个新连接,或者从刚刚释放的连接中获取一个。这两个过程是同时进行的,谁先获取到连接就用谁的。
当新创建一个连接, 创建连接的函数定义如下:

1
func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (*persistConn, error)

 

最后这个函数会通过goroutine调用两个函数:

1
2
go pconn.readLoop()
go pconn.writeLoop()

 

即 readLoop 和 writeLoop。其中 readLoop 里有如下一段代码:
1
2
3
4
5
6
7
8
9
10
// Put the idle conn back into the pool before we send the response
// so if they process it quickly and make another request, they'll
// get this same conn. But we use the unbuffered channel 'rc'
// to guarantee that persistConn.roundTrip got out of its select
// potentially waiting for this persistConn to close.
// but after
alive = alive &&
!pc.sawEOF &&
    pc.wroteRequest() &&
tryPutIdleConn(trace)

 

这里可以看出,在处理完请求后,会立即把当前连接放到连接池中。

client 的连接池是 idleConn map[connectMethodKey][]*persistConn:connectMethodKey 对应 client 所访问的目标 host,值是 *persistConn 组成的 slice;slice 的最大长度由 MaxIdleConnsPerHost 限制,默认值为 const DefaultMaxIdleConnsPerHost = 2。
clientserverserver
1
2
Connection: keep-alive
Connection: close

 

keep-alive 表示希望 server 保持连接;close 表示 server 在发送完 response 后关闭 TCP 连接。HTTP/1.1 中如果没有显式指定 close,默认就是 keep-alive。net/http 默认使用 HTTP/1.1,也就是默认 keep-alive,可以通过 DisableKeepAlives 关闭。
net/http
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
// putOrCloseIdleConn offers pconn to the idle pool via tryPutIdleConn and,
// if the pool refuses it, closes pconn with the error that explains why.
func (t *Transport) putOrCloseIdleConn(pconn *persistConn) {
    err := t.tryPutIdleConn(pconn)
    if err == nil {
        return
    }
    pconn.close(err)
}


// tryPutIdleConn adds pconn to the list of idle persistent connections awaiting
// a new request.
// If pconn is no longer needed or not in a good state, tryPutIdleConn returns
// an error explaining why it wasn't registered.
// tryPutIdleConn does not close pconn. Use putOrCloseIdleConn instead for that.
func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
    // Keep-alives are off, either explicitly or through a negative per-host
    // idle limit: never pool the connection.
    if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
        return errKeepAlivesDisabled
    }
    if pconn.isBroken() {
        return errConnBroken
    }
    // Connections with an alternate RoundTripper are not cached here.
    if pconn.alt != nil {
        return errNotCachingH2Conn
    }
    pconn.markReused()
    key := pconn.cacheKey

    // Everything below reads or mutates the idle bookkeeping under idleMu.
    t.idleMu.Lock()
    defer t.idleMu.Unlock()
    waitingDialer := t.idleConnCh[key]
    select {
    case waitingDialer <- pconn:
        // We're done with this pconn and somebody else is
        // currently waiting for a conn of this type (they're
        // actively dialing, but this conn is ready
        // first). Chrome calls this socket late binding. See
        // https://insouciant.org/tech/connection-management-in-chromium/
        return nil
    default:
        if waitingDialer != nil {
            // They had populated this, but their dial won
            // first, so we can clean up this map entry.
            delete(t.idleConnCh, key)
        }
    }
    if t.wantIdle {
        return errWantIdle
    }
    // Lazily create the idle map on first use.
    if t.idleConn == nil {
        t.idleConn = make(map[connectMethodKey][]*persistConn)
    }
    idles := t.idleConn[key]
    // The idle list for this key is already at the per-host limit.
    if len(idles) >= t.maxIdleConnsPerHost() {
        return errTooManyIdleHost
    }
    // Sanity check: the same connection must not be pooled twice; a
    // duplicate is treated as fatal.
    for _, exist := range idles {
        if exist == pconn {
            log.Fatalf("dup idle pconn %p in freelist", pconn)
        }
    }
    t.idleConn[key] = append(idles, pconn)
    t.idleLRU.add(pconn)
    // Enforce the overall MaxIdleConns limit (0 means no limit) by closing
    // and removing the oldest entry tracked by idleLRU.
    if t.MaxIdleConns != 0 && t.idleLRU.len() > t.MaxIdleConns {
        oldest := t.idleLRU.removeOldest()
        oldest.close(errTooManyIdle)
        t.removeIdleConnLocked(oldest)
    }
    // (Re)arm the per-connection idle timer so closeConnIfStillIdle fires
    // after IdleConnTimeout.
    if t.IdleConnTimeout > 0 {
        if pconn.idleTimer != nil {
            pconn.idleTimer.Reset(t.IdleConnTimeout)
        } else {
            pconn.idleTimer = time.AfterFunc(t.IdleConnTimeout, pconn.closeConnIfStillIdle)
        }
    }
    pconn.idleAt = time.Now()
    return nil
}

 

关闭连接:当 DisableKeepAlives 为 true 时,请求头中会带上 Connection: close,server 在返回响应后会关闭连接,该连接也不会被放回连接池。

长连接与短连接

net/http 默认使用 HTTP/1.1,相当于默认 Connection: keep-alive,即长连接。默认的 DefaultTransport 定义如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// DefaultTransport is the default implementation of Transport and is
// used by DefaultClient. It establishes network connections as needed
// and caches them for reuse by subsequent calls. It uses HTTP proxies
// as directed by the $HTTP_PROXY and $NO_PROXY (or $http_proxy and
// $no_proxy) environment variables.
var DefaultTransport RoundTripper = &Transport{
    Proxy: ProxyFromEnvironment, // pick the proxy from the environment variables
    DialContext: (&net.Dialer{
        Timeout:   30 * time.Second, // maximum time to wait for a connect to complete
        KeepAlive: 30 * time.Second, // interval between TCP keep-alive probes (not an HTTP keep-alive timeout)
        DualStack: true, // RFC 6555 IPv4/IPv6 fast fallback; deprecated in later Go releases
    }).DialContext,
    MaxIdleConns:          100, // maximum number of idle connections across all hosts
    IdleConnTimeout:       90 * time.Second, // how long an idle connection may stay in the pool
    TLSHandshakeTimeout:   10 * time.Second, // maximum time to wait for the TLS handshake
    ExpectContinueTimeout: 1 * time.Second, // wait for the server's first response headers after sending request headers with "Expect: 100-continue"
}

 

DefaultTransportMaxIdleConnsMaxIdleConnsPerHost
DisableKeepAlives = trueConnections:closeMaxIdleConnsPerHost < 0MaxIdleConnsPerHost < 0client
TIME_WAIT
ServerTIME_WAITTCPTIME_WAITserverserverclientnewclientclientclientserverkeep-alive

todo:补充tcpdump的分析结果

要解决这个问题,有以下几个方案:

1
2
3
4
#表示开启重用。允许将TIME-WAIT sockets重新用于新的TCP连接,默认为0,表示关闭  
net.ipv4.tcp_tw_reuse = 1  
#表示开启TCP连接中TIME-WAIT sockets的快速回收,默认为0,表示关闭(注意:该选项在经过NAT的环境下可能导致连接失败,并且已在 Linux 4.12 中被移除,不建议使用)
net.ipv4.tcp_tw_recycle = 1
设置 DisableKeepAlives = true,使请求头带上 Connection: close,让 server 在返回响应后关闭连接,从而改变 TIME_WAIT 出现的位置。
net/http