推荐视频:

高并发技术之数据库连接池设计与实现:

c/c++ linux服务器开发学习地址:

1.项目目的

在高并发的情况,大量的TCP三次握手,MySQL server连接认证,MySQL server连接关闭回收资源,TCP四次挥手会耗费性能。本项目的目的是为了避免频繁的向数据库申请资源,释放资源带来性能损耗。

2.基本思路

为数据库的连接建立一个缓存池,预先在该缓存中放入一定数量的连接。当多个任务需要访问mysql时,不需要每个任务都去直接通过TCP连接mysql server,而是在该 缓存 池中取对应数量的连接即可。用完之后不需要释放该连接,只需要归还到连接池即可。

3关键点分析

1.利用mysql提供的api,自定义一个“连接”类。后面把该连接类放入容器中作为连接池。

2.基于上述分析,连接池的设计采用单例模式设计。

3.拟采用生产者-消费者线程模型,生产者负责产生连接,消费者负责使用连接。考虑并发情况,使用互斥锁和条件变量实现线程安全和同步,即:生产后再消费的同步。

4.实现连接池的容器考虑队列实现。在并发情况下,STL的queue不是线程安全的,可使用互斥锁实现线程安全。

5. 由于连接用完后是归还而不是释放,拟采用智能指针来管理连接,用lamda表达式来实现连接归还的功能。(因为 智能指针 出作用域自动析构,且申明指针智能时可以指定删除器,方便自定义归还功能)

6. 连接池中连接的数量为多少时性能最佳?

4 代码实现

4.1 “连接” 类的功能

分析可知,连接池中的“连接” 使用类实现。利用mysql提供的API可实现。

主要功能包括:

1.“连接”的构造和析构功能

2.连接数据库

3.对数据库的操作

4. 返回一个连接的空闲时间(用于释放多余产生的连接,后文会说明)

4.2“连接” 类的代码如下

注:( 头文件 是类的定义, 源文件 是类中成员方法的实现):

Connection.h

 #pragma once

#include <string>
#include <mysql.h>
#include < ctime >

using namespace std;

class Connection
{
public:
    // 初始化数据库连接
    Connection();

    // 释放数据库连接资源
    ~Connection();

    // 连接数据库
    bool connect(string ip,
        unsigned short port,
        string username,
        string password,
        string dbname);

    // 更新操作 insert、delete、update
    bool update(string sql);

    // 查询操作 select
    MYSQL_RES* query(string sql);

    // 刷新连接的起始空闲时刻
    // 记录每个队列的空闲时间,缓解服务器资源,在入队时
    void refreshAliveTime();

    // 返回连接空闲的时长
    clock_t getAliveTime();

private:
    MYSQL* _conn; // 表示和MySQL Server的一条连接
    clock_t _alivetime; // 记录进入空闲状态后的起始存活时刻(即在队列中出现的时刻)
};  

Connection.cpp

 #include "public.h"
# include  "Connection.h"

// 初始化数据库连接
Connection::Connection()
{
    _conn = mysql_init(nullptr);
}

// 释放数据库连接资源
Connection::~Connection()
{
    if (_conn != nullptr)
        mysql_close(_conn);
}

// 连接数据库
 bool  Connection::connect(string ip,
    unsigned short port,
    string username,
    string password,
    string dbname)
{
    MYSQL* p = mysql_real_connect(_conn, ip.c_str(), username.c_str(),
        password.c_str(), dbname.c_str(),
        port, nullptr, 0);

    //mysql_query(_conn, "set interactive_timeout=24*3600");

    return p != nullptr;
}

// 更新操作 insert、delete、update
bool Connection::update(string sql)
{
    bool a = mysql_query(_conn, sql.c_str());
    if (mysql_query(_conn, sql.c_str()))
    {
        LOG("更新失败:" + sql + "\nmysql_error:" + mysql_error(_conn));
        return false;
    }
    return true;
}

// 查询操作 select
MYSQL_RES* Connection::query(string sql)
{
    // 查询操作 select
    // 如果查询成功,返回0。如果出现错误,返回非0值

    if (mysql_query(_conn, sql.c_str()))
    {
        LOG("查询失败" + sql + "\nmysql_error:" + mysql_error(_conn));
        return nullptr;
    }
    return mysql_use_result(_conn);
}

// 刷新连接的起始空闲时刻
void Connection::refreshAliveTime()
{
    _alivetime = clock();
}

// 返回连接空闲的时长
clock_t Connection::getAliveTime()
{
    return clock() - _alivetime;
}  

4.3连接池的功能

连接池的主要参数:

1.初始连接数:连接池事先会准备一些连接备用。最小连接数需要根据实际情况不断测试决定,设置太多的话会出现很多空连接,浪费资源。

2.最大连接数:当并发请求太多了之后,初始量不够用了。这时候会根据需求创建更多的连接,但不能无限创建,因为考虑到资源浪费问题。

待实现的连接池的主要功能如下:

1.创建一个连接池对象。(因为是一个单例模式)

2.初始化连接数以及生产新连接(生产过程是连接池类内部多线程创建的,所以权限为private;另外需要定义一个连接数计算器,使用原子变atomic,就不需要用互斥锁来保护该计数器了)

3.从连接池中获取一个可用连接(消费过程是用户请求,权限为public;用完后归还到队列中)

4.回收连接(通过定义一个扫描函数,获取每个连接的空闲时间,用于多余连接的释放)

5.加载初始配置项,主要是数据库连接参数如用户名密码等(可选)。

4.4 连接池的代码如下:

CommonConnectionPool.h

 #pragma once

#include "Connection.h"
#include <string>
#include <queue>
#include <mutex>
#include <atomic>
#include <thread>
#include <memory>
#include <functional>
#include <condition_variable>

using namespace std;

// 实现连接池功能模块


class ConnectionPool
{
public:
    // 获取连接池对象实例(懒汉式单例模式,在获取实例时才实例化对象)
    static ConnectionPool* getConnectionPool();
    // 给外部提供接口,从连接池中获取一个可用的空闲连接
    //注意,这里不要直接返回指针,否则我们还需要定义一个(归还连接)的方法,还要自己去释放该指针。
    //这里直接返回一个智能指针,智能指针出作用域自动析构,(我们只需重定义析构即可--不释放而是归还) 
    shared_ptr<Connection> getConnection();
private:
    // 单例模式——构造函数私有化
    ConnectionPool();
    // 从配置文件中加载配置项
    bool loadConfigFile();

    // 运行在独立的线程中,专门负责生产新连接
    // 非静态成员方法,其调用依赖对象,要把其设计为一个线程函数,需要绑定this指针。 
    // 把该线程函数写为类的成员方法,最大的好处是 非常方便访问当前对象的成员变量。(数据)
    void produceConnectionTask();

    // 扫描超过maxIdleTime时间的空闲连接,进行对于连接的回收
    void scannerConnectionTask();

    string _ip;                    // MySQL的ip地址
    unsigned short _port;                  // MySQL的端口号,默认为3306
    string _username;              // MySQL登陆用户名
    string _password;              // MySQL登陆密码
    string _dbname;                // 连接的数据库名称
    int _initSize;              // 连接池的最大初始连接量
    int _maxSize;               // 连接池的最大连接量
    int _maxIdleTime;           // 连接池的最大空闲时间
    int _connectionTimeout;     // 连接池获取连接的超时时间

    // 存储MySQL连接的队列
    queue<Connection*> _connectionQue;
    // 维护连接队列的线程安全互斥锁
    mutex _queueMutex;

    // 记录connection连接的总数量
    atomic_int _connectionCnt;

    // 设置条件变量,用于连接生产线程和连接消费线程的通信
    condition_variable cv;
};  

【文章福利】需要C/C++ Linux服务器架构师学习资料加群812855908(资料包括C/C++,Linux,golang技术,内核,Nginx,ZeroMQ,MySQL, Redis ,fastdfs, MongoDB ,ZK, 流媒体 , CDN ,P2P,K8S, Docker ,TCP/IP,协程,DPDK, ffmpeg 等)

CommonConnectionPool.cpp

 #include "CommonConnectionPool.h"
#include "public.h"

// 线程安全的懒汉单例函数接口
ConnectionPool* ConnectionPool::getConnectionPool()
{
    // 对于静态局部变量的初始化,编译器自动进行lock和unlock
    static ConnectionPool pool;
    return &pool
}

// 从配置文件中加载配置项
bool ConnectionPool::loadConfigFile()
{
    FILE* pf = fopen("mysql.ini", "r");
    if (pf == nullptr)
    {
        LOG("File 'mysql.ini' is not existing!");
        return false;
    }

    // 逐行处理配置文件中的配置字符串
    while (!feof(pf))
    {
        // 配置字符串举例:username=root\n

        // 从文件中获取一行配置字符串
        char line[1024] = { 0 };
         fgets (line, 1024, pf);
        string str = line;

        // 找到配置字符串中的'='
        int idx = str.find('=', 0);

        // 无效的配置项
        if (idx == -1)
        {
            // 当配置字符串中找不到'='时说明该配置字符串有问题或者是注释,将其忽略
            continue;
        }

        // 分别取出该行配置中的key和value
        int endIdx = str.find('\n', idx);
        string key = str.substr(0, idx);
        string value = str.substr(idx + 1, endIdx - idx - 1);

        if (key == "ip")
        {
            _ip = value;
        }
        else if (key == "port")
        {
            _port = atoi(value.c_str());
        }
        else if (key == "username")
        {
            _username = value;
        }
        else if (key == "password")
        {
            _password = value;
        }
        else if (key == "dbname")
        {
            _dbname = value;
        }
        else if (key == "maxSize")
        {
            _maxSize = atoi(value.c_str());
        }
        else if (key == "maxIdleTime")
        {
            _maxIdleTime = atoi(value.c_str());
        }
        else if (key == "connectionTimeout")
        {
            _connectionTimeout = atoi(value.c_str());
        }
        else if (key == "initSize")
        {
            _initSize = atoi(value.c_str());
        }

    }

    return true;
}

// 连接池的构造函数
ConnectionPool::ConnectionPool()
{
    // 加载配置项
    if (!loadConfigFile())
    {
        return;
    }

    // 创建初始数量的连接
    for (int i = 0; i < _initSize; ++i)
    {
        Connection* p = new Connection();
        p->connect(_ip, _port, _username, _password, _dbname);
        p->refreshAliveTime(); // 记录连接的起始空闲时刻
        _connectionQue.push(p);
        _connectionCnt++;
    }

    // 启动一个新的线程,作为连接的生产者
    thread produce(std::bind(&ConnectionPool::produceConnectionTask, this));
    produce.detach();   //守护线程

    // 启动一个新的定时线程,扫描超过maxIdleTime时间的空闲连接,并对其进行回收
    thread scanner(std::bind(&ConnectionPool::produceConnectionTask, this));
    scanner.detach();
}

// 运行在独立的线程中,专门负责产生新连接
void ConnectionPool::produceConnectionTask()
{
    for (;;)
    {
        unique_lock<mutex> lock(_queueMutex); //条件变量需要和互斥锁一块使用
        while (!_connectionQue.empty())
        {
            // 队列非空时,此处生产线程进入等待状态
            cv.wait(lock); //进入等待时,释放锁,保证消费者线程正常运行
        }
        // 连接数量没有到达上限,继续创建新的连接
        if (_connectionCnt < _maxSize)
        {
            Connection* p = new Connection();
            p->connect(_ip, _port, _username, _password, _dbname);
            _connectionQue.push(p);
            _connectionCnt++;
        }
        // 通知消费者线程,可以消费连接了
        cv.notify_all();
    }
}

// 给外部提供接口,从连接池中获取一个可用的空闲连接
shared_ptr<Connection> ConnectionPool::getConnection()
{
    unique_lock<mutex> lock(_queueMutex);
    while (_connectionQue.empty())
    {
        if (cv_status::timeout == cv.wait_for(lock, std::chrono::milliseconds(_connectionTimeout))) //超时唤醒
        {
            if (_connectionQue.empty())
            {
                LOG("Failed to get connection:got idle connection timeout!");
                return nullptr;
            }
        }
    }

    /*
     * shared_ptr智能指针析构时,会把connection资源直接delete掉,
     * 相当于调用connection的析构函数,connection就被close掉了。
     * 这里需要自定义shared_ptr的释放资源的方式,把connection直接归还到 queue 当中*/    shared_ptr<Connection> sp(_connectionQue.front(),
        [&](Connection* pcon)
        {
            // 这里是在服务器应用线程中调用的,所以一定要考虑队列的线程安全操作
            unique_lock<mutex> lock(_queueMutex);
            pcon->refreshAliveTime(); //在归还回空闲连接队列之前要记录一下连接开始空闲的时刻
            _connectionQue.push(pcon);
        });

    _connectionQue.pop();

    // 消费者取出一个连接之后,通知生产者,生产者检查队列,如果为空则生产
    cv.notify_all();

    return sp;
}

// 扫描超过maxIdleTime时间的空闲连接,进行对于连接的回收
void ConnectionPool::scannerConnectionTask()
{
    for (;;)
    {
        // 通过sleep实现定时效果
        this_thread::sleep_for(chrono::seconds(_maxIdleTime));

        // 扫描整个队列,释放多余的连接
        unique_lock<mutex> lock(_queueMutex);
        while (_connectionCnt > _initSize)
        {
            Connection* p = _connectionQue.front();
            if (p->getAliveTime() >= (_maxIdleTime * 1000))
            {
                _connectionQue.pop();
                _connectionCnt--;
                delete p; // 调用~Connection()释放连接
            }
            else
            {
                // 队头的连接没有超过_maxIdleTime,其它连接肯定也没有
                break;
            }
        }
    }
}  

4.5 配置文件 mysql.ini

 ip=127.0.0.1
port=3306
username=root
password=111111
dbname=chat
initSize=10
maxSize=1024
maxIdleTime=60
connectionTimeout=100  
5.测试函数 main.cpp及结果
 #pragma once
#include<iostream>
#include <string>
#include <ctime>
#include "Connection.h"
#include "CommonConnectionPool.h"
int n = 10000;//数据量
int main()
{



//不使用连接池,单线程:
clock_t begin = clock();
for (int i = 0; i < n; i++)
{
Connection conn;
char sql[1024] = { 0 };
sprintf(sql, "insert into t1(id,name) values(%d,'%s')",
1, "a");
conn.connect("127.0.0.1", 3306, "root", "zh601572", "chat");
conn.update(sql); 
}
clock_t end = clock();
cout << end - begin << "ms" << endl;
return 0;


//不使用连接池,4线程:
//Connection conn;
//conn.connect("localhost", 3306, "root", "zh601572", "chat");
//clock_t begin = clock();
//thread t1([]()
//{
//for (int i = 0; i < n/4; ++i)
//{
//Connection conn;
//char sql[1024] = { 0 };
//sprintf(sql, "insert into t1(id,name) values(%d,'%s')",
//5, "a");
//conn.connect("localhost", 3306, "root", "zh601572", "chat");
//conn.update(sql);
//}
//});
//thread t2([]()
//{
//for (int i = 0; i < n / 4; ++i)
//{
//Connection conn;
//char sql[1024] = { 0 };
//sprintf(sql, "insert into t1(id,name) values(%d,'%s')",
//6, "a");
//conn.connect("localhost", 3306, "root", "zh601572", "chat");
//conn.update(sql);
//}
//});
//thread t3([]()
//{
//for (int i = 0; i < n / 4; ++i)
//{
//Connection conn;
//char sql[1024] = { 0 };
//sprintf(sql, "insert into t1(id,name) values(%d,'%s')",
//7, "a");
//conn.connect("localhost", 3306, "root", "zh601572", "chat");
//conn.update(sql);
//}
//});
//thread t4([]()
//{
//for (int i = 0; i < n / 4; ++i)
//{
//Connection conn;
//char sql[1024] = { 0 };
//sprintf(sql, "insert into t1(id,name) values(%d,'%s')",
//8, "a");
//conn.connect("localhost", 3306, "root", "zh601572", "chat");
//conn.update(sql);
//}
//});
//t1.join();
//t2.join();
//t3.join();
//t4.join();
//clock_t end = clock();
//cout << end - begin << "ms" << endl;
//return 0;



//使用连接池,单线程:
//clock_t begin = clock();
//ConnectionPool* cp = ConnectionPool::getConnectionPool();
//for (int i = 0; i < n; i++)
//{
//shared_ptr<Connection> sp = cp->getConnection();
//char sql[1024] = { 0 };
//sprintf(sql, "insert into t1(id,name) values(%d,'%s')",
//4, "zhouhui");
//sp ->update(sql);
//}
//clock_t end = clock();
//cout << end - begin << "ms" << endl;
//return 0;


//使用连接池,四线程:
//clock_t begin = clock();
//
//thread t1([]()
//{
//ConnectionPool* cp = ConnectionPool::getConnectionPool();
//for (int i = 0; i < n / 4; i++)
//{
//shared_ptr<Connection> sp = cp->getConnection();
//char sql[1024] = { 0 };
//sprintf(sql, "insert into t1(id,name) values(%d,'%s')",4, "zhouhui");
//sp ->update(sql);
//}
//}
//);

//thread t2([]()
//{
//ConnectionPool* cp = ConnectionPool::getConnectionPool();
//for (int i = 0; i < n /4; i++)
//{
//shared_ptr<Connection> sp = cp->getConnection();
//char sql[1024] = { 0 };
//sprintf(sql, "insert into t1(id,name) values(%d,'%s')", 4, "zhouhui");
//sp->update(sql);
//}
//}
//);

//thread t3([]()
//{
//ConnectionPool* cp = ConnectionPool::getConnectionPool();
//for (int i = 0; i < n / 4; i++)
//{
//shared_ptr<Connection> sp = cp->getConnection();
//char sql[1024] = { 0 };
//sprintf(sql, "insert into t1(id,name) values(%d,'%s')", 4, "zhouhui");
//sp->update(sql);
//}
//}
//);
//thread t4([]()
//{
//ConnectionPool* cp = ConnectionPool::getConnectionPool();
//for (int i = 0; i < n / 4; i++)
//{
//shared_ptr<Connection> sp = cp->getConnection();
//char sql[1024] = { 0 };
//sprintf(sql, "insert into t1(id,name) values(%d,'%s')", 4, "zhouhui");
//sp->update(sql);
//}
//}
//);

//t1.join();
//t2.join();
//t3.join();
//t4.join();

//clock_t end = clock();
//cout << (end - begin) << "ms" << endl;

//return 0;
}
  

结果如下

可以看到还是节约很多时间资源的。

6 思考

数据库连接池设置多少连接才合适?