前言:
有段时间没写博客了,顺手分享下前些时间写的一个通用连接池。应该有朋友要说 又是个轮子了。这还真不是要造轮子,像我们平时用的golang的mysql和redis库,基本都有连接池的实现。 但 go-nsq是没有连接池实现的。
说下怎么发现的这个问题。每当服务有大量的埋点数据需要推送到nsq时,会发现nsq推送不及时,publish的速度不给力。 服务进程的cpu不大,内存不大,网络是内网,也很小。io更很小。并发的协程也是足够的多,按道理应该很快就推送。 导出go的协程调用栈,发现不少协程都在等待nsq的chan发送。 分析go-nsq源码得知,publish的逻辑都是串行的,每个conn会new一个协程专门串行发送数据。 当我们调用publish和publishAsync其实都是往chan绑定的chan发送导入数据,然后又一个协程去串行发送。
也正是因为go-nsq做了这样的协程安全,让我怀疑整个进程就只有一个连接。用 lsof 发现服务跟nsq连接就只有一个…. go nsq的NewProducer的config里也没有任何连接池的相关配置参数。
feature_g 20593 work 33u IPv4 26183593 0t0 TCP xxxx:38966-> xxxxx:4150 (ESTABLISHED)
最简单的方法让每个常驻的协程绑定一个独占的连接,这样配置很简单,但是如果你的协程加大到500,那么连接也跟着涨到500, 不合理。 这里吐槽下 nsq的性能实在不咋地 ! 在大量的客户端建立连接时,出现连接失败的问题,需要不断的重试。
第二种:
连接池的代码扔到github了,有兴趣的朋友可以瞅瞅,使用方法简单,抽象了interface{}, 只要实现相应的方法就完事了。
最近项目催促的太紧,导致没时间好好打磨下go连接池的代码,自觉地写得不优美,尤其是锁的处理,但是可以用的,我已经把nsq和kafka都嵌入了这套连接池里了。
多说一句,请注意消息队列的连接池,尽量只放入生产端连接,因为消费端一般是用来回调,他自身也是有状态,不好控制释放连接的时机。
话说,很多的消息队列的go client都没有连接池的实现,不说nsq和kafka,还有rabbtimq。
// xiaorui.cc
var nsqPool pool.Pool
func initPool() {
//factory 创建连接的方法
factory := func() (interface{}, error) {
conn := tool.NewNsqEntry()
conn.InitProducer()
return conn, nil
}
//close 关闭连接的方法
close := func(v interface{}) error {
return v.(*tool.NsqEntry).Close()
}
//创建一个连接池: 初始化5,最大连接30
poolConfig := &pool.PoolConfig{
InitialCap: 5,
MaxCap: 30,
Factory: factory,
Close: close,
//连接最大空闲时间,超过该时间的连接 将会关闭,可避免空闲时连接EOF,自动失效的问题
IdleTimeout: 15 * time.Second,
}
p, err := pool.NewChannelPool(poolConfig)
if err != nil {
fmt.Println("err=", err)
}
//从连接池中取得一个连接
v, err := p.Get()
//do something
//conn=v.(net.Conn)
//将连接放回连接池中
p.Put(v)
//释放连接池中的所有连接
//p.Release()
//查看当前连接中的数量
current := p.Len()
fmt.Println("len=", current)
}
总结:
nsq和kafka使用这套连接池已经跑了一个多月了,我一直关注连接的个数及异常,到现在为止还没出问题。连接池的实现多种多样,貌似多是使用链表加锁实现的,我这个是使用channel来实现的,目的都一样,为了线程安全。