Russian Qt Forum
Ноябрь 22, 2024, 22:52 *
Добро пожаловать, Гость. Пожалуйста, войдите или зарегистрируйтесь.
Вам не пришло письмо с кодом активации?

Войти
 
  Начало   Форум  WIKI (Вики)FAQ Помощь Поиск Войти Регистрация  

Страниц: [1] 2   Вниз
  Печать  
Автор Тема: parallel transform (параллелим алгоритмы)  (Прочитано 13832 раз)
m_ax
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 2095



Просмотр профиля
« : Ноябрь 05, 2013, 21:39 »

Приобщаясь к параллелизму написал (в рамках С++11) один пользующейся популярностью (во всяком случае у меня) алгоритм transform - аналог std::transform, но выполняющийся параллельно.
Возможно это велосипед.. Знаю про существование Intel TBB, VC++ PPL, OpenMP но.. просто было интересно)

Код
C++ (Qt)
#ifndef PARALLEL_H
#define PARALLEL_H
 
#include <algorithm>
#include <thread>
#include <memory>
#include <iterator>
#include <vector>
 
struct parallel
{
   template<class InputIt, class OutputIt, class UnaryOperation>
   static OutputIt transform(InputIt ifirst, InputIt ilast, OutputIt ofirst, UnaryOperation unary_op) {
 
       const size_t size = std::distance(ifirst, ilast);
       const size_t nthreads = std::min(size, static_cast<size_t>(std::thread::hardware_concurrency()));
       if (nthreads < 2)
           return std::transform(ifirst, ilast, ofirst, unary_op);
 
       const size_t group = size/nthreads;
 
       std::vector<std::shared_ptr<std::thread>> threads(nthreads-1);
       auto it = ifirst;
 
       for (auto & thread_ptr : threads) {
           std::advance(it, group);
           thread_ptr = std::make_shared<std::thread>([=]{ std::transform(ifirst, it, ofirst, unary_op); });
           ifirst = it;
           std::advance(ofirst, group);
       }
 
       for (auto & t : threads) { t->join(); }
 
       return std::transform(ifirst, ilast, ofirst, unary_op);
   }
 
   template<class InputIt1, class InputIt2, class OutputIt, class BinaryOperation>
   static OutputIt transform(InputIt1 ifirst1, InputIt1 ilast1, InputIt2 ifirst2, OutputIt ofirst, BinaryOperation binary_op) {
 
       const size_t size = std::distance(ifirst1, ilast1);
       const size_t nthreads = std::min(size, static_cast<size_t>(std::thread::hardware_concurrency()));
       if (nthreads < 2)
           return std::transform(ifirst1, ilast1, ifirst2, ofirst, binary_op);
 
       const size_t group = size/nthreads;
 
       std::vector<std::shared_ptr<std::thread>> threads(nthreads-1);
       auto it = ifirst1;
 
       for (auto & thread_ptr : threads) {
           std::advance(it, group);
           thread_ptr = std::make_shared<std::thread>([=]{ std::transform(ifirst1, it, ifirst2, ofirst, binary_op); });
           ifirst1 = it;
           std::advance(ifirst2, group);
           std::advance(ofirst, group);
       }
 
       for (auto & t : threads) { t->join(); }
 
       return std::transform(ifirst1, ilast1, ifirst2, ofirst, binary_op);
   }
};
 
#endif // PARALLEL_H
 

тест:

Код
C++ (Qt)
#include <iostream>
#include <vector>
#include <cmath>
#include <chrono>
 
#include "parallel.h"
 
double func(const double & x) {
   return sin(x)*cos(x*x)*exp(-x*x*0.5)/(1.0 + x*x);
}
 
int main()
{
   const size_t N = 10000000;
 
   std::vector<double> v1(N, 2.0);
   std::vector<double> v2(v1);
 
   auto start = std::chrono::high_resolution_clock::now();
   parallel::transform(v1.begin(), v1.end(), v2.begin(), func);
   auto stop = std::chrono::high_resolution_clock::now();
 
   auto par_duration = std::chrono::duration_cast<std::chrono::milliseconds>(stop-start).count();
 
   start = std::chrono::high_resolution_clock::now();
   std::transform(v1.begin(), v1.end(), v2.begin(), func);
   stop = std::chrono::high_resolution_clock::now();
 
   auto seq_duration = std::chrono::duration_cast<std::chrono::milliseconds>(stop-start).count();
 
   std::cout << "par: " << par_duration << std::endl;
   std::cout << "seq: " << seq_duration << std::endl;
 
   return 0;
}
 
 

Критика приветствуется)


 
Записан

Над водой луна двурога. Сяду выпью за Ван Гога. Хорошо, что кот не пьет, Он и так меня поймет..

Arch Linux Plasma 5
b-s-a
Гость
« Ответ #1 : Ноябрь 05, 2013, 23:49 »

1. вместо struct лучше использовать namespace
2. порядок записи результатов в выходной итератор неопределен. в первую очередь это касается ostream_iterator и back_insert_iterator. Поэтому тебе необходимо использовать ForwardIterator и сделать проверку на его использование.
Записан
Igors
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 11445


Просмотр профиля
« Ответ #2 : Ноябрь 06, 2013, 01:38 »

2. порядок записи результатов в выходной итератор неопределен. в первую очередь это касается ostream_iterator и back_insert_iterator. Поэтому тебе необходимо использовать ForwardIterator и сделать проверку на его использование.
Не понял почему он не определен если каждая нитка пишет в свой участок?

- если size не кратен nthreads, то последние эт-ты не обрабатываются
- запуск нитки - удовольствие дорогое, поэтому заводят все нитки сразу, а в нужный момент "спускают с цепи"
- пока считается - все заморожено (запускающий стоит на join)
Записан
m_ax
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 2095



Просмотр профиля
« Ответ #3 : Ноябрь 06, 2013, 10:04 »

1. вместо struct лучше использовать namespace
2. порядок записи результатов в выходной итератор неопределен. в первую очередь это касается ostream_iterator и back_insert_iterator. Поэтому тебе необходимо использовать ForwardIterator и сделать проверку на его использование.

1) Да, возможно) Тож думал над этим)

2) У меня аналогичный вопрос, что и у igors
Цитировать
Не понял почему он не определен если каждая нитка пишет в свой участок?


- если size не кратен nthreads, то последние эт-ты не обрабатываются
- запуск нитки - удовольствие дорогое, поэтому заводят все нитки сразу, а в нужный момент "спускают с цепи"
- пока считается - все заморожено (запускающий стоит на join)

1) Неправда, обрабатываются в любом случае.. (как раз последним std::transform из главной нитки)

2) А в чём разница между заводят и запускают?)

3) Именно так.. А для таких алгоритмов это кажется не логичным?
Немного исправил: (сейчас последний трансформ из запускающей нитки запускается до join пораждённых потоков.)
Код
C++ (Qt)
...
   ofirst = std::transform(ifirst, ilast, ofirst, unary_op);
 
   for (auto & t : threads) { t->join(); }
 
   return ofirst;
 
 



Спасибо)
« Последнее редактирование: Ноябрь 06, 2013, 12:31 от m_ax » Записан

Над водой луна двурога. Сяду выпью за Ван Гога. Хорошо, что кот не пьет, Он и так меня поймет..

Arch Linux Plasma 5
Igors
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 11445


Просмотр профиля
« Ответ #4 : Ноябрь 06, 2013, 14:13 »

1) Неправда, обрабатываются в любом случае.. (как раз последним std::transform из главной нитки)
Давайте смотреть
Код
C++ (Qt)
       const size_t group = size/nthreads;
 
       std::vector<std::shared_ptr<std::thread>> threads(nthreads-1);
       auto it = ifirst;
 
       for (auto & thread_ptr : threads) {
           std::advance(it, group);
           thread_ptr = std::make_shared<std::thread>([=]{ std::transform(ifirst, it, ofirst, unary_op); });
           ifirst = it;
           std::advance(ofirst, group);
       }
 
Пусть size = 7, nthreads = 4. Запускаете 3 нитки (почему если есть 4 ядра?). group = 1, цикл выполняется 3 раза, обработано 3 (из 7), или как?

2) А в чём разница между заводят и запускают?)
Не "запускают" а "спускают", т.е. снимают с мутекслв. Причем мутексы комбинированные честный/нечестный. Напр после выполнения расчетов нитки усыпляются не сразу, а какое-то время работают вхолостую (а вдруг сейчас еще расчет). Длительность этой прокрутки значительна, по умолчанию 0.3 секунды(!) в OpenMP. А запуск - это тонна работы для ОС

3) Именно так.. А для таких алгоритмов это кажется не логичным?
Для любых. Никто не против "lite" веши, пусть есть многочисленные аналоги. Но выходит "ни богу свечка ни черту кочерга". Для маленьких рачсетов - запуск сожрет все, а если расчет идет секунды - ни отменить, ни обновить UI.
Записан
m_ax
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 2095



Просмотр профиля
« Ответ #5 : Ноябрь 06, 2013, 14:38 »

Цитировать
Давайте смотреть
Пусть size = 7, nthreads = 4. Запускаете 3 нитки (почему если есть 4 ядра?). group = 1, цикл выполняется 3 раза, обработано 3 (из 7), или как?
Запускается 3 нитки: в каждую передаются итераторы:
[0 1]
[1 2]
[3 4]
4-ая нитка та, в которой и был вызвана parallel::transform В неё передаются итераторы: [4 7]. Этот хвост всегда будет отработан)
В итоге алгоритм распределяется между всеми ядрами.

Цитировать
Для любых. Никто не против "lite" веши, пусть есть многочисленные аналоги. Но выходит "ни богу свечка ни черту кочерга". Для маленьких рачсетов - запуск сожрет все, а если расчет идет секунды - ни отменить, ни обновить UI.

И всё-таки для таких алгоритмов, как простой transform требовать нечто большее (вплоть до возможности прерывания и т.д.) как то неоправданно..
Согласен с тем, что для маленьких расчётов (то, что делает передаваемая функция) особого выйгрыша не будет, если не наоборот..   
Но если функция выполняется в среднем от 0.1 - 0.01 сек, то для меня результат уже ощутим, даже если размер контейнера порядка 100..   
Записан

Над водой луна двурога. Сяду выпью за Ван Гога. Хорошо, что кот не пьет, Он и так меня поймет..

Arch Linux Plasma 5
Igors
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 11445


Просмотр профиля
« Ответ #6 : Ноябрь 06, 2013, 16:37 »

Запускается 3 нитки: в каждую передаются итераторы:
[0 1]
[1 2]
[3 4]
4-ая нитка та, в которой и был вызвана parallel::transform В неё передаются итераторы: [4 7]. Этот хвост всегда будет отработан)
В итоге алгоритм распределяется между всеми ядрами.
[4 8]. А чего же всем по одной а главной аж 4 задачи? Улыбающийся Это легко исправить
Код
C++ (Qt)
size_t need = 0, fact = 0;
for (size_t i = 0; i < nthread - 1; ++i) {
need = (i + 1) * num_Tasks / nthread;  // 1, 3, 5  
num = need - fact;   // 1, 2, 2
fact = need;
std::advance(it, num);
...  
}
 

Атомарная переменная (отслеживающая число выполненных задач) здесь была бы к месту. Также лучше добавить флажок m_master (true для запускающей). Тогда "деревья не кажутся столь уж большими", напр
Код
C++ (Qt)
double func(const double & x)
{
   double result = sin(x)*cos(x*x)*exp(-x*x*0.5)/(1.0 + x*x);
   ++atomic_count;
   if (m_master)
     if (!UpdateUI(atomic_count))
       SetAbortFlag();
}
Нагрузку (result =) лучше вынести "хвунктором" Улыбающийся Тут правда возникает мелкая проблема - главная закончила свои дела и ушла на join - заморозка. Ну это тоже решаемо
« Последнее редактирование: Ноябрь 06, 2013, 16:40 от Igors » Записан
m_ax
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 2095



Просмотр профиля
« Ответ #7 : Ноябрь 06, 2013, 17:43 »

[4 8]. А чего же всем по одной а главной аж 4 задачи? Улыбающийся Это легко исправить
Код
C++ (Qt)
size_t need = 0, fact = 0;
for (size_t i = 0; i < nthread - 1; ++i) {
need = (i + 1) * num_Tasks / nthread;  // 1, 3, 5  
num = need - fact;   // 1, 2, 2
fact = need;
std::advance(it, num);
...  
}
 

Согласен, но думаю, можно проще:
Код
C++ (Qt)
const size_t group = std::lround(double(size)/nthreads);
 

Цитировать
Атомарная переменная (отслеживающая число выполненных задач) здесь была бы к месту. Также лучше добавить флажок m_master (true для запускающей). Тогда "деревья не кажутся столь уж большими", напр
Ну это уже больше детали прикручивания алгоритма к конкретным задачам/ситуациям.. Конечно, в более сложных случаях может понадобиться и функтор с разделяемыми ресурсами и т.д.. )   

Записан

Над водой луна двурога. Сяду выпью за Ван Гога. Хорошо, что кот не пьет, Он и так меня поймет..

Arch Linux Plasma 5
b-s-a
Гость
« Ответ #8 : Ноябрь 06, 2013, 18:18 »

2. порядок записи результатов в выходной итератор неопределен. в первую очередь это касается ostream_iterator и back_insert_iterator. Поэтому тебе необходимо использовать ForwardIterator и сделать проверку на его использование.
Не понял почему он не определен если каждая нитка пишет в свой участок?
Господа, вы вообще знаете чем отличается OutputIterator от ForwardIterator?

Я подскажу, попробуйте код, в котором результат пишется в изначально пустой вектор с помощью std::back_inserter.
Записан
Igors
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 11445


Просмотр профиля
« Ответ #9 : Ноябрь 06, 2013, 18:24 »

Согласен, но думаю, можно проще:
Код
C++ (Qt)
const size_t group = std::lround(double(size)/nthreads);
 
Что бы ни выдала std::lround - это константв, а здесь это не устраивает. Не тянитесь к std::рюмочке, это Вам только мешает  Плачущий

Господа, вы вообще знаете чем отличается OutputIterator от ForwardIterator?

Я подскажу, попробуйте код, в котором результат пишется в изначально пустой вектор с помощью std::back_inserter.
Я лично не имею ни малейшего понятия  Улыбающийся Если нетрудно, поясните. Спасибо
Записан
m_ax
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 2095



Просмотр профиля
« Ответ #10 : Ноябрь 06, 2013, 18:35 »

Цитировать
Господа, вы вообще знаете чем отличается OutputIterator от ForwardIterator?

Я подскажу, попробуйте код, в котором результат пишется в изначально пустой вектор с помощью std::back_inserter.

Понятно) Да, согласен - надо запрещать такие ситуации) Исправлю)
Спасибо)

Цитировать
Что бы ни выдала std::lround - это константв, а здесь это не устраивает.
Подумаю..

Цитировать
Я лично не имею ни малейшего понятия
b-s-a имеет в виду, что вот такая ситуация:
Код
C++ (Qt)
std::vector<int> v1(N);
std::vector<int> v2; // Пустой!
parallel::transform(v1.begin(), v1.end(), std::back_inserter(v2), func);
 
может привести к непредсказуемым последствиям)
« Последнее редактирование: Ноябрь 06, 2013, 18:40 от m_ax » Записан

Над водой луна двурога. Сяду выпью за Ван Гога. Хорошо, что кот не пьет, Он и так меня поймет..

Arch Linux Plasma 5
Igors
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 11445


Просмотр профиля
« Ответ #11 : Ноябрь 06, 2013, 19:28 »

b-s-a имеет в виду, что вот такая ситуация:
Код
C++ (Qt)
std::vector<int> v1(N);
std::vector<int> v2; // Пустой!
parallel::transform(v1.begin(), v1.end(), std::back_inserter(v2), func);
 
может привести к непредсказуемым последствиям)
То ясно, но так же и для любого тулза. Вообще нужно ли следовать подходу transform, может гибче и лучше дать возможность функтору самому решать куда писать результат
Код
C++ (Qt)
#pragma omp parallel for
for (int i = 0; i < src.size(); ++i)
DoCalc(src[i], dst[i]);
Записан
b-s-a
Гость
« Ответ #12 : Ноябрь 06, 2013, 22:18 »

Igros, а если тебе надо тупо вывести в поток через std::ostream_iterator? Как ты это сделаешь? Через дополнительный массив?
Записан
Igors
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 11445


Просмотр профиля
« Ответ #13 : Ноябрь 07, 2013, 10:38 »

Igros, а если тебе надо тупо вывести в поток через std::ostream_iterator? Как ты это сделаешь? Через дополнительный массив?
Да, придется использовать доп контейнер если "элементы вывода" считаются параллельно. Иначе порядлк вывода невоспроизводим
Записан
m_ax
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 2095



Просмотр профиля
« Ответ #14 : Ноябрь 07, 2013, 11:54 »

2. порядок записи результатов в выходной итератор неопределен. в первую очередь это касается ostream_iterator и back_insert_iterator. Поэтому тебе необходимо использовать ForwardIterator и сделать проверку на его использование.

Исправил. Поставил статик асерт с проверкой на принадлежность к forward iterator type:
Код
C++ (Qt)
typedef typename std::iterator_traits<OutputIt>::iterator_category iterator_category;
static_assert(std::is_base_of<std::forward_iterator_tag, iterator_category>::value,
                   "iterator must meet all the requirements for forward iterator");
 


Цитировать
Что бы ни выдала std::lround - это константв, а здесь это не устраивает. Не тянитесь к std::рюмочке, это Вам только мешает
Согласен, что вариант с lround - неправильный. Вот только стандартная библиотека здесь ни при чём..

Всё равно можно сделать это проще:
Полное число элементов всегда можно представить в виде:
size = m * (size/nthreads + 1) + (nthreads - m) * (size/nthreads)

m - это число нитей которые принимают интервал длиной size/nthreads + 1
(nthreads - m) - число нитей принимающих интервал длиной size/nthreads

Далее логика такая: сначала раздаём m потокам длинные интервалы, а всем оставшимся достаются нормальные size/nthreads интервалы.

Выглядит сейчас это так:
Код
C++ (Qt)
...
       const size_t normal_group = size/nthreads;
       const size_t m = size - normal_group * nthreads;
       size_t i = 0;
       for (auto & thread_ptr : threads) {
           size_t group = (++i <= m) ? normal_group + 1 : normal_group;
           std::advance(it, group);
           ...
       }
 
   
 
« Последнее редактирование: Ноябрь 07, 2013, 13:12 от m_ax » Записан

Над водой луна двурога. Сяду выпью за Ван Гога. Хорошо, что кот не пьет, Он и так меня поймет..

Arch Linux Plasma 5
Страниц: [1] 2   Вверх
  Печать  
 
Перейти в:  


Страница сгенерирована за 0.436 секунд. Запросов: 23.