Russian Qt Forum
Ноябрь 24, 2024, 17:06 *
Добро пожаловать, Гость. Пожалуйста, войдите или зарегистрируйтесь.
Вам не пришло письмо с кодом активации?

Войти
 
  Начало   Форум  WIKI (Вики)FAQ Помощь Поиск Войти Регистрация  

Страниц: [1]   Вниз
  Печать  
Автор Тема: Таймауты в многопоточном клиент-серверном приложении.  (Прочитано 8853 раз)
pawok11
Гость
« : Март 02, 2010, 17:23 »

Есть клиент-серверное приложение. В клиенте создается несколько потоков, от 1-2 до 5-7.
В каждом потоке производится tcp-подключение, отсылка пакета запроса и прием пакета ответа.

На клиенте есть очередь запросов. Каждый запрос характеризуется ip, портом и данными для отправки. Таких запросов порядка 15 (число запросов может быть больше).
Запросы генерируются по времени (обязательное условие генерации одинаковых запросов раз в определенный промежуток времени). Если запрос есть в очереди запросов, то он туда не ставиться.

Создается пул потоков (например 5) для обработки этой очереди запросов. Если поток свободный, в поток передается запрос на обработку.
Если свободных потоков нет, то запрос ставиться в очередь ожидающих на отправку.

Диспетчер пула клиентов
Код:

    QList<QThread*> clients;
    ...
   
void DispatcherThread::run()
{
    QMapIterator<QString,int> itrNumber(*numberOfThreads); // содержит число запускаемых потоков по типам (tcp,udp и т.д.)
    //создание пула потоков
    while (itrNumber.hasNext())
    {
        itrNumber.next();
        if (itrNumber.key() == "tcp")
        {
            for (int i=0; i<itrNumber.value()-1; i++)
            {
                typesList << "tcp";
                occupiedList << false;  // флаг поток занят (обрабатывает поток)
                signatures << RequestSignature();  // данные для запроса (пустой конструктор)

                TcpClientThread *tcpth = new TcpClientThread();
                connect(tcpth,SIGNAL(data()),this,SLOT(slotData()));
                connect(tcpth,SIGNAL(timeout()),this,SLOT(slotTimeout()));
                connect(tcpth,SIGNAL(nodata()),this,SLOT(slotNoData()));
                clients << tcpth;
            }
        }
        else if (itrNumber.key() == другие типы)
        {
        }
    }
    while (!stopped)
    {       
        mutex.lock();
       
        if (!requestQueue->isEmpty())// очередь запросов
        {
            RequestSignature signature = requestQueue->dequeue();
            mutex.unlock();

            QString abonent = signature.getAbonent();
            QString ip = abonents->ips->value(abonent);
            QString mac = abonents->macs->value(abonent);
            quint16 port = abonents->ports->value(abonent);
            QString type = abonents->types->value(abonent);
            QString message = signature.getMessage();
            ConfigProtocol* pProtocol = protocols->value(signature.getProtocol());

            bool findNotOccupied = false;           
            // если есть свободный поток заданного типа, то занимаем его и запускаем на обработку данных
            for (int i=0; i<typesList.size(); i++)
            {
                if (type == typesList[i])
                {
                    if (!occupiedList[i])
                    {
                        if (type == "tcp")
                        {                           
                            ((TcpClientThread*)(clients[i]))->setProcess(ip,port,message,pProtocol);                           
                            ((TcpClientThread*)(clients[i]))->start();
                        }
                        else if (type == другие типы)
                        {
                            ...
                        }                       
                        else
                        {
                            break;
                        }
                        signatures[i] = signature;
                        occupiedList[i] = true;
                        findNotOccupied = true;
                        break;
                    }
                }
            }
            // если свободного потока нет, то ставим в очередь ожидающих
            if (!findNotOccupied)
            {
                waitRequestQueue[type].enqueue(signature);
            }
        }
        else
        {
            mutex.unlock();
        }
    }
}
       
void DispatcherThread::slotData()
{
    QThread *thread = (QThread*)sender();
    int index = clients.indexOf(thread);
    if (index != -1)
    {
        processRequestData(index);
    }
}

void DispatcherThread::slotTimeout()
{
    QThread *thread = (QThread*)sender();
    int index = clients.indexOf(thread);
    if (index != -1)
    {
        processRequestData(index);
    }
}

void DispatcherThread::slotNoData()
{
    QThread *thread = (QThread*)sender();
    int index = clients.indexOf(thread);
    if (index != -1)
    {
        processRequestData(index);
    }
}       

void DispatcherThread::processRequestData(int aIndex)
{
    QThread *thread = clients[aIndex];
    QMap<QString,QString> data;
    if (typesList[aIndex] == "tcp")
    {
        data = ((TcpClientThread*)(thread))->getData();
    }
    else if (typesList[aIndex] == другие типы)
    {
        ...
    }
    else
    {
        return;
    }
   
    // дальнейшая обработка данных
    ...       

    if (typesList[aIndex] == "tcp")
    {
        ((TcpClientThread*)(thread))->clearData();
    }
    else if (typesList[aIndex] == другие типы)
    {
        ...
    }
    else
    {
        return;
    }

    // просматриваем очередь ожидающих и если она не пуста, берем один запрос и его обрабатываем
    if (waitRequestQueue[typesList[aIndex]].size() > 0)
    {
        RequestSignature signature = waitRequestQueue[typesList[aIndex]].dequeue();
        QString abonent = signature.getAbonent();
        QString ip = abonents->ips->value(abonent);
        QString mac = abonents->macs->value(abonent);
        quint16 port = abonents->ports->value(abonent);
        QString type = abonents->types->value(abonent);
        QString message = signature.getMessage();
        ConfigProtocol* pProtocol = protocols->value(signature.getProtocol());
        if (type == "tcp")
        {
            ((TcpClientThread*)(clients[aIndex]))->setProcess(ip,port,message,pProtocol);
            ((TcpClientThread*)(clients[aIndex]))->start();
        }
        else if (type == дргуие типы)
        {
          ...
        }
        else
        {
            return;
        }
        signatures[aIndex] = signature;
        occupiedList[aIndex] = true;
    }
    else
    {
        occupiedList[aIndex] = false;
    }
}

Поток обработки клиента
Код:
void TcpClientThread::run()
{   
    QTcpSocket socket;
    socket.connectToHost(ip,port);

    if (socket.waitForConnected(Global::NormalTimeout))
    {       
        emit message(tr("Успешное подключение (TCP) к ip '")+ip);
        QByteArray outBlock = Type::toByteArray(protocol->mesReqTypes->value(message));
        socket.write(outBlock);

        if (protocol->mesAnsProperties->value(message).size() > 0)
        {
            if (socket.waitForReadyRead(Global::NormalTimeout))
            {
                if (socket.bytesAvailable() == Type::size(protocol->mesAnsTypes->value(message)))
                {
                    QByteArray inBlock = socket.readAll();                   
                    setData(dataBuffer,inBlock);
                    emit data();
                }
                else
                {
                    emit message(tr("(TCP) Размер ответного пакета не соответствует ожидаемому"));
                    emit nodata();
                }
            }
            else
            {
                emit message(tr("Таймаут ожидания ответа от (TCP) ip '")+ip);
                emit timeout();
            }
        }     
    }
    else
    {
        emit logMessage(tr("Таймаут подключения к (TCP) ip '")+ip);
        emit timeout();
    }
   
    socket.disconnectFromHost();
    socket.close();
}

Со стороны сервера запущен QTcpServer. Вообще таких серверов может быть несколько, работающих по разному принципу. На каждое входящее сообщение создается свой собственный поток обработки.

Диспетчер сервера
Код:
void DispatcherServerThread::run()
{
    for (int i=0; i<confWorkplace->serversServers->size(); i++)
    {
        if (confWorkplace->serversTypes->value(i) == "tcp")
        {
            QString serverName = confWorkplace->serversServers->value(i);
            uint port = confWorkplace->serversPorts->value(i);

            TcpServer *server = new TcpServer(servers->value(serverName),protocols,0);           
            if (!server->listen(QHostAddress::Any,port))
            {
                emit setMessage(tr("Не удалось запустить сервер TCP '");
                continue;
            }
            emit setMessage(tr("Сервер TCP запущен."));
        }
    }
    exec();
}

void TcpServer::incomingConnection(int socketDescriptor)
{
    TcpServerThread *thread = new TcpServerThread(socketDescriptor,server,protocols,
                                                  messagesSizes,messagesEvals,dataToThreads,this);
    connect(thread, SIGNAL(finished()), thread, SLOT(deleteLater()));
    thread->start();
}


Поток обработки входящего соединения на сервере
Код:
void TcpServerThread::run()
{
    QTcpSocket tcpSocket;
    if (!tcpSocket.setSocketDescriptor(socketDescriptor))
    {
        return;
    }   

    if (tcpSocket.waitForReadyRead(Global::NormalTimeout))
    {
        QByteArray inBlock = tcpSocket.readAll();

        QByteArray outBlock = processing(inBlock);
        tcpSocket.write(outBlock);
        tcpSocket.waitForBytesWritten(Global::NormalTimeout);
        emit setMessage(tr("Отправлен ответ (TCP) на ip '")+tcpSocket.peerAddress().toString()+
                        tr("' порт '")+QString("%1").arg(tcpSocket.peerPort())+
                        tr("' (сообщение '")+currentMessage.second+
                        tr("' протокола '")+currentMessage.first+"')");
    }

    tcpSocket.disconnectFromHost();
    tcpSocket.close();

}

Ситуация складывается следующая. в первое время все работает нормально. Через какое-то время в клиенте начинает срабатывать таймаут подключения. Потом соединение проходит и данные приходят. Такое "мигание" (то есть данные, то таймуат) продолжается какое-то время где около часа. Потом все время начинает срабатывать таймаут подключения.

Почему начинают срабатывать таймуаты?
Записан
BRE
Гость
« Ответ #1 : Март 02, 2010, 17:39 »

Что бросилось в глаза.
Код
C++ (Qt)
   if (tcpSocket.waitForReadyRead(Global::NormalTimeout))
   {
       QByteArray inBlock = tcpSocket.readAll();
 

Это не гарантирует, что пришел весь пакет с данными.
Одна сторона послала 10Кб, вторая получила первую партию блока (2Кб), произойдет выход из waitForReadyRead и ты вычитываешь только часть данных.
Записан
pawok11
Гость
« Ответ #2 : Март 02, 2010, 17:52 »

Пока порядок передаваемых данных 100-200 байт и это не должно влиять.

А где можно почитать про то как это обработать?
Записан
niXman
Гость
« Ответ #3 : Март 02, 2010, 18:23 »

pawok11, для вашей задачи абсолютно не подходят блокирующие сокеты. т.к. уничтожают всю идею.
я про это:
Код
C++ (Qt)
tcpSocket.waitForReadyRead(Global::NormalTimeout)
и подобное.

наверное можно каким-то боком, используя Qt, нормально решить задачу. но что-то сомневаюсь. много оверхеда нужно написать.
смотрите лучше в сторону boost.asio. эта либа спроектирована именно для задач подобного рода.
Записан
SABROG
Гость
« Ответ #4 : Март 02, 2010, 23:15 »

Если свободных потоков нет, то запрос ставиться в очередь ожидающих на отправку.

Неправильное решение как минимум по двум причинам. Как только установлено соединение начинается отсчет, обычно 60 секунд. Если в течении этого времени никаких данных не приходит, то сокет отключается операционной системой, дабы не подвергать машину возможной атаки (syn-flood например) и дать другим клиентам возможность подключиться. Причем таймауты могут стоять как на стороне сервера так и на стороне клиента. Мне например пришлось редактировать настройки Firefox и сервера на работе, чтобы браузер мог дождаться данных от PHP скрипта. Правда я столкнулся с неприятной особенностью таймаутов, если поставить слишком большой значение, скажем 1 час, то сервер все равно никогда не ответит, даже если будет готов через 1 секунду. До сервера может запрос тупо не дойти. Причем если поставлю таймаут 5 минут, то соединение обрывается до того как PHP скрипт успевает полностью отработать. Да еще архитектура кривая, не поддерживает докачку и каждый раз начинает всё заново. Короче без нормального собственного протокола будут проблемы. Если нужно долго поддерживать связь между двумя сокетами, то нужно реализовывать PING-PONG опкоды, чтобы каждая из сторон знала, что его собеседник еще жив. Если PONG долго не приходит, то вот это уже явный таймаут.

Никто не оставляет открытым подключение, чтобы оно сидело и ждало своей очереди. Обычно реализуют переподключение (дозвон) и количество попыток (или бесконечный). Поэтому для твоей задачи достаточно жестоко закрывать сокеты "лишних" клиентов. Но для начала нужно им послать рекомендуемое время для переподключения основанное на позиции в очереди, чтобы все клиенты не ломанулись сразу на сервак и тем самым его угробили. У клиентов сделать настройки "использовать рекомендуемое время переподключения", "использовать пользовательское время переподключения". void QTcpServer::setMaxPendingConnections ( int numConnections ) - не используй, ОС может мурыжить сокеты еще долго не закрывая, лучше уж сделать это самому.

Конечно от "waitForReadyRead" надо отказаться в пользу событий. Неблокирующие сокеты используют метод WSAAsyncSelect, который использует систему сообщений Windows. Qt использует WSAAsyncSelect для получения сообщений от ОС и Overlapped I/O API, который в теории позволяет подключаться до 10000 клиентам на один сервер. Для Windows asio/boost::asio используют этот же механизм, но они не используют WSAAsyncSelect, а вместо него используют IOCP (Completion Port). Насколько я знаю это более предпочтительно, особенно для приложений без окон. Т.е. разница в доставке сообщений от ОС. Топик конечно старый, но на всякий случай прилагаю, человек спрашивает чем лучше IOCP по сравнению WSAAsyncSelect. Как бы там ни было система доставки сообщений не влияет на скорость скачивания, так как не приходит на каждый байт данных, при соответствующем размере буффера. Тем не менее я собираюсь написать suggestion троллям на перевод в сторону IOCP. Если у тебя хватит запалу, то я бы с удовольствием посмотрел на реализацию одного и того же сервера с применением Qt и asio. Еще бы понять где найти столько клиентов со всего мира, чтобы проверить нагрузку, скорость, стабильность и тому подобное.
Записан
pawok11
Гость
« Ответ #5 : Март 03, 2010, 11:45 »

Цитировать
Если свободных потоков нет, то запрос ставиться в очередь ожидающих на отправку.

Я, наверное, не четко сформулировал. "очередь ожидающих на отправку" это просто очередь, где храняться данные (ip, порт и данные для отправки). Здесь не создаются сокеты. Сокеты создаются только в потоке, когда появляется свободный поток.

Весь механиз кратко описать можно так. Появляется свободный поток, создается соединение, отсылается запрос, получается ответ, соединение закрывается, высылается сигнал, что поток свободен.

Не совсем правильно так делать, потому что соединение почти все вермя происходит с одними и теми же абонентами, но так как помимо tcp по такой же схеме используется еще, например, udp (и др.), то в целях универсализации была выбрана такая схема.

Еще важно, что заранее не известен адрес сервера, к которому будет обращаться клиент.
« Последнее редактирование: Март 03, 2010, 12:58 от pawok11 » Записан
Страниц: [1]   Вверх
  Печать  
 
Перейти в:  


Страница сгенерирована за 0.086 секунд. Запросов: 21.