Добрый день, форумчане!
У меня стоит задача написать небольшой многопоточный сервер, обслуживающий 50-100 одновременных соединений.
На той стороне специальные устройства с симками, которые периодически подключаются к серверу и присылают данные по определенному протоколу. Сервер получает данные и пишет их в базу в определенном формате.
Пока решил писать 1 клиент - 1 поток через QTcpServer и QThread.
Уже есть написанное и более/менее работающее решение. Столкнулся с проблемой что много соединений виснет в состоянии close_wait, а поток при этом виснет на функции waitForReadyRead.
Вот некоторые участки кода:
class threadedTcpServer : public QTcpServer
{
Q_OBJECT
public:
threadedTcpServer(QObject *parent = 0);
public slots:
void startThreadedTcpServer(const QHostAddress & address = QHostAddress::Any, quint16 port = 0);
void errorFromThread(QString);
void infoFromThread(QString);
signals:
void error(QString);
void info(QString);
protected:
int threadCount;
void incomingConnection(int socketDescriptor);
QSqlDatabase pgconn;
};
void threadedTcpServer::incomingConnection(int socketDescriptor) {
emit(info("device open socket "+QString::number(socketDescriptor)));
this->threadCount++;
threadedtcpserverthread *thread = new threadedtcpserverthread(this->pgconn,socketDescriptor,this->threadCount,this);
connect(thread,SIGNAL(finished()), thread, SLOT(deleteLater()));
connect(thread,SIGNAL(info(QString)),this,SLOT(infoFromThread(QString)));
connect(thread,SIGNAL(error(QString)),this,SLOT(errorFromThread(QString)));
thread->start();
}
class threadedtcpserverthread : public QThread
{
Q_OBJECT
public:
threadedtcpserverthread(const QSqlDatabase &dbconn,const int &socketDescriptor,const int &threadID, QObject *parent);
void run();
public slots:
void socketStateChanged(QAbstractSocket::SocketState state);
signals:
void error(QString);
void info(QString);
protected:
int socketDescriptor;
int threadID;
QSqlDatabase dataBase;
void closeSocket(QTcpSocket *tcpSocket);
QTcpSocket *tcpSocket;
};
void threadedtcpserverthread::run() {
emit(info("started thread "+QString::number(this->threadID)));
// подключаемся к сокету
this->tcpSocket = new QTcpSocket(this);
connect(this->tcpSocket,SIGNAL(stateChanged(QAbstractSocket::SocketState)),this,SLOT(socketStateChanged(QAbstractSocket::SocketState)));
if (!this->tcpSocket->setSocketDescriptor(this->socketDescriptor)) {
emit(error("failed to open socket "+QString::number(this->threadID)));
return;
}
// читаем первый пакет
packet *pkg = new packet(this);
if (!pkg->readFromSocket(this->tcpSocket)) {
this->closeSocket(this->tcpSocket);
emit(error("failed read first package "+QString::number(this->threadID)));
return;
}
// еще проверки
...
// отправляем контрольную сумму
pkg->sendCRC(this->tcpSocket);
emit(info("fpread "+QString::number(pkg->countOfLines())+" lines from "+QString::number(deviceID)));
QMap<unsigned int,unsigned short> validCodes;
while (query.next()) validCodes[query.value(0).toInt()] = query.value(1).toInt();
bool breakFlag=false;
while(true){
packet *_pkg = new packet(this);
if (!_pkg->readFromSocket(this->tcpSocket) || !_pkg->countOfLines()) break;
this->dataBase.transaction();
// записываем данные в базу, выставляем breakFlag при ошибке
...
if (breakFlag) {
this->dataBase.rollback();
break;
}
this->dataBase.commit();
_pkg->sendCRC(this->tcpSocket);
emit(info("readed "+QString::number(_pkg->countOfLines())+" lines from "+QString::number(deviceID)));
}
emit(info("close thread "+QString::number(this->threadID)));
this->dataBase.close();
this->closeSocket(this->tcpSocket);
}
void threadedtcpserverthread::closeSocket(QTcpSocket *tcpSocket) {
tcpSocket->disconnectFromHost();
tcpSocket->waitForDisconnected();
}
class packet : public QObject
{
Q_OBJECT
public:
packet(QObject *parent);
bool readFromSocket(QTcpSocket *tcpSocket);
void sendCRC(QTcpSocket *tcpSocket);
unsigned short countOfLines();
void clean();
packetLine* getLine(const unsigned short &index);
QByteArray toHex() const;
protected:
char header;
unsigned short length;
bool isNoSendedInArh;
short checksumm;
void initLines(const QByteArray &taggedData);
QList<packetLine*> arLines;
QByteArray msg;
static const QByteArray codes8bit; //[22];
static const QByteArray codes16bit; //[27];
static const QByteArray codes32bit; //[13];
//static const char codes32bit[13];
private:
};
bool packet::readFromSocket(QTcpSocket *tcpSocket) {
this->clean();
if (!tcpSocket->isValid() || !tcpSocket->waitForReadyRead()) return false;
QByteArray buf;
buf = tcpSocket->read(1);
this->header = buf[0];
this->msg+=buf;
buf = tcpSocket->read(2);
this->length = ((buf[1] & 0x7F) << 8) + buf[0];
this->isNoSendedInArh = (buf[1] & 0x80) >> 7;
this->msg+=buf;
if (this->length) {
buf = tcpSocket->read(this->length);
this->initLines(buf);
this->msg+=buf;
}
buf = tcpSocket->read(2);
this->msg+=buf;
this->checksumm=buf[1]<<8 + buf[0];
return true;
}
Таким образом
void threadedTcpServer::incomingConnection(int socketDescriptor) создает поток, передает ему дескриптор на сокет и запускает поток.
threadedtcpserverthread::run() создает packet и читает данные из сокета.
bool packet::readFromSocket(QTcpSocket *tcpSocket) собственно и есть проблемная фунция.
Она на проверке if (!tcpSocket->isValid() || !tcpSocket->waitForReadyRead()) return false; частенько "подвешивает" ветку.
Tcpview показывает что данный сокет в состоянии close_wait. Хотел узнать: как с этим бороться?
доброго времени суток.
неужели действительно никто не сталкивался?
я уже несколько дней нахожусь в тщетных поисках: перечитал асистант, гугл, форум и ничего не понимаю.
мне хотябы понять в чем проблема: я непонятно сформулировал тему/мой код с ошибкой/мой код изначально неверный/проблемы в реализации на стороне клиентов.
совершенно не понимаю куда двигаться дальше.
буду очень благодарен за любую информацию.
Решено.
Проблема оказалась в строке:
this->length = ((buf[1] & 0x7F) << 8) + buf[0];
которая должна выглядеть так:
this->length = ((buf[1] & 0x7F) << 8) + (buf[0]&0xFF);
ну а суть проблемы примерно такая:
1. неверно считывалась длина пакета и как следствие пакет читался не полностью
2. начинался новый цикл ожидания данных от клиента
3. данные никогда не приходили, по причине "непустого" сокета
4. через некоторое время клиент самостоятельно разрывал "подвисшее" соединение, устанавливая close_wait