高并发技术之数据库连接池设计与实现:
c/c++ linux服务器开发学习地址:
1.项目目的在高并发的情况,大量的TCP三次握手,MySQL server连接认证,MySQL server连接关闭回收资源,TCP四次挥手会耗费性能。本项目的目的是为了避免频繁的向数据库申请资源,释放资源带来性能损耗。
2.基本思路为数据库的连接建立一个缓存池,预先在该缓存中放入一定数量的连接。当多个任务需要访问mysql时,不需要每个任务都去直接通过TCP连接mysql server,而是在该 缓存 池中取对应数量的连接即可。用完之后不需要释放该连接,只需要归还到连接池即可。
3关键点分析1.利用mysql提供的api,自定义一个“连接”类。后面把该连接类放入容器中作为连接池。
2.基于上述分析,连接池的设计采用单例模式设计。
3.拟采用生产者-消费者线程模型,生产者负责产生连接,消费者负责使用连接。考虑并发情况,使用互斥锁和条件变量实现线程安全和同步,即:生产后再消费的同步。
4.实现连接池的容器考虑队列实现。在并发情况下,STL的queue不是线程安全的,可使用互斥锁实现线程安全。
5. 由于连接用完后是归还而不是释放,拟采用智能指针来管理连接,用lamda表达式来实现连接归还的功能。(因为 智能指针 出作用域自动析构,且申明指针智能时可以指定删除器,方便自定义归还功能)
6. 连接池中连接的数量为多少时性能最佳?
4 代码实现4.1 “连接” 类的功能
分析可知,连接池中的“连接” 使用类实现。利用mysql提供的API可实现。
主要功能包括:
1.“连接”的构造和析构功能
2.连接数据库
3.对数据库的操作
4. 返回一个连接的空闲时间(用于释放多余产生的连接,后文会说明)
4.2“连接” 类的代码如下
注:( 头文件 是类的定义, 源文件 是类中成员方法的实现):
Connection.h
#pragma once
#include <string>
#include <mysql.h>
#include < ctime >
using namespace std;
class Connection
{
public:
// 初始化数据库连接
Connection();
// 释放数据库连接资源
~Connection();
// 连接数据库
bool connect(string ip,
unsigned short port,
string username,
string password,
string dbname);
// 更新操作 insert、delete、update
bool update(string sql);
// 查询操作 select
MYSQL_RES* query(string sql);
// 刷新连接的起始空闲时刻
// 记录每个队列的空闲时间,缓解服务器资源,在入队时
void refreshAliveTime();
// 返回连接空闲的时长
clock_t getAliveTime();
private:
MYSQL* _conn; // 表示和MySQL Server的一条连接
clock_t _alivetime; // 记录进入空闲状态后的起始存活时刻(即在队列中出现的时刻)
};
Connection.cpp
#include "public.h"
# include "Connection.h"
// 初始化数据库连接
Connection::Connection()
{
_conn = mysql_init(nullptr);
}
// 释放数据库连接资源
Connection::~Connection()
{
if (_conn != nullptr)
mysql_close(_conn);
}
// 连接数据库
bool Connection::connect(string ip,
unsigned short port,
string username,
string password,
string dbname)
{
MYSQL* p = mysql_real_connect(_conn, ip.c_str(), username.c_str(),
password.c_str(), dbname.c_str(),
port, nullptr, 0);
//mysql_query(_conn, "set interactive_timeout=24*3600");
return p != nullptr;
}
// 更新操作 insert、delete、update
bool Connection::update(string sql)
{
bool a = mysql_query(_conn, sql.c_str());
if (mysql_query(_conn, sql.c_str()))
{
LOG("更新失败:" + sql + "\nmysql_error:" + mysql_error(_conn));
return false;
}
return true;
}
// 查询操作 select
MYSQL_RES* Connection::query(string sql)
{
// 查询操作 select
// 如果查询成功,返回0。如果出现错误,返回非0值
if (mysql_query(_conn, sql.c_str()))
{
LOG("查询失败" + sql + "\nmysql_error:" + mysql_error(_conn));
return nullptr;
}
return mysql_use_result(_conn);
}
// 刷新连接的起始空闲时刻
void Connection::refreshAliveTime()
{
_alivetime = clock();
}
// 返回连接空闲的时长
clock_t Connection::getAliveTime()
{
return clock() - _alivetime;
}
4.3连接池的功能
连接池的主要参数:
1.初始连接数:连接池事先会准备一些连接备用。最小连接数需要根据实际情况不断测试决定,设置太多的话会出现很多空连接,浪费资源。
2.最大连接数:当并发请求太多了之后,初始量不够用了。这时候会根据需求创建更多的连接,但不能无限创建,因为考虑到资源浪费问题。
待实现的连接池的主要功能如下:
1.创建一个连接池对象。(因为是一个单例模式)
2.初始化连接数以及生产新连接(生产过程是连接池类内部多线程创建的,所以权限为private;另外需要定义一个连接数计算器,使用原子变atomic,就不需要用互斥锁来保护该计数器了)
3.从连接池中获取一个可用连接(消费过程是用户请求,权限为public;用完后归还到队列中)
4.回收连接(通过定义一个扫描函数,获取每个连接的空闲时间,用于多余连接的释放)
5.加载初始配置项,主要是数据库连接参数如用户名密码等(可选)。
4.4 连接池的代码如下:
CommonConnectionPool.h
#pragma once
#include "Connection.h"
#include <string>
#include <queue>
#include <mutex>
#include <atomic>
#include <thread>
#include <memory>
#include <functional>
#include <condition_variable>
using namespace std;
// 实现连接池功能模块
class ConnectionPool
{
public:
// 获取连接池对象实例(懒汉式单例模式,在获取实例时才实例化对象)
static ConnectionPool* getConnectionPool();
// 给外部提供接口,从连接池中获取一个可用的空闲连接
//注意,这里不要直接返回指针,否则我们还需要定义一个(归还连接)的方法,还要自己去释放该指针。
//这里直接返回一个智能指针,智能指针出作用域自动析构,(我们只需重定义析构即可--不释放而是归还)
shared_ptr<Connection> getConnection();
private:
// 单例模式——构造函数私有化
ConnectionPool();
// 从配置文件中加载配置项
bool loadConfigFile();
// 运行在独立的线程中,专门负责生产新连接
// 非静态成员方法,其调用依赖对象,要把其设计为一个线程函数,需要绑定this指针。
// 把该线程函数写为类的成员方法,最大的好处是 非常方便访问当前对象的成员变量。(数据)
void produceConnectionTask();
// 扫描超过maxIdleTime时间的空闲连接,进行对于连接的回收
void scannerConnectionTask();
string _ip; // MySQL的ip地址
unsigned short _port; // MySQL的端口号,默认为3306
string _username; // MySQL登陆用户名
string _password; // MySQL登陆密码
string _dbname; // 连接的数据库名称
int _initSize; // 连接池的最大初始连接量
int _maxSize; // 连接池的最大连接量
int _maxIdleTime; // 连接池的最大空闲时间
int _connectionTimeout; // 连接池获取连接的超时时间
// 存储MySQL连接的队列
queue<Connection*> _connectionQue;
// 维护连接队列的线程安全互斥锁
mutex _queueMutex;
// 记录connection连接的总数量
atomic_int _connectionCnt;
// 设置条件变量,用于连接生产线程和连接消费线程的通信
condition_variable cv;
};
【文章福利】需要C/C++ Linux服务器架构师学习资料加群812855908(资料包括C/C++,Linux,golang技术,内核,Nginx,ZeroMQ,MySQL, Redis ,fastdfs, MongoDB ,ZK, 流媒体 , CDN ,P2P,K8S, Docker ,TCP/IP,协程,DPDK, ffmpeg 等)
CommonConnectionPool.cpp
#include "CommonConnectionPool.h"
#include "public.h"
// 线程安全的懒汉单例函数接口
ConnectionPool* ConnectionPool::getConnectionPool()
{
// 对于静态局部变量的初始化,编译器自动进行lock和unlock
static ConnectionPool pool;
return &pool
}
// 从配置文件中加载配置项
bool ConnectionPool::loadConfigFile()
{
FILE* pf = fopen("mysql.ini", "r");
if (pf == nullptr)
{
LOG("File 'mysql.ini' is not existing!");
return false;
}
// 逐行处理配置文件中的配置字符串
while (!feof(pf))
{
// 配置字符串举例:username=root\n
// 从文件中获取一行配置字符串
char line[1024] = { 0 };
fgets (line, 1024, pf);
string str = line;
// 找到配置字符串中的'='
int idx = str.find('=', 0);
// 无效的配置项
if (idx == -1)
{
// 当配置字符串中找不到'='时说明该配置字符串有问题或者是注释,将其忽略
continue;
}
// 分别取出该行配置中的key和value
int endIdx = str.find('\n', idx);
string key = str.substr(0, idx);
string value = str.substr(idx + 1, endIdx - idx - 1);
if (key == "ip")
{
_ip = value;
}
else if (key == "port")
{
_port = atoi(value.c_str());
}
else if (key == "username")
{
_username = value;
}
else if (key == "password")
{
_password = value;
}
else if (key == "dbname")
{
_dbname = value;
}
else if (key == "maxSize")
{
_maxSize = atoi(value.c_str());
}
else if (key == "maxIdleTime")
{
_maxIdleTime = atoi(value.c_str());
}
else if (key == "connectionTimeout")
{
_connectionTimeout = atoi(value.c_str());
}
else if (key == "initSize")
{
_initSize = atoi(value.c_str());
}
}
return true;
}
// 连接池的构造函数
ConnectionPool::ConnectionPool()
{
// 加载配置项
if (!loadConfigFile())
{
return;
}
// 创建初始数量的连接
for (int i = 0; i < _initSize; ++i)
{
Connection* p = new Connection();
p->connect(_ip, _port, _username, _password, _dbname);
p->refreshAliveTime(); // 记录连接的起始空闲时刻
_connectionQue.push(p);
_connectionCnt++;
}
// 启动一个新的线程,作为连接的生产者
thread produce(std::bind(&ConnectionPool::produceConnectionTask, this));
produce.detach(); //守护线程
// 启动一个新的定时线程,扫描超过maxIdleTime时间的空闲连接,并对其进行回收
thread scanner(std::bind(&ConnectionPool::produceConnectionTask, this));
scanner.detach();
}
// 运行在独立的线程中,专门负责产生新连接
void ConnectionPool::produceConnectionTask()
{
for (;;)
{
unique_lock<mutex> lock(_queueMutex); //条件变量需要和互斥锁一块使用
while (!_connectionQue.empty())
{
// 队列非空时,此处生产线程进入等待状态
cv.wait(lock); //进入等待时,释放锁,保证消费者线程正常运行
}
// 连接数量没有到达上限,继续创建新的连接
if (_connectionCnt < _maxSize)
{
Connection* p = new Connection();
p->connect(_ip, _port, _username, _password, _dbname);
_connectionQue.push(p);
_connectionCnt++;
}
// 通知消费者线程,可以消费连接了
cv.notify_all();
}
}
// 给外部提供接口,从连接池中获取一个可用的空闲连接
shared_ptr<Connection> ConnectionPool::getConnection()
{
unique_lock<mutex> lock(_queueMutex);
while (_connectionQue.empty())
{
if (cv_status::timeout == cv.wait_for(lock, std::chrono::milliseconds(_connectionTimeout))) //超时唤醒
{
if (_connectionQue.empty())
{
LOG("Failed to get connection:got idle connection timeout!");
return nullptr;
}
}
}
/*
* shared_ptr智能指针析构时,会把connection资源直接delete掉,
* 相当于调用connection的析构函数,connection就被close掉了。
* 这里需要自定义shared_ptr的释放资源的方式,把connection直接归还到 queue 当中*/ shared_ptr<Connection> sp(_connectionQue.front(),
[&](Connection* pcon)
{
// 这里是在服务器应用线程中调用的,所以一定要考虑队列的线程安全操作
unique_lock<mutex> lock(_queueMutex);
pcon->refreshAliveTime(); //在归还回空闲连接队列之前要记录一下连接开始空闲的时刻
_connectionQue.push(pcon);
});
_connectionQue.pop();
// 消费者取出一个连接之后,通知生产者,生产者检查队列,如果为空则生产
cv.notify_all();
return sp;
}
// 扫描超过maxIdleTime时间的空闲连接,进行对于连接的回收
void ConnectionPool::scannerConnectionTask()
{
for (;;)
{
// 通过sleep实现定时效果
this_thread::sleep_for(chrono::seconds(_maxIdleTime));
// 扫描整个队列,释放多余的连接
unique_lock<mutex> lock(_queueMutex);
while (_connectionCnt > _initSize)
{
Connection* p = _connectionQue.front();
if (p->getAliveTime() >= (_maxIdleTime * 1000))
{
_connectionQue.pop();
_connectionCnt--;
delete p; // 调用~Connection()释放连接
}
else
{
// 队头的连接没有超过_maxIdleTime,其它连接肯定也没有
break;
}
}
}
}
4.5 配置文件 mysql.ini
ip=127.0.0.1
port=3306
username=root
password=111111
dbname=chat
initSize=10
maxSize=1024
maxIdleTime=60
connectionTimeout=100
5.测试函数 main.cpp及结果 #pragma once
#include<iostream>
#include <string>
#include <ctime>
#include "Connection.h"
#include "CommonConnectionPool.h"
int n = 10000;//数据量
int main()
{
//不使用连接池,单线程:
clock_t begin = clock();
for (int i = 0; i < n; i++)
{
Connection conn;
char sql[1024] = { 0 };
sprintf(sql, "insert into t1(id,name) values(%d,'%s')",
1, "a");
conn.connect("127.0.0.1", 3306, "root", "zh601572", "chat");
conn.update(sql);
}
clock_t end = clock();
cout << end - begin << "ms" << endl;
return 0;
//不使用连接池,4线程:
//Connection conn;
//conn.connect("localhost", 3306, "root", "zh601572", "chat");
//clock_t begin = clock();
//thread t1([]()
//{
//for (int i = 0; i < n/4; ++i)
//{
//Connection conn;
//char sql[1024] = { 0 };
//sprintf(sql, "insert into t1(id,name) values(%d,'%s')",
//5, "a");
//conn.connect("localhost", 3306, "root", "zh601572", "chat");
//conn.update(sql);
//}
//});
//thread t2([]()
//{
//for (int i = 0; i < n / 4; ++i)
//{
//Connection conn;
//char sql[1024] = { 0 };
//sprintf(sql, "insert into t1(id,name) values(%d,'%s')",
//6, "a");
//conn.connect("localhost", 3306, "root", "zh601572", "chat");
//conn.update(sql);
//}
//});
//thread t3([]()
//{
//for (int i = 0; i < n / 4; ++i)
//{
//Connection conn;
//char sql[1024] = { 0 };
//sprintf(sql, "insert into t1(id,name) values(%d,'%s')",
//7, "a");
//conn.connect("localhost", 3306, "root", "zh601572", "chat");
//conn.update(sql);
//}
//});
//thread t4([]()
//{
//for (int i = 0; i < n / 4; ++i)
//{
//Connection conn;
//char sql[1024] = { 0 };
//sprintf(sql, "insert into t1(id,name) values(%d,'%s')",
//8, "a");
//conn.connect("localhost", 3306, "root", "zh601572", "chat");
//conn.update(sql);
//}
//});
//t1.join();
//t2.join();
//t3.join();
//t4.join();
//clock_t end = clock();
//cout << end - begin << "ms" << endl;
//return 0;
//使用连接池,单线程:
//clock_t begin = clock();
//ConnectionPool* cp = ConnectionPool::getConnectionPool();
//for (int i = 0; i < n; i++)
//{
//shared_ptr<Connection> sp = cp->getConnection();
//char sql[1024] = { 0 };
//sprintf(sql, "insert into t1(id,name) values(%d,'%s')",
//4, "zhouhui");
//sp ->update(sql);
//}
//clock_t end = clock();
//cout << end - begin << "ms" << endl;
//return 0;
//使用连接池,四线程:
//clock_t begin = clock();
//
//thread t1([]()
//{
//ConnectionPool* cp = ConnectionPool::getConnectionPool();
//for (int i = 0; i < n / 4; i++)
//{
//shared_ptr<Connection> sp = cp->getConnection();
//char sql[1024] = { 0 };
//sprintf(sql, "insert into t1(id,name) values(%d,'%s')",4, "zhouhui");
//sp ->update(sql);
//}
//}
//);
//thread t2([]()
//{
//ConnectionPool* cp = ConnectionPool::getConnectionPool();
//for (int i = 0; i < n /4; i++)
//{
//shared_ptr<Connection> sp = cp->getConnection();
//char sql[1024] = { 0 };
//sprintf(sql, "insert into t1(id,name) values(%d,'%s')", 4, "zhouhui");
//sp->update(sql);
//}
//}
//);
//thread t3([]()
//{
//ConnectionPool* cp = ConnectionPool::getConnectionPool();
//for (int i = 0; i < n / 4; i++)
//{
//shared_ptr<Connection> sp = cp->getConnection();
//char sql[1024] = { 0 };
//sprintf(sql, "insert into t1(id,name) values(%d,'%s')", 4, "zhouhui");
//sp->update(sql);
//}
//}
//);
//thread t4([]()
//{
//ConnectionPool* cp = ConnectionPool::getConnectionPool();
//for (int i = 0; i < n / 4; i++)
//{
//shared_ptr<Connection> sp = cp->getConnection();
//char sql[1024] = { 0 };
//sprintf(sql, "insert into t1(id,name) values(%d,'%s')", 4, "zhouhui");
//sp->update(sql);
//}
//}
//);
//t1.join();
//t2.join();
//t3.join();
//t4.join();
//clock_t end = clock();
//cout << (end - begin) << "ms" << endl;
//return 0;
}
结果如下
可以看到还是节约很多时间资源的。
6 思考数据库连接池设置多少连接才合适?