Russian Qt Forum
Ноябрь 22, 2024, 20:14 *
Добро пожаловать, Гость. Пожалуйста, войдите или зарегистрируйтесь.
Вам не пришло письмо с кодом активации?

Войти
 
  Начало   Форум  WIKI (Вики)FAQ Помощь Поиск Войти Регистрация  

Страниц: [1]   Вниз
  Печать  
Автор Тема: Многопотчный сервер  (Прочитано 3800 раз)
demaker
Птица говорун
*****
Offline Offline

Сообщений: 962


Просмотр профиля
« : Август 25, 2017, 18:05 »

Есть сервер, в котором каждое клиентское подключение обрабатывается в отдельном потоке.
Клиент посылает файл на сервер, где тот обрабатывается и по команде клиента с сервера посылается обратно.

Сделал так.
1.Принимаем данные с клиента и запихиваем их в файл.
Сначала с клиента передается посылка- инфа о файле(ИМЯ,РАЗМЕР),
в посылке типа
Код:
  TYPE_INFO_FILE 
затем идут сами данные с файла - посылки типа
Код:
 TYPE_DATA_FILE 
после удачного приема файла сервер высылает посылку-команду об успешном приеме.
Код:
 CMD_SUCCESS_RECEIVE 

2.Кидаем указатель на файл в буффер(список) принятых файлов.

3.Если  в буффере принятых файлов есть что обрабатывать, то обрабатываем.

4.После обработки файла перекидываем указатель в буффер(список) на передачу, удалив указатель из буффера(список) принятых файлов.

5.По команде
Код:
CMD_GET_READY_FILE
с клиента передаем файл обратно на клиент.
(Все тоже самое что и п. №1, только выполняет сервер)

Обработка файла происходит в отдельном потоке.
Если при приеме команды от клиента в буффере на передачу есть что-то, то отсылаем.

Когда работает один клиент все замечательно.
Когда, подключено несколько клиентов, то при последовательной передаче файлов, т.е. сначала один клиент передал файл, потом другой передал свой файл работает тоже норм.

Но когда параллельно идет передача файлов с нескольких клиентов у меня не проходит проверка, что файл полностью передан.
Код:
if(sfiledata.nFileSize == blockSize) //!!!!!!!!!!!!!не проходит проверка

Код
C++ (Qt)
case TYPE_DATA_FILE:
           if(headPack.sizePack + sizeof(head_pack) <= pData.size())
           {
               pData = pData.mid(sizeof(head_pack));
 
               if(m_pReceiveFile && m_pReceiveFile->open(QIODevice::ReadWrite | QIODevice::Append)){
                    blockSize += m_pReceiveFile->write(&pData.data()[0],headPack.sizePack);
                    m_pReceiveFile->close();
               }
 
               pData = pData.mid(headPack.sizePack);
 
               {
                   head_pack headpck;
                   headpck.sizePack = sizeof(int);
                   headpck.typePack = TYPE_INFO_PROCESS;
                   headpck.sizePack2 = sizeof(int);
                   QByteArray ba_progress;
                   ba_progress.append((char*)&headpck,sizeof(head_pack));
                   //std::cout<<"blockSize = "<<blockSize;
                   ba_progress.append((char*)(&blockSize),sizeof(int));
                   m_pTcpSocket->write(ba_progress);
                   m_pTcpSocket->waitForBytesWritten(1);
               }
 
               if(sfiledata.nFileSize == blockSize) //!!!!!!!!!!!!!не проходит проверка
               {
                   head_pack headpck;
                   headpck.sizePack = 0;
                   headpck.typePack = CMD_SUCCESS_RECEIVE;
                   headpck.sizePack2 = 0;
                   QByteArray ba_success;
                   ba_success.append((char*)&headpck,sizeof(head_pack));
                   m_pTcpSocket->write(ba_success);
                   m_pTcpSocket->waitForBytesWritten(1);
 
                   blockSize = 0;        
 
                   mutexListRxFiles.lock();
                   m_listRxFiles.append(m_pReceiveFile);
                   mutexListRxFiles.unlock();
                   m_pReceiveFile = NULL;
               }
 

Получается что счетчик принятых байт не равняется размеру передаваемого файла.
Не пойму в чем может быть причина? Как преодалеть это Непонимающий

Вот код программы
Код
C++ (Qt)
 
//TcpServer
void TcpServer::incomingConnection(int socketDescriptor)
{
   ThreadServer *thread = new ThreadServer(socketDescriptor,0);
   connect(thread,SIGNAL(fin()),this,SLOT(removeThread()));
   listThreads.append(thread);
   thread->start();
}
 
 
//ThreadServer
ThreadServer::ThreadServer(int socketdescriptor, QObject *parent) :
   QThread(parent),
   descriptor(socketdescriptor),
   m_iPingCounting(0),
   m_iTimerCnt(0),
   m_pReceiveFile(NULL),
   m_pTransmiteFile(NULL),
   m_pThreadProcessData(NULL)
{
   std::cout<<__FUNCTION__;
 
   m_listTxFiles.clear();
   m_listRxFiles.clear();
 
   //moveToThread(this);
 
}
 
ThreadServer::~ThreadServer()
{
   std::cout<<__FUNCTION__;
 
   if(m_pThreadProcessData){
       m_pThreadProcessData->setStopThread(true);
       m_pThreadProcessData->quit();
       m_pThreadProcessData->wait();
       delete m_pThreadProcessData;
   }
 
   while(!m_listRxFiles.isEmpty()){
       delete m_listRxFiles.at(0);
       m_listRxFiles.removeAt(0);
   }
 
   while(!m_listTxFiles.isEmpty()){
       delete m_listTxFiles.at(0);
       m_listTxFiles.removeAt(0);
   }
 
}
 
void ThreadServer::run()
{
   m_pTcpSocket = new QTcpSocket();
 
   if(!m_pTcpSocket->setSocketDescriptor(descriptor)){
       Quit();
       return;
   }
 
   QHostAddress addr = m_pTcpSocket->peerAddress();
   QString str = QString("Ïîòîê ṗ %1 , óñòàíîâëåíî ñîåäèíåíèå ñ êëèåíòîì àäŵåñ %2\n").arg(descriptor).arg(addr.toString());
   QByteArray ba;
   ba.append(str);
   std::cout << ba.data();  
 
   m_pThreadProcessData = new ThreadProcessData(&m_listRxFiles,&mutexListRxFiles,&m_listTxFiles,&mutexListTxFiles);
   m_pThreadProcessData->start();
 
   while(1){
       msleep(5);
 
       if(!TryPing()){
           Quit();
           return;
       }
       ParsDataRes();        
   }
}
 
void ThreadServer::Quit()
{
   delete m_pTcpSocket;
   emit fin();
}
 
bool ThreadServer::TryPing()
{
   if(m_iTimerCnt <= MAX_TIMER_CNT){
       m_iTimerCnt++;
       //std::cout<<"timercnt = "<<m_iTimerCnt<<"\n";
       return true;
   }
   else{
       m_iPingCounting++;
       //std::cout<<"pingcounting = "<<m_iPingCounting<<"\n";
       if(m_iPingCounting <= MAX_PING_COUNTING){
           //CMD_PING
           head_pack headpck;
           headpck.sizePack = 0;
           headpck.typePack = CMD_PING;
           headpck.sizePack2 = 0;
           QByteArray ba_ping;
           ba_ping.append((char*)&headpck,sizeof(head_pack));
           m_pTcpSocket->write(ba_ping);
           m_pTcpSocket->waitForBytesWritten(1);
           return true;
       }
       else{
           std::cout<<"!!!EXIT!!!"<<"\n";
           return false;
       }
   }
}
 
void ThreadServer::ParsDataRes()
{
   static int blockSize = 0;
 
   m_pTcpSocket->waitForReadyRead(1);
   pData.append( m_pTcpSocket->readAll() );
 
   while(pData.size() != 0)
   {
       if(pData.size() < sizeof(head_pack))
           return ;
 
       m_iTimerCnt = m_iPingCounting = 0;
 
       head_pack headPack;
       memcpy((void*)(&headPack), (void *)(&pData.data()[0]), sizeof(head_pack));
 
       if ( headPack.sizePack != headPack.sizePack2 )
       {
           pData = pData.mid( 1 );
           continue;
       }
 
       switch(headPack.typePack)
       {
 
       case TYPE_INFO_FILE:
           if(headPack.sizePack + sizeof(head_pack) <= pData.size())
           {
               pData = pData.mid(sizeof(head_pack));
               if( sizeof(sfiledata) <= headPack.sizePack ){
                   memcpy((char *)&sfiledata,&pData.data()[0],headPack.sizePack);
                   pData = pData.mid(headPack.sizePack);
 
                   m_pReceiveFile = new QFile;
                   m_pReceiveFile->setFileName(QString(sfiledata.pFileName));
 
               }
               else{                  
               }                
           }
           else{
               return;
           }
           break;
 
       case TYPE_DATA_FILE:
           if(headPack.sizePack + sizeof(head_pack) <= pData.size())
           {
               pData = pData.mid(sizeof(head_pack));
 
               if(m_pReceiveFile && m_pReceiveFile->open(QIODevice::ReadWrite | QIODevice::Append)){
                    blockSize += m_pReceiveFile->write(&pData.data()[0],headPack.sizePack);
                    m_pReceiveFile->close();
               }
 
               pData = pData.mid(headPack.sizePack);
 
               {
                   head_pack headpck;
                   headpck.sizePack = sizeof(int);
                   headpck.typePack = TYPE_INFO_PROCESS;
                   headpck.sizePack2 = sizeof(int);
                   QByteArray ba_progress;
                   ba_progress.append((char*)&headpck,sizeof(head_pack));
                   //std::cout<<"blockSize = "<<blockSize;
                   ba_progress.append((char*)(&blockSize),sizeof(int));
                   m_pTcpSocket->write(ba_progress);
                   m_pTcpSocket->waitForBytesWritten(1);
               }
 
               if(sfiledata.nFileSize == blockSize)
               {
                   head_pack headpck;
                   headpck.sizePack = 0;
                   headpck.typePack = CMD_SUCCESS_RECEIVE;
                   headpck.sizePack2 = 0;
                   QByteArray ba_success;
                   ba_success.append((char*)&headpck,sizeof(head_pack));
                   m_pTcpSocket->write(ba_success);
                   m_pTcpSocket->waitForBytesWritten(1);
 
                   blockSize = 0;        
 
                   mutexListRxFiles.lock();
                   m_listRxFiles.append(m_pReceiveFile);
                   mutexListRxFiles.unlock();
                   m_pReceiveFile = NULL;
               }
           }
           else{
               return;
           }
           break;
 
       case CMD_PING:
           if(headPack.sizePack + sizeof(head_pack) <= pData.size()){
               pData = pData.mid(sizeof(head_pack));              
 
               m_iTimerCnt = m_iPingCounting = 0;
           }
           else{
               return;
           }
           break;
 
       case CMD_GET_READY_FILE:
           if(headPack.sizePack + sizeof(head_pack) <= pData.size()){
               pData = pData.mid(sizeof(head_pack));
 
               m_pTransmiteFile = NULL;
               mutexListTxFiles.lock();
               if(!m_listTxFiles.isEmpty()){
                   m_pTransmiteFile = m_listTxFiles.at(0);
                   m_listTxFiles.removeAt(0);
               }
               mutexListTxFiles.unlock();
               TransmiteFile(m_pTransmiteFile);
           }
           else{
               return;
           }
           break;
 
       case CMD_SUCCESS_RECEIVE:
           if(headPack.sizePack + sizeof(head_pack) <= pData.size()){
                 pData = pData.mid(sizeof(head_pack));
 
 
                 if(m_pTransmiteFile){
                     m_pTransmiteFile->remove();
                     delete m_pTransmiteFile;
                     m_pTransmiteFile = NULL;
                 }
           }
           else{
               return;
           }
           break;
       default:
           pData = pData.mid( 1 );
           continue;
       };
   }
   return ;
 
}
 
void ThreadServer::TransmiteFile(QFile *file)
{
   //std::cout<<"TransmiteFile\n";
   if(file == NULL){
 
       head_pack headpck;
       headpck.sizePack = 0;
       headpck.typePack = CMD_ANS_EMPTY_LIST;
       headpck.sizePack2 = 0;
       QByteArray ba_empty_list;
       ba_empty_list.append((char*)&headpck,sizeof(head_pack));
       m_pTcpSocket->write(ba_empty_list);
       m_pTcpSocket->waitForBytesWritten(1);
       return;
   }
 
   {
       sfiledata.nFileSize = file->size();
       for(int i = 0; i < max_char; i++)
           sfiledata.pFileName[i] = file->fileName().toAscii().data()[i];
 
       QByteArray ba;
       head_pack headPack;
       headPack.sizePack = sizeof(sFileData);
       headPack.typePack = TYPE_INFO_FILE;
       headPack.sizePack2 = sizeof(sFileData);
       ba.append((char *)(&headPack),sizeof(head_pack));
       ba.append((char*)(&sfiledata),sizeof(sFileData));
       m_pTcpSocket->write(ba);
       m_pTcpSocket->waitForBytesWritten(1);
   }
 
   {
   if(file->open(QFile::ReadWrite)){
       file->seek(0);
       while(!file->atEnd())
       {
         const QByteArray buf = file->read(MAX_SIZE_FRAME); //64Êáàéò
         head_pack headPack;
         headPack.sizePack = buf.size();
         headPack.typePack = TYPE_DATA_FILE;
         headPack.sizePack2 = buf.size();
 
         QByteArray ba;
         ba.append((char *)(&headPack),sizeof(headPack));
         ba.append(buf);
         m_pTcpSocket->write(ba);
         m_pTcpSocket->waitForBytesWritten(1);
 
       }
       file->close();
   }
   }
}
 
//ThreadProcessData
ThreadProcessData::ThreadProcessData(QList<QFile *> *inFiles, QMutex *mutexInFiles, QList<QFile *> *outFiles, QMutex *mutexOutFiles, QObject *parent) :
   QThread(parent),
   p_listInFiles(inFiles),
   p_mutexInFiles(mutexInFiles),
   p_listOutFiles(outFiles),
   p_mutexOutFiles(mutexOutFiles),
   m_bStopThread(false)
{    
   //moveToThread(this);
}
 
ThreadProcessData::~ThreadProcessData()
{
}
 
void ThreadProcessData::setStopThread(bool flag)
{
   mutexStopThread.lock();
   m_bStopThread = flag;
   mutexStopThread.unlock();
}
 
bool ThreadProcessData::stopThread()
{
   bool flag;
   mutexStopThread.lock();
   flag = m_bStopThread ;
   mutexStopThread.unlock();
   return flag;
}
 
 
void ThreadProcessData::run()
{
   while(!stopThread()){        
       msleep(5);
       if(!isEmptyListInFiles()){
           QFile *processFile;
           p_mutexInFiles->lock();
           processFile = p_listInFiles->at(0);
           p_mutexInFiles->unlock();
 
           if(processFile->open(QIODevice::ReadWrite))
           {
               int lAB = 0;
               int valB = 0;
               int valG = 0;
               int valR = 0;
 
               int correction = 100;//koef correction
               int L = 256;
               int b[L];
 
               QImage image(processFile->fileName());
 
               for(int i = 0; i < image.size().height(); i++){
                   for(int j = 0; j < image.size().width(); j++){
                       QRgb rgb = image.pixel(QPoint(j,i));
                       QColor color(rgb);
                       valB = color.blue();
                       valG = color.green();
                       valR = color.red();
                       lAB += (int)(valR * 0.299 + valG * 0.587 + valB * 0.114);
                   }
               }                
 
               lAB /= image.size().height() * image.size().width();
 
 
               double k = 1.0 + correction / 100.0;
 
               for (int i = 0; i < L; i++)
               {
                   int delta = (int)i - lAB;
                   int temp  = (int)(lAB + k *delta);
 
                   if (temp < 0)   temp = 0;
                   if (temp >= 255)    temp = 255;
 
                   b[i] = (unsigned char)temp;
               }
 
               //
               for (int j = 0; j < image.byteCount(); j++)
               {
                   unsigned char value = image.bits()[j];
                   image.bits()[j] = (unsigned char)b[value];
               }
 
               QImageWriter writer(processFile->fileName());
               writer.write(image);
 
           }
           processFile->close();
 
           p_mutexInFiles->lock();
           p_listInFiles->removeAt(0);
           p_mutexInFiles->unlock();
 
           p_mutexOutFiles->lock();
           p_listOutFiles->append(processFile);
           p_mutexOutFiles->unlock();
       }      
   }
}
 
bool ThreadProcessData::isEmptyListInFiles()
{
   bool isEmpty;
   p_mutexInFiles->lock();
   isEmpty = p_listInFiles->isEmpty();
   p_mutexInFiles->unlock();
   return isEmpty;
}
 
« Последнее редактирование: Август 26, 2017, 19:48 от demaker » Записан
demaker
Птица говорун
*****
Offline Offline

Сообщений: 962


Просмотр профиля
« Ответ #1 : Август 31, 2017, 14:38 »

Выяснил в чем ошибка.
Скажите, если есть класс потока.
В нем есть метод, в котором объявленна статическая переменная.
То для каждого экземпляра класса данная переменная будет своя или одна для всех(экземпляров класса)?
Код
C++ (Qt)
class Thread: public QThread
{
...
protected:
      void run();
 
private:
     void foo();
 
}
 
void Thread::foo()
{
 
  static int value = 0;
  ....
 
  value++;
 
}
 
void Thread::run()
{
   forever(){
      msleep(5);
      foo();
  }
 
}
 

В моем случае получилось что одна для всех экземпляров класса. Почему  Непонимающий не знаю.
Поэтому проверка и не проходила.
Хотя я думал что для каждого экземпляра класса будет своя переменная.

Записан
sergek
Гипер активный житель
*****
Offline Offline

Сообщений: 872


Мы должны приносить пользу людям.


Просмотр профиля
« Ответ #2 : Август 31, 2017, 16:24 »

Так статическая же. Создается одна при компиляции.
А потоки используют одно адресное пространство.
Записан

Qt 5.13.0 Qt Creator 5.0.1
Win10, Ubuntu 20.04
demaker
Птица говорун
*****
Offline Offline

Сообщений: 962


Просмотр профиля
« Ответ #3 : Август 31, 2017, 18:31 »

Так статическая же. Создается одна при компиляции.
А потоки используют одно адресное пространство.
Да Обеспокоенный балбес я!
Записан
Страниц: [1]   Вверх
  Печать  
 
Перейти в:  


Страница сгенерирована за 0.394 секунд. Запросов: 22.