C++ (Qt) void thread_run() { for(;;) { std::unique_lock<std::mutex> locker( m_mutex ); for(;;) { if( m_stop ) return; else if( !m_tasks.empty() ) break; m_cond.wait( locker ); } Task task = m_tasks.front(); m_tasks.pop(); locker.unlock(); if( task ) task(); } }
C++ (Qt) ~ThreadPool() { { std::unique_lock<std::mutex> locker( m_mutex ); m_stop = true; } m_cond.notify_all(); for( auto &it : m_threads ) if( it.joinable() ) it.join(); }
// C++ (Qt)
/// Fixed-size pool of worker threads that run queued callables in FIFO order.
///
/// Results and exceptions of a task are delivered through the std::future
/// returned by add_task(). The pool is neither copyable nor movable because
/// the worker threads hold a pointer to it.
///
/// Destroying the pool stops the workers after the tasks they are currently
/// running; tasks still waiting in the queue are discarded without being run.
class thread_pool
{
public:
    /// Starts the worker threads.
    /// @param num_threads Number of workers; defaults to the hardware
    ///        concurrency, but never less than one.
    /// @throws Whatever std::thread's constructor throws; workers that were
    ///         already started are stopped and joined before rethrowing.
    explicit thread_pool(size_t num_threads = std::max(size_t(1), size_t(std::thread::hardware_concurrency())))
        : m_stop(false), m_active(0)
    {
        try {
            for (size_t i = 0; i < num_threads; ++i)
                m_threads.emplace_back(&thread_pool::worker, this);
        } catch (...) {
            // Destroying a joinable std::thread calls std::terminate, so the
            // workers that did start must be joined before the error leaves.
            shutdown();
            throw;
        }
    }

    thread_pool(const thread_pool &) = delete;
    thread_pool(thread_pool &&) = delete;
    thread_pool & operator=(const thread_pool &) = delete;
    thread_pool & operator=(thread_pool &&) = delete;

    /// Stops and joins all workers.
    ~thread_pool()
    {
        shutdown();
    }

    /// Blocks until the queue is empty and no worker is executing a task.
    /// Must not be called from inside a task running on this pool: that task
    /// counts as in flight, so the call would wait for itself.
    void wait_for_all()
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        m_cond_wait.wait(locker, [this]() { return m_queue.empty() && m_active == 0; });
    }

    /// Queues a callable for execution on one of the workers.
    /// @param f Callable invocable with no arguments.
    /// @return Future holding the callable's result, or the exception it threw.
    template<class F>
    auto add_task(F && f) -> std::future<decltype(f())>
    {
        using R = decltype(f());
        auto task_ptr = std::make_shared<std::packaged_task<R()>>(std::forward<F>(f));

        // Take the future before the task becomes visible to the workers, so
        // that get_future() never runs concurrently with the task's invocation.
        std::future<R> result = task_ptr->get_future();
        {
            std::lock_guard<std::mutex> locker(m_mutex);
            m_queue.emplace([task_ptr]() { (*task_ptr)(); });
        }
        m_cond.notify_one();
        return result;
    }

private:
    std::atomic_bool m_stop;
    size_t m_active;                          // tasks dequeued but not yet finished; guarded by m_mutex
    std::list<std::thread> m_threads;
    std::queue<std::function<void()>> m_queue;
    std::mutex m_mutex;
    std::condition_variable m_cond;           // signals workers: new task or stop request
    std::condition_variable m_cond_wait;      // this condition variable is for wait_for_all

    /// Raises the stop flag, wakes every worker and joins the threads.
    void shutdown()
    {
        {
            std::lock_guard<std::mutex> locker(m_mutex);
            m_stop = true;
        }
        m_cond.notify_all();
        for (auto & th : m_threads) {
            if (th.joinable())
                th.join();
        }
    }

    /// Body of every worker thread: run queued tasks until asked to stop.
    void worker()
    {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> locker(m_mutex);
                m_cond.wait(locker, [this]() { return m_stop || !m_queue.empty(); });
                if (m_stop)
                    return;
                task = std::move(m_queue.front());
                m_queue.pop();
                // Count the task as in flight while the lock is still held, so
                // wait_for_all() never sees "queue empty" while it is running.
                ++m_active;
            }

            if (task)
                task();

            bool idle = false;
            {
                std::lock_guard<std::mutex> locker(m_mutex);
                --m_active;
                idle = m_queue.empty() && m_active == 0;
            }
            // Wake every waiter: several threads may be blocked in wait_for_all().
            if (idle)
                m_cond_wait.notify_all();
        }
    }
};
C++ (Qt)class wrapper_pool{public: wrapper_pool(specmath::thread_pool & pool) : m_pool(pool), m_task_count(0) {} template <class F> auto add_task(F && task)->std::future<decltype(task())> { std::lock_guard<std::mutex> locker(m_mutex); ++m_task_count; return m_pool.add_task(std::bind(&wrapper_pool::wrapper_task, this, std::forward<F>(task))); } void wait_for_all() { std::unique_lock<std::mutex> locker(m_mutex); m_cond.wait(locker, [&]() { return !m_task_count; }); } private: specmath::thread_pool & m_pool; std::atomic_int m_task_count; std::mutex m_mutex; std::condition_variable m_cond; void wrapper_task(const std::function<void()> & task) { if (task) task(); std::unique_lock<std::mutex> locker(m_mutex); if (--m_task_count == 0) { locker.unlock(); m_cond.notify_one(); } }};
C++ (Qt) void wrapper_task(const std::function<void()> & task) { if (task) task(); std::unique_lock<std::mutex> locker(m_mutex); if (--m_task_count == 0) { locker.unlock(); m_cond.notify_one(); } }};