Russian Qt Forum
Ноябрь 22, 2024, 10:50 *
Добро пожаловать, Гость. Пожалуйста, войдите или зарегистрируйтесь.
Вам не пришло письмо с кодом активации?

Войти
 
  Начало   Форум  WIKI (Вики)FAQ Помощь Поиск Войти Регистрация  

Страниц: 1 ... 9 10 [11] 12 13   Вниз
  Печать  
Автор Тема: К вопросу об организации взаимодействия пула производителей и одного потребителя  (Прочитано 65893 раз)
m_ax
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 2095



Просмотр профиля
« Ответ #150 : Сентябрь 24, 2019, 13:58 »

Цитировать
У вас join нужно переименовать в stop, потому что он не ждет завершения работы задач, а завершает работу пула вообще.
Да? А я думал наоборот, что вызов join блокирует поток, в котором он был вызван до тех пор, пока не будут выполнены все задачи.. Во всяком случае такой логики придерживается thread_pool из asio:
Цитировать
This function blocks until the threads in the pool have completed. If stop() is not called prior to join(), the join() call will wait until the pool has no more outstanding work.


Цитировать
Если нам нужно завершить программу, уже все равно остались задачи в очереди или нет.
Это да, но есть ситуации, когда нам и не нужно, в принципе, футуру: просто закинули задачу в пул и забыли Улыбающийся

Цитировать
Кстати, чтобы не плодить проверок, я делаю проверку в нитках чуть по другому. Когда доберусь до компьютера покажу как.

Ok) спасибо)
Записан

Над водой луна двурога. Сяду выпью за Ван Гога. Хорошо, что кот не пьет, Он и так меня поймет..

Arch Linux Plasma 5
Old
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 4350



Просмотр профиля
« Ответ #151 : Сентябрь 24, 2019, 14:08 »

По хорошему join дожидается, когда реально завершатся рабочие нитки пула.
У меня в конструкторе пула нитки запускаются, в деструкторе завершаются и joinом контролируется завершение. А все время жизни пула нитки активны или ждут работу.
А вот завершение конкретной задачи можно ждать по разному: футурами, callbackами или просто на них забить. Улыбающийся
Записан
Igors
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 11445


Просмотр профиля
« Ответ #152 : Сентябрь 24, 2019, 14:12 »

Если сразу после заполнения очереди вызвать join(), m_is_stop == true и в очереди могут остаться невыполненные задачи.

Выходить из цикла мы должны только при совместном выполнении условий: когда и очередь пуста и флаг m_is_stop == true.
Это скорее проблема невнятных имен. Stop - это что, "приостановить" (тогда лучше "pause") или "принудительно завершить" (тогда лучше "abort"). Но у Вас это ни то ни другое, это wait_for_done. Ну а что делать если считает и считает? "Из ризетки" выдергивать, что ли?  Улыбающийся Abort должен быть, join ниток может лучше в деструктор, паузу можно добавить хотя особой нужды в ней нет.

Стыдно на такие грабли было наступить)
Профессионалам это чувство чуждо  Улыбающийся

В аттаче пример из справочника, он зациклен и виснет тоже на joint. Я добавил небольшую задержку, хотя виснет и без нее если погонять побольше. Возможно это натолкнет на некоторые размышления  Улыбающийся
Записан
Old
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 4350



Просмотр профиля
« Ответ #153 : Сентябрь 24, 2019, 17:04 »

Как я делаю ожидание в рабочих нитках:
Код
C++ (Qt)
 void thread_run()
 {
   for(;;)
   {
     std::unique_lock<std::mutex> locker( m_mutex );
     for(;;)
     {
       if( m_stop )
         return;
       else if( !m_tasks.empty() )
         break;
       m_cond.wait( locker );
     }
 
     Task task = m_tasks.front();
     m_tasks.pop();
 
     locker.unlock();
 
     if( task )
       task();
   }
 }
 

Завершение ниток:
Код
C++ (Qt)
 ~ThreadPool()
 {
   {
     std::unique_lock<std::mutex> locker( m_mutex );
     m_stop = true;
   }
   m_cond.notify_all();
 
   for( auto &it : m_threads )
     if( it.joinable() )
       it.join();
 }
 

Устанавливать m_stop в true нужно обязательно под мьютексом, а вот сигналить лучше уже без него.
Записан
m_ax
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 2095



Просмотр профиля
« Ответ #154 : Сентябрь 24, 2019, 19:58 »

Цитировать
В аттаче пример из справочника, он зациклен и виснет тоже на joint. Я добавил небольшую задержку, хотя виснет и без нее если погонять побольше. Возможно это натолкнет на некоторые размышления   Улыбающийся
Вы этот пример привели как иллюстрацию, как делать не надо? Улыбающийся
Eсли закоментить все sleep_for'ы consumer может никогда и не добраться до очереди produced_nums  Грустный

Цитировать
А вот завершение конкретной задачи можно ждать по разному: футурами, callbackами или просто на них забить.  Улыбающийся

Ну т.е. если мыслить в таком ключе, то join() вообще нафиг не нужен?) В смысле делать его методом класса  особого резона и нет?)

Цитировать
Как я делаю ожидание в рабочих нитках:
Ага, понял)
Записан

Над водой луна двурога. Сяду выпью за Ван Гога. Хорошо, что кот не пьет, Он и так меня поймет..

Arch Linux Plasma 5
Old
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 4350



Просмотр профиля
« Ответ #155 : Сентябрь 24, 2019, 20:20 »

Блин, случайно отредактировал этот пост, вместо цитирования. Придется набирать заново. Улыбающийся

Устанавливать m_stop в true нужно обязательно под мьютексом, а вот сигналить лучше уже без него.
Хочу более подробно расписать почему так думаю.

Устанавливать m_stop необходимо обязательно под мьютексом.
Потому что может возникнуть ситуация, когда рабочая нить уже прошла проверку флага m_stop, но еще не уснула на wait, тогда установка m_stop и сигнал из другого потока никакого эффекта не вызовут - нитка уснет на wait.
А если устанавливать m_stop мод захваченным мьютексов, то нить завершающая пул остановиться на захвате мьютекса, пока рабочая нить его не освободит, уснув на wait.

А сигналить лучше без захваченного мьютекса, потому что на многоядерных системах планировщик сразу пытается запустить ожидающую (на wait) нить на свободном ядре при сигнале условной переменной.
Если сигналить при захваченном мьютексе, то разбуженная нить попытается захватить мьютекс, но он захвачен сигнальной ниткой. Рабочая нить уснет в ожидание мьютекса и проснется опять после его освобождения.
Т.е. будет дополнительная усыпление рабочей нитки.
« Последнее редактирование: Сентябрь 25, 2019, 06:40 от Old » Записан
m_ax
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 2095



Просмотр профиля
« Ответ #156 : Сентябрь 24, 2019, 20:35 »

Всё ясно, спасибо)
Записан

Над водой луна двурога. Сяду выпью за Ван Гога. Хорошо, что кот не пьет, Он и так меня поймет..

Arch Linux Plasma 5
m_ax
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 2095



Просмотр профиля
« Ответ #157 : Сентябрь 24, 2019, 22:41 »

Old, я правильно понимаю, что если я всё же хочу заиметь метод wait_for_all (который блокирует поток, где он был вызван и ждёт пока все рабочие нитки опустошат очередь) то нужна ещё одна condition_variable? Вот что у меня сейчас получилось (вроде работает Улыбающийся):
Код
C++ (Qt)
class thread_pool
{
public:
   explicit thread_pool(size_t num_threads = std::max(size_t(1), size_t(std::thread::hardware_concurrency())))
       : m_stop(false)
   {
       for (size_t i = 0; i < num_threads; ++i)
           m_threads.emplace_back(&thread_pool::worker, this);
   }
 
   thread_pool(const thread_pool &) = delete;
   thread_pool(thread_pool &&) = delete;
 
   thread_pool & operator=(const thread_pool &) = delete;
   thread_pool & operator=(thread_pool &&) = delete;
 
   ~thread_pool()
   {
       {
           std::lock_guard<std::mutex> locker(m_mutex);
           m_stop = true;
       }
 
       m_cond.notify_all();
 
       for (auto & th : m_threads)
       {
           if (th.joinable())
               th.join();
       }
   }
 
   void wait_for_all()
   {
       std::unique_lock<std::mutex> locker(m_mutex);
       m_cond_wait.wait(locker, [&]() { return m_queue.empty(); });
   }
 
   template<class F>
   auto add_task(F && f)->std::future<decltype(f())>
   {
       using R = decltype(f());
 
       auto task_ptr = std::make_shared<std::packaged_task<R()>>(std::forward<F>(f));
 
       {
           std::lock_guard<std::mutex> locker(m_mutex);
           m_queue.emplace( [=]() { (*task_ptr)(); } );
       }
 
       m_cond.notify_one();
 
       return task_ptr->get_future();
   }
 
private:
   std::atomic_bool m_stop;
   std::list<std::thread> m_threads;
   std::queue<std::function<void()>> m_queue;
   std::mutex m_mutex;
   std::condition_variable m_cond;
   std::condition_variable m_cond_wait;
 
   void worker()
   {
       for(;;)
       {
           std::unique_lock<std::mutex> locker(m_mutex);
           m_cond.wait(locker, [&]()
           {
               return !m_queue.empty() || m_stop;
           });
 
           if (m_stop)
               return;
 
           std::function<void()> task(m_queue.front());
           m_queue.pop();
 
           locker.unlock();
 
           if (task)
               task();
 
           m_cond_wait.notify_one(); // <== Эта условная переменная для wait_for_all
       }
   }
};
 
Или нафиг его выпилить вообще?)

update: Нет, этот метод ждёт только пока очередь не пуста, но не момент когда все нитки отработали..(

update2: Вобщем я его всё же выпилил.. Не нужно усложнять то, чего того не требует..
« Последнее редактирование: Сентябрь 24, 2019, 23:43 от m_ax » Записан

Над водой луна двурога. Сяду выпью за Ван Гога. Хорошо, что кот не пьет, Он и так меня поймет..

Arch Linux Plasma 5
Old
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 4350



Просмотр профиля
« Ответ #158 : Сентябрь 25, 2019, 06:02 »

m_ax, по моему мнению, такой метод у пула может понадобится в простой консольной программе (запустил - подождал - вышел), но если понадобится чуть расширить функционал, например, добавить сетевое взаимодействие, то тут же появляться более полезное применение запускающей нитке. Улыбающийся
Поэтому, заморочиться с таким методом можно, но реальных юзкейсов у него будет не много.
« Последнее редактирование: Сентябрь 25, 2019, 06:03 от Old » Записан
Old
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 4350



Просмотр профиля
« Ответ #159 : Сентябрь 25, 2019, 06:23 »

А вот сигналить лучше с уже отпущенным мьютексом.
Вспомнил еще один аргумент в пользу этого тезиса. Улыбающийся
Утилизация горячего воркера. Нам очень выгодно, если нитка выполнив одну работу, делает цикл и забирает следующую без пауз на мьютексе/условной переменной.
Если мы сигналим под мьютексом, то нитка которая только что освободилась не сможет захватить мьютекс и достать новую задачу.
Произойдет следующее: горячая нитка уткнется в мьютекс, шедулер разбудит холодную нитку (ждущую на wait), начнется гонка за мьютекс и может получиться так, что горячая нитка не спеет забрать новую работу и будет успылена на wait, а разбуженная пойдет работать.
Записан
Igors
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 11445


Просмотр профиля
« Ответ #160 : Сентябрь 27, 2019, 15:10 »

Ну так что, все сидим на мешках с футурами? А ведь можно обойтись одной (аттач). Правда мелкоту масштабит слабенько, на с честного мутекса много не взять.Также пытался починить QThreadPool, задумка была не вызывать его waitForDone, а задействовать как дустовский пул. Без особого успеха Плачущий На одних тестах лучше (хотя все равно плохо), на др даже хуже.

И все-таки наверно дуст прав что не предоставляет метода waitForDone - нездоровая нагрузка если планируется универсальный пул. Если такой ф-ционал нужен - проще обеспечить его "через задачу".

И да, set_value действительно может сработать дважды! Ну кому интересно разберется, не буду кайф ломать Улыбающийся
Записан
Old
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 4350



Просмотр профиля
« Ответ #161 : Сентябрь 27, 2019, 16:09 »

на с честного мутекса много не взять.
Так вы все обещали показать чудо-решение на атомиках, но не получается, хотя все уже готово. Улыбающийся
Записан
m_ax
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 2095



Просмотр профиля
« Ответ #162 : Сентябрь 28, 2019, 00:57 »

Цитировать
И все-таки наверно дуст прав что не предоставляет метода waitForDone - нездоровая нагрузка если планируется универсальный пул. Если такой ф-ционал нужен - проще обеспечить его "через задачу".
Да, согласен) Но это можно реализовать гораздо более элегантней)
Код
C++ (Qt)
class wrapper_pool
{
public:
   wrapper_pool(specmath::thread_pool & pool)
       : m_pool(pool), m_task_count(0)
   {}
 
   template <class F>
   auto add_task(F && task)->std::future<decltype(task())>
   {
       std::lock_guard<std::mutex> locker(m_mutex);
       ++m_task_count;
       return m_pool.add_task(std::bind(&wrapper_pool::wrapper_task, this, std::forward<F>(task)));
   }
 
   void wait_for_all()
   {
       std::unique_lock<std::mutex> locker(m_mutex);
       m_cond.wait(locker, [&]()
       {
           return !m_task_count;
       });
   }
 
private:
   specmath::thread_pool & m_pool;
   std::atomic_int m_task_count;
   std::mutex m_mutex;
   std::condition_variable m_cond;
 
   void wrapper_task(const std::function<void()> & task)
   {
       if (task)
           task();
 
       std::unique_lock<std::mutex> locker(m_mutex);
       if (--m_task_count == 0)
       {
           locker.unlock();
           m_cond.notify_one();
       }
   }
};
 

По поводу тестов. Во-первых, они не совсем корректны - у Вас однопоточный режим (Testing single thread) обгоняет QThreadPool (Testing QThreadPool), что уже всё это сравнение делает весьма спорным.
Во-вторых, самым показательным было бы сравнить Вашу реализацию CSimplePool с методом WaitForDone с реализацией без оного, но с реализацией его   "через задачу", например, вариант выше.
А так идея понятна)
 
« Последнее редактирование: Сентябрь 28, 2019, 06:51 от m_ax » Записан

Над водой луна двурога. Сяду выпью за Ван Гога. Хорошо, что кот не пьет, Он и так меня поймет..

Arch Linux Plasma 5
Igors
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 11445


Просмотр профиля
« Ответ #163 : Сентябрь 28, 2019, 08:56 »

Но это можно реализовать гораздо более элегантней)
Моя задумка была обойтись без счетчика задач, т.е. сохранить весь ф-ционал пула но добавить WaitForDone. Со счетчиком жить легче, Ваша реализация подлиннее зато "не интрузивна", в чем есть смысл
Код
C++ (Qt)
   void wrapper_task(const std::function<void()> & task)
   {
       if (task)
           task();
 
       std::unique_lock<std::mutex> locker(m_mutex);
       if (--m_task_count == 0)
       {
           locker.unlock();
           m_cond.notify_one();
       }
   }
};
 
Брать еще мутекс для каждой выполняемой задачи - ну так мы признаем что делаем лишь для "хороших" задач. Брать мутекс надо после атомарного декремента, это случится редко. А поскольку мутексу некого засисять то он здесь вообще не нужен. Также и в методе add_task - еще мутекс, ведь у m_pool есть свой. Оно конечно неопасно поскольку обычно главная набивает пуд, но все-таки нехорошо.

По поводу тестов. Во-первых, они не совсем корректны - у Вас однопоточный режим (Testing single thread) обгоняет QThreadPool (Testing QThreadPool), что уже всё это сравнение делает весьма спорным.
Но это совсем не вина тестов Улыбающийся Там есть TASK_WAIT, попробуйте его увеличить, все станет хорошо с QThreadPool. Но на мелких задачках ужасный провал (ото видать насовали мутексов). А юзать QThreadPool "с оглядкой" страшно неудобно. Более того, выясняется что даже простейшая самопальная реализация пула ведет себя гораздо лучше. Вот вам и "готовые проверенные"  Плачущий

Кстати а Вашу "футуристическую" реализацию не хотите прогнать на этих задачках?  Улыбающийся
Записан
Old
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 4350



Просмотр профиля
« Ответ #164 : Сентябрь 28, 2019, 09:29 »

Брать еще мутекс для каждой выполняемой задачи - ну так мы признаем что делаем лишь для "хороших" задач. Брать мутекс надо после атомарного декремента, это случится редко. А поскольку мутексу некого засисять то он здесь вообще не нужен.
Разбирались-разбирались... Улыбающийся

А без мютекса wait_for_all может никогда не завершиться.
Записан
Страниц: 1 ... 9 10 [11] 12 13   Вверх
  Печать  
 
Перейти в:  


Страница сгенерирована за 0.315 секунд. Запросов: 22.