Есть сервер, в котором каждое клиентское подключение обрабатывается в отдельном потоке.
Клиент посылает файл на сервер, где тот обрабатывается и по команде клиента с сервера посылается обратно.
Сделал так.
1.Принимаем данные с клиента и запихиваем их в файл.
Сначала с клиента передается посылка- инфа о файле(ИМЯ,РАЗМЕР),
в посылке типа
затем идут сами данные с файла - посылки типа
после удачного приема файла сервер высылает посылку-команду об успешном приеме.
2.Кидаем указатель на файл в буффер(список) принятых файлов.
3.Если в буффере принятых файлов есть что обрабатывать, то обрабатываем.
4.После обработки файла перекидываем указатель в буффер(список) на передачу, удалив указатель из буффера(список) принятых файлов.
5.По команде
с клиента передаем файл обратно на клиент.
(Все тоже самое что и п. №1, только выполняет сервер)
Обработка файла происходит в отдельном потоке.
Если при приеме команды от клиента в буффере на передачу есть что-то, то отсылаем.
Когда работает один клиент все замечательно.
Когда, подключено несколько клиентов, то при последовательной передаче файлов, т.е. сначала один клиент передал файл, потом другой передал свой файл работает тоже норм.
Но когда параллельно идет передача файлов с нескольких клиентов у меня не проходит проверка, что файл полностью передан.
if(sfiledata.nFileSize == blockSize) //!!!!!!!!!!!!!не проходит проверка
C++ (Qt)
case TYPE_DATA_FILE:
if(headPack.sizePack + sizeof(head_pack) <= pData.size())
{
pData = pData.mid(sizeof(head_pack));
if(m_pReceiveFile && m_pReceiveFile->open(QIODevice::ReadWrite | QIODevice::Append)){
blockSize += m_pReceiveFile->write(&pData.data()[0],headPack.sizePack);
m_pReceiveFile->close();
}
pData = pData.mid(headPack.sizePack);
{
head_pack headpck;
headpck.sizePack = sizeof(int);
headpck.typePack = TYPE_INFO_PROCESS;
headpck.sizePack2 = sizeof(int);
QByteArray ba_progress;
ba_progress.append((char*)&headpck,sizeof(head_pack));
//std::cout<<"blockSize = "<<blockSize;
ba_progress.append((char*)(&blockSize),sizeof(int));
m_pTcpSocket->write(ba_progress);
m_pTcpSocket->waitForBytesWritten(1);
}
if(sfiledata.nFileSize == blockSize) //!!!!!!!!!!!!!не проходит проверка
{
head_pack headpck;
headpck.sizePack = 0;
headpck.typePack = CMD_SUCCESS_RECEIVE;
headpck.sizePack2 = 0;
QByteArray ba_success;
ba_success.append((char*)&headpck,sizeof(head_pack));
m_pTcpSocket->write(ba_success);
m_pTcpSocket->waitForBytesWritten(1);
blockSize = 0;
mutexListRxFiles.lock();
m_listRxFiles.append(m_pReceiveFile);
mutexListRxFiles.unlock();
m_pReceiveFile = NULL;
}
Получается что счетчик принятых байт не равняется размеру передаваемого файла.
Не пойму в чем может быть причина? Как преодалеть это
Вот код программы
C++ (Qt)
//TcpServer
void TcpServer::incomingConnection(int socketDescriptor)
{
ThreadServer *thread = new ThreadServer(socketDescriptor,0);
connect(thread,SIGNAL(fin()),this,SLOT(removeThread()));
listThreads.append(thread);
thread->start();
}
//ThreadServer
ThreadServer::ThreadServer(int socketdescriptor, QObject *parent) :
QThread(parent),
descriptor(socketdescriptor),
m_iPingCounting(0),
m_iTimerCnt(0),
m_pReceiveFile(NULL),
m_pTransmiteFile(NULL),
m_pThreadProcessData(NULL)
{
std::cout<<__FUNCTION__;
m_listTxFiles.clear();
m_listRxFiles.clear();
//moveToThread(this);
}
ThreadServer::~ThreadServer()
{
std::cout<<__FUNCTION__;
if(m_pThreadProcessData){
m_pThreadProcessData->setStopThread(true);
m_pThreadProcessData->quit();
m_pThreadProcessData->wait();
delete m_pThreadProcessData;
}
while(!m_listRxFiles.isEmpty()){
delete m_listRxFiles.at(0);
m_listRxFiles.removeAt(0);
}
while(!m_listTxFiles.isEmpty()){
delete m_listTxFiles.at(0);
m_listTxFiles.removeAt(0);
}
}
void ThreadServer::run()
{
m_pTcpSocket = new QTcpSocket();
if(!m_pTcpSocket->setSocketDescriptor(descriptor)){
Quit();
return;
}
QHostAddress addr = m_pTcpSocket->peerAddress();
QString str = QString("Ïîòîê ṗ %1 , óñòàíîâëåíî ñîåäèíåíèå ñ êëèåíòîì àäŵåñ %2\n").arg(descriptor).arg(addr.toString());
QByteArray ba;
ba.append(str);
std::cout << ba.data();
m_pThreadProcessData = new ThreadProcessData(&m_listRxFiles,&mutexListRxFiles,&m_listTxFiles,&mutexListTxFiles);
m_pThreadProcessData->start();
while(1){
msleep(5);
if(!TryPing()){
Quit();
return;
}
ParsDataRes();
}
}
void ThreadServer::Quit()
{
delete m_pTcpSocket;
emit fin();
}
bool ThreadServer::TryPing()
{
if(m_iTimerCnt <= MAX_TIMER_CNT){
m_iTimerCnt++;
//std::cout<<"timercnt = "<<m_iTimerCnt<<"\n";
return true;
}
else{
m_iPingCounting++;
//std::cout<<"pingcounting = "<<m_iPingCounting<<"\n";
if(m_iPingCounting <= MAX_PING_COUNTING){
//CMD_PING
head_pack headpck;
headpck.sizePack = 0;
headpck.typePack = CMD_PING;
headpck.sizePack2 = 0;
QByteArray ba_ping;
ba_ping.append((char*)&headpck,sizeof(head_pack));
m_pTcpSocket->write(ba_ping);
m_pTcpSocket->waitForBytesWritten(1);
return true;
}
else{
std::cout<<"!!!EXIT!!!"<<"\n";
return false;
}
}
}
void ThreadServer::ParsDataRes()
{
static int blockSize = 0;
m_pTcpSocket->waitForReadyRead(1);
pData.append( m_pTcpSocket->readAll() );
while(pData.size() != 0)
{
if(pData.size() < sizeof(head_pack))
return ;
m_iTimerCnt = m_iPingCounting = 0;
head_pack headPack;
memcpy((void*)(&headPack), (void *)(&pData.data()[0]), sizeof(head_pack));
if ( headPack.sizePack != headPack.sizePack2 )
{
pData = pData.mid( 1 );
continue;
}
switch(headPack.typePack)
{
case TYPE_INFO_FILE:
if(headPack.sizePack + sizeof(head_pack) <= pData.size())
{
pData = pData.mid(sizeof(head_pack));
if( sizeof(sfiledata) <= headPack.sizePack ){
memcpy((char *)&sfiledata,&pData.data()[0],headPack.sizePack);
pData = pData.mid(headPack.sizePack);
m_pReceiveFile = new QFile;
m_pReceiveFile->setFileName(QString(sfiledata.pFileName));
}
else{
}
}
else{
return;
}
break;
case TYPE_DATA_FILE:
if(headPack.sizePack + sizeof(head_pack) <= pData.size())
{
pData = pData.mid(sizeof(head_pack));
if(m_pReceiveFile && m_pReceiveFile->open(QIODevice::ReadWrite | QIODevice::Append)){
blockSize += m_pReceiveFile->write(&pData.data()[0],headPack.sizePack);
m_pReceiveFile->close();
}
pData = pData.mid(headPack.sizePack);
{
head_pack headpck;
headpck.sizePack = sizeof(int);
headpck.typePack = TYPE_INFO_PROCESS;
headpck.sizePack2 = sizeof(int);
QByteArray ba_progress;
ba_progress.append((char*)&headpck,sizeof(head_pack));
//std::cout<<"blockSize = "<<blockSize;
ba_progress.append((char*)(&blockSize),sizeof(int));
m_pTcpSocket->write(ba_progress);
m_pTcpSocket->waitForBytesWritten(1);
}
if(sfiledata.nFileSize == blockSize)
{
head_pack headpck;
headpck.sizePack = 0;
headpck.typePack = CMD_SUCCESS_RECEIVE;
headpck.sizePack2 = 0;
QByteArray ba_success;
ba_success.append((char*)&headpck,sizeof(head_pack));
m_pTcpSocket->write(ba_success);
m_pTcpSocket->waitForBytesWritten(1);
blockSize = 0;
mutexListRxFiles.lock();
m_listRxFiles.append(m_pReceiveFile);
mutexListRxFiles.unlock();
m_pReceiveFile = NULL;
}
}
else{
return;
}
break;
case CMD_PING:
if(headPack.sizePack + sizeof(head_pack) <= pData.size()){
pData = pData.mid(sizeof(head_pack));
m_iTimerCnt = m_iPingCounting = 0;
}
else{
return;
}
break;
case CMD_GET_READY_FILE:
if(headPack.sizePack + sizeof(head_pack) <= pData.size()){
pData = pData.mid(sizeof(head_pack));
m_pTransmiteFile = NULL;
mutexListTxFiles.lock();
if(!m_listTxFiles.isEmpty()){
m_pTransmiteFile = m_listTxFiles.at(0);
m_listTxFiles.removeAt(0);
}
mutexListTxFiles.unlock();
TransmiteFile(m_pTransmiteFile);
}
else{
return;
}
break;
case CMD_SUCCESS_RECEIVE:
if(headPack.sizePack + sizeof(head_pack) <= pData.size()){
pData = pData.mid(sizeof(head_pack));
if(m_pTransmiteFile){
m_pTransmiteFile->remove();
delete m_pTransmiteFile;
m_pTransmiteFile = NULL;
}
}
else{
return;
}
break;
default:
pData = pData.mid( 1 );
continue;
};
}
return ;
}
void ThreadServer::TransmiteFile(QFile *file)
{
//std::cout<<"TransmiteFile\n";
if(file == NULL){
head_pack headpck;
headpck.sizePack = 0;
headpck.typePack = CMD_ANS_EMPTY_LIST;
headpck.sizePack2 = 0;
QByteArray ba_empty_list;
ba_empty_list.append((char*)&headpck,sizeof(head_pack));
m_pTcpSocket->write(ba_empty_list);
m_pTcpSocket->waitForBytesWritten(1);
return;
}
{
sfiledata.nFileSize = file->size();
for(int i = 0; i < max_char; i++)
sfiledata.pFileName[i] = file->fileName().toAscii().data()[i];
QByteArray ba;
head_pack headPack;
headPack.sizePack = sizeof(sFileData);
headPack.typePack = TYPE_INFO_FILE;
headPack.sizePack2 = sizeof(sFileData);
ba.append((char *)(&headPack),sizeof(head_pack));
ba.append((char*)(&sfiledata),sizeof(sFileData));
m_pTcpSocket->write(ba);
m_pTcpSocket->waitForBytesWritten(1);
}
{
if(file->open(QFile::ReadWrite)){
file->seek(0);
while(!file->atEnd())
{
const QByteArray buf = file->read(MAX_SIZE_FRAME); //64Êáàéò
head_pack headPack;
headPack.sizePack = buf.size();
headPack.typePack = TYPE_DATA_FILE;
headPack.sizePack2 = buf.size();
QByteArray ba;
ba.append((char *)(&headPack),sizeof(headPack));
ba.append(buf);
m_pTcpSocket->write(ba);
m_pTcpSocket->waitForBytesWritten(1);
}
file->close();
}
}
}
//ThreadProcessData
ThreadProcessData::ThreadProcessData(QList<QFile *> *inFiles, QMutex *mutexInFiles, QList<QFile *> *outFiles, QMutex *mutexOutFiles, QObject *parent) :
QThread(parent),
p_listInFiles(inFiles),
p_mutexInFiles(mutexInFiles),
p_listOutFiles(outFiles),
p_mutexOutFiles(mutexOutFiles),
m_bStopThread(false)
{
//moveToThread(this);
}
ThreadProcessData::~ThreadProcessData()
{
}
void ThreadProcessData::setStopThread(bool flag)
{
mutexStopThread.lock();
m_bStopThread = flag;
mutexStopThread.unlock();
}
bool ThreadProcessData::stopThread()
{
bool flag;
mutexStopThread.lock();
flag = m_bStopThread ;
mutexStopThread.unlock();
return flag;
}
void ThreadProcessData::run()
{
while(!stopThread()){
msleep(5);
if(!isEmptyListInFiles()){
QFile *processFile;
p_mutexInFiles->lock();
processFile = p_listInFiles->at(0);
p_mutexInFiles->unlock();
if(processFile->open(QIODevice::ReadWrite))
{
int lAB = 0;
int valB = 0;
int valG = 0;
int valR = 0;
int correction = 100;//koef correction
int L = 256;
int b[L];
QImage image(processFile->fileName());
for(int i = 0; i < image.size().height(); i++){
for(int j = 0; j < image.size().width(); j++){
QRgb rgb = image.pixel(QPoint(j,i));
QColor color(rgb);
valB = color.blue();
valG = color.green();
valR = color.red();
lAB += (int)(valR * 0.299 + valG * 0.587 + valB * 0.114);
}
}
lAB /= image.size().height() * image.size().width();
double k = 1.0 + correction / 100.0;
for (int i = 0; i < L; i++)
{
int delta = (int)i - lAB;
int temp = (int)(lAB + k *delta);
if (temp < 0) temp = 0;
if (temp >= 255) temp = 255;
b[i] = (unsigned char)temp;
}
//
for (int j = 0; j < image.byteCount(); j++)
{
unsigned char value = image.bits()[j];
image.bits()[j] = (unsigned char)b[value];
}
QImageWriter writer(processFile->fileName());
writer.write(image);
}
processFile->close();
p_mutexInFiles->lock();
p_listInFiles->removeAt(0);
p_mutexInFiles->unlock();
p_mutexOutFiles->lock();
p_listOutFiles->append(processFile);
p_mutexOutFiles->unlock();
}
}
}
bool ThreadProcessData::isEmptyListInFiles()
{
bool isEmpty;
p_mutexInFiles->lock();
isEmpty = p_listInFiles->isEmpty();
p_mutexInFiles->unlock();
return isEmpty;
}