Russian Qt Forum

Qt => Работа с сетью => Тема начата: sLiva от Апрель 29, 2009, 00:38



Название: Работа QTcpServer в QThread
Отправлено: sLiva от Апрель 29, 2009, 00:38
Всем здравствуйте!

Qt SDK 2009.02
openSUSE 11.1 / Vista

Помогите с реализацией следующей схемы:
Хотелось бы, чтобы объект QTcpServer существовал и работал в отдельном потоке. Для этого создаю наследника QThread:
Код:
class ServerThread : public QThread
{
    Q_OBJECT
    Q_DISABLE_COPY(ServerThread)

public:
    explicit ServerThread(QObject *parent = 0);
    virtual ~ServerThread();

public slots:
    void startServer();

protected:
    virtual void run();

private:
    QMutex mutex;

    Server *server; // TCP сервер
};

//------------------------------------------------------------------------------
ServerThread::ServerThread(QObject *parent) :
        QThread(parent),
        server(0)
{
}

ServerThread::~ServerThread()
{
    Q_CHECK_PTR(server);

    server->deleteLater();

    quit();
    wait();
}

void ServerThread::run()
{
    qDebug() << "Начало работы серверного потока TID = "
            << QThread::currentThreadId();

    server = new Server();

    exec();

    qDebug() << "Завершение работы серверного потока TID = "
            << QThread::currentThreadId();
}

void ServerThread::startServer()
{
    qDebug() << "Запуск TCP сервера TID = " << QThread::currentThreadId();

    QMutexLocker locker(&mutex);

    if (!server->isListening()) {
        if (!server->listen(QHostAddress::Any, 12345)) {
            qDebug() << "Ошибка: " << server->errorString();
            server->close();
        }
    } else {
        return;
    }

    qDebug() << "TCP сервер запущен";
}

Связываю сигнал с кнопки в GUI со слотом startServer()

Вывод debug консоли:
Код:
Главный поток TID = 0x1058 
Начало работы серверного потока TID = 0x1518
Запуск TCP сервера TID = 0x1058
QObject: Cannot create children for a parent that is in a different thread.
(Parent is Server(0x61db2e0), parent's thread is ServerThread(0x22fdb0), current thread is QThread(0x61d08a0)
TCP сервер запущен

При завершении программы в висте в консоль еще много чего лезет потом крэшется. В линухе нормально завершается.

В принципе проблема то ясна. Слот принадлежит классу  ServerThread, экземпляр класса создается в основном потоке, вызывается тоже из основного потока, кстати, пробовал как прямой вызов слота, таки и через сигналы с двумя типа коннектов, результат тот же. Сообщение “ QObject: Cannot create children for a parent that is in a different thread. ” вываливается из-за того что в методе QTcpServer::listen() создается сокет и ему передается в качестве родителя сервер, сервер создан в потоке ServerThread а сокет получается, создается в основном вот и ошибка.

Как вариант решения можно в слоте ServerThread::startServer() не напрямую обращатся к объекту server, а эмитить сигнал, связанный (Qt::DirectConnection) со слотом server, но такая схема мне не очень нравится, получается много лишнего.

В общем, то и вопрос как лучше организовать такую схему?




Название: Re: Работа QTcpServer в QThread
Отправлено: alex12 от Апрель 29, 2009, 08:23
Сервер Modbus TCP. Я сделал так: Отдельный поток для обработки сетевых соединений. Но один на все соединения, а не собственный на каждое. Очередь получается автоматически при обработке сигналов от сокетов. Для хранения информации о состоянии конкретного соединения делается QMap< QSocket*, ... >.

Оказалось нужно обязательно использовать волшебную функция QObject::moveToThread( QThread* ).

Код привожу весь, поэтому много букв...

Код:
#ifndef __SERVER_H__
#define __SERVER_H__

#include <QObject>
#include <QThread>
#include <QByteArray>

class AbstractSerialPort;
class QTcpServer;
class QTcpSocket;


//=============================================================================
//
//=============================================================================
typedef struct ModbusTcpServerThreadItem
{
  QByteArray req,ans;
};

//=============================================================================
//
//=============================================================================
class ModbusTcpServerThread : public QThread
{
  friend class ModbusTcpServer;
  Q_OBJECT
protected:
  void run();
public slots:
  void newConnection();
  void readyRead();
  void disconnected();
private:
  AbstractSerialPort *sp;
  int mb_tcp_port;
  QTcpServer *server;
  QMap< QTcpSocket*, ModbusTcpServerThreadItem  > map;
};

//=============================================================================
//
//=============================================================================
class ModbusTcpServer : public QObject
{
  Q_OBJECT
public:
  ModbusTcpServer( QObject *parent = 0, AbstractSerialPort *sp = 0, int mb_tcp_port=502 );
  ~ModbusTcpServer();

private:
  ModbusTcpServerThread *sp_thread;

  AbstractSerialPort *sp;
  int mb_tcp_port;
};

#endif

Код:
#include <QtCore>
#include <QtNetwork>

#include "server.h"
#include "abstractserialport.h"
#include "crc.h"

#include "console.h"

//==============================================================================
// массив ==> строка шестнадцатеричных чисел
//==============================================================================
static QString QByteArray2QString( const QByteArray &ba, int mode = 1 )
{
int i, len = ba.size();
const char* p = ba.data();
QString s;
QString str;
if( mode == 0 )
{ str = QString("(length=%1) ").arg( ba.size(),4 );
}

for( i=0; i<len; i++ )
{ s.sprintf(" %2.2X", *(unsigned char*)(p+i) );
str = str + s;
}
return str;
}


//==============================================================================
//
//==============================================================================
void emulator( const QByteArray &req, QByteArray &ans )
{
  static unsigned char base[100000];

  ans.resize( 300 );

  int addr = ( (unsigned char)req[3] << 8 ) | (unsigned char)req[4];
  int len  = (unsigned char)req[5];
  int ans_len = 0;
  int i;

  ans[0] = req[0];
  ans[1] = req[1];
  ans[2] = req[2];
  ans[3] = req[3];
  ans[4] = req[4];

  switch( req[2] & 0x0F )
  {
    case( 0x00 ): // read
      ans[3] = len;
      for(i=0; i<len; i++) ans[4+i] = base[addr+i];
      ans_len = len + 6;
     break;
    case( 0x01 ): // set
      ans[5] = len;
      for(i=0; i<len; i++) base[addr+i] = req[6+i];
      ans_len = len + 8;
      break;
    case( 0x03 ): // and
      ans[5] = len;
      for(i=0; i<len; i++) base[addr+i] &= req[6+i];
      ans_len = len + 8;
      break;
    case( 0x05 ): // or
      ans[5] = len;
      for(i=0; i<len; i++) base[addr+i] |= req[6+i];
      ans_len = len + 8;
      break;
    case( 0x07 ): // xor
      ans[5] = len;
      for(i=0; i<len; i++) base[addr+i] ^= req[6+i];
      ans_len = len + 8;
    break;
  }

  if( ans_len > 2)
  { ans.resize( ans_len - 2 );
    CRC::appendCRC16( ans );
  }

  ans.resize( ans_len );
}

//#############################################################################
//
//#############################################################################
void ModbusTcpServerThread::run()
{
  bool ok;

  server = new QTcpServer(this);
  ok = server->listen( QHostAddress::Any, mb_tcp_port );

  QString message = ok ? QString("Server started on port %1.\n").arg(mb_tcp_port)
                       : QString("Unable to listen port %1.\n").arg(mb_tcp_port);
  Console::Print( Console::Information, message );

  connect( server, SIGNAL( newConnection() ), this, SLOT( newConnection() ) );
  exec();

  delete server;
}

//=============================================================================
//
//=============================================================================
void ModbusTcpServerThread::newConnection()
{
  QTcpSocket *socket = server->nextPendingConnection();
  if( !socket ) return;
  Console::Print( Console::Information, QString("Connected from %1\n")
      .arg( socket->peerAddress().toString() ) );
  map.insert( socket, ModbusTcpServerThreadItem() );
  connect( socket, SIGNAL( readyRead() ), this, SLOT( readyRead() ) );
  connect( socket, SIGNAL( disconnected() ), this, SLOT( disconnected() ) );
}

//=============================================================================
//
//=============================================================================
void ModbusTcpServerThread::readyRead()
{
  Console::setMessageTypes( Console::AllTypes );

  QTcpSocket *socket = (QTcpSocket*) qobject_cast<QTcpSocket*>( sender() );
  if( !socket ) return;

  int j;

  ModbusTcpServerThreadItem &item = map[socket];
  QByteArray &tcp_req = item.req;
  QByteArray &tcp_ans = item.ans;


  tcp_req.append( socket->readAll() );

  //Console::Print( "###:" + QByteArray2QString( req ) + "\n" );

  if( tcp_req.size() < 7 ) return;

  if( tcp_req.size() > 300 || tcp_req[2] || tcp_req[3] || tcp_req[4] )
  { map.remove( socket );
    socket->deleteLater();
    return;
  }

  int len = (unsigned char)tcp_req[5];

  if( tcp_req.size() == len + 6 )
  {
    Console::Print( Console::ModbusPacket, "TCP Req:" + QByteArray2QString( tcp_req ) + "\n" );

    QByteArray mb_ans;
    QByteArray mb_req = tcp_req.mid(6,len);
    CRC::appendCRC16( mb_req );
    Console::Print( Console::ModbusPacket, "MB  Req:" + QByteArray2QString( mb_req ) + "\n" );

    emulator( mb_req, mb_ans );

    Console::Print( Console::ModbusPacket, "MB  Ans:" + QByteArray2QString( mb_ans ) + "\n" );
    mb_ans.chop(2);

    tcp_ans.resize(0);
    tcp_ans.append( (char) 0 );
    tcp_ans.append( (char) 0 );
    tcp_ans.append( (char) 0 );
    tcp_ans.append( (char) 0 );
    tcp_ans.append( (char) 0 );
    tcp_ans.append( (char) mb_ans.size() );
    tcp_ans.append( mb_ans );

    Console::Print( Console::ModbusPacket, "TCP Ans:" + QByteArray2QString( tcp_ans ) + "\n" );

    int j = socket->write( tcp_ans );
    if( j != tcp_ans.size() )
    { Console::Print(  Console::Error, "!:\n" );
    }
    socket->flush();

    tcp_req.resize(0);

  } else
  { Console::Print( Console::ModbusPacket, "################3\n" );
    delete socket;
    return;
  }
}

//=============================================================================
//
//=============================================================================
void ModbusTcpServerThread::disconnected()
{
  QTcpSocket *socket = (QTcpSocket*)qobject_cast<QTcpSocket*>( sender() );
  if( !socket ) return;

  Console::Print(  Console::Information, QString("Disconnected %1\n")
  .arg( socket->peerAddress().toString() ) );

  map.remove( socket );
  socket->deleteLater();
}

//#############################################################################
//
//#############################################################################
ModbusTcpServer::ModbusTcpServer( QObject *parent, AbstractSerialPort *sp,  int mb_tcp_port )
  : QObject( parent ), sp( sp ), mb_tcp_port( mb_tcp_port )
{
  sp_thread = new ModbusTcpServerThread;

  sp_thread->sp = sp;
  sp_thread->mb_tcp_port = mb_tcp_port;
  sp_thread->moveToThread( sp_thread );
  sp_thread->start();
}

//=============================================================================
//
//=============================================================================
ModbusTcpServer::~ModbusTcpServer()
{
  sp_thread->exit();
  bool ok = sp_thread->wait(5000);

  if( !ok )
  { sp_thread->terminate();
    sp_thread->wait(5000);
  }
  Console::Print( Console::Information, QString("Server stoped on port %1.\n").arg(mb_tcp_port) );
  delete sp_thread;
}


Название: Re: Работа QTcpServer в QThread
Отправлено: pastor от Апрель 29, 2009, 10:29
Можно убрать вовсе метод startServer и писать все в run, примерно так:

Код
C++ (Qt)
void ServerThread::run()
{
   qDebug() << "Начало работы серверного потока TID = "
           << QThread::currentThreadId();
 
   Server server;
   //тут сожно создать коннекты сигналов от сервера со слотами-обратотчиками этих сигналов
 
   qDebug() << "Запуск TCP сервера TID = " << QThread::currentThreadId();
 
   if (!server.listen(QHostAddress::Any, 12345)) {
       qDebug() << "Ошибка: " << server.errorString();
       server.close();
       return;
   }
 
   qDebug() << "TCP сервер запущен TID = " << QThread::currentThreadId();
 
   exec();
 
   qDebug() << "Завершение работы серверного потока TID = "
           << QThread::currentThreadId();
}


Название: Re: Работа QTcpServer в QThread
Отправлено: sLiva от Апрель 29, 2009, 17:32
Сервер Modbus TCP. Я сделал так: Отдельный поток для обработки сетевых соединений. Но один на все соединения, а не собственный на каждое. Очередь получается автоматически при обработке сигналов от сокетов. Для хранения информации о состоянии конкретного соединения делается QMap< QSocket*, ... >.

Оказалось нужно обязательно использовать волшебную функция QObject::moveToThread( QThread* ).

Код привожу весь, поэтому много букв...

Спасибо как вариант, но управлять (запускать, останавливать, менять настройки...) сервером тут тоже нельзя, вижу метод listen() находится в run()

Можно убрать вовсе метод startServer и писать все в run, примерно так:


Если сделать так, то управлять сервером извне будет не возможно, помимо метода startServer будут еще методы остановки, перезапуска, смены настроек и т.п. Как раз и хотелось бы чтобы была возможность управления объектом сервера из главного потока (GUI), а возможно и из других



Название: Re: Работа QTcpServer в QThread
Отправлено: Ruzzz от Август 12, 2009, 22:07
alex12, в QTcpServer уже реализована очередь, смотрите и добавление в нее делается в  void QTcpServer::incomingConnection ( int socketDescriptor ) [virtual protected], которую мы можем переопределить и реализовать свою :) или например создавать потоки (это есть в примере Threaded Fortune Server). Чтобы не мудрить со своими очередями, достаточно просто ловить сигнал newConnection() (как в примере Fortune Server) от QTcpServer, который посылается из incomingConnection если мы ее не переопределили. Вроде так, надеюсь чушь не спорол :) Ну самому интересна сейчас эта темы, изучаю.

Тоже есть вопросы:

В хелпе к QTcpServer там где Detailed Description, есть такое «Call nextPendingConnection() to accept the pending connection as a connected QTcpSocket. The function returns a pointer to a QTcpSocket in QAbstractSocket::ConnectedState that you can use for communicating with the client.» Так вот не пойму последнее предложение «… to a QTcpSocket in QAbstractSocket::ConnectedState that …» — Может это ошибка и они хотели написать конкретное состояние? :)

Еще не понимаю вот что, часто используют connect(clientConnection, SIGNAL(disconnected()), clientConnection, SLOT(deleteLater())); так может просто давать возможность указывать при создании, или после set'ом каким-нибудь? ну это к разработчикам =)

Вообщем как я понял используя tcpServer->listen() мы запускаем цикл и это операция не блокирующая, то есть используются скорее всего потоки, покрайней мере один на все соединения. Далее вызывается incomingConnection, и если мы ее не переопределили, то в ней идет работа с очередью и посылается сигнал newConnection — вообщем все это не блокирующие :)

Единственная блокирующая функция это waitForNewConnection, для создания своей очереди. А так за нас многое уже сделано. Остается только решать задачи с присоединившимися сокетами. Вот еще что хотелось бы для себя определить. Что если не создавать потоки для каждого входящего, то как это отразится? Интересно как реализована очередь, с помощью потоков? Понимаю что можна в исходниках это найти, но я там пока путаюсь :)


Название: Re: Работа QTcpServer в QThread
Отправлено: Ruzzz от Август 12, 2009, 22:51
Еще вот это интересно void QTcpServerPrivate::readNotification()