写了一个爬虫,发现出现了socket泄露的情况。百度了一下发现是缺少了Response.Body.Close(),所以导致连接
没有被正常的关闭。也没有被gc回收。下面是文档中的说明
Callers should close resp.Body when done reading from it. If resp.Body
is not closed, the Client's underlying RoundTripper (typically Transport)
may not be able to re-use a persistent TCP connection to the server for a
subsequent "keep-alive" request.
解决问题很简单,不过引起了我想看看源码中简单的HTTP请求是如何实现的欲望。
Do函数(包括Post,Get)
首先我们用NewRequest构建了一个Request,里面包含了我们请求的url,如果是post请求还会包含请求的body,
随后会触发一个doFollowingRedirects函数,但是这里我们为了简化就不展开,直接看没有重定向的情况,也就是
通过Client.send函数继续向下传递这个Request
send函数
Client.send函数是对send函数的一个包装,目的是提取中Client cookie Jar 中的cookie放入Request中,以及
将Response中返回的cookie 装进Client的cookie Jar。
func send(ireq *Request, rt RoundTripper, deadline time.Time) (*Response, error)
当Client.send调用send的时候会将Transport作为rt参数传入进去,如果没有的话则会用Transport.go里面
默认的DefaultTransport.
随后send做了一些微小的工作,检测不完整的Request,setRequestCancel(如果设置了超时时间Timeout则这个函数会生效,第一次读的时候
会停止这个Timeout的计时,如果此时Request已经被Cancel了,那么返回一个error)。
随后调用rt的RoundTrip函数来获得Response.
Transport.RoundTrip函数
首先检测一下Request的信息完整性,然后看一下altProto里面有没有符合Scheme的RoundTrip实现。随后进入for循环,构建一个
connectMethod类型变量,随后通过Transport.getConn来拿到一个TCP连接,再通过调用persistConn.roundTrip来把
Request写入TCP中,完成发送请求。如果发送失败,则调用checkTransportResend来尝试重新发送这个Request.
Transport.altProto
最开始我也没有看懂这是在干嘛,后来找到了一个RegisterProtocol函数,才看明白这是在干什么。Transport作为一个可以复用的结构体实际上可以处理不同协议的请求,那么不同协议的请求就要有不同的实现,诸如ftp,file等。如果出现了这种情况,我们就可以通过RegisterProtocol来注册一些针对不同协议的实现,从而当Transport发送Request之前就可以通过map来确定到底要使用哪个RoundTrip。
Transport.connectMethod+
结构体中包括了代理地址,协议(HTTP or HTTPS),以及目的地址。需要注意的是,connectMethod类型是很关键的,
它不仅是Transport中一些map的键值,也是很多函数的参数。与其相似的结构体connectMethodKey中包含了和它一样的内容,只不过结构体
内变量的类型不同(connectMethodKey中的proxy是string,而connectMethod中的proxy是*url.URL)
Transport.getConn函数
首先通过getIdleConn函数来获取可用的空闲连接,如果有的话,直接返回。如果没有的话,用go(异步)的方式创建一个dialConn,然后通过
channel来将其送回getConn函数中。而在getConn中则是用select阻塞,等待返回。整个函数中比较复杂的机制在于情况的判定,譬如请求超时了
connection仍然没有返回,这个时候函数会调用handlePendingDial对connection进行处理,放入idle队列或者将其关闭。又或者是当我们请求的
connection没有返回而此时出现了一个空闲的connection,调用handlePendingDial等待我们申请的那个connection,将这个空闲的返回。
Transport.getIdleConn函数
关于空闲连接的在Transport中的两个map,搜索idleConn,如果存在多个则返回第一个,没有则返回nil
Transport.dialConn函数
首先创建一个persistConn类型的变量,然后检测Scheme,如果是TLS,HTTPS或者是使用了代理,那么通过DialTLS函数来创建
Conn,在这里我们不解释这个过程。如果是普通的HTTP,则通过Transport.dial来获得这个Conn.我们只看HTTP的处理过程,发现直接
跳过了函数里面的80行+.随后创建了persistConn的读写缓冲区放入结构体中。以异步方式打开persistConn的读写函数(readLoop和writeLoop)
persistConn
注释里已经写的非常全面了,我就做个搬运工.
// persistConn wraps a connection, usually a persistent one
// (but may be used for non-keep-alive requests as well)
type persistConn struct {
// alt optionally specifies the TLS NextProto RoundTripper.
// This is used for HTTP/2 today and future protocol laters.
// If it's non-nil, the rest of the fields are unused.
alt RoundTripper
t *Transport
cacheKey connectMethodKey
conn net.Conn
tlsState *tls.ConnectionState
br *bufio.Reader // from conn
sawEOF bool // whether we've seen EOF from conn; owned by readLoop
bw *bufio.Writer // to conn
reqch chan requestAndChan // written by roundTrip; read by readLoop
writech chan writeRequest // written by roundTrip; read by writeLoop
closech chan struct{} // closed when conn closed
isProxy bool
// writeErrCh passes the request write error (usually nil)
// from the writeLoop goroutine to the readLoop which passes
// it off to the res.Body reader, which then uses it to decide
// whether or not a connection can be reused. Issue 7569.
writeErrCh chan error
lk sync.Mutex // guards following fields
numExpectedResponses int
closed error // set non-nil when conn is closed, before closech is closed
broken bool // an error has happened on this connection; marked broken so it's not reused.
canceled bool // whether this conn was broken due a CancelRequest
reused bool // whether conn has had successful request/response and is being reused.
// mutateHeaderFunc is an optional func to modify extra
// headers on each outbound request before it's written. (the
// original Request given to RoundTrip is not modified)
mutateHeaderFunc func(Header)
}
persistConn.roundTrip函数
首先调用replaceReqCanceler来探测Request是否已经触发了删除行为,如果是,就把persistConn放入putOrCloseIdleConn中处理。
实际上,go在实现HTTP请求的时候是有一个默认的Header,而在Request里面也实现了一个extraHeaders的方法。也就是说,在这一步的
时候HTTP Header才会真正的被完善。包括Accept-Encoding(gzip),Range,Connection(close).随后向writech里面写入Request,
在persistConn结构体中已经讲过,writech的接收者是writeloop,writeloop接收到了之后就会将其写入缓冲区并调用Flush,将err通过
channel返回。接下来roundTrip向reqch中写入requestAndChan,reqch的接受者是readloop,接下来函数select挂起几个管道,
用来监听一些写入错误,服务超时,连接关闭(或被删除),以及readloop传送回来的response.检查返回值没有问题之后将response返回。
Transport结构体中空闲连接部分
idleConn map[connectMethodKey][]*persistConn
idleConnCh map[connectMethodKey]chan *persistConn
第一个idleConn是以MethodKey作为键值的,为一个persistConn切片建立索引,可以想象的是倘若我们设置最大空闲连接为5(perhost),
那么我们可以通过MethodKey获得的最大空闲连接应该就是5个。
idleConnCh是对传送persistConn的管道建立索引,每次有人等待连接的时候都会建立一个这样管道。调用tryPutIdleConn的时候
会尝试着将已经收到的空闲连接放入管道内,如果放入成功则返回,放入失败则在idleConnCh删除这个索引。然后将其放入idleConn中。
Transport.dial函数
dial函数是调用的Transport结构体中的Dial func(network, addr string) (net.Conn, error).如果你没有创建这个函数的话,
默认的就是net.Dial函数。也就是调用底层函数了。
persistConn.readLoop函数
首先用defer注册一个close函数,用来关闭conn以及关闭persistConn中的closech以通知conn被关闭。然后进入循环,
首先用Peek(1)来探测是否发生了IO错误。在persistConn.reqch管道中读出requestAndChan类型变量,这个变量是用来匹配Request,
并且传入几个管道作为通信。随后调用persistConn.readResponse()来读出Response。后面做一些容错性的检查以及ResponseBody
的消息管道,最后用select挂起,等到persistConn的关闭或者Request的cancel,又或者是body的关闭,这个时候才会触发退出循环
或者继续循环的指令。那么最初因为没有写Response.Body.Close()所导致的问题就出在这里了。
总结
第一次看源码去解决问题,问题很快就得到解决了。这就正说明了绝大部分问题在源码中都有说明和注释。实话实说,我看的蛮吃力的,
自己写了一圈下来发现自己写的内容对读者并不是特别友好,更多的是对源码的一种简化版翻译。水平较低难免出错,期盼如果有大神
看到可以指出我的错误,也欢迎问题的交(gao)流(ji)