1 前言

阅读文章之前,请先了解一下thrift相关知识。thrift官方并没有提供客户端连接池的实现方案,而我们在实际使用时,thrift客户端必须复用,来保证较为可观的吞吐量,并避免在高QPS调用情况下,不断的创建、释放客户端所带来的机器端口耗尽问题。本文会详细讲解如何实现一个简单可靠的thrift客户端连接池,并通过对照实验来说明thrift客户端连接池所带来的好处。由于篇幅的原因,本文只粘出关键代码,源代码请查看Thrift Client Pool Demo

1.1 运行环境

  1. Golang版本: go1.14.3 darwin/amd64
  2. Thrift Golang库版本: 0.13.0
  3. Thrift IDL编辑器版本: 0.13.0

1.2 .thrift文件

namespace java com.czl.api.thrift.model
namespace cpp com.czl.api
namespace php com.czl.api
namespace py com.czl.api
namespace js com.czl.apixianz
namespace go com.czl.api


struct ApiRequest {
    1: required i16 id;
}

struct  ApiResponse{
    1:required string name;
}

// service1
service ApiService1{
    ApiResponse query(1:ApiRequest request)
}

// service2
service ApiService2{
    ApiResponse query(1:ApiRequest request)
}

注:请通过安装Thrift IDL编译器,并生成客户端、服务端代码。

1.3 对照实验说明

通过脚本开启100个协程并发调用rpc服务10分钟,统计这段时间内,未使用thrift客户端连接池与使用客户端连接池服务的平均吞吐量、Thrift API调用平均延迟、机器端口消耗等数据进行性能对比。

  1. 实验一: 未使用thrift客户端连接池
  2. 实验二: 使用thrift客户端连接池
2 Thrift客户端连接池实现

2.1 连接池的功能

首先,我们要明确一下连接池的职责,这里我简单的总结一下,连接池主要功能是维护连接的创建、释放,通过缓存连接来复用连接,减少创建连接所带来的开销,提高系统的吞吐量,一般连接池还会有连接断开的重连机制、超时机制等。这里我们可以先定义出大部分连接池都会有的功能,只是定义,可以先不管每个功能的具体实现。每一个空闲Thrift客户端其实底层都维护着一条空闲TCP连接,空闲Thrift客户端与空闲连接在这里其实是同一个概念。

......

// Thrift客户端创建方法,留给业务去实现
type ThriftDial func(addr string, connTimeout time.Duration) (*IdleClient, error)

// 关闭Thrift客户端,留给业务实现
type ThriftClientClose func(c *IdleClient) error

// Thrift客户端连接池
type ThriftPool struct {
	// Thrift客户端创建逻辑,业务自己实现
	Dial ThriftDial
	// Thrift客户端关闭逻辑,业务自己实现
	Close ThriftClientClose
	// 空闲客户端,用双端队列存储
	idle list.List
	// 同步锁,确保count、status、idle等公共数据并发操作安全
	lock *sync.Mutex
	// 记录当前已经创建的Thrift客户端,确保MaxConn配置
	count int32
	// Thrift客户端连接池状态,目前就open和stop两种
	status uint32
	// Thrift客户端连接池相关配置
	config *ThriftPoolConfig
}

// 连接池配置
type ThriftPoolConfig struct {
	// Thrfit Server端地址
	Addr string
	// 最大连接数
	MaxConn int32
	// 创建连接超时时间
	ConnTimeout time.Duration
	// 空闲客户端超时时间,超时主动释放连接,关闭客户端
	IdleTimeout time.Duration
	// 获取Thrift客户端超时时间
	Timeout time.Duration
	// 获取Thrift客户端失败重试间隔
	interval time.Duration
}

// Thrift客户端
type IdleClient struct {
	// Thrift传输层,封装了底层连接建立、维护、关闭、数据读写等细节
	Transport thrift.TTransport
	// 真正的Thrift客户端,业务创建传入
	RawClient interface{}
}

// 封装了Thrift客户端
type idleConn struct {
    // 空闲Thrift客户端
	c *IdleClient
	// 最近一次放入空闲队列的时间
	t time.Time
}

// 获取Thrift空闲客户端
func (p *ThriftPool) Get() (*IdleClient, error) {
	// 1. 从空闲池中获取空闲客户端,获取到更新数据,返回,否则执行第2步
	// 2. 创建新到Thrift客户端,更新数据,返回Thrift客户端
	......
}

// 	归还Thrift客户端
func (p *ThriftPool) Put(client *IdleCLient) error {
	// 1. 如果客户端已经断开,更新数据,返回,否则执行第2步
	// 2. 将Thrift客户端丢进空闲连接池,更新数据,返回
	......
}

// 超时管理,定期释放空闲太久的连接
func (p *ThriftPool) CheckTimeout() {
	// 扫描空闲连接池,将空闲太久的连接主动释放掉,并更新数据
	......
}

// 异常连接重连
func (p *ThriftPool) Reconnect(client *IdleClient) (newClient *IdleClient, err error) {
	// 1. 关闭旧客户端
	// 2. 创建新的客户端,并返回
	......
}

// 其他方法
......

这里有两个关键的数据结构,ThriftPool和IdleClient,ThriftPool负责实现整个连接池的功能,IdleClient封装了真正的Thrift客户端。先看一下ThriftPool的定义:

// Thrift客户端创建方法,留给业务去实现
type ThriftDial func(addr string, connTimeout time.Duration) (*IdleClient, error)

// 关闭Thrift客户端,留给业务实现
type ThriftClientClose func(c *IdleClient) error

// Thrift客户端连接池
type ThriftPool struct {
	// Thrift客户端创建逻辑,业务自己实现
	Dial ThriftDial
	// Thrift客户端关闭逻辑,业务自己实现
	Close ThriftClientClose
	// 空闲客户端,用双端队列存储
	idle list.List
	// 同步锁,确保count、status、idle等公共数据并发操作安全
	lock *sync.Mutex
	// 记录当前已经创建的Thrift客户端,确保MaxConn配置
	count int32
	// Thrift客户端连接池状态,目前就open和stop两种
	status uint32
	// Thrift客户端连接池相关配置
	config *ThriftPoolConfig
}

// 连接池配置
type ThriftPoolConfig struct {
	// Thrfit Server端地址
	Addr string
	// 最大连接数
	MaxConn int32
	// 创建连接超时时间
	ConnTimeout time.Duration
	// 空闲客户端超时时间,超时主动释放连接,关闭客户端
	IdleTimeout time.Duration
	// 获取Thrift客户端超时时间
	Timeout time.Duration
	// 获取Thrift客户端失败重试间隔
	interval time.Duration
}
  1. Thrift客户端创建与关闭,涉及到业务细节,这里抽离成Dial方法和Close方法。
  2. 连接池需要维护空闲客户端,这里用双端队列来存储。
  3. 一般的连接池,都应该支持最大连接数配置,MaxConn可以配置连接池最大连接数,同时我们用count来记录连接池当前已经创建的连接。
  4. 为了实现连接池的超时管理,当然也得有相关超时配置。
  5. 连接池的状态、当前连接数等这些属性,是多协程并发操作的,这里用同步锁lock来确保并发操作安全。

在看一下IdleClient实现:

// Thrift客户端
type IdleClient struct {
	// Thrift传输层,封装了底层连接建立、维护、关闭、数据读写等细节
	Transport thrift.TTransport
	// 真正的Thrift客户端,业务创建传入
	RawClient interface{}
}

// 封装了Thrift客户端
type idleConn struct {
    // 空闲Thrift客户端
	c *IdleClient
	// 最近一次放入空闲队列的时间
	t time.Time
}
  1. RawClient是真正的Thrift客户端,与实际逻辑相关。
  2. Transport Thrift传输层,Thrift传输层,封装了底层连接建立、维护、关闭、数据读写等细节。
  3. idleConn封装了IdleClient,用来实现空闲连接超时管理,idleConn记录一个时间,这个时间是Thrift客户端最近一次被放入空闲队列的时间。

2.2 获取连接

......

var nowFunc = time.Now

......

// 获取Thrift空闲客户端
func (p *ThriftPool) Get() (*IdleClient, error) {
	return p.get(nowFunc().Add(p.config.Timeout))
}

// 获取连接的逻辑实现
// expire设定了一个超时时间点,当没有可用连接时,程序会休眠一小段时间后重试
// 如果一直获取不到连接,一旦到达超时时间点,则报ErrOverMax错误
func (p *ThriftPool) get(expire time.Time) (*IdleClient, error) {
	if atomic.LoadUint32(&p.status) == poolStop {
		return nil, ErrPoolClosed
	}

	// 判断是否超额
	p.lock.Lock()
	if p.idle.Len() == 0 && atomic.LoadInt32(&p.count) >= p.config.MaxConn {
		p.lock.Unlock()
		// 不采用递归的方式来实现重试机制,防止栈溢出,这里改用循环方式来实现重试
		for {
			// 休眠一段时间再重试
			time.Sleep(p.config.interval)
			// 超时退出
			if nowFunc().After(expire) {
				return nil, ErrOverMax
			}
			p.lock.Lock()
			if p.idle.Len() == 0 && atomic.LoadInt32(&p.count) >= p.config.MaxConn {
				p.lock.Unlock()
			} else { // 有可用链接,退出for循环
				break
			}
		}
	}

	if p.idle.Len() == 0 {
		// 先加1,防止首次创建连接时,TCP握手太久,导致p.count未能及时+1,而新的请求已经到来
		// 从而导致短暂性实际连接数大于p.count(大部分链接由于无法进入空闲链接队列,而被关闭,处于TIME_WATI状态)
		atomic.AddInt32(&p.count, 1)
		p.lock.Unlock()
		client, err := p.Dial(p.config.Addr, p.config.ConnTimeout)
		if err != nil {
			atomic.AddInt32(&p.count, -1)
			return nil, err
		}
		// 检查连接是否有效
		if !client.Check() {
			atomic.AddInt32(&p.count, -1)
			return nil, ErrSocketDisconnect
		}

		return client, nil
	}

	// 从队头中获取空闲连接
	ele := p.idle.Front()
	idlec := ele.Value.(*idleConn)
	p.idle.Remove(ele)
	p.lock.Unlock()

	// 连接从空闲队列获取,可能已经关闭了,这里再重新检查一遍
	if !idlec.c.Check() {
		atomic.AddInt32(&p.count, -1)
		return nil, ErrSocketDisconnect
	}
	return idlec.c, nil
}
p.Get()p.config.MaxConnp.idle.Len() != 0 || atomic.LoadInt32(&p.count) < p.config.MaxConn
// 递归的方法实现等待重试逻辑
func (p *ThriftPool) get(expire time.Time) (*IdleClient, error) {
	// 超时退出
	if nowFunc().After(expire) {
		return nil, ErrOverMax
	}
	if atomic.LoadUint32(&p.status) == poolStop {
		return nil, ErrPoolClosed
	}

	// 判断是否超额
	p.lock.Lock()
	if p.idle.Len() == 0 && atomic.LoadInt32(&p.count) >= p.config.MaxConn {
		p.lock.Unlock()
		// 休眠递归重试
		time.Sleep(p.config.interval)
		p.get(expire)
	}
	.......
}

2.3 释放连接

// 归还Thrift客户端
func (p *ThriftPool) Put(client *IdleClient) error {
	if client == nil {
		return nil
	}

	if atomic.LoadUint32(&p.status) == poolStop {
		err := p.Close(client)
		client = nil
		return err
	}

	if atomic.LoadInt32(&p.count) > p.config.MaxConn || !client.Check() {
		atomic.AddInt32(&p.count, -1)
		err := p.Close(client)
		client = nil
		return err
	}

	p.lock.Lock()
	p.idle.PushFront(&idleConn{
		c: client,
		t: nowFunc(),
	})
	p.lock.Unlock()

	return nil
}
p.Put()p.countp.Get()p.Put()

2.4 超时管理

p.Get()p.Dial()
// 超时管理,定期释放空闲太久的连接
func (p *ThriftPool) CheckTimeout() {
	p.lock.Lock()
	for p.idle.Len() != 0 {
		ele := p.idle.Back()
		if ele == nil {
			break
		}
		v := ele.Value.(*idleConn)
		if v.t.Add(p.config.IdleTimeout).After(nowFunc()) {
			break
		}
		//timeout && clear
		p.idle.Remove(ele)
		p.lock.Unlock()

		p.Close(v.c) //close client connection
		atomic.AddInt32(&p.count, -1)

		p.lock.Lock()
	}
	p.lock.Unlock()
	return
}
p.Get()p.Put()

2.5 重连机制

thrift.TTransport.IsOpen()
// 检测连接是否有效
func (c *IdleClient) Check() bool {
	if c.Transport == nil || c.RawClient == nil {
		return false
	}
	return c.Transport.IsOpen()
}
c.Transport.IsOpen()falsec.Transport.IsOpen()c.Transport.Open()ThriftPoolThriftPoolAgent
package pool

import (
	"fmt"
	"github.com/apache/thrift/lib/go/thrift"
	"log"
	"net"
)

type ThriftPoolAgent struct {
	pool *ThriftPool
}

func NewThriftPoolAgent() *ThriftPoolAgent {
	return &ThriftPoolAgent{}
}

func (a *ThriftPoolAgent) Init(pool *ThriftPool) {
	a.pool = pool
}

// 真正的业务逻辑放到do方法做,ThriftPoolAgent只要保证获取到可用的Thrift客户端,然后传给do方法就行了
func (a *ThriftPoolAgent) Do(do func(rawClient interface{}) error) error {
	var (
		client *IdleClient
		err error
	)
	defer func() {
		if client != nil {
			if err == nil {
				if rErr := a.releaseClient(client); rErr != nil {
					log.Println(fmt.Sprintf("releaseClient error: %v", rErr))
				}
			} else if _, ok := err.(net.Error); ok {
				a.closeClient(client)
			} else if _, ok = err.(thrift.TTransportException); ok {
				a.closeClient(client)
			} else {
				if rErr := a.releaseClient(client); rErr != nil {
					log.Println(fmt.Sprintf("releaseClient error: %v", rErr))
				}
			}
		}
	}()
	// 从连接池里获取链接
	client, err = a.getClient()
	if err != nil {
		return err
	}
	if err = do(client.RawClient); err != nil {
		if _, ok := err.(net.Error); ok {
			log.Println(fmt.Sprintf("err: retry tcp, %T, %s", err, err.Error()))
			// 网络错误,重建连接
			client, err = a.reconnect(client)
			if err != nil {
				return err
			}
			return do(client.RawClient)
		}

		if _, ok := err.(thrift.TTransportException); ok {
			log.Println(fmt.Sprintf("err: retry tcp, %T, %s", err, err.Error()))
			// thrift传输层错误,也重建连接
			client, err = a.reconnect(client)
			if err != nil {
				return err
			}
			return do(client.RawClient)
		}
		return err
	}
	return nil
}

// 获取连接
func (a *ThriftPoolAgent) getClient() (*IdleClient, error) {
	return a.pool.Get()
}

// 释放连接
func (a *ThriftPoolAgent) releaseClient(client *IdleClient) error {
	return a.pool.Put(client)
}

// 关闭有问题的连接,并重新创建一个新的连接
func (a *ThriftPoolAgent) reconnect(client *IdleClient) (newClient *IdleClient, err error) {
	return a.pool.Reconnect(client)
}

// 关闭连接
func (a *ThriftPoolAgent) closeClient(client *IdleClient) {
	a.pool.CloseConn(client)
}

// 释放连接池
func (a *ThriftPoolAgent) Release() {
	a.pool.Release()
}

func (a *ThriftPoolAgent) GetIdleCount() uint32 {
	return a.pool.GetIdleCount()
}

func (a *ThriftPoolAgent) GetConnCount() int32 {
	return a.pool.GetConnCount()
}
3 对照实验

启用100个协程,不断调用Thrift服务端API 10分钟,对比服务平均吞吐量、Thrift API调用平均延迟、机器端口消耗。

平均吞吐量(r/s) = 总成功数 / 600

API调用平均延迟(ms/r) = 总成功数 / API成功请求总耗时(微秒) / 1000

机器端口消耗计算:netstat -nt | grep 9444 -c

3.1 实验一:未使用连接池

connect can't assign request address

3.2 实验二:使用连接池

  1. 机器端口消耗
    在这里插入图片描述
  2. 平均吞吐量、平均延迟
    在这里插入图片描述
    可以看出,用了连接池后,平均吞吐量可达到1.8w,API调用平均延迟才0.5ms,你可能会问,理论吞吐量不是可以达到1000 / 0.5 * 100 = 20w?理论归理论,如果按照1.8w吞吐量算,一次处理过程总时间消耗是1000 / (18000 / 100) = 5.6ms,所以这里影响吞吐量的因素已经不是API调用的耗时了,1.8w的吞吐量其实已经挺不错了。
    另外,消耗的端口数也才194/2 = 97(除余2是因为server端也在本地跑),而且都是ESTABLISH状态,连接一直保持着,不断的在被复用。连接被复用,少了创建TCP连接的三次握手环节,这里也可以解释为啥API调用的平均延迟可以从77ms降到0.5ms,不过0.5ms确实有点低,线上环境Server一般不会和Client在同一台机器,而且业务逻辑也会比这里复杂,API调用的平均延迟会相对高一点。
4 总结
  1. 调用Thrift API必须使用Thrift客户端连接池,否则在高并发的情况下,会有大量的TCP连接处于TIME_WAIT状态,机器端口被大量消耗,可能会导致部分请求失败甚至服务不可用。每次请求都重新创建TCP连接,进行TCP三次握手环节,API调用的延迟会比较高,服务的吞吐量也不会很高。
  2. 使用Thrift客户端连接池,可以提高系统的吞吐量,同时可以避免机器端口被耗尽的危险,提高服务的可靠性。