@@ -63,7 +63,27 @@ namespace thinger{
6363 /* *
6464 * Can be override to start reconnection process
6565 */
66- virtual void disconnected (){}
66+ virtual void disconnected (){
67+ // stop all streaming resources after disconnect
68+ if (thinger_resource::get_streaming_counter ()>0 ) {
69+ thinger_map<thinger_resource>::entry* current = resources_.begin ();
70+ while (current!=NULL ){
71+ current->value_ .disable_streaming ();
72+ current = current->next_ ;
73+ }
74+ }
75+ }
76+
77+ /* *
78+ * Stream a given resource
79+ */
80+ void stream_resource (thinger_resource& resource, thinger_message::signal_flag type){
81+ thinger_message message;
82+ message.set_stream_id (resource.get_stream_id ());
83+ message.set_signal_flag (type);
84+ resource.fill_api_io (message.get_data ());
85+ send_message (message);
86+ }
6787
6888 public:
6989
@@ -77,7 +97,8 @@ namespace thinger{
7797 keep_alive_response = true ;
7898
7999 thinger_message message;
80- message.resources ().add (" login" ).add (username).add (device_id).add (credential);
100+ message.set_signal_flag (thinger_message::AUTH);
101+ message.resources ().add (username).add (device_id).add (credential);
81102 if (!send_message (message)) return false ;
82103
83104 thinger_message response;
@@ -86,22 +107,44 @@ namespace thinger{
86107
87108 bool call_endpoint (const char * endpoint_name){
88109 thinger_message message;
89- message.resources ().add (" ep" ).add (endpoint_name);
110+ message.set_signal_flag (thinger_message::CALL_ENDPOINT);
111+ message.resources ().add (endpoint_name);
90112 return send_message (message);
91113 }
92114
93115 bool call_endpoint (const char * endpoint_name, pson& data){
94116 thinger_message message;
117+ message.set_signal_flag (thinger_message::CALL_ENDPOINT);
95118 message.set_data (data);
96- message.resources ().add (" ep " ). add ( endpoint_name);
119+ message.resources ().add (endpoint_name);
97120 return send_message (message);
98121 }
99122
123+ bool call_endpoint (const char * endpoint_name, thinger_resource& resource){
124+ thinger_message message;
125+ message.set_signal_flag (thinger_message::CALL_ENDPOINT);
126+ message.resources ().add (endpoint_name);
127+ resource.fill_api_io (message.get_data ());
128+ return send_message (message);
129+ }
130+
131+ /* *
132+ * Stream the given resource. This resource should be previously requested by an external process.
133+ * Otherwise, the resource will not be streamed as nothing will be listening for it.
134+ */
135+ bool stream (thinger_resource& resource){
136+ if (resource.stream_enabled ()){
137+ stream_resource (resource, thinger_message::STREAM_EVENT);
138+ return true ;
139+ }
140+ return false ;
141+ }
142+
100143 bool send_message (thinger_message& message)
101144 {
102145 thinger_encoder sink;
103146 sink.encode (message);
104- encoder.pb_encode_varint (message. get_message_type () );
147+ encoder.pb_encode_varint (MESSAGE );
105148 encoder.pb_encode_varint (sink.bytes_written ());
106149 encoder.encode (message);
107150 return write (NULL , 0 , true );
@@ -119,24 +162,35 @@ namespace thinger{
119162 if (keep_alive_response){
120163 last_keep_alive = current_time;
121164 keep_alive_response = false ;
122- encoder.pb_encode_varint (thinger_message:: KEEP_ALIVE);
165+ encoder.pb_encode_varint (KEEP_ALIVE);
123166 encoder.pb_encode_varint (0 );
124167 write (NULL , 0 , true );
125168 }else {
126169 disconnected ();
127170 }
128171 }
172+
173+ // handle streaming resources
174+ if (thinger_resource::get_streaming_counter ()>0 ){
175+ thinger_map<thinger_resource>::entry* current = resources_.begin ();
176+ while (current!=NULL ){
177+ if (current->value_ .stream_required (current_time)){
178+ stream_resource (current->value_ , thinger_message::STREAM_SAMPLE);
179+ }
180+ current = current->next_ ;
181+ }
182+ }
129183 }
130184
131185 bool read_message (thinger_message& message){
132- uint8_t message_type = decoder.pb_decode_varint32 ();
133- switch (message_type ){
134- case thinger_message:: MESSAGE: {
186+ uint8_t type = decoder.pb_decode_varint32 ();
187+ switch (type ){
188+ case MESSAGE: {
135189 size_t size = decoder.pb_decode_varint32 ();
136190 decoder.decode (message, size);
137191 }
138192 break ;
139- case thinger_message:: KEEP_ALIVE: {
193+ case KEEP_ALIVE: {
140194 size_t size = decoder.pb_decode_varint32 ();
141195 keep_alive_response = true ;
142196 }
0 commit comments