class Functor{public: void operator()(Data* data) { if ( data ) ... //process data; }};class Job : private QThread { Q_OBJECTpublic: Job() : exit(false) {} ~Job() {} void process(Data* d); void stop()protected: void run();private: Functor functor; Data *data; QMutex mutex; QWaitCondition nextDataAvailable; bool exit;};void Job::process(Data *d){ QMutexLocker locker(&mutex); data = d; functor = JobFactory::getFunctorForDataType(d->typeId()); if ( isRunning() ) nextDataAvailable.wakeAll(); else start();}void Job::run(){ while (!exit) { QMutexLocker locker(&mutex); functor(data); nextDataAvailable.wait(&mutex); } }void Job::stop(){ exit = true; nextDataAvailable.wakeAll();}
C++ (Qt)class Job{public: virtual ~Job() {} protected: // Функция будет выполняться в рабочем потоке virtual void run() = 0; // Прячем конструктор, что бы никому не захотелось его переопределять и выполнять в нем "тяжелые" операции Job() {} void setup( UserId id, const Command &cmd ) { m_id = id; m_cmd = cmd; } UserId m_id; // Id клиента Command m_cmd; // Команда со всеми аргументами, параметрами, ... // Дружим его с JobFactory, что бы он имел доступ к закрытым членам Job friend class JobFactory;}; // Конкретная реализация задачиclass MyJob : public Job{protected: virtual void run() { // Имеем доступ к UserId и Command // Выполняем операцию... }}; class JobFactory{public: Job *build( UserId id, const Command &cmd ) { Job *job = new MyJob; // Создаем объект-задачу в зависимости от cmd if( job ) job->setup( id, cmd ); return job; }};
C++ (Qt)class ThreadPool{public: ThreadPool( int prepareThreadNum = 50 ) { for( int i = 0; i < prepareThreadNum; ++i ) { JobThread *th = new JobThread( *this ); th->start(); m_threads.append( th ); } } void addJob( Job *job ) { Q_ASSERT( job ); QMutexLocker lock( &m_mutex ); m_jobs.enqueue( job ); // Добавили в очередь задачу m_cond.wakeOne(); // Будим один поток } void quit() { QMutexLocker lock( &m_mutex ); m_jobs.enqueue( 0 ); // Добавили в очередь нулевой указатель - признак завершения всех потоков m_cond.wakeAll(); // Будим все потоки, что бы они завершились // Дожидаемся завершения всех потоков и удаляем их объекты } private: QMutex m_mutex; QWaitCondition m_cond; QQueue<Job*> m_jobs; QList<JobThread*> m_threads; friend class JobThread; // Разрешаем доступ к закрытым общим переменным.}; // Внутренний класс, про него незнает никто кроме ThreadPoolclass JobThread : public QThread{public: JobThread( ThreadPool &pool ) : m_pool( pool ) {} protected: virtual void run() { for(;;) { QMutexLocker lock( &m_pool.m_mutex ); while( m_pool.m_jobs.empty() ) m_pool.m_cond.wait( &m_pool.m_mutex ); // В очереди есть задачи - проснулись или взялись за следующую задачу // Общий мьютекс m_mutex в этот момент залочен // Проверяем, что лежит в голове очереди, если 0, то это признак завершения (его мы из очереди не вынимаем, что бы его увидели все запущенные потоки) if( !m_pool.m_jobs.head() ) break; // Получили пустой // Вынимаем указатель на задачу из очереди Job *job = m_pool.m_jobs.dequeue(); // Разблокируем мьютекс m_pool.m_mutex.unlock(); // Запускаем задачу job->run(); // Удаляем задачу из памяти delete job; } } private: ThreadPool &m_pool;};
C++ (Qt)void Client::readyRead(){ // Определяем, читаем, ... UserId id = ...; Command cmd = ...; Job *job = m_jobFactory.build( id, cmd ); if( job ) { m_poolThread.addJob( job ); }}