From 27f3c4f3cc620826591bf5fc13f49aca6264f63b Mon Sep 17 00:00:00 2001 From: Gregory Cage Date: Tue, 20 Aug 2024 15:25:23 -0400 Subject: [PATCH 1/4] Add initial example for service to service communication --- examples/4_service_to_service/__init__.py | 0 .../4_service_to_service/example_1_service.py | 78 +++++++++++++++++++ .../4_service_to_service/example_2_service.py | 62 +++++++++++++++ .../4_service_to_service/example_client.py | 78 +++++++++++++++++++ pyproject.toml | 1 + tests/e2e/test_examples.py | 7 ++ 6 files changed, 226 insertions(+) create mode 100644 examples/4_service_to_service/__init__.py create mode 100644 examples/4_service_to_service/example_1_service.py create mode 100644 examples/4_service_to_service/example_2_service.py create mode 100644 examples/4_service_to_service/example_client.py diff --git a/examples/4_service_to_service/__init__.py b/examples/4_service_to_service/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/4_service_to_service/example_1_service.py b/examples/4_service_to_service/example_1_service.py new file mode 100644 index 0000000..801b1dd --- /dev/null +++ b/examples/4_service_to_service/example_1_service.py @@ -0,0 +1,78 @@ +"""First Service for example. Sends a message to service two and emits an event for the client.""" + +import logging + +from intersect_sdk import ( + HierarchyConfig, + IntersectBaseCapabilityImplementation, + IntersectDirectMessageParams, + IntersectEventDefinition, + IntersectService, + IntersectServiceConfig, + default_intersect_lifecycle_loop, + intersect_event, + intersect_message, + intersect_status, +) + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +class ExampleServiceOneCapabilityImplementation(IntersectBaseCapabilityImplementation): + """Service One Capability.""" + + @intersect_status() + def status(self) -> str: + """Basic status function which returns a hard-coded string.""" + return 'Up' + + @intersect_message() + def pass_text_to_service_2(self, text: str) -> None: + """Takes in a string parameter and sends it to service 2.""" + logger.info('maing it to service one') + msg_to_send = IntersectDirectMessageParams( + destination='example-organization.example-facility.example-system.example-subsystem.service-two', + operation='ServiceTwo.test_service', + payload=text, + ) + + # Send intersect message to another service + self.intersect_sdk_call_service(msg_to_send, self.service_2_handler) + + @intersect_event(events={'response_event': IntersectEventDefinition(event_type=str)}) + def service_2_handler(self, msg: str) -> None: + """Handles response from service two and emites the response as an event for the client.""" + logger.info('maing it to right before emitting event') + self.intersect_sdk_emit_event('response_event', f'Received Response from Service 2: {msg}') + + +if __name__ == '__main__': + from_config_file = { + 'brokers': [ + { + 'username': 'intersect_username', + 'password': 'intersect_password', + 'port': 1883, + 'protocol': 'mqtt3.1.1', + }, + ], + } + config = IntersectServiceConfig( + hierarchy=HierarchyConfig( + organization='example-organization', + facility='example-facility', + system='example-system', + subsystem='example-subsystem', + service='service-one', + ), + status_interval=30.0, + **from_config_file, + ) + capability = ExampleServiceOneCapabilityImplementation() + capability.capability_name = 'ServiceOne' + service = IntersectService([capability], config) + logger.info('Starting service one, use Ctrl+C to exit.') + default_intersect_lifecycle_loop( + service, + ) diff --git a/examples/4_service_to_service/example_2_service.py b/examples/4_service_to_service/example_2_service.py new file mode 100644 index 0000000..bd2b0ad --- /dev/null +++ b/examples/4_service_to_service/example_2_service.py @@ -0,0 +1,62 @@ +"""Second Service for example.""" + +import logging + +from intersect_sdk import ( + HierarchyConfig, + IntersectBaseCapabilityImplementation, + IntersectService, + IntersectServiceConfig, + default_intersect_lifecycle_loop, + intersect_message, + intersect_status, +) + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +class ExampleServiceTwoCapabilityImplementation(IntersectBaseCapabilityImplementation): + """Service Two Capability.""" + + @intersect_status() + def status(self) -> str: + """Basic status function which returns a hard-coded string.""" + return 'Up' + + @intersect_message + def test_service(self, text: str) -> str: + """Returns the text given along with acknowledgement.""" + logger.info('Making it to service 2') + return f'Acknowledging service one text -> {text}' + + +if __name__ == '__main__': + from_config_file = { + 'brokers': [ + { + 'username': 'intersect_username', + 'password': 'intersect_password', + 'port': 1883, + 'protocol': 'mqtt3.1.1', + }, + ], + } + config = IntersectServiceConfig( + hierarchy=HierarchyConfig( + organization='example-organization', + facility='example-facility', + system='example-system', + subsystem='example-subsystem', + service='service-two', + ), + status_interval=30.0, + **from_config_file, + ) + capability = ExampleServiceTwoCapabilityImplementation() + capability.capability_name = 'ServiceTwo' + service = IntersectService([capability], config) + logger.info('Starting service two, use Ctrl+C to exit.') + default_intersect_lifecycle_loop( + service, + ) diff --git a/examples/4_service_to_service/example_client.py b/examples/4_service_to_service/example_client.py new file mode 100644 index 0000000..b2fb6c0 --- /dev/null +++ b/examples/4_service_to_service/example_client.py @@ -0,0 +1,78 @@ +"""Client for service to service example. + +Kicks off exmaple by sending message to service one, and then +waits for an event from service one to confirm the messages were passed between the two services properly. + +""" + +import logging + +from intersect_sdk import ( + INTERSECT_JSON_VALUE, + IntersectClient, + IntersectClientCallback, + IntersectClientConfig, + IntersectDirectMessageParams, + default_intersect_lifecycle_loop, +) + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +class SampleOrchestrator: + """Simply contains an event callback for events from Service One.""" + + def event_callback( + self, _source: str, _operation: str, _event_name: str, payload: INTERSECT_JSON_VALUE + ) -> None: + """This simply prints the event from Service One to your console. + + Params: + source: the source of the response message. + operation: the name of the function we called in the original message. + _has_error: Boolean value which represents an error. + payload: Value of the response from the Service. + """ + logger.info('making it to event callback') + print(payload) + + +if __name__ == '__main__': + from_config_file = { + 'brokers': [ + { + 'username': 'intersect_username', + 'password': 'intersect_password', + 'port': 1883, + 'protocol': 'mqtt3.1.1', + }, + ], + } + + # The counter will start after the initial message. + # If the service is already active and counting, this may do nothing. + initial_messages = [ + IntersectDirectMessageParams( + destination='example-organization.example-facility.example-system.example-subsystem.service-one', + operation='ServiceOne.pass_text_to_service_2', + payload='Kicking off the example!', + ) + ] + config = IntersectClientConfig( + initial_message_event_config=IntersectClientCallback( + messages_to_send=initial_messages, + services_to_start_listening_for_events=[ + 'example-organization.example-facility.example-system.example-subsystem.service-one' + ], + ), + **from_config_file, + ) + orchestrator = SampleOrchestrator() + client = IntersectClient( + config=config, + event_callback=orchestrator.event_callback, + ) + default_intersect_lifecycle_loop( + client, + ) diff --git a/pyproject.toml b/pyproject.toml index 988b3ad..b606147 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -63,6 +63,7 @@ test = [ test-all = "pytest tests/ --cov=src/intersect_sdk/ --cov-fail-under=80 --cov-report=html:reports/htmlcov/ --cov-report=xml:reports/coverage_report.xml --junitxml=reports/junit.xml" test-all-debug = "pytest tests/ --cov=src/intersect_sdk/ --cov-fail-under=80 --cov-report=html:reports/htmlcov/ --cov-report=xml:reports/coverage_report.xml --junitxml=reports/junit.xml -s" test-unit = "pytest tests/unit --cov=src/intersect_sdk/" +test-e2e = "pytest tests/e2e --cov=src/intersect_sdk/" lint = {composite = ["lint-format", "lint-ruff", "lint-mypy"]} lint-format = "ruff format" lint-ruff = "ruff check --fix" diff --git a/tests/e2e/test_examples.py b/tests/e2e/test_examples.py index c6af75f..c65071f 100644 --- a/tests/e2e/test_examples.py +++ b/tests/e2e/test_examples.py @@ -132,3 +132,10 @@ def test_example_3_ping_pong_events(): def test_example_3_ping_pong_events_amqp(): assert run_example_test('3_ping_pong_events_amqp') == 'ping\npong\nping\npong\n' + + +def test_example_4_service_to_service(): + assert ( + run_example_test('4_service_to_service') + == 'Received Response from Service 2: Acknowledging service one text -> Kicking off the example!' + ) From 667e07ac712788747799f4887ccf6c01b109325c Mon Sep 17 00:00:00 2001 From: Gregory Cage Date: Tue, 20 Aug 2024 15:38:55 -0400 Subject: [PATCH 2/4] Add json schemas for service to service example --- .../example_1_service_schema.json | 174 ++++++++++++++++++ .../example_2_service_schema.json | 170 +++++++++++++++++ 2 files changed, 344 insertions(+) create mode 100644 examples/4_service_to_service/example_1_service_schema.json create mode 100644 examples/4_service_to_service/example_2_service_schema.json diff --git a/examples/4_service_to_service/example_1_service_schema.json b/examples/4_service_to_service/example_1_service_schema.json new file mode 100644 index 0000000..2cbb1d4 --- /dev/null +++ b/examples/4_service_to_service/example_1_service_schema.json @@ -0,0 +1,174 @@ +{ + "asyncapi": "2.6.0", + "x-intersect-version": "0.6.4", + "info": { + "title": "example-organization.example-facility.example-system.example-subsystem.service-one", + "version": "0.0.0", + "description": "Service One Capability.\n" + }, + "defaultContentType": "application/json", + "channels": { + "pass_text_to_service_2": { + "publish": { + "message": { + "schemaFormat": "application/vnd.aai.asyncapi+json;version=2.6.0", + "contentType": "application/json", + "traits": { + "$ref": "#/components/messageTraits/commonHeaders" + }, + "payload": { + "type": "null" + } + }, + "description": "Takes in a string parameter and sends it to service 2." + }, + "subscribe": { + "message": { + "schemaFormat": "application/vnd.aai.asyncapi+json;version=2.6.0", + "contentType": "application/json", + "traits": { + "$ref": "#/components/messageTraits/commonHeaders" + }, + "payload": { + "type": "string" + } + }, + "description": "Takes in a string parameter and sends it to service 2." + }, + "events": [] + } + }, + "events": { + "response_event": { + "type": "string" + } + }, + "components": { + "schemas": {}, + "messageTraits": { + "commonHeaders": { + "messageHeaders": { + "$defs": { + "IntersectDataHandler": { + "description": "What data transfer type do you want to use for handling the request/response?\n\nDefault: MESSAGE", + "enum": [ + 0, + 1 + ], + "title": "IntersectDataHandler", + "type": "integer" + } + }, + "description": "Matches the current header definition for INTERSECT messages.\n\nALL messages should contain this header.", + "properties": { + "source": { + "description": "source of the message", + "pattern": "([-a-z0-9]+\\.)*[-a-z0-9]", + "title": "Source", + "type": "string" + }, + "destination": { + "description": "destination of the message", + "pattern": "([-a-z0-9]+\\.)*[-a-z0-9]", + "title": "Destination", + "type": "string" + }, + "created_at": { + "description": "the UTC timestamp of message creation", + "format": "date-time", + "title": "Created At", + "type": "string" + }, + "sdk_version": { + "description": "SemVer string of SDK's version, used to check for compatibility", + "pattern": "^\\d+\\.\\d+\\.\\d+$", + "title": "Sdk Version", + "type": "string" + }, + "data_handler": { + "allOf": [ + { + "$ref": "#/components/messageTraits/commonHeaders/userspaceHeaders/$defs/IntersectDataHandler" + } + ], + "default": 0, + "description": "Code signifying where data is stored." + }, + "has_error": { + "default": false, + "description": "If this value is True, the payload will contain the error message (a string)", + "title": "Has Error", + "type": "boolean" + } + }, + "required": [ + "source", + "destination", + "created_at", + "sdk_version" + ], + "title": "UserspaceMessageHeader", + "type": "object" + }, + "eventHeaders": { + "$defs": { + "IntersectDataHandler": { + "description": "What data transfer type do you want to use for handling the request/response?\n\nDefault: MESSAGE", + "enum": [ + 0, + 1 + ], + "title": "IntersectDataHandler", + "type": "integer" + } + }, + "description": "Matches the current header definition for INTERSECT messages.\n\nALL messages should contain this header.", + "properties": { + "source": { + "description": "source of the message", + "pattern": "([-a-z0-9]+\\.)*[-a-z0-9]", + "title": "Source", + "type": "string" + }, + "created_at": { + "description": "the UTC timestamp of message creation", + "format": "date-time", + "title": "Created At", + "type": "string" + }, + "sdk_version": { + "description": "SemVer string of SDK's version, used to check for compatibility", + "pattern": "^\\d+\\.\\d+\\.\\d+$", + "title": "Sdk Version", + "type": "string" + }, + "data_handler": { + "allOf": [ + { + "$ref": "#/components/messageTraits/commonHeaders/eventHeaders/$defs/IntersectDataHandler" + } + ], + "default": 0, + "description": "Code signifying where data is stored." + }, + "event_name": { + "title": "Event Name", + "type": "string" + } + }, + "required": [ + "source", + "created_at", + "sdk_version", + "event_name" + ], + "title": "EventMessageHeaders", + "type": "object" + } + } + } + }, + "status": { + "type": "string" + } + } diff --git a/examples/4_service_to_service/example_2_service_schema.json b/examples/4_service_to_service/example_2_service_schema.json new file mode 100644 index 0000000..56d4741 --- /dev/null +++ b/examples/4_service_to_service/example_2_service_schema.json @@ -0,0 +1,170 @@ +{ + "asyncapi": "2.6.0", + "x-intersect-version": "0.6.4", + "info": { + "title": "example-organization.example-facility.example-system.example-subsystem.service-two", + "version": "0.0.0", + "description": "Service Two Capability.\n" + }, + "defaultContentType": "application/json", + "channels": { + "test_service": { + "publish": { + "message": { + "schemaFormat": "application/vnd.aai.asyncapi+json;version=2.6.0", + "contentType": "application/json", + "traits": { + "$ref": "#/components/messageTraits/commonHeaders" + }, + "payload": { + "type": "string" + } + }, + "description": "Returns the text given along with acknowledgement." + }, + "subscribe": { + "message": { + "schemaFormat": "application/vnd.aai.asyncapi+json;version=2.6.0", + "contentType": "application/json", + "traits": { + "$ref": "#/components/messageTraits/commonHeaders" + }, + "payload": { + "type": "string" + } + }, + "description": "Returns the text given along with acknowledgement." + }, + "events": [] + } + }, + "events": {}, + "components": { + "schemas": {}, + "messageTraits": { + "commonHeaders": { + "messageHeaders": { + "$defs": { + "IntersectDataHandler": { + "description": "What data transfer type do you want to use for handling the request/response?\n\nDefault: MESSAGE", + "enum": [ + 0, + 1 + ], + "title": "IntersectDataHandler", + "type": "integer" + } + }, + "description": "Matches the current header definition for INTERSECT messages.\n\nALL messages should contain this header.", + "properties": { + "source": { + "description": "source of the message", + "pattern": "([-a-z0-9]+\\.)*[-a-z0-9]", + "title": "Source", + "type": "string" + }, + "destination": { + "description": "destination of the message", + "pattern": "([-a-z0-9]+\\.)*[-a-z0-9]", + "title": "Destination", + "type": "string" + }, + "created_at": { + "description": "the UTC timestamp of message creation", + "format": "date-time", + "title": "Created At", + "type": "string" + }, + "sdk_version": { + "description": "SemVer string of SDK's version, used to check for compatibility", + "pattern": "^\\d+\\.\\d+\\.\\d+$", + "title": "Sdk Version", + "type": "string" + }, + "data_handler": { + "allOf": [ + { + "$ref": "#/components/messageTraits/commonHeaders/userspaceHeaders/$defs/IntersectDataHandler" + } + ], + "default": 0, + "description": "Code signifying where data is stored." + }, + "has_error": { + "default": false, + "description": "If this value is True, the payload will contain the error message (a string)", + "title": "Has Error", + "type": "boolean" + } + }, + "required": [ + "source", + "destination", + "created_at", + "sdk_version" + ], + "title": "UserspaceMessageHeader", + "type": "object" + }, + "eventHeaders": { + "$defs": { + "IntersectDataHandler": { + "description": "What data transfer type do you want to use for handling the request/response?\n\nDefault: MESSAGE", + "enum": [ + 0, + 1 + ], + "title": "IntersectDataHandler", + "type": "integer" + } + }, + "description": "Matches the current header definition for INTERSECT messages.\n\nALL messages should contain this header.", + "properties": { + "source": { + "description": "source of the message", + "pattern": "([-a-z0-9]+\\.)*[-a-z0-9]", + "title": "Source", + "type": "string" + }, + "created_at": { + "description": "the UTC timestamp of message creation", + "format": "date-time", + "title": "Created At", + "type": "string" + }, + "sdk_version": { + "description": "SemVer string of SDK's version, used to check for compatibility", + "pattern": "^\\d+\\.\\d+\\.\\d+$", + "title": "Sdk Version", + "type": "string" + }, + "data_handler": { + "allOf": [ + { + "$ref": "#/components/messageTraits/commonHeaders/eventHeaders/$defs/IntersectDataHandler" + } + ], + "default": 0, + "description": "Code signifying where data is stored." + }, + "event_name": { + "title": "Event Name", + "type": "string" + } + }, + "required": [ + "source", + "created_at", + "sdk_version", + "event_name" + ], + "title": "EventMessageHeaders", + "type": "object" + } + } + } + }, + "status": { + "type": "string" + } + } From a801c280d4f68a10be4286957ade9ddbe048908a Mon Sep 17 00:00:00 2001 From: Gregory Cage Date: Wed, 21 Aug 2024 10:56:57 -0400 Subject: [PATCH 3/4] Remove debugging statements and fix typos --- examples/4_service_to_service/example_1_service.py | 8 +++----- examples/4_service_to_service/example_2_service.py | 5 ++--- examples/4_service_to_service/example_client.py | 5 ++--- 3 files changed, 7 insertions(+), 11 deletions(-) diff --git a/examples/4_service_to_service/example_1_service.py b/examples/4_service_to_service/example_1_service.py index 801b1dd..0619e83 100644 --- a/examples/4_service_to_service/example_1_service.py +++ b/examples/4_service_to_service/example_1_service.py @@ -20,7 +20,7 @@ class ExampleServiceOneCapabilityImplementation(IntersectBaseCapabilityImplementation): - """Service One Capability.""" + """Service 1 Capability.""" @intersect_status() def status(self) -> str: @@ -30,7 +30,6 @@ def status(self) -> str: @intersect_message() def pass_text_to_service_2(self, text: str) -> None: """Takes in a string parameter and sends it to service 2.""" - logger.info('maing it to service one') msg_to_send = IntersectDirectMessageParams( destination='example-organization.example-facility.example-system.example-subsystem.service-two', operation='ServiceTwo.test_service', @@ -42,8 +41,7 @@ def pass_text_to_service_2(self, text: str) -> None: @intersect_event(events={'response_event': IntersectEventDefinition(event_type=str)}) def service_2_handler(self, msg: str) -> None: - """Handles response from service two and emites the response as an event for the client.""" - logger.info('maing it to right before emitting event') + """Handles response from service 2 and emits the response as an event for the client.""" self.intersect_sdk_emit_event('response_event', f'Received Response from Service 2: {msg}') @@ -72,7 +70,7 @@ def service_2_handler(self, msg: str) -> None: capability = ExampleServiceOneCapabilityImplementation() capability.capability_name = 'ServiceOne' service = IntersectService([capability], config) - logger.info('Starting service one, use Ctrl+C to exit.') + logger.info('Starting Service 1, use Ctrl+C to exit.') default_intersect_lifecycle_loop( service, ) diff --git a/examples/4_service_to_service/example_2_service.py b/examples/4_service_to_service/example_2_service.py index bd2b0ad..ef5670f 100644 --- a/examples/4_service_to_service/example_2_service.py +++ b/examples/4_service_to_service/example_2_service.py @@ -17,7 +17,7 @@ class ExampleServiceTwoCapabilityImplementation(IntersectBaseCapabilityImplementation): - """Service Two Capability.""" + """Service 2 Capability.""" @intersect_status() def status(self) -> str: @@ -27,7 +27,6 @@ def status(self) -> str: @intersect_message def test_service(self, text: str) -> str: """Returns the text given along with acknowledgement.""" - logger.info('Making it to service 2') return f'Acknowledging service one text -> {text}' @@ -56,7 +55,7 @@ def test_service(self, text: str) -> str: capability = ExampleServiceTwoCapabilityImplementation() capability.capability_name = 'ServiceTwo' service = IntersectService([capability], config) - logger.info('Starting service two, use Ctrl+C to exit.') + logger.info('Starting Service 2, use Ctrl+C to exit.') default_intersect_lifecycle_loop( service, ) diff --git a/examples/4_service_to_service/example_client.py b/examples/4_service_to_service/example_client.py index b2fb6c0..dd3834b 100644 --- a/examples/4_service_to_service/example_client.py +++ b/examples/4_service_to_service/example_client.py @@ -21,12 +21,12 @@ class SampleOrchestrator: - """Simply contains an event callback for events from Service One.""" + """Simply contains an event callback for events from Service 1.""" def event_callback( self, _source: str, _operation: str, _event_name: str, payload: INTERSECT_JSON_VALUE ) -> None: - """This simply prints the event from Service One to your console. + """This simply prints the event from Service 1 to your console. Params: source: the source of the response message. @@ -34,7 +34,6 @@ def event_callback( _has_error: Boolean value which represents an error. payload: Value of the response from the Service. """ - logger.info('making it to event callback') print(payload) From 391e7b0e6e9e4e37e47c40c79d81fbbb70436580 Mon Sep 17 00:00:00 2001 From: Lance Drane Date: Wed, 21 Aug 2024 11:33:06 -0400 Subject: [PATCH 4/4] fix svc2svc request cleanup issue and E2E test Signed-off-by: Lance Drane --- .../4_service_to_service/example_client.py | 2 ++ src/intersect_sdk/service.py | 28 ++++++++++--------- tests/e2e/test_examples.py | 2 +- 3 files changed, 18 insertions(+), 14 deletions(-) diff --git a/examples/4_service_to_service/example_client.py b/examples/4_service_to_service/example_client.py index dd3834b..25e2573 100644 --- a/examples/4_service_to_service/example_client.py +++ b/examples/4_service_to_service/example_client.py @@ -35,6 +35,8 @@ def event_callback( payload: Value of the response from the Service. """ print(payload) + # break out of pubsub loop + raise Exception if __name__ == '__main__': diff --git a/src/intersect_sdk/service.py b/src/intersect_sdk/service.py index 32b768c..5bdd9ae 100644 --- a/src/intersect_sdk/service.py +++ b/src/intersect_sdk/service.py @@ -125,6 +125,8 @@ def __init__( self.got_valid_response: bool = False self.response_fn = response_handler self.waiting: bool = False + self.cleanup_req = False + """When this flag is set to True, mark this request for GC deletion.""" def __init__( self, @@ -507,14 +509,6 @@ def create_external_request( self._external_requests_lock.release_lock() return request_uuid - def _delete_external_request(self, req_id: UUID) -> None: - req_id_str = str(req_id) - if req_id_str in self._external_requests: - self._external_requests_lock.acquire_lock(blocking=True) - req: IntersectService._ExternalRequest = self._external_requests.pop(req_id_str) - del req - self._external_requests_lock.release_lock() - def _get_external_request(self, req_id: UUID) -> IntersectService._ExternalRequest | None: req_id_str = str(req_id) if req_id_str in self._external_requests: @@ -524,14 +518,25 @@ def _get_external_request(self, req_id: UUID) -> IntersectService._ExternalReque def _process_external_requests(self) -> None: self._external_requests_lock.acquire_lock(blocking=True) + + # process requests for extreq in self._external_requests.values(): if not extreq.processed: self._process_external_request(extreq) + # delete requests + cleanup_list = [ + str(extreq.request_id) + for extreq in self._external_requests.values() + if extreq.cleanup_req + ] + for extreq_id in cleanup_list: + extreq = self._external_requests.pop(extreq_id) + del extreq + self._external_requests_lock.release_lock() def _process_external_request(self, extreq: IntersectService._ExternalRequest) -> None: response = None - cleanup_req = False now = datetime.now(timezone.utc) logger.debug(f'Processing external request {extreq.request_id} @ {now}') @@ -555,7 +560,7 @@ def _process_external_request(self, extreq: IntersectService._ExternalRequest) - logger.warning( f'External service request encountered an error: {error_msg}' ) - cleanup_req = True + extreq.cleanup_req = True else: logger.debug('Request wait timed-out!') extreq.waiting = False @@ -570,9 +575,6 @@ def _process_external_request(self, extreq: IntersectService._ExternalRequest) - ): extreq.response_fn(response) - if cleanup_req: - self._delete_external_request(extreq.request_id) - def _handle_service_message_raw(self, raw: bytes) -> None: """Main broker callback function. diff --git a/tests/e2e/test_examples.py b/tests/e2e/test_examples.py index c65071f..f1990df 100644 --- a/tests/e2e/test_examples.py +++ b/tests/e2e/test_examples.py @@ -137,5 +137,5 @@ def test_example_3_ping_pong_events_amqp(): def test_example_4_service_to_service(): assert ( run_example_test('4_service_to_service') - == 'Received Response from Service 2: Acknowledging service one text -> Kicking off the example!' + == 'Received Response from Service 2: Acknowledging service one text -> Kicking off the example!\n' )