C++ (Qt)#pragma once#include <QSqlDatabase>#include <QThread>#include <QMap> class ConnectionPool : public QObject{ Q_OBJECTpublic: explicit ConnectionPool(const QString driverName, const QString& dbName, const QString& dbHost, const quint16& dbPort, const QString& userName, const QString& userPassword, QObject* parent = 0); ~ConnectionPool(); QSqlDatabase getConnection(); QSqlDatabase getConnectionForThread(QThread* thread); private: QString generateConnectionName(QThread* thread) const; void onThreadDestroyed(); private: const QString _driverName; const QString _dbName; const QString _dbHost; const quint16 _dbPort; const QString _userName; const QString _userPassword; QMap<QThread*, QString> _threadConnectionNameMap; QMutex _mutex;};
C++ (Qt)#include "ConnectionPool.h"#include <QDataStream>#include <QCryptographicHash> ConnectionPool::ConnectionPool(const QString driverName, const QString& dbName, const QString& dbHost, const quint16& dbPort, const QString& userName, const QString& userPassword, QObject* parent) : QObject(parent), _driverName(driverName), _dbName(dbName), _dbHost(dbHost), _dbPort(dbPort), _userName(userName), _userPassword(userPassword){} ConnectionPool::~ConnectionPool(){ foreach (QThread* thread, _threadConnectionNameMap.keys()) { QSqlDatabase::removeDatabase(_threadConnectionNameMap.value(thread)); }} QSqlDatabase ConnectionPool::getConnection(){ return getConnectionForThread(QThread::currentThread());} QSqlDatabase ConnectionPool::getConnectionForThread(QThread* thread){ QSqlDatabase db; if(_threadConnectionNameMap.contains(thread)) { db = QSqlDatabase::database(_threadConnectionNameMap.value(thread)); } else { QMutexLocker locker(&_mutex); const QString connectionName = generateConnectionName(thread); db = QSqlDatabase::addDatabase(_driverName, connectionName); _threadConnectionNameMap.insert(thread, connectionName); connect(thread, &QThread::destroyed, this, &ConnectionPool::onThreadDestroyed); db.setDatabaseName(_dbName); db.setUserName(_userName); db.setPassword(_userPassword); db.setHostName(_dbHost); db.setPort(_dbPort); } if(!db.isOpen()) db.open(); return db;} QString ConnectionPool::generateConnectionName(QThread* thread) const{ QByteArray data; QDataStream stream(&data, QIODevice::WriteOnly); stream.setVersion(QDataStream::Qt_5_0); stream << (long)thread; stream << _driverName; stream << _dbName; stream << _dbHost; stream << _dbPort; stream << _userName; stream << _userPassword; QCryptographicHash hash(QCryptographicHash::Md5); hash.addData(data); return QString(hash.result().toHex());} void ConnectionPool::onThreadDestroyed(){ QThread* thread = static_cast<QThread*>(sender()); if(_threadConnectionNameMap.contains(thread)) { QString connectionName = _threadConnectionNameMap.take(thread); QSqlDatabase::removeDatabase(connectionName); }}
C++ (Qt)QSqlDatabase ConnectionPool::getConnectionForThread(QThread* thread){ QReadLocker rLocker(&_rwLock); if(_threadConnectionNameMap.contains(thread)) { QSqlDatabase db = QSqlDatabase::database(_threadConnectionNameMap.value(thread)); db.open(); return db; } rLocker.unlock(); QWriteLocker wLocker(&_rwLock); const QString connectionName = generateConnectionName(thread); QSqlDatabase db = QSqlDatabase::addDatabase(_driverName, connectionName); _threadConnectionNameMap.insert(thread, connectionName); connect(thread, &QThread::destroyed, this, &ConnectionPool::onThreadDestroyed); db.setDatabaseName(_dbName); db.setUserName(_userName); db.setPassword(_userPassword); db.setHostName(_dbHost); db.setPort(_dbPort); if(!db.isOpen()) db.open(); return db;}
C++ (Qt)QSqlDatabase ConnectionPool::getConnectionForThread(QThread* thread){ QMutexLocker locker(&_mutex); if(_threadConnectionNameMap.contains(thread)) { QSqlDatabase db = QSqlDatabase::database(_threadConnectionNameMap.value(thread)); db.open(); return db; } const QString connectionName = generateConnectionName(thread); QSqlDatabase db = QSqlDatabase::addDatabase(_driverName, connectionName); _threadConnectionNameMap.insert(thread, connectionName); connect(thread, &QThread::destroyed, this, &ConnectionPool::onThreadDestroyed); db.setDatabaseName(_dbName); db.setUserName(_userName); db.setPassword(_userPassword); db.setHostName(_dbHost); db.setPort(_dbPort); if(!db.isOpen()) db.open(); return db;}
class DBConnectionPool : public QObject{ Q_OBJECTpublic: explicit DBConnectionPool(QObject *parent = 0); DBConnectionPool(const PoolConfig& config, QObject* parent = 0); void start(); void stop(); QSqlQuery executeQuery(const QString &query);signals: void stateChanged( const QString& msg );private: PoolConfig config; QList<DatabaseWorker*> databaseConnections; QSqlQuery execute(const QString& statement); QFuture<QSqlQuery> future; int workerCounter = 0; QMutex mutex;};
DBConnectionPool::DBConnectionPool(const PoolConfig &config, QObject *parent) : QObject(parent){ this->config = config;}void DBConnectionPool::start(){ for(int i = 0; i < this->config.connectionsCount; i++) { DatabaseWorker* worker = new DatabaseWorker; worker->config.hostName = this->config.hostName; worker->config.databaseName = this->config.databaseName; worker->config.userName = this->config.userName; worker->config.password = this->config.password; worker->config.databaseReferenceName = "dbWorker@" + QString::number(i); worker->init(); worker->createConnection(); databaseConnections << worker; }}QSqlQuery DBConnectionPool::execute(const QString& statement){ QMutexLocker locker(&mutex); QSqlQuery query = this->databaseConnections[this->workerCounter]->executeSync(statement); if (this->workerCounter < this->config.connectionsCount - 1) { this->workerCounter++; } else { this->workerCounter = 0; } return query;}QSqlQuery DBConnectionPool::executeQuery(const QString& statement){ future = QtConcurrent::run(this, &DBConnectionPool::execute, statement); future.waitForFinished(); return(future.result());}
class DatabaseWorker : public QObject{ Q_OBJECTprivate: QSqlDatabase db; QSqlQuery executeRequest(const QString& request); QFuture<QSqlQuery> future;public: explicit DatabaseWorker(QObject *parent = 0); void init(); bool createConnection(); QSqlQuery executeSync(const QString& request); void executeAsync(); bool isBusy(); PoolConfig config; QStringList requestList;signals:public slots:};
void DatabaseWorker::init(){ db = QSqlDatabase::addDatabase("QPSQL", this->config.databaseReferenceName);}bool DatabaseWorker::createConnection(){ db.setHostName(this->config.hostName); db.setDatabaseName(this->config.databaseName); db.setUserName(this->config.userName); db.setPassword(this->config.password); db.open(); if (!db.isOpen()) { qDebug() << "Ошибка при подключении к СУБД: " << db.lastError() << endl; return false; } return true;}QSqlQuery DatabaseWorker::executeRequest(const QString& request){ QSqlQuery query(db); bool result = query.exec(request); if (!result) { qDebug() << "Ошибка при выполнении запроса: " <<db.lastError() << endl; } return query;}QSqlQuery DatabaseWorker::executeSync(const QString& request){ future = QtConcurrent::run(this, &DatabaseWorker::executeRequest, request); future.waitForFinished(); return(future.result());}