Russian Qt Forum
Ноябрь 22, 2024, 20:40 *
Добро пожаловать, Гость. Пожалуйста, войдите или зарегистрируйтесь.
Вам не пришло письмо с кодом активации?

Войти
 
  Начало   Форум  WIKI (Вики)FAQ Помощь Поиск Войти Регистрация  

Страниц: 1 2 [3]   Вниз
  Печать  
Автор Тема: [РЕШЕНО] connection pool плюсы и минусы  (Прочитано 28071 раз)
ballard
Гость
« Ответ #30 : Декабрь 26, 2016, 12:09 »

Если вы блокируете текущую нить на время выполнения запроса, то не понятно для чего его выполнять в другой нитке. Ну и выполняйте этот запрос сразу в текущей?  Непонимающий

Изначально суть в том, как вы понимаете, чтобы открыть и держать открытыми несколько соединений с БД.
Если я блокирую какую-то одну нитку, то остальные остаются свободны для обращения из других потоков.
Записан
Old
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 4350



Просмотр профиля
« Ответ #31 : Декабрь 26, 2016, 12:23 »

Изначально суть в том, как вы понимаете, чтобы открыть и держать открытыми несколько соединений с БД.
Если я блокирую какую-то одну нитку, то остальные остаются свободны для обращения из других потоков.
Так откройте сразу 10 соединений и используйте их. Зачем вам рабочие нитки?
Записан
ballard
Гость
« Ответ #32 : Декабрь 26, 2016, 12:30 »

Так откройте сразу 10 соединений и используйте их. Зачем вам рабочие нитки?

Всмысле открывать сразу в вызывающих потоках? или вы что имеете в виду?
Идея такая, что у меня может быть к примеру 20 вызывающих при 5 открытых нитках в пуле, но каждых из этих 20-ти будет класть по задания последовательно, после выполнения предыдущего
Записан
Old
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 4350



Просмотр профиля
« Ответ #33 : Декабрь 26, 2016, 12:43 »

Всмысле открывать сразу в вызывающих потоках? или вы что имеете в виду?
Идея такая, что у меня может быть к примеру 20 вызывающих при 5 открытых нитках в пуле, но каждых из этих 20-ти будет класть по задания последовательно, после выполнения предыдущего
Пожалуйста, пулу все равно, дожидается ли клиент отработки предудущего запроса или нет.
Нужно ждать хорошо, не нужно - тоже хорошо.
Записан
ballard
Гость
« Ответ #34 : Декабрь 26, 2016, 13:47 »

Пожалуйста, пулу все равно, дожидается ли клиент отработки предудущего запроса или нет.
Нужно ждать хорошо, не нужно - тоже хорошо.

Ну да, но получается мне нужно как раз организовать промежуточный класс между вызывающими потоками и пулом, у которого и будет нужный мне метод, возвращающий QSqlRequest, Вы это имеете в виду?
Записан
Old
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 4350



Просмотр профиля
« Ответ #35 : Декабрь 26, 2016, 13:53 »

Ну да, но получается мне нужно как раз организовать промежуточный класс между вызывающими потоками и пулом, у которого и будет нужный мне метод, возвращающий QSqlRequest, Вы это имеете в виду?
Да, он будет запускать нитки-воркеры и нагружать их работой (через очередь), в ответ сигналя о результатах. Как у вас и было реализовано.
Добавьте в него очередь и все.
Записан
ballard
Гость
« Ответ #36 : Декабрь 26, 2016, 13:55 »

Да, он будет запускать нитки-воркеры и нагружать их работой (через очередь), в ответ сигналя о результатах. Как у вас и было реализовано.
Добавьте в него очередь и все.

Ok, спасибо, буду делать, потом поделюсь результатом Улыбающийся
Записан
ballard
Гость
« Ответ #37 : Декабрь 27, 2016, 00:34 »


В общем что-то нет прогресса с очередью В замешательстве

Только сделал так выполнение запроса (QueryWrapper - это обернутый QSqlRequest):
Код
C++ (Qt)
QueryWrapper DBConnectionPool::execute(const QString &statement, const QMap<QString, QVariant> &values)
{
   QMutexLocker locker(&mutex);
   bool executed = false;
   QueryWrapper wrapper;
   while (!executed) {
       for (auto& connection : databaseConnections) {
           if (!connection->isBusy()) {
               wrapper = connection->executeSync(statement, values);
               executed = true;
               break;
           }
       }
   }
   return wrapper;
}
 
QueryWrapper DBConnectionPool::executeQuery(const QString &statement, const QMap<QString, QVariant> &values)
{
   wrappedFuture = QtConcurrent::run(this, &DBConnectionPool::execute, statement, values);
   wrappedFuture.waitForFinished();
   return(wrappedFuture.result());
}

Очередь непонятно, как сюда прикрутить: т.к., например future создается через qtconcurrent::run, который сразу исполняет запрос и, соответственно, в QList для некого воркера future не добавить.

Т.е. для синхронного запроса с очередью мне нужно что-то типа:
Код
C++ (Qt)
QueryWrapper DBConnectionPool::executeQuery(const QString &statement, const QMap<QString, QVariant> &values)
{
   Task task(statement, values);
   taskList.append(task);
   task.waitForResult();
   return task.result();
}

taskList это как раз qlist из которого некий consumer заберет запрос и выполнит

Пока что не понимаю, можно ли это сделать, и, если да, то как Непонимающий
Записан
ballard
Гость
« Ответ #38 : Декабрь 27, 2016, 14:08 »

Да, он будет запускать нитки-воркеры и нагружать их работой (через очередь), в ответ сигналя о результатах. Как у вас и было реализовано.
Добавьте в него очередь и все.

В общем-то пришла идея сделать эту функцию блокирующей, и тогда можно встроить работу с очередью. Либо по таймауту, либо цикл с проверкой выполнения запроса, либо через Qt::BlockingQueuedConnection, как Вы думаете, какой вариант лучше?

Код
C++ (Qt)
QueryWrapper DBConnectionPool::executeQuery(const QString &statement, const QMap<QString, QVariant> &values)
{
   Task task(statement, values);
   taskList.append(task);
 
   // block
 
   return task.result();
}
Записан
ballard
Гость
« Ответ #39 : Декабрь 27, 2016, 23:43 »


В общем сделал пока вариант с одним воркером...из одного потока пока все работает.
Блокирующая функция выполняется. Можете посмотреть момент с ::run() у QueryThread и по Tasking, все ли верно?

worker.h
Код
C++ (Qt)
class DatabaseWorker : public QObject
{
   Q_OBJECT
public:
   explicit DatabaseWorker(QObject* parent = 0);
   DatabaseWorker(const QString& threadId, QObject* parent = 0);
   void init(const QString& threadId);
 
private:
   QSqlDatabase m_database;    
 
signals:
   void results( const QList<QSqlRecord>& records );
 
public slots:
   void slotExecute( const QString& query );
};

worker.cpp
Код
C++ (Qt)
DatabaseWorker::DatabaseWorker(const QString& threadId, QObject* parent ) : QObject(parent)
{
   init(threadId);
}
 
void DatabaseWorker::init(const QString& threadId)
{
   QString dbThreadId;
   QString emptyStr = "";
   if (threadId == emptyStr) {
       dbThreadId = QString::number((qintptr)QThread::currentThreadId());
   } else {
       dbThreadId = threadId;
   }
   m_database = QSqlDatabase::addDatabase( "QPSQL", dbThreadId );
   m_database.setDatabaseName( "test" );
   m_database.setHostName( "localhost" );
   m_database.setUserName( "postgres" );
   m_database.setPassword( "postgres" );
   if ( !m_database.open() )
   {
       return;
   }
}
 
void DatabaseWorker::slotExecute( const QString& query )
{
   QList<QSqlRecord> recs;
   QSqlQuery sql( query, m_database );
   while ( sql.next() )
   {
       recs.push_back( sql.record() );
   }
   emit results( recs );
}

querythread.h
Код
C++ (Qt)
class QueryThread : public QThread
{
   Q_OBJECT
public:
   explicit QueryThread(QThread* parent = 0);
   QueryThread(Tasking* taskManager, QThread* parent = 0);
   void execute( const QString& query );
   ~QueryThread();
   void prepare();
 
protected:
   void run();
 
private:
   Tasking* taskManager;
   DatabaseWorker* m_worker;    
 
signals:
   void progress( const QString& msg );
   void ready(bool);
   void results( const QList<QSqlRecord>& records );
   void executefwd( const QString& query );
 
public slots:
   void getResults( const QList<QSqlRecord>& records );
};

querythread.cpp
Код
C++ (Qt)
QueryThread::QueryThread(Tasking* taskManager, QThread *parent) : QThread(parent)
{
   this->taskManager = taskManager;
 
}
 
QueryThread::~QueryThread()
{
   delete m_worker;
}
 
void QueryThread::prepare()
{
   emit ready(false);
   m_worker = new DatabaseWorker;
   connect( this, &QueryThread::executefwd, m_worker, &DatabaseWorker::slotExecute );
   qRegisterMetaType< QList<QSqlRecord> >( "QList<QSqlRecord>" );
   connect( m_worker, &DatabaseWorker::results, this, &QueryThread::getResults );
   emit ready(true);
}
 
void QueryThread::execute( const QString& query )
{
   emit executefwd( query );
}
 
void QueryThread::getResults( const QList<QSqlRecord>& records )
{
   emit results( records );    
}
 
void QueryThread::run()
{
   forever {
       execute(taskManager->getTask());
   }
}

tasking.h
Код
C++ (Qt)
class Tasking : public QObject
{
   Q_OBJECT
 
public:
   explicit Tasking(QObject *parent = 0);
   QMutex mutex;
   QWaitCondition cond;
   QString getTask();
   void addTask(const QString &task);
   QList<QString> taskList;
 
signals:
 
public slots:
};

tasking.cpp
Код
C++ (Qt)
void Tasking::addTask(const QString &task)
{
   QMutexLocker locker( &mutex );
   taskList.append( task );
   cond.wakeAll();
}
 
QString Tasking::getTask()
{
   QMutexLocker locker( &mutex );
 
   while( taskList.empty() ) {
       cond.wait( &mutex );
   }
 
   return taskList.takeFirst();
}

pool.h
Код
C++ (Qt)
class WorkerPool : public QObject
{
   Q_OBJECT
public:
   explicit WorkerPool(QObject *parent = 0);
   QList<QSqlRecord> executeRequest( const QString& request );
 
private:
   QueryThread *querythread;
 
   bool isResultsReady = false;
   QList<QSqlRecord> recs;
   void saveResults(const QList<QSqlRecord> &recs);
 
   Tasking* taskManager;
 
 
signals:
   void requestDone();

pool.cpp
Код
C++ (Qt)
WorkerPool::WorkerPool(QObject *parent) : QObject(parent)
{
   taskManager = new Tasking;
   querythread = new QueryThread(taskManager);
   connect(querythread, &QueryThread::results, this, &WorkerPool::saveResults );
   querythread->prepare();
   querythread->start();
}
 
QList<QSqlRecord> WorkerPool::executeRequest(const QString &request)
{
   isResultsReady = false;
   taskManager->addTask(request);
 
   QEventLoop loop;
   connect(this, &WorkerPool::requestDone, &loop, &QEventLoop::quit);
   loop.exec();
 
   return recs;
}
 
void WorkerPool::saveResults(const QList<QSqlRecord> &recs)
{
   this->recs = recs;
   isResultsReady = true;
   emit requestDone();
}
Записан
ballard
Гость
« Ответ #40 : Декабрь 27, 2016, 23:59 »

Теперь надо еще продумать, как идентифицировать сигналы от разных воркеров, при обращении из разных потоков
Записан
ballard
Гость
« Ответ #41 : Декабрь 28, 2016, 16:32 »

А результат можно возвращать с помощью сигнала, добавив каждому запросу некий ID.
Код
C++ (Qt)
public:
   ID    executeQuery( const QString &query );    // Возвращает ID запроса
 
signals:
   void    queryFinished( ID id, const QSqlQuery &result );    // Испускается после выполнения запроса
 

Обновил пул для работы с несколькими воркерами, можете посоветовать, как вернуть данные именно от того воркера, которому отправлен запрос?

Код
C++ (Qt)
WorkerPool::WorkerPool(const int &workersCount, QObject *parent) : QObject(parent)
{
   taskManager = new Tasking;
   for (auto i=0; i<workersCount; i++) {
       QueryThread* thread = new QueryThread(taskManager, "worker@" + QString::number(i));
       connect(thread, &QueryThread::results, this, &WorkerPool::saveResults);
       workersList << thread;
   }
 
   foreach (QueryThread* worker, workersList) {
       worker->prepare();
       worker->start();
   }
}
 
QList<QSqlRecord> WorkerPool::executeRequest(const QString &request)
{
   isResultsReady = false;
   taskManager->addTask(request);
 
   QEventLoop loop;
   connect(this, &WorkerPool::requestDone, &loop, &QEventLoop::quit);
   loop.exec();
 
   return recs;
}
 
void WorkerPool::saveResults(const QList<QSqlRecord> &recs)
{
   this->recs = recs;
   isResultsReady = true;
   emit requestDone();
}
Записан
ballard
Гость
« Ответ #42 : Декабрь 28, 2016, 17:17 »

В общем добавил пока сортрировку следующим образом, записывая запрос , блокируя мьютексом и проверяя потом на совпадение, но с таким способом пул зависает через несколько запросов, которые производятся из разных потоков, похоже из-за мьютекса возникает deadlock, без блокирования мьютексом запросы идут нормально, но, естественно порядок не совпадает...что тут можно сделать? :

Код
C++ (Qt)
WorkerPool::WorkerPool(const int &workersCount, QObject *parent) : QObject(parent)
{
   taskManager = new Tasking;
   for (auto i=0; i<workersCount; i++) {
       QueryThread* thread = new QueryThread(taskManager, "worker@" + QString::number(i));
       connect(thread, &QueryThread::results, this, &WorkerPool::saveResults);
       workersList << thread;
   }
 
   foreach (QueryThread* worker, workersList) {
       worker->prepare();
       worker->start();
   }
}
 
QList<QSqlRecord> WorkerPool::executeRequest(const QString &request)
{
   isResultsReady = false;
   taskManager->addTask(request);
 
   QMutexLocker locker(&mutex);
   lastRequest = request;
 
   QEventLoop loop;
   connect(this, &WorkerPool::requestDone, &loop, &QEventLoop::quit);
   loop.exec();
 
   return recs;
}
 
void WorkerPool::saveResults(const QList<QSqlRecord> &recs, const QString &request)
{
   if (request == lastRequest) {
       this->recs = recs;
       isResultsReady = true;
       emit requestDone();
   }
}
« Последнее редактирование: Декабрь 28, 2016, 17:58 от ballard » Записан
ballard
Гость
« Ответ #43 : Декабрь 28, 2016, 21:10 »

все, вроде бы победил: сделал мапу с результатами Улыбающийся
Код
C++ (Qt)
QList<QSqlRecord> WorkerPool::executeRequest(const QString &request)
{
   isResultsReady = false;
 
   QMutexLocker locker(&mutex);
   taskManager->addTask(request);
 
   bool completed = false;
 
   while (!completed) {
       if (resultsMap.contains(request)) {
           completed = true;
       }
   }
   return resultsMap.take(request);
}
 
void WorkerPool::saveResults(const QList<QSqlRecord> &recs, const QString &request)
{
   resultsMap[request] = recs;
   isResultsReady = true;
   emit requestDone();
}
Записан
Страниц: 1 2 [3]   Вверх
  Печать  
 
Перейти в:  


Страница сгенерирована за 0.374 секунд. Запросов: 22.