Russian Qt Forum
Ноябрь 22, 2024, 09:27 *
Добро пожаловать, Гость. Пожалуйста, войдите или зарегистрируйтесь.
Вам не пришло письмо с кодом активации?

Войти
 
  Начало   Форум  WIKI (Вики)FAQ Помощь Поиск Войти Регистрация  

Страниц: 1 [2] 3 4 ... 13   Вниз
  Печать  
Автор Тема: К вопросу об организации взаимодействия пула производителей и одного потребителя  (Прочитано 65853 раз)
RedDog
Частый гость
***
Offline Offline

Сообщений: 221


Просмотр профиля
« Ответ #15 : Сентябрь 07, 2019, 17:25 »

А каждый воркер в свой тред засунуть, а обработчик пусть от них сигналы в своем треде получает?
Так на кутишный эвент луп вся синхронизация ляжет.
Записан
m_ax
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 2095



Просмотр профиля
« Ответ #16 : Сентябрь 07, 2019, 18:35 »

Цитировать
А каждый воркер в свой тред засунуть, а обработчик пусть от них сигналы в своем треде получает?
Да, сейчас каждый воркер в своём треде и на каждом шаге шлёт сигнал обработчику (main_loop).
Вопрос был как лучше их синхронизировать так, чтобы вначале отработали все воркеры, только потом проснулся главный тред, сделал свои дела, разбудил воркеров и опять уснул, ожидая пока вновь ВСЕ воркеры отработают и не разбудят его. И так до тех пор, пока главный тред не решит, что пора заканчивать.

Написал свой класс для этого concurrent_loop. Вот простой пример использования:
Код
C++ (Qt)
#include <iostream>
#include "concurrent_loop.h"
#include <list>
 
int main()
{
   static constexpr int num_threads = 4;
   std::mutex print_mutex;
 
   std::list<std::function<void()>> workers;
 
   for (unsigned int i = 0; i < num_threads; ++i)
       workers.push_back([&]()
       {
           std::this_thread::sleep_for(std::chrono::seconds(1));
           std::lock_guard<std::mutex> lk(print_mutex);
           std::cout << std::this_thread::get_id() << std::endl;
       });
 
   int ntests = 3;
 
   auto loop_function = [&]()
   {
       std::cout << --ntests << "---------------------" << std::endl;
       return ntests == 0;
   };
 
   concurrent_loop cloop(loop_function, workers);
 
   cloop.join();
 
   return 0;
}
 
А вот примерный вывод:
Код
Bash
140269582214912
140269557036800
140269573822208
140269565429504
2---------------------
140269557036800
140269565429504
140269582214912
140269573822208
1---------------------
140269573822208
140269565429504
140269557036800
140269582214912
0---------------------
 


Проект приаттачен.

Да, в контексте данной задачи, похоже OpenMP наиболее разумное решение  Улыбающийся
« Последнее редактирование: Сентябрь 07, 2019, 19:17 от m_ax » Записан

Над водой луна двурога. Сяду выпью за Ван Гога. Хорошо, что кот не пьет, Он и так меня поймет..

Arch Linux Plasma 5
m_ax
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 2095



Просмотр профиля
« Ответ #17 : Сентябрь 08, 2019, 01:13 »

Да, провёл ещё несколько тестов (конкретно 3: с condition_variable, "наивный подход " и с использованием single thread реализациями). Результат не то что бы удивил, но возникли некоторые тонкости.. Они вполне предсказуемы, но если кто из уважаемых оппонентов имеет желание обсудить данные результаты, я готов предоставить реальные "боевые" исходники с подробным описанием  Улыбающийся
К сожалению, на момент написания данного поста, из-за скудного опыта работы с OpenMP, не могу представить, для полной картины, соответствующих сравнительных данных по отношению к OpenMP Обеспокоенный Но всё же склоняюсь, что OpenMP здесь в большинстве случаев будет выйгрышна.. (но это не точно Улыбающийся)
« Последнее редактирование: Сентябрь 08, 2019, 01:29 от m_ax » Записан

Над водой луна двурога. Сяду выпью за Ван Гога. Хорошо, что кот не пьет, Он и так меня поймет..

Arch Linux Plasma 5
RedDog
Частый гость
***
Offline Offline

Сообщений: 221


Просмотр профиля
« Ответ #18 : Сентябрь 08, 2019, 01:33 »

Вопрос был как лучше их синхронизировать так, чтобы вначале отработали все воркеры, только потом проснулся главный тред, сделал свои дела, разбудил воркеров и опять уснул, ожидая пока вновь ВСЕ воркеры отработают и не разбудят его. И так до тех пор, пока главный тред не решит, что пора заканчивать.
А что будет в случае если за один "цикл" один из воркеров более одного раза отработает?
К примеру один вычисляет сумму двух интов, а другой какой нибудь синус в степени <...>
Или главный "манагер" должен им по одной итерации давать отрабатывать?
Записан
m_ax
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 2095



Просмотр профиля
« Ответ #19 : Сентябрь 08, 2019, 01:48 »

Цитировать
А что будет в случае если за один "цикл" один из воркеров более одного раза отработает?
Плохо будет  Улыбающийся Об этом и дискутируем)
Вот мы и хотим таких ситуаций избежать) Чтоб какой-либо воркер более одного раза НЕ проскочил за один степ (цикл)

Цитировать
Или главный "манагер" должен им по одной итерации давать отрабатывать?
Да, в контексте данной задачи, "главный" должен им давать лишь одну итерацию.
« Последнее редактирование: Сентябрь 08, 2019, 01:52 от m_ax » Записан

Над водой луна двурога. Сяду выпью за Ван Гога. Хорошо, что кот не пьет, Он и так меня поймет..

Arch Linux Plasma 5
Old
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 4350



Просмотр профиля
« Ответ #20 : Сентябрь 08, 2019, 08:53 »

Код
C++ (Qt)
#include <vector>
#include <functional>
#include <thread>
#include <mutex>
#include <iostream>
 
using namespace std;
 
int main()
{
 static constexpr size_t num_threads = 4;
 mutex print_mutex;
 
 vector< function< void() > > workers;
 workers.reserve( num_threads );
 
 for( size_t i = 0; i < num_threads; ++i)
     workers.push_back([&]()
     {
         this_thread::sleep_for( std::chrono::seconds( 1 ) );
         lock_guard<mutex> lk( print_mutex );
         cout << this_thread::get_id() << endl;
     });
 
 const size_t ntests = 3;
 for( size_t n = 0; n < ntests; ++n )
 {
   #pragma omp parallel for
   for( size_t i = 0; i < workers.size(); ++i )
     workers[ i ]();
 
   std::cout << n << "---------------------" << std::endl;
 }
 
 return 0;
}
 
Записан
Igors
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 11445


Просмотр профиля
« Ответ #21 : Сентябрь 08, 2019, 09:26 »

Да, сейчас каждый воркер в своём треде и на каждом шаге шлёт сигнал обработчику (main_loop).
Вопрос был как лучше их синхронизировать так, чтобы вначале отработали все воркеры, только потом проснулся главный тред, сделал свои дела, разбудил воркеров и опять уснул, ожидая пока вновь ВСЕ воркеры отработают и не разбудят его. И так до тех пор, пока главный тред не решит, что пора заканчивать.
Будить главный только раз - дело нехитрое
Код
C++ (Qt)
if (--workersInProcess == 0)
...
Проблема в другом. По-хорошему число воркеров должно быть == числу ядер (а не числу задач). Правильно перейти к самой обычной очереди (самый что ни на есть избитый пример). Главная нитка насовала N задач в очередь, воркеры их расхватали, когда атомарный счетчик обнулился - будится главная, опять сует N задач и.т.д.

 
Да, в контексте данной задачи, похоже OpenMP наиболее разумное решение
Здесь проходит любое стандартное решение. Посмотрите QThreadPool - там минут за 10-15 все можно сделать с нуля.

 
К сожалению, на момент написания данного поста, из-за скудного опыта работы с OpenMP, не могу представить, для полной картины, соответствующих сравнительных данных по отношению к OpenMP. Но всё же склоняюсь, что OpenMP здесь в большинстве случаев будет выйгрышна.. (но это не точно)
Результаты такого тестирования давно известны Улыбающийся Просто здесь все зависит от "кластера". Т.е. если единичная задача выполняется секунды (или хотя бы "хорошие доли секунды") - то все отлично и все подходы дадут хороший и примерно одинаковый КПД. Однако чем меньше кластер - тем больше весит захват мутексов и др накладные расходы. А если кластер слишком мал - то вообще, как говорится, "не масштабируется", т.е. скорость примерно та же что и на одном дизеле  Улыбающийся
Записан
m_ax
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 2095



Просмотр профиля
« Ответ #22 : Сентябрь 08, 2019, 11:13 »

Цитировать
По-хорошему число воркеров должно быть == числу ядер (а не числу задач). Правильно перейти к самой обычной очереди (самый что ни на есть избитый пример). Главная нитка насовала N задач в очередь, воркеры их расхватали, когда атомарный счетчик обнулился - будится главная, опять сует N задач и.т.д.
Нет. Предположим я Бос и у меня есть команда воркеров. Мне нужно подготовить проект. Я рассылаю своим воркерам задание, все необходимые материалы и т.д.
Они их получают и начинают рукожопить над проектом. Я в это время отдыхаю. Я могу зафиксировать факт того, что они все получили свои задания, но мне нужен конечный продукт.
Они заканчивают предворительную версию проекта, присылают мне и идут пить пиво. Я получаю первую версию проекта и начинаю вносить свои правки, замечания, курю и громко ругаюсь матом. После этого я отсылаю им проект на доработку. Они его получают и начинают править. Процесс закончится, когда меня полностью устроит конечный результат. Тогда я им пишу: -Всем спасибо! Мы сделали это!

Именно такую логику реализует concurrent_loop.
Решение с классическим тредпулом здесь не совсем к месту. Поскольку мне важен не факт того, что очередь задачь пуста и можно заряжать её снова, а факт того, что очередной пакет задач был выполнен. Полностью!

Поэтому одним атомарным каунтером вы здесь не отделаетесь.(
Или покажите как?
« Последнее редактирование: Сентябрь 08, 2019, 11:20 от m_ax » Записан

Над водой луна двурога. Сяду выпью за Ван Гога. Хорошо, что кот не пьет, Он и так меня поймет..

Arch Linux Plasma 5
Igors
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 11445


Просмотр профиля
« Ответ #23 : Сентябрь 08, 2019, 12:28 »

Нет. Предположим я Бос и у меня есть команда воркеров.
Так число воркеров в команде определяется "хватит ли денег им платить", а вовсе не "числом задач". Неизбежно какой-то воркер должен будет выполнять  2 или более задач (в рамках одного подхода/сессии). Или придется городить "мертвые души" (1 задача = 1 нитка, ничем хорошим это не кончается).

Решение с классическим тредпулом здесь не совсем к месту. Поскольку мне важен не факт того, что очередь задачь пуста и можно заряжать её снова, а факт того, что очередной пакет задач был выполнен. Полностью!
Да очень даже к месту, меняется только условие ложной побудки. Не "очередь пуста" - при этом какие-то задачи уже извлечены из очереди, но еще в процессе, а счетчик "сделано" обнулился. 

Поэтому одним атомарным каунтером вы здесь не отделаетесь.(
Или покажите как?
Пойдем по пути наименьшего сопротивления
Код
C++ (Qt)
struct Task : public QRunnable {
 Task( Data * data ) : mData(data) {}  
 virtual void run( void ) { DoCalc(*mData); }
 
 Data * mData;
};
 
// main thread
QThreadPool * pool = QThreadPool::globalInstance();
while (!solved) {
 
// заряжаем данные расчетов
 std::vector<Data> data;
 for (size_t i = 0; i < num_tasks; ++i)
   data.push_back(Data(i));
 
// запускаем
 for (size_t i = 0; i < num_tasks; ++i)
   pool->start(new Task(&data[i]));
 
// ждем когда сварится
  pool->waitForDone();
 
  solved = AnalyzeResults(data);
};
Что не нравится?  Улыбающийся Или непременно нужно на примитивах std? Это нетрудно, но нужно ли?
Записан
m_ax
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 2095



Просмотр профиля
« Ответ #24 : Сентябрь 08, 2019, 12:52 »

Цитировать
Да очень даже к месту, меняется только условие ложной побудки. Не "очередь пуста" - при этом какие-то задачи уже извлечены из очереди, но еще в процессе, а счетчик "сделано" обнулился.
Да, всё, понял идею)

Цитировать
Что не нравится?  Улыбающийся
Согласен)
Записан

Над водой луна двурога. Сяду выпью за Ван Гога. Хорошо, что кот не пьет, Он и так меня поймет..

Arch Linux Plasma 5
Igors
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 11445


Просмотр профиля
« Ответ #25 : Сентябрь 08, 2019, 13:08 »

А что копаетесь - хорошо и правильно. Без велика нет понимания, то так, "почитал", "посмотрел примеры" - ну вот уже и овладел очередной "технологией"  Улыбающийся
Записан
m_ax
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 2095



Просмотр профиля
« Ответ #26 : Сентябрь 08, 2019, 19:05 »

Цитировать
pool->waitForDone();
Это, конечно, хорошо, когда есть такой метод.. Но на мой взгляд правильнее чтобы метод start возвращал Future..

Набросал свой велосипед thread_pool (у которого метод add_task возврощает std::future.. по сути те же самые флажки, но опционально. Идея навеяна от сюда https://github.com/mtrebi/thread-pool)
Код
C++ (Qt)
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
 
#include <list>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <future>
#include <memory>
#include <functional>
 
class thread_pool
{
public:
   thread_pool(unsigned int nthreads = 1)
       : m_is_stop(false)
   {
       for (unsigned int i = 0; i < nthreads; ++i)
           m_threads.push_back(std::thread(&thread_pool::worker, this));
   }
 
   thread_pool(const thread_pool &) = delete;
   thread_pool(thread_pool &&) = delete;
 
   thread_pool & operator=(const thread_pool &) = delete;
   thread_pool & operator=(thread_pool &&) = delete;
 
   void join()
   {
       m_is_stop = true;
       m_loop_cv.notify_all();
 
       for (auto & th : m_threads)
       {
           if (th.joinable())
               th.join();
       }
   }
 
   template<class F>
   auto add_task(F && work_function)->std::future<decltype(work_function())>
   {
       auto task_ptr = std::make_shared<std::packaged_task<decltype(work_function())()>>(std::forward<F>(work_function));
 
       std::function<void()> wrapper_func = [task_ptr]() { (*task_ptr)(); };
 
       {
           std::lock_guard<std::mutex> lock(m_mutex);
           m_queue.push(wrapper_func);
       }
 
       m_loop_cv.notify_one();
 
       return task_ptr->get_future();
   }
 
private:
   std::atomic_bool m_is_stop;
   std::list<std::thread> m_threads;
   std::queue<std::function<void()>> m_queue;
   std::mutex m_mutex;
   std::condition_variable m_loop_cv;
 
   void worker()
   {
       while (!m_is_stop)
       {
           std::unique_lock<std::mutex> lock(m_mutex);
           m_loop_cv.wait(lock, [&]()
           {
               return !m_queue.empty() || m_is_stop;
           });
 
           if (m_is_stop)
               return;
 
           std::function<void()> work_function(m_queue.front());
           m_queue.pop();
 
           lock.unlock();
 
           work_function();
       }
   }
 
};
 
#endif // THREAD_POOL_H
 

Вроде всё работает..)
Код
C++ (Qt)
#include <iostream>
 
#include "thread_pool.h"
 
int main()
{
   static constexpr size_t nthreads = 4;
   static constexpr size_t ntests = 3;
   static constexpr size_t ntasks = 12;
   std::mutex mutex;
 
   std::vector<std::future<void>> results(ntasks);
 
   thread_pool pool(nthreads);
 
   for (size_t i = 0; i < ntests; ++i)
   {
       for (size_t j = 0; j < ntasks; ++j)
           results[j] = pool.add_task([&]()
           {
               std::this_thread::sleep_for(std::chrono::seconds(1));
               std::lock_guard<std::mutex> lk(mutex);
               std::cout << std::this_thread::get_id() << std::endl;
           });
 
       for (size_t j = 0; j < results.size(); ++j)
           results[j].wait();
 
       std::cout << i << "---------------------" << std::endl;
   }
 
   pool.join();
 
   return 0;
}
 
Улыбающийся
 
« Последнее редактирование: Сентябрь 11, 2019, 12:53 от m_ax » Записан

Над водой луна двурога. Сяду выпью за Ван Гога. Хорошо, что кот не пьет, Он и так меня поймет..

Arch Linux Plasma 5
Old
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 4350



Просмотр профиля
« Ответ #27 : Сентябрь 08, 2019, 20:15 »

m_ax, небольшое замечание по условным переменным.
При wait условная переменная освобождает парный мьютекс, но при пробуждении она захватывает его обратно.
Поэтому, при выходе из wait мьютекс уже захвачен, "работу" из очереди можно доставать, после этого ралочивать мьютекс и запускать работу.
Записан
m_ax
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 2095



Просмотр профиля
« Ответ #28 : Сентябрь 08, 2019, 20:26 »

m_ax, небольшое замечание по условным переменным.
При wait условная переменная освобождает парный мьютекс, но при пробуждении она захватывает его обратно.
Поэтому, при выходе из wait мьютекс уже захвачен, "работу" из очереди можно доставать, после этого ралочивать мьютекс и запускать работу.

Да, при выходе из wait у меня m_cv_mutex освобождается:
Код
C++ (Qt)
...
    {
               std::unique_lock<std::mutex> lock(m_cv_mutex);
               m_loop_cv.wait(lock, [&]()
               {
                   std::lock_guard<std::mutex> lk(m_queue_mutex);
                   return !m_queue.empty() || m_is_stop;
               });
     }
...
 
И после wait m_queue_mutex уже разблочен..
Цитировать
Поэтому, при выходе из wait мьютекс уже захвачен, "работу" из очереди можно доставать, после этого ралочивать мьютекс и запускать работу.
Не понял, почему он захвачен?
Записан

Над водой луна двурога. Сяду выпью за Ван Гога. Хорошо, что кот не пьет, Он и так меня поймет..

Arch Linux Plasma 5
Old
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 4350



Просмотр профиля
« Ответ #29 : Сентябрь 08, 2019, 20:30 »

Стоп, так у вас два мьютекса вместо одного? Улыбающийся
Зачем, достаточно одного мьютекса.
Самой условной переменной не нужен мьютекс, этот мьютекс должен блокировать наш ресурс, т.е. очередь.
« Последнее редактирование: Сентябрь 08, 2019, 20:32 от Old » Записан
Страниц: 1 [2] 3 4 ... 13   Вверх
  Печать  
 
Перейти в:  


Страница сгенерирована за 0.197 секунд. Запросов: 23.