diff --git a/examples/4_service_to_service/__init__.py b/examples/4_service_to_service/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/4_service_to_service/example_1_service.py b/examples/4_service_to_service/example_1_service.py new file mode 100644 index 0000000..0619e83 --- /dev/null +++ b/examples/4_service_to_service/example_1_service.py @@ -0,0 +1,76 @@ +"""First Service for example. Sends a message to service two and emits an event for the client.""" + +import logging + +from intersect_sdk import ( + HierarchyConfig, + IntersectBaseCapabilityImplementation, + IntersectDirectMessageParams, + IntersectEventDefinition, + IntersectService, + IntersectServiceConfig, + default_intersect_lifecycle_loop, + intersect_event, + intersect_message, + intersect_status, +) + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +class ExampleServiceOneCapabilityImplementation(IntersectBaseCapabilityImplementation): + """Service 1 Capability.""" + + @intersect_status() + def status(self) -> str: + """Basic status function which returns a hard-coded string.""" + return 'Up' + + @intersect_message() + def pass_text_to_service_2(self, text: str) -> None: + """Takes in a string parameter and sends it to service 2.""" + msg_to_send = IntersectDirectMessageParams( + destination='example-organization.example-facility.example-system.example-subsystem.service-two', + operation='ServiceTwo.test_service', + payload=text, + ) + + # Send intersect message to another service + self.intersect_sdk_call_service(msg_to_send, self.service_2_handler) + + @intersect_event(events={'response_event': IntersectEventDefinition(event_type=str)}) + def service_2_handler(self, msg: str) -> None: + """Handles response from service 2 and emits the response as an event for the client.""" + self.intersect_sdk_emit_event('response_event', f'Received Response from Service 2: {msg}') + + +if __name__ == '__main__': + from_config_file = { + 'brokers': [ + { + 'username': 'intersect_username', + 'password': 'intersect_password', + 'port': 1883, + 'protocol': 'mqtt3.1.1', + }, + ], + } + config = IntersectServiceConfig( + hierarchy=HierarchyConfig( + organization='example-organization', + facility='example-facility', + system='example-system', + subsystem='example-subsystem', + service='service-one', + ), + status_interval=30.0, + **from_config_file, + ) + capability = ExampleServiceOneCapabilityImplementation() + capability.capability_name = 'ServiceOne' + service = IntersectService([capability], config) + logger.info('Starting Service 1, use Ctrl+C to exit.') + default_intersect_lifecycle_loop( + service, + ) diff --git a/examples/4_service_to_service/example_1_service_schema.json b/examples/4_service_to_service/example_1_service_schema.json new file mode 100644 index 0000000..2cbb1d4 --- /dev/null +++ b/examples/4_service_to_service/example_1_service_schema.json @@ -0,0 +1,174 @@ +{ + "asyncapi": "2.6.0", + "x-intersect-version": "0.6.4", + "info": { + "title": "example-organization.example-facility.example-system.example-subsystem.service-one", + "version": "0.0.0", + "description": "Service One Capability.\n" + }, + "defaultContentType": "application/json", + "channels": { + "pass_text_to_service_2": { + "publish": { + "message": { + "schemaFormat": "application/vnd.aai.asyncapi+json;version=2.6.0", + "contentType": "application/json", + "traits": { + "$ref": "#/components/messageTraits/commonHeaders" + }, + "payload": { + "type": "null" + } + }, + "description": "Takes in a string parameter and sends it to service 2." + }, + "subscribe": { + "message": { + "schemaFormat": "application/vnd.aai.asyncapi+json;version=2.6.0", + "contentType": "application/json", + "traits": { + "$ref": "#/components/messageTraits/commonHeaders" + }, + "payload": { + "type": "string" + } + }, + "description": "Takes in a string parameter and sends it to service 2." + }, + "events": [] + } + }, + "events": { + "response_event": { + "type": "string" + } + }, + "components": { + "schemas": {}, + "messageTraits": { + "commonHeaders": { + "messageHeaders": { + "$defs": { + "IntersectDataHandler": { + "description": "What data transfer type do you want to use for handling the request/response?\n\nDefault: MESSAGE", + "enum": [ + 0, + 1 + ], + "title": "IntersectDataHandler", + "type": "integer" + } + }, + "description": "Matches the current header definition for INTERSECT messages.\n\nALL messages should contain this header.", + "properties": { + "source": { + "description": "source of the message", + "pattern": "([-a-z0-9]+\\.)*[-a-z0-9]", + "title": "Source", + "type": "string" + }, + "destination": { + "description": "destination of the message", + "pattern": "([-a-z0-9]+\\.)*[-a-z0-9]", + "title": "Destination", + "type": "string" + }, + "created_at": { + "description": "the UTC timestamp of message creation", + "format": "date-time", + "title": "Created At", + "type": "string" + }, + "sdk_version": { + "description": "SemVer string of SDK's version, used to check for compatibility", + "pattern": "^\\d+\\.\\d+\\.\\d+$", + "title": "Sdk Version", + "type": "string" + }, + "data_handler": { + "allOf": [ + { + "$ref": "#/components/messageTraits/commonHeaders/userspaceHeaders/$defs/IntersectDataHandler" + } + ], + "default": 0, + "description": "Code signifying where data is stored." + }, + "has_error": { + "default": false, + "description": "If this value is True, the payload will contain the error message (a string)", + "title": "Has Error", + "type": "boolean" + } + }, + "required": [ + "source", + "destination", + "created_at", + "sdk_version" + ], + "title": "UserspaceMessageHeader", + "type": "object" + }, + "eventHeaders": { + "$defs": { + "IntersectDataHandler": { + "description": "What data transfer type do you want to use for handling the request/response?\n\nDefault: MESSAGE", + "enum": [ + 0, + 1 + ], + "title": "IntersectDataHandler", + "type": "integer" + } + }, + "description": "Matches the current header definition for INTERSECT messages.\n\nALL messages should contain this header.", + "properties": { + "source": { + "description": "source of the message", + "pattern": "([-a-z0-9]+\\.)*[-a-z0-9]", + "title": "Source", + "type": "string" + }, + "created_at": { + "description": "the UTC timestamp of message creation", + "format": "date-time", + "title": "Created At", + "type": "string" + }, + "sdk_version": { + "description": "SemVer string of SDK's version, used to check for compatibility", + "pattern": "^\\d+\\.\\d+\\.\\d+$", + "title": "Sdk Version", + "type": "string" + }, + "data_handler": { + "allOf": [ + { + "$ref": "#/components/messageTraits/commonHeaders/eventHeaders/$defs/IntersectDataHandler" + } + ], + "default": 0, + "description": "Code signifying where data is stored." + }, + "event_name": { + "title": "Event Name", + "type": "string" + } + }, + "required": [ + "source", + "created_at", + "sdk_version", + "event_name" + ], + "title": "EventMessageHeaders", + "type": "object" + } + } + } + }, + "status": { + "type": "string" + } + } diff --git a/examples/4_service_to_service/example_2_service.py b/examples/4_service_to_service/example_2_service.py new file mode 100644 index 0000000..ef5670f --- /dev/null +++ b/examples/4_service_to_service/example_2_service.py @@ -0,0 +1,61 @@ +"""Second Service for example.""" + +import logging + +from intersect_sdk import ( + HierarchyConfig, + IntersectBaseCapabilityImplementation, + IntersectService, + IntersectServiceConfig, + default_intersect_lifecycle_loop, + intersect_message, + intersect_status, +) + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +class ExampleServiceTwoCapabilityImplementation(IntersectBaseCapabilityImplementation): + """Service 2 Capability.""" + + @intersect_status() + def status(self) -> str: + """Basic status function which returns a hard-coded string.""" + return 'Up' + + @intersect_message + def test_service(self, text: str) -> str: + """Returns the text given along with acknowledgement.""" + return f'Acknowledging service one text -> {text}' + + +if __name__ == '__main__': + from_config_file = { + 'brokers': [ + { + 'username': 'intersect_username', + 'password': 'intersect_password', + 'port': 1883, + 'protocol': 'mqtt3.1.1', + }, + ], + } + config = IntersectServiceConfig( + hierarchy=HierarchyConfig( + organization='example-organization', + facility='example-facility', + system='example-system', + subsystem='example-subsystem', + service='service-two', + ), + status_interval=30.0, + **from_config_file, + ) + capability = ExampleServiceTwoCapabilityImplementation() + capability.capability_name = 'ServiceTwo' + service = IntersectService([capability], config) + logger.info('Starting Service 2, use Ctrl+C to exit.') + default_intersect_lifecycle_loop( + service, + ) diff --git a/examples/4_service_to_service/example_2_service_schema.json b/examples/4_service_to_service/example_2_service_schema.json new file mode 100644 index 0000000..56d4741 --- /dev/null +++ b/examples/4_service_to_service/example_2_service_schema.json @@ -0,0 +1,170 @@ +{ + "asyncapi": "2.6.0", + "x-intersect-version": "0.6.4", + "info": { + "title": "example-organization.example-facility.example-system.example-subsystem.service-two", + "version": "0.0.0", + "description": "Service Two Capability.\n" + }, + "defaultContentType": "application/json", + "channels": { + "test_service": { + "publish": { + "message": { + "schemaFormat": "application/vnd.aai.asyncapi+json;version=2.6.0", + "contentType": "application/json", + "traits": { + "$ref": "#/components/messageTraits/commonHeaders" + }, + "payload": { + "type": "string" + } + }, + "description": "Returns the text given along with acknowledgement." + }, + "subscribe": { + "message": { + "schemaFormat": "application/vnd.aai.asyncapi+json;version=2.6.0", + "contentType": "application/json", + "traits": { + "$ref": "#/components/messageTraits/commonHeaders" + }, + "payload": { + "type": "string" + } + }, + "description": "Returns the text given along with acknowledgement." + }, + "events": [] + } + }, + "events": {}, + "components": { + "schemas": {}, + "messageTraits": { + "commonHeaders": { + "messageHeaders": { + "$defs": { + "IntersectDataHandler": { + "description": "What data transfer type do you want to use for handling the request/response?\n\nDefault: MESSAGE", + "enum": [ + 0, + 1 + ], + "title": "IntersectDataHandler", + "type": "integer" + } + }, + "description": "Matches the current header definition for INTERSECT messages.\n\nALL messages should contain this header.", + "properties": { + "source": { + "description": "source of the message", + "pattern": "([-a-z0-9]+\\.)*[-a-z0-9]", + "title": "Source", + "type": "string" + }, + "destination": { + "description": "destination of the message", + "pattern": "([-a-z0-9]+\\.)*[-a-z0-9]", + "title": "Destination", + "type": "string" + }, + "created_at": { + "description": "the UTC timestamp of message creation", + "format": "date-time", + "title": "Created At", + "type": "string" + }, + "sdk_version": { + "description": "SemVer string of SDK's version, used to check for compatibility", + "pattern": "^\\d+\\.\\d+\\.\\d+$", + "title": "Sdk Version", + "type": "string" + }, + "data_handler": { + "allOf": [ + { + "$ref": "#/components/messageTraits/commonHeaders/userspaceHeaders/$defs/IntersectDataHandler" + } + ], + "default": 0, + "description": "Code signifying where data is stored." + }, + "has_error": { + "default": false, + "description": "If this value is True, the payload will contain the error message (a string)", + "title": "Has Error", + "type": "boolean" + } + }, + "required": [ + "source", + "destination", + "created_at", + "sdk_version" + ], + "title": "UserspaceMessageHeader", + "type": "object" + }, + "eventHeaders": { + "$defs": { + "IntersectDataHandler": { + "description": "What data transfer type do you want to use for handling the request/response?\n\nDefault: MESSAGE", + "enum": [ + 0, + 1 + ], + "title": "IntersectDataHandler", + "type": "integer" + } + }, + "description": "Matches the current header definition for INTERSECT messages.\n\nALL messages should contain this header.", + "properties": { + "source": { + "description": "source of the message", + "pattern": "([-a-z0-9]+\\.)*[-a-z0-9]", + "title": "Source", + "type": "string" + }, + "created_at": { + "description": "the UTC timestamp of message creation", + "format": "date-time", + "title": "Created At", + "type": "string" + }, + "sdk_version": { + "description": "SemVer string of SDK's version, used to check for compatibility", + "pattern": "^\\d+\\.\\d+\\.\\d+$", + "title": "Sdk Version", + "type": "string" + }, + "data_handler": { + "allOf": [ + { + "$ref": "#/components/messageTraits/commonHeaders/eventHeaders/$defs/IntersectDataHandler" + } + ], + "default": 0, + "description": "Code signifying where data is stored." + }, + "event_name": { + "title": "Event Name", + "type": "string" + } + }, + "required": [ + "source", + "created_at", + "sdk_version", + "event_name" + ], + "title": "EventMessageHeaders", + "type": "object" + } + } + } + }, + "status": { + "type": "string" + } + } diff --git a/examples/4_service_to_service/example_client.py b/examples/4_service_to_service/example_client.py new file mode 100644 index 0000000..25e2573 --- /dev/null +++ b/examples/4_service_to_service/example_client.py @@ -0,0 +1,79 @@ +"""Client for service to service example. + +Kicks off exmaple by sending message to service one, and then +waits for an event from service one to confirm the messages were passed between the two services properly. + +""" + +import logging + +from intersect_sdk import ( + INTERSECT_JSON_VALUE, + IntersectClient, + IntersectClientCallback, + IntersectClientConfig, + IntersectDirectMessageParams, + default_intersect_lifecycle_loop, +) + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +class SampleOrchestrator: + """Simply contains an event callback for events from Service 1.""" + + def event_callback( + self, _source: str, _operation: str, _event_name: str, payload: INTERSECT_JSON_VALUE + ) -> None: + """This simply prints the event from Service 1 to your console. + + Params: + source: the source of the response message. + operation: the name of the function we called in the original message. + _has_error: Boolean value which represents an error. + payload: Value of the response from the Service. + """ + print(payload) + # break out of pubsub loop + raise Exception + + +if __name__ == '__main__': + from_config_file = { + 'brokers': [ + { + 'username': 'intersect_username', + 'password': 'intersect_password', + 'port': 1883, + 'protocol': 'mqtt3.1.1', + }, + ], + } + + # The counter will start after the initial message. + # If the service is already active and counting, this may do nothing. + initial_messages = [ + IntersectDirectMessageParams( + destination='example-organization.example-facility.example-system.example-subsystem.service-one', + operation='ServiceOne.pass_text_to_service_2', + payload='Kicking off the example!', + ) + ] + config = IntersectClientConfig( + initial_message_event_config=IntersectClientCallback( + messages_to_send=initial_messages, + services_to_start_listening_for_events=[ + 'example-organization.example-facility.example-system.example-subsystem.service-one' + ], + ), + **from_config_file, + ) + orchestrator = SampleOrchestrator() + client = IntersectClient( + config=config, + event_callback=orchestrator.event_callback, + ) + default_intersect_lifecycle_loop( + client, + ) diff --git a/pyproject.toml b/pyproject.toml index 988b3ad..b606147 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -63,6 +63,7 @@ test = [ test-all = "pytest tests/ --cov=src/intersect_sdk/ --cov-fail-under=80 --cov-report=html:reports/htmlcov/ --cov-report=xml:reports/coverage_report.xml --junitxml=reports/junit.xml" test-all-debug = "pytest tests/ --cov=src/intersect_sdk/ --cov-fail-under=80 --cov-report=html:reports/htmlcov/ --cov-report=xml:reports/coverage_report.xml --junitxml=reports/junit.xml -s" test-unit = "pytest tests/unit --cov=src/intersect_sdk/" +test-e2e = "pytest tests/e2e --cov=src/intersect_sdk/" lint = {composite = ["lint-format", "lint-ruff", "lint-mypy"]} lint-format = "ruff format" lint-ruff = "ruff check --fix" diff --git a/src/intersect_sdk/service.py b/src/intersect_sdk/service.py index 32b768c..5bdd9ae 100644 --- a/src/intersect_sdk/service.py +++ b/src/intersect_sdk/service.py @@ -125,6 +125,8 @@ def __init__( self.got_valid_response: bool = False self.response_fn = response_handler self.waiting: bool = False + self.cleanup_req = False + """When this flag is set to True, mark this request for GC deletion.""" def __init__( self, @@ -507,14 +509,6 @@ def create_external_request( self._external_requests_lock.release_lock() return request_uuid - def _delete_external_request(self, req_id: UUID) -> None: - req_id_str = str(req_id) - if req_id_str in self._external_requests: - self._external_requests_lock.acquire_lock(blocking=True) - req: IntersectService._ExternalRequest = self._external_requests.pop(req_id_str) - del req - self._external_requests_lock.release_lock() - def _get_external_request(self, req_id: UUID) -> IntersectService._ExternalRequest | None: req_id_str = str(req_id) if req_id_str in self._external_requests: @@ -524,14 +518,25 @@ def _get_external_request(self, req_id: UUID) -> IntersectService._ExternalReque def _process_external_requests(self) -> None: self._external_requests_lock.acquire_lock(blocking=True) + + # process requests for extreq in self._external_requests.values(): if not extreq.processed: self._process_external_request(extreq) + # delete requests + cleanup_list = [ + str(extreq.request_id) + for extreq in self._external_requests.values() + if extreq.cleanup_req + ] + for extreq_id in cleanup_list: + extreq = self._external_requests.pop(extreq_id) + del extreq + self._external_requests_lock.release_lock() def _process_external_request(self, extreq: IntersectService._ExternalRequest) -> None: response = None - cleanup_req = False now = datetime.now(timezone.utc) logger.debug(f'Processing external request {extreq.request_id} @ {now}') @@ -555,7 +560,7 @@ def _process_external_request(self, extreq: IntersectService._ExternalRequest) - logger.warning( f'External service request encountered an error: {error_msg}' ) - cleanup_req = True + extreq.cleanup_req = True else: logger.debug('Request wait timed-out!') extreq.waiting = False @@ -570,9 +575,6 @@ def _process_external_request(self, extreq: IntersectService._ExternalRequest) - ): extreq.response_fn(response) - if cleanup_req: - self._delete_external_request(extreq.request_id) - def _handle_service_message_raw(self, raw: bytes) -> None: """Main broker callback function. diff --git a/tests/e2e/test_examples.py b/tests/e2e/test_examples.py index c6af75f..f1990df 100644 --- a/tests/e2e/test_examples.py +++ b/tests/e2e/test_examples.py @@ -132,3 +132,10 @@ def test_example_3_ping_pong_events(): def test_example_3_ping_pong_events_amqp(): assert run_example_test('3_ping_pong_events_amqp') == 'ping\npong\nping\npong\n' + + +def test_example_4_service_to_service(): + assert ( + run_example_test('4_service_to_service') + == 'Received Response from Service 2: Acknowledging service one text -> Kicking off the example!\n' + )