QList<QThread*> clients; ... void DispatcherThread::run(){ QMapIterator<QString,int> itrNumber(*numberOfThreads); // содержит число запускаемых потоков по типам (tcp,udp и т.д.) //создание пула потоков while (itrNumber.hasNext()) { itrNumber.next(); if (itrNumber.key() == "tcp") { for (int i=0; i<itrNumber.value()-1; i++) { typesList << "tcp"; occupiedList << false; // флаг поток занят (обрабатывает поток) signatures << RequestSignature(); // данные для запроса (пустой конструктор) TcpClientThread *tcpth = new TcpClientThread(); connect(tcpth,SIGNAL(data()),this,SLOT(slotData())); connect(tcpth,SIGNAL(timeout()),this,SLOT(slotTimeout())); connect(tcpth,SIGNAL(nodata()),this,SLOT(slotNoData())); clients << tcpth; } } else if (itrNumber.key() == другие типы) { } } while (!stopped) { mutex.lock(); if (!requestQueue->isEmpty())// очередь запросов { RequestSignature signature = requestQueue->dequeue(); mutex.unlock(); QString abonent = signature.getAbonent(); QString ip = abonents->ips->value(abonent); QString mac = abonents->macs->value(abonent); quint16 port = abonents->ports->value(abonent); QString type = abonents->types->value(abonent); QString message = signature.getMessage(); ConfigProtocol* pProtocol = protocols->value(signature.getProtocol()); bool findNotOccupied = false; // если есть свободный поток заданного типа, то занимаем его и запускаем на обработку данных for (int i=0; i<typesList.size(); i++) { if (type == typesList[i]) { if (!occupiedList[i]) { if (type == "tcp") { ((TcpClientThread*)(clients[i]))->setProcess(ip,port,message,pProtocol); ((TcpClientThread*)(clients[i]))->start(); } else if (type == другие типы) { ... } else { break; } signatures[i] = signature; occupiedList[i] = true; findNotOccupied = true; break; } } } // если свободного потока нет, то ставим в очередь ожидающих if (!findNotOccupied) { waitRequestQueue[type].enqueue(signature); } } else { mutex.unlock(); } }} void DispatcherThread::slotData(){ QThread *thread = (QThread*)sender(); int index = clients.indexOf(thread); if (index != -1) { processRequestData(index); }}void DispatcherThread::slotTimeout(){ QThread *thread = (QThread*)sender(); int index = clients.indexOf(thread); if (index != -1) { processRequestData(index); }}void DispatcherThread::slotNoData(){ QThread *thread = (QThread*)sender(); int index = clients.indexOf(thread); if (index != -1) { processRequestData(index); }} void DispatcherThread::processRequestData(int aIndex){ QThread *thread = clients[aIndex]; QMap<QString,QString> data; if (typesList[aIndex] == "tcp") { data = ((TcpClientThread*)(thread))->getData(); } else if (typesList[aIndex] == другие типы) { ... } else { return; } // дальнейшая обработка данных ... if (typesList[aIndex] == "tcp") { ((TcpClientThread*)(thread))->clearData(); } else if (typesList[aIndex] == другие типы) { ... } else { return; } // просматриваем очередь ожидающих и если она не пуста, берем один запрос и его обрабатываем if (waitRequestQueue[typesList[aIndex]].size() > 0) { RequestSignature signature = waitRequestQueue[typesList[aIndex]].dequeue(); QString abonent = signature.getAbonent(); QString ip = abonents->ips->value(abonent); QString mac = abonents->macs->value(abonent); quint16 port = abonents->ports->value(abonent); QString type = abonents->types->value(abonent); QString message = signature.getMessage(); ConfigProtocol* pProtocol = protocols->value(signature.getProtocol()); if (type == "tcp") { ((TcpClientThread*)(clients[aIndex]))->setProcess(ip,port,message,pProtocol); ((TcpClientThread*)(clients[aIndex]))->start(); } else if (type == дргуие типы) { ... } else { return; } signatures[aIndex] = signature; occupiedList[aIndex] = true; } else { occupiedList[aIndex] = false; }}
void TcpClientThread::run(){ QTcpSocket socket; socket.connectToHost(ip,port); if (socket.waitForConnected(Global::NormalTimeout)) { emit message(tr("Успешное подключение (TCP) к ip '")+ip); QByteArray outBlock = Type::toByteArray(protocol->mesReqTypes->value(message)); socket.write(outBlock); if (protocol->mesAnsProperties->value(message).size() > 0) { if (socket.waitForReadyRead(Global::NormalTimeout)) { if (socket.bytesAvailable() == Type::size(protocol->mesAnsTypes->value(message))) { QByteArray inBlock = socket.readAll(); setData(dataBuffer,inBlock); emit data(); } else { emit message(tr("(TCP) Размер ответного пакета не соответствует ожидаемому")); emit nodata(); } } else { emit message(tr("Таймаут ожидания ответа от (TCP) ip '")+ip); emit timeout(); } } } else { emit logMessage(tr("Таймаут подключения к (TCP) ip '")+ip); emit timeout(); } socket.disconnectFromHost(); socket.close();}
void DispatcherServerThread::run(){ for (int i=0; i<confWorkplace->serversServers->size(); i++) { if (confWorkplace->serversTypes->value(i) == "tcp") { QString serverName = confWorkplace->serversServers->value(i); uint port = confWorkplace->serversPorts->value(i); TcpServer *server = new TcpServer(servers->value(serverName),protocols,0); if (!server->listen(QHostAddress::Any,port)) { emit setMessage(tr("Не удалось запустить сервер TCP '"); continue; } emit setMessage(tr("Сервер TCP запущен.")); } } exec();}void TcpServer::incomingConnection(int socketDescriptor){ TcpServerThread *thread = new TcpServerThread(socketDescriptor,server,protocols, messagesSizes,messagesEvals,dataToThreads,this); connect(thread, SIGNAL(finished()), thread, SLOT(deleteLater())); thread->start();}
void TcpServerThread::run(){ QTcpSocket tcpSocket; if (!tcpSocket.setSocketDescriptor(socketDescriptor)) { return; } if (tcpSocket.waitForReadyRead(Global::NormalTimeout)) { QByteArray inBlock = tcpSocket.readAll(); QByteArray outBlock = processing(inBlock); tcpSocket.write(outBlock); tcpSocket.waitForBytesWritten(Global::NormalTimeout); emit setMessage(tr("Отправлен ответ (TCP) на ip '")+tcpSocket.peerAddress().toString()+ tr("' порт '")+QString("%1").arg(tcpSocket.peerPort())+ tr("' (сообщение '")+currentMessage.second+ tr("' протокола '")+currentMessage.first+"')"); } tcpSocket.disconnectFromHost(); tcpSocket.close();}
C++ (Qt) if (tcpSocket.waitForReadyRead(Global::NormalTimeout)) { QByteArray inBlock = tcpSocket.readAll();
C++ (Qt)tcpSocket.waitForReadyRead(Global::NormalTimeout)