semaphore.acquire();allTasks.addTask(string);qDebug() << "nakidal" << semaphore.available() << QThread::currentThread();qDebug() << "tasklist size from main:" << allTasks.getSize() << QThread::currentThread();
do { string = Tasks->getTask(); sem->release(); qDebug() << "zabral" << QThread::currentThread(); qDebug() << "tasklist size from thread:" << Tasks->getSize(); ....while (!stoped && (2 * QThread::idealThreadCount() != sem->available()));
C++ (Qt)do { ....while (!stoped && (2 * QThread::idealThreadCount() != sem->available()));
#ifndef TASKING_H#define TASKING_H#include <QStringList>#include <QWaitCondition>#include <QMutex>#include <QString>class Tasking{public: Tasking(); QString getTask(); void addTask(QString string); void clear(); int getSize();private: QStringList stringList; QWaitCondition cond; QMutex mutex;};#endif // TASKING_H
#include "tasking.h"Tasking::Tasking(){}QString Tasking::getTask(){ QMutexLocker locker(&mutex); while(stringList.empty()) //cond.wait(&mutex, ULONG_MAX); cond.wait(&mutex); return stringList.takeFirst();}void Tasking::addTask(QString newString){ QMutexLocker locker(&mutex); stringList.append(newString); cond.wakeOne();}int Tasking::getSize(){ return stringList.size();}void Tasking::clear(){ QMutexLocker locker(&mutex); stringList.clear();}
#ifndef SEARCHTHREAD_H#define SEARCHTHREAD_H#include <QThread>#include <QStringList>#include <QSemaphore>class Tasking;class SearchThread : public QThread{ Q_OBJECTpublic: SearchThread(Tasking *allTasks, QStringList numberList, int direction, QSemaphore *semaphore, QThread *parent = 0);signals: void write(QString); void ended();public slots: void run(); void stop();private: volatile bool stoped; int direct; QString string; QStringList list; Tasking *Tasks; QSemaphore *sem;};#endif // SEARCHTHREAD_H
#include "searchthread.h"#include "tasking.h"#include <QDebug>#include <QTextCodec>SearchThread::SearchThread(Tasking *allTasks, QStringList smallList, int direction, QSemaphore *semaphore, QThread *parent) : QThread(parent){ Tasks = allTasks; list = smallList; direct = direction; sem = semaphore; stoped = false;}void SearchThread::stop(){ stoped = true;}void SearchThread::run(){ do { string = Tasks->getTask(); sem->release(); qDebug() << "zabral" << QThread::currentThread(); qDebug() << "tasklist size from thread:" << Tasks->getSize(); //qDebug() << string << QThread::currentThread() << stoped; QTextCodec::setCodecForCStrings(QTextCodec::codecForName("Windows-1251")); for (int i = 0; i < list.size(); ++i) { if (-1 != string.indexOf(list[i])) emit write(string); break; } } emit ended(); } while (!stoped && (2 * QThread::idealThreadCount() != sem->available()));}
#ifndef MAINSEARCHTHREAD_H#define MAINSEARCHTHREAD_H#include <QThread>#include <QStringList>#include <QMutex>#include <QWaitCondition>class SearchThread;class MainSearchThread : public QThread{ Q_OBJECTpublic: MainSearchThread(QString bigFile, QString smallFile, QString outFile, int direction, QObject *parent = 0);signals: void warning(const QString); void progress(int); void ended(bool); void ended(QString); void stop();public slots: void write(QString); void counter(); void cancel();protected: void run();private: QList<SearchThread*> threadList; volatile bool canceled; QString bigFileName; QString smallFileName; QString outFileName; int direct; volatile qint64 count; volatile qint64 bigSize; QStringList list; qint64 lineCount(const QString FileName);};#endif // MAINSEARCHTHREAD_H
#include "mainsearchthread.h"#include "searchthread.h"#include "tasking.h"#include <QFile>#include <QTextStream>#include <QThread>#include <QSemaphore>#include <QDebug>MainSearchThread::MainSearchThread(QString bigFile, QString smallFile, QString outFile, int direction, QObject *parent) : QThread(parent), canceled(false), count(0), bigSize(0){ bigFileName = bigFile; smallFileName = smallFile; outFileName = outFile; direct = direction;}void MainSearchThread::cancel(){ canceled = true;}qint64 MainSearchThread::lineCount(const QString FileName){ QFile File(FileName); if (!File.open(QIODevice::ReadOnly)) { emit warning(tr("Cannot read file %1:\n%2.").arg(File.fileName()).arg(File.errorString())); emit ended(tr("Error.")); return -1; } char buff[1024*128]; qint64 lcount = 0; int len; char prev = 0; char cur = 0; for(;;) { if (canceled) { emit ended(tr("Canceled.")); return -1; } len = File.read(buff, sizeof(buff)); if (File.error()) return -1; if(!len) {break;} for (int i=0; i<len; ++i) { cur = buff[i]; if (cur == 10) {++lcount;} else if (prev == 13) {++lcount;} prev = cur; } } if (cur == 13) {++lcount;} File.close(); return lcount + 1;}void MainSearchThread::counter(){ count++; emit progress(count*100/bigSize);}void MainSearchThread::write(QString string){ qDebug() << string; list.append(string);}void MainSearchThread::run(){ //определяем размер файла billingSize = lineCount(bigFileName); if ( -1 == bigSize) return; //проверка на открытие файла QFile bigFile(bigFileName); if (!bigFile.open(QIODevice::ReadOnly)) { emit warning(tr("Cannot read file %1:\n%2.").arg(bigFile.fileName()).arg(bigFile.errorString())); emit ended(tr("Error.")); return; } //проверка открытия файла QFile smallFile(smallFileName); if (!smallFile.open(QIODevice::ReadOnly)) { emit warning(tr("Cannot read file %1:\n%2.").arg(smallFile.fileName()).arg(smallFile.errorString())); emit ended(tr("Error.")); bigFile.close(); return; } //берём строки из файла и кладём в список строк QTextStream numberStream(&smallFile); QStringList smallList; QString smallString; while (!smallStream.atEnd()) { //проверка на отмену if (canceled) { bigFile.close(); smallFile.close(); emit ended(true); emit ended(tr("Canceled.")); } smallString = smallStream.readLine(); //если строка пустая, то пропускаем её if ("" == smallString) continue; smallList.append(smallString); } Tasking allTasks; //создаём список заданий QSemaphore semaphore(2 * QThread::idealThreadCount()); //семафор для контроля раздачи заданий //QSemaphore semaphore(2); //семафор для контроля раздачи заданий //подготавливаем список потоков, раздаем списки и указатель на список заданий for (int i = 1; i <= QThread::idealThreadCount(); i++) { SearchThread *thread = new SearchThread(&allTasks, smallList, direct, &semaphore); connect(thread, SIGNAL(write(QString)), this, SLOT(write(QString))); connect(thread, SIGNAL(ended()), this, SLOT(counter())); connect(this, SIGNAL(stop()), thread, SLOT(stop())); threadList.append(thread); thread->start(); } //устанавливаем счётчик строк в 0 count = 0; //читаем построчно файл и раздаем строки списку потоков QTextStream bigStream(&bigFile); QString bigString; while (!bigStream.atEnd()) { //проверка на отмену if (canceled) { //очищаем список заданий allTasks.clear(); //сообщаем об остановке emit stop(); //ждем пока все потоки завершат выполнение for (int i = 1; i <= QThread::idealThreadCount(); i++) { threadList[i - 1]->wait(); } bigFile.close(); emit ended(true); emit ended(tr("Canceled.")); return; } //читаем строку из файла bigString = bigStream.readLine(); //если строка пустая, то пропускаем её if ("" == bigString) continue; semaphore.acquire(); allTasks.addTask(bigString); qDebug() << "nakidal" << semaphore.available() << QThread::currentThread(); qDebug() << "tasklist size from main:" << allTasks.getSize() << QThread::currentThread(); } //необходимо сообщить потокам что всё emit stop(); //ожидаем завершения всех потоков for (int i = 1; i <= QThread::idealThreadCount(); i++) { threadList[i - 1]->wait(); } qDebug() << list; //закрываем файл bigFile.close(); //открываем файл для записи найденых строк QFile outFile(outFileName); if (!outFile.open(QIODevice::WriteOnly)) { emit warning(tr("Cannot write file %1:\n%2.").arg(outFile.fileName()).arg(outFile.errorString())); return; } //записываем найденые строки QTextStream outStream(&outFile); for (int i = 0; i < list.size(); ++i) outStream << list[i] << "\r\n"; outFile.close(); //закрываем файл с найдеными строками emit ended(true); emit progress(100); emit ended(tr("Sucsessfuly complited."));}
QSemaphore semaphore(2 * QThread::idealThreadCount()); //симофор для контроля раздачи заданий