C++ (Qt)#include <boost/noncopyable.hpp> #include <boost/thread/thread.hpp> #include <boost/thread/mutex.hpp> #include <boost/bind.hpp> #include <boost/shared_ptr.hpp> #include <iostream> #include <queue> boost::mutex mutexCout; // Блокировка std::cout boost::mutex mutexDatas;boost::condition condDatas; std::queue<int> datas; void thread_func(){ for(;;) { boost::mutex::scoped_lock locker( mutexDatas ); while( datas.empty() ) condDatas.wait( locker ); int val = datas.front(); if( val == -1 ) break; datas.pop(); locker.unlock(); { boost::mutex::scoped_lock locker( mutexCout ); std::cout << "Process thread " << val << std::endl; } sleep( 3 ); { boost::mutex::scoped_lock locker( mutexCout ); std::cout << "Stop thread " << val << std::endl; } }} // Помещение данных в очередь// -1 специальное сообщение обозначающее, что данных больше не будетvoid push_data( int v ){ boost::mutex::scoped_lock locker( mutexDatas ); datas.push( v ); condDatas.notify_one();} int main( int, char ** ){ boost::thread th1( &thread_func ); boost::thread th2( &thread_func ); boost::thread th3( &thread_func ); for( int i = 0; i < 20; ++i ) push_data( i * 10 ); sleep( 5 ); for( int i = 0; i < 20; ++i ) push_data( i * 10 ); push_data( -1 ); th1.join(); th2.join(); th3.join(); return 0;}
C++ (Qt)//--------------------------------------------------------------------------- #ifndef proc_threadH#define proc_threadH //--------------------------------------------------------------------------- #include <QObject>#include <QThread>#include <QWaitCondition>#include <QMutex> /*-----------------------------------------------------------------------------*/ class TThreadData;class TCommonThreadData;class TProcThread; /*-----------------------------------------------------------------------------*/ class TThreadData {//////}; class TCommonThreadData : public QObject { Q_OBJECT protected: int RunThreads; int ThreadsCount; QAtomicInt Stopped; QMutex Mutex; QWaitCondition Wait; QList<TThreadData*> NewData; public: TCommonThreadData(int ThreadsCount, QObject* parent=NULL); virtual ~TCommonThreadData(); public: inline bool stopped(void) const {return Stopped;} void wait (void); TThreadData* nextThreadData(void); void completeThreadData(TThreadData* ReadyData); protected: void stopAll(void); void addThreadData(TThreadData* ThreadData); protected slots: void doCompleteThreadData(TThreadData* ReadyData); signals: void signalCompleteThreadData(TThreadData*);}; class TProcThread : public QThread { Q_OBJECT protected: TCommonThreadData* CommonData; public: TProcThread(TCommonThreadData* CommonData, QObject* parent=NULL); virtual ~TProcThread(); protected: virtual void run (void); protected: inline void wait (void) {CommonData->wait();} inline bool stopped(void) const {return CommonData->stopped();} protected: void calculate(TThreadData* ThreadData);}; /*-----------------------------------------------------------------------------*/ #endif
C++ (Qt)#include "proc_thread.h"#include <QMutexLocker> /*************************** * class TCommonThreadData ***************************/ TCommonThreadData::TCommonThreadData(int ThreadsCount, QObject* parent) : QObject(parent) { this->ThreadsCount = ThreadsCount; RunThreads = 0; Stopped = 0; connect(this , SIGNAL(signalCompleteThreadData(TThreadData*)), this , SLOT(doCompleteThreadData(TThreadData*)), Qt::QueuedConnection); for(int i=0; i<ThreadsCount; i++) {new TProcThread(this);}} TCommonThreadData::~TCommonThreadData() {} TThreadData* TCommonThreadData::nextThreadData(void) { QMutexLocker MutexLocker(&Mutex); if(NewData.isEmpty()) {return NULL;} RunThreads++; return NewData.takeFirst();} void TCommonThreadData::completeThreadData(TThreadData* ReadyData) { emit signalCompleteThreadData(ReadyData);} void TCommonThreadData::doCompleteThreadData(TThreadData* ReadyData) { // // Work with ReadyData // // delete ReadyData; // QMutexLocker MutexLocker(&Mutex); RunThreads--; if(!Stopped) { int Count = (NewData.count() <= ThreadsCount-RunThreads)?NewData.count():ThreadsCount-RunThreads; for(int i=0; i<Count; i++) {Wait.wakeOne();} return; } if(!RunThreads) { // // All Threads Completed // }} void TCommonThreadData::addThreadData(TThreadData* ThreadData) { QMutexLocker MutexLocker(&Mutex); NewData.append(ThreadData); if(Stopped) {return;} Wait.wakeOne();} void TCommonThreadData::wait (void) { Mutex.lock(); Wait.wait(&Mutex); Mutex.unlock();} void TCommonThreadData::stopAll(void) { Stopped.ref(); Mutex.lock(); Wait.wakeAll(); Mutex.unlock();} /*************************** * class TProcThread ***************************/ TProcThread::TProcThread(TCommonThreadData* CommonData, QObject* parent) : QThread(parent) { this->CommonData = CommonData; connect(this , SIGNAL(finished()), this , SLOT(deleteLater())); start();} TProcThread::~TProcThread() {} void TProcThread::run (void) { TThreadData* ThreadData; for(;;) { if(stopped()) {return;} wait(); if(stopped()) {return;} ThreadData = CommonData->nextThreadData(); if(!ThreadData) {continue;} calculate(ThreadData); CommonData->completeThreadData(ThreadData); }} void TProcThread::calculate(TThreadData* ThreadData) { // // Calculate ThreadData // }
void CThreadControl::PutJob( void * data ){ if (!data) return; CSemaphore::Wait(mClientJobSemaphore); mJobList.PutJob((CRTJob *) data); CSemaphore::Signal(mClientJobSemaphore);}bool CThreadControl::GetJob( void ){ CSemaphore::Wait(mClientJobSemaphore); CRTJob * job = mJobList.GetJob(); // do nothing (but return true) if no jobs if (!job) return true; switch (job->mID) { // main thread is waiting for all jobs ready case MPJobEnd: // if all done then open semaphore for main thread if (++mNumReady >= mThread.Count()) CSemaphore::Signal(mHostEndSemaphore); // unlock queue semaphore CSemaphore::Signal(mClientJobSemaphore); // wait on mClientEndSemaphore CSemaphore::Wait(mClientEndSemaphore); if (mNumReady) { if (--mNumReady > 0) CSemaphore::Signal(mClientEndSemaphore); else CSemaphore::Signal(mHostBegSemaphore); } break; // terminate all jobs case MPJobQuit: // unlock queue semaphore and return false to terminate the thread CSemaphore::Signal(mClientJobSemaphore); return false; // it is a job to calculate default: // unlock queue semaphore and perform the job CSemaphore::Signal(mClientJobSemaphore); job->Eval(); break; } return true;}void CThreadControl::WaitJobs( void ){ int i, limit = mThread.Count(); for (i = 0; i < limit; ++i) PutJob(mJobList.Alloc(MPJobEnd)); CSemaphore::Wait(mHostEndSemaphore); CSemaphore::Signal(mClientEndSemaphore);}void * theThreadFunc( void * ){ while (theThreadControl.GetJob()) {} return 0;}
C++ (Qt)//--------------------------------------------------------------------------- #ifndef proc_threadH#define proc_threadH //--------------------------------------------------------------------------- #include <QObject>#include <QThread>#include <QWaitCondition>#include <QMutex> /*-----------------------------------------------------------------------------*/ class TThreadData;class TCommonThreadData;class TProcThread; /*-----------------------------------------------------------------------------*/ class TThreadData {//////}; class TCommonThreadData : public QObject { Q_OBJECT protected: int RunThreads; int ThreadsCount; bool NoMoreNewData; QAtomicInt Stopped; QMutex Mutex; QWaitCondition Wait; QList<TThreadData*> NewData; public: TCommonThreadData(int ThreadsCount, QObject* parent=NULL); virtual ~TCommonThreadData(); public: inline bool stopped(void) const {return Stopped;} void wait (void); TThreadData* nextThreadData(void); void completeThreadData(TThreadData* ReadyData); protected: void noMoreNewData(void); void addThreadData(TThreadData* ThreadData); void onCalculatedData(TThreadData* ReadyData); void onCompletedAllData(void); protected slots: void doCompleteThreadData(TThreadData* ReadyData); signals: void signalCompleteThreadData(TThreadData*);}; class TProcThread : public QThread { Q_OBJECT protected: TCommonThreadData* CommonData; public: TProcThread(TCommonThreadData* CommonData, QObject* parent=NULL); virtual ~TProcThread(); protected: virtual void run (void); protected: inline void wait (void) {CommonData->wait();} inline bool stopped(void) const {return CommonData->stopped();} protected: void calculate(TThreadData* ThreadData);}; /*-----------------------------------------------------------------------------*/ #endif
C++ (Qt)#include "proc_thread.h"#include <QMutexLocker> /*************************** * class TCommonThreadData ***************************/ TCommonThreadData::TCommonThreadData(int ThreadsCount, QObject* parent) : QObject(parent) { this->ThreadsCount = ThreadsCount; RunThreads = 0; Stopped = 0; NoMoreNewData = false; connect(this , SIGNAL(signalCompleteThreadData(TThreadData*)), this , SLOT(doCompleteThreadData(TThreadData*)), Qt::QueuedConnection); for(int i=0; i<ThreadsCount; i++) {new TProcThread(this);}} TCommonThreadData::~TCommonThreadData() {} TThreadData* TCommonThreadData::nextThreadData(void) { QMutexLocker MutexLocker(&Mutex); if(NewData.isEmpty()) {return NULL;} RunThreads++; return NewData.takeFirst();} void TCommonThreadData::completeThreadData(TThreadData* ReadyData) { emit signalCompleteThreadData(ReadyData);} void TCommonThreadData::doCompleteThreadData(TThreadData* ReadyData) { onCalculatedData(ReadyData); QMutexLocker MutexLocker(&Mutex); RunThreads--; if(!NewData.isEmpty()) { Wait.wakeOne(); } else if (NoMoreNewData) { if(!Stopped) { Stopped.ref(); Wait.wakeAll(); } if(!RunThreads) {onCompletedAllData();} } } void TCommonThreadData::addThreadData(TThreadData* ThreadData) { if(NoMoreNewData) { // throw ERROR usage addThreadData } QMutexLocker MutexLocker(&Mutex); NewData.append(ThreadData); Wait.wakeOne();} void TCommonThreadData::wait (void) { Mutex.lock(); Wait.wait(&Mutex); Mutex.unlock();} void TCommonThreadData::noMoreNewData(void) { if(NoMoreNewData) {return;} NoMoreNewData = true; QMutexLocker MutexLocker(&Mutex); if(NewData.isEmpty()) { Stopped.ref(); Wait.wakeAll(); if(!RunThreads) {onCompletedAllData();} }} void TCommonThreadData::onCalculatedData(TThreadData* ReadyData) { // // Work with ReadyData // // delete ReadyData; // } void TCommonThreadData::onCompletedAllData(void) { // // All Data Calculated // } /*************************** * class TProcThread ***************************/ TProcThread::TProcThread(TCommonThreadData* CommonData, QObject* parent) : QThread(parent) { this->CommonData = CommonData; connect(this , SIGNAL(finished()), this , SLOT(deleteLater())); start();} TProcThread::~TProcThread() {} void TProcThread::run (void) { TThreadData* ThreadData; for(;;) { if(stopped()) {return;} wait(); if(stopped()) {return;} ThreadData = CommonData->nextThreadData(); if(!ThreadData) { // throw ERROR Synchronization } calculate(ThreadData); CommonData->completeThreadData(ThreadData); }} void TProcThread::calculate(TThreadData* ThreadData) { // // Calculate ThreadData // }
C++ (Qt)#include <boost/noncopyable.hpp> #include <boost/thread/thread.hpp> #include <boost/thread/mutex.hpp> #include <boost/thread/condition.hpp> #include <boost/bind.hpp> #include <boost/shared_ptr.hpp> #include <iostream> #include <queue> #include <list> struct JobData{ JobData( int i ) : x( i ), y( i ) {} int x; int y; }; class JobManager{ public: bool begin( int numThread = 10 ); bool end(); void addJob( const JobData &data ); private: void threadFunc(); private: boost::mutex m_mutex; boost::condition m_cond; std::queue<JobData*> m_datas; typedef boost::shared_ptr<boost::thread> ThreadPtr; std::list<ThreadPtr> m_threadPool; }; bool JobManager::begin( int numThread ){ if( m_threadPool.size() ) { std::cout << "Begin already running." << std::endl; return false; } for( int i = 0; i < numThread; ++i ) { ThreadPtr thread( new boost::thread( boost::bind( &JobManager::threadFunc, this ) ) ); m_threadPool.push_back( thread ); } return true;} bool JobManager::end(){ if( !m_threadPool.size() ) { std::cout << "End already running." << std::endl; return false; } { boost::mutex::scoped_lock locker( m_mutex ); m_datas.push( 0 ); m_cond.notify_all(); } for_each( m_threadPool.begin(), m_threadPool.end(), boost::bind( &boost::thread::join, _1 ) ); m_threadPool.clear(); m_datas.pop(); return true;} void JobManager::addJob( const JobData &data ){ boost::mutex::scoped_lock locker( m_mutex ); m_datas.push( new JobData( data ) ); m_cond.notify_one(); } void JobManager::threadFunc(){ for(;;) { boost::mutex::scoped_lock locker( m_mutex ); while( m_datas.empty() ) m_cond.wait( locker ); JobData *val = m_datas.front(); if( !val ) break; JobData data( *val ); m_datas.pop(); delete val; locker.unlock(); // Выполняем расчет используя data // ... }}
C++ (Qt){ JobManager mng; // Запускаем 5 ниток mng.begin( 5 ); for( ... ) mng.addJob( JobData(...) ); mng.stop() // Запускаем 20 ниток mng.begin( 20 ) for( ... ) mng.addJob( JobData(...) ); mng.stop() return 0;}