Добрый день, форумчане!
У меня стоит задача написать небольшой многопоточный сервер, обслуживающий 50-100 одновременных соединений.
На той стороне специальные устройства с симками, которые периодически подключаются к серверу и присылают данные по определенному протоколу. Сервер получает данные и пишет их в базу в определенном формате.
Пока решил писать 1 клиент - 1 поток через QTcpServer и QThread.
Уже есть написанное и более/менее работающее решение. Столкнулся с проблемой что много соединений виснет в состоянии close_wait, а поток при этом виснет на функции waitForReadyRead.
Вот некоторые участки кода:
class threadedTcpServer : public QTcpServer
{
Q_OBJECT
public:
threadedTcpServer(QObject *parent = 0);
public slots:
void startThreadedTcpServer(const QHostAddress & address = QHostAddress::Any, quint16 port = 0);
void errorFromThread(QString);
void infoFromThread(QString);
signals:
void error(QString);
void info(QString);
protected:
int threadCount;
void incomingConnection(int socketDescriptor);
QSqlDatabase pgconn;
};
void threadedTcpServer::incomingConnection(int socketDescriptor) {
emit(info("device open socket "+QString::number(socketDescriptor)));
this->threadCount++;
threadedtcpserverthread *thread = new threadedtcpserverthread(this->pgconn,socketDescriptor,this->threadCount,this);
connect(thread,SIGNAL(finished()), thread, SLOT(deleteLater()));
connect(thread,SIGNAL(info(QString)),this,SLOT(infoFromThread(QString)));
connect(thread,SIGNAL(error(QString)),this,SLOT(errorFromThread(QString)));
thread->start();
}
class threadedtcpserverthread : public QThread
{
Q_OBJECT
public:
threadedtcpserverthread(const QSqlDatabase &dbconn,const int &socketDescriptor,const int &threadID, QObject *parent);
void run();
public slots:
void socketStateChanged(QAbstractSocket::SocketState state);
signals:
void error(QString);
void info(QString);
protected:
int socketDescriptor;
int threadID;
QSqlDatabase dataBase;
void closeSocket(QTcpSocket *tcpSocket);
QTcpSocket *tcpSocket;
};
void threadedtcpserverthread::run() {
emit(info("started thread "+QString::number(this->threadID)));
// подключаемся к сокету
this->tcpSocket = new QTcpSocket(this);
connect(this->tcpSocket,SIGNAL(stateChanged(QAbstractSocket::SocketState)),this,SLOT(socketStateChanged(QAbstractSocket::SocketState)));
if (!this->tcpSocket->setSocketDescriptor(this->socketDescriptor)) {
emit(error("failed to open socket "+QString::number(this->threadID)));
return;
}
// читаем первый пакет
packet *pkg = new packet(this);
if (!pkg->readFromSocket(this->tcpSocket)) {
this->closeSocket(this->tcpSocket);
emit(error("failed read first package "+QString::number(this->threadID)));
return;
}
// еще проверки
...
// отправляем контрольную сумму
pkg->sendCRC(this->tcpSocket);
emit(info("fpread "+QString::number(pkg->countOfLines())+" lines from "+QString::number(deviceID)));
QMap<unsigned int,unsigned short> validCodes;
while (query.next()) validCodes[query.value(0).toInt()] = query.value(1).toInt();
bool breakFlag=false;
while(true){
packet *_pkg = new packet(this);
if (!_pkg->readFromSocket(this->tcpSocket) || !_pkg->countOfLines()) break;
this->dataBase.transaction();
// записываем данные в базу, выставляем breakFlag при ошибке
...
if (breakFlag) {
this->dataBase.rollback();
break;
}
this->dataBase.commit();
_pkg->sendCRC(this->tcpSocket);
emit(info("readed "+QString::number(_pkg->countOfLines())+" lines from "+QString::number(deviceID)));
}
emit(info("close thread "+QString::number(this->threadID)));
this->dataBase.close();
this->closeSocket(this->tcpSocket);
}
void threadedtcpserverthread::closeSocket(QTcpSocket *tcpSocket) {
tcpSocket->disconnectFromHost();
tcpSocket->waitForDisconnected();
}
class packet : public QObject
{
Q_OBJECT
public:
packet(QObject *parent);
bool readFromSocket(QTcpSocket *tcpSocket);
void sendCRC(QTcpSocket *tcpSocket);
unsigned short countOfLines();
void clean();
packetLine* getLine(const unsigned short &index);
QByteArray toHex() const;
protected:
char header;
unsigned short length;
bool isNoSendedInArh;
short checksumm;
void initLines(const QByteArray &taggedData);
QList<packetLine*> arLines;
QByteArray msg;
static const QByteArray codes8bit; //[22];
static const QByteArray codes16bit; //[27];
static const QByteArray codes32bit; //[13];
//static const char codes32bit[13];
private:
};
bool packet::readFromSocket(QTcpSocket *tcpSocket) {
this->clean();
if (!tcpSocket->isValid() || !tcpSocket->waitForReadyRead()) return false;
QByteArray buf;
buf = tcpSocket->read(1);
this->header = buf[0];
this->msg+=buf;
buf = tcpSocket->read(2);
this->length = ((buf[1] & 0x7F) << 8) + buf[0];
this->isNoSendedInArh = (buf[1] & 0x80) >> 7;
this->msg+=buf;
if (this->length) {
buf = tcpSocket->read(this->length);
this->initLines(buf);
this->msg+=buf;
}
buf = tcpSocket->read(2);
this->msg+=buf;
this->checksumm=buf[1]<<8 + buf[0];
return true;
}
Таким образом
void threadedTcpServer::incomingConnection(int socketDescriptor) создает поток, передает ему дескриптор на сокет и запускает поток.
threadedtcpserverthread::run() создает packet и читает данные из сокета.
bool packet::readFromSocket(QTcpSocket *tcpSocket) собственно и есть проблемная фунция.
Она на проверке if (!tcpSocket->isValid() || !tcpSocket->waitForReadyRead()) return false; частенько "подвешивает" ветку.
Tcpview показывает что данный сокет в состоянии close_wait. Хотел узнать: как с этим бороться?