2020 set_global_event_loop_policy ,
2121)
2222from ...config import global_config
23- from ...serializers import Serializers
23+ from ...serializers import Serializers , BaseSerializer
2424from .message import (
2525 EMPTY_BYTE ,
2626 ERROR ,
3333from ..thing import Thing
3434from ..property import Property
3535from ..properties import TypedDict
36- from ..actions import BoundAction , action as remote_method
36+ from ..actions import BoundAction
3737from ..logger import LogHistoryHandler
3838
3939
@@ -377,88 +377,64 @@ async def run_thing_instance(self, instance: Thing, scheduler: typing.Optional["
377377 return_value = await self .execute_operation (instance , objekt , operation , payload , preserialized_payload )
378378
379379 # handle return value
380- if (
381- isinstance (return_value , tuple )
382- and len (return_value ) == 2
383- and (isinstance (return_value [1 ], bytes ) or isinstance (return_value [1 ], PreserializedData ))
384- ):
385- if fetch_execution_logs :
386- return_value [0 ] = {
387- "return_value" : return_value [0 ],
388- "execution_logs" : list_handler .log_list ,
389- }
390- payload = SerializableData (
391- return_value [0 ],
392- Serializers .for_object (thing_id , instance .__class__ .__name__ , objekt ),
393- )
394- if isinstance (return_value [1 ], bytes ):
395- preserialized_payload = PreserializedData (return_value [1 ])
396- # elif isinstance(return_value, PreserializedData):
397- # if fetch_execution_logs:
398- # return_value = {
399- # "return_value" : return_value.value,
400- # "execution_logs" : list_handler.log_list
401- # }
402- # payload = SerializableData(return_value.value, content_type='application/json')
403- # preserialized_payload = return_value
404-
405- elif isinstance (return_value , bytes ):
406- payload = SerializableData (None , content_type = "application/json" )
407- preserialized_payload = PreserializedData (return_value )
408- else :
409- # complete thing execution context
410- if fetch_execution_logs :
411- return_value = {
412- "return_value" : return_value ,
413- "execution_logs" : list_handler .log_list ,
414- }
415- payload = SerializableData (
416- return_value ,
417- Serializers .for_object (thing_id , instance .__class__ .__name__ , objekt ),
418- )
419- preserialized_payload = PreserializedData (EMPTY_BYTE , content_type = "text/plain" )
380+ serializer = Serializers .for_object (thing_id , instance .__class__ .__name__ , objekt )
381+ rpayload , rpreserialized_payload = self .format_return_value (return_value , serializer = serializer )
382+
383+ # complete thing execution context
384+ if fetch_execution_logs :
385+ rpayload .value = dict (return_value = rpayload .value , execution_logs = list_handler .log_list )
386+
387+ # raise any payload errors now
388+ rpayload .require_serialized ()
389+
420390 # set reply
421- scheduler .last_operation_reply = (payload , preserialized_payload , REPLY )
391+ scheduler .last_operation_reply = (rpayload , rpreserialized_payload , REPLY )
392+
422393 except BreakInnerLoop :
423394 # exit the loop and stop the thing
424395 instance .logger .info (
425- "Thing {} with instance name {} exiting event loop." .format (
426- instance .__class__ .__name__ , instance .id
427- )
396+ "Thing {} with id {} exiting event loop." .format (instance .__class__ .__name__ , instance .id )
428397 )
429- return_value = None
398+
399+ # send a reply with None return value
400+ rpayload , rpreserialized_payload = self .format_return_value (None , Serializers .json )
401+
402+ # complete thing execution context
430403 if fetch_execution_logs :
431- return_value = {
432- "return_value" : None ,
433- "execution_logs" : list_handler .log_list ,
434- }
435- scheduler .last_operation_reply = (
436- SerializableData (return_value , content_type = "application/json" ),
437- PreserializedData (EMPTY_BYTE , content_type = "text/plain" ),
438- None ,
439- )
440- return
404+ rpayload .value = dict (return_value = rpayload .value , execution_logs = list_handler .log_list )
405+
406+ # set reply, let the message broker decide
407+ scheduler .last_operation_reply = (rpayload , rpreserialized_payload , None )
408+
409+ # quit the loop
410+ break
411+
441412 except Exception as ex :
442413 # error occurred while executing the operation
443414 instance .logger .error (
444415 "Thing {} with ID {} produced error : {} - {}." .format (
445416 instance .__class__ .__name__ , instance .id , type (ex ), ex
446417 )
447418 )
448- return_value = dict (exception = format_exception_as_json (ex ))
449- if fetch_execution_logs :
450- return_value ["execution_logs" ] = list_handler .log_list
451- scheduler .last_operation_reply = (
452- SerializableData (return_value , content_type = "application/json" ),
453- PreserializedData (EMPTY_BYTE , content_type = "text/plain" ),
454- ERROR ,
419+
420+ # send a reply with error
421+ rpayload , rpreserialized_payload = self .format_return_value (
422+ dict (exception = format_exception_as_json (ex )), Serializers .json
455423 )
424+
425+ # complete thing execution context
426+ if fetch_execution_logs :
427+ rpayload .value ["execution_logs" ] = list_handler .log_list
428+
429+ # set error reply
430+ scheduler .last_operation_reply = (rpayload , rpreserialized_payload , ERROR )
431+
456432 finally :
457433 # cleanup
458434 if fetch_execution_logs :
459435 instance .logger .removeHandler (list_handler )
460436 instance .logger .debug (
461- "thing {} with instance name {} completed execution of operation {} on {}" .format (
437+ "thing {} with id {} completed execution of operation {} on {}" .format (
462438 instance .__class__ .__name__ , instance .id , operation , objekt
463439 )
464440 )
@@ -501,17 +477,19 @@ async def execute_operation(
501477 elif operation == Operations .deleteproperty :
502478 prop = instance .properties [objekt ] # type: Property
503479 del prop # raises NotImplementedError when deletion is not implemented which is mostly the case
480+ elif operation == Operations .invokeaction and objekt == "get_thing_description" :
481+ # special case
482+ if payload is None :
483+ payload = dict ()
484+ args = payload .pop ("__args__" , tuple ())
485+ return self .get_thing_description (instance , * args , ** payload )
504486 elif operation == Operations .invokeaction :
505487 if payload is None :
506488 payload = dict ()
507489 args = payload .pop ("__args__" , tuple ())
508490 # payload then become kwargs
509491 if preserialized_payload != EMPTY_BYTE :
510492 args = (preserialized_payload ,) + args
511- # special case
512- if objekt == "get_thing_description" :
513- return self .get_thing_description (instance , * args , ** payload )
514- # normal Thing action
515493 action = instance .actions [objekt ] # type: BoundAction
516494 if action .execution_info .iscoroutine :
517495 # the actual scheduling as a purely async task is done by the scheduler, not here,
@@ -528,6 +506,30 @@ async def execute_operation(
528506 "Unimplemented execution path for Thing {} for operation {}" .format (instance .id , operation )
529507 )
530508
509+ def format_return_value (
510+ self ,
511+ return_value : typing .Any ,
512+ serializer : BaseSerializer ,
513+ ) -> tuple [SerializableData , PreserializedData ]:
514+ if (
515+ isinstance (return_value , tuple )
516+ and len (return_value ) == 2
517+ and (isinstance (return_value [1 ], bytes ) or isinstance (return_value [1 ], PreserializedData ))
518+ ):
519+ payload = SerializableData (return_value [0 ], serializer = serializer , content_type = serializer .content_type )
520+ if isinstance (return_value [1 ], bytes ):
521+ preserialized_payload = PreserializedData (return_value [1 ])
522+ elif isinstance (return_value , bytes ):
523+ payload = SerializableData (None , content_type = "application/json" )
524+ preserialized_payload = PreserializedData (return_value )
525+ elif isinstance (return_value , PreserializedData ):
526+ payload = SerializableData (None , content_type = "application/json" )
527+ preserialized_payload = return_value
528+ else :
529+ payload = SerializableData (return_value , serializer = serializer , content_type = serializer .content_type )
530+ preserialized_payload = PreserializedData (EMPTY_BYTE , content_type = "text/plain" )
531+ return payload , preserialized_payload
532+
531533 async def _process_timeouts (
532534 self ,
533535 request_message : RequestMessage ,
0 commit comments