前言:
有段时间没写博客了,顺手分享下前些时间写的一个通用连接池。应该有朋友要说 又是个轮子了。这还真不是要造轮子,像我们平时用的golang的mysql和redis库,基本都有连接池的实现。 但 go-nsq是没有连接池实现的。
说下怎么发现的这个问题。每当服务有大量的埋点数据需要推送到nsq时,会发现nsq推送不及时,publish的速度不给力。 服务进程的cpu不大,内存不大,网络是内网,也很小。io更很小。并发的协程也是足够的多,按道理应该很快就推送。 导出go的协程调用栈,发现不少协程都在等待nsq的chan发送。 分析go-nsq源码得知,publish的逻辑都是串行的,每个conn会new一个协程专门串行发送数据。 当我们调用publish和publishAsync其实都是往chan绑定的chan发送导入数据,然后又一个协程去串行发送。
也正是因为go-nsq做了这样的协程安全,让我怀疑整个进程就只有一个连接。用 lsof 发现服务跟nsq连接就只有一个…. go nsq的NewProducer的config里也没有任何连接池的相关配置参数。
feature_g 20593 work 33u IPv4 26183593 0t0 TCP xxxx:38966-> xxxxx:4150 (ESTABLISHED)
最简单的方法让每个常驻的协程绑定一个独占的连接,这样配置很简单,但是如果你的协程加大到500,那么连接也跟着涨到500, 不合理。 这里吐槽下 nsq的性能实在不咋地 ! 在大量的客户端建立连接时,出现连接失败的问题,需要不断的重试。
第二种:
连接池的代码扔到github了,有兴趣的朋友可以瞅瞅,使用方法简单,抽象了interface{}, 只要实现相应的方法就完事了。
最近项目催促的太紧,导致没时间好好打磨下go连接池的代码,自觉地写得不优美,尤其是锁的处理,但是可以用的,我已经把nsq和kafka都嵌入了这套连接池里了。
多说一句,请注意消息队列的连接池,尽量只放入生产端连接,因为消费端一般是用来回调,他自身也是有状态,不好控制释放连接的时机。
话说,很多的消息队列的go client都没有连接池的实现,不说nsq和kafka,还有rabbtimq。
// xiaorui.cc var nsqPool pool.Pool func initPool() { //factory 创建连接的方法 factory := func() (interface{}, error) { conn := tool.NewNsqEntry() conn.InitProducer() return conn, nil } //close 关闭连接的方法 close := func(v interface{}) error { return v.(*tool.NsqEntry).Close() } //创建一个连接池: 初始化5,最大连接30 poolConfig := &pool.PoolConfig{ InitialCap: 5, MaxCap: 30, Factory: factory, Close: close, //连接最大空闲时间,超过该时间的连接 将会关闭,可避免空闲时连接EOF,自动失效的问题 IdleTimeout: 15 * time.Second, } p, err := pool.NewChannelPool(poolConfig) if err != nil { fmt.Println("err=", err) } //从连接池中取得一个连接 v, err := p.Get() //do something //conn=v.(net.Conn) //将连接放回连接池中 p.Put(v) //释放连接池中的所有连接 //p.Release() //查看当前连接中的数量 current := p.Len() fmt.Println("len=", current) }
总结:
nsq和kafka使用这套连接池已经跑了一个多月了,我一直关注连接的个数及异常,到现在为止还没出问题。连接池的实现多种多样,貌似多是使用链表加锁实现的,我这个是使用channel来实现的,目的都一样,为了线程安全。