Russian Qt Forum
Ноябрь 23, 2024, 10:05 *
Добро пожаловать, Гость. Пожалуйста, войдите или зарегистрируйтесь.
Вам не пришло письмо с кодом активации?

Войти
 
  Начало   Форум  WIKI (Вики)FAQ Помощь Поиск Войти Регистрация  

Страниц: 1 ... 3 4 [5]   Вниз
  Печать  
Автор Тема: Нагрузка на поток  (Прочитано 35345 раз)
kramer3d
Гость
« Ответ #60 : Февраль 26, 2010, 08:08 »

Если поток в настоящий момент загружен на 100%, какое условие должно выполнится, чтобы ты считал, что в пул ещё можно добавить соединение?
Если поток в настоящий момент загружен на 100%, это означает, что он выполняет какой-то код, а не крутит event-loop в холостую. Причем это довольно слабо характеризует нагрузку на event-loop потока, если только не производить подобные измерения регулярно, и не усреднять полученные значения.
Ты написал, что у тебя нет информации, сколько будет выполнятся внешняя функция обработчик (из dll например или SSL шифрование). То есть получается, что задача не решаема. Даже если ты будешь знать что в пуле только одно соединение, то всё равно не сможешь точно сказать, когда оно будет обработано и принять решение добавлять или нет. Я собственно предлагал, экспериментальным путём при помощи профайлера получить среднее время выполнения и потом эти данные использовать.
Обработано - это не значит "прожевано и забыто". Вообще, "обработано" в данном случае не совсем корректный термин. Более корректно употреблять термин "обслуживается", так как это соединение - это объект-QObject, который может получать события извне и должен на них реагировать. Экспериментальным путем при помощи профайлера можно, конечно, замерить ресурсоемкость всех методов, обрабатывающий события QObject-а, а так же всех методов, обрабатывающих события его членов, а потом эту относительную ресурсоемкость суммировать и вычислять такми образом суммарную нагрузку на поток. Но это даст нам скорее статическую картину, да и далеко не факт, что стопроцентно достоверную. И потом, что же, если я захочу изменить что-то в коде или добавить новой функциональности, мне придется все эти измерения заново проводить? И как быть с кроссплатформенностью? Ведь на разных ОС потоки выполняются и управляются по разному. И у Qt на разных ОС разная низкоуровневая начинка для управления потоками да и вообще, для эффективного обслуживания QObject.
Записан
kramer3d
Гость
« Ответ #61 : Февраль 26, 2010, 09:37 »

Если использовать сигналы для межпотокового общения, то в качестве очереди будет использоваться очередь событий потока получателя.
О том и речь. Сигналы QueuedConnection и события - это в сущности одно и тоже.
IMHO, пропускная способность потока принятия решения, будет перекрывать все потребности фронтэнда. Ведь ему нужно будет только принять решение и запустить или не запустить другой поток для выполнения работы.
Конечно, все эти мысли без конкретных спецификаций на протокол могут быть ошибочными.
В этом случае опять накладываются ограничения на масштабируемость. Т.е. я могу расширять фронтенд, занимающийся получением/отправкой данных, но не могу расширять модуль обработчиков, который не является многопоточным. Конечно, его пропускная способность с головой перекрывает потребности фронтенда, но только до поры, до времени. Т.е. когда в силу увеличившейся нагрузки на систему (аппаратные ресурсы у нас не ограничены, будем считать) я создам 50 (или 500, не важно) потоков во фронтенде, единственный поток обработчиков данных станет "бутылочным горлышком" и будет тормозить рост производительности всей системы.
Записан
BRE
Гость
« Ответ #62 : Февраль 26, 2010, 10:11 »

При обсуждении абстрактных вещей очень сложно говорить о конкретных задержках. Давай будет конкретизировать.
Есть класс Job, который описывает "задачу". Пока решаем, что этот класс наследуется от QThread. Все виды задач, которые может выполнять сервер, описывается своим классом наследником Job. Эти классы могут быть определены как в основной программе, так и во внешних разделяемых библиотеках. При старте системы все эти классы регистрируются в фабрике задач JobFactory.
За создание объекта задачи отвечает ее метод:
Job *JobFactory::build( CUserId id, const Command &cmd );
Если для указанной команды нет подходящей задачи, метод вернет 0. Время создания новой задачи константное.

Переходим в слот получения данных readyRead:
* читаем данные из сокета
* проверяем, что прошел весь пакет, если нет - выходим из слота
* формируем объект cmd из полученных данных
* создаем объект Job с помощью фабрики, если объект не создался - выходим из слота
* запускаем новый поток описываемый этим объектом Job

Дальше Job для своей работы может создать еще 100 потоков, работать не эффективно, зависнуть, но от него не будет зависеть работа остальной системы.
Записан
BigZ
Гость
« Ответ #63 : Февраль 26, 2010, 10:24 »

Если поток в настоящий момент загружен на 100%, это означает, что он выполняет какой-то код, а не крутит event-loop в холостую. Причем это довольно слабо характеризует нагрузку на event-loop потока, если только не производить подобные измерения регулярно, и не усреднять полученные значения.

Обработано - это не значит "прожевано и забыто". Вообще, "обработано" в данном случае не совсем корректный термин. Более корректно употреблять термин "обслуживается",

Тогда не совсем понятно, какую информацию ты хочешь получить.

Потери в твоей схеме
состоят из двух частей – потери при маршалинге событий в event-loop (то есть потери в Qt) и потери в твоих обработчиках. Могу тебя уверить, что event-loop Qt работает быстро и потерями в нём можно пренебречь. Но даже если этого не достаточно, думаю, ты можешь получить количество событий в очереди и на основе их числа принимать решения. Однако по сути это тебе ни чего не даст, так как если поток загружен на 100% и в очереди всего одно сообщение, то ты всё равно не можешь быть уверен, что след сообщение будет быстро обработано, так как твой обработчик может выполнятся час.
Правильным кажется в этой ситуации создавать новый поток если в данный момент все потоки загружены на 100%.

То есть тебе вначале нужно всё-таки определится какую характеристику ты хочешь получить, ну вот какой метод в Qt тебя бы устроил?
Записан
kramer3d
Гость
« Ответ #64 : Февраль 26, 2010, 11:09 »

При обсуждении абстрактных вещей очень сложно говорить о конкретных задержках. Давай будет конкретизировать.
[skip]
* запускаем новый поток описываемый этим объектом Job
Дальше Job для своей работы может создать еще 100 потоков, работать не эффективно, зависнуть, но от него не будет зависеть работа остальной системы.
Не, немного не в ту степь. Получается, что каждый следующий пакет данных будет обрабатываться в своем потоке. Threading overhead будет сумасшедший.
Тогда не совсем понятно, какую информацию ты хочешь получить.
Потери в твоей схеме
состоят из двух частей – потери при маршалинге событий в event-loop (то есть потери в Qt) и потери в твоих обработчиках. Могу тебя уверить, что event-loop Qt работает быстро и потерями в нём можно пренебречь. Но даже если этого не достаточно, думаю, ты можешь получить количество событий в очереди и на основе их числа принимать решения. Однако по сути это тебе ни чего не даст, так как если поток загружен на 100% и в очереди всего одно сообщение, то ты всё равно не можешь быть уверен, что след сообщение будет быстро обработано, так как твой обработчик может выполнятся час.
Правильным кажется в этой ситуации создавать новый поток если в данный момент все потоки загружены на 100%.
То есть тебе вначале нужно всё-таки определится какую характеристику ты хочешь получить, ну вот какой метод в Qt тебя бы устроил?
Да нет в Qt такого метода. Надо его самому реализовать, вопрос в том, как. Дело в том, что мне нужна не моментальная локальная загруженность потока, а некоторая интегральная характеристика его загруженности. Т.е. локальная загруженность может оказаться 100%, так как поток в настоящее время выполняет какой-нибудь обработчик. Но через полсекунды выполнение завершится, и поток снова полчаса будет стоять без дела. А я уже создал новый поток и отдал новый объект ему на обслуживание.
Предполагается, что обслуживаемые объекты генерируют более-менее постоянную нагрузку на поток, то есть характер операций, выполняемых объектом, меняется медленно (относительно). Но для каждого объекта эта нагрузка может быть разной, и, вообще говоря, она не поддается прогнозированию. Мне нужно считать некую интегральную характеристику загруженности. Мммм.. Ну, хоть статистику за период времени, чтоли. Т.е. полчаса стоял без дела, полсекунды работал - значит смело можно добавлять ему новые объекты. Поэтому-то я и завел речь об усредненной латентности цикла событий. Она, мне кажется, вполне может быть такой характеристикой, а чтобы ее посчитать, не надо через каждые три миллисекунды мерять загрузку потока, достаточно по изменению статуса объектов (обработалось событие, или объект удалился из потока) запускать в event loop событие с таймстемпом, и в обработчике мерять/усреднять время, которое оно простояло в очереди.

Друзья! Я сейчас доклепаю прототип, иллюстрирующий эту задачу, погоняю его немного и запощу код сюда. Тогда мы продолжим обсуждение уже в конкретной проблематике. А то толчем воду в ступе с абстракциями непонятными.
Записан
BigZ
Гость
« Ответ #65 : Февраль 26, 2010, 12:56 »

Дело в том, что мне нужна не моментальная локальная загруженность потока, а некоторая интегральная характеристика его загруженности. Т.е. локальная загруженность может оказаться 100%, так как поток в настоящее время выполняет какой-нибудь обработчик. Но через полсекунды выполнение завершится, и поток снова полчаса будет стоять без дела. А я уже создал новый поток и отдал новый объект ему на обслуживание.
Предполагается, что обслуживаемые объекты генерируют более-менее постоянную нагрузку на поток, то есть характер операций, выполняемых объектом, меняется медленно (относительно). Но для каждого объекта эта нагрузка может быть разной, и, вообще говоря, она не поддается прогнозированию. Мне нужно считать некую интегральную характеристику загруженности. Мммм.. Ну, хоть статистику за период времени, чтоли. Т.е. полчаса стоял без дела, полсекунды работал - значит смело можно добавлять ему новые объекты.

Ну тогда запускай вначале свой сервер в не оптимальном режиме и на первом этапе формируй базу статистики нагрузки, то есть, как бы обучаешь сервер. После скажем часа работы можно начинать перераспределять задания с учётом накопленных измерений.
То есть если ты такое сделаешь, то мы получим первый в мире потоковый сервер с искусственным интеллектом и способностью обучатся Улыбающийся

P.S. Можно даже сказать с применением нано-технологий... Улыбающийся
Записан
Igors
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 11445


Просмотр профиля
« Ответ #66 : Февраль 26, 2010, 13:33 »

Ну тогда запускай вначале свой сервер в не оптимальном режиме и на первом этапе формируй базу статистики нагрузки, то есть, как бы обучаешь сервер. После скажем часа работы можно начинать перераспределять задания с учётом накопленных измерений.
То есть если ты такое сделаешь, то мы получим первый в мире потоковый сервер с искусственным интеллектом и способностью обучатся Улыбающийся

P.S. Можно даже сказать с применением нано-технологий... Улыбающийся
Ну может слишком громко сказано об интеллекте, но мысль хорошая, практичная. Засекать время каждого пакета (годится и для новых) и затем опираться на эту статистику. Понятно что она не идеальна (напр. ОС отдал все на back-end) но она ловит "пропорции". Реализация не сложна, без тестового режима можно обойтись, до тех пор пока возникнут проблемы с загрузкой - статистика накопится.
Записан
BRE
Гость
« Ответ #67 : Февраль 26, 2010, 13:35 »

Не, немного не в ту степь. Получается, что каждый следующий пакет данных будет обрабатываться в своем потоке.
Да, каждое новое задание будет обрабатываться в новом потоке.
Не обязательно запускать каждый раз новый поток, а потом его убивать. Можно воспользоваться пулом уже запущенных потоков (пример QThreadPool). Эту идею я уже тоже описывал.
При старте запускаются для примера 50 потоков, которые запираются на мьютексе и не забирают ресурсы системы. Появилось новое задание, взяли уже запущенный поток, выполнили в нем работу (очень быстро или очень медленно, не важно) и вернули обратно в пул. Никакого оверхеда на создание/завершение потока не будет.
Если в пуле потоки закончились, создали еще 50 новых. Добавлением/удалением потоков из пула. может заниматься отдельный поток.

В общем, пока я каких-то проблем с подобных алгоритмом не вижу...  Строит глазки
Записан
kramer3d
Гость
« Ответ #68 : Февраль 26, 2010, 15:26 »

Да, каждое новое задание будет обрабатываться в новом потоке.
Не обязательно запускать каждый раз новый поток, а потом его убивать. Можно воспользоваться пулом уже запущенных потоков (пример QThreadPool). Эту идею я
[skip]
В общем, пока я каких-то проблем с подобных алгоритмом не вижу...  Строит глазки
Т.е. ты настаиваешь на отказе от эвент-лупа. Разумно, в принципе, от него в данном случае отдни проблемы.
Реализовать, например так:
Код:
class Functor
{
public:
  void operator()(Data* data)
  {
    if ( data )
      ... //process data;
  }
};

class Job : private QThread {
  Q_OBJECT
public:
  Job() : exit(false) {}
  ~Job() {}
  void process(Data* d);
  void stop()
protected:
  void run();
private:
  Functor functor;
  Data *data;
  QMutex mutex;
  QWaitCondition nextDataAvailable;
  bool exit;
};

void Job::process(Data *d)
{
  QMutexLocker locker(&mutex);
  data = d;
  functor = JobFactory::getFunctorForDataType(d->typeId());
  if ( isRunning() )
    nextDataAvailable.wakeAll();
  else
    start();
}

void Job::run()
{
  while (!exit)
  {
  QMutexLocker locker(&mutex);
  functor(data);
  nextDataAvailable.wait(&mutex);
  }  
}

void Job::stop()
{
   exit = true;
   nextDataAvailable.wakeAll();
}

Будет работать? Вроде будет. Не хватает только функции isBusy().
Т.е. получается, один пакет данных - один поток. А если от пятисот клиентов придет пятьсот пакетов данных - создадим 250000 потоков? Можно конечно пришедший пакет положить в общую очередь, а потом его оттуда достать. Будет работать? Вроде будет. Буду думать, короче. Граблей там разложено еще не мало.

А задача с потоками с Event Loop'ом так и остается нерешенной. Ну может, не спорьте, в жизни потребоваться ворочать миллион QObjecto'ов.
Я тут в виде прототипа набросал класс LatencyChecker, который работает по моей схеме (точней по гибридной - твоей и моей), т.е. запускает в луп событие с таймстепмом и меряет время простоя в очереди. А интервал запуска таких событий можно установить произвольный, и вся логика вынесена наружу - пусть поток сам решает, как и чего ему у себя мерять. Короче, я с ним еще поиграюсь и выложу сюда на суд общественности. Но уже завтра. А точней, наверное, в понедельник. Надо бы и отдохнуть от трудов праведных. Улыбающийся
« Последнее редактирование: Февраль 26, 2010, 15:29 от kramer3d » Записан
BRE
Гость
« Ответ #69 : Февраль 26, 2010, 19:15 »

Писал прямо здесь, могут быть опечатки...  Улыбающийся
Это наброски адаптированные для Qt.
Здесь нет увеличения количества потоков, но сама идея должна быть понятна.

Код
C++ (Qt)
class Job
{
public:
virtual ~Job() {}
 
protected:
// Функция будет выполняться в рабочем потоке
virtual void run() = 0;
 
// Прячем конструктор, что бы никому не захотелось его переопределять и выполнять в нем "тяжелые" операции
Job() {}
void setup( UserId id, const Command &cmd )
{
m_id = id;
m_cmd = cmd;
}
 
UserId m_id; // Id клиента
Command m_cmd; // Команда со всеми аргументами, параметрами, ...
 
// Дружим его с JobFactory, что бы он имел доступ к закрытым членам Job
friend class JobFactory;
};
 
// Конкретная реализация задачи
class MyJob : public Job
{
protected:
virtual void run()
{
// Имеем доступ к UserId и Command
 
// Выполняем операцию...
}
};
 
class JobFactory
{
public:
Job *build( UserId id, const Command &cmd )
{
Job *job = new MyJob; // Создаем объект-задачу в зависимости от cmd
if( job )
job->setup( id, cmd );
return job;
}
};
 

Код
C++ (Qt)
class ThreadPool
{
public:
ThreadPool( int prepareThreadNum = 50 )
{
for( int i = 0; i < prepareThreadNum; ++i )
{
JobThread *th = new JobThread( *this );
th->start();
m_threads.append( th );
}
}
 
void addJob( Job *job )
{
Q_ASSERT( job );
 
QMutexLocker lock( &m_mutex );
m_jobs.enqueue( job ); // Добавили в очередь задачу
m_cond.wakeOne(); // Будим один поток
}
 
void quit()
{
QMutexLocker lock( &m_mutex );
m_jobs.enqueue( 0 ); // Добавили в очередь нулевой указатель - признак завершения всех потоков
m_cond.wakeAll(); // Будим все потоки, что бы они завершились
 
// Дожидаемся завершения всех потоков и удаляем их объекты
}
 
private:
QMutex m_mutex;
QWaitCondition m_cond;
QQueue<Job*> m_jobs;
QList<JobThread*> m_threads;
 
friend class JobThread; // Разрешаем доступ к закрытым общим переменным.
};
 
// Внутренний класс, про него незнает никто кроме ThreadPool
class JobThread : public QThread
{
public:
JobThread( ThreadPool &pool ) : m_pool( pool ) {}
 
protected:
virtual void run()
{
for(;;)
{
QMutexLocker lock( &m_pool.m_mutex );
while( m_pool.m_jobs.empty() )
m_pool.m_cond.wait( &m_pool.m_mutex );
 
// В очереди есть задачи - проснулись или взялись за следующую задачу
// Общий мьютекс m_mutex в этот момент залочен
// Проверяем, что лежит в голове очереди, если 0, то это признак завершения (его мы из очереди не вынимаем, что бы его увидели все запущенные потоки)
if( !m_pool.m_jobs.head() )
break; // Получили пустой
 
// Вынимаем указатель на задачу из очереди
Job *job = m_pool.m_jobs.dequeue();
 
// Разблокируем мьютекс
m_pool.m_mutex.unlock();
 
// Запускаем задачу
job->run();
 
// Удаляем задачу из памяти
delete job;
}
}
 
private:
ThreadPool &m_pool;
};
 

Код
C++ (Qt)
void Client::readyRead()
{
// Определяем, читаем, ...
 
UserId id = ...;
Command cmd = ...;
Job *job = m_jobFactory.build( id, cmd );
if( job )
{
m_poolThread.addJob( job );
}
}
 
« Последнее редактирование: Февраль 26, 2010, 20:51 от BRE » Записан
Страниц: 1 ... 3 4 [5]   Вверх
  Печать  
 
Перейти в:  


Страница сгенерирована за 0.217 секунд. Запросов: 23.