// C++ (Qt)

// Runs `statement` on the first pooled connection that reports itself idle.
//
// The pool mutex is held for the whole call, so only one caller executes at a
// time. While every connection reports busy, the connection list is re-scanned
// until one becomes free.
//
// statement - passed through unchanged to the connection's executeSync().
// values    - passed through unchanged to the connection's executeSync().
//
// Returns the wrapper produced by executeSync(), or a default-constructed
// QueryWrapper when the pool contains no connections at all.
QueryWrapper DBConnectionPool::execute(const QString &statement, const QMap<QString, QVariant> &values)
{
    QMutexLocker locker(&mutex);

    for (;;) {
        bool poolHasConnections = false;

        for (auto& connection : databaseConnections) {
            poolHasConnections = true;
            if (!connection->isBusy()) {
                return connection->executeSync(statement, values);
            }
        }

        if (!poolHasConnections) {
            // An empty pool can never yield an idle connection; looping here
            // would hang forever while holding the mutex.
            return QueryWrapper();
        }
    }
}

// Runs execute() through QtConcurrent and blocks the caller until it finishes.
//
// Returns whatever execute() returned for the given statement and values.
QueryWrapper DBConnectionPool::executeQuery(const QString &statement, const QMap<QString, QVariant> &values)
{
    // The future is kept local: writing it to the shared wrappedFuture member
    // let concurrent callers overwrite each other's pending result.
    auto future = QtConcurrent::run(this, &DBConnectionPool::execute, statement, values);
    future.waitForFinished();
    return future.result();
}
C++ (Qt)QueryWrapper DBConnectionPool::executeQuery(const QString &statement, const QMap<QString, QVariant> &values){ Task task(statement, values); taskList.append(task); task.waitForResult(); return task.result();}
C++ (Qt)QueryWrapper DBConnectionPool::executeQuery(const QString &statement, const QMap<QString, QVariant> &values){ Task task(statement, values); taskList.append(task); // block return task.result();}
C++ (Qt)class DatabaseWorker : public QObject{ Q_OBJECTpublic: explicit DatabaseWorker(QObject* parent = 0); DatabaseWorker(const QString& threadId, QObject* parent = 0); void init(const QString& threadId); private: QSqlDatabase m_database; signals: void results( const QList<QSqlRecord>& records ); public slots: void slotExecute( const QString& query );};
C++ (Qt)DatabaseWorker::DatabaseWorker(const QString& threadId, QObject* parent ) : QObject(parent){ init(threadId);} void DatabaseWorker::init(const QString& threadId){ QString dbThreadId; QString emptyStr = ""; if (threadId == emptyStr) { dbThreadId = QString::number((qintptr)QThread::currentThreadId()); } else { dbThreadId = threadId; } m_database = QSqlDatabase::addDatabase( "QPSQL", dbThreadId ); m_database.setDatabaseName( "test" ); m_database.setHostName( "localhost" ); m_database.setUserName( "postgres" ); m_database.setPassword( "postgres" ); if ( !m_database.open() ) { return; }} void DatabaseWorker::slotExecute( const QString& query ){ QList<QSqlRecord> recs; QSqlQuery sql( query, m_database ); while ( sql.next() ) { recs.push_back( sql.record() ); } emit results( recs );}
C++ (Qt)class QueryThread : public QThread{ Q_OBJECTpublic: explicit QueryThread(QThread* parent = 0); QueryThread(Tasking* taskManager, QThread* parent = 0); void execute( const QString& query ); ~QueryThread(); void prepare(); protected: void run(); private: Tasking* taskManager; DatabaseWorker* m_worker; signals: void progress( const QString& msg ); void ready(bool); void results( const QList<QSqlRecord>& records ); void executefwd( const QString& query ); public slots: void getResults( const QList<QSqlRecord>& records );};
C++ (Qt)QueryThread::QueryThread(Tasking* taskManager, QThread *parent) : QThread(parent){ this->taskManager = taskManager; } QueryThread::~QueryThread(){ delete m_worker;} void QueryThread::prepare(){ emit ready(false); m_worker = new DatabaseWorker; connect( this, &QueryThread::executefwd, m_worker, &DatabaseWorker::slotExecute ); qRegisterMetaType< QList<QSqlRecord> >( "QList<QSqlRecord>" ); connect( m_worker, &DatabaseWorker::results, this, &QueryThread::getResults ); emit ready(true);} void QueryThread::execute( const QString& query ){ emit executefwd( query );} void QueryThread::getResults( const QList<QSqlRecord>& records ){ emit results( records ); } void QueryThread::run(){ forever { execute(taskManager->getTask()); }}
C++ (Qt)class Tasking : public QObject{ Q_OBJECT public: explicit Tasking(QObject *parent = 0); QMutex mutex; QWaitCondition cond; QString getTask(); void addTask(const QString &task); QList<QString> taskList; signals: public slots:};
C++ (Qt)void Tasking::addTask(const QString &task){ QMutexLocker locker( &mutex ); taskList.append( task ); cond.wakeAll();} QString Tasking::getTask(){ QMutexLocker locker( &mutex ); while( taskList.empty() ) { cond.wait( &mutex ); } return taskList.takeFirst();}
C++ (Qt)class WorkerPool : public QObject{ Q_OBJECTpublic: explicit WorkerPool(QObject *parent = 0); QList<QSqlRecord> executeRequest( const QString& request ); private: QueryThread *querythread; bool isResultsReady = false; QList<QSqlRecord> recs; void saveResults(const QList<QSqlRecord> &recs); Tasking* taskManager; signals: void requestDone();
C++ (Qt)WorkerPool::WorkerPool(QObject *parent) : QObject(parent){ taskManager = new Tasking; querythread = new QueryThread(taskManager); connect(querythread, &QueryThread::results, this, &WorkerPool::saveResults ); querythread->prepare(); querythread->start();} QList<QSqlRecord> WorkerPool::executeRequest(const QString &request){ isResultsReady = false; taskManager->addTask(request); QEventLoop loop; connect(this, &WorkerPool::requestDone, &loop, &QEventLoop::quit); loop.exec(); return recs;} void WorkerPool::saveResults(const QList<QSqlRecord> &recs){ this->recs = recs; isResultsReady = true; emit requestDone();}
C++ (Qt)public: ID executeQuery( const QString &query ); // Возвращает ID запроса signals: void queryFinished( ID id, const QSqlQuery &result ); // Испускается после выполнения запроса
C++ (Qt)WorkerPool::WorkerPool(const int &workersCount, QObject *parent) : QObject(parent){ taskManager = new Tasking; for (auto i=0; i<workersCount; i++) { QueryThread* thread = new QueryThread(taskManager, "worker@" + QString::number(i)); connect(thread, &QueryThread::results, this, &WorkerPool::saveResults); workersList << thread; } foreach (QueryThread* worker, workersList) { worker->prepare(); worker->start(); }} QList<QSqlRecord> WorkerPool::executeRequest(const QString &request){ isResultsReady = false; taskManager->addTask(request); QEventLoop loop; connect(this, &WorkerPool::requestDone, &loop, &QEventLoop::quit); loop.exec(); return recs;} void WorkerPool::saveResults(const QList<QSqlRecord> &recs){ this->recs = recs; isResultsReady = true; emit requestDone();}
C++ (Qt)WorkerPool::WorkerPool(const int &workersCount, QObject *parent) : QObject(parent){ taskManager = new Tasking; for (auto i=0; i<workersCount; i++) { QueryThread* thread = new QueryThread(taskManager, "worker@" + QString::number(i)); connect(thread, &QueryThread::results, this, &WorkerPool::saveResults); workersList << thread; } foreach (QueryThread* worker, workersList) { worker->prepare(); worker->start(); }} QList<QSqlRecord> WorkerPool::executeRequest(const QString &request){ isResultsReady = false; taskManager->addTask(request); QMutexLocker locker(&mutex); lastRequest = request; QEventLoop loop; connect(this, &WorkerPool::requestDone, &loop, &QEventLoop::quit); loop.exec(); return recs;} void WorkerPool::saveResults(const QList<QSqlRecord> &recs, const QString &request){ if (request == lastRequest) { this->recs = recs; isResultsReady = true; emit requestDone(); }}
// C++ (Qt)

// Queues `request` and waits until saveResults() has stored an entry for it in
// resultsMap, then removes and returns that entry.
//
// The mutex is held for the whole call, so concurrent callers are served one
// after another.
//
// NOTE(review): results are keyed by the request text; saveResults() does not
// take the mutex, so access to resultsMap is only safe if saveResults() runs
// in the same thread as this call — confirm the threading setup.
QList<QSqlRecord> WorkerPool::executeRequest(const QString &request)
{
    QMutexLocker locker(&mutex);
    isResultsReady = false;
    taskManager->addTask(request);

    // Wait in an event loop instead of spinning on resultsMap: the spin burned
    // a CPU core and, when called from the thread that must run saveResults(),
    // prevented the result from ever being delivered.
    QEventLoop loop;
    connect(this, &WorkerPool::requestDone, &loop, &QEventLoop::quit);
    while (!resultsMap.contains(request)) {
        // requestDone() fires for every stored result, so re-check the map
        // after each wake-up until this request's entry is present.
        loop.exec();
    }

    return resultsMap.take(request);
}

// Stores the records under their request text and announces that a result has
// arrived.
void WorkerPool::saveResults(const QList<QSqlRecord> &recs, const QString &request)
{
    resultsMap[request] = recs;
    isResultsReady = true;
    emit requestDone();
}