Russian Qt Forum

Qt => Многопоточное программирование, процессы => Тема начата: m_ax от Сентябрь 07, 2019, 00:32



Название: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 07, 2019, 00:32
Приветствую, камрады)
Хотел проконсультироваться у экспертов по теме многопоточности.
Обрисую проблему:
Нужно распараллелить один алгоритм (псевдокод)
Код
C++ (Qt)
do {
    // эта функция может быть распараллелена
    doHardComputing();
 
   // А это легковесная функция. Она использует результаты, полученные doHardComputing
   doSomething();
 
} while (condition); // Всё это крутится в цикле пока не сработает условие выхода.
 


Наивный вариант распараллеливания видется таким (псевдокод):
Код
C++ (Qt)
do {
   std::vector<std::thread> threads;
   for (size_t i = 0; i < NThreads; ++i)
       threads.emplace_back(worker);
 
   for (auto & th : threads)
       th.join();
 
   doSomething();
 
} while(condition);
 

Однако такой подход не очень: не хорошо так в цикле создавать и убивать потоки..( (Хотя возможно, я ошибаюсь)

По хорошему нужен пул workerов (производителей) и один потребитель, который реализует логику doSomething().
Т.е. логика такая:
1. Запускаются workerы, каждый worker должен выполнить рассчёты, оповестить потребителя и заснуть.
2. Потребитель просыпается только тогда, когда все производители закончили вычисления.
3. Производитель проснулся, выполнил свои вычисления, разбудил всех производителей и заснул.
И так до тех пор, пока не будет выполнено условие condition.

Накидал на коленке тестовый вариант. Прошу покритиковать или советы как и что улучшить и сделать правильно)

Код
C++ (Qt)
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <vector>
#include <algorithm>
 
std::mutex mutex;
 
std::condition_variable cv_worker;
std::condition_variable cv_loop;
 
std::atomic_bool is_done(false);
 
static constexpr int NThreads = 8;
 
void worker(std::vector<bool> & flags, unsigned int id)
{
   while (!is_done)
   {
       // Имитация расчётов..
       std::this_thread::sleep_for(std::chrono::seconds(5));
 
       {
           // Каждый worker имеет свой флажёк, который говорит о том, что он выполнил расчёты и готов спать)
           std::unique_lock<std::mutex> lk(mutex);
           flags[id] = false;
       }
 
       cv_loop.notify_one(); // Пытаемся разбудить потребителя (main_loop)
       // И засыпаем
       std::unique_lock<std::mutex> lk(mutex);
       cv_worker.wait(lk, [&]()
       {
           return flags[id] || is_done;
       });
   }
}
 
void main_loop(std::vector<bool> & flags)
{
   int i = 0;
   auto cond = [](int i) { return i > 10; }; // условие выхода, для наглядности
 
   do
   {
       {   // Спим до тех пор, пока все workerы не выполнят вычисления (все флажки = false)
           std::unique_lock<std::mutex> lk(mutex);
           cv_loop.wait(lk, [&]()
           {
               return !std::any_of(flags.begin(), flags.end(), [](bool x){ return x; });
           });
       }
 
       // Имитация вычислений..
       std::this_thread::sleep_for(std::chrono::seconds(1));
 
       {   // После вычислений заряжаем флажки
           std::unique_lock<std::mutex> lk(mutex);
           std::fill(flags.begin(), flags.end(), true);
           if (cond(i++))
               is_done = true;
       }
       // Будим всех воркеров и засыпаем
       cv_worker.notify_all();
 
   } while (!is_done);
}
 
int main()
{
   std::vector<bool> flags(NThreads, false); // флажки
 
   std::thread mainthread(main_loop, std::ref(flags)); // Это наш потребитель
 
   std::vector<std::thread> threads; // Это наши производители workerы
 
   for (unsigned int i = 0; i < NThreads; ++i)
       threads.emplace_back(worker, std::ref(flags), i);
 
   mainthread.join();
 
   for (auto & th : threads)
       th.join();
 
   return 0;
}
 
 
Коректна ли такая реализация? Или можно улучшить, или я вообще велосипед изобретаю?)
Спасибо :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 07, 2019, 08:00
Так может просто OpenMP? :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Igors от Сентябрь 07, 2019, 08:20
Ну, блин, наворотили :) Это хрестоматийный случай, выдумывать ничего не надо, посмотреть пример (напр std::consition_variable) и подогнать под свои нужды, напр
Код
C++ (Qt)
std::thread consumer([&]() {
       while (!abortFlag) {
          std::unique_lock<std::mutex> lock(m);
 
/* каждый из worker'ов, закончив работу, уменьшает атомарный счетчик
workerInProcess и пытается нас (производителя) будить -
но мы игнорируем побудку если хотя бы один worker активен.
Собсно идея в этом */

           while (workerInProcess) {
               if (abortFlag) return;
               cond_var.wait(lock);
            }
 
/*  В этот момент все worker'ы отстрелялись и висят на (захваченном нами) мутексе, перезаряжаемся */
           if (CheckDone()) break;
           SetupWorkersData();   // заряжаем новые данные worker'ов
           workerInProcess = NUM_WORKERS;
           cond_var.notify_all();   // стартуем worker'ов, сами засыпаем
       }
 
/* даем worker'ам выйти */
       abortFlag = true;
       cond_var.notify_all();
 
   });
Теперь worker

Код:
    std::thread worker([&]() {
        while (true) {
 
// Пресекаем "ложные побудки"
          {
            std::unique_lock<std::mutex> lock(m);
            while (!HasData()) {
               if (abortFlag) return;
               cond_var.wait(lock);    // освобождаем мутекс и спим
             }
          }

// считаем
           DoWorkerJob();
        
// уменьшаем счетчик и будим всех
           --workerInProcess;
           cond_var.notify_all();
    });
Обычная ошибка - считать/полагать что wake_one/all "кого-то будит". В действительности это эффект лишь для ожидающих. И также побудка "строго одного" не гарантируется (на уровне ОС). Поэтому каждая нитка должна проверяться на ложную побудку.


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 07, 2019, 11:07
Так может просто OpenMP? :)
Возможно.. Нужно глянуть на него по внимательней :)
А не будет ли вызов OpenMP эквивыалентен первому "наивному" варианту реализации на std::thread?

Цитировать
Это хрестоматийный случай, выдумывать ничего не надо, посмотреть пример (напр std::consition_variable) и подогнать под свои нужды, напр
Этот хрестоматийный случай будет работать не корректно (Поправте, если я ошибаюсь):
Просыпаются воркеры: пусть первый из них оказался самым шустрым, выполнил DoWorkerJob(), декрементировал  счётчик --workerInProcess и ушёл на второй круг (HasData() = false)
Затем остальные воркеры доходят до --workerInProcess, будят консумера, тот видит  workerInProcess == 0 и просыпается  :'(
А тем временем у нас ещё считает тот первый (уже не шустрый) воркер  :'(
 
Цитировать
Обычная ошибка - считать/полагать что wake_one/all "кого-то будит". В действительности это эффект лишь для ожидающих. И также побудка "строго одного" не гарантируется (на уровне ОС). Поэтому каждая нитка должна проверяться на ложную побудку.
Да, поэтому я и передаю предикат в метод wait, который и спасает от ложных пробудок :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Авварон от Сентябрь 07, 2019, 11:32
Ну это же типичный шаг map из map&reduce, в QtConcurrent реализован как раз на тред пуле


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 07, 2019, 11:44
А не будет ли вызов OpenMP эквивыалентен первому "наивному" варианту реализации на std::thread?
Нет, на старте будет создан пул потоков, который и будут все разгребать на горячих потоках.


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 07, 2019, 12:20
Ну это же типичный шаг map из map&reduce, в QtConcurrent реализован как раз на тред пуле
Не знал про QtConcurrentMap, хотя я Qt особо не пользуюсь..
Но спасибо, покурю в сторону их подхода) 

Цитировать
Нет, на старте будет создан пул потоков, который и будут все разгребать на горячих потоках.
Понятненько) Надо будет тогда его тоже поизучать)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 07, 2019, 13:59
А не будет ли вызов OpenMP эквивыалентен первому "наивному" варианту реализации на std::thread?
Нет, на старте будет создан пул потоков, который и будут все разгребать на горячих потоках.

Я правильно понимаю, что с OpenMP это будет выглядеть примерно так?:
Код
C++ (Qt)
#pragma omp parallel
{
   do {
         #pragma omp task
         {
              worker();
         }
         #pragma omp taskwait
 
         doSomething();
 
   } while (condition);
}
 


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Igors от Сентябрь 07, 2019, 14:00
Этот хрестоматийный случай будет работать не корректно (Поправте, если я ошибаюсь):
Просыпаются воркеры: пусть первый из них оказался самым шустрым, выполнил DoWorkerJob(), декрементировал  счётчик --workerInProcess и ушёл на второй круг (HasData() = false)
HasData - это тот самый флажок. Все равно Вы будете подавать в нитку какие-то данные для расчетов, ну и флажок там же, выделять их в отдельный контейнер незачем. Конечно после выполнения порции расчетов worker должен свой флажок зачистить.

Код:
void worker(std::vector<bool> & flags, unsigned int id)
{
    while (!is_done)
    {
        // Имитация расчётов..
        std::this_thread::sleep_for(std::chrono::seconds(5));
Не ошибка, но лучше все-таки не полагаться на то что, мол, "данные есть" а проверить (это у Вас в конце цикла).
Код
C++ (Qt)
       {
           // Каждый worker имеет свой флажёк, который говорит о том, что он выполнил расчёты и готов спать)
           std::unique_lock<std::mutex> lk(mutex);
           flags[id] = false;
       }
 
Не вижу зачем тут брать мутекс

Не знал про QtConcurrentMap, хотя я Qt особо не пользуюсь..
Но спасибо, покурю в сторону их подхода)  
Если цель = минимум усилий, то прощe всего QThreadPool, там только пристроить run(), потом накидал и waitForDone();

А не будет ли вызов OpenMP эквивыалентен первому "наивному" варианту реализации на std::thread?
Ну создавать и убивать нитки - это вообще дичь, пул создается один раз или по запросу. Даже усыплять нитки - дорого, поэтому в OpenMP после окончания порции расчетов нитки остаются активными в течение некоторого "времени прокрутки" которое значительно (насколько помню 0.3  СЕКУНДЫ)

А вообще OpenMP - прекрасная вещь (особенно Intel реализация), единственное возражение - увы, это еще одна зависимость


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Igors от Сентябрь 07, 2019, 14:04
Я правильно понимаю, что с OpenMP это будет выглядеть примерно так?:
Зачем? Вам же известно кол-во задач и исходные данные для каждой из них. Просто так
Код
C++ (Qt)
#pragma omp parallel for
for (int i = 0; i < data.size(); ++i)
DoCalc(data[i]);


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 07, 2019, 14:19
Цитировать
HasData - это тот самый флажок. Все равно Вы будете подавать в нитку какие-то данные для расчетов, ну и флажок там же, выделять их в отдельный контейнер незачем. Конечно после выполнения порции расчетов worker должен свой флажок зачистить.
Именно вот поэтому у каждого воркера должен быть свой индивидуальный флажёк.
Или я не понимаю чего то?

Цитировать
Не вижу зачем тут брать мутекс
Потребитель (main_loop) может случайно проснуться, вызвать предикат и тогда появится возможность одновременного доступа к масссиву flags из предиката и из воркера, который в этот момент устанавливает флажёк.

Я правильно понимаю, что с OpenMP это будет выглядеть примерно так?:
Зачем? Вам же известно кол-во задач и исходные данные для каждой из них. Просто так
Код
C++ (Qt)
#pragma omp parallel for
for (int i = 0; i < data.size(); ++i)
DoCalc(data[i]);
 
Так мне этот цикл нужно повторять до возникновения условия завершения.
Здесь получается, что они постоянно будут создаваться и уничтожаться?  


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: ViTech от Сентябрь 07, 2019, 14:39
Код
C++ (Qt)
void worker(std::vector<bool> & flags, unsigned int id)
{...}
 

Воркеру обязательно знать про массив флажков, свой id, глобальный mutex? Может лучше ему передать функтор, который сформирует вызывающая сторона, а воркер его просто вызовет, когда закончит свою работу?


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 07, 2019, 14:48
Цитировать
Воркеру обязательно знать про массив флажков, свой id, глобальный mutex?
Ну это просто пример на коленке, который илюстрирует логику/концепт :)
Воркеру можно передавать ссылку на его уникальный флажёк, например..
Код
C++ (Qt)
void worker(bool & flag)
 

бес всякого id..

А мьютекс должен быть общим для всех воркеров и для потребителя, разумеется..

Цитировать
Может лучше ему передать функтор, который сформирует вызывающая сторона, а воркер его просто вызовет, когда закончит свою работу?

Не понял, можно по подробней?

Т.е. в конечном счёте я хочу некий класс, назовём его concurrent_loop, который используется примерно так:
Код
C++ (Qt)
bool loop_function() { ... }
 
void do_work() { ... }
 
 
concurrent_loop cloop(num_threads);
 
cloop.start(loop_function, do_work);
 
cloop.join();
 
 
 
Что, фактически, эквивалентно
Код
C++ (Qt)
do {
   std::vector<std::thread> threads;
   for (unsigned int i = 0; i < num_threads; ++i)
       threads.emplace_back(do_work);
 
   for (auto & th : threads)
       th.join();
 
} while (loop_function());
 
за исключением того, что не будет на каждый чих создаваться и уничтожаться пул потоков.


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: ViTech от Сентябрь 07, 2019, 16:12
Т.е. в конечном счёте я хочу некий класс, назовём его concurrent_loop, который используется примерно так:
...

Как уже говорили, лучше сначала существующие решения и подходы рассмотреть: map-reduce, OpenMP, в C++17 вроде есть параллелизация в алгоритмах (https://en.cppreference.com/w/cpp/algorithm/execution_policy_tag_t), std::packaged_task (https://en.cppreference.com/w/cpp/thread/packaged_task) может на что сгодится, и т.д.

Цитировать
Может лучше ему передать функтор, который сформирует вызывающая сторона, а воркер его просто вызовет, когда закончит свою работу?

Не понял, можно по подробней?

Схематично:
Код
C++ (Qt)
template <class Done>
void worker(Done done)
{
   // hard computing...
   done();
}
 
void caller()
{
   const std::size_t N = 4;
   std::vector<bool> flags(N, false);
 
   for (int i = 0; i < N; ++i)
       worker([&flags, i]() { flags[i] = true; });
}
 

Т.е. в функтор перенести флажки, мьютексы и уведомление о завершении работы. У вызывающей стороны всё это есть и она лучше знает, как это организовать. Тогда интерфейс воркера будет чище и без ненужных  зависимостей.


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Igors от Сентябрь 07, 2019, 17:14
Потребитель (main_loop) может случайно проснуться, вызвать предикат и тогда появится возможность одновременного доступа к масссиву flags из предиката и из воркера, который в этот момент устанавливает флажёк.
На здоровье, если запись/чтение флажка атомарны, то все норм.

Так мне этот цикл нужно повторять до возникновения условия завершения.
Здесь получается, что они постоянно будут создаваться и уничтожаться?  
Да, цикл придется повторять, это нормально если, как Вы сказали в стартовом посте, все нитки должны закончить все расчеты а потом главная решает. При использовании OpenMP нитки пере-создаваться не будут, они не будут даже засыпать если циклов несколько

Именно вот поэтому у каждого воркера должен быть свой индивидуальный флажёк.
Или я не понимаю чего то?
Грубо говоря, к чему сводится multi-threading: дать каждой нитке "свои" данные что не пересекаются с другими по чтению и записи. Если это удается обеспечить (а часто удается), то все сводится к одной директиве OpenMP. Как только от тестового sleep Вы перейдете к реальной задаче - такие "индивидуальные данные нитки" у Вас неизбежно появятся, каждая должна знать исходные данные (что считать) и куда сливать результаты

Неясно сколько ниток Вы собираетесь создавать. Тупо "по числу задач" так-сяк работает до определенного предела. Напр если ядер 4, а задач (data.size()) 100, то overhead будет не слабый. Также есть такая противная "гранулярность", т.е. ну вот попался один длинный расчет, все давно отстрелялись, а одна нитка все с ним мучается, и приходится ее ждать. При использовании низкоуровневых примитивов синхронизации (как у Вас) эти проблемы не решаются автоматычно/легко


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: RedDog от Сентябрь 07, 2019, 17:25
А каждый воркер в свой тред засунуть, а обработчик пусть от них сигналы в своем треде получает?
Так на кутишный эвент луп вся синхронизация ляжет.


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 07, 2019, 18:35
Цитировать
А каждый воркер в свой тред засунуть, а обработчик пусть от них сигналы в своем треде получает?
Да, сейчас каждый воркер в своём треде и на каждом шаге шлёт сигнал обработчику (main_loop).
Вопрос был как лучше их синхронизировать так, чтобы вначале отработали все воркеры, только потом проснулся главный тред, сделал свои дела, разбудил воркеров и опять уснул, ожидая пока вновь ВСЕ воркеры отработают и не разбудят его. И так до тех пор, пока главный тред не решит, что пора заканчивать.

Написал свой класс для этого concurrent_loop. Вот простой пример использования:
Код
C++ (Qt)
#include <iostream>
#include "concurrent_loop.h"
#include <list>
 
int main()
{
   static constexpr int num_threads = 4;
   std::mutex print_mutex;
 
   std::list<std::function<void()>> workers;
 
   for (unsigned int i = 0; i < num_threads; ++i)
       workers.push_back([&]()
       {
           std::this_thread::sleep_for(std::chrono::seconds(1));
           std::lock_guard<std::mutex> lk(print_mutex);
           std::cout << std::this_thread::get_id() << std::endl;
       });
 
   int ntests = 3;
 
   auto loop_function = [&]()
   {
       std::cout << --ntests << "---------------------" << std::endl;
       return ntests == 0;
   };
 
   concurrent_loop cloop(loop_function, workers);
 
   cloop.join();
 
   return 0;
}
 
А вот примерный вывод:
Код
Bash
140269582214912
140269557036800
140269573822208
140269565429504
2---------------------
140269557036800
140269565429504
140269582214912
140269573822208
1---------------------
140269573822208
140269565429504
140269557036800
140269582214912
0---------------------
 


Проект приаттачен.

Да, в контексте данной задачи, похоже OpenMP наиболее разумное решение  :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 08, 2019, 01:13
Да, провёл ещё несколько тестов (конкретно 3: с condition_variable, "наивный подход " и с использованием single thread реализациями). Результат не то что бы удивил, но возникли некоторые тонкости.. Они вполне предсказуемы, но если кто из уважаемых оппонентов имеет желание обсудить данные результаты, я готов предоставить реальные "боевые" исходники с подробным описанием  :)
К сожалению, на момент написания данного поста, из-за скудного опыта работы с OpenMP, не могу представить, для полной картины, соответствующих сравнительных данных по отношению к OpenMP :-[ Но всё же склоняюсь, что OpenMP здесь в большинстве случаев будет выйгрышна.. (но это не точно :))


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: RedDog от Сентябрь 08, 2019, 01:33
Вопрос был как лучше их синхронизировать так, чтобы вначале отработали все воркеры, только потом проснулся главный тред, сделал свои дела, разбудил воркеров и опять уснул, ожидая пока вновь ВСЕ воркеры отработают и не разбудят его. И так до тех пор, пока главный тред не решит, что пора заканчивать.
А что будет в случае если за один "цикл" один из воркеров более одного раза отработает?
К примеру один вычисляет сумму двух интов, а другой какой нибудь синус в степени <...>
Или главный "манагер" должен им по одной итерации давать отрабатывать?


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 08, 2019, 01:48
Цитировать
А что будет в случае если за один "цикл" один из воркеров более одного раза отработает?
Плохо будет  :) Об этом и дискутируем)
Вот мы и хотим таких ситуаций избежать) Чтоб какой-либо воркер более одного раза НЕ проскочил за один степ (цикл)

Цитировать
Или главный "манагер" должен им по одной итерации давать отрабатывать?
Да, в контексте данной задачи, "главный" должен им давать лишь одну итерацию.


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 08, 2019, 08:53
Код
C++ (Qt)
#include <vector>
#include <functional>
#include <thread>
#include <mutex>
#include <iostream>
 
using namespace std;
 
int main()
{
 static constexpr size_t num_threads = 4;
 mutex print_mutex;
 
 vector< function< void() > > workers;
 workers.reserve( num_threads );
 
 for( size_t i = 0; i < num_threads; ++i)
     workers.push_back([&]()
     {
         this_thread::sleep_for( std::chrono::seconds( 1 ) );
         lock_guard<mutex> lk( print_mutex );
         cout << this_thread::get_id() << endl;
     });
 
 const size_t ntests = 3;
 for( size_t n = 0; n < ntests; ++n )
 {
   #pragma omp parallel for
   for( size_t i = 0; i < workers.size(); ++i )
     workers[ i ]();
 
   std::cout << n << "---------------------" << std::endl;
 }
 
 return 0;
}
 


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Igors от Сентябрь 08, 2019, 09:26
Да, сейчас каждый воркер в своём треде и на каждом шаге шлёт сигнал обработчику (main_loop).
Вопрос был как лучше их синхронизировать так, чтобы вначале отработали все воркеры, только потом проснулся главный тред, сделал свои дела, разбудил воркеров и опять уснул, ожидая пока вновь ВСЕ воркеры отработают и не разбудят его. И так до тех пор, пока главный тред не решит, что пора заканчивать.
Будить главный только раз - дело нехитрое
Код
C++ (Qt)
if (--workersInProcess == 0)
...
Проблема в другом. По-хорошему число воркеров должно быть == числу ядер (а не числу задач). Правильно перейти к самой обычной очереди (самый что ни на есть избитый пример). Главная нитка насовала N задач в очередь, воркеры их расхватали, когда атомарный счетчик обнулился - будится главная, опять сует N задач и.т.д.

 
Да, в контексте данной задачи, похоже OpenMP наиболее разумное решение
Здесь проходит любое стандартное решение. Посмотрите QThreadPool - там минут за 10-15 все можно сделать с нуля.

 
К сожалению, на момент написания данного поста, из-за скудного опыта работы с OpenMP, не могу представить, для полной картины, соответствующих сравнительных данных по отношению к OpenMP. Но всё же склоняюсь, что OpenMP здесь в большинстве случаев будет выйгрышна.. (но это не точно)
Результаты такого тестирования давно известны :) Просто здесь все зависит от "кластера". Т.е. если единичная задача выполняется секунды (или хотя бы "хорошие доли секунды") - то все отлично и все подходы дадут хороший и примерно одинаковый КПД. Однако чем меньше кластер - тем больше весит захват мутексов и др накладные расходы. А если кластер слишком мал - то вообще, как говорится, "не масштабируется", т.е. скорость примерно та же что и на одном дизеле  :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 08, 2019, 11:13
Цитировать
По-хорошему число воркеров должно быть == числу ядер (а не числу задач). Правильно перейти к самой обычной очереди (самый что ни на есть избитый пример). Главная нитка насовала N задач в очередь, воркеры их расхватали, когда атомарный счетчик обнулился - будится главная, опять сует N задач и.т.д.
Нет. Предположим я Бос и у меня есть команда воркеров. Мне нужно подготовить проект. Я рассылаю своим воркерам задание, все необходимые материалы и т.д.
Они их получают и начинают рукожопить над проектом. Я в это время отдыхаю. Я могу зафиксировать факт того, что они все получили свои задания, но мне нужен конечный продукт.
Они заканчивают предворительную версию проекта, присылают мне и идут пить пиво. Я получаю первую версию проекта и начинаю вносить свои правки, замечания, курю и громко ругаюсь матом. После этого я отсылаю им проект на доработку. Они его получают и начинают править. Процесс закончится, когда меня полностью устроит конечный результат. Тогда я им пишу: -Всем спасибо! Мы сделали это!

Именно такую логику реализует concurrent_loop.
Решение с классическим тредпулом здесь не совсем к месту. Поскольку мне важен не факт того, что очередь задачь пуста и можно заряжать её снова, а факт того, что очередной пакет задач был выполнен. Полностью!

Поэтому одним атомарным каунтером вы здесь не отделаетесь.(
Или покажите как?


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Igors от Сентябрь 08, 2019, 12:28
Нет. Предположим я Бос и у меня есть команда воркеров.
Так число воркеров в команде определяется "хватит ли денег им платить", а вовсе не "числом задач". Неизбежно какой-то воркер должен будет выполнять  2 или более задач (в рамках одного подхода/сессии). Или придется городить "мертвые души" (1 задача = 1 нитка, ничем хорошим это не кончается).

Решение с классическим тредпулом здесь не совсем к месту. Поскольку мне важен не факт того, что очередь задачь пуста и можно заряжать её снова, а факт того, что очередной пакет задач был выполнен. Полностью!
Да очень даже к месту, меняется только условие ложной побудки. Не "очередь пуста" - при этом какие-то задачи уже извлечены из очереди, но еще в процессе, а счетчик "сделано" обнулился. 

Поэтому одним атомарным каунтером вы здесь не отделаетесь.(
Или покажите как?
Пойдем по пути наименьшего сопротивления
Код
C++ (Qt)
struct Task : public QRunnable {
 Task( Data * data ) : mData(data) {}  
 virtual void run( void ) { DoCalc(*mData); }
 
 Data * mData;
};
 
// main thread
QThreadPool * pool = QThreadPool::globalInstance();
while (!solved) {
 
// заряжаем данные расчетов
 std::vector<Data> data;
 for (size_t i = 0; i < num_tasks; ++i)
   data.push_back(Data(i));
 
// запускаем
 for (size_t i = 0; i < num_tasks; ++i)
   pool->start(new Task(&data[i]));
 
// ждем когда сварится
  pool->waitForDone();
 
  solved = AnalyzeResults(data);
};
Что не нравится?  :) Или непременно нужно на примитивах std? Это нетрудно, но нужно ли?


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 08, 2019, 12:52
Цитировать
Да очень даже к месту, меняется только условие ложной побудки. Не "очередь пуста" - при этом какие-то задачи уже извлечены из очереди, но еще в процессе, а счетчик "сделано" обнулился.
Да, всё, понял идею)

Цитировать
Что не нравится?  :)
Согласен)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Igors от Сентябрь 08, 2019, 13:08
А что копаетесь - хорошо и правильно. Без велика нет понимания, то так, "почитал", "посмотрел примеры" - ну вот уже и овладел очередной "технологией"  :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 08, 2019, 19:05
Цитировать
pool->waitForDone();
Это, конечно, хорошо, когда есть такой метод.. Но на мой взгляд правильнее чтобы метод start возвращал Future..

Набросал свой велосипед thread_pool (у которого метод add_task возврощает std::future.. по сути те же самые флажки, но опционально. Идея навеяна от сюда https://github.com/mtrebi/thread-pool)
Код
C++ (Qt)
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
 
#include <list>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <future>
#include <memory>
#include <functional>
 
class thread_pool
{
public:
   thread_pool(unsigned int nthreads = 1)
       : m_is_stop(false)
   {
       for (unsigned int i = 0; i < nthreads; ++i)
           m_threads.push_back(std::thread(&thread_pool::worker, this));
   }
 
   thread_pool(const thread_pool &) = delete;
   thread_pool(thread_pool &&) = delete;
 
   thread_pool & operator=(const thread_pool &) = delete;
   thread_pool & operator=(thread_pool &&) = delete;
 
   void join()
   {
       m_is_stop = true;
       m_loop_cv.notify_all();
 
       for (auto & th : m_threads)
       {
           if (th.joinable())
               th.join();
       }
   }
 
   template<class F>
   auto add_task(F && work_function)->std::future<decltype(work_function())>
   {
       auto task_ptr = std::make_shared<std::packaged_task<decltype(work_function())()>>(std::forward<F>(work_function));
 
       std::function<void()> wrapper_func = [task_ptr]() { (*task_ptr)(); };
 
       {
           std::lock_guard<std::mutex> lock(m_mutex);
           m_queue.push(wrapper_func);
       }
 
       m_loop_cv.notify_one();
 
       return task_ptr->get_future();
   }
 
private:
   std::atomic_bool m_is_stop;
   std::list<std::thread> m_threads;
   std::queue<std::function<void()>> m_queue;
   std::mutex m_mutex;
   std::condition_variable m_loop_cv;
 
   void worker()
   {
       while (!m_is_stop)
       {
           std::unique_lock<std::mutex> lock(m_mutex);
           m_loop_cv.wait(lock, [&]()
           {
               return !m_queue.empty() || m_is_stop;
           });
 
           if (m_is_stop)
               return;
 
           std::function<void()> work_function(m_queue.front());
           m_queue.pop();
 
           lock.unlock();
 
           work_function();
       }
   }
 
};
 
#endif // THREAD_POOL_H
 

Вроде всё работает..)
Код
C++ (Qt)
#include <iostream>
 
#include "thread_pool.h"
 
int main()
{
   static constexpr size_t nthreads = 4;
   static constexpr size_t ntests = 3;
   static constexpr size_t ntasks = 12;
   std::mutex mutex;
 
   std::vector<std::future<void>> results(ntasks);
 
   thread_pool pool(nthreads);
 
   for (size_t i = 0; i < ntests; ++i)
   {
       for (size_t j = 0; j < ntasks; ++j)
           results[j] = pool.add_task([&]()
           {
               std::this_thread::sleep_for(std::chrono::seconds(1));
               std::lock_guard<std::mutex> lk(mutex);
               std::cout << std::this_thread::get_id() << std::endl;
           });
 
       for (size_t j = 0; j < results.size(); ++j)
           results[j].wait();
 
       std::cout << i << "---------------------" << std::endl;
   }
 
   pool.join();
 
   return 0;
}
 
:)
 


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 08, 2019, 20:15
m_ax, небольшое замечание по условным переменным.
При wait условная переменная освобождает парный мьютекс, но при пробуждении она захватывает его обратно.
Поэтому, при выходе из wait мьютекс уже захвачен, "работу" из очереди можно доставать, после этого ралочивать мьютекс и запускать работу.


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 08, 2019, 20:26
m_ax, небольшое замечание по условным переменным.
При wait условная переменная освобождает парный мьютекс, но при пробуждении она захватывает его обратно.
Поэтому, при выходе из wait мьютекс уже захвачен, "работу" из очереди можно доставать, после этого ралочивать мьютекс и запускать работу.

Да, при выходе из wait у меня m_cv_mutex освобождается:
Код
C++ (Qt)
...
    {
               std::unique_lock<std::mutex> lock(m_cv_mutex);
               m_loop_cv.wait(lock, [&]()
               {
                   std::lock_guard<std::mutex> lk(m_queue_mutex);
                   return !m_queue.empty() || m_is_stop;
               });
     }
...
 
И после wait m_queue_mutex уже разблочен..
Цитировать
Поэтому, при выходе из wait мьютекс уже захвачен, "работу" из очереди можно доставать, после этого ралочивать мьютекс и запускать работу.
Не понял, почему он захвачен?


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 08, 2019, 20:30
Стоп, так у вас два мьютекса вместо одного? :)
Зачем, достаточно одного мьютекса.
Самой условной переменной не нужен мьютекс, этот мьютекс должен блокировать наш ресурс, т.е. очередь.


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 08, 2019, 20:31
Стоп, так у вас два мьютекса вместо одного? :)
Зачем, достаточно одного мьютекса.
Да, два  :) Один для очереди, другой для condition_variable) А одним можно?  :)

Аааа))) Дошло  ;D


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 08, 2019, 20:36
Спасибо, понял) Исправил)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 08, 2019, 20:56
Код
C++ (Qt)
   void worker()
   {
       while (!m_is_stop)
       {
           {
               std::unique_lock<std::mutex> lock(m_mutex);
               m_loop_cv.wait(lock, [&]()
               {
                   return !m_queue.empty() || m_is_stop;
               });
           }
 
           if (m_is_stop)
               return;
 
           std::function<void()> work_function([](){});
 
           {
               // std::lock_guard<std::mutex> lock(m_mutex);  <<< и так золочен
               // if (!m_queue.empty())  <<< тут точно не пусто - проверено в wait
               {
                   work_function = std::move(m_queue.front());
                   m_queue.pop();
               }
           }
          // Разлочиваем и запускаем работу
          lock.unlock();
 
          work_function();
       }
   }
 


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 08, 2019, 21:03
Нет..
Код
C++ (Qt)
while (!m_is_stop)
       {
           {
               std::unique_lock<std::mutex> lock(m_mutex);
               m_loop_cv.wait(lock, [&]()
               {
                   return !m_queue.empty() || m_is_stop;
               });
           }
           // <<< Здесь lock уже откинулся и освободил m_mutex
 
           if (m_is_stop)
               return;
 
           std::function<void()> work_function([](){});
 
           {
               // std::lock_guard<std::mutex> lock(m_mutex);  <<< и так золочен
               // if (!m_queue.empty())  <<< тут точно не пусто - проверено в wait
               {
                   work_function = std::move(m_queue.front());
                   m_queue.pop();
               }
           }
          // Разлочиваем и запускаем работу
          lock.unlock();
 
          work_function();
       }
   }
 
Можно тогда сделать так (чтоб дважды m_mutex не захватывать)
Код
C++ (Qt)
void worker()
   {
       while (!m_is_stop)
       {
           std::unique_lock<std::mutex> lock(m_mutex);
            m_loop_cv.wait(lock, [&]()
            {
                   return !m_queue.empty() || m_is_stop;
            });
 
 
           if (m_is_stop)
               return;
 
           std::function<void()> work_function(m_queue.front());
           m_queue.pop();
           lock.unlock();
 
          work_function();
       }
   }
 


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 08, 2019, 21:12
// <<< Здесь lock уже откинулся и освободил m_mutex
Нет.
wait работает следующим образом (псевдокод):
Код
C++ (Qt)
wait( mutex m )
{
 m.unlock()
 yield()
 m.lock()
}
 

Т.е. он освобождает мьютекс и отдает управление планировщику, говоря производителям: я освободил мьютекс и жду когда вы накидаете мне в очередь работы. Производитель теперь может захватить мьютекс и положить работу в очередь. Затем он сигналит и освобождает мьютекс. Когда планировщик разбудит нить, управление вернется после yield, далее wait опять захватит мьютикс и завершится. Мьютекс снова захвачен, мы можем работать с очередью.


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 08, 2019, 21:14
Можно тогда сделать так (чтоб дважды m_mutex не захватывать)
Так это тот же вариант, который получается с моими комментариями. :)

UPDATE. Ааа, увидел что локер был в {}. Да последний вариант - нормальный.


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 08, 2019, 21:39
Цитировать
Да последний вариант - нормальный.
Исправил  :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Igors от Сентябрь 09, 2019, 07:10
Код
C++ (Qt)
auto task_ptr = std::make_shared<std::packaged_task<decltype(work_function())()>>(std::forward<F>(work_function));
 
Не стоит так гнаться за модой :)  Это лишь засирает мозги и отвлекает программиста от содержательной работы


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 09, 2019, 07:40
Код
C++ (Qt)
auto task_ptr = std::make_shared<std::packaged_task<decltype(work_function())()>>(std::forward<F>(work_function));
 
Не стоит так гнаться за модой :)  Это лишь засирает мозги и отвлекает программиста от содержательной работы
Это не мода, а более общее решение, которое позволяет легко написать код как для случая, когда нужно дождаться завершения всех работ в пуле, так и для ожидания завершения только части работ. Ведь один и тот же пул может использоваться для разных типов работ, завершение каких-то работ нужно ждать в одном месте, завершение других  в другом.


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: ViTech от Сентябрь 09, 2019, 12:18
Хотел поинтересоваться:
Код
C++ (Qt)
class thread_pool
{
...
private:
...
   std::list<std::shared_ptr<std::thread>> m_threads;
...
};
 

Почему shared_ptr использовали? Какой ход мыслей был, или на автомате? Мне  в исследовательских целях интересно :).


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 09, 2019, 12:38
Хотел поинтересоваться:
Код
C++ (Qt)
class thread_pool
{
...
private:
...
   std::list<std::shared_ptr<std::thread>> m_threads;
...
};
 

Почему shared_ptr использовали? Какой ход мыслей был, или на автомате? Мне  в исследовательских целях интересно :).
Я по привычке, поскольку std::thread - некопируемый объект. А я все некопируемые объекты в контейнерах по указателю храню.. (Мало ли что  :))
А std::shared_ptr использовал чтоб не заморачиваться с их ручным удалением в деструкторе thread_poolа  :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: ViTech от Сентябрь 09, 2019, 12:49
Я по привычке, поскольку std::thread - некопируемый объект. А я все некопируемые объекты в контейнерах по указателю храню.. (Мало ли что  :))
А std::shared_ptr использовал чтоб не заморачиваться с их ручным удалением в деструкторе thread_poolа  :)

Ок, спасибо за ответ :).


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 09, 2019, 13:01
Я по привычке, поскольку std::thread - некопируемый объект. А я все некопируемые объекты в контейнерах по указателю храню.. (Мало ли что  :))
Ну так он перемещаемый. :)
Код
C++ (Qt)
vector<thread> m_threads;
 
и все. :)



Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 09, 2019, 13:52
Я по привычке, поскольку std::thread - некопируемый объект. А я все некопируемые объекты в контейнерах по указателю храню.. (Мало ли что  :))
Ну так он перемещаемый. :)
Код
C++ (Qt)
vector<thread> m_threads;
 
и все. :)


Да, верно, перемещаемый)

PS Кстатии в boost::thread_group тоже используют список с указателями на thread  :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 09, 2019, 13:59
PS Кстатии в boost::thread_group тоже используют список с указателями на thread  :)
Это легаси. :)
boost::thread появился в далеком 2001 году, когда о move-семантики даже не задумывались. :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Авварон от Сентябрь 09, 2019, 21:36
Ну шаред_птр можно использовать для повторного исполнения одной таски (аналог убого QRunnnable::autoDelete)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Igors от Сентябрь 10, 2019, 12:07
Возможно классическая очередь здесь не является лучшим решением. Ну конечно с "хорошим" кластером все будет норм, но если он противно мал, напр порядка 10 ms? (это навскидку, крытычное время может быть совсем иным).

Вообще "многопоточностью" занимаются/владеют все, но ни разу не видел замеров КПД. Интересно было бы проверить, напр на одной нитке 1000 задач выполняется за 10 сек. Сколько будет считаться на 4 нитках? Др словами каковы накладные расходы или насколько время больше идеального 2.5 сек ?

Лучшее решение думается есть  :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 10, 2019, 13:04
Цитировать
Возможно классическая очередь здесь не является лучшим решением. Ну конечно с "хорошим" кластером все будет норм, но если он противно мал, напр порядка 10 ms? (это навскидку, крытычное время может быть совсем иным).
У меня на 8 ядрах, моя задача с последней реализацией thread_poolа примерно в 6 раз стала быстрее. И это только пока на "игрушечных" данных.  8) В реальной ситуации время расчётов там до нескольких часов может легко составить и там, по-видимому, выйгрыш будет стремиться к 8.

Цитировать
Интересно было бы проверить, напр на одной нитке 1000 задач выполняется за 10 сек. Сколько будет считаться на 4 нитках? Др словами каковы накладные расходы или насколько время больше идеального 2.5 сек ?

Зависит от криворукости того, кто будет параллелить) И от самой задачи: может она в принципе не распараллеливается.. Тогда только пройгрыш будет..


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Авварон от Сентябрь 10, 2019, 13:56
Вообще "многопоточностью" занимаются/владеют все, но ни разу не видел замеров КПД. Интересно было бы проверить, напр на одной нитке 1000 задач выполняется за 10 сек. Сколько будет считаться на 4 нитках? Др словами каковы накладные расходы или насколько время больше идеального 2.5 сек ?

Лучшее решение думается есть  :)

Самое смешное, что на нескольких нитках прирост может быть весьма большой. Гораздо интереснее, как ведет себя приложение на большом количестве ниток (16, 32). Например, не так давно было сравнение (https://lists.qt-project.org/pipermail/qbs/attachments/20190722/6fb78e56/attachment.pdf) qbs и сmake и ВНЕЗАПНО оба деградируют начиная с какого-то значения - сколько бы ниток дополнительных мы бы не насыпали, общее время сборки не уменьшается.
А так, то, что вы не видели замеров КПД, не означает, что их не делают. Вот, например, сравнение стеков (https://habrastorage.org/getpro/habr/post_images/5f4/7e1/913/5f47e19135293897b9324aaefc03d9e8.png) (lock-free и нет) (всё из того же цикла статей что вы не осилили :)). А вот очереди (https://habrastorage.org/getpro/habr/post_images/53e/fcc/220/53efcc220ce61d2fbce1be193079bcca.png).


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Igors от Сентябрь 10, 2019, 14:30
А так, то, что вы не видели замеров КПД, не означает, что их не делают. Вот, например, сравнение стеков (https://habrastorage.org/getpro/habr/post_images/5f4/7e1/913/5f47e19135293897b9324aaefc03d9e8.png) (lock-free и нет) (всё из того же цикла статей что вы не осилили :)). А вот очереди (https://habrastorage.org/getpro/habr/post_images/53e/fcc/220/53efcc220ce61d2fbce1be193079bcca.png).
Я имел ввиду "ни разу не видел замеров КПД на этом форуме", напр Ваших :)

Пару месяцев назад был интересный случай. Я знал что это место "не масштабится", ну так и оказалось с QThreadPool - скорость та же что и на 1 нитке. Ну думаю, ладно, реализация не накладная, пусть так и остается. А у заказчика 32 ядра, хирак - вдвое МЕДЛЕННЕЕ чем на одной. Не могу объяснить почему.


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 10, 2019, 15:12
Я знал что это место "не масштабится", ну так и оказалось с QThreadPool - скорость та же что и на 1 нитке.
То есть вы знали что это место "не масштабится", но каким то образом его распаралелили?  Или оно все же масштабируется, но у вас не получилось?

Не могу объяснить почему.
Ну так вы бы показали, что вы там "распаралелили", и получите ответ. :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 10, 2019, 16:15
Цитировать
В реальной ситуации время расчётов там до нескольких часов может легко составить и там, по-видимому, выйгрыш будет стремиться к 8.
Прибавил немного нагрузку, и, как предпологал, выйгрыш в производительности медленно пополз вверх)
В однопоточном варианте алгоритма время составило t = 7919 sec, а на 8 ядрах t = 1210 sec. Т.е. в 6.54 раза быстрее получается с пулом)
Издержки в основном связаны с тем, что есть общие данные, которые нужно лочить..

В общем я доволен  :)  


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Igors от Сентябрь 10, 2019, 18:30
Есть хороший метод который я стал забывать (наверно тлетворное влияние этого форума). Когда стандартное решение проходит, полезно поискать - а чем данная задача отличается от стандартной? Это такие "мелкие детали" которые, на первый взгляд, ничего не меняют. Но если присмотреться, то нередко получаются велики которые не только полезны для понимания, но и объективно являются лучшим решением. Попробуем применить к данному случаю.

По классике воркер извлекает задачу из очереди под защитой мутекса. Все верно, ведь в общем случае очередь может пополняться другими, поэтому надо брать мутекс. Но в данном случае пополнения-то нет. Поэтому задачу можно и не извлекать, а просто застолбить ее индекс (легко делается атомиком). И мутекс воркеру нужен только для того чтобы заснуть когда нет больше задач. Последнее тоже спорно, но не все сразу.

Основная сложность - трудновато заставить себя вернуться к тому что "уже работает" - и вроде хорошо  :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 10, 2019, 18:41
По классике воркер извлекает задачу из очереди под защитой мутекса. Все верно, ведь в общем случае очередь может пополняться другими, поэтому надо брать мутекс. Но в данном случае пополнения-то нет. Поэтому задачу можно и не извлекать, а просто застолбить ее индекс (легко делается атомиком). И мутекс воркеру нужен только для того чтобы заснуть когда нет больше задач. Последнее тоже спорно, но не все сразу.

А потом:

А у заказчика 32 ядра, хирак - вдвое МЕДЛЕННЕЕ чем на одной. Не могу объяснить почему.


:)))


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 13, 2019, 00:19
Цитировать
Поэтому задачу можно и не извлекать, а просто застолбить ее индекс (легко делается атомиком). И мутекс воркеру нужен только для того чтобы заснуть когда нет больше задач. Последнее тоже спорно, но не все сразу.
Вообще говоря, это проблема из области теории игр https://www.youtube.com/watch?v=zypuneus6b0 (https://www.youtube.com/watch?v=zypuneus6b0).. И в данном контексте она легко моделируется..
Я не вижу здесь каких то нетривиальных затруднений, чтоб сообразить как нужно рационально воспользоваться thread_pool или concurrent_loop.. :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Igors от Сентябрь 13, 2019, 11:55
Вообще говоря, это проблема из области теории игр https://www.youtube.com/watch?v=zypuneus6b0 (https://www.youtube.com/watch?v=zypuneus6b0).. И в данном контексте она легко моделируется..
Не вижу здесь никакой связи с теорией игр. Просмотрел треть мувика - ясно, типичный преподаватель-халтурщик. Да, байки что он рассказывает интересные, но чему он научил, что дал "под запись", что вообще осталось в голове студента после таких лекций? Да ничего.

Я не вижу здесь каких то нетривиальных затруднений, чтоб сообразить как нужно рационально воспользоваться thread_pool или concurrent_loop.. :)
Как быстро мы становимся благодушными и снисходительными после первой же "разпоточенной" задачи :) Воспользоваться любым из стандартных тулзов можно было сразу, а если уж делать велик - так делать, а не так себе, громоздить бульварные подробности std. Ну ладно, не надо - так не надо  :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 13, 2019, 12:15
Цитировать
Не вижу здесь никакой связи с теорией игр.
Связь прямая. Это типично игровая ситуация, когда нужно организовать работу воркеров так, чтоб минимизировать их стояние (толкучку) в очереди за задачами.

Цитировать
что вообще осталось в голове студента после таких лекций? Да ничего.
Это не студенты - это открытая лекция. Вот У Вас, видимо ничего не останется)
Я своим студентам тоже даю задачку смоделировать на плюсах жизненную ситуацию из теорию игр) И они в восторге обычно после этого остаются)

Цитировать
Ну ладно, не надо - так не надо   :)
Давайте обсудим Вашу реализацию на атомиках и сравним :) (Для чистоты эксперимента лучше дать реализацию с STL)


ЗЫ Задачка: На ФизФаке 6 лифтов. Этажей 14. Время подъёма лифта на один этаж t1. Если лифт останавливается на каком-либо этаже, мы ждём пока двери откроются, кто-то выйдет и двери закроются - это время ожидания t2. Вопрос: как организовать работу лифтов так, чтобы минимизировать время развоза всех N студентов с первого этажа по их  этажам. Никто из N студентов не знает кому из других студентов на какой этаж нужно.  Времена t1 и t2 мы менять не можем.

ЗЗЫ Да, студены уже заранее знают ответ, поскольку на ФизФаке это уже реализовано) Но интересно колличественно сравнить)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Igors от Сентябрь 13, 2019, 13:22
Связь прямая. Это типично игровая ситуация, когда нужно организовать работу воркеров так, чтоб минимизировать их стояние (толкучку) в очереди за задачами.
Это уже совсем другая задача которая требует доп данных, напр примерное/ожидаемое время выполнения каждой.

Давайте обсудим Вашу реализацию на атомиках и сравним :)
Уменьшайте нагрузку до тех пор пока КПД не упадет, напр на 4 нитках быстрее всего в полтора раза, а не в 4 как хотелось. Собсно интересно каким окажется это время. А я исполню на атомиках.

ЗЫ Задачка: На ФизФаке 6 лифтов. Этажей 14. Время подъёма лифта на один этаж t1. Если лифт останавливается на каком-либо этаже, мы ждём пока двери откроются, кто-то выйдет и двери закроются - это время ожидания t2. Вопрос: как организовать работу лифтов так, чтобы минимизировать время развоза всех N студентов с первого этажа по их  этажам. Никто из N студентов не знает кому из других студентов на какой этаж нужно.  Времена t1 и t2 мы менять не можем.

ЗЗЫ Да, студены уже заранее знают ответ, поскольку на ФизФаке это уже реализовано) Но интересно колличественно сравнить)
Не понял чем тут "манипулировать" (что менять). Наверное какой лифт куда развозит? (до какого этажа).


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 13, 2019, 13:26
Цитировать
Не понял чем тут "манипулировать" (что менять). Наверное какой лифт куда развозит? (до какого этажа).
Вы главный лифтёр и можете программировать их работу как хотите. Но времена t1 и t2 менять не можете (причём обычно t2 > t1). И ещё вы знаете, что в фае (на первом этаже) никто из N cтудентов на знает кому другому на какой этаж нужно) Ваши действия? :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 13, 2019, 13:32
Цитировать
Это уже совсем другая задача которая требует доп данных, напр примерное/ожидаемое время выполнения каждой.
Нет, я бы не сказал.. Есть задачи, которые можно эффективно распараллелить (если они в принципе параллелются), не интересуясь временем исполнения одной таски..


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: ViTech от Сентябрь 13, 2019, 13:33
Это уже совсем другая задача которая требует доп данных, напр примерное/ожидаемое время выполнения каждой.

С каких пор Вам дополнительные данные потребовались? ;D В своих же темах Вы постоянно жалуетесь, когда доп. данные  спрашивают, подразумеваете, что и без них можно задачу решить.  :D


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 13, 2019, 13:49
Цитировать
Вы главный лифтёр и можете программировать их работу как хотите. Но времена t1 и t2 менять не можете (причём обычно t2 > t1). И ещё вы знаете, что в фае (на первом этаже) никто из N cтудентов на знает кому другому на какой этаж нужно) Ваши действия?  :)
И да, забыл сказать, ваш алгоритм программирования лифтов студенты, конечно же, знают) Т.е. они в курсе по какому закону они ходят)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Igors от Сентябрь 13, 2019, 14:17
Цитировать
Вы главный лифтёр и можете программировать их работу как хотите. Но времена t1 и t2 менять не можете (причём обычно t2 > t1). И ещё вы знаете, что в фае (на первом этаже) никто из N cтудентов на знает кому другому на какой этаж нужно) Ваши действия?  :)
И да, забыл сказать, ваш алгоритм программирования лифтов студенты, конечно же, знают) Т.е. они в курсе по какому закону они ходят)
Какой "мой" алгоритм? Я об этой задаче впервые слышу  :)

Наверное имеется ввиду нечто типа: вот лифт № 1 ходит до 5-го этажа и останавливается на всех со 2-го по 5-й. А лифт № 2 ходит до 10-го, но останавливается с 5-го по 10-й. Так, что ли?

То критиковать других за, якобы, "кривую постановку" - ума много не надо, а сам?  :)

Да, и Вы от дела не отвлекайтесь с посторонними задачами. Ждем Ваш вариант с упавшим  КПД


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Igors от Сентябрь 13, 2019, 14:24
Нет, я бы не сказал.. Есть задачи, которые можно эффективно распараллелить (если они в принципе параллелются), не интересуясь временем исполнения одной таски..
Может лучше сначала поднабраться опыта распараллеливания, а потом уж "сказал - не сказал"?  :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 13, 2019, 14:28
Может лучше сначала поднабраться опыта распараллеливания, а потом уж "сказал - не сказал"?  :)
Боже, кто это говорит, парень который еще совсем недавно не знал как работают условные переменные и предлагал везде использовать семафоры. :)

А сейчас уже "опыт распареллеливания", так что там у заказчика с 32 ядрами? :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 13, 2019, 14:43
Цитировать
Какой "мой" алгоритм? Я об этой задаче впервые слышу  :)
Простая жизненная задачка не из "букваря", как Вы любите выражаться  :)

Цитировать
Наверное имеется ввиду нечто типа:
У нас есть определённый критерий правильности нашего решения: а именно время развоза всех N студентов)
Наиболее оптимальное решение соответсвтует наименьшему времени. Что ещё нужно?  :)

Цитировать
Может лучше сначала поднабраться опыта распараллеливания, а потом уж "сказал - не сказал"?   :)
Хотел сам ответить, но Old уже всё за меня сказал  :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Igors от Сентябрь 13, 2019, 15:25
У нас есть определённый критерий правильности нашего решения: а именно время развоза всех N студентов)
Наиболее оптимальное решение соответсвтует наименьшему времени. Что ещё нужно?  :)
Это забава, которая с темой никак не связана. Если мне еще тут выяснять - то на такие забавы нет ни времени ни желания.

По теме: хотите померяться производительностью на "мелких" задачах - давайте, я найду время. Нет - нечего засорять эфир  :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 13, 2019, 15:38
Цитировать
Это забава, которая с темой никак не связана.
Она на прямую связана с темой о птребителе и пулом производителей.
Я, если хотите, могу открыть исходники, которые ещё не опубликованы (да и кто их здась будет искать..)

Цитировать
По теме: хотите померяться производительностью на "мелких" задачах - давайте, я найду время. Нет - нечего засорять эфир  :)
Я лишь говорю о том, что в некоторых случаях (откинем OpenMP) реализация thread_pool и  concurrent_loop сопостовимы (при условии грамотной консолидации между данными), а все игры над атомиками не дадут значимый прирост в производительности..

Почему меня сподвигла идея реализовать свой алгоритм нп thread_pool отдельная история, но издержки там (при грамотном расспределении воркеров) связаны из-за доступуа к общим данным..


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Igors от Сентябрь 13, 2019, 16:29
Она на прямую связана с темой о птребителе и пулом производителей.
Когда нужно накидать задач - никто не будет разбираться "а какая там каждая" - слишком хлопотно. Даже очевидную оптимизацию "бОльшие задачи первыми" нередко бывает затруднительно реализовать.

Я лишь говорю о том, что в некоторых случаях (откинем OpenMP) реализация thread_pool и  concurrent_loop сопостовимы, а все игры над атомиками не дадут значимый прирост в производительности..

Почему меня сподвигла идея реализовать свой алгоритм нп thread_pool отдельная история, но издержки там (при грамотном расспределении воркеров) связаны из-за доступуа к общим данным..
Вы находитесь под впечатлением "ну вот же, получилось" что вполне простительно :) Кстати доступ к общим данным засисяют атомарным мутексом (а не обычным). Уменьшайте кластер и сравнивайте КПД - и Вы быстро увидите "не только розы"  :) Попробуйте напр задача = какое-нибудь заполнение std::string или QString


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 13, 2019, 22:03
Цитировать
[Когда нужно накидать задач - никто не будет разбираться "а какая там каждая
Опять нет! Кая я уже говорил, в некоторых случаях (каковой является моя проблема) можно особо не интересоваться какое время исполняется одна таска.
Весь вопрос сводится к тому,  как организовать воркеров, если вы знаете сколько в принципе у Вас ядер и тасков.. (ну ещё предположения о временах t1 и t2) Этого достаточно..

Цитировать
Кстати доступ к общим данным засисяют атомарным мутексом (а не обычным). Уменьшайте кластер и сравнивайте КПД - и Вы быстро увидите "не только розы"  Улыбающийся Попробуйте напр задача = какое-нибудь заполнение std::string или QString
Отправлено: Сегодня в 03:38Автор: m_ax
Вам в любом случае придётся усыплять и пробуждать тпотоки.. С помощью флажков или как иначе - это другой разговор.. Т.е. всё равно будет mutrex.. Покажите как с помощью только атомиков решить эту проблему? А мы потом эти два решиния сравним)



Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Igors от Сентябрь 14, 2019, 12:49
Вам в любом случае придётся усыплять и пробуждать тпотоки..
А можно я как-нибудь попробую без этого?  :)

С помощью флажков или как иначе - это другой разговор.. Т.е. всё равно будет mutrex.. Покажите как с помощью только атомиков решить эту проблему? А мы потом эти два решиния сравним)
Покажу, только меряться надо на одних и тех же задачах. Мне их самому придумать? Можно и так, только чтобы потом претензий не было, мол, "не та задача". Тогда давайте "ту" :) Подтвердите


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 14, 2019, 12:53
Цитировать
Покажу, только меряться надо на одних и тех же задачах. Мне их самому придумать? Можно и так, только чтобы потом претензий не было, мол, "не та задача". Тогда давайте "ту"  :) Подтвердите
Да, конечно, я приведу алгоритм, который параллелю) (сегодня-завтра)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Igors от Сентябрь 14, 2019, 14:36
Да, конечно, я приведу алгоритм, который параллелю) (сегодня-завтра)
Необязательно "живой" алгоритм, ни на какие секреты я не претендую. Просто он должен быть одним и тем же для тестирующих :) и должна быть возможность уменьшить/увеличить время выполнения этой единичной задачи


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 14, 2019, 14:49
Цитировать
Необязательно "живой" алгоритм, ни на какие секреты я не претендую.
Да это, в принципе, не особый секрет.. Просто на одной конференции в одном из докладов утверждалось, что если в системе имеется более чем один тип атомов (с различным типом взаимодействия (взаимодействие зависит только лишь от расстояния между атомами..) (откинем все квантовые поправки)) то симметрия кристаллической структуры может быть отлична от треугольной.. (громкое заявление, которое хотелось бы проверить..)

У нас всегда треугольная получается)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Igors от Сентябрь 14, 2019, 18:21
Взял такую задачу - простые расчеты + создание/удаление вектора
Код:
	virtual void run( void )
{
std::minstd_rand0 gen(mSeed);
std::vector<double> data(mCount * 2);
mResult = 0.0;
for (size_t i = 0; i < data.size(); ++i) {
double val = (gen() % 32768) / 32768.0;
data[i] = val * 2 - 1.0;
}
for (size_t i = 0; i < data.size(); i += 2)
mResult += atan2(data[i + 1], data[i]);

mResult /= mCount;
}
Для тестов ниже mСount = 128. Результат меня удивил. В очередной раз недооценил разработчиков Qt (из сейчас модно хаять). Ну задрочить QThreadPool конечно можно (что я и сделал), но для этого нужно давать задачи 10 МИКРО секунд и меньше. Уже на 20 все вполне норм (КПД около двойки), а на 100 и больше нет разницы с моим великом.   
Цитировать
Testing single thread
single pass 0 : time(s) = 5.960, tasks 826368, unit(ms) = 0.0072, sum = -1785.4426
single pass 1 : time(s) = 5.716, tasks 826368, unit(ms) = 0.0069, sum = -1785.4426
single pass 2 : time(s) = 5.691, tasks 826368, unit(ms) = 0.0069, sum = -1785.4426
single pass 3 : time(s) = 5.690, tasks 826368, unit(ms) = 0.0069, sum = -1785.4426

Testing QThreadPool
pool pass 0 : time(s) = 6.327, tasks 826368, unit(ms) = 0.0077, sum = -1785.4426
pool pass 1 : time(s) = 7.473, tasks 826368, unit(ms) = 0.0090, sum = -1785.4426
pool pass 2 : time(s) = 6.239, tasks 826368, unit(ms) = 0.0075, sum = -1785.4426
pool pass 3 : time(s) = 6.782, tasks 826368, unit(ms) = 0.0082, sum = -1785.4426

Pool scale = 0.860 (4 threads)

Testing AtomPool
atom pass 0 : time(s) = 2.025, tasks 826368, unit(ms) = 0.0025, sum = -1785.4426
atom pass 1 : time(s) = 2.041, tasks 826368, unit(ms) = 0.0025, sum = -1785.4426
atom pass 2 : time(s) = 2.030, tasks 826368, unit(ms) = 0.0025, sum = -1785.4426
atom pass 3 : time(s) = 2.025, tasks 826368, unit(ms) = 0.0025, sum = -1785.4426

Atom scale = 2.839 (4 threads)
Ну ладно, посмотрим с др нагрузкой


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 16, 2019, 10:13
И так, выкладываю алгоритм.
Речь идёт о поиске минимума функции в многомерном пространстве (обычно это 100 - 1000 мерное пространство и более).
Суть алгоритма опишу, для простоты, на примере одномерной задачи (в прикреплённом проекте реализация для многомерного пространства в многопоточном и однопоточной реализации).
Алгорит маботает так:
Код
C++ (Qt)
 
for(uint_type steps = 0; !tol(radius, steps); ++steps)
       {
           mean_x = x;
 
           for (uint_type i = 0; i < point_per_step; ++i)
           {
               std::uniform_real_distribution<real_type> dist(mean_x - radius, mean_x + radius);
               vector_type c = dist(rng);
               real_type cur_val = function(cx);
 
               if (cur_val < min)
               {
                   x = std::move(cx);
                   min = cur_val;
               }
           }
 
           dx = x - mean_x;
           real_type sqr_ds = dx*dx; /* ds^2 = (dx, dx) */
 
           if (sqr_ds < radius*radius*mean_sqr_radius)
               radius *= m_params.alpha;
           else
              radius = std::min(m_params.init_radius, radius*m_params.beta);
 
           observer(x, min, steps);
       }
 
 
Т.е. на каждом шаге, мы генерируем point_per_step точек относительно некоторой mean_x, равномерно распределённых в интервале от [mean_x - radius, mean_x + radius].
Если значение функции в какой из точки оказывается меньше, то мы устанавливаем новое значение минимума и точки минимума.

После того, как было проверено point_per_step точек, мы смотрим как далеко от изначальной точки мы ушли. Если это расстояние меньше чем radius^2 * vector.size/3 то мы  уменьшаем радиус в alpha раз (alpha < 1), в противном случае мы его растягиваем в beta раз (beta > 1).

Процесс поиска заканчивается, когда либо радиус сколопсирует до некоторой малой величины, либо пока не закончатся все ходы.

Т.е. алгоритм на входе требует 4 параметра: начальный радиус: init_radius, point_per_step и alpha и beta.  

В проекте две реализации этого алгоритма: многопоточный (класс minsearch) и однопоточный вариант (класс minsearch_st)

Код
C++ (Qt)
#include <iostream>
 
#include "minsearch.h"
#include "model_energy.h"
#include "potential.h"
 
static constexpr unsigned int NAtomsA = 50; // Number of atoms type A
static constexpr unsigned int NAtomsB = 50; // Number of atoms type B
static constexpr unsigned int NAtoms = NAtomsA + NAtomsB; // Total number of atoms
static constexpr unsigned int Dim = 2; // Physical dimension
 
int main()
{
 
   ABModelEnergy<double, NAtomsA, NAtomsB> modelEnergy(std::bind(potential, std::placeholders::_1, 1.0, -10.0, 1.0),
                                                           std::bind(potential, std::placeholders::_1, 1.0, -12.0, 1.0),
                                                           std::bind(potential, std::placeholders::_1, 0.0, -20.0, 4.0));
 
   std::vector<double> x(Dim*NAtoms, 0.0); // The initial phase vector (all atoms are in the zero point now)
 
   double init_radius = 0.1;
   unsigned int point_per_step = 8*10;
   double alpha = 0.9;
   double beta = 2.0;
 
   //thread pool realisation
   specmath::minsearch<double> minsearch(init_radius, point_per_step, alpha, beta);
 
   auto start = std::chrono::high_resolution_clock::now();
 
   const double eps = 1e-6;
   const unsigned long max_steps = 3000000000;
 
   double res = minsearch.find_minimum(modelEnergy, x, specmath::breaker<double>(eps, max_steps));
 
   std::cout << " (multi threads) Energy = " << res << std::endl;
 
   auto stop = std::chrono::high_resolution_clock::now();
 
   auto duration = std::chrono::duration_cast<std::chrono::seconds>(stop-start).count();
 
   std::cout << "duration = " << duration << " sec" << std::endl;
 
   // single thread realisation
   specmath::minsearch_st<double> minsearch_st(init_radius, point_per_step, alpha, beta);
 
   for (auto & xi : x)
       xi = 0.0;
 
   start = std::chrono::high_resolution_clock::now();
 
   res = minsearch_st.find_minimum(modelEnergy, x, specmath::breaker<double>(eps, max_steps));
 
   std::cout << "(single threads) Energy = " << res << std::endl;
 
   stop = std::chrono::high_resolution_clock::now();
 
   duration = std::chrono::duration_cast<std::chrono::seconds>(stop-start).count();
 
   std::cout << "duration = " << duration << " sec" << std::endl;
 
   return 0;
}
 

Выйгрыш во времени с данными настройками, примерно от 5 до 7 раз (есть фактор случайности) на 8 ядрах (AMD Vishera FX-8350).


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: ViTech от Сентябрь 16, 2019, 13:18
Речь идёт о поиске минимума функции в многомерном пространстве (обычно это 100 - 1000 мерное пространство и более).

Имхо, при работе с такими размерностями хорошо бы рассчитывать и выводить примерную общую длительность вычислений по времени. А то может оказаться, что ждать окончания расчётов придётся несколько тысяч лет :).


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Igors от Сентябрь 16, 2019, 13:54
Имхо, при работе с такими размерностями хорошо бы рассчитывать и выводить примерную общую длительность вычислений по времени. А то может оказаться, что ждать окончания расчётов придётся несколько тысяч лет :).
Все верно, только не с "такими", а с "любыми", прогон одного теста "на скорость" не должен превышать 10-15 сек, иначе невыносимо. Поэтому мне сразу же пришлось уменьшить max_steps до 1000 (было 3.0e+9). Ладно с исходными данными (4 ядра, clang, release, OSX 10.14.3)

Цитировать
NAtomsA = NAtomsB = 50, point_per_step = 80, max_steps = 1000

 (multi threads) Energy = 3.55041e+09
duration = 4.32 sec
(single threads) Energy = 4.44666e+09
duration = 10.292 sec
scale = 2.38241
Вполне хорошо масштабит, особенно учитывая что OSX держит одно ядро "для себя", т.е. получить масштаб > 3 на 4 ядрах не удается по-любому (это чисто мои наблюдения, ничего не читал). Ну ладно, то подробности моей платформы. А вот воспроизводимость (Energy) все-таки должна быть.

Хорошо, теперь уменьшаем задачу.

Цитировать
NAtomsA = NAtomsB = 16, point_per_step = 16, max_steps = 1000 * 100

 (multi threads) Energy = -16527.9
duration = 6.509 sec
(single threads) Energy = -16761.4
duration = 7.699 sec
scale = 1.18282

 (multi threads) Energy = -17795.5
duration = 3.995 sec
(single threads) Energy = -17198.4
duration = 5.478 sec
scale = 1.37121
И вот уже КПД село вдвое. Еще уменьшаем
Цитировать
NAtomsA = NAtomsB = 8, point_per_step = 8, max_steps = 1000 * 100

 (multi threads) Energy = -7481.85
duration = 2.594 sec
(single threads) Energy = -7155.09
duration = 0.563 sec
scale = 0.217039

 (multi threads) Energy = -6643.89
duration = 0.613 sec
(single threads) Energy = -7320.58
duration = 0.674 sec
scale = 1.09951
Еще село + нестабильность. Или я неправильно уменьшаю? И хотелось бы конечно иметь статистику сколько задач посчитано

И не понял этого
Код
C++ (Qt)
for(uint_type steps = 0; !tol(radius, steps); ++steps)
       {
           mean_x = x;
           std::atomic_int ntests(m_params.point_per_step);
 
           for (uint_type i = 0; i < m_num_threads; ++i)
               results[i] = pool.add_task(std::bind(task, radius, std::ref(ntests)));
 
           for (auto & res : results)
               res.wait();
           ...
 
Зачем подавать число задач == числу ниток и затем всякий раз ждать окончания? Запихивайте все и ждите когда все сварится, затем и очередь делалась.


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 16, 2019, 14:32
Цитировать
Имхо, при работе с такими размерностями хорошо бы рассчитывать и выводить примерную общую длительность вычислений по времени. А то может оказаться, что ждать окончания расчётов придётся несколько тысяч лет  :)
Это ещё фигня, там и до суток может время расчётов доходить)

Цитировать
Или я неправильно уменьшаю? И хотелось бы конечно иметь статистику сколько задач посчитано
Ну это уж слишком до безобразия малое колличество атомов..  :) Желательно порядка 1000 атомов в системе иметь)

Цитировать
Зачем подавать число задач == числу ниток и затем всякий раз ждать окончания? Запихивайте все и ждите когда все сварится, затем и очередь делалась.
А как по другому? Да, я могу задать число тасков == числу point_per_step.. Но это не самый оптимальный вариант, поскольку мы здесь нарываемся на возможную ситуацию пробки (когда потоки скапливаются в очередь за задачами). Я не могу двигаться дальше, пока все воркеры не отработали, поэтому и жду..


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Igors от Сентябрь 16, 2019, 15:11
Это ещё фигня, там и до суток может время расчётов доходить)
Так что, каждый прогон теста = сутки? :)

Ну это уж слишком до безобразия малое колличество атомов..  :) Желательно порядка 1000 атомов в системе иметь)
Ну если у Вас такой жирный, шикарный кластер, то проходит что угодно, любой тул. О чем уже не раз говорилось, такой случай просто неинтересно обсуждать.

А как по другому? Да, я могу задать число тасков == числу point_per_step.. Но это не самый оптимальный вариант, поскольку мы здесь нарываемся на возможную ситуацию пробки (когда потоки скапливаются в очередь за задачами). Я не могу двигаться дальше, пока все воркеры не отработали, поэтому и жду..
Вот у Вас машина с 8-ю ядрами (8 воркеров). У клиента машина всего с 4-мя, или наоборот с 16. Расчеты/рез-ты ведь от этого зависеть не должны. Поэтому Вы не можете двигаться дальше пока все ЗАДАЧИ не завершены, а сколько воркеров их делало - влияет на время исполнения, но не на рез-ты.

"Пробка" - да, возможна, но она зависит от числа воркеров вкупе с размером задачи - а вовсе НЕ от числа задач. Продолжим пример что Вы давеча приводили. Пусть босс раздает задачи на полчаса. Воркер (добросовестный) через полчаса придет за новой задачей, а в это время босс дает задачу другому, придется ждать. Время воркеров будет тратиться в основном на беготню к шефу. Но это никак не связано с тем что "много задач"


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: kuzulis от Сентябрь 16, 2019, 15:42
Я немного ос стороны зайду (особо не вчитывался). А имеются ли в здешнем "конечном решении" такие фичи, как "дорасчет оставшихся задач" из других ядер?
Например, имеем 4 ядра, на каждое при старте выделено по 100 задач... Пусть первое ядро выполнило свои задачи быстрее всех, а у трех оставшихся ядер осталось, 11, 22, и 33 задачи (от балды). Может ли первое ядро забрать себе еще (отобрать) несколько задач у других, дабы быстрее все посчиталось?

Или данный вопрос не рассматривается в этой теме?    


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 16, 2019, 16:29
Или данный вопрос не рассматривается в этой теме?    
Рассматривался и пул потоков для этого как раз и предназначен.
Все зависит от пользователя пула, как он разделит задачу на подзадачи и заполнит очередь.


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 16, 2019, 18:30
Цитировать
Так что, каждый прогон теста = сутки?  :)
Это нормально. Я уже не помню, когда свой комп в последний раз выключал)

Цитировать
Ну если у Вас такой жирный, шикарный кластер, то проходит что угодно, любой тул. О чем уже не раз говорилось, такой случай просто неинтересно обсуждать.
"Наивное" решение, когда каждый раз создаётся и уничтожается пул потоков работает заметно медленнее..
Но в целом да, я согласен..

Цитировать
Вот у Вас машина с 8-ю ядрами (8 воркеров). У клиента машина всего с 4-мя, или наоборот с 16. Расчеты/рез-ты ведь от этого зависеть не должны. Поэтому Вы не можете двигаться дальше пока все ЗАДАЧИ не завершены, а сколько воркеров их делало - влияет на время исполнения, но не на рез-ты.
И сейчас число воркеров никак на результаты не влияет. Влияет только общее время. Вы задаёте параметр point_per_step и оно всегда отрабатывает ровно столько же раз.
Код
C++ (Qt)
auto task = [&](const real_type & r, std::atomic_int & ntests)
       {
           static thread_local std::random_device rd;
           static thread_local rng_type rng(rd());
           static thread_local std::uniform_real_distribution<real_type> dist;
           typedef typename std::uniform_real_distribution<real_type>::param_type range_t;
 
           // ntests - это и есть атомарный счётчик point_per_step
           while (std::atomic_fetch_sub(&ntests, 1) > 0)
           {
               vector_type cx = mean_x;
               std::transform(mean_x.begin(), mean_x.end(), cx.begin(), [&](const real_type & x0){ return dist(rng, range_t(x0-r, x0+r)); });
               real_type cur_val = function(cx);
 
               std::lock_guard<std::mutex> locker(mutex);
               if (cur_val < min)
               {
                   x = std::move(cx);
                   min = cur_val;
               }
           }
       };
 

Каждая таска берёт себе задачу, декрементируя значение ntests. В результате мы всегда получим point_per_step вычислений на каждом шаге.

Цитировать
"Пробка" - да, возможна, но она зависит от числа воркеров вкупе с размером задачи - а вовсе НЕ от числа задач. Продолжим пример что Вы давеча приводили. Пусть босс раздает задачи на полчаса. Воркер (добросовестный) через полчаса придет за новой задачей, а в это время босс дает задачу другому, придется ждать. Время воркеров будет тратиться в основном на беготню к шефу. Но это никак не связано с тем что "много задач"
Для жирных задач, да, проблема пробок нивилируется (здесь ей можно пренебречь), согласен..

Цитировать
Я немного ос стороны зайду (особо не вчитывался). А имеются ли в здешнем "конечном решении" такие фичи, как "дорасчет оставшихся задач" из других ядер?
Например, имеем 4 ядра, на каждое при старте выделено по 100 задач... Пусть первое ядро выполнило свои задачи быстрее всех, а у трех оставшихся ядер осталось, 11, 22, и 33 задачи (от балды). Может ли первое ядро забрать себе еще (отобрать) несколько задач у других, дабы быстрее все посчиталось?
Да, сейчас так и реализовано, как только таска заканчивает работу она тут же берёт следующую. Т.е. ситуации, когда одно ядро выполнело свои задачи быстрее остальных нет. Поскольку нет как таковых "своих задачь".  :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Igors от Сентябрь 17, 2019, 09:14
Например, имеем 4 ядра, на каждое при старте выделено по 100 задач... Пусть..
Чтобы выделить именно "на ядро" надо сильно постараться, и возможно это не на всех платформах. В (подавляющем) большинстве случаев это не нужно, наверное Вы имели ввиду "по 100 задач на воркера", т.е. на рабочую нитку, а как ОС их будет перебрасывать с одного ядра на другое - это его личное дело.

Но и в этом случае сделать руками "по N на рыло" часто очень непросто. Нужно городить доп структуры, знать число задач и как-то выбрать этот злосчаcтный N. Время выполнения одной задачи может быть любым, напр оказалось рез-т уже в кеше, считать опять не нужно. Да и вообще, увидев текст задачи (пусть всего страничку текста) сможете сказать сколько она будет выполняться? Насколько точен будет этот ответ?  :)

Пулы оперируют с теми задачами что кладутся в пул, как говорят, "по одной не ошибешься", нагрузка всегда распределяется автоматычно. Но в этом случае возникает проблема "мелких" задач.  На первый взгляд кажется что получить на 32 ядрах МЕДЛЕННЕЕ чем на одном - ну это надо быть уж совсем криворуким :) На самом деле это "элементарно" - просто совать в пул слишком маленькие задачи - и пул будет не ускорять а тормозить. Что считать "мелким" - см мои тесты в этой теме


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Igors от Сентябрь 17, 2019, 09:40
Код
C++ (Qt)
auto task = [&](const real_type & r, std::atomic_int & ntests)
       {
           static thread_local std::random_device rd;
           static thread_local rng_type rng(rd());
           static thread_local std::uniform_real_distribution<real_type> dist;
           typedef typename std::uniform_real_distribution<real_type>::param_type range_t;
 
           // ntests - это и есть атомарный счётчик point_per_step
           while (std::atomic_fetch_sub(&ntests, 1) > 0)
           {
               vector_type cx = mean_x;
               std::transform(mean_x.begin(), mean_x.end(), cx.begin(), [&](const real_type & x0){ return dist(rng, range_t(x0-r, x0+r)); });
               real_type cur_val = function(cx);
 
               std::lock_guard<std::mutex> locker(mutex);
               if (cur_val < min)
               {
                   x = std::move(cx);
                   min = cur_val;
               }
           }
       };
 

Каждая таска берёт себе задачу, декрементируя значение ntests. В результате мы всегда получим point_per_step вычислений на каждом шаге.
Лучше так
Код
C++ (Qt)
              ...
              real_type cur_val = function(cx);
 
               if (cur_val < min) {
                 std::lock_guard<std::mutex> locker(mutex);
                 if (cur_val < min)
                 {
                     x = std::move(cx);
                     min = cur_val;
                 }
               }
 
:)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 17, 2019, 10:58
Цитировать
Чтобы выделить именно "на ядро" надо сильно постараться, и возможно это не на всех платформах. В (подавляющем) большинстве случаев это не нужно, наверное Вы имели ввиду "по 100 задач на воркера", т.е. на рабочую нитку, а как ОС их будет перебрасывать с одного ядра на другое - это его личное дело.
Да, имеется в виду один воркер выполняет несколько задач, в среднем: point_per_step/num_threads. Воркеров столько, сколько сколько потоков в пуле.
И такой вариант организации будет в целом более "быстрым" чем когда мы в пул кладём point_per_step задач.

Цитировать
Но и в этом случае сделать руками "по N на рыло" часто очень непросто. Нужно городить доп структуры, знать число задач и как-то выбрать этот злосчаcтный N. Время выполнения одной задачи может быть любым, напр оказалось рез-т уже в кеше, считать опять не нужно. Да и вообще, увидев текст задачи (пусть всего страничку текста) сможете сказать сколько она будет выполняться? Насколько точен будет этот ответ?
Да ничего подобного) Число задач N на "рыло"  мы явно и не вычисляем, нам достаточно знать сколько всего задач вообще. И сколько мы организуем воркеров в пуле, которые эти задачи будут решать. 

Цитировать
Пулы оперируют с теми задачами что кладутся в пул, как говорят, "по одной не ошибешься", нагрузка всегда распределяется автоматычно. Но в этом случае возникает проблема "мелких" задач.
Вот поэтому сейчас реализовано так, как реализовано  :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Igors от Сентябрь 17, 2019, 11:37
И такой вариант организации будет в целом более "быстрым" чем когда мы в пул кладём point_per_step задач.
Даже не зная "какой вариант" можно смело утверждать что он будет хуже чем положить все необходимые point_per_step задач в очередь :)

Да ничего подобного)
"Остыньте" :)
Число задач N на "рыло"  мы явно и не вычисляем, нам достаточно знать сколько всего задач вообще. И сколько мы организуем воркеров в пуле, которые эти задачи будут решать. 
Это я отвечал на др пост, с Вашей задачей никак не связано. Вам не нужно знать ни число задач ни число воркеров (глухая общая схема).

Вот поэтому сейчас реализовано так, как реализовано  :)
Ну так и хорошо  :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 17, 2019, 11:45
Цитировать
Даже не зная "какой вариант" можно смело утверждать что он будет хуже чем положить все необходимые point_per_step задач в очередь  :)
Нет, результаты показывают ровно обратное) И это легко проверить.


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 18, 2019, 19:46
Покажу, только меряться надо на одних и тех же задачах.
Так когда ожидать ваш чудо вариант? :)

Мне их самому придумать?
Не, уже все придумано.
Ждем.


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 20, 2019, 20:13
Цитировать
Так когда ожидать ваш чудо вариант?  :)
Дык, нет другого разумного решения) В контексте данной задачи, всё так или иначе сводится к пулу :)
Так что мы здесь не увидим решения чисто на атомиках, без усыпления и пробуждения потоков)

P.S. Задача о лифтах: Самое простое (но не единственное) решение: сделать так, чтоб одна половина лифтов ходила только по чётным этажам, а другая - только по нечётным  :)  


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 20, 2019, 20:21
Дык, нет другого разумного решения) В контексте данной задачи, всё так или иначе сводится к пулу :)
Так что мы здесь не увидем решения чисто на атомиках, без усыпления и пробуждения потоков)

А вот Igors писал, что

Вам в любом случае придётся усыплять и пробуждать тпотоки..
А можно я как-нибудь попробую без этого?  :)

Думаете уже не будет пробовать? :)
Жаль. :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 20, 2019, 20:28
Цитировать
А вот Igors писал, что
Вы что, igors'а не знаете?  :)
Сколько у нас тут баталий уже было  :)

Цитировать
Думаете уже не будет пробовать?  :)
Думаю, всё) Тема выдохлась)

Ну, во всяком случае, я ответы на свои вопросы получил, и благодарен всем, кто мотивировал меня на то, чтобы разобратся в этом вопросе  :)
  


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Igors от Сентябрь 20, 2019, 21:06
Дык, нет другого разумного решения) В контексте данной задачи, всё так или иначе сводится к пулу :)
Так что мы здесь не увидим решения чисто на атомиках, без усыпления и пробуждения потоков)
Оно давно готово, там кстати несложно. Но, поскольку это никого не интересует, то чего я буду лезть типа "посмотрите как я сделал!" :) Это ни к чему. Кстати погонял дустовский пул.

...чтобы разобратся в этом вопросе  :)
Развитие происходит по спирали :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 20, 2019, 21:09
Ну вот, мы все ждем, ждем.... а решение уже давно готово. только природная скромность не дала его показать общественности... :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 20, 2019, 21:13
Цитировать
Оно давно готово, там кстати несложно. Но, поскольку это никого не интересует, то чего я буду лезть типа "посмотрите как я сделал!"  :)
Ну почему же, я всегда готов (как Гагарин и Титов) к конструктивному обсуждению) Так что, всегда велком)

Цитировать
Кстати погонял дустовский пул.
И как впечатления?)

Цитировать
Развитие происходит по спирали  :)
Да я бы не сказал.. но это уже совсем другая история, как говорил Леонид Каневский)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Igors от Сентябрь 21, 2019, 09:57
Ну почему же, я всегда готов (как Гагарин и Титов) к конструктивному обсуждению)
Ну пока еще не очень готовы :) Напр

Дык, нет другого разумного решения) В контексте данной задачи, всё так или иначе сводится к пулу :)
С точностью до наоборот. Пул очень хорош когда задачи возникают "спонтанно", его сила именно в "бездумности" - закинул и все. А у Вас на каждом раунде заранее известно и число задач и исходные данные для каждой. А значит вовсе необязательно "брать по одной", всегда засисяться мутексом и.т.п. Поэтому более подходящим выглядит хотя бы QtConcurrent (хотя я сам его ни разу не использовал :)), ну а OpenMP еще лучше.

Ну и в (в сотый раз) упомяну что все это роялит если накладные расходы велики. А если они порядка 2% - то оптимизировать это нафиг надо, можно цвести, пахнуть, и считать что уже все постиг, со всем "разобрался"  :)



Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 21, 2019, 10:05
Ну пока еще не очень готовы :) Напр
Ну вы то точно готовы. :)

Поэтому более подходящим выглядит хотя бы QtConcurrent (хотя я сам его ни разу не использовал :))
Под капотом QtConcurrent/OpenMP все тот же пул потоков. :)

и считать что уже все постиг, со всем "разобрался"  :)
Вот вот.


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 21, 2019, 10:14
Цитировать
С точностью до наоборот. Пул очень хорош когда задачи возникают "спонтанно", его сила именно в "бездумности" - закинул и все. А у Вас на каждом раунде заранее известно и число задач и исходные данные для каждой. А значит вовсе необязательно "брать по одной", всегда засисяться мутексом и.т.п.
Я уже мозг сломал, обдумывая как эту проблему (помните, описанную ситуацию с босом и подчинёнными) реализовать как то иначе..
Я буду только рад новым идеям и примерам реализации этого взаимодействия  :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 21, 2019, 10:34
Я уже мозг сломал, обдумывая как эту проблему (помните, описанную ситуацию с босом и подчинёнными) реализовать как то иначе..
Я буду только рад новым идеям и примерам реализации этого взаимодействия  :)
В лучшем случае мы увидим решение на спинлоке. :)
 


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Igors от Сентябрь 21, 2019, 10:49
Я уже мозг сломал, обдумывая как эту проблему (помните, описанную ситуацию с босом и подчинёнными) реализовать как то иначе..
Я буду только рад новым идеям и примерам реализации этого взаимодействия  :)
Да какую "эту"? У Вас же никакой проблемы нет. С такими шикарными задачами можно брать любой тул - и все получится хорошо.

QThreadPool
QtConcurrent
boost::thread_pool
OpenMP
и.т.п.

Выбирайте ЛЮБОЙ. Хотите велик, чисто для понимания? Хорошее дело. Ну и перепишите Вы эту несчастную очередь, делов на 10 мин. Чего/зачем Вы там нагородили? Зачем связались с футурой и др std::говном? Оно там 100 лет не нужно


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 21, 2019, 10:57
Цитировать
Выбирайте ЛЮБОЙ. Хотите велик, чисто для понимания? Хорошее дело. Ну и перепишите Вы эту несчастную очередь, делов на 10 мин. Чего/зачем Вы там нагородили? Зачем связались с футурой и др std::говном? Оно там 100 лет не нужно
Как без футуры то/флажков? Мне же нужно ждать ожидания/сигнала/отмашки, что ВСЕ задачи воркеры уже сделали, чтоб мне пойти потом на следующий круг..

Кстатии, boost::thread_pool, я там так и не увидел (может, плохо искал), как там понять, что все задачи выполнены..  ??? 


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 21, 2019, 11:01
Цитировать
С такими шикарными задачами можно брать любой тул - и все получится хорошо.
Подождите, мы же сейчас не о пуле говорим?) С пулом уже всё понятно, мне интересно альтернативное решение, чтоб его понять и сравнить) И задачи могут быть даже самые "мелкие".. :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 21, 2019, 11:12
Кстатии, boost::thread_pool, я там так и не увидел (может, плохо искал), как там понять, что все задачи выполнены..  ??? 
Вы про пул из asio?
Там есть join().


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Igors от Сентябрь 21, 2019, 11:14
Да, и по поводу лифтов. Типичная задача оптимизации, наверняка хорошо известная. Ну ладно, допустим мы хотим это как-то приспособить к "разпоточиванию".

Начало: очевидно "число лифтов" - это у нас "число ядер". Уже тут должен сработать рефлекс типа "ой, что-то слишком гладко, не к добру"

Позиция 1: если есть какое-то "расписание лифтов", то в любой лифт студент уже не полезет. Чему это соответствует в multi-threading? Что задачи должны иметь специфику и выполняться на определенных рабочих нитках (а не на каждой). Так бывает лишь "иногда". Впрочем это еще цветочки

Позиция 2: а вот времена t1 и t2 - уже ягодки. Для задачи оптимизации они действительно необходимы, но как (или куда) воткнуть их в multi-threading?

"Контрольный выстрел": и оказывается надо еще заранее иметь готовое аналитическое решение, а откуда его брать - хз.

Вывод: да, и "разпоточивание" и задачи оптимизации часто стремятся "наилучшим образом распределить имеющиеся ресурсы", но на этом всякое сходство заканчивается. Multi-threading не имеет ничего общего с аналитической оптимизацией. С теорией игр - тем более. Вот для "моделирования" можно/удобно использовать N лифтов-ниток, но это уже "совсем другая история"


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 21, 2019, 11:29
Джентельмены) Должен откланиться, срочно убегаю) На последние два поста отвечу позже)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 21, 2019, 11:29
Кстатии, boost::thread_pool, я там так и не увидел (может, плохо искал), как там понять, что все задачи выполнены..  ??? 
В любую нормальную очередь мы передаем работу в виде функтора/лямбды. Никто не мешает передавать в ней callback-функцию, которая будет вызываться в конце работы. Так мы можем контролировать завершение каждой конкретной работы, а не всех вместе.
 


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Igors от Сентябрь 21, 2019, 11:52
Как без футуры то/флажков? Мне же нужно ждать ожидания/сигнала/отмашки, что ВСЕ задачи воркеры уже сделали, чтоб мне пойти потом на следующий круг..
Атомарный счетчик (по 20 копеек) - и вся любовь. Простое улучшение классической схемы (под специфику) - берем задачу (индекс) атомарным счетчиком (не извлекая ее при этом из очереди). И лишь когда все задачи завершены - уходим спать на мутекс. А в начале след раунда создаем всю новую очередь.

Кстатии, boost::thread_pool, я там так и не увидел (может, плохо искал), как там понять, что все задачи выполнены..  ???  
Я надыбал пример для 1.66, а у меня стоит 1.59. Переставлять "чревато" т.к. 1.59 используется в рабочих проектах. Ладно, погнал по хедерам. Хирак - не линкует. Пытался подлить еще хедеров - ни фига. Потом дошло - для pthread нужны либы. Правда их компиляция и добавление прошло с пол-пинка, но пока сообразишь...

Ладно, разбираюсь. Метод join всего лишь завершение ниток, если его просто так позвать - то висит и все. Оказывается сперва надо сделать closе (закрыть очередь). Но это всего лишь "закрыть пул", а удобного метода waitFotDone просто НЕТУ, пришлось сделать самому, псевдокод
Код
C++ (Qt)
// worker
void MyTask::operator()( void )
{
 ....
 if (--MyTask::taskCount <= 0) {
   std::unique_lock..
   MyTask::cond.notify_one();
 }
}
 
// main thread
MyTask::taskCount = tasks.size();
for (int i = 0; i < tasks.size(); ++i
pool.submit(tasks[i]);
 
while (MyTask::taskCount) {
std::unique_lock..
MyTask::cond.wait(lock);
}
На мелких задачах производительность также садится, но "запас прочности" побольше чем у QThreadPool. В принципе тул неплохой, но ни доки ни примеров - ни хрена нету  :'( Тратить время на разборки (да еще и с каждой версией) - непозволительная роскошь для профессионала.


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 21, 2019, 20:17
Цитировать
В любую нормальную очередь мы передаем работу в виде функтора/лямбды. Никто не мешает передавать в ней callback-функцию, которая будет вызываться в конце работы. Так мы можем контролировать завершение каждой конкретной работы, а не всех вместе.
Вот.. Я вначале тоже думал, что можно завести список флажков и передавать его в лямбду. Но.. Но это означает, что потребитель должен в "холостую" крутится в цикле, опрашивая каждый раз, не завершились ли все задачи.. Это немного напрягает.. Футура, на мой взгляд, здесь более логичное решение..

Цитировать
Начало: очевидно "число лифтов" - это у нас "число ядер". Уже тут должен сработать рефлекс типа "ой, что-то слишком гладко, не к добру"
Это почему же?

Цитировать
Позиция 2: а вот времена t1 и t2 - уже ягодки. Для задачи оптимизации они действительно необходимы, но как (или куда) воткнуть их в multi-threading?
Здесь особого секрета нет. Время t2 - это время когда нам нужно заблочить мьютекс, взять задачу, сделать pop у очереди и освободить мьютекс. Если Вы в очередь кидаете мульён мелких задач (у которых время выполнения сопостоаимо с описанной выше ситуацией) в один присяд, то Вы неменуемо свалитесь в пробку.

Цитировать
"Контрольный выстрел": и оказывается надо еще заранее иметь готовое аналитическое решение, а откуда его брать - хз.
Что значит откуда?
Мы же это уже обсуждали..  Там не нужно каких то фундаментальных теорий.. Я сейчас говорю, опираясь на обозначенную мной проблему.. В этой ситуации ответ очевиден:  нужно сделать очередь как можно меньше, а задачи, как можно более жирными.

Цитировать
а удобного метода waitFotDone просто НЕТУ
Вот, я как раз и хотел заиметь этод метод, и, как следствие, мой велик.. :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 21, 2019, 20:20
Так что там с реализацией на атомиках онли? :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 21, 2019, 20:28
Цитировать
Оказывается сперва надо сделать closе (закрыть очередь).
На сколько я понимаю, это костыль в контекте моей проблемы..((


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 21, 2019, 20:37
Цитировать
Оказывается сперва надо сделать closе (закрыть очередь).
На сколько я понимаю, это костыль в контекте моей проблемы..((
Что за пул из буста вы обсуждаете, у которого есть метод close?


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 21, 2019, 20:43
Цитировать
Что за пул из буста вы обсуждаете, у которого есть метод close?
Не знаю :) Я имею в виду пул из asio)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 21, 2019, 20:45
Я уже плохо соображаю сейчас  :)

Я хотел там найти, но не нашёл метода типа waitFotDone


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 21, 2019, 20:54
Цитировать
Оказывается сперва надо сделать closе (закрыть очередь).
На сколько я понимаю, это костыль в контекте моей проблемы..((
Что за пул из буста вы обсуждаете, у которого есть метод close?
А там и нет такого метода.. Поддался на мнения igors'а))


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 21, 2019, 21:17
Вот.. Я вначале тоже думал, что можно завести список флажков и передавать его в лямбду. Но.. Но это означает, что потребитель должен в "холостую" крутится в цикле, опрашивая каждый раз, не завершились ли все задачи.. Это немного напрягает.. Футура, на мой взгляд, здесь более логичное решение..
Вместо флажков можно было передавать std::promise, а полученные из него std::future использовать для ожидания.
Те же яйца только в профили. :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 21, 2019, 21:36
Цитировать
Вместо флажков можно было передавать std::promise, а полученные из него std::future использовать для ожидания.
Те же яйца только в профили.  :)
Согласен) Но тем не менее это лишние Телодвижения) Которых хотелось бы избежать)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 21, 2019, 21:38
Согласен) Но тем не менее это лишние целодвижения) Которых хотелось бы избежать)
Этот прием можно использовать, если пул сам не умеет в футуры. :)
 


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 21, 2019, 21:41
Цитировать
Этот прием можно использовать, если пул сам не умеет в футуры.  :)
Согласен)) Но это всё же лишние телодвижения.. Я так делать не хочу(  :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Igors от Сентябрь 22, 2019, 08:54
А там и нет такого метода..
Метод close есть
Код
C++ (Qt)
   /**
    * \b Effects: close the \c basic_thread_pool for submissions.
    * The worker threads will work until there is no more closures to run.
    */

   void close()
   {
     work_queue.close();
   }
 
Хедер в аттаче. А метода waitForDone (по аналогии с QThreadPool) нету о чем я Вам сообщил большими буквами.
Поддался на мнения igors'а))
А разве я Вас чем-то обманул?

Футура, на мой взгляд, здесь более логичное решение..
А у Вас здесь есть какой-то интерес к "именно этой/конкретной задаче" чтобы ее мониторить? Нет. Так зачем бездумно добавлять еще какие-то обертки/обвязки? Только потому что это "std" и, значит, "уже хорошо"?

Если атомарный счетчик вызывает у Вас затруднения (не знаю почему), то есть еще приемчик: пусть главная нитка работает как worker. Ведь босс тоже может делать какие-то задачи, не только раздавать их другим. При этом босс знает что "он босс" и может позаботиться об обновлении UI и.т.п.


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 22, 2019, 09:12
А разве я Вас чем-то обманул?
Вы писали что попробовали бустовский пул. В бусте пул поток есть только в asio и это не то, что пробовали вы. У вас какая то библиотека не буста.

Если атомарный счетчик вызывает у Вас затруднения (не знаю почему)
Вам уже много раз говорили, что для организации эффективного ожидания атомарных счетчиков не достаточно.


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 22, 2019, 10:51
Вы писали что попробовали бустовский пул. В бусте пул поток есть только в asio и это не то, что пробовали вы. У вас какая то библиотека не буста.
Беру свои слова назад.
Нашел этот пул в экспериментальных расширениях thread.
 


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Igors от Сентябрь 22, 2019, 10:52
Можно оформить через future+promise
Код
C++ (Qt)
// worker
void MyTask::operator()( void )
{
 ....
if (--MyTask::taskCount <= 0)
 MyTask::mPromise.set_value(1);
}
 
// main thread
MyTask::taskCount = tasks.size();
MyTask::mPromise = std::promise<int> ();
std::future<int> future = MyTask::mPromise.get_future();
 
for (int i = 0; i < tasks.size(); ++i)
pool.submit(tasks[i]);
 
future.wait();
}
Как всегда, std сэкономило пару строк. Но делать футуру на каждую (из многочисленных) задач - явная глупость  :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 22, 2019, 10:59
Но делать футуру на каждую (из многочисленных) задач - явная глупость  :)
Ну конечно. :)
Уже согласились на одну. :)
Подождем, через пару лет вы будете здесь всем рекомендовать их использовать для всех задач. :)

Ну хоть что-то полезное случилось, Igors освоил future+promise.
Теперь следующий шажок понять что package_task объединяет future+promise и функтор задачи и основы будут освоены. :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 22, 2019, 14:08
Цитировать
Можно оформить через future+promise
И чем же такое решение лучше?  :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Igors от Сентябрь 22, 2019, 14:42
И чем же такое решение лучше?  :)
Лучше предыдущего что я приводил? Да в принципе ничем, ну пару строк сэкономили (правда ценой запоминания глупых классов). То я вижу что для Вас std - пуп Земли, ну и заглянул в справочник  :)

А вот "ничтоже сумняшеся" создавать потенциально большой контейнер жирных структур - тяжелая ошибка, и никакие познания в std/boost ее не компенсируют


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 22, 2019, 14:46
А вот "ничтоже сумняшеся" создавать потенциально большой контейнер жирных структур - тяжелая ошибка, и никакие познания в std/boost ее не компенсируют
А почему вы решили, что эти структуры такие жирные для помещения в потенциально большой контейнер? :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 22, 2019, 16:02
Цитировать
Лучше предыдущего что я приводил?
Нет, я имел в виду в сравнении с моей реализацией..
Минусом Вашей реализацией является то, что я должен оборачивать таску в MyTask, заводить там статический каунтер и промис. Уже не безопасно, кто-нибудь да и выстрелит себе в ногу.. Однако, это поправимо и решаемо. Но на этом проблемы не заканчиваются.
Код
C++ (Qt)
// worker
void MyTask::operator()( void )
{
 ....
if (--MyTask::taskCount <= 0)
 MyTask::mPromise.set_value(1);
}
 
Здесь мы словим исключение, в попытке более одного раза записать значение в promise.
Но это тоже решаемо..

Давайте посмотрим на примерную реализацию такого пула с методом wait_for_done:
Код
C++ (Qt)
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
 
#include <list>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <future>
#include <memory>
#include <functional>
 
 
class thread_pool
{
public:
   thread_pool(size_t num_threads = std::max(size_t(1), size_t(std::thread::hardware_concurrency())))
       : m_is_stop(false), m_counter(0)
   {
       for (size_t i = 0; i < num_threads; ++i)
           m_threads.push_back(std::thread(&thread_pool::worker, this));
   }
 
   thread_pool(const thread_pool &) = delete;
   thread_pool(thread_pool &&) = delete;
 
   thread_pool & operator=(const thread_pool &) = delete;
   thread_pool & operator=(thread_pool &&) = delete;
 
   void join()
   {
       m_is_stop = true;
       m_loop_cv.notify_all();
 
       for (auto & th : m_threads)
       {
           if (th.joinable())
               th.join();
       }
   }
 
   template<class G>
   void add_task_group(const G & task_group)
   {
       {
           std::lock_guard<std::mutex> lock(m_mutex);
           m_counter += task_group.size();
           for (const auto & task : task_group)
               m_queue.push(task);
       }
 
       m_loop_cv.notify_one();
   }
 
   void wait_for_done()
   {
       m_promise.get_future().wait();
   }
 
private:
   std::atomic_bool m_is_stop;
   std::atomic_int m_counter;
   std::list<std::thread> m_threads;
   std::queue<std::function<void()>> m_queue;
   std::mutex m_mutex;
   std::condition_variable m_loop_cv;
   std::promise<void> m_promise;
 
   void worker()
   {
       while (!m_is_stop)
       {
           std::unique_lock<std::mutex> lock(m_mutex);
           m_loop_cv.wait(lock, [&]()
           {
               return !m_queue.empty() || m_is_stop;
           });
 
           if (m_is_stop)
               return;
 
           std::function<void()> work_function(m_queue.front());
           m_queue.pop();
 
           lock.unlock();
 
           work_function();
 
           if (!--m_counter)
               m_promise.set_value();
       }
   }
};
 
#endif // THREAD_POOL_H
 


Код
C++ (Qt)
thread_pool pool;
 
   const size_t num_tasks = 100;
 
   std::vector<std::function<void()>> tasks;
   for (size_t i = 0; i < num_tasks; ++i)
       tasks.push_back(task);
 
   pool.add_task_group(tasks);
 
   pool.wait_for_done();
 
   pool.join();
 

Т.е. чтоб не выстрелить себе в ногу, мы как минимум должны подавать в очередь не по одной задачи, а целую группу.
И тем не менее, этот код безопасен?


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Igors от Сентябрь 22, 2019, 17:11
..заводить там статический каунтер и промис. Уже не безопасно, кто-нибудь да и выстрелит себе в ногу..
Не то чтобы "небезопасно", а просто "коряво" как и любой статик. Конечно сделать эти переменные членами пула - намного лучше, но я ж дустовский пул не мог кромсать  :)

Здесь мы словим исключение, в попытке более одного раза записать значение в promise.
Не должны, на то он и атомик чтобы установиться лишь раз. Кстати не вижу где у Вас сброс promise перед след раундом? Наверно надо добавить в конец wait_for_done

Код
C++ (Qt)
   template<class G>
   void add_task_group(const G & task_group)
   {
       {
           std::lock_guard<std::mutex> lock(m_mutex);
           m_counter += task_group.size();
           for (const auto & task : task_group)
               m_queue.push(task);
       }
 
       m_loop_cv.notify_one();
   }
 
Наверное notify_all (задач-то прибыло не одна). А так все хорошо.  


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Igors от Сентябрь 22, 2019, 17:25
Код
C++ (Qt)
   void join()
   {
       m_is_stop = true;
       m_loop_cv.notify_all();
 
       for (auto & th : m_threads)
       {
           if (th.joinable())
               th.join();
       }
   }
 

notify_all должно быть под мутексом - на этот момент какие-то нитки могут еще не уснуть.


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 22, 2019, 17:26
Не должны, на то он и атомик чтобы установиться лишь раз.
Ну давайте посмотрим:
Код
C++ (Qt)
// worker
void MyTask::operator()( void )
{
 ....
if (--MyTask::taskCount <= 0)
 MyTask::mPromise.set_value(1);
}
 
Предположим остались 2 задачи в очереди (taskCount = 2).
Завершается первая из них, она успевает уменьшить taskCount на 1, но не успела пройти сравнение с 0 и была вытеснена (taskCount = 1).
Управление получила другая задача, отработала, уменьшила taskCount на единицу и выполнила set_value (taskCount = 0).
Управление получает первая задача, сравнивает taskCount c 0, он действительно 0 и она торжественно вызывает set_value второй раз.


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 22, 2019, 17:32
на этот момент какие-то нитки могут еще не уснуть.
Ну и что, нитки все равно завершаться. Не обязательно их вешать на wait.
Мутекс в условной переменной нужен не для самой условной переменной, а для тех ресурсов, которые на ней ждут.


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 22, 2019, 17:36
Цитировать
А так все хорошо.
Нет, нехорошо( Нам в любом случае нужет контейнер task_group. И хз какие по жирности (по размеру) там будут таски.
Уж лучше опционально иметь контейнер футур)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Igors от Сентябрь 22, 2019, 17:44
Нет, нехорошо( Нам в любом случае нужет контейнер task_group. И хз какие по жирности (по размеру) там будут таски.
Не понял в чем проблема


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 22, 2019, 17:47
Не понял в чем проблема
Все хорошо. Сойдет. :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 22, 2019, 18:15
Цитировать
Не понял в чем проблема
Я не вижу чем это решение как минимум лучше.


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 22, 2019, 20:33
Я не вижу чем это решение как минимум лучше.
Как минимум? :)
Да оно дырявое. Я выше все расписал.
Мы взяли надежные future с promise и сделали на их основе дырявый механизм. Для чего? Что бы счетчик атомарный задействовать?
А потом все время надеяться, что бы компилятор хорошо код сгенерил без команды сравнения. В отладке соберешь - будет дыра, в релизе соберешь - будет зависеть от компилятора и уровня оптимизации.


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 22, 2019, 21:31
Цитировать
Мы взяли надежные future с promise и сделали на их основе дырявый механизм. Для чего? Что бы счетчик атомарный задействовать?
Я так понял, что это всё для того, чтоб сделать подобие waitForDone  :)

Цитировать
А потом все время надеяться, что бы компилятор хорошо код сгенерил без команды сравнения. В отладке соберешь - будет дыра, в релизе соберешь - будет зависеть от компилятора и уровня оптимизации.
Не, ну там, справедливости ради, можно сравнение заменить на
Код
C++ (Qt)
if (std::atomic_fetch_sub(&MyTask::taskCount, 1) == 1)
 MyTask::mPromise.set_value(1);
 

Но всё равно дизайн кривой получается. Да и потом повторный вызов wait_for_done, до того, как мы положим в очередь очередную группу задач выкинет исключение.


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 22, 2019, 21:39
Я так понял, что это всё для того, чтоб сделать подобие waitForDone  :)
Ну так взяли уже футуры, используем - там уже все есть. :)
Нет, добавим счетчик, для чего? Просто потому что обещал? Так вроде обещано было другое - атомики без усыпления. Где? :)

Не, ну там, справедливости ради, можно сравнение заменить на
Вот. Сделали решение, отдали заказчику. А у него 32 ядра и нихрена не работает. Не знаю почему.
А потом уже заменим. :)
Хотя есть проверенные решения в стандартной библиотеке, даже буст брать не надо.


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 23, 2019, 06:48
Интересную библиотеку нашел на github:
https://github.com/cpp-taskflow/cpp-taskflow


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 23, 2019, 07:58
Интересную библиотеку нашел на github:
https://github.com/cpp-taskflow/cpp-taskflow

Крутой проект, судя по описанию и отзывам) Спасибо, буду иметь в виду)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Igors от Сентябрь 23, 2019, 09:16
Не, ну там, справедливости ради, можно сравнение заменить на
Ладно, пожуем азы
Код
C++ (Qt)
std::atomic_int test(5);
if (--test <= 0)
....;
 
Этот код выливается по существу в команду xaddl (аттач) которая выполняется с префиксом lock. Команда или выполняется до вытеснения нитки или нет. Если вытеснение произошло после, то при возврате управления в еax торчит рез-т, т.е последующее сравнение имеет абсолютно тот же эффект что и без вытеснения. Да, если бы этот код выполнялся неск нитками, то возвращаемое значение могло быть напр 3 или 2 (а не 4), т.е. кто-то "успел до нас". И на момент сравнения атомик мог уже стать другим, это нормально. НО возвращаемое значение обязательно УНИКАЛЬНО. Это широко известный и популярный прием.

"А если вот компилятор сгенерит др код.." НЕТ, не имеет права, такое поведение атомика регламентировано. Вот если бы мы понадеялись что, мол, "int и так атомарный" - тогда да.

Я так понял, что это всё для того, чтоб сделать подобие waitForDone  :)
А разве Вам не это нужно?

Но всё равно дизайн кривой получается.
Да в чем же его кривизна?

Да и потом повторный вызов wait_for_done, до того, как мы положим в очередь очередную группу задач выкинет исключение.
Ну очевидно в wait_for_done надо провериться на "нет задач" и сбросить промису. Опять Вы в 2 соснах заблудились   :'(


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 23, 2019, 09:27
Этот код выливается по существу в команду xaddl (аттач) которая выполняется с префиксом lock.
Этот код также может вылиться (gcc и clang) в
Код
ASM
       lock sub        QWORD PTR taskCount[rip], 1
       je      .L13
       ret
 
и выполнится корректно.
Только, насколько я помню (хотя я и не знаток стандарта), это нигде не гарантируется в стандарте, то что современные компиляторы умные и генерируют корректный код, не значит что так будет всегда и везде.
Это скорее побочный эффект: в случае с xadd в регистре останется предыдущее значение счетчика, в случае с sub - будут установлены флаги, и компилятор использует их в дальнейшем.
Но компилятору никто не запрещает генерировать отдельно декремент и load для сравнения.


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 23, 2019, 10:50
Цитировать
А разве Вам не это нужно?
Но не в такой реализации.
Код
C++ (Qt)
template<class G>
   void add_task_group(const G & task_group)
   {
       {
           std::lock_guard<std::mutex> lock(m_mutex);
           m_counter += task_group.size();
           m_promise = std::promise<void>();
           for (const auto & task : task_group)
               m_queue.push(task);
       }
 
       m_loop_cv.notify_one();
   }
 
И вот здесь можно подорваться на мине (возможно ошибаюсь):
Код
C++ (Qt)
threap_pool pool;
 
pool.add_tasks_group(tasks_group);
 
...
 
pool.add_tasks_group(tasks_group);
 
pool.wait_for_done();
 
Т.е. при повторном вызове add_tasks_group  после ожидания на wait_for_done  мы можем получить, что не все задачи отработали.. И это косяк..
Примером изящного дизайна, решающего, в частности, и эту проблему есть Cpp-Taskflow, обозначенная по ссылке выше)  

Да, и потом, здесь мы создаём контейнер тасков. А чем это лучше контейнера футур?


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Igors от Сентябрь 24, 2019, 06:01
Примером изящного дизайна, решающего, в частности, и эту проблему есть Cpp-Taskflow, обозначенная по ссылке выше) 
Плиз "ткните носиком" в это место, интересно что же там такого хорошего.

И вот здесь можно подорваться на мине (возможно ошибаюсь):
...
Т.е. при повторном вызове add_tasks_group  после ожидания на wait_for_done  мы можем получить, что не все задачи отработали.. И это косяк..
А это уже та самая "архитектура". Вы/мы механически следуем идеям/методам пула, что давно неверно. Пул предназначен для "подбрасывния" задач по мере их поступления, и это действительно очень полезный ф-ционал. Но нужен ли он здесь? Вам виднее, но из того что я слышал - думаю что нет. У Вас четко виден "раунд/сессия" вычислений - известно число задач и данные для каждой. Поэтому логичнее делать по образцу OpenMP или QtConcurrent, т.е.

- заряжаем контейнер задач в главной нитке
- запускаем наш класс и ждем завершения

Да, при этом мы полагаем что "зарядка" достаточно дешева, она выполняется в главной нитке. Если зарядка обнаруживает что (предыдущие) вычисления еще в процессе - сразу assert.

Владение (о котором так любят говорить). Очень неплохо сделано в QThreadPool - пул может и сам удалять задачи и нет. Почему бы не позаимствовать эту удачную идею? Также мне нравится что в пул добавляется указатель на базовый класс QRunnable (кажется он из жабы но могу путать). Ну можно и дусто-любимый оператор () call. Но копировать данные в "вычислялку" не вижу никакого смысла - у нее контейнер указателей. Также добавлять контейнер не очень - "по одному" проще и лучше, ведь вычислялка не должна немедленно толкать выполнение.

Ну и наверно метод Run (запуск вычислений) должен иметь флажок async. Ну и геттеров там подрисовать (типа GetProgress и.т.п)

Да, и потом, здесь мы создаём контейнер тасков. А чем это лучше контейнера футур?
Тем что первый вызывается необходимостью, а второй нет.


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Igors от Сентябрь 24, 2019, 06:08
Код
C++ (Qt)
   void join()
   {
       m_is_stop = true;
       m_loop_cv.notify_all();
 
       for (auto & th : m_threads)
       {
           if (th.joinable())
               th.join();
       }
   }
 

notify_all должно быть под мутексом - на этот момент какие-то нитки могут еще не уснуть.
Не хочу быть навязчивым, но это классическая ошибка которую делают ВСЕ (что бы потом ни свистели  :)). Видимо Вам это показалось не очень важным - ну подождем пока Вы ее опять сделаете  :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 24, 2019, 06:17
Пул предназначен для "подбрасывния" задач по мере их поступления, и это действительно очень полезный ф-ционал. Но нужен ли он здесь? Вам виднее, но из того что я слышал - думаю что нет. У Вас четко виден "раунд/сессия" вычислений - известно число задач и данные для каждой. Поэтому логичнее делать по образцу OpenMP или QtConcurrent
Под капотом и у OpenMP и у QtConcurrent все тот же пул. Эти инструменты берут на себя "нарезку на задачи" и загрузку пула, но пул никуда не девается. Здесь можно не пытаться уйти от пула, а написать алгоритм по аналогии с каким нибудь QtConcurrent::mapped и все средства для этого в пуле m_ax уже есть.


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 24, 2019, 06:21
Не хочу быть навязчивым, но это классическая ошибка которую делают ВСЕ (что бы потом ни свистели  :)). Видимо Вам это показалось не очень важным - ну подождем пока Вы ее опять сделаете  :)
Классическая ошибка в том, что вы опять не до конца разобрались с условными переменными и пытаетесь навязать ваши заблуждения всем с видом гуру. :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 24, 2019, 09:59
Цитировать
Плиз "ткните носиком" в это место, интересно что же там такого хорошего.
Ну хотя бы вот как можно легко создавать различные графы исполнения тасков, что частенько требуется во всяких вычислениях (пример с сайта: https://github.com/cpp-taskflow/cpp-taskflow)
Код
C++ (Qt)
#include <taskflow/taskflow.hpp>  // Cpp-Taskflow is header-only
 
int main(){
 
 tf::Executor executor;
 tf::Taskflow taskflow;
 
 auto [A, B, C, D] = taskflow.emplace(
   [] () { std::cout << "TaskA\n"; },               //  task dependency graph
   [] () { std::cout << "TaskB\n"; },               //
   [] () { std::cout << "TaskC\n"; },                    //            +---+          
   [] () { std::cout << "TaskD\n"; }                     //    +---->| B |-----+  
 );                                                                    //    |        +---+     |
                                                                       //  +---+           +-v-+
 A.precede(B);  // A runs before B                  //  | A  |            | D  |
 A.precede(C);  // A runs before C                  //  +---+           +-^-+
 B.precede(D);  // B runs before D                     //    |     +---+     |    
 C.precede(D);  // C runs before D                  //    +---->| C |-----+    
                                                                           //          +---+          
 executor.run(taskflow).wait();
 
 return 0;
}
 
У меня, фактически, тоже граф.. циклический)

Цитировать
А это уже та самая "архитектура". Вы/мы механически следуем идеям/методам пула, что давно неверно.
Вы же вначале говорили, что всё хорошо:
Цитировать
А так все хорошо.
Если такое решение попадёт в руки, какому-нибудь пользователю, то наверняка он себе что-нить да отстрелит) Это пример плохой, непродуманной архитектуры.
Цитировать
Вы/мы механически следуем идеям/методам пула, что давно неверно.

Что Вы предлагаете? Можно это формализовать в виде кода, чтоб понять о чём идея? :)

Цитировать
Если зарядка обнаруживает что (предыдущие) вычисления еще в процессе - сразу assert.
Нет уж.. мне такие неожиданности не нужны.. Мне нужно все расчёты безопасно довести до конца. Без всяких возможных асёртов)

Цитировать
Почему бы не позаимствовать эту удачную идею?
Я считаю, что вариант, когда пул имеет возможность предоставлять футуру для любой задачи более гибкая.
  
Цитировать
Не хочу быть навязчивым, но это классическая ошибка которую делают ВСЕ
Ошибка в другом месте.. Покажу позже.. :(


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 24, 2019, 12:33
Цитировать
Не хочу быть навязчивым, но это классическая ошибка которую делают ВСЕ
Да, накасячил, конечно  :( Но проблема не в condition_variable а здесь:
Код
C++ (Qt)
void worker()
   {
       while (!m_is_stop)
       {
           std::unique_lock<std::mutex> lock(m_mutex);
           m_loop_cv.wait(lock, [&]()
           {
               return !m_queue.empty() || m_is_stop;
           });
 
           if (m_is_stop)
               return;
 
           std::function<void()> work_function(m_queue.front());
           m_queue.pop();
 
           lock.unlock();
 
           work_function();
       }
   }
 
Если сразу после заполнения очереди вызвать join(), m_is_stop == true и в очереди могут остаться невыполненные задачи.
Стыдно на такие грабли было наступить)
Выходить из цикла мы должны только при совместном выполнении условий: когда и очередь пуста и флаг m_is_stop == true.
Т.е. правильно так:
Код
C++ (Qt)
void worker()
   {
       for(;;)
       {
           std::unique_lock<std::mutex> lock(m_mutex);
           m_loop_cv.wait(lock, [&]()
           {
               return !m_queue.empty() || m_is_stop;
           });
 
           if (m_is_stop && m_queue.empty())
               return;
 
           std::function<void()> work_function(m_queue.front());
           m_queue.pop();
 
           lock.unlock();
 
           work_function();
       }
   }
 
   


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 24, 2019, 13:32
 m_ax, это спорно. :)
У вас join нужно переименовать в stop, потому что он не ждет завершения работы задач, а завершает работу пула вообще. Если нам нужно завершить программу, уже все равно остались задачи в очереди или нет.
А завершения всех задач вы ждете по футурам.
Кстати, чтобы не плодить проверок, я делаю проверку в нитках чуть по другому. Когда доберусь до компьютера покажу как.


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 24, 2019, 13:58
Цитировать
У вас join нужно переименовать в stop, потому что он не ждет завершения работы задач, а завершает работу пула вообще.
Да? А я думал наоборот, что вызов join блокирует поток, в котором он был вызван до тех пор, пока не будут выполнены все задачи.. Во всяком случае такой логики придерживается thread_pool из asio:
Цитировать
This function blocks until the threads in the pool have completed. If stop() is not called prior to join(), the join() call will wait until the pool has no more outstanding work.


Цитировать
Если нам нужно завершить программу, уже все равно остались задачи в очереди или нет.
Это да, но есть ситуации, когда нам и не нужно, в принципе, футуру: просто закинули задачу в пул и забыли :)

Цитировать
Кстати, чтобы не плодить проверок, я делаю проверку в нитках чуть по другому. Когда доберусь до компьютера покажу как.

Ok) спасибо)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 24, 2019, 14:08
По хорошему join дожидается, когда реально завершатся рабочие нитки пула.
У меня в конструкторе пула нитки запускаются, в деструкторе завершаются и joinом контролируется завершение. А все время жизни пула нитки активны или ждут работу.
А вот завершение конкретной задачи можно ждать по разному: футурами, callbackами или просто на них забить. :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Igors от Сентябрь 24, 2019, 14:12
Если сразу после заполнения очереди вызвать join(), m_is_stop == true и в очереди могут остаться невыполненные задачи.

Выходить из цикла мы должны только при совместном выполнении условий: когда и очередь пуста и флаг m_is_stop == true.
Это скорее проблема невнятных имен. Stop - это что, "приостановить" (тогда лучше "pause") или "принудительно завершить" (тогда лучше "abort"). Но у Вас это ни то ни другое, это wait_for_done. Ну а что делать если считает и считает? "Из ризетки" выдергивать, что ли?  :) Abort должен быть, join ниток может лучше в деструктор, паузу можно добавить хотя особой нужды в ней нет.

Стыдно на такие грабли было наступить)
Профессионалам это чувство чуждо  :)

В аттаче пример из справочника, он зациклен и виснет тоже на joint. Я добавил небольшую задержку, хотя виснет и без нее если погонять побольше. Возможно это натолкнет на некоторые размышления  :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 24, 2019, 17:04
Как я делаю ожидание в рабочих нитках:
Код
C++ (Qt)
 void thread_run()
 {
   for(;;)
   {
     std::unique_lock<std::mutex> locker( m_mutex );
     for(;;)
     {
       if( m_stop )
         return;
       else if( !m_tasks.empty() )
         break;
       m_cond.wait( locker );
     }
 
     Task task = m_tasks.front();
     m_tasks.pop();
 
     locker.unlock();
 
     if( task )
       task();
   }
 }
 

Завершение ниток:
Код
C++ (Qt)
 ~ThreadPool()
 {
   {
     std::unique_lock<std::mutex> locker( m_mutex );
     m_stop = true;
   }
   m_cond.notify_all();
 
   for( auto &it : m_threads )
     if( it.joinable() )
       it.join();
 }
 

Устанавливать m_stop в true нужно обязательно под мьютексом, а вот сигналить лучше уже без него.


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 24, 2019, 19:58
Цитировать
В аттаче пример из справочника, он зациклен и виснет тоже на joint. Я добавил небольшую задержку, хотя виснет и без нее если погонять побольше. Возможно это натолкнет на некоторые размышления   :)
Вы этот пример привели как иллюстрацию, как делать не надо? :)
Eсли закоментить все sleep_for'ы consumer может никогда и не добраться до очереди produced_nums  :(

Цитировать
А вот завершение конкретной задачи можно ждать по разному: футурами, callbackами или просто на них забить.  :)

Ну т.е. если мыслить в таком ключе, то join() вообще нафиг не нужен?) В смысле делать его методом класса  особого резона и нет?)

Цитировать
Как я делаю ожидание в рабочих нитках:
Ага, понял)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 24, 2019, 20:20
Блин, случайно отредактировал этот пост, вместо цитирования. Придется набирать заново. :)

Устанавливать m_stop в true нужно обязательно под мьютексом, а вот сигналить лучше уже без него.
Хочу более подробно расписать почему так думаю.

Устанавливать m_stop необходимо обязательно под мьютексом.
Потому что может возникнуть ситуация, когда рабочая нить уже прошла проверку флага m_stop, но еще не уснула на wait, тогда установка m_stop и сигнал из другого потока никакого эффекта не вызовут - нитка уснет на wait.
А если устанавливать m_stop мод захваченным мьютексов, то нить завершающая пул остановиться на захвате мьютекса, пока рабочая нить его не освободит, уснув на wait.

А сигналить лучше без захваченного мьютекса, потому что на многоядерных системах планировщик сразу пытается запустить ожидающую (на wait) нить на свободном ядре при сигнале условной переменной.
Если сигналить при захваченном мьютексе, то разбуженная нить попытается захватить мьютекс, но он захвачен сигнальной ниткой. Рабочая нить уснет в ожидание мьютекса и проснется опять после его освобождения.
Т.е. будет дополнительная усыпление рабочей нитки.


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 24, 2019, 20:35
Всё ясно, спасибо)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 24, 2019, 22:41
Old, я правильно понимаю, что если я всё же хочу заиметь метод wait_for_all (который блокирует поток, где он был вызван и ждёт пока все рабочие нитки опустошат очередь) то нужна ещё одна condition_variable? Вот что у меня сейчас получилось (вроде работает :)):
Код
C++ (Qt)
class thread_pool
{
public:
   explicit thread_pool(size_t num_threads = std::max(size_t(1), size_t(std::thread::hardware_concurrency())))
       : m_stop(false)
   {
       for (size_t i = 0; i < num_threads; ++i)
           m_threads.emplace_back(&thread_pool::worker, this);
   }
 
   thread_pool(const thread_pool &) = delete;
   thread_pool(thread_pool &&) = delete;
 
   thread_pool & operator=(const thread_pool &) = delete;
   thread_pool & operator=(thread_pool &&) = delete;
 
   ~thread_pool()
   {
       {
           std::lock_guard<std::mutex> locker(m_mutex);
           m_stop = true;
       }
 
       m_cond.notify_all();
 
       for (auto & th : m_threads)
       {
           if (th.joinable())
               th.join();
       }
   }
 
   void wait_for_all()
   {
       std::unique_lock<std::mutex> locker(m_mutex);
       m_cond_wait.wait(locker, [&]() { return m_queue.empty(); });
   }
 
   template<class F>
   auto add_task(F && f)->std::future<decltype(f())>
   {
       using R = decltype(f());
 
       auto task_ptr = std::make_shared<std::packaged_task<R()>>(std::forward<F>(f));
 
       {
           std::lock_guard<std::mutex> locker(m_mutex);
           m_queue.emplace( [=]() { (*task_ptr)(); } );
       }
 
       m_cond.notify_one();
 
       return task_ptr->get_future();
   }
 
private:
   std::atomic_bool m_stop;
   std::list<std::thread> m_threads;
   std::queue<std::function<void()>> m_queue;
   std::mutex m_mutex;
   std::condition_variable m_cond;
   std::condition_variable m_cond_wait;
 
   void worker()
   {
       for(;;)
       {
           std::unique_lock<std::mutex> locker(m_mutex);
           m_cond.wait(locker, [&]()
           {
               return !m_queue.empty() || m_stop;
           });
 
           if (m_stop)
               return;
 
           std::function<void()> task(m_queue.front());
           m_queue.pop();
 
           locker.unlock();
 
           if (task)
               task();
 
           m_cond_wait.notify_one(); // <== Эта условная переменная для wait_for_all
       }
   }
};
 
Или нафиг его выпилить вообще?)

update: Нет, этот метод ждёт только пока очередь не пуста, но не момент когда все нитки отработали..(

update2: Вобщем я его всё же выпилил.. Не нужно усложнять то, чего того не требует..


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 25, 2019, 06:02
m_ax, по моему мнению, такой метод у пула может понадобится в простой консольной программе (запустил - подождал - вышел), но если понадобится чуть расширить функционал, например, добавить сетевое взаимодействие, то тут же появляться более полезное применение запускающей нитке. :)
Поэтому, заморочиться с таким методом можно, но реальных юзкейсов у него будет не много.


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 25, 2019, 06:23
А вот сигналить лучше с уже отпущенным мьютексом.
Вспомнил еще один аргумент в пользу этого тезиса. :)
Утилизация горячего воркера. Нам очень выгодно, если нитка выполнив одну работу, делает цикл и забирает следующую без пауз на мьютексе/условной переменной.
Если мы сигналим под мьютексом, то нитка которая только что освободилась не сможет захватить мьютекс и достать новую задачу.
Произойдет следующее: горячая нитка уткнется в мьютекс, шедулер разбудит холодную нитку (ждущую на wait), начнется гонка за мьютекс и может получиться так, что горячая нитка не спеет забрать новую работу и будет успылена на wait, а разбуженная пойдет работать.


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Igors от Сентябрь 27, 2019, 15:10
Ну так что, все сидим на мешках с футурами? А ведь можно обойтись одной (аттач). Правда мелкоту масштабит слабенько, на с честного мутекса много не взять.Также пытался починить QThreadPool, задумка была не вызывать его waitForDone, а задействовать как дустовский пул. Без особого успеха :'( На одних тестах лучше (хотя все равно плохо), на др даже хуже.

И все-таки наверно дуст прав что не предоставляет метода waitForDone - нездоровая нагрузка если планируется универсальный пул. Если такой ф-ционал нужен - проще обеспечить его "через задачу".

И да, set_value действительно может сработать дважды! Ну кому интересно разберется, не буду кайф ломать :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 27, 2019, 16:09
на с честного мутекса много не взять.
Так вы все обещали показать чудо-решение на атомиках, но не получается, хотя все уже готово. :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 28, 2019, 00:57
Цитировать
И все-таки наверно дуст прав что не предоставляет метода waitForDone - нездоровая нагрузка если планируется универсальный пул. Если такой ф-ционал нужен - проще обеспечить его "через задачу".
Да, согласен) Но это можно реализовать гораздо более элегантней)
Код
C++ (Qt)
class wrapper_pool
{
public:
   wrapper_pool(specmath::thread_pool & pool)
       : m_pool(pool), m_task_count(0)
   {}
 
   template <class F>
   auto add_task(F && task)->std::future<decltype(task())>
   {
       std::lock_guard<std::mutex> locker(m_mutex);
       ++m_task_count;
       return m_pool.add_task(std::bind(&wrapper_pool::wrapper_task, this, std::forward<F>(task)));
   }
 
   void wait_for_all()
   {
       std::unique_lock<std::mutex> locker(m_mutex);
       m_cond.wait(locker, [&]()
       {
           return !m_task_count;
       });
   }
 
private:
   specmath::thread_pool & m_pool;
   std::atomic_int m_task_count;
   std::mutex m_mutex;
   std::condition_variable m_cond;
 
   void wrapper_task(const std::function<void()> & task)
   {
       if (task)
           task();
 
       std::unique_lock<std::mutex> locker(m_mutex);
       if (--m_task_count == 0)
       {
           locker.unlock();
           m_cond.notify_one();
       }
   }
};
 

По поводу тестов. Во-первых, они не совсем корректны - у Вас однопоточный режим (Testing single thread) обгоняет QThreadPool (Testing QThreadPool), что уже всё это сравнение делает весьма спорным.
Во-вторых, самым показательным было бы сравнить Вашу реализацию CSimplePool с методом WaitForDone с реализацией без оного, но с реализацией его   "через задачу", например, вариант выше.
А так идея понятна)
 


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Igors от Сентябрь 28, 2019, 08:56
Но это можно реализовать гораздо более элегантней)
Моя задумка была обойтись без счетчика задач, т.е. сохранить весь ф-ционал пула но добавить WaitForDone. Со счетчиком жить легче, Ваша реализация подлиннее зато "не интрузивна", в чем есть смысл
Код
C++ (Qt)
   void wrapper_task(const std::function<void()> & task)
   {
       if (task)
           task();
 
       std::unique_lock<std::mutex> locker(m_mutex);
       if (--m_task_count == 0)
       {
           locker.unlock();
           m_cond.notify_one();
       }
   }
};
 
Брать еще мутекс для каждой выполняемой задачи - ну так мы признаем что делаем лишь для "хороших" задач. Брать мутекс надо после атомарного декремента, это случится редко. А поскольку мутексу некого засисять то он здесь вообще не нужен. Также и в методе add_task - еще мутекс, ведь у m_pool есть свой. Оно конечно неопасно поскольку обычно главная набивает пуд, но все-таки нехорошо.

По поводу тестов. Во-первых, они не совсем корректны - у Вас однопоточный режим (Testing single thread) обгоняет QThreadPool (Testing QThreadPool), что уже всё это сравнение делает весьма спорным.
Но это совсем не вина тестов :) Там есть TASK_WAIT, попробуйте его увеличить, все станет хорошо с QThreadPool. Но на мелких задачках ужасный провал (ото видать насовали мутексов). А юзать QThreadPool "с оглядкой" страшно неудобно. Более того, выясняется что даже простейшая самопальная реализация пула ведет себя гораздо лучше. Вот вам и "готовые проверенные"  :'(

Кстати а Вашу "футуристическую" реализацию не хотите прогнать на этих задачках?  :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 28, 2019, 09:29
Брать еще мутекс для каждой выполняемой задачи - ну так мы признаем что делаем лишь для "хороших" задач. Брать мутекс надо после атомарного декремента, это случится редко. А поскольку мутексу некого засисять то он здесь вообще не нужен.
Разбирались-разбирались... :)

А без мютекса wait_for_all может никогда не завершиться.


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 28, 2019, 11:15
Цитировать
Моя задумка была обойтись без счетчика задач, т.е. сохранить весь ф-ционал пула но добавить WaitForDone.
Ну ну.. А mInCalc это тогда что такое? Не счётчик ли :)




Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 28, 2019, 12:57
Цитировать
Кстати а Вашу "футуристическую" реализацию не хотите прогнать на этих задачках?   :)
Прогнал) Одинаково) (см. аттач)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Igors от Сентябрь 28, 2019, 18:12
Ну ну.. А mInCalc это тогда что такое? Не счётчик ли :)
Счетчик, да не тот - это всего лишь кол-во задач которые сейчас выполняются. Об общем числе задач этот вариант понятия не имеет, он отлавливает ситуацию когда "очередь пуста" + "нет выполняющихся задач". На первый взгляд это совсем просто

Тут я немного попутал
Код
C++ (Qt)
    template <class F>
   auto add_task(F && task)->std::future<decltype(task())>
   {
       std::lock_guard<std::mutex> locker(m_mutex);
       ++m_task_count;
       return m_pool.add_task(std::bind(&wrapper_pool::wrapper_task, this, std::forward<F>(task)));
   }
Это здесь мутекс не нужен. Атомарный счетчик корректно инкременируется/декрементируется любым числом ниток. Из этого часто делается вывод типа "если атомик то мутекс не нужен". Это обычно не так, но в данном случае верно :) Разумеется полагаем что у m_pool есть свой мутекс для защиты очереди


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 28, 2019, 18:24
Цитировать
Счетчик, да не тот - это всего лишь кол-во задач которые сейчас выполняются. Об общем числе задач этот вариант понятия не имеет, он отлавливает ситуацию когда "очередь пуста" + "нет выполняющихся задач". На первый взгляд это совсем просто
Это я понял  :) Но это, тем не менее, счётчик)

Цитировать
Это здесь мутекс не нужен.
Это почему это он там не нужен? Он нужен и old выше уже объяснял почему необходимо лочить этот каунтер, дажеесли он атомарен..


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 28, 2019, 18:24
Кстатии, а Вы проверяли тесты с футурами и без них? :) Я проверял  :)

update: Там в аттаче небольшая ошибка с wrapper_pool, которая, однако, никак не влияет на результаты.. Правильнее реализовать его так (чтоб любые футуры получать):
Код
C++ (Qt)
class wrapper_pool
{
public:
   wrapper_pool(specmath::thread_pool & pool)
       : m_pool(pool), m_task_count(0)
   {}
 
   template <class F>
   auto add_task(F && task)->std::future<decltype (task())>
   {
       using R = decltype (task());
 
       std::lock_guard<std::mutex> locker(m_mutex);
       ++m_task_count;
       return m_pool.add_task(std::bind(&wrapper_pool::wrapper_task<R>, this, std::forward<F>(task)));
   }
 
   void wait_for_all()
   {
       std::unique_lock<std::mutex> locker(m_mutex);
       m_cond.wait(locker, [&]()
       {
           return !m_task_count;
       });
   }
 
private:
   specmath::thread_pool & m_pool;
   std::atomic_int m_task_count;
   std::mutex m_mutex;
   std::condition_variable m_cond;
 
   template <class R>
   R wrapper_task(const std::function<R()> & task)
   {
       auto res = task();
       {
           std::unique_lock<std::mutex> locker(m_mutex);
           if (--m_task_count == 0)
           {
               locker.unlock();
               m_cond.notify_one();
           }
       }
       return res;
   }
};
 


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Igors от Сентябрь 29, 2019, 06:02
Это почему это он там не нужен?
Пусть счетчик был 0 и теперь, без всякой защиты, стал 1. В рез-те wait_for_all могло уснуть хотя должно было пробудиться. Но это ничем не грозит, когда-то счетчик обнулится и wait_for_all проснется.

Теперь здесь.
Код
C++ (Qt)
   template <class R>
   R wrapper_task(const std::function<R()> & task)
   {
       auto res = task();
       {
           std::unique_lock<std::mutex> locker(m_mutex);
           if (--m_task_count == 0)
           {
               locker.unlock();
               m_cond.notify_one();
           }
       }
       return res;
   }
О том что лочить все подряд нехорошо мы уже говорили. Но здесь "защищать условие" нет смысла. Как только случился unlock, кто-то может сунуть задачу в очередь, m_task_count станет ненулевым, в рез-те wait_for_all не уснет или продолжит спать. Ситуация та же что и в первом случае - если пул пополняется со стороны, то можно потерять моменты когда он был пуст. Правда это ничем особым не грозит. Лучше так
Код
C++ (Qt)
   template <class R>
   R wrapper_task(const std::function<R()> & task)
   {
       auto res = task();
       if (--m_task_count == 0)
           std::unique_lock<std::mutex> locker(m_mutex);
           if (m_task_count == 0)
            m_cond.notify_one();
       }
       return res;
   }
Если пул хорошо "накормлен", то обнуление - случай редкий. Стандартный прием "двойная проверка" (за время когда брали мутекс все могло измениться).

Да, и опять забыл! Там небольшая помарка: в случае abort нужно отпустить ожидающего


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Igors от Сентябрь 29, 2019, 08:22
Кстатии, а Вы проверяли тесты с футурами и без них? :) Я проверял  :)
Да уж  :'(

1) Контрольная сумма не бьется, где-то насвистели (наверное с рандомом)
2) WEIGHT_TASK = 10000, т.е. опять "только хорошая задача"
3) Проходов - аж один, и все раунды одинаковы, имитировать гранулярность не нужно

Ну ладно, изменил печать и увеличил число проходовю Ставим WEIGHT_TASK = 64
Цитировать
pass 0
with std::futures<double>  result = -1.00525e+06 total time (ms) = 4299
with wait_for_all  result = -1.00526e+06 total time (ms) = 6061

pass 1
with std::futures<double>  result = -1.00517e+06 total time (ms) = 3883
with wait_for_all  result = -1.00547e+06 total time (ms) = 5927

pass 2
with std::futures<double>  result = -1.00534e+06 total time (ms) = 3781
with wait_for_all  result = -1.0053e+06 total time (ms) = 5817

pass 3
with std::futures<double>  result = -1.00513e+06 total time (ms) = 3962
with wait_for_all  result = -1.0052e+06 total time (ms) = 5824

End
Смотрите, как быстрее с футурой! (я же говорил!) и.т.п.  :) Ладно, теперь уберем бездумно насаженные Вами мутексы (там я поставил define в хедере). Имеем
Цитировать
pass 0
with std::futures<double>  result = -1.00545e+06 total time (ms) = 4353
with wait_for_all  result = -1.00562e+06 total time (ms) = 3767

pass 1
with std::futures<double>  result = -1.00538e+06 total time (ms) = 4097
with wait_for_all  result = -1.00502e+06 total time (ms) = 3749

pass 2
with std::futures<double>  result = -1.0054e+06 total time (ms) = 3925
with wait_for_all  result = -1.00499e+06 total time (ms) = 3758

pass 3
with std::futures<double>  result = -1.00513e+06 total time (ms) = 3911
with wait_for_all  result = -1.00494e+06 total time (ms) = 3776

End
И выясняется что футуры чего-то весят, как оно и должно быть. И что лепить мутексы где ни попадя не так уж хорошо  :)

Изменения в аттаче


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 29, 2019, 13:28
Цитировать
Пусть счетчик был 0 и теперь, без всякой защиты, стал 1. В рез-те wait_for_all могло уснуть хотя должно было пробудиться. Но это ничем не грозит, когда-то счетчик обнулится и wait_for_all проснется.

Цитировать
Ситуация та же что и в первом случае - если пул пополняется со стороны, то можно потерять моменты когда он был пуст. Правда это ничем особым не грозит. Лучше так

Чтож.. Убедили, согласен  :)

Правда у меня Ваши тесты показывают +- одинаковые результаты
Код
C++ (Qt)
const size_t WEIGHT_TASK = 64;
const size_t NUM_TASKS = 1024 * 1000;
const size_t NUM_PASSES = 8;
 
#define M_AX 0
 

Вывод:
Код
Bash
pass 0
with std::futures<double>  result = -52.8008 total time (ms) = 5184
with wait_for_all  result = -156.814 total time (ms) = 5220
 
pass 1
with std::futures<double>  result = -66.3582 total time (ms) = 4875
with wait_for_all  result = -16.7101 total time (ms) = 5256
 
pass 2
with std::futures<double>  result = 209.773 total time (ms) = 4785
with wait_for_all  result = 137.421 total time (ms) = 5227
 
pass 3
with std::futures<double>  result = -109.539 total time (ms) = 4818
with wait_for_all  result = -311.828 total time (ms) = 5272
 
pass 4
with std::futures<double>  result = 64.4322 total time (ms) = 4721
with wait_for_all  result = -55.3244 total time (ms) = 5272
 
pass 5
with std::futures<double>  result = 141.65 total time (ms) = 4828
with wait_for_all  result = 272.018 total time (ms) = 5184
 
pass 6
with std::futures<double>  result = 7.6451 total time (ms) = 5081
with wait_for_all  result = -56.6884 total time (ms) = 5194
 
pass 7
with std::futures<double>  result = -171.047 total time (ms) = 4989
with wait_for_all  result = -130.11 total time (ms) = 5253
 
End
 

Цитировать
1) Контрольная сумма не бьется, где-то насвистели (наверное с рандомом)
Да, насвистел.. Во-первых здесь:
Код
C++ (Qt)
for (size_t i = 0; i < data.size(); ++i)
   {
       double val = dist(gen);
       data[i] = val * 2 - 1.0;
   }
 
нужно заменить на:
Код
C++ (Qt)
for (size_t i = 0; i < data.size(); ++i)
   {
       data[i] = dist(gen);
   }
 
а во-вторых, при таком WEIGHT_TASK = 64 такие флуктуации совершенно нормальны..

Всё же лучше так:
Код
C++ (Qt)
void wrapper_task(const std::function<void()> & task)
   {
       if (task)
           task();
 
#if M_AX
       std::unique_lock<std::mutex> locker(m_mutex);
       if (--m_task_count == 0) {
           locker.unlock();
           m_cond.notify_one();
       }
   #else
       if (std::atomic_fetch_sub(&m_task_count, 1) == 1)
       {
           std::unique_lock<std::mutex> locker(m_mutex);
           if (m_task_count == 0)
           {
               locker.unlock(); // <== делаем unlock перед пробудкой..
               m_cond.notify_one();
           }
       }
   #endif
   }
 


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 29, 2019, 14:23
Вообще, в идеале, хотелось бы реализовать идею cpp-taskflow https://github.com/cpp-taskflow/cpp-taskflow (https://github.com/cpp-taskflow/cpp-taskflow)
Понравилась мне она очень  :)

Так, чисто для удовлетворения своих эстетических чувств) К тому же я могу себе это позволить)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 29, 2019, 15:48
m_ax, а какой выигрыш принесли последние исправления? ;)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 29, 2019, 15:59
m_ax, а какой выигрыш принесли последние исправления? ;)

Ну на моём железе, особо, никакой) Я бы вообще бы сказал, что никакой  :)

Ну там и тест ещё весьма спорный.. Едва ли в реальных задачах так делать  будут..

update: на мой сугубо взгляд, конечно


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 29, 2019, 17:50
Ну на моём железе, особо, никакой) Я бы вообще бы сказал, что никакой  :)
Если вы замерите, сколько раз рабочие нитки упирались в заблокированный мьютес, то увидите, на сколько это редкий случай. У меня на машине для 10000 задач блок случался от 10 до 20 раз. И все эти ухищрения с двойными проверками не принесут даже 1% прироста. :)

Плюс современные мьютексы на основных платформах делают на футексах, которые при захвате свободного мьютекса бесплатны (это изменение атомарной переменной), дальше в зависимости от реализации может покрутиться  spin-lock, а только потом будет syscall в ядро на wait futex'а.


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 29, 2019, 20:31
Цитировать
Если вы замерите, сколько раз рабочие нитки упирались в заблокированный мьютес, то увидите, на сколько это редкий случай. У меня на машине для 10000 задач блок случался от 10 до 20 раз. И все эти ухищрения с двойными проверками не принесут даже 1% прироста. :)
Вы имеете в виду ситуацию пробки? Когда более одной нитки ждут на мьютексе?


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 29, 2019, 20:37
да


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 29, 2019, 20:47
да
Ну я, специально, этот случий не расматривал.. Хотя, в общем, это не сложно посмотреть.. (Посмотрю)

Здесь, как я понял, весь затык в том, что когда задачи становятся "дешёвыми" и их много, разница в подходах с футурами и этим методом wait_for_all нивилируется, а то и последний вариант работает быстрей.. Но это, на мой взгляд, проблемы самой реализации распараллеливания  :)

Не будет нормальный человек так бездумно кидать задачи в пул)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 29, 2019, 20:59
Т.е. для каждой конкретной задачи, должен ставится вопрос:  а нужно ли её распараллеливать, и если нужно, то как наиболее эффективно..


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 29, 2019, 21:21
Ну я, специально, этот случий не расматривал.. Хотя, в общем, это не сложно посмотреть.. (Посмотрю)
Я хотел сказать, что в подавляющем большинстве случаев мьютекс будет свободен.
Ваши "оптимизации" с двойной проверкой могут стоить дороже вашего первоначального варианта. Точнее в варианте с ожиданием завершения всех задач это не будет иметь роли, а вот если такие "оптимизации" применять в постоянно исполняемом коде, то может быть медленнее.

Современный мьютекс это атомарная переменная и пытаться ускорить ее использованием другой атомарной операции не всегда удается. :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: m_ax от Сентябрь 29, 2019, 22:55
Цитировать
Я хотел сказать, что в подавляющем большинстве случаев мьютекс будет свободен.
Что значит в подавляющем большинстве случаев?

Да, я согласен, что если вот так, как в предложенных тестах, кидать "мелкие" задачи, то мы неменуемо свалимся в  bottlen neck эффект (эффукт узкого горла - пробка)
Ну даже тут вариант с футурами проходит  :)

Я не хочу в продакшен выкидывать wrapper_pool, поскольку оно спорно..(Но это обсуждаемо :))


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Igors от Сентябрь 30, 2019, 04:28
       if (std::atomic_fetch_sub(&m_task_count, 1) == 1)
Я пользуюсь атомарным инк(дек)рементом уже не один десяток лет. Не верите - ну страхуйтесь :) Вспомнилось неплохое объяснение типа:

Атомарная арифметика оперирует/возвращает "моментальные фотографии". Т.е. то что "изображено на фото" - правда, реальный факт который "имел место". НО когда мы получили это "фото" мы должны иметь ввиду что это "уже в прошлом" и могли произойти др события. На первый взгляд в этом нет смысла - ведь возвращенному значению все равно "нельзя верить", оно могло уже стать иным. Но "факт" можно использовать как в примере выше. И факты эти случаются строго один за одним (за счет отой "ordered memory semantic"), а вот какая нитка получит какое фото - уже как фишка ляжет.

Ну даже тут вариант с футурами проходит  :)
Да какой там "вариант", просто "так заработало" :) Зачем нужен контейнер если можно обойтись счетчиком? 
Цитировать
- А почему Вы ищете только под фонарем?
- Потому что там светло
Да, я согласен, что если вот так, как в предложенных тестах, кидать "мелкие" задачи, то мы неменуемо свалимся в  bottlen neck эффект (эффукт узкого горла - пробка)
Дело не только в "мелких задачах". Вы набиваете пул по самые "не балУйся" - но это "хороший" случай, а в жизни часто не так - напр нужно нужно сначала посчитать всего с десяток задач, "осмотреться", потом еще десяток-другой и.т.д. (в тестах что я выкладывал это есть). Здесь постоянное усыпление/пробуждение ниток обходится дорого.

Т.е. для каждой конкретной задачи, должен ставится вопрос:  а нужно ли её распараллеливать, и если нужно, то как наиболее эффективно..
Да-да-да :) В жизни происходит примерно так

- во, тут превью совсем дохнет, 2-3 fps. Ладно, профайлим - ага, жрется на пересчете нормалей. Отключаем это место - во, 70 fps. На GPU не перенести. А "задача" здесь - получить 2 вектора вычитанием и записать в рез-т векторное произведение. До того atan2 там ой далеко. Ну и понеслась...

Вообще, в идеале, хотелось бы реализовать идею cpp-taskflow https://github.com/cpp-taskflow/cpp-taskflow (https://github.com/cpp-taskflow/cpp-taskflow)
Понравилась мне она очень  :)

Так, чисто для удовлетворения своих эстетических чувств) К тому же я могу себе это позволить)
А конкретнее? Те картинки мне, правду сказать, и нафиг не нужны. А если что интересное - поддержу обсуждение, только тему новую создайте


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 30, 2019, 05:40
Я не хочу в продакшен выкидывать wrapper_pool, поскольку оно спорно..(Но это обсуждаемо :))
m_ax, по моему мнению нет особого смысла в подобных "оптимизациях":
Код
C++ (Qt)
#if M_AX
       std::unique_lock<std::mutex> locker(m_mutex);
       if (--m_task_count == 0) {
           locker.unlock();
           m_cond.notify_one();
       }
   #else
       if (std::atomic_fetch_sub(&m_task_count, 1) == 1)
       {
           std::unique_lock<std::mutex> locker(m_mutex);
           if (m_task_count == 0)
           {
               locker.unlock(); // <== делаем unlock перед пробудкой..
               m_cond.notify_one();
           }
       }
   #endif
 

Это либо ничего не ускорит, как в данном случае, а может и ухудшить.
Я это хотел сказать. :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 30, 2019, 05:43
Я пользуюсь атомарным инк(дек)рементом уже не один десяток лет.
Я думаю сотни. :)


Название: Re: К вопросу об организации взаимодействия пула производителей и одного потребителя
Отправлено: Old от Сентябрь 30, 2019, 06:01
Я не хочу в продакшен выкидывать wrapper_pool, поскольку оно спорно..(Но это обсуждаемо :))
Этот враппер можно положить в примеры. :)

По поводу "толстоты" future... :)
future это указатель на приватную структуру promise, т.е. если вы используете packaged_task, то вы их и так создаете. :)
Внутри promise использует тот же атомик + futex, т.е. весит она атомарную переменную + хранилку для результата (если он есть).

Ну а дальше сами (или пользователи пула) решайте, хранить массив futur (массив указателей) или нет. :)