diff --git a/reporting/CMakeLists.txt b/reporting/CMakeLists.txt index 46b9d93b..8ce1de54 100644 --- a/reporting/CMakeLists.txt +++ b/reporting/CMakeLists.txt @@ -10,8 +10,8 @@ IF ( BUILD_REPORTING ) SET( HPPS ConsoleReporting.hpp FileReporting.hpp NiceHeaderMarshaller.hpp ReportingComponent.hpp TableMarshaller.hpp) # Reporting to a socket - SET( SOCKET_SRCS command.cpp datasender.cpp socket.cpp socketmarshaller.cpp TcpReporting.cpp) - SET( SOCKET_HPPS command.hpp datasender.hpp socket.hpp socketmarshaller.hpp TcpReporting.hpp) + SET( SOCKET_SRCS tcpreportingsession.cpp command.cpp TcpReporting.cpp) + SET( SOCKET_HPPS tcpreportingsession.hpp command.hpp TcpReporting.hpp) if(NOT OROCOS_TARGET STREQUAL "win32") set(SRCS ${SRCS} ${SOCKET_SRCS}) diff --git a/reporting/ReportingComponent.cpp b/reporting/ReportingComponent.cpp index 83fefec2..fddda0b6 100644 --- a/reporting/ReportingComponent.cpp +++ b/reporting/ReportingComponent.cpp @@ -215,6 +215,10 @@ namespace OCL bool ReportingComponent::addMarshaller( marsh::MarshallInterface* headerM, marsh::MarshallInterface* bodyM) { + // If we were to allow this, we would provoce double deallocation upon destruction of the pair + if(headerM == bodyM) + return false; + boost::shared_ptr header(headerM); boost::shared_ptr body(bodyM); if ( !header && !body) diff --git a/reporting/TcpReporting.cpp b/reporting/TcpReporting.cpp index 26e9f230..ba1533f8 100644 --- a/reporting/TcpReporting.cpp +++ b/reporting/TcpReporting.cpp @@ -25,230 +25,102 @@ * * ***************************************************************************/ -#include -#include -#include -#include - #include "TcpReporting.hpp" #include #include -#include -#include "socket.hpp" -#include "socketmarshaller.hpp" - -using RTT::Logger; -using RTT::os::Mutex; +#include +#include +#include +#include "EmptyMarshaller.hpp" -#include "ocl/Component.hpp" ORO_LIST_COMPONENT_TYPE(OCL::TcpReporting); -namespace OCL -{ - /** - * ListenThread is a thread which waits for new incoming connections - * from clients. - */ - class ListenThread - : public RTT::Activity - { - private: - bool inBreak; - static ListenThread* _instance; - RTT::SocketMarshaller* _marshaller; - unsigned short _port; - bool _accepting; - int _sock; - - bool listen() - { - _sock = ::socket(PF_INET, SOCK_STREAM, 0); - if( _sock < 0 ) - { - Logger::log() << Logger::Error << "Socket creation failed." << Logger::endl; - return false; - } - - struct sockaddr_in localsocket; - struct sockaddr remote; - int adrlen = sizeof(remote); - - localsocket.sin_family = AF_INET; - localsocket.sin_port = htons(_port); - localsocket.sin_addr.s_addr = INADDR_ANY; - if( ::bind(_sock, (struct sockaddr*)&localsocket, sizeof(localsocket) ) < 0 ) - { - /* bind can fail when there is a legitimate server when a - previous run of orocos has crashed and the kernel does - not have freed the port yet. TRY_OTHER_PORTS can - select another port if the bind fails. */ - #define TRY_OTHER_PORTS - // TODO: remove #define - #ifdef TRY_OTHER_PORTS - int i = 1; - int r = -1; - while( errno == EADDRINUSE && i < 5 && r < 0 ) - { - localsocket.sin_port = htons(_port + i); - r = ::bind(_sock, (struct sockaddr*)&localsocket, sizeof(localsocket) ); - i++; - } - if( r >= 0 ) - { - Logger::log() << Logger::Info << "Port occupied, use port " << (_port+i-1) << " instead." << Logger::endl; - } else { - #endif - if( errno == EADDRINUSE ) - { - Logger::log() << Logger::Error << "Binding of port failed: address already in use." << Logger::endl; - } else { - Logger::log() << Logger::Error << "Binding of port failed with errno " << errno << Logger::endl; - } - ::close(_sock); - return false; - #ifdef TRY_OTHER_PORTS - } - #endif - } - - if( ::listen(_sock, 2) < 0 ) - { - Logger::log() << Logger::Info << "Cannot listen on socket" << Logger::endl; - ::close(_sock); - return true; - } - while(_accepting) - { - int socket = ::accept( _sock, &remote, - reinterpret_cast(&adrlen) ); - if( socket == -1 ) - { - return false; - } - if( _accepting ) - { - Logger::log() << Logger::Info << "Incoming connection" << Logger::endl; - _marshaller->addConnection( new Orocos::TCP::Socket(socket) ); - } - } - return true; - } - - ListenThread( RTT::SocketMarshaller* marshaller, unsigned short port ) - : Activity(10), _marshaller(marshaller) - { - inBreak = false; - removeInstance(); - _accepting = true; - _port = port; - Logger::log() << Logger::Info << "Starting server on port " << port << Logger::endl; - this->Activity::start(); - } - - // This method should only be called when theadCreationLock is locked. - void removeInstance() - { - if( _instance ) - { - delete _instance; - } - } - - public: - ~ListenThread() - { - _accepting = false; - } - - virtual void loop() - { - if( !inBreak ) - { - if( !listen() ) - { - Logger::log() << Logger::Error << "Could not listen on port " << _port << Logger::endl; - } else { - Logger::log() << Logger::Info << "Shutting down server" << Logger::endl; - } - } - } - - virtual bool breakLoop() - { - inBreak = true; - _accepting = false; - ::close( _sock ); - // accept still hangs until a new connection has been established - int sock = ::socket(PF_INET, SOCK_STREAM, 0); - if( sock > 0 ) - { - struct sockaddr_in socket; - socket.sin_family = AF_INET; - socket.sin_port = htons(_port); - socket.sin_addr.s_addr = htonl(INADDR_LOOPBACK); - ::connect( sock, (struct sockaddr*)&socket, sizeof(socket) ); - ::close( sock ); - } - return true; - } - - static void createInstance( RTT::SocketMarshaller* marshaller, unsigned short port = 3142 ) - { - // The lock is needed to avoid problems when createInstance is called by two - // different threads (which in reality should not occur very often). - //ListenThread* _oinst = ListenThread::_instance; - ListenThread::_instance = new ListenThread( marshaller, port ); - //delete _oinst; - } - - static void destroyInstance() - { - ListenThread::_instance->breakLoop(); - } - }; - ListenThread* ListenThread::_instance = 0; +namespace OCL { + +void TcpReporting::registerAccept() { + pending_socket = new tcp::socket(io_service); + acceptor.async_accept(*pending_socket, + boost::bind(&TcpReporting::handleAccept, this, + asio::placeholders::error)); +} + +void TcpReporting::handleAccept(const asio::error_code& error) { + if (error) { + delete pending_socket; + stop(); + } else { + TcpReportingSession* session = new TcpReportingSession(pending_socket); + pending_socket = NULL; + addMarshaller(0, session); + sessions.push_back(session); + registerAccept(); + } +} + +TcpReporting::TcpReporting(std::string fr_name /*= "Reporting"*/) + : ReportingComponent(fr_name), acceptor(io_service), port(3142) { + addProperty("port", port); } -namespace OCL -{ - TcpReporting::TcpReporting(std::string fr_name /*= "Reporting"*/) - : ReportingComponent( fr_name ), - port_prop("port","port to listen/send to",3142) - { - _finishing = false; - this->properties()->addProperty( port_prop); - } - - TcpReporting::~TcpReporting() - { - } - - const RTT::PropertyBag* TcpReporting::getReport() - { - makeReport2(); - return &report; - } - - bool TcpReporting::configureHook(){ - port=port_prop.value(); - return true; - } - - bool TcpReporting::startHook() - { - RTT::Logger::In in("TcpReporting::startup"); - fbody = new RTT::SocketMarshaller(this); - this->addMarshaller( 0, fbody ); - ListenThread::createInstance( fbody, port ); - return ReportingComponent::startHook(); - } - - void TcpReporting::stopHook() - { - _finishing = true; - ListenThread::destroyInstance(); - fbody->shutdown(); - ReportingComponent::stopHook(); - this->removeMarshallers(); - } +TcpReporting::~TcpReporting() {} + +bool TcpReporting::startHook() { + addMarshaller(0, new RTT::EmptyMarshaller()); + tcp::endpoint endpoint(tcp::v4(), port); + asio::error_code error; + + acceptor.open(endpoint.protocol(), error); + if (error) { + RTT::log(RTT::Error) << "failed to open acceptor: " << error.message() + << RTT::endlog(); + return false; + } + + acceptor.set_option(tcp::acceptor::reuse_address(true)); + + acceptor.bind(endpoint, error); + if (error) { + RTT::log(RTT::Error) << "failed to bind acceptor: " << error.message() + << RTT::endlog(); + return false; + } + + acceptor.listen(asio::socket_base::max_connections, error); + if (error) { + RTT::log(RTT::Error) << "failed to listen acceptor: " << error.message() + << RTT::endlog(); + return false; + } + registerAccept(); + + return ReportingComponent::startHook(); +} + +void TcpReporting::stopHook() { + ReportingComponent::stopHook(); // This flushes all connections + io_service.poll(); + + // Properly terminate the sessions + for (size_t i = 0; i < sessions.size(); i++) { + sessions[i]->terminate(); + } + io_service.poll(); // we need this to invoke the session terminations + + removeMarshallers(); + + acceptor.close(); + io_service.poll(); + io_service.reset(); +} + +void TcpReporting::updateHook() { + asio::error_code error; + io_service.poll(error); + if (error) { + RTT::log(RTT::Error) << "error in update hook: " << error.message() + << RTT::endlog(); + stop(); + } + ReportingComponent::updateHook(); +} } diff --git a/reporting/TcpReporting.hpp b/reporting/TcpReporting.hpp index 1532ce1b..04d32d49 100644 --- a/reporting/TcpReporting.hpp +++ b/reporting/TcpReporting.hpp @@ -27,185 +27,168 @@ #ifndef ORO_COMP_TCP_REPORTING_HPP #define ORO_COMP_TCP_REPORTING_HPP + #include "ReportingComponent.hpp" +#include "tcpreportingsession.hpp" #include - -namespace RTT -{ - class SocketMarshaller; -} - -namespace OCL -{ - namespace TCP - { - class TcpReportingInterpreter; - class Socket; - } - - - /** - \brief A component which writes data reports to a tcp/ip - socket. It can serve different clients. It uses a ASCI-based - protocol. - - \section usage Usage - \subsection authent Authentication of the client: - The server accepts different kinds of commands. Before these - commands are available for the client, the client has to - authenticate itself. - - \b Send: - \verbatim "VERSION 1.0\n" \endverbatim - - \b Receive: - - if succesfull: \verbatim "101 OK\n" \endverbatim - - if failed: \verbatim "106 not supported\n" \endverbatim - - \subsection help Getting Help: - The client can get the available commands - - \b Send: - \verbatim "HELP\n" \endverbatim - - \b Receive: - \verbatim - "Use HELP \n - CommandName1\n - CommandName2\n - ... - CommandNameN\n." - \endverbatim - . - and the syntax for each command. - - \b Send: - \verbatim"HELP CommandName\n" \endverbatim - - \b Receive: - \verbatim - "Name: CommandName\n - Usage: CommandName CommandSyntax\n" - \endverbatim - - \subsection headers Getting a list of available data: - The client can get the names of all the available data. - - \b Send: - \verbatim "HEADERS\n" \endverbatim - - \b Receive: - \verbatim - "305 DataName1\n - 305 DataName2\n - ... - 305 DataNameN\n - 306 End of list\n" - \endverbatim - - \subsection subscribe Subscribe to Data: - The client has to send the server the names of the available - data he wants to get. Only the subscribed data will be send to - the client. - - \b Send: - \verbatim "SUBSCRIBE DataNameX\n" \endverbatim - - \b Receive: - - if succesfull: \verbatim "302 DataNameX\n" \endverbatim - - if failed: \verbatim "301 DataNameX\n" \endverbatim - - \subsection unsubscribe Unsubscribe to Data: - The client can cancel a subscription. - - \b Send: - \verbatim "UNSUBSCRIBE DataNameX\n" \endverbatim - - \b Receive: - - if succesfull: \verbatim "303 DataNameX\n" \endverbatim - - if failed: \verbatim "304 DataNameX\n" \endverbatim - - \subsection subs Getting a list of the subscriptions: - The client can ask for the subscriptions he has made. - - \b Send: - \verbatim "SUBS\n" \endverbatim - - \b Receive: - \verbatim - "305 DataNameX1\n - 305 DataNameX2\n - ... - 305 DataNameXN\n - 306 End of list\n" - \endverbatim - - \subsection silence Start/stop streaming the data: - The client can start and stop the streaming of the subscribed - data. - - \b Send: - -To start: \verbatim "SILENCE OFF\n" \endverbatim - -To stop: \verbatim "SILENCE ON\n" \endverbatim - - \b Receive: - \verbatim "107 SILENCE ON/OFF\n" \endverbatim - - \subsection quit Close the connection with the server: - The client can close the connection with the server. - - \b Send: - \verbatim "QUIT\n" \endverbatim - or - \verbatim "EXIT\n" \endverbatim - - \b Receive: - \verbatim "104 Bye Bye\n" \endverbatim - - \subsection error When an error occures: - When an error occurs because of wrong syntax the server will - answer with an error message. - - \b Send: wrong command syntax - - \b Receive: - \verbatim "102 Syntax: CommandNameX CommandSyntaxX\n" \endverbatim - - \subsection streaming The streaming data: - When the streaming is started the server will send the - following message at each timeframe. - - \b Receive: - \verbatim - "201 framenr --- begin of frame\n - 202 DataNameX1\n - 205 DataValueX1\n - 202 DataNameX2\n - 205 DataValueX2\n - ... - 202 DataNameXN\n - 205 DataValueXN\n - 203 framenr --- end of frame\n" - \endverbatim - */ - class TcpReporting - : public ReportingComponent - { - private: - /** - * Flag to indicate that shutdown() has been called and - * changes in the list of reported data should not be - * reported to the client anymore. - */ - bool _finishing; - unsigned int port; - RTT::Property port_prop; - protected: - /** - * marsh::MarshallInterface - */ - RTT::SocketMarshaller* fbody; - - public: - /** - * Create a reporting component which starts up a server - * - * @param fr_name Name of the TCP reporting component. - * @param port Port to listen on. - */ - TcpReporting(std::string fr_name = "ReportingComponent"); - ~TcpReporting(); - - bool configureHook(); - bool startHook(); - - void stopHook(); - - /** - * Return a property bag. - */ - const RTT::PropertyBag* getReport(); - }; - +#include +#include +#include + +/** + \brief A component which writes data reports to a tcp/ip + socket. It can serve different clients. It uses a ASCI-based + protocol. + + \section usage Usage + \subsection authent Authentication of the client: + The server accepts different kinds of commands. Before these + commands are available for the client, the client has to + authenticate itself. + - \b Send: + \verbatim "VERSION 1.0\n" \endverbatim + - \b Receive: + - if succesfull: \verbatim "101 OK\n" \endverbatim + - if failed: \verbatim "106 not supported\n" \endverbatim + + \subsection help Getting Help: + The client can get the available commands + - \b Send: + \verbatim "HELP\n" \endverbatim + - \b Receive: + \verbatim + "Use HELP \n + CommandName1\n + CommandName2\n + ... + CommandNameN\n." + \endverbatim + . + and the syntax for each command. + - \b Send: + \verbatim"HELP CommandName\n" \endverbatim + - \b Receive: + \verbatim + "Name: CommandName\n + Usage: CommandName CommandSyntax\n" + \endverbatim + + \subsection headers Getting a list of available data: + The client can get the names of all the available data. + - \b Send: + \verbatim "HEADERS\n" \endverbatim + - \b Receive: + \verbatim + "305 DataName1\n + 305 DataName2\n + ... + 305 DataNameN\n + 306 End of list\n" + \endverbatim + + \subsection subscribe Subscribe to Data: + The client has to send the server the names of the available + data he wants to get. Only the subscribed data will be send to + the client. + - \b Send: + \verbatim "SUBSCRIBE DataNameX\n" \endverbatim + - \b Receive: + - if succesfull: \verbatim "302 DataNameX\n" \endverbatim + - if failed: \verbatim "301 DataNameX\n" \endverbatim + + \subsection unsubscribe Unsubscribe to Data: + The client can cancel a subscription. + - \b Send: + \verbatim "UNSUBSCRIBE DataNameX\n" \endverbatim + - \b Receive: + - if succesfull: \verbatim "303 DataNameX\n" \endverbatim + - if failed: \verbatim "304 DataNameX\n" \endverbatim + + \subsection subs Getting a list of the subscriptions: + The client can ask for the subscriptions he has made. + - \b Send: + \verbatim "SUBS\n" \endverbatim + - \b Receive: + \verbatim + "305 DataNameX1\n + 305 DataNameX2\n + ... + 305 DataNameXN\n + 306 End of list\n" + \endverbatim + + \subsection silence Start/stop streaming the data: + The client can start and stop the streaming of the subscribed + data. + - \b Send: + -To start: \verbatim "SILENCE OFF\n" \endverbatim + -To stop: \verbatim "SILENCE ON\n" \endverbatim + - \b Receive: + \verbatim "107 SILENCE ON/OFF\n" \endverbatim + + \subsection quit Close the connection with the server: + The client can close the connection with the server. + - \b Send: + \verbatim "QUIT\n" \endverbatim + or + \verbatim "EXIT\n" \endverbatim + - \b Receive: + \verbatim "104 Bye Bye\n" \endverbatim + + \subsection error When an error occures: + When an error occurs because of wrong syntax the server will + answer with an error message. + - \b Send: wrong command syntax + - \b Receive: + \verbatim "102 Syntax: CommandNameX CommandSyntaxX\n" \endverbatim + + \subsection streaming The streaming data: + When the streaming is started the server will send the + following message at each timeframe. + - \b Receive: + \verbatim + "201 framenr --- begin of frame\n + 202 DataNameX1\n + 205 DataValueX1\n + 202 DataNameX2\n + 205 DataValueX2\n + ... + 202 DataNameXN\n + 205 DataValueXN\n + 203 framenr --- end of frame\n" + \endverbatim + */ +namespace OCL { + +using asio::ip::tcp; + +class TcpReporting : public ReportingComponent { + private: + unsigned int port; + asio::io_service io_service; + tcp::acceptor acceptor; + std::vector sessions; + tcp::socket* pending_socket; + + void registerAccept(); + void handleAccept(const asio::error_code& error); + + public: + /** + * Create a reporting component which starts up a server + * + * @param fr_name Name of the TCP reporting component. + * @param port Port to listen on. + */ + TcpReporting(std::string fr_name = "ReportingComponent"); + ~TcpReporting(); + + bool startHook(); + void stopHook(); + void updateHook(); + + +}; } #endif diff --git a/reporting/command.cpp b/reporting/command.cpp index b6b909e0..76bf3263 100644 --- a/reporting/command.cpp +++ b/reporting/command.cpp @@ -24,655 +24,391 @@ * * ***************************************************************************/ -#include /* std::transform is needed for upper-case conversion */ #include #include /* strtoull */ #include #include #include +#include #include "NiceHeaderMarshaller.hpp" #include "TcpReporting.hpp" #include "command.hpp" -#include "socket.hpp" -#include "datasender.hpp" -#include "socketmarshaller.hpp" - -using OCL::TCP::RealCommand; -using OCL::TCP::Socket; -using OCL::TCP::TcpReportingInterpreter; - -namespace -{ - /** - * base::Buffer to prefix each line with '300 ' - */ - class prefixbuf : public std::streambuf - { - private: - std::streambuf* _opt; - bool newline; - - public: - prefixbuf( std::streambuf* opt ) : _opt(opt) - { - setp(0, 0); - setg(0, 0, 0); - newline = true; - } - - ~prefixbuf() - { - sync(); - } - - protected: - int overflow(int c) - { - if( c != EOF ) - { - if( newline ) - { - if( _opt->sputn("300 ", 4) != 4 ) - { - return EOF; - } - newline = false; - } - int ret = _opt->sputc(c); - newline = ( c == '\n' ); - return ret; - } - return 0; - } - - int sync() - { - return _opt->pubsync(); - } - }; - - /** - * Prefix output stream. - */ - class prefixstr : public std::ostream - { - public: - prefixstr(std::ostream& opt) - : std::ostream( new prefixbuf( opt.rdbuf() ) ) - { - } - - ~prefixstr() - { - delete rdbuf(); - } - }; - - /* std::toupper is implemented as a macro and cannot be used by - std::transform without a wrapper function. */ - char to_upper (const char c) - { - return toupper(c); + +namespace OCL { + +/** + * Output a XML file (prefixed with '300 ') containing the header to the socket. + */ +class HeaderCommand : public Command { + protected: + void maincode(int, std::string*) { + if (_parent->getSession()->lastSerializedPropertyBag) { + std::vector available_items = + _parent->getSession()->lastSerializedPropertyBag->list(); + for (unsigned int i = 0; i < available_items.size(); i++) { + ostream() << "305 " << available_items[i] << std::endl; + } + ostream() << "306 End of list" << std::endl; } + } - /** - * Output a XML file (prefixed with '300 ') containing the header to the socket. - */ - class HeaderCommand : public RealCommand - { - protected: - void maincode( int, std::string* ) - { - std::vector list = _parent->getConnection()->getMarshaller()->getReporter()->getReport()->list(); - for(unsigned int i=0;i &cmds = _parent->giveCommands(); - socket() << "Use HELP " << std::endl; - for( unsigned int i = 0; i < cmds.size(); i++ ) - { - if( cmds[i] == cmds[i]->getRealCommand( cmds ) ) - { - socket() << cmds[i]->getName() << '\n'; - } - } - socket() << '.' << std::endl; - } - - /** - * Print help for with to the socket. - */ - void printHelpFor( const std::string& name, const RealCommand* command ) - { - socket() << "Name: " << name << std::endl; - socket() << "Usage: " << name; - if( command->getSyntax() ) - { - socket() << " " << command->getSyntax(); - } - socket() << std::endl; - } - - /** - * Print help for the given command to the socket. - */ - void printHelpFor( const std::string& cmd ) - { - const std::vector &cmds = _parent->giveCommands(); - for( unsigned int i = 0; i < cmds.size(); i++ ) - { - if( cmds[i]->getName() == cmd ) - { - printHelpFor( cmd, cmds[i]->getRealCommand( cmds ) ); - return; - } - } - printCommands(); - } - - void maincode( int argc, std::string* params ) - { - if( argc == 0 ) - { - printCommands(); - } else { - printHelpFor(params[0]); - } - } - public: - HelpCommand(TcpReportingInterpreter* parent) - : RealCommand( "HELP", parent, 0, 1, "[nothing | ]" ) - { - } - }; - - class ListCommand : public RealCommand - { - protected: - void maincode( int, std::string* ) - { - socket() << "103 none" << std::endl; - } - - public: - ListCommand(TcpReportingInterpreter* parent) - : RealCommand( "LISTEXTENSIONS", parent ) - { - } - }; - - class QuitCommand : public RealCommand - { - protected: - void maincode( int, std::string* ) - { - // The main marshaller is not notified about the connection - // being closed but will detect it and delete the DataSender - // in the next serialize() run because DataSender is a - // thread and this method is (indirectly) called by - // DataSender::loop. - socket().close(); - } - - public: - QuitCommand(TcpReportingInterpreter* parent) - : RealCommand( "QUIT", parent ) - { - } - }; - - class SetLimitCommand : public RealCommand - { - protected: - void maincode( int, std::string* args ) - { - int olderr = errno; - char* tailptr; - unsigned long long limit = strtoull( args[0].c_str(), &tailptr, 10 ); - if( *tailptr != '\0' || ( errno != olderr && errno == ERANGE ) ) - { - sendError102(); - } else { - _parent->getConnection()->setLimit(limit); - sendOK(); - } - } - - public: - SetLimitCommand(TcpReportingInterpreter* parent) - : RealCommand( "SETLIMIT", parent, 1, 1, "" ) - { - } - }; - - /** - * Disable/enable output of data on the socket. - */ - class SilenceCommand : public RealCommand - { - protected: - void maincode( int, std::string* args ) - { - toupper( args, 0 ); - if( args[0] == "ON" ) - { - _parent->getConnection()->silence(true); - } else if( args[0] == "OFF") { - _parent->getConnection()->silence(false); - } else { - sendError102(); - return; - } - socket() << "107 Silence " << args[0] << std::endl; - } - - public: - SilenceCommand(TcpReportingInterpreter* parent) - : RealCommand( "SILENCE", parent, 1, 1, "[ON | OFF]" ) - { - } - }; - - /** - * Add data stream to be printed - */ - class SubscribeCommand : public RealCommand - { - // TODO: id's elimneren voor subscribe command - // TODO: id's elimneren voor unsubscribe command - // TODO: id's elimneren - protected: - void maincode( int, std::string* args ) - { - if( _parent->getConnection()->addSubscription(args[0]) ) - { - socket() << "302 " << args[0] << std::endl; - } else { - socket() << "301 " << args[0] << std::endl; - } - } - - public: - SubscribeCommand(TcpReportingInterpreter* parent) - : RealCommand( "SUBSCRIBE", parent, 1, 1, "" ) - { - } - }; - - class SubscriptionsCommand : public RealCommand - { - protected: - void maincode( int, std::string* ) - { - _parent->getConnection()->listSubscriptions(); - } - - public: - SubscriptionsCommand(TcpReportingInterpreter* parent) - : RealCommand( "SUBS", parent ) - { - } - }; - - class UnsubscribeCommand : public RealCommand - { - protected: - void maincode( int, std::string* args ) - { - if( _parent->getConnection()->removeSubscription(args[0]) ) - { - socket() << "303 " << args[0] << std::endl; - } else { - socket() << "304 " << args[0] << std::endl; - } - } - - public: - UnsubscribeCommand(TcpReportingInterpreter* parent) - : RealCommand( "UNSUBSCRIBE", parent, 1, 1, "" ) - { - } - }; - - class VersionCommand : public RealCommand - { - protected: - void maincode( int, std::string* args ) - { - if( args[0] == "1.0" ) - { - _parent->setVersion10(); - sendOK(); - } else { - socket() << "106 Not supported" << std::endl; - } - } - - public: - VersionCommand(TcpReportingInterpreter* parent) - : RealCommand( "VERSION", parent, 1, 1, "1.0" ) - { - } - }; -} + public: + HeaderCommand(TcpReportingInterpreter* parent) : Command("HEADERS", parent) {} -namespace OCL -{ -namespace TCP -{ - /* - * The default Orocos Command objects are not used since we - * do not use Data Sources here. - */ - class RealCommand; - - Command::Command( std::string name ) - : _name( name ) - { - } + ~HeaderCommand() {} - Command::~Command() - { - } + void manualExecute() { maincode(0, 0); } +}; - Command* Command::find(const std::vector& cmds, const std::string& cmp) - { - for( unsigned int j = 0; j < cmds.size(); j++ ) - { - if( *cmds[j] == cmp ) - { - return cmds[j]; - } - } - return 0; +class HelpCommand : public Command { + protected: + /** + * Print list of available commands to the socket. + */ + void printCommands() { + ostream() << "Use HELP " << std::endl; + for (unsigned int i = 0; i < _parent->cmds.size(); i++) { + ostream() << _parent->cmds[i]->getName() << '\n'; } - - const std::string& Command::getName() const - { - return _name; + ostream() << '.' << std::endl; + } + + /** + * Print help for with to the socket. + */ + void printHelpFor(const std::string& name, Command* command) { + ostream() << "Name: " << name << std::endl; + ostream() << "Usage: " << name; + ostream() << " " << command->getSyntax(); + ostream() << std::endl; + } + + /** + * Print help for the given command to the socket. + */ + void printHelpFor(const std::string& cmd) { + const std::vector& cmds = _parent->giveCommands(); + for (unsigned int i = 0; i < cmds.size(); i++) { + if (cmds[i]->getName() == cmd) { + printHelpFor(cmd, cmds[i]); + return; + } } - - bool Command::is(std::string& cmd) const - { - return cmd == _name; + printCommands(); + } + + void maincode(int argc, std::string* params) { + if (argc == 0) { + printCommands(); + } else { + printHelpFor(params[0]); } - - bool Command::operator==(const std::string& cmp) const - { - return cmp == _name; + } + + public: + HelpCommand(TcpReportingInterpreter* parent) + : Command("HELP", parent, 0, 1, "[nothing | ]") {} +}; + +class ListCommand : public Command { + protected: + void maincode(int, std::string*) { ostream() << "103 none" << std::endl; } + + public: + ListCommand(TcpReportingInterpreter* parent) + : Command("LISTEXTENSIONS", parent) {} +}; + +class QuitCommand : public Command { + protected: + void maincode(int, std::string*) { _parent->getSession()->terminate(); } + + public: + QuitCommand(TcpReportingInterpreter* parent) : Command("QUIT", parent) {} +}; + +class SetLimitCommand : public Command { + protected: + void maincode(int, std::string* args) { + int olderr = errno; + char* tailptr; + unsigned long long limit = strtoull(args[0].c_str(), &tailptr, 10); + if (*tailptr != '\0' || (errno != olderr && errno == ERANGE)) { + sendError102(); + } else { + _parent->getSession()->frameLimit = limit; + ostream() << "101 OK" << std::endl; } - - bool Command::operator!=(const std::string& cmp) const - { - return cmp != _name; + } + + public: + SetLimitCommand(TcpReportingInterpreter* parent) + : Command("SETLIMIT", parent, 1, 1, "") {} +}; + +/** + * Disable/enable output of data on the socket. + */ +class SilenceCommand : public Command { + protected: + void maincode(int, std::string* args) { + boost::to_upper(args[0]); + if (args[0] == "ON") { + _parent->getSession()->silenced = true; + } else if (args[0] == "OFF") { + _parent->getSession()->silenced = false; + } else { + sendError102(); + return; } - - bool Command::operator<( const Command& cmp ) const - { - return _name < cmp.getName(); + ostream() << "107 Silence " << args[0] << std::endl; + } + + public: + SilenceCommand(TcpReportingInterpreter* parent) + : Command("SILENCE", parent, 1, 1, "[ON | OFF]") {} +}; + +/** + * Add data stream to be printed + */ +class SubscribeCommand : public Command { + protected: + void maincode(int, std::string* args) { + if (_parent->getSession()->addSubscription(args[0])) { + ostream() << "302 " << args[0] << std::endl; + } else { + ostream() << "301 " << args[0] << std::endl; } - - AliasCommand::AliasCommand( std::string name, std::string alias ) - : Command( name ), _alias( alias ) - { + } + + public: + SubscribeCommand(TcpReportingInterpreter* parent) + : Command("SUBSCRIBE", parent, 1, 1, "") {} +}; + +class SubscriptionsCommand : public Command { + protected: + void maincode(int, std::string*) { + for (boost::unordered_set::const_iterator elem = + _parent->getSession()->getSubscriptions().begin(); + elem != _parent->getSession()->getSubscriptions().end(); elem++) { + ostream() << "305 " << *elem << std::endl; } - - RealCommand* AliasCommand::getRealCommand(const std::vector& cmds) const - { - Command* ret = Command::find( cmds, _alias ); - if( !ret ) - { - return 0; - } - return ret->getRealCommand(cmds); + ostream() << "306 End of list" << std::endl; + } + + public: + SubscriptionsCommand(TcpReportingInterpreter* parent) + : Command("SUBS", parent) {} +}; + +class UnsubscribeCommand : public Command { + protected: + void maincode(int, std::string* args) { + if (_parent->getSession()->removeSubscription(args[0])) { + ostream() << "303 " << args[0] << std::endl; + } else { + ostream() << "304 " << args[0] << std::endl; } - - RealCommand::RealCommand( std::string name, TcpReportingInterpreter* parent, unsigned int minargs, - unsigned int maxargs, const char* syntax ) - : Command( name ), _parent( parent ), _minargs( minargs ), _maxargs( maxargs ), _syntax( syntax ) - { + } + + public: + UnsubscribeCommand(TcpReportingInterpreter* parent) + : Command("UNSUBSCRIBE", parent, 1, 1, "") {} +}; + +class VersionCommand : public Command { + protected: + void maincode(int, std::string* args) { + if (args[0] == "1.0") { + _parent->setVersion10(); + ostream() << "101 OK" << std::endl; + } else { + ostream() << "106 Not supported" << std::endl; } - - RealCommand::~RealCommand() - { + } + + public: + VersionCommand(TcpReportingInterpreter* parent) + : Command("VERSION", parent, 1, 1, "1.0") {} +}; + +/* + * The default Orocos Command objects are not used since we + * do not use Data Sources here. + */ +class Command; + +Command::Command(std::string name, TcpReportingInterpreter* parent) + : _name(name), _parent(parent), _minargs(0), _maxargs(0), _syntax("") {} + +Command::Command(std::string name, TcpReportingInterpreter* parent, + unsigned int minargs, unsigned int maxargs) + : _name(name), + _parent(parent), + _minargs(minargs), + _maxargs(maxargs), + _syntax("") {} + +Command::Command(std::string name, TcpReportingInterpreter* parent, + unsigned int minargs, unsigned int maxargs, std::string syntax) + : _name(name), + _parent(parent), + _minargs(minargs), + _maxargs(maxargs), + _syntax(syntax) {} + +Command* Command::find(const std::vector& cmds, + const std::string& cmp) { + for (unsigned int j = 0; j < cmds.size(); j++) { + if (*cmds[j] == cmp) { + return cmds[j]; } + } + return 0; +} - const char* RealCommand::getSyntax() const - { - return _syntax; - } +const std::string& Command::getName() const { return _name; } - bool RealCommand::sendError102() const - { - if( _syntax ) - { - socket() << "102 Syntax: " << _name << ' ' << _syntax << std::endl; - } else { - socket() << "102 Syntax: " << _name << std::endl; - } - return false; - } +bool Command::is(std::string& cmd) const { return cmd == _name; } - bool RealCommand::sendOK() const - { - socket() << "101 OK" << std::endl; - return true; - } +bool Command::operator==(const std::string& cmp) const { return cmp == _name; } - bool RealCommand::correctSyntax( unsigned int argc, std::string* ) - { - if( argc < _minargs || argc > _maxargs ) - { - return sendError102(); - } - return true; - } +bool Command::operator!=(const std::string& cmp) const { return cmp != _name; } - RealCommand* RealCommand::getRealCommand(const std::vector& cmds) const - { - return const_cast(this); - } +bool Command::operator<(const Command& cmp) const { + return _name < cmp.getName(); +} - void RealCommand::execute( int argc, std::string* args ) - { - if( correctSyntax( argc, args ) ) - { - maincode( argc, args ); - } - } +std::string& Command::getSyntax() { return _syntax; } - void RealCommand::toupper( std::string* args, int i ) const - { - std::transform( args[i].begin(), args[i].end(), args[i].begin(), to_upper ); - } +bool Command::sendError102() const { + ostream() << "102 Syntax: " << _name << ' ' << _syntax << std::endl; + _parent->getSession()->flushOstream(); + return false; +} - void RealCommand::toupper( std::string* args, int start, int stop ) const - { - for( int i = start; i <= stop; i++ ) - { - toupper( args, i ); - } - } +bool Command::correctSyntax(unsigned int argc) { + if (argc < _minargs || argc > _maxargs) { + return sendError102(); + } + return true; +} - Socket& RealCommand::socket() const - { - return _parent->getConnection()->getSocket(); - } +void Command::execute(int argc, std::string* args) { + if (correctSyntax(argc)) { + maincode(argc, args); + _parent->getSession()->flushOstream(); + } +} - TcpReportingInterpreter::TcpReportingInterpreter(Datasender* parent) - : _parent( parent ) - { - addCommand( new VersionCommand(this) ); - addCommand( new HelpCommand(this) ); - addCommand( new QuitCommand(this) ); - addCommand( new AliasCommand( "EXIT", "QUIT" ) ); - } +std::ostream& Command::ostream() const { + return _parent->getSession()->getOstream(); +} - TcpReportingInterpreter::~TcpReportingInterpreter() - { - for( std::vector::iterator i = cmds.begin(); - i != cmds.end(); - i++ ) - { - delete *i; - } - } +TcpReportingInterpreter::TcpReportingInterpreter(TcpReportingSession* parent) + : _parent(parent) { + addCommand(new VersionCommand(this)); + addCommand(new HelpCommand(this)); + addCommand(new QuitCommand(this)); +} - void TcpReportingInterpreter::addCommand( Command* command ) - { - // this method has a complexity of O(n) because we want - // the list to be sorted. - commands.lock(); - std::vector::iterator i = cmds.begin(); - while( i != cmds.end() && *command < **i ) { - i++; - } - - // avoid duplicates - if( i != cmds.end() && *command == (*i)->getName() ) - { - return; - } - cmds.insert( i, command ); - commands.unlock(); - } +TcpReportingInterpreter::~TcpReportingInterpreter() { + for (std::vector::iterator i = cmds.begin(); i != cmds.end(); i++) { + delete *i; + } +} - const std::vector& TcpReportingInterpreter::giveCommands() const - { - return cmds; - } +void TcpReportingInterpreter::addCommand(Command* command) { + // this method has a complexity of O(n) because we want + // the list to be sorted. - Datasender* TcpReportingInterpreter::getConnection() const - { - return _parent; - } + std::vector::iterator i = cmds.begin(); + while (i != cmds.end() && *command < **i) { + i++; + } - void TcpReportingInterpreter::process() - { - std::string ipt = getConnection()->getSocket().readLine(); - - if( ipt.empty() ) - { - return; - } - - /* parseParameters returns data by reference */ - std::string cmd; - std::string* params; - - unsigned int argc = parseParameters( ipt, cmd, ¶ms ); - - std::transform( cmd.begin(), cmd.end(), cmd.begin(), to_upper ); - - /* search the command to be executed */ - bool correct = false; - commands.lock(); - Command* obj = Command::find( cmds, cmd ); - if( obj ) /* command found */ - { - RealCommand* rcommand = obj->getRealCommand(cmds); - if( rcommand ) /* alias is correct */ - { - rcommand->execute( argc, params ); - correct = true; - } - } else { - Logger::log() << Logger::Error << "Invalid command: " << ipt << Logger::endl; - } - commands.unlock(); - - if( !correct ) - { - getConnection()->getSocket() << "105 Command not found" << std::endl; - } - } + // avoid duplicates + if (i != cmds.end() && *command == (*i)->getName()) { + return; + } + cmds.insert(i, command); +} - unsigned int TcpReportingInterpreter::parseParameters( - std::string& ipt, std::string& cmd, std::string** params ) - { - unsigned int argc = 0; - std::string::size_type pos = ipt.find_first_of("\t ", 0); - while( pos != std::string::npos ) - { - pos = ipt.find_first_of("\t ", pos + 1); - argc++; - } - if( argc > 0 ) - { - *params = new std::string[argc]; - pos = ipt.find_first_of("\t ", 0); - cmd = ipt.substr(0, pos); - unsigned int npos; - for( unsigned int i = 0; i < argc; i++ ) - { - npos = ipt.find_first_of("\t ", pos + 1); - (*params)[i] = ipt.substr(pos+1,npos - pos - 1); - pos = npos; - } - } else { - cmd = ipt; - *params = 0; - } - return argc; - } +const std::vector& TcpReportingInterpreter::giveCommands() const { + return cmds; +} - void TcpReportingInterpreter::removeCommand( const char* ipt ) - { - commands.lock(); - std::vector::iterator i = cmds.begin(); - while( i != cmds.end() && **i != ipt ) { - i++; - } - if( i == cmds.end() ) - { - Logger::log() << Logger::Error << "TcpReportingInterpreter::removeCommand: removing unknown command" << ipt << Logger::endl; - } else { - Command* todel = *i; - cmds.erase(i); - delete todel; - } - commands.unlock(); - } +TcpReportingSession* TcpReportingInterpreter::getSession() const { + return _parent; +} - void TcpReportingInterpreter::setVersion10() - { - commands.lock(); - removeCommand( "VERSION" ); - addCommand( new ListCommand(this) ); - addCommand( new HeaderCommand(this) ); - addCommand( new SilenceCommand(this) ); - addCommand( new SetLimitCommand(this) ); - addCommand( new SubscribeCommand(this) ); - addCommand( new UnsubscribeCommand(this) ); - addCommand( new SubscriptionsCommand(this) ); - commands.unlock(); - _parent->silence( false ); - } +void TcpReportingInterpreter::processLine(std::string line) { + if (line.empty()) { + return; + } + + /* parseParameters returns data by reference */ + std::string cmd; + std::string* params; + + unsigned int argc = parseParameters(line, cmd, ¶ms); + + boost::to_upper(cmd); + + /* search the command to be executed */ + bool correct = false; + Command* command = Command::find(cmds, cmd); + if (command) /* command found */ + { + command->execute(argc, params); + correct = true; + } else { + Logger::log() << Logger::Error << "Invalid command: " << line + << Logger::endl; + } + + if (!correct) { + getSession()->getOstream() << "105 Command not found" << std::endl; + getSession()->flushOstream(); + } } + +unsigned int TcpReportingInterpreter::parseParameters(std::string& ipt, + std::string& cmd, + std::string** params) { + unsigned int argc = 0; + std::string::size_type pos = ipt.find_first_of("\t ", 0); + while (pos != std::string::npos) { + pos = ipt.find_first_of("\t ", pos + 1); + argc++; + } + if (argc > 0) { + *params = new std::string[argc]; + pos = ipt.find_first_of("\t ", 0); + cmd = ipt.substr(0, pos); + unsigned int npos; + for (unsigned int i = 0; i < argc; i++) { + npos = ipt.find_first_of("\t ", pos + 1); + (*params)[i] = ipt.substr(pos + 1, npos - pos - 1); + pos = npos; + } + } else { + cmd = ipt; + *params = 0; + } + return argc; } +void TcpReportingInterpreter::setVersion10() { + // removeCommand("VERSION"); //TODO: implement + addCommand(new ListCommand(this)); + addCommand(new HeaderCommand(this)); + addCommand(new SilenceCommand(this)); + addCommand(new SetLimitCommand(this)); + addCommand(new SubscribeCommand(this)); + addCommand(new UnsubscribeCommand(this)); + addCommand(new SubscriptionsCommand(this)); +} +} diff --git a/reporting/command.hpp b/reporting/command.hpp index ab822c35..151c7d64 100644 --- a/reporting/command.hpp +++ b/reporting/command.hpp @@ -28,188 +28,125 @@ #define ORO_COMP_TCP_REPORTING_COMMAND_HPP #include #include - -namespace OCL -{ - -namespace TCP -{ - class Datasender; - class Socket; - class Command; - class RealCommand; - - /** - * Reads a line from the client and interprete it. - */ - class TcpReportingInterpreter - { - protected: - std::vector cmds; - RTT::os::MutexRecursive commands; - unsigned int parseParameters( std::string& ipt, std::string& cmd, std::string** params ); - Datasender* _parent; - - public: - /** - * After setup, the interpreter will only recognize the command - * 'VERSION 1.0' by default. - */ - TcpReportingInterpreter(Datasender* parent); - ~TcpReportingInterpreter(); - void process(); - - /** - * Get the marshaller associated with the current connection. - */ - Datasender* getConnection() const; - - /** - * Accept all valid commands (except 'VERSION 1.0') - */ - void setVersion10(); - - /** - * Return a reference to the command list. - */ - const std::vector& giveCommands() const; - - /** - * Add support for the given command. - */ - void addCommand( Command* command ); - - /** - * Remove support for the given command name. - */ - void removeCommand( const char* name ); - }; - - /** - * Command pattern - */ - class Command - { - protected: - std::string _name; - - public: - Command( std::string name ); - virtual ~Command(); - virtual bool is(std::string& cmd) const; - - /** - * Return a reference to the object which is really responsible - * for executing this command. This enables multiple names - * for the same command. - * Return 0 if no such command is founded. - */ - virtual RealCommand* getRealCommand(const std::vector& cmds) const = 0; - - /** - * Find the command with the given name in the vector. - */ - static Command* find(const std::vector& cmds, const std::string& cmp); - - /** - * Compare on name - */ - bool operator==(const std::string& cmp) const; - bool operator!=(const std::string& cmp) const; - bool operator<( const Command& cmp ) const; - - /** - * Get the name of this command. - */ - const std::string& getName() const; - }; - - /** - * Another name for a command - */ - class AliasCommand : public Command - { - private: - std::string _alias; - - public: - AliasCommand( std::string name, std::string alias ); - virtual ~AliasCommand() {} - virtual RealCommand* getRealCommand(const std::vector& cmds) const; - }; - - /** - * Real command which can be executed. - */ - class RealCommand : public Command - { - protected: - TcpReportingInterpreter* _parent; - unsigned int _minargs; - unsigned int _maxargs; - const char* _syntax; - - /** - * Main code to be implemented by children. - */ - virtual void maincode( int argc, std::string* args ) = 0; - - /** - * Send the correct syntax to the client. - * Return false. - */ - bool sendError102() const; - - /** - * Send the message 101 OK to the client. - * Return true. - */ - bool sendOK() const; - - /** - * Convert the parameter with the given index to upper-case. - * The caller has to make sure that the given index is a valid index. - */ - void toupper( std::string* args, int index ) const; - - /** - * Convert all parameters between the start and stop index to upper-case. - * The caller has to make sure that both start and stop are valid indexes. - * stop must be strictly greater than start. - */ - void toupper( std::string* args, int start, int stop ) const; - - /** - * Return the socket for this command. - * Fast shortcut for _parent->getConnection()->getSocket() - */ - inline Socket& socket() const; - - public: - RealCommand( std::string name, TcpReportingInterpreter* parent, unsigned int minargs = 0, unsigned int maxargs = 0, const char* syntax = 0); - virtual ~RealCommand(); - - /** - * Return true if the syntax is correct, false otherwise. - * Send an error message to the client on incorrect syntax. - */ - virtual bool correctSyntax( unsigned int argc, std::string* args ); - - /** - * Return syntax information - */ - const char* getSyntax() const; - - /** - * Returns this. - */ - virtual RealCommand* getRealCommand(const std::vector& cmds) const; - - /** - * Execute this command. - */ - void execute( int argc, std::string* args ); - }; -} +#include "tcpreportingsession.hpp" +#include + +namespace OCL { +class Command; +class RealCommand; +class TcpReportingSession; + +/** + * Reads a line from the client and interprete it. + */ +class TcpReportingInterpreter { + protected: + unsigned int parseParameters(std::string& ipt, std::string& cmd, + std::string** params); + TcpReportingSession* _parent; + + public: + /** + * After setup, the interpreter will only recognize the command + * 'VERSION 1.0' by default. + */ + TcpReportingInterpreter(TcpReportingSession* parent); + ~TcpReportingInterpreter(); + void processLine(std::string line); + std::vector cmds; + + /** + * Get the marshaller associated with the current connection. + */ + TcpReportingSession* getSession() const; + + /** + * Accept all valid commands (except 'VERSION 1.0') + */ + void setVersion10(); + + /** + * Return a reference to the command list. + */ + const std::vector& giveCommands() const; + + /** + * Add support for the given command. + */ + void addCommand(Command* command); +}; + +/** + * Command pattern + */ +class Command { + protected: + std::string _name; + TcpReportingInterpreter* _parent; + unsigned int _minargs; + unsigned int _maxargs; + std::string _syntax; + + /** + * Send the correct syntax to the client. + * Return false. + */ + bool sendError102() const; + + /** + * Return the socket for this command. + * Fast shortcut for _parent->getConnection()->getSocket() + */ + inline std::ostream& ostream() const; + + public: + Command(std::string name, TcpReportingInterpreter* parent, + unsigned int minargs, unsigned int maxargs, + std::string syntax); + Command(std::string name, TcpReportingInterpreter* parent, + unsigned int minargs, unsigned int maxargs); + Command(std::string name, TcpReportingInterpreter* parent); + + virtual bool is(std::string& cmd) const; + + /** + * Find the command with the given name in the vector. + */ + static Command* find(const std::vector& cmds, + const std::string& cmp); + + /** + * Compare on name + */ + bool operator==(const std::string& cmp) const; + bool operator!=(const std::string& cmp) const; + bool operator<(const Command& cmp) const; + + /** + * Get the name of this command. + */ + const std::string& getName() const; + std::string &getSyntax(); + + virtual void maincode(int argc, std::string* args) = 0; + + /** + * Return true if the syntax is correct, false otherwise. + * Send an error message to the client on incorrect syntax. + */ + bool correctSyntax(unsigned int argc); + + /** + * Execute this command. + */ + void execute(int argc, std::string* args); +}; + +/** + * Real command which can be executed. + */ +class RealCommand : public Command { + public: +}; } #endif diff --git a/reporting/datasender.cpp b/reporting/datasender.cpp deleted file mode 100644 index 9cfebfe7..00000000 --- a/reporting/datasender.cpp +++ /dev/null @@ -1,228 +0,0 @@ -/*************************************************************************** - - datasender.cpp - description - ------------------- - begin : Wed August 9 2006 - copyright : (C) 2006 Bas Kemper - (C) 2007 Ruben Smits //Changed subscription structure - email : kst@baskemper.be - first dot last at mech dot kuleuven dot be - *************************************************************************** - * This library is free software; you can redistribute it and/or * - * modify it under the terms of the GNU Lesser General Public * - * License as published by the Free Software Foundation; either * - * version 2.1 of the License, or (at your option) any later version. * - * * - * This library is distributed in the hope that it will be useful, * - * but WITHOUT ANY WARRANTY; without even the implied warranty of * - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * - * Lesser General Public License for more details. * - * * - * You should have received a copy of the GNU Lesser General Public * - * License along with this library; if not, write to the Free Software * - * Foundation, Inc., 59 Temple Place, * - * Suite 330, Boston, MA 02111-1307 USA * - * * - ***************************************************************************/ - -#include -#include -#include -#include -#include -#include "socket.hpp" -#include "socketmarshaller.hpp" -#include "datasender.hpp" -#include "command.hpp" -#include "TcpReporting.hpp" -#include - -namespace OCL -{ -namespace TCP -{ - Datasender::Datasender(RTT::SocketMarshaller* _marshaller, Orocos::TCP::Socket* _os): - Activity(10), os( _os ), marshaller(_marshaller) - { - limit = 0; - curframe = 0; - reporter = marshaller->getReporter(); - silenced = true; - interpreter = new TcpReportingInterpreter(this); - } - - Datasender::~Datasender() - { - subscriptions.clear(); - delete interpreter; - delete os; - } - - void Datasender::loop() - { - *os << "100 Orocos 1.0 TcpReporting Server 1.0" << std::endl; - while( os->isValid() ) - { - interpreter->process(); - } - Logger::log() << Logger::Info << "Connection closed!" << Logger::endl; - } - - bool Datasender::breakloop() - { - os->close(); - return true; - } - - RTT::SocketMarshaller* Datasender::getMarshaller() const - { - return marshaller; - } - - Socket& Datasender::getSocket() const - { - return *os; - } - - bool Datasender::isValid() const - { - return os && os->isValid(); - } - - bool Datasender::addSubscription(const std::string name ) - { - lock.lock(); - log(Debug)<<"Datasender::addSubscription: "<getReport()->find(name)!=NULL){ - //check if subscription already exists - std::vector::const_iterator pos = - find(subscriptions.begin(),subscriptions.end(),name); - if(pos!=subscriptions.end()){ - Logger::In("DataSender"); - log(Info)<<"Already subscribed to "<removeConnection( this ); - } - - bool Datasender::removeSubscription( const std::string& name ) - { - lock.lock(); - //check if subscription exists - std::vector::iterator pos = - find(subscriptions.begin(),subscriptions.end(),name); - if(pos!=subscriptions.end()){ - Logger::In("DataSender"); - log(Info)<<"Removing subscription for "<::const_iterator elem=subscriptions.begin(); - elem!=subscriptions.end();elem++) - *os<<"305 "<< *elem<getName()<<"\n"; - Property* bag = dynamic_cast< Property* >( v ); - if ( bag ) - this->writeOut( bag->value() ); - else { - *os<<"205 " <getDataSource()<<"\n"; - } - - } - - void Datasender::writeOut(const PropertyBag &v) - { - for ( - PropertyBag::const_iterator i = v.getProperties().begin(); - i != v.getProperties().end(); - i++ ) - { - this->writeOut( *i ); - } - - } - - - void Datasender::checkbag(const PropertyBag &v) - { - log(Debug)<<"Let's check the subscriptions"<::iterator elem = subscriptions.begin(); - elem!=subscriptions.end();elem++){ - base::PropertyBase* prop = reporter->getReport()->find(*elem); - if(prop!=NULL){ - writeOut(prop); - }else{ - Logger::In("DataSender"); - log(Error)<<*elem<<" not longer available for reporting,"<< - ", removing the subscription."< limit && limit != 0 ) - { - *os << "204 Limit reached" << std::endl; - } - } - lock.unlock(); - } - -} -} diff --git a/reporting/datasender.hpp b/reporting/datasender.hpp deleted file mode 100644 index 26b733d6..00000000 --- a/reporting/datasender.hpp +++ /dev/null @@ -1,136 +0,0 @@ -/*************************************************************************** - - datasender.hpp - description - ------------------- - begin : Wed August 9 2006 - copyright : (C) 2006 Bas Kemper - (C) 2007 Ruben Smits //Changed subscription structure - email : kst@baskemper.be - first dot last at mech dot kuleuven dot be - - *************************************************************************** - * This library is free software; you can redistribute it and/or * - * modify it under the terms of the GNU Lesser General Public * - * License as published by the Free Software Foundation; either * - * version 2.1 of the License, or (at your option) any later version. * - * * - * This library is distributed in the hope that it will be useful, * - * but WITHOUT ANY WARRANTY; without even the implied warranty of * - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * - * Lesser General Public License for more details. * - * * - * You should have received a copy of the GNU Lesser General Public * - * License along with this library; if not, write to the Free Software * - * Foundation, Inc., 59 Temple Place, * - * Suite 330, Boston, MA 02111-1307 USA * - * * - ***************************************************************************/ - -#ifndef ORO_COMP_TCP_DATASENDER -#define ORO_COMP_TCP_DATASENDER - -#include -#include -#include - -using RTT::os::Mutex; -using RTT::base::PropertyBase; -using RTT::Property; -using RTT::PropertyBag; - -namespace RTT -{ - class SocketMarshaller; -} -namespace OCL{ - - namespace TCP{ - using namespace RTT; - class TcpReportingInterpreter; - class Socket; - - /** - * This class manages the connection with one single client. It is - * responsible for sending data to the client and managing the - * state of the client. - * - * It has a thread responsible for reading data from the socket. - */ - class Datasender - : public RTT::Activity - { - private: - os::Mutex lock; - TcpReportingInterpreter* interpreter; - void checkbag(const PropertyBag &v); - void writeOut(base::PropertyBase* v); - void writeOut(const PropertyBag &v); - Socket* os; - OCL::TcpReporting* reporter; - unsigned long long limit; - unsigned long long curframe; - bool silenced; - RTT::SocketMarshaller* marshaller; - std::vector subscriptions; - - public: - Datasender(RTT::SocketMarshaller* marshaller, Socket* os); - virtual ~Datasender(); - - /** - * Returns true if the connection of the datasender is valid, - * false otherwise. - */ - bool isValid() const; - - /** - * Only frames up to frame will be processed. - */ - void setLimit(unsigned long long newlimit); - - /** - * Send data to the client. - */ - void serialize(const PropertyBag &v); - - /** - * Return the marshaller. - */ - RTT::SocketMarshaller* getMarshaller() const; - - bool addSubscription(const std::string name ); - bool removeSubscription( const std::string& name ); - - /** - * Write a list of the current subscriptions to the socket. - */ - void listSubscriptions(); - - /** - * Get socket associated with this datasender. - */ - Socket& getSocket() const; - - /** - * Data connection main loop - */ - virtual void loop(); - - /** - * Try to finish this thread. - */ - virtual bool breakloop(); - - /** - * Disable/enable output of data - */ - void silence(bool newstate); - - /** - * Remove this connection - */ - void remove(); - }; -} -} -#endif diff --git a/reporting/socket.cpp b/reporting/socket.cpp deleted file mode 100644 index 07ba3c87..00000000 --- a/reporting/socket.cpp +++ /dev/null @@ -1,297 +0,0 @@ -/*************************************************************************** - - Socket.cpp - Small socket wrapper - ------------------- - begin : Fri Aug 4 2006 - copyright : (C) 2006 Bas Kemper - email : kst@ .be - - *************************************************************************** - * This library is free software; you can redistribute it and/or * - * modify it under the terms of the GNU Lesser General Public * - * License as published by the Free Software Foundation; either * - * version 2.1 of the License, or (at your option) any later version. * - * * - * This library is distributed in the hope that it will be useful, * - * but WITHOUT ANY WARRANTY; without even the implied warranty of * - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * - * Lesser General Public License for more details. * - * * - * You should have received a copy of the GNU Lesser General Public * - * License along with this library; if not, write to the Free Software * - * Foundation, Inc., 59 Temple Place, * - * Suite 330, Boston, MA 02111-1307 USA * - * * - ***************************************************************************/ - -#include -#include -#include -#include -#include -#include -#include "socket.hpp" - -using RTT::Logger; - -#define MSGLENGTH 100 -#if MSGLENGTH * 3 > BUFLENGTH - #error "MSGLENGTH too long" /* memcpy is used */ -#endif - -#if __APPLE__ -#define SEND_OPTIONS 0 -#else -#define SEND_OPTIONS MSG_NOSIGNAL -#endif - -namespace { - const unsigned int bufsize = 2048; - class sockbuf : public std::streambuf - { - private: - char* ptr; - OCL::TCP::Socket* mainClass; - - public: - sockbuf( OCL::TCP::Socket* m ) : mainClass(m) - { - char* ptr = new char[bufsize]; - setp(ptr, ptr + bufsize); // output buffer - setg(0, 0, 0); // input stream: not enabled -#if __APPLE__ - /* Linux uses MSG_NOSIGNAL on the ::send() calls, but Mac OS X - supports this as a socket option with SIG_NOSIGPIPE. Just - set the socket now with that option and not worry about the - MSG_NOSIGNAL's later in each of the send() calls. For - further details, see -http://lists.apple.com/archives/macnetworkprog/2002/Dec/msg00091.html -http://trac.wxwidgets.org/ticket/7150 -http://gobby.0x539.de/trac/browser/net6/trunk/src/socket.cpp?rev=224 - */ - int value = 1; - if (-1 == setsockopt( - mainClass->socket, SOL_SOCKET, SO_NOSIGPIPE, &value, sizeof(value))) - { - Logger::log() << Logger::Error << "Error setting socket option. Continuing." << Logger::endl; - } - #endif - } - - ~sockbuf() - { - sync(); - delete[] ptr; - } - - int overflow(int c) - { - int ret = 0; - put_buffer(); - - if (c != EOF) - { - if (pbase() == epptr()) - { - put_char(c); - } else { - ret = sputc(c); - } - } else { - ret = EOF; - } - return ret; - } - - int sync() - { - put_buffer(); - return 0; - } - - void put_char(int chr) - { - Logger::log() << Logger::Error << "Socket::put_char is unimplemented" << Logger::endl; - } - - void put_buffer() - { - if (pbase() != pptr()) - { - int length = (pptr() - pbase()); - char *buffer = new char[length + 1]; - - strncpy(buffer, pbase(), length); - buffer[length] = '\0'; - - // empty strings are not sent - if( length && ::send ( mainClass->socket, buffer, length, SEND_OPTIONS ) == -1 ) - { - mainClass->rawClose(); - } - setp(pbase(), epptr()); - delete[] buffer; - } - } - }; -}; - -namespace OCL { -namespace TCP { - Socket::Socket( int socketID ) : - std::ostream( new sockbuf(this) ), - socket(socketID), begin(0), ptrpos(0), end(0) - { - } - - - Socket::~Socket() - { - if( isValid() ) - { - rawClose(); - } - } - - bool Socket::isValid() const - { - return socket >= 0; - } - - bool Socket::dataAvailable() - { - return isValid() && lineAvailable(); - } - - bool Socket::lineAvailable() - { - int flags = fcntl(socket,F_GETFL); - fcntl(socket,F_SETFL,flags | O_NONBLOCK); - int ret =recv(socket,buffer,MSGLENGTH,MSG_PEEK); - if(ret>0){ - //search for \n or \0 - for(unsigned int i=0;i= BUFLENGTH ) { - if( ptrpos - begin > MSGLENGTH ) { - Logger::log() << Logger::Error << "Message length violation" << Logger::endl; - rawClose(); - } else { - memcpy( buffer, &buffer[begin], end - begin); - } - end -= begin; - ptrpos -= begin; - begin = 0; - } - } - - std::string Socket::readLine() - { - if(dataAvailable()){ - if(0>recv(socket,buffer,sizeof(char[ptrpos+1]),MSG_WAITALL)) - return ""; - - return std::string(buffer,ptrpos); - } - return ""; - /* ugly C style code to read a line from the socket */ - - - /* - while(isValid()){ - // process remaining full lines in the buffer - if( lineAvailable() ){ - std::string ret(&buffer[begin]); - - if( begin == end - 1 ){ - // reset to start of buffer when everything is read - begin = 0; - end = 0; - ptrpos = 0; - } else { - ++ptrpos; - begin = ptrpos; - } - return ret; - } - - // move data back to the beginning of the buffer (should not occur very often) - checkBufferOverflow(); - - - // wait for additional input - int received = recv(socket, &buffer[end], MSGLENGTH, 0 ); - if( received == 0 || received == -1 ){ - rawClose(); - return ""; - } - end += received; - } - return ""; - */ - } - - void Socket::rawClose() - { - if( socket != -1 ) - { - ::close(socket); - } - socket = -1; - return; - } - - void Socket::close() - { - int _socket = socket; - socket = -1; - - if( _socket ) - { - // The user notification is sent non-blocking. - int flags = fcntl( _socket, F_GETFL, 0 ); - if( flags == -1 ) - { - flags = 0; - } - fcntl( _socket, F_SETFL, flags | O_NONBLOCK ); - ::send ( _socket, "104 Bye bye", 11, SEND_OPTIONS ); - ::close( _socket ); - } - } -}; -}; diff --git a/reporting/socket.hpp b/reporting/socket.hpp deleted file mode 100644 index 9b4d034e..00000000 --- a/reporting/socket.hpp +++ /dev/null @@ -1,106 +0,0 @@ -/*************************************************************************** - - Socket.h - Small socket wrapper - ------------------- - begin : Fri Aug 4 2006 - copyright : (C) 2006 Bas Kemper - email : kst@ .be - - *************************************************************************** - * This library is free software; you can redistribute it and/or * - * modify it under the terms of the GNU Lesser General Public * - * License as published by the Free Software Foundation; either * - * version 2.1 of the License, or (at your option) any later version. * - * * - * This library is distributed in the hope that it will be useful, * - * but WITHOUT ANY WARRANTY; without even the implied warranty of * - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * - * Lesser General Public License for more details. * - * * - * You should have received a copy of the GNU Lesser General Public * - * License along with this library; if not, write to the Free Software * - * Foundation, Inc., 59 Temple Place, * - * Suite 330, Boston, MA 02111-1307 USA * - * * - ***************************************************************************/ -#ifndef ORO_COMP_SOCKET_H -#define ORO_COMP_SOCKET_H -#include - -namespace { - class sockbuf; -}; - -namespace OCL { -namespace TCP { - class Socket : public std::ostream { - friend class ::sockbuf; - private: - /** - * Socket ID - */ - int socket; - - /** - * Return true when a line which is already - * stored in the buffer is available. - * Terminate the line with \0 and adjust ptrpos to this - * position. - * - * Begin should be the beginning of the new line. - */ - bool lineAvailable(); - - /** - * Move all data in the buffer to the beginning of the - * buffer, if needed. - */ - void checkBufferOverflow(); - - /** - * Close socket without any message to the client. - */ - void rawClose(); - - /** - * Available data. - */ - /* buflength should be at least msglength * 2 in order to avoid problems with memcpy! */ - #define BUFLENGTH 2000 - char buffer[BUFLENGTH]; - int begin; - int ptrpos; - int end; - - public: - /** - * Create an incoming server socket. - * - * @param port Port to listen on. - */ - Socket( int socketID ); - ~Socket(); - - /** - * Check wether the state of the socket is valid or not. - */ - bool isValid() const; - - /** - * Check wether there is new data available. - */ - bool dataAvailable(); - - /** - * Read a line from the socket. - */ - std::string readLine(); - - /** - * Close the connection. Send a nice message to the user. - */ - void close(); - }; -}; -}; -#endif diff --git a/reporting/socketmarshaller.cpp b/reporting/socketmarshaller.cpp deleted file mode 100644 index efd8f706..00000000 --- a/reporting/socketmarshaller.cpp +++ /dev/null @@ -1,115 +0,0 @@ -/*************************************************************************** - - socketmarshaller.cpp - description - ------------------- - begin : Mon August 7 2006 - copyright : (C) 2006 Bas Kemper - email : kst@baskemper.be - - *************************************************************************** - * This library is free software; you can redistribute it and/or * - * modify it under the terms of the GNU Lesser General Public * - * License as published by the Free Software Foundation; either * - * version 2.1 of the License, or (at your option) any later version. * - * * - * This library is distributed in the hope that it will be useful, * - * but WITHOUT ANY WARRANTY; without even the implied warranty of * - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * - * Lesser General Public License for more details. * - * * - * You should have received a copy of the GNU Lesser General Public * - * License along with this library; if not, write to the Free Software * - * Foundation, Inc., 59 Temple Place, * - * Suite 330, Boston, MA 02111-1307 USA * - * * - ***************************************************************************/ - -#include -#include -#include -#include -#include "TcpReporting.hpp" -#include "socketmarshaller.hpp" -#include "datasender.hpp" - -using RTT::Logger; - -namespace RTT -{ - SocketMarshaller::SocketMarshaller(OCL::TcpReporting* reporter) - : _reporter(reporter) - { - } - - SocketMarshaller::~SocketMarshaller() - { - closeAllConnections(); - } - - void SocketMarshaller::addConnection(OCL::TCP::Socket* os) - { - lock.lock(); - OCL::TCP::Datasender* conn = new OCL::TCP::Datasender(this, os); - _connections.push_front( conn ); - conn->Activity::start(); - lock.unlock(); - } - - void SocketMarshaller::closeAllConnections() - { - // TODO: locking, proper connection shutdown - while( !_connections.empty() ) - { - removeConnection( _connections.front() ); - } - } - - void SocketMarshaller::flush() - {} - - void SocketMarshaller::removeConnection(OCL::TCP::Datasender* sender) - { - lock.lock(); - _connections.remove( sender ); - sender->breakLoop(); - delete sender; - lock.unlock(); - } - - OCL::TcpReporting* SocketMarshaller::getReporter() const - { - return _reporter; - } - - void SocketMarshaller::serialize(RTT::base::PropertyBase*) - { - // This method is pure virtual in the parent class. - Logger::log() << Logger::Error << "Unexpected call to SocketMarshaller::serialize" << - Logger::endl; - } - - void SocketMarshaller::serialize(const PropertyBag &v) - { - lock.lock(); - // TODO: sending data does not run in parallel! - for( std::list::iterator it = _connections.begin(); - it != _connections.end(); ) - { - if( (*it)->isValid() ) - { - (*it)->serialize(v); - it++; - } else { - OCL::TCP::Datasender* torm = *it; - it++; - removeConnection( torm ); - } - } - lock.unlock(); - } - - void SocketMarshaller::shutdown() - { - closeAllConnections(); - } -} diff --git a/reporting/socketmarshaller.hpp b/reporting/socketmarshaller.hpp deleted file mode 100644 index becd8a23..00000000 --- a/reporting/socketmarshaller.hpp +++ /dev/null @@ -1,71 +0,0 @@ -/*************************************************************************** - - socketmarshaller.hpp - description - ------------------- - begin : Mon August 7 2006 - copyright : (C) 2006 Bas Kemper - email : kst@baskemper.be - - *************************************************************************** - * This library is free software; you can redistribute it and/or * - * modify it under the terms of the GNU Lesser General Public * - * License as published by the Free Software Foundation; either * - * version 2.1 of the License, or (at your option) any later version. * - * * - * This library is distributed in the hope that it will be useful, * - * but WITHOUT ANY WARRANTY; without even the implied warranty of * - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * - * Lesser General Public License for more details. * - * * - * You should have received a copy of the GNU Lesser General Public * - * License along with this library; if not, write to the Free Software * - * Foundation, Inc., 59 Temple Place, * - * Suite 330, Boston, MA 02111-1307 USA * - * * - ***************************************************************************/ - -#ifndef ORO_COMP_SOCKET_MARSHALLER -#define ORO_COMP_SOCKET_MARSHALLER - -#include -#include -#include -#include - -namespace OCL -{ -class TcpReporting; -namespace TCP -{ - class Datasender; - class Socket; -} -} - -namespace RTT -{ - /** - * marsh::MarshallInterface which sends data to multiple sockets. - */ - class SocketMarshaller - : public marsh::MarshallInterface - { - private: - RTT::os::MutexRecursive lock; - std::list _connections; - OCL::TcpReporting* _reporter; - - public: - SocketMarshaller(OCL::TcpReporting* reporter); - ~SocketMarshaller(); - virtual void flush(); - virtual void serialize(RTT::base::PropertyBase*); - virtual void serialize(const PropertyBag &v); - void addConnection(OCL::TCP::Socket* os); - void removeConnection(OCL::TCP::Datasender* sender); - void closeAllConnections(); - void shutdown(); - OCL::TcpReporting* getReporter() const; - }; -} -#endif diff --git a/reporting/tcpreportingsession.cpp b/reporting/tcpreportingsession.cpp new file mode 100644 index 00000000..e06b8a5e --- /dev/null +++ b/reporting/tcpreportingsession.cpp @@ -0,0 +1,128 @@ +#include "tcpreportingsession.hpp" +#include +#include +#include +#include + +namespace OCL { +TcpReportingSession::TcpReportingSession(tcp::socket *socket) + : frameLimit(0), + sentFrames(0), + silenced(false), + socket(socket), + ostream(&output_buffer), + interpreter(this) { + registerRead(); + getOstream() << "100 Orocos 1.0 TcpReporting Server 1.0" << std::endl; + flushOstream(); +} + +TcpReportingSession::~TcpReportingSession() { + delete socket; +} + +void TcpReportingSession::terminate() { + if (socket->is_open()) { + socket->shutdown(tcp::socket::shutdown_send); + asio::error_code error; + socket->close(error); + if (error) { + RTT::log(RTT::Error) << "error closing socket: " << error.message() + << RTT::endlog(); + } + } +} + +std::ostream &TcpReportingSession::getOstream() { return ostream; } + +void TcpReportingSession::flushOstream() { + if (output_buffer.size() > 0) + asio::async_write(*socket, output_buffer.data(), + boost::bind(&TcpReportingSession::handleWrite, this, + asio::placeholders::error(), + asio::placeholders::bytes_transferred())); +} + +bool TcpReportingSession::addSubscription(std::string itemName) { + return subscriptions.insert(itemName).second; +} + +bool TcpReportingSession::removeSubscription(std::string itemName) { + return subscriptions.erase(itemName); +} + +boost::unordered_set &TcpReportingSession::getSubscriptions() { + return subscriptions; +} + +void TcpReportingSession::registerRead() { + asio::async_read_until(*socket, input_buffer, '\n', + boost::bind(&TcpReportingSession::handleLine, this, + asio::placeholders::error, + asio::placeholders::bytes_transferred)); +} + +void TcpReportingSession::handleLine(const asio::error_code &error, + std::size_t bytes_transferred) { + if (error) { + asio::error_code closing_error; + socket->close(closing_error); + if (closing_error) + RTT::log(RTT::Error) << "error closing socket: " + << closing_error.message() << RTT::endlog(); + + } else { + std::istream is(&input_buffer); + std::string line; + line.resize(bytes_transferred); + is.getline(&line[0], bytes_transferred, '\n'); + interpreter.processLine(line.substr(0, bytes_transferred - 1)); + registerRead(); + } +} + +void TcpReportingSession::handleWrite(const asio::error_code &error, + std::size_t bytes_transferred) { + if (error) { + terminate(); + RTT::log(RTT::Error) << "error writing data: " << error.message() + << RTT::endlog(); + } else { + output_buffer.consume(bytes_transferred); + } +} + +void TcpReportingSession::writeItemUpdate(RTT::base::PropertyBase *prop) { + if (subscriptions.count(prop->getName())) { + if (dynamic_cast *>(prop)) { + // TODO: maybe implement? + // serialize(bag->value()); + } else { + getOstream() << "202 " << prop->getName() << "\n"; + getOstream() << "205 " << prop->getDataSource() << "\n"; + } + } +} + +void TcpReportingSession::flush() { flushOstream(); } + +void TcpReportingSession::serialize(RTT::base::PropertyBase *v) { + if (shouldSendFrame()) { + } +} + +void TcpReportingSession::serialize(const RTT::PropertyBag &v) { + lastSerializedPropertyBag = &v; + + if (shouldSendFrame()) { + for (int i = 0; i < v.size(); ++i) { + writeItemUpdate(v.getItem(i)); + } + getOstream() << "203 " << sentFrames << " -- end of frame" << std::endl; + sentFrames++; + if (sentFrames > frameLimit && frameLimit != 0) { + getOstream() << "204 Limit reached" << std::endl; + } + } +} +} diff --git a/reporting/tcpreportingsession.hpp b/reporting/tcpreportingsession.hpp new file mode 100644 index 00000000..b7e0049a --- /dev/null +++ b/reporting/tcpreportingsession.hpp @@ -0,0 +1,59 @@ +#ifndef TCPREPORTINGSESSION_HPP +#define TCPREPORTINGSESSION_HPP + +#include +#include +#include +#include +#include +#include +#include + +#include "command.hpp" + +namespace OCL { + +using asio::ip::tcp; +class TcpReportingSession : public RTT::marsh::MarshallInterface { + public: + TcpReportingSession(tcp::socket* socket); + ~TcpReportingSession(); + void terminate(); + std::ostream& getOstream(); + void flushOstream(); + bool addSubscription(std::string itemName); + bool removeSubscription(std::string itemName); + boost::unordered_set &getSubscriptions(); + const RTT::PropertyBag* lastSerializedPropertyBag; + unsigned long frameLimit; + unsigned long sentFrames; + bool silenced; + + virtual void flush(); + virtual void serialize(RTT::base::PropertyBase*); + virtual void serialize(const RTT::PropertyBag& v); + + private: + tcp::socket* socket; + asio::streambuf input_buffer; + asio::streambuf output_buffer; + std::ostream ostream; + + TcpReportingInterpreter interpreter; + + boost::unordered_set subscriptions; + + void registerRead(); + void handleLine(const asio::error_code& error, std::size_t bytes_transferred); + void handleWrite(const asio::error_code& error, + std::size_t bytes_transferred); + + void writeItemUpdate(RTT::base::PropertyBase* prop); + inline bool shouldSendFrame() { + return !subscriptions.empty() && (frameLimit == 0 || sentFrames <= frameLimit) && + !silenced && socket->is_open(); + } +}; +} + +#endif // TCPREPORTINGSESSION_HPP