Russian Qt Forum
Ноябрь 26, 2024, 01:59 *
Добро пожаловать, Гость. Пожалуйста, войдите или зарегистрируйтесь.
Вам не пришло письмо с кодом активации?

Войти
 
  Начало   Форум  WIKI (Вики)FAQ Помощь Поиск Войти Регистрация  

Страниц: 1 [2]   Вниз
  Печать  
Автор Тема: Многопоточный поиск подстроки  (Прочитано 18610 раз)
Igors
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 11445


Просмотр профиля
« Ответ #15 : Декабрь 16, 2010, 11:57 »

Да не большой файл в QStringList, а маленький и каждому потоку по экземпляру.
Не мудрите. Здесь лучше действовать по принципу "безобразно зато однообразно". Если файл мал - просто обрабатываем его меньшим числом ниток, совсем маленький - всего 1 ниткой. Это нормально, не все "масштабируется". Связываясь с QStringList Вы получаете массу ненужных проблем на свою голову. А если строки малы? (как в обычном текстовике). Тогда использование 2 и более ниток просто в минус. Значит надо как-то делить QStringList на пачки. и.т.д. и.т.п.

Вообще время обсуждения уже давно превысило время реализации  Улыбающийся  Напишите рабочую ф-цию для поиска в куске - она нужна по-любому. Затем попробуйте запустить N ниток с этой ф-цией. А дальше видно будет - может все уже хорошо. А может будут др. проблемы которые сейчас не видны.

К слову сказать, поиск без использования QString даст прирост более чем вдвое
Записан
Anarchist
Гость
« Ответ #16 : Декабрь 16, 2010, 13:03 »

Тут возможно, что читающий поток будет забивать очередь заданий очень быстро, а рабочие потоки не будут успевать их выбирать из очереди. Тогда придется ограничивать читающую очередь и принудительно ее усыплять, если в очереди накопится большое количество данных.

Может ссылка есть или так опишите?
Записан
BRE
Гость
« Ответ #17 : Декабрь 16, 2010, 13:36 »

Может ссылка есть или так опишите?
Семафор инициализируется значением максимальной длины очереди, например это может быть число рабочих поток умноженное на 2.
При помещении задания в очередь этот семафор захватывает ресурс, при извлечении задания из очереди - освобождает.
Читающий поток заполнит очередь (исчерпает количество ресурсов в семафоре) и заблокируется, до тех пор, пока очередной освободившийся поток не выдернет из очереди задание.
« Последнее редактирование: Декабрь 16, 2010, 14:21 от BRE » Записан
Anarchist
Гость
« Ответ #18 : Декабрь 16, 2010, 15:04 »

Вообще время обсуждения уже давно превысило время реализации  Улыбающийся

Реализация давно ведётся. Пока не реализован и не продуман даже механизм определения что поиск окончен и пора завершаться потокам и не реализован как раз метод заполнения очереди заданий порциями.

Завтра потестирую время выполнения.
Записан
Anarchist
Гость
« Ответ #19 : Декабрь 17, 2010, 22:38 »

В главном потоке пишу:

Код:
semaphore.acquire();
allTasks.addTask(string);
qDebug() << "nakidal" << semaphore.available() << QThread::currentThread();
qDebug() << "tasklist size from main:" << allTasks.getSize() << QThread::currentThread();

В поисковых потоках:
Код:
do {
    string = Tasks->getTask();
    sem->release();
    qDebug() << "zabral" << QThread::currentThread();
    qDebug() << "tasklist size from thread:" << Tasks->getSize();
    ....
while (!stoped && (2 * QThread::idealThreadCount() != sem->available()));

Встаёт колом:

...
zabral SearchThread(0x9a4f858)
tasklist size from thread: 1
zabral SearchThread(0x9a4f858)
tasklist size from thread: 0
nakidal 1 MainSearchThread(0x9a55b40)
tasklist size from main: 1 MainSearchThread(0x9a55b40)
nakidal 0 MainSearchThread(0x9a55b40)
tasklist size from main: 2 MainSearchThread(0x9a55b40)

И встаёт, потоки не забирают больше. Не подскажете в чём может быть дело?
« Последнее редактирование: Декабрь 17, 2010, 22:44 от Anarchist » Записан
BRE
Гость
« Ответ #20 : Декабрь 17, 2010, 22:47 »

Побольше бы кода.

Это что за цикл такой?
Код
C++ (Qt)
do {
   ....
while (!stoped && (2 * QThread::idealThreadCount() != sem->available()));

Под инициализацией семафора, я имел ввиду то число, которое передается в конструкторе QSemaphore.
« Последнее редактирование: Декабрь 18, 2010, 00:01 от BRE » Записан
Anarchist
Гость
« Ответ #21 : Декабрь 18, 2010, 07:12 »

Код:
#ifndef TASKING_H
#define TASKING_H

#include <QStringList>
#include <QWaitCondition>
#include <QMutex>
#include <QString>

class Tasking
{
public:
    Tasking();
    QString getTask();
    void addTask(QString string);
    void clear();
    int getSize();

private:
    QStringList stringList;
    QWaitCondition cond;
    QMutex mutex;
};

#endif // TASKING_H

Код:
#include "tasking.h"

Tasking::Tasking()
{
}

QString Tasking::getTask()
{
    QMutexLocker locker(&mutex);
    while(stringList.empty())
        //cond.wait(&mutex, ULONG_MAX);
        cond.wait(&mutex);

    return stringList.takeFirst();
}

void Tasking::addTask(QString newString)
{
    QMutexLocker locker(&mutex);
    stringList.append(newString);
    cond.wakeOne();
}

int Tasking::getSize()
{
    return stringList.size();
}

void Tasking::clear()
{
    QMutexLocker locker(&mutex);
    stringList.clear();
}

Код:
#ifndef SEARCHTHREAD_H
#define SEARCHTHREAD_H

#include <QThread>
#include <QStringList>
#include <QSemaphore>

class Tasking;

class SearchThread : public QThread
{
    Q_OBJECT
public:
    SearchThread(Tasking *allTasks, QStringList numberList, int direction, QSemaphore *semaphore, QThread *parent = 0);

signals:
    void write(QString);
    void ended();

public slots:
    void run();
    void stop();

private:
    volatile bool stoped;
    int direct;
    QString string;
    QStringList list;
    Tasking *Tasks;
    QSemaphore *sem;
};

#endif // SEARCHTHREAD_H

Код:
#include "searchthread.h"
#include "tasking.h"
#include <QDebug>
#include <QTextCodec>

SearchThread::SearchThread(Tasking *allTasks, QStringList smallList, int direction, QSemaphore *semaphore, QThread *parent) :
    QThread(parent)
{
    Tasks = allTasks;
    list = smallList;
    direct = direction;
    sem = semaphore;
    stoped = false;
}

void SearchThread::stop()
{
    stoped = true;
}

void SearchThread::run()
{
    do {
        string = Tasks->getTask();
        sem->release();
        qDebug() << "zabral" << QThread::currentThread();
        qDebug() << "tasklist size from thread:" << Tasks->getSize();

        //qDebug() << string << QThread::currentThread() << stoped;
        QTextCodec::setCodecForCStrings(QTextCodec::codecForName("Windows-1251"));

        for (int i = 0; i < list.size(); ++i) {
            if (-1 != string.indexOf(list[i]))
                emit write(string);
                break;
            }
        }
        emit ended();
    }
    while (!stoped && (2 * QThread::idealThreadCount() != sem->available()));
}

Код:
#ifndef MAINSEARCHTHREAD_H
#define MAINSEARCHTHREAD_H

#include <QThread>
#include <QStringList>
#include <QMutex>
#include <QWaitCondition>

class SearchThread;

class MainSearchThread : public QThread
{
    Q_OBJECT
public:
    MainSearchThread(QString bigFile,
                     QString smallFile,
                     QString outFile,
                     int direction,
                     QObject *parent = 0);

signals:
    void warning(const QString);
    void progress(int);
    void ended(bool);
    void ended(QString);
    void stop();

public slots:
    void write(QString);
    void counter();
    void cancel();

protected:
    void run();

private:
    QList<SearchThread*> threadList;

    volatile bool canceled;
    QString bigFileName;
    QString smallFileName;
    QString outFileName;
    int direct;
    volatile qint64 count;
    volatile qint64 bigSize;
    QStringList list;
    qint64 lineCount(const QString FileName);
};

#endif // MAINSEARCHTHREAD_H

Код:
#include "mainsearchthread.h"
#include "searchthread.h"
#include "tasking.h"
#include <QFile>
#include <QTextStream>
#include <QThread>
#include <QSemaphore>
#include <QDebug>

MainSearchThread::MainSearchThread(QString bigFile,
                                   QString smallFile,
                                   QString outFile,
                                   int direction,
                                   QObject *parent) :
    QThread(parent),
    canceled(false),
    count(0),
    bigSize(0)
{
    bigFileName = bigFile;
    smallFileName = smallFile;
    outFileName = outFile;
    direct = direction;
}

void MainSearchThread::cancel()
{
    canceled = true;
}

qint64 MainSearchThread::lineCount(const QString FileName)
{
  QFile File(FileName);
  if (!File.open(QIODevice::ReadOnly)) {
      emit warning(tr("Cannot read file %1:\n%2.").arg(File.fileName()).arg(File.errorString()));
      emit ended(tr("Error."));
      return -1;
  }

  char   buff[1024*128];
  qint64 lcount = 0;
  int    len;

  char prev = 0;
  char cur  = 0;
  for(;;) {
      if (canceled)
      {
          emit ended(tr("Canceled."));
          return -1;
      }

      len = File.read(buff, sizeof(buff));
      if (File.error()) return -1;
      if(!len) {break;}

      for (int i=0; i<len; ++i) {
          cur = buff[i];
          if      (cur  == 10) {++lcount;}
          else if (prev == 13) {++lcount;}
          prev = cur;
      }
  }
  if (cur == 13) {++lcount;}
  File.close();
  return lcount + 1;
}

void MainSearchThread::counter()
{
    count++;
    emit progress(count*100/bigSize);
}

void MainSearchThread::write(QString string)
{
    qDebug() << string;
    list.append(string);
}

void MainSearchThread::run()
{
    //определяем размер файла
    billingSize = lineCount(bigFileName);
    if ( -1 == bigSize)
        return;
    //проверка на открытие файла
    QFile bigFile(bigFileName);
    if (!bigFile.open(QIODevice::ReadOnly)) {
        emit warning(tr("Cannot read file %1:\n%2.").arg(bigFile.fileName()).arg(bigFile.errorString()));
        emit ended(tr("Error."));
        return;
    }
    //проверка открытия файла
    QFile smallFile(smallFileName);
    if (!smallFile.open(QIODevice::ReadOnly)) {
        emit warning(tr("Cannot read file %1:\n%2.").arg(smallFile.fileName()).arg(smallFile.errorString()));
        emit ended(tr("Error."));
        bigFile.close();
        return;
    }
    //берём строки из файла и кладём в список строк
    QTextStream numberStream(&smallFile);
    QStringList smallList;
    QString smallString;
    while (!smallStream.atEnd()) {
        //проверка на отмену
        if (canceled) {
            bigFile.close();
            smallFile.close();
            emit ended(true);
            emit ended(tr("Canceled."));
        }
        smallString = smallStream.readLine();
        //если строка пустая, то пропускаем её
        if ("" == smallString)
            continue;
        smallList.append(smallString);
    }

    Tasking allTasks; //создаём список заданий
    QSemaphore semaphore(2 * QThread::idealThreadCount()); //семафор для контроля раздачи заданий
    //QSemaphore semaphore(2); //семафор для контроля раздачи заданий
    //подготавливаем список потоков, раздаем списки и указатель на список заданий
    for (int i = 1; i <= QThread::idealThreadCount(); i++) {
        SearchThread *thread = new SearchThread(&allTasks, smallList, direct, &semaphore);
        connect(thread, SIGNAL(write(QString)), this, SLOT(write(QString)));
        connect(thread, SIGNAL(ended()), this, SLOT(counter()));
        connect(this, SIGNAL(stop()), thread, SLOT(stop()));
        threadList.append(thread);
        thread->start();
    }
    //устанавливаем счётчик строк в 0
    count = 0;
    //читаем построчно файл и раздаем строки списку потоков
    QTextStream bigStream(&bigFile);
    QString bigString;
    while (!bigStream.atEnd()) {
        //проверка на отмену
        if (canceled) {
            //очищаем список заданий
            allTasks.clear();
            //сообщаем об остановке
            emit stop();
            //ждем пока все потоки завершат выполнение
            for (int i = 1; i <= QThread::idealThreadCount(); i++) {
                threadList[i - 1]->wait();
            }
            bigFile.close();
            emit ended(true);
            emit ended(tr("Canceled."));
            return;
        }
        //читаем строку из файла
        bigString = bigStream.readLine();
        //если строка пустая, то пропускаем её
        if ("" == bigString)
            continue;

        semaphore.acquire();
        allTasks.addTask(bigString);
        qDebug() << "nakidal" << semaphore.available() << QThread::currentThread();
        qDebug() << "tasklist size from main:" << allTasks.getSize() << QThread::currentThread();
    }
    //необходимо сообщить потокам что всё
    emit stop();
    //ожидаем завершения всех потоков
    for (int i = 1; i <= QThread::idealThreadCount(); i++) {
        threadList[i - 1]->wait();
    }
    qDebug() << list;
    //закрываем файл
    bigFile.close();
    //открываем файл для записи найденых строк
    QFile outFile(outFileName);
    if (!outFile.open(QIODevice::WriteOnly)) {
        emit warning(tr("Cannot write file %1:\n%2.").arg(outFile.fileName()).arg(outFile.errorString()));
        return;
    }
    //записываем найденые строки
    QTextStream outStream(&outFile);
    for (int i = 0; i < list.size(); ++i)
        outStream << list[i] << "\r\n";

    outFile.close(); //закрываем файл с найдеными строками
    emit ended(true);
    emit progress(100);
    emit ended(tr("Sucsessfuly complited."));
}
« Последнее редактирование: Декабрь 18, 2010, 21:17 от Anarchist » Записан
BRE
Гость
« Ответ #22 : Декабрь 18, 2010, 10:46 »

Перенеси семафор в класс Tasking, соответственно захват/освобождение этого семафора лучше выполнять в addTask/getTask.
В конструкторе Tasking, при создании семафора, указывай ему в качестве параметра необходимую максимальную длину очереди.
Ну и в цикле SearchThread::run убери эту проверку с количеством доступных ресурсов в семафоре.
Разберись как работают семафоры.

Записан
Anarchist
Гость
« Ответ #23 : Декабрь 18, 2010, 10:54 »

То что перенести надо это понял, сделаю.

Код:
QSemaphore semaphore(2 * QThread::idealThreadCount()); //симофор для контроля раздачи заданий
Инициализирую как количество потоков умноженное на 2, что тут не так?

Цитировать
Ну и в цикле SearchThread::run убери эту проверку с количеством доступных ресурсов в семафоре.
Я посылаю сигнал остановки, но мне нужно что бы потоки доделали всё что в очереди лежит, вот и проверяю.
« Последнее редактирование: Декабрь 18, 2010, 10:57 от Anarchist » Записан
BRE
Гость
« Ответ #24 : Декабрь 18, 2010, 11:10 »

Сейчас посмотрю твой код повнимательней, лучше будет, если выложишь компилябельный код.
Я набросал небольшой пример, посмотри.
(Для сборки используется cmake, если его нет, то нужно будет сделать pro-файл)
« Последнее редактирование: Декабрь 18, 2010, 11:14 от BRE » Записан
Anarchist
Гость
« Ответ #25 : Декабрь 18, 2010, 19:47 »

Спасибо огромное Вам. Посмотрел Ваш код и доработал свой, почерпнул много интересных мыслей. Немного отличается реализация, но основные идеи одинаковые. Мой код конечно побольше, просто содержит кучу проверок разных и взаимодействие с интерфейсом. Я его повылизываю хорошенько и выложу потом здесь. Сейчас он уже рабочий. Всё работает очень замечательно. Думаю "заказчик" будет очень доволен производительностью Улыбающийся.
Записан
Waryable
Гость
« Ответ #26 : Декабрь 20, 2010, 11:16 »

Если еще предложения принимаются, то есть такое:
1. список файлов для поиска в QStringList.
2. Каждый поток извлекает(и удаляет) из QStringList имя файла, и обрабатывает весь файл.
3. Алгоритм поиска строки довольно прост: для начала читай файл в QByteArray целиком, потом сравнивай побайтно строку поиска с байтами файла QByteArray.

Не понимаю зачем делить содержимое файла на строки. Этож жеж сколько лишних трудов...
Записан
Anarchist
Гость
« Ответ #27 : Декабрь 20, 2010, 22:27 »

1. список файлов для поиска в QStringList.
Файлов всего 2, в одном строки которые искать в другом где искать.

Не понимаю зачем делить содержимое файла на строки. Этож жеж сколько лишних трудов...
Файлы и так содержат строки. И каких же это трудов, когда есть QTextStream?

Всё уже готово и проходит обкатку. Просто на работе всё, как-нибудь обязательно выложу.
« Последнее редактирование: Декабрь 20, 2010, 22:30 от Anarchist » Записан
Waryable
Гость
« Ответ #28 : Декабрь 21, 2010, 06:03 »

Цитировать
Файлы и так содержат строки. И каких же это трудов, когда есть QTextStream?
Файлы содержат только байты. Строки между собой разделяются служебными байтами(или их комбинациями). Поэтому явно или неявно при чтении файла в текстовом режиме приходится искать конец каждой строки. Исходя из этих(и не только из этих) соображений, при загрузке файла в текстовом режиме класс QTextStream осуществляет различные "служебные" действия на все случаи жизни. Но, если эффективность приложения устраивает, то конечно удобней пользоваться готовыми решениями. Ну а если всетаки встанет вопрос о повышении эффективности возвращайся в тему!  Подмигивающий
« Последнее редактирование: Декабрь 21, 2010, 06:06 от Waryable » Записан
Anarchist
Гость
« Ответ #29 : Январь 25, 2011, 09:41 »

Всё некогда было, вот выкладываю исходники. Может кто посмотрит и что умное подскажет. Приму все замечания и предложения к рассмотрению.
Записан
Страниц: 1 [2]   Вверх
  Печать  
 
Перейти в:  


Страница сгенерирована за 0.111 секунд. Запросов: 23.