Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add initial example for service to service communication #14

Merged
merged 4 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
76 changes: 76 additions & 0 deletions examples/4_service_to_service/example_1_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
"""First Service for example. Sends a message to service two and emits an event for the client."""

import logging

from intersect_sdk import (
HierarchyConfig,
IntersectBaseCapabilityImplementation,
IntersectDirectMessageParams,
IntersectEventDefinition,
IntersectService,
IntersectServiceConfig,
default_intersect_lifecycle_loop,
intersect_event,
intersect_message,
intersect_status,
)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class ExampleServiceOneCapabilityImplementation(IntersectBaseCapabilityImplementation):
"""Service 1 Capability."""

@intersect_status()
def status(self) -> str:
"""Basic status function which returns a hard-coded string."""
return 'Up'

@intersect_message()
def pass_text_to_service_2(self, text: str) -> None:
"""Takes in a string parameter and sends it to service 2."""
msg_to_send = IntersectDirectMessageParams(
destination='example-organization.example-facility.example-system.example-subsystem.service-two',
operation='ServiceTwo.test_service',
payload=text,
)

# Send intersect message to another service
self.intersect_sdk_call_service(msg_to_send, self.service_2_handler)

@intersect_event(events={'response_event': IntersectEventDefinition(event_type=str)})
def service_2_handler(self, msg: str) -> None:
"""Handles response from service 2 and emits the response as an event for the client."""
self.intersect_sdk_emit_event('response_event', f'Received Response from Service 2: {msg}')


if __name__ == '__main__':
from_config_file = {
'brokers': [
{
'username': 'intersect_username',
'password': 'intersect_password',
'port': 1883,
'protocol': 'mqtt3.1.1',
},
],
}
config = IntersectServiceConfig(
hierarchy=HierarchyConfig(
organization='example-organization',
facility='example-facility',
system='example-system',
subsystem='example-subsystem',
service='service-one',
),
status_interval=30.0,
**from_config_file,
)
capability = ExampleServiceOneCapabilityImplementation()
capability.capability_name = 'ServiceOne'
service = IntersectService([capability], config)
logger.info('Starting Service 1, use Ctrl+C to exit.')
default_intersect_lifecycle_loop(
service,
)
174 changes: 174 additions & 0 deletions examples/4_service_to_service/example_1_service_schema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
{
"asyncapi": "2.6.0",
"x-intersect-version": "0.6.4",
"info": {
"title": "example-organization.example-facility.example-system.example-subsystem.service-one",
"version": "0.0.0",
"description": "Service One Capability.\n"
},
"defaultContentType": "application/json",
"channels": {
"pass_text_to_service_2": {
"publish": {
"message": {
"schemaFormat": "application/vnd.aai.asyncapi+json;version=2.6.0",
"contentType": "application/json",
"traits": {
"$ref": "#/components/messageTraits/commonHeaders"
},
"payload": {
"type": "null"
}
},
"description": "Takes in a string parameter and sends it to service 2."
},
"subscribe": {
"message": {
"schemaFormat": "application/vnd.aai.asyncapi+json;version=2.6.0",
"contentType": "application/json",
"traits": {
"$ref": "#/components/messageTraits/commonHeaders"
},
"payload": {
"type": "string"
}
},
"description": "Takes in a string parameter and sends it to service 2."
},
"events": []
}
},
"events": {
"response_event": {
"type": "string"
}
},
"components": {
"schemas": {},
"messageTraits": {
"commonHeaders": {
"messageHeaders": {
"$defs": {
"IntersectDataHandler": {
"description": "What data transfer type do you want to use for handling the request/response?\n\nDefault: MESSAGE",
"enum": [
0,
1
],
"title": "IntersectDataHandler",
"type": "integer"
}
},
"description": "Matches the current header definition for INTERSECT messages.\n\nALL messages should contain this header.",
"properties": {
"source": {
"description": "source of the message",
"pattern": "([-a-z0-9]+\\.)*[-a-z0-9]",
"title": "Source",
"type": "string"
},
"destination": {
"description": "destination of the message",
"pattern": "([-a-z0-9]+\\.)*[-a-z0-9]",
"title": "Destination",
"type": "string"
},
"created_at": {
"description": "the UTC timestamp of message creation",
"format": "date-time",
"title": "Created At",
"type": "string"
},
"sdk_version": {
"description": "SemVer string of SDK's version, used to check for compatibility",
"pattern": "^\\d+\\.\\d+\\.\\d+$",
"title": "Sdk Version",
"type": "string"
},
"data_handler": {
"allOf": [
{
"$ref": "#/components/messageTraits/commonHeaders/userspaceHeaders/$defs/IntersectDataHandler"
}
],
"default": 0,
"description": "Code signifying where data is stored."
},
"has_error": {
"default": false,
"description": "If this value is True, the payload will contain the error message (a string)",
"title": "Has Error",
"type": "boolean"
}
},
"required": [
"source",
"destination",
"created_at",
"sdk_version"
],
"title": "UserspaceMessageHeader",
"type": "object"
},
"eventHeaders": {
"$defs": {
"IntersectDataHandler": {
"description": "What data transfer type do you want to use for handling the request/response?\n\nDefault: MESSAGE",
"enum": [
0,
1
],
"title": "IntersectDataHandler",
"type": "integer"
}
},
"description": "Matches the current header definition for INTERSECT messages.\n\nALL messages should contain this header.",
"properties": {
"source": {
"description": "source of the message",
"pattern": "([-a-z0-9]+\\.)*[-a-z0-9]",
"title": "Source",
"type": "string"
},
"created_at": {
"description": "the UTC timestamp of message creation",
"format": "date-time",
"title": "Created At",
"type": "string"
},
"sdk_version": {
"description": "SemVer string of SDK's version, used to check for compatibility",
"pattern": "^\\d+\\.\\d+\\.\\d+$",
"title": "Sdk Version",
"type": "string"
},
"data_handler": {
"allOf": [
{
"$ref": "#/components/messageTraits/commonHeaders/eventHeaders/$defs/IntersectDataHandler"
}
],
"default": 0,
"description": "Code signifying where data is stored."
},
"event_name": {
"title": "Event Name",
"type": "string"
}
},
"required": [
"source",
"created_at",
"sdk_version",
"event_name"
],
"title": "EventMessageHeaders",
"type": "object"
}
}
}
},
"status": {
"type": "string"
}
}
61 changes: 61 additions & 0 deletions examples/4_service_to_service/example_2_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
"""Second Service for example."""

import logging

from intersect_sdk import (
HierarchyConfig,
IntersectBaseCapabilityImplementation,
IntersectService,
IntersectServiceConfig,
default_intersect_lifecycle_loop,
intersect_message,
intersect_status,
)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class ExampleServiceTwoCapabilityImplementation(IntersectBaseCapabilityImplementation):
"""Service 2 Capability."""

@intersect_status()
def status(self) -> str:
"""Basic status function which returns a hard-coded string."""
return 'Up'

@intersect_message
def test_service(self, text: str) -> str:
"""Returns the text given along with acknowledgement."""
return f'Acknowledging service one text -> {text}'


if __name__ == '__main__':
from_config_file = {
'brokers': [
{
'username': 'intersect_username',
'password': 'intersect_password',
'port': 1883,
'protocol': 'mqtt3.1.1',
},
],
}
config = IntersectServiceConfig(
hierarchy=HierarchyConfig(
organization='example-organization',
facility='example-facility',
system='example-system',
subsystem='example-subsystem',
service='service-two',
),
status_interval=30.0,
**from_config_file,
)
capability = ExampleServiceTwoCapabilityImplementation()
capability.capability_name = 'ServiceTwo'
service = IntersectService([capability], config)
logger.info('Starting Service 2, use Ctrl+C to exit.')
default_intersect_lifecycle_loop(
service,
)
Loading