1、适用场景
在很多爬虫场景,需要使用代理或者登录的cookie等信息,可能需要让建立httpClient的连接和对应的cookie和代理,或者更多信息进行绑定,所以需要维护好自己的连接池,同时设置每个代理或者cookie的使用次数等.
注意:http请求的具体是json或者其他的格式,以及消息头等,可能需要具体分析。
2、代码整体介绍:
主要分四个模块:
2.1、main调用模块,是具体对连接池的使用;
2.2、连接池新建和监听接口的实现;
2.3、连接池的获取,以及连接使用结束后的释放功能;
2.4、 http连接建立,以及http具体请求。
2.1、main调用模块:
主要是初始化连接池 NewChanHttpClient,从连接池取client以及释放client,对使用后的连接池,如果超过十次,则放弃,因为此时代理或者cookie可能失效,需要重新建立。
package main
import (
"hunk0527/internal/doHttpPool"
"hunk0527/pkg/libHttp"
"net/http"
"time"
"github.com/astaxie/beego/logs"
)
func main() {
logs.Info("测试http爬虫复用连接池")
//建立连接池
handler := &DemoHandler{}
clientPool := doHttpPool.NewChanHttpClient(5, handler)
//取有效连接
client := clientPool.GetHttpClientValid(20)
if client == nil {
return
}
//使用client;
postUrl := "https://www.baidu.com"
postData := []byte("test")
header := make(http.Header)
resp, err := libHttp.HttpPostJsonClient(client, postUrl, postData, header)
logs.Info("查询返回值:", err, len(resp))
//释放client
if err == nil {
clientPool.ReleaseHttpClient(client)
}
time.Sleep(time.Second * 10)
//重新在取一个连接;
clientPool.GetHttpClientValid(20)
select {}
}
2.2、新连接创建接口实现模块:
主要为了实现NewHttpClientHandler连接池的具体实现方式,主要监听维护连接池总数,以及新建连接的过程,不同的爬虫新建连接可能差别很大,有些需要切换代理,有些可能需要滑块,有些需要登录cookie等,各种场景都不一样,具体NewHttpClient()中实现。
ListenClientPool()维护连接池最小有效长度。
package main
import (
"hunk0527/pkg/libHttp"
"sync"
"time"
"github.com/astaxie/beego/logs"
)
type DemoHandler struct {
Info string
}
// wg=nil 且isRetClient = true,表示直接创建新连接并立即返回,使用完后直接释放;
func (handler *DemoHandler) NewHttpClient(wg *sync.WaitGroup, timeout int64, isRetClient bool, channelClient chan *libHttp.HttpClientStu) *libHttp.HttpClientStu {
if wg != nil {
defer wg.Done()
}
//1、可能需要取代理地址,然后检测代理是否最近失效的;
proxyUrl := ""
client := libHttp.NewHttpClientOnce(proxyUrl, int(timeout), false)
//2、可能需要提前建立tcp连接;有些爬虫因为使用代理可能造成链接时间过长,如果等实际请求时在建立连接,可能会发生超时等异常情况.
//3、是否写入连接池;
if !isRetClient {
channelClient <- client
client = nil
}
return client
}
func (handler *DemoHandler) ListenClientPool(max int, channelClient chan *libHttp.HttpClientStu) {
if max <= 0 {
max = 5
}
for true {
time.Sleep(time.Second)
lenght := len(channelClient)
newNum := max - lenght
if lenght <= 0 {
newNum = max
}
if newNum > 0 {
wg := &sync.WaitGroup{}
for i := 0; i < newNum; i++ {
wg.Add(1)
go handler.NewHttpClient(wg, 40, false, channelClient)
}
wg.Wait()
logs.Info("监听连接池总数:原长度:%d,创建新连接数量:%d,当前长度:%d", lenght, newNum, len(channelClient))
}
logs.Info("当前长度:%d", len(channelClient))
}
return
}
2.3、连接池初始化,获取有效连接以及释放有效连接;
// 创建新连接的实现方法,可能需要添加代理,或者提前先建立好http连接;
// 监听http连接池等,维持连接数;
type NewHttpClientHandler interface {
NewHttpClient(wg *sync.WaitGroup, timeout int64, isRetClient bool, channelClient chan *libHttp.HttpClientStu) *libHttp.HttpClientStu
ListenClientPool(max int, channelClient chan *libHttp.HttpClientStu) //监听连接池数量,不需要监听可以写空
}
type HttpClientChannelStu struct {
ChanHttpClient chan *libHttp.HttpClientStu
Handler NewHttpClientHandler
}
var (
GlobalChanHttpClient *HttpClientChannelStu //有可能不需要使用此去全局变量;
)
//可能需要维护一个连接池,否则每次连接花费的时间会很长;
func NewChanHttpClient(max int, handler NewHttpClientHandler) *HttpClientChannelStu {
pp := &HttpClientChannelStu{}
pp.ChanHttpClient = make(chan *libHttp.HttpClientStu, 100)
pp.Handler = handler
//监听连接池;
go pp.Handler.ListenClientPool(max, pp.ChanHttpClient)
return pp
}
// 获取一个有效的httpClient;如果创建为空,最后强行创建一个连接;
func (pp *HttpClientChannelStu) GetHttpClientValid(timeout int64) *libHttp.HttpClientStu {
if pp == nil {
panic("未初始化连接池")
}
var client *libHttp.HttpClientStu
select {
case client = <-pp.ChanHttpClient:
logs.Info("clientChan有数据:剩余长度为:", len(pp.ChanHttpClient))
case <-time.After(time.Millisecond * 200):
logs.Info("clientChan中无数据,超时500ms退出\n")
}
if client == nil {
client = pp.Handler.NewHttpClient(nil, timeout, true, pp.ChanHttpClient)
}
logs.Info("取一个client结束:", client.Proxy, "使用次数:", client.UsedTimes)
return client
}
// 释放一个client;
func (pp *HttpClientChannelStu) ReleaseHttpClient(client *libHttp.HttpClientStu) {
if client == nil {
return
}
if client.UsedTimes > 10 {
logs.Info("本client已经使用超过十次,放弃:Proxy=%s,使用次数=%d!!!\n\n", client.Proxy, client.UsedTimes)
return
}
pp.ChanHttpClient <- client
logs.Info("释放一个httpClient", client.Proxy, client.UsedTimes)
return
}
2.4、http连接建立,以及http请求:
type HttpClientStu struct {
Client *http.Client
Proxy string //代理地址;
UsedTimes int64 //使用的次数;在请求的时候会加1;
Reserve1 string //预留参数,可能是cookie或者其他和client绑定参数;
Reserve2 string
}
func NewHttpClientOnce(proxyUrl string, timeout int, DisableKeepAlives bool) *HttpClientStu {
if timeout <= 0 {
timeout = 10
}
client := &http.Client{}
tr := &http.Transport{
....... //这中间省略很多步骤,可能根据具体需求建立
DisableKeepAlives: DisableKeepAlives,
}
if proxyUrl != "" {
proxy, _ := url.Parse(proxyUrl)
tr.Proxy = http.ProxyURL(proxy)
}
client.Timeout = time.Duration(timeout) * time.Second
client.Transport = tr
retClient := &HttpClientStu{
Client: client,
Proxy: proxyUrl,
}
return retClient
}
//请求json数据,简略版本,可能还有其他参数的添加等,完善一下此功能。
func HttpPostJsonClient(cli *HttpClientStu, postUrl string, postData []byte, header http.Header) ([]byte, error) {
req, _ := http.NewRequest("POST", postUrl, bytes.NewBuffer([]byte(postData)))
req.Header = header
//使用次数加1
cli.UsedTimes += 1
resp, err := cli.Client.Do(req)
if err != nil {
return []byte{}, err
}
defer resp.Body.Close()
resBytes, err := ResponseBinaryReader(resp)
return resBytes, err
}