Russian Qt Forum
Ноябрь 23, 2024, 01:08 *
Добро пожаловать, Гость. Пожалуйста, войдите или зарегистрируйтесь.
Вам не пришло письмо с кодом активации?

Войти
 
  Начало   Форум  WIKI (Вики)FAQ Помощь Поиск Войти Регистрация  

Страниц: [1] 2 3   Вниз
  Печать  
Автор Тема: [РЕШЕНО] connection pool плюсы и минусы  (Прочитано 28094 раз)
unkeep
Гость
« : Март 29, 2016, 17:41 »

Какие минусы использования пула соединений при многопоточной работе с бд(соединение создаётся для каждого потока)?

В общем случае, на сколько я понимаю, жертвуя памятью получаем прирост скорости за счёт повторного использования открытого соединения. Но может быть есть ещё нюансы, например зависимость скорости соединения от их количества?
« Последнее редактирование: Март 30, 2016, 12:17 от unkeep » Записан
Old
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 4350



Просмотр профиля
« Ответ #1 : Март 29, 2016, 17:48 »

Какие минусы использования пула соединений при многопоточной работе с бд(соединение создаётся для каждого потока)?
Минусов не знаю, сам так делаю. Улыбающийся
Записан
unkeep
Гость
« Ответ #2 : Март 29, 2016, 17:54 »

Есть ли смысл закрывать соединение если им долгое время не пользуются?
Записан
Old
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 4350



Просмотр профиля
« Ответ #3 : Март 29, 2016, 18:00 »

Есть ли смысл закрывать соединение если им долгое время не пользуются?
Нет. Зачем? А если сразу после закрытия понадобиться выполнить запрос? Улыбающийся
Как правило, рабочих потоков работающих с БД много не запускают, ну будет открыто 8 соединений...
Записан
unkeep
Гость
« Ответ #4 : Март 30, 2016, 10:11 »

можно код ревью?

ConnectionPool.h
Код
C++ (Qt)
#pragma once
#include <QSqlDatabase>
#include <QThread>
#include <QMap>
 
class ConnectionPool : public QObject
{
   Q_OBJECT
public:
   explicit ConnectionPool(const QString driverName,
                           const QString& dbName,
                           const QString& dbHost,
                           const quint16& dbPort,
                           const QString& userName,
                           const QString& userPassword,
                           QObject* parent = 0);
   ~ConnectionPool();
 
   QSqlDatabase getConnection();
   QSqlDatabase getConnectionForThread(QThread* thread);
 
private:
   QString generateConnectionName(QThread* thread) const;
   void onThreadDestroyed();
 
private:
   const QString _driverName;
   const QString _dbName;
   const QString _dbHost;
   const quint16 _dbPort;
   const QString _userName;
   const QString _userPassword;
   QMap<QThread*, QString> _threadConnectionNameMap;
   QMutex _mutex;
};
 

ConnectionPool.cpp
Код
C++ (Qt)
#include "ConnectionPool.h"
#include <QDataStream>
#include <QCryptographicHash>
 
 
ConnectionPool::ConnectionPool(const QString driverName,
                              const QString& dbName,
                              const QString& dbHost,
                              const quint16& dbPort,
                              const QString& userName,
                              const QString& userPassword,
                              QObject* parent)
   : QObject(parent),
     _driverName(driverName),
     _dbName(dbName),
     _dbHost(dbHost),
     _dbPort(dbPort),
     _userName(userName),
     _userPassword(userPassword)
{
}
 
ConnectionPool::~ConnectionPool()
{
   foreach (QThread* thread, _threadConnectionNameMap.keys())
   {
       QSqlDatabase::removeDatabase(_threadConnectionNameMap.value(thread));
   }
}
 
QSqlDatabase ConnectionPool::getConnection()
{
   return getConnectionForThread(QThread::currentThread());
}
 
QSqlDatabase ConnectionPool::getConnectionForThread(QThread* thread)
{
   QSqlDatabase db;
   if(_threadConnectionNameMap.contains(thread))
   {
       db = QSqlDatabase::database(_threadConnectionNameMap.value(thread));
   }
   else
   {
       QMutexLocker locker(&_mutex);
       const QString connectionName = generateConnectionName(thread);
       db = QSqlDatabase::addDatabase(_driverName, connectionName);
       _threadConnectionNameMap.insert(thread, connectionName);
       connect(thread, &QThread::destroyed, this, &ConnectionPool::onThreadDestroyed);
 
       db.setDatabaseName(_dbName);
       db.setUserName(_userName);
       db.setPassword(_userPassword);
       db.setHostName(_dbHost);
       db.setPort(_dbPort);
   }
 
   if(!db.isOpen()) db.open();
   return db;
}
 
QString ConnectionPool::generateConnectionName(QThread* thread) const
{
   QByteArray data;
   QDataStream stream(&data, QIODevice::WriteOnly);
   stream.setVersion(QDataStream::Qt_5_0);
 
   stream << (long)thread;
   stream << _driverName;
   stream << _dbName;
   stream << _dbHost;
   stream << _dbPort;
   stream << _userName;
   stream << _userPassword;
 
   QCryptographicHash hash(QCryptographicHash::Md5);
   hash.addData(data);
 
   return QString(hash.result().toHex());
}
 
void ConnectionPool::onThreadDestroyed()
{
   QThread* thread = static_cast<QThread*>(sender());
   if(_threadConnectionNameMap.contains(thread))
   {
       QString connectionName = _threadConnectionNameMap.take(thread);
       QSqlDatabase::removeDatabase(connectionName);
   }
}
 
 
Записан
Old
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 4350



Просмотр профиля
« Ответ #5 : Март 30, 2016, 10:28 »

Бегло просмотрел, что сразу бросилось в глаза.
В методе getConnectionForThread вы защищаете мьютексом только добавление нового соединения, хотя нужно и поиск с извлечением защищать. Для этого может пригодиться QReadWriteLock.

Теперь вообще по концепту. Я сразу создаю пул рабочих потоков и каждому потоку сразу создаю соединение. Потоки и соединения живут весь период жизни пула.
Записан
unkeep
Гость
« Ответ #6 : Март 30, 2016, 11:19 »

с учётом замечаний по синхронизации
Код
C++ (Qt)
QSqlDatabase ConnectionPool::getConnectionForThread(QThread* thread)
{
   QReadLocker rLocker(&_rwLock);
   if(_threadConnectionNameMap.contains(thread))
   {
       QSqlDatabase db = QSqlDatabase::database(_threadConnectionNameMap.value(thread));
       db.open();
       return db;
   }
   rLocker.unlock();
 
   QWriteLocker wLocker(&_rwLock);
   const QString connectionName = generateConnectionName(thread);
   QSqlDatabase db = QSqlDatabase::addDatabase(_driverName, connectionName);
   _threadConnectionNameMap.insert(thread, connectionName);
   connect(thread, &QThread::destroyed, this, &ConnectionPool::onThreadDestroyed);
   db.setDatabaseName(_dbName);
   db.setUserName(_userName);
   db.setPassword(_userPassword);
   db.setHostName(_dbHost);
   db.setPort(_dbPort);
   if(!db.isOpen()) db.open();
   return db;
}
 

А концепции у нас разнятся, так как в текущей задаче используется пул потоков. Чаще всего вызывается getConnection() в методе, вызванном с помощью QtConcurrentRun.
Записан
Old
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 4350



Просмотр профиля
« Ответ #7 : Март 30, 2016, 12:01 »

Да, QReadWriteLock, в отличие от бустовских аналогов, не позволяет изменять роль, т.е. стать из читателя писателем. Я его посоветовал, но сейчас забираю свой совет обратно, по крайней мере пока его не доработают. Улыбающийся
Лучше обойтись простым мьютексом.

Код
C++ (Qt)
QSqlDatabase ConnectionPool::getConnectionForThread(QThread* thread)
{
   QMutexLocker locker(&_mutex);
   if(_threadConnectionNameMap.contains(thread))
   {
       QSqlDatabase db = QSqlDatabase::database(_threadConnectionNameMap.value(thread));
       db.open();
       return db;
   }
 
   const QString connectionName = generateConnectionName(thread);
   QSqlDatabase db = QSqlDatabase::addDatabase(_driverName, connectionName);
   _threadConnectionNameMap.insert(thread, connectionName);
   connect(thread, &QThread::destroyed, this, &ConnectionPool::onThreadDestroyed);
   db.setDatabaseName(_dbName);
   db.setUserName(_userName);
   db.setPassword(_userPassword);
   db.setHostName(_dbHost);
   db.setPort(_dbPort);
   if(!db.isOpen()) db.open();
   return db;
}
 
Записан
ballard
Гость
« Ответ #8 : Декабрь 14, 2016, 18:36 »

Теперь вообще по концепту. Я сразу создаю пул рабочих потоков и каждому потоку сразу создаю соединение. Потоки и соединения живут весь период жизни пула.

можно посмотреть код такой реализации?
Записан
ballard
Гость
« Ответ #9 : Декабрь 20, 2016, 14:23 »

Бегло просмотрел, что сразу бросилось в глаза.
В методе getConnectionForThread вы защищаете мьютексом только добавление нового соединения, хотя нужно и поиск с извлечением защищать. Для этого может пригодиться QReadWriteLock.

Теперь вообще по концепту. Я сразу создаю пул рабочих потоков и каждому потоку сразу создаю соединение. Потоки и соединения живут весь период жизни пула.

на самом деле интересует организация много-поточного доступа к коннекшенам, я пробовал делать воркеры, наследуемые от QThread, но тогда непонятно как сделать метод запроса с возвращаемым результатом, так же пробовал выполнять запросы в QtConcurrent, но тут проблема в том, что соединение созданное при помощи QSqlDatabase::addDatabase должно вызываться из того же потока в котором создано, т.е. хотя бы архитектура "на пальцах" или в псевдокоде, если не хочется раскрывать свое исполнение
Записан
Old
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 4350



Просмотр профиля
« Ответ #10 : Декабрь 20, 2016, 15:09 »

Для каждой нитки можно клонировать подключения.
Таких клонов может быть столько, сколько есть рабочих ниток. В каждой нитке свое подключение.
Записан
ballard
Гость
« Ответ #11 : Декабрь 20, 2016, 23:35 »

Для каждой нитки можно клонировать подключения.
Таких клонов может быть столько, сколько есть рабочих ниток. В каждой нитке свое подключение.

Большое спасибо за ответ!

А можно показать в виде кода, о чем речь (хотя бы частично)?

У меня пока такая конфигурация (если можно, код ревью, критику и т.д.  Улыбающийся ):

Пул создает набор воркеров (вместо clone делает addDatabase с именем, хотя в документации сказано, что в этом случае нужно обращаться к БД из того потока к котором вызывался addDatabase, но пока сбоев не было, может здесь что-то изменилось) и просто по очереди передает им запросы (в дальнейшем есть мысли прикрутить сюда что-то типа consumer-producer), в принципе работает при обращении к пулу из разных потоков и без QMutex, но на всякий случай его пока оставил.

Хочется создать набор соединений и выполнять к нему запросы вида QSqlQuery executeQuery(const QString &query), причем накидать сразу много запросов, чтобы они по очереди выполнились... и, вроде бы, все работает, но возможно есть какие-то проверенные временом типовые решения, или в дальшейшем я встречу какие-то проблемы.

Просто очень заинтересовала упомянутая реализация концепта, именно к такой и стремлюсь.

p.s. вроде нашел проблему, с вызовами из разных потоков непонятная ситуация

Пул:
pool.h
Код:
class DBConnectionPool : public QObject
{
    Q_OBJECT
public:
    explicit DBConnectionPool(QObject *parent = 0);
    DBConnectionPool(const PoolConfig& config, QObject* parent = 0);
    void start();
    void stop();
    QSqlQuery executeQuery(const QString &query);

signals:
    void stateChanged( const QString& msg );

private:
    PoolConfig config;
    QList<DatabaseWorker*> databaseConnections;
    QSqlQuery execute(const QString& statement);
    QFuture<QSqlQuery> future;
    int workerCounter = 0;
    QMutex mutex;
};

pool.cpp
Код:
DBConnectionPool::DBConnectionPool(const PoolConfig &config, QObject *parent) : QObject(parent)
{
    this->config = config;
}

void DBConnectionPool::start()
{
    for(int i = 0; i < this->config.connectionsCount; i++) {
        DatabaseWorker* worker = new DatabaseWorker;
        worker->config.hostName = this->config.hostName;
        worker->config.databaseName = this->config.databaseName;
        worker->config.userName = this->config.userName;
        worker->config.password = this->config.password;
        worker->config.databaseReferenceName = "dbWorker@" + QString::number(i);
        worker->init();
        worker->createConnection();
        databaseConnections << worker;
    }
}

QSqlQuery DBConnectionPool::execute(const QString& statement)
{
    QMutexLocker locker(&mutex);
    QSqlQuery query = this->databaseConnections[this->workerCounter]->executeSync(statement);
    if (this->workerCounter < this->config.connectionsCount - 1) {
        this->workerCounter++;
    } else {
        this->workerCounter = 0;
    }
    return query;
}

QSqlQuery DBConnectionPool::executeQuery(const QString& statement)
{
    future = QtConcurrent::run(this, &DBConnectionPool::execute, statement);
    future.waitForFinished();
    return(future.result());
}

Воркер:
worker.h
Код:
class DatabaseWorker : public QObject
{
    Q_OBJECT

private:
    QSqlDatabase db;
    QSqlQuery executeRequest(const QString& request);
    QFuture<QSqlQuery> future;

public:
    explicit DatabaseWorker(QObject *parent = 0);
    void init();
    bool createConnection();
    QSqlQuery executeSync(const QString& request);
    void executeAsync();
    bool isBusy();
    PoolConfig config;
    QStringList requestList;
signals:

public slots:
};

worker.cpp
Код:
void DatabaseWorker::init()
{
    db = QSqlDatabase::addDatabase("QPSQL", this->config.databaseReferenceName);
}

bool DatabaseWorker::createConnection()
{
    db.setHostName(this->config.hostName);
    db.setDatabaseName(this->config.databaseName);
    db.setUserName(this->config.userName);
    db.setPassword(this->config.password);
    db.open();
    if (!db.isOpen()) {
        qDebug() << "Ошибка при подключении к СУБД: " << db.lastError() << endl;
        return false;
    }
    return true;
}

QSqlQuery DatabaseWorker::executeRequest(const QString& request)
{
    QSqlQuery query(db);
    bool result = query.exec(request);
    if (!result) {
        qDebug() << "Ошибка при выполнении запроса: " <<db.lastError() << endl;
    }
    return query;
}

QSqlQuery DatabaseWorker::executeSync(const QString& request)
{
    future = QtConcurrent::run(this, &DatabaseWorker::executeRequest, request);
    future.waitForFinished();
    return(future.result());
}
« Последнее редактирование: Декабрь 20, 2016, 23:48 от ballard » Записан
ballard
Гость
« Ответ #12 : Декабрь 23, 2016, 11:56 »


есть ли какие-нибудь замечания/советы по реализации? (очень хотелось бы услышать) Улыбающийся
Записан
Old
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 4350



Просмотр профиля
« Ответ #13 : Декабрь 23, 2016, 21:25 »

есть ли какие-нибудь замечания/советы по реализации? (очень хотелось бы услышать) Улыбающийся
А у вас этот код нормально работает?
Как-то для меня не очень понятен запуск функции в отдельном потоке и блокировка вызывающего потока? Если вы его все равно блокируете, так в нем лучше и выполняйте функцию?

Но вы все усложняете, достаточно запустить воркеры и организовать для них одну очередь запросов, по мере освобождения воркеры будут брать запросы из очереди или заснут, пока не появятся новые запросы.
Организация таких очередей 100500 раз обсуждалась на форуме, поищите, там и примеры были.
Записан
ballard
Гость
« Ответ #14 : Декабрь 23, 2016, 21:32 »

есть ли какие-нибудь замечания/советы по реализации? (очень хотелось бы услышать) Улыбающийся
А у вас этот код нормально работает?
Как-то для меня не очень понятен запуск функции в отдельном потоке и блокировка вызывающего потока? Если вы его все равно блокируете, так в нем лучше и выполняйте функцию?

Но вы все усложняете, достаточно запустить воркеры и организовать для них одну очередь запросов, по мере освобождения воркеры будут брать запросы из очереди или заснут, пока не появятся новые запросы.
Организация таких очередей 100500 раз обсуждалась на форуме, поищите, там и примеры были.


Спасибо Вам! Посмотрю конечно в поиске про очереди, т.к. именно так как выделено я и хочу сделать, но просто гуглопоиск на эту тему ни к чему не привел...поэтому и обратился Улыбающийся если вдруг вспомните ссылки, подскажите про них) Я смотрел примеры qt waiting condition и semaphore, думал копать в эту сторону) ну и как я уже писал, за любые примеры, даже псевдокодом буду очень признателен Улыбающийся
Записан
Страниц: [1] 2 3   Вверх
  Печать  
 
Перейти в:  


Страница сгенерирована за 0.189 секунд. Запросов: 23.