Asynchronous Python library that listens to Crownstone SSE events.
- Async: using asyncio and aiohttp, easy to integrate in existing projects.
- Multi-functional: Library provides an extra thread wrapper to run the client in sync context.
- Complete: Fully integrated event bus that can be used to listen to events, and run callbacks.
- Python 3.8 or higher
- Aiohttp 3.7
cd to the project folder and run:
$ python setup.py install
To install the library in a virtual environment, first create one with the following command:
$ python -m venv venv
Activate your venv using:
$ source venv/bin/activate
Once activated, the venv is used to execute Python files, and libraries will be installed in the venv.
To install this library, cd to the project folder and run:
$ python setup.py install
import asyncio
import logging
from crownstone_sse import CrownstoneSSEAsync
# enable logging
logging.basicConfig(format='%(levelname)s :%(message)s', level=logging.DEBUG)
async def main():
    # Create a new instance of Crownstone SSE client.
    # parameters:
    # email (string): your Crownstone account email.
    # password (string): your Crownstone account password.
    # access_token (string) [optional]: Access token from a previous login to skip the login step.
    # websession (aiohttp.ClientSession): provide the websession used in a project this is integrated in.
    # reconnection_time (int): time to wait before reconnection on connection loss.
    # project_name (string) [optional]: name of the project this is integrated in. This provides context to SSE logs in case of an error.
    client = CrownstoneSSEAsync(
        email="[email protected]",
        password="CrownstoneRocks",
        project_name="MyProject"
    )
    # wait for the client to finish (means: blocking, run forever).
    await process_events(client)
    # to use this concurrently in an asyncio project, run this instead:
    # asyncio.create_task(process_events(client))


async def process_events(sse_client: CrownstoneSSEAsync):
    async with sse_client as client:
        async for event in client:
            # prints string representation of the event.
            print(event)
            # optionally you can use the provided eventbus.
            # event_bus.fire(event.sub_type, event)
            # or access specific event details.
            # for example a switch state update:
            # print(event.switch_state)


try:
    asyncio.run(main())
except KeyboardInterrupt:
    pass
finally:
    print("Crownstone SSE client finished. Thanks for your time!")
The async client is meant to be used in an existing asyncio project. You can pass an existing aiohttp.ClientSession
object to the client, and schedule the receiving of events in the running event loop by using:
asyncio.create_task()
as shown in the example above.
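The scheduling pattern can be tried without a Crownstone account. The sketch below is not the library's client: `fake_event_stream` is a hypothetical stand-in that yields a few strings, so that only the `asyncio.create_task()` part is real. It assumes that the actual client can be consumed with `async for` in the same way as in the example above.

```python
import asyncio


async def fake_event_stream(count: int):
    """Hypothetical stand-in for the SSE client: yields a few fake events."""
    for number in range(count):
        await asyncio.sleep(0)  # hand control back to the event loop
        yield f"event-{number}"


async def process_events(received: list, count: int) -> None:
    # Same shape as process_events() in the example above, minus the real client.
    async for event in fake_event_stream(count):
        received.append(event)


async def main() -> list:
    received = []
    # Schedule event processing in the running loop instead of awaiting it directly.
    task = asyncio.create_task(process_events(received, 3))
    # Other project code keeps running here while events arrive.
    await asyncio.sleep(0)
    # Keep a reference to the task and await (or cancel) it on shutdown.
    await task
    return received


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Keeping a reference to the task matters: the event loop only holds weak references to tasks, so a task that nothing else refers to may be garbage collected before it finishes.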
The Crownstone SSE library provides an event bus that can be used to schedule coroutines as well as plain callbacks. Make sure to create the event bus from within the running event loop. The event bus can be created like so:
from crownstone_sse import EventBus
bus = EventBus()
Then you can create a coroutine to be executed upon receiving a specific event:
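from crownstone_sse import EVENT_SWITCH_STATE_UPDATE_CROWNSTONE
from crownstone_sse.events import SwitchStateUpdateEvent

async def async_update_local_switch_state(event: SwitchStateUpdateEvent):
    # example
    await crownstone.update_state(event.switch_state)

bus.add_event_listener(EVENT_SWITCH_STATE_UPDATE_CROWNSTONE, async_update_local_switch_state)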
Using the event bus is optional here. You can also use an existing event bus from your own project.
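To illustrate why the event bus should be created inside the event loop, here is a minimal sketch of a bus that handles both coroutines and plain callbacks. `MiniEventBus` and the `"switch_state_update"` string are hypothetical names made up for this illustration; this is not the library's `EventBus` implementation, only one plausible way such a bus can work.

```python
import asyncio
import inspect
from collections import defaultdict
from typing import Any, Callable, DefaultDict, List


class MiniEventBus:
    """Hypothetical, simplified bus; not the library's EventBus."""

    def __init__(self) -> None:
        self._listeners: DefaultDict[str, List[Callable[[Any], Any]]] = defaultdict(list)
        self.tasks: List[asyncio.Task] = []

    def add_event_listener(self, event_type: str, listener: Callable[[Any], Any]) -> None:
        self._listeners[event_type].append(listener)

    def fire(self, event_type: str, event: Any) -> None:
        for listener in list(self._listeners[event_type]):
            if inspect.iscoroutinefunction(listener):
                # Coroutines are scheduled as tasks, which requires a running loop.
                loop = asyncio.get_running_loop()
                self.tasks.append(loop.create_task(listener(event)))
            else:
                # Plain callbacks run immediately.
                listener(event)


async def demo() -> List[str]:
    seen: List[str] = []

    async def on_switch_async(event: dict) -> None:
        await asyncio.sleep(0)
        seen.append(f"async:{event['switch_state']}")

    def on_switch_sync(event: dict) -> None:
        seen.append(f"sync:{event['switch_state']}")

    bus = MiniEventBus()  # created from within the running event loop
    bus.add_event_listener("switch_state_update", on_switch_async)
    bus.add_event_listener("switch_state_update", on_switch_sync)
    bus.fire("switch_state_update", {"switch_state": 100})
    await asyncio.gather(*bus.tasks)  # wait for the scheduled coroutines
    return seen


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this sketch the plain callback runs during `fire()`, while the coroutine only runs once the loop gets control again, so callbacks that must observe events in strict order are better written as plain functions.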
import logging
from crownstone_sse import CrownstoneSSE
from crownstone_sse.events import (
    SwitchStateUpdateEvent,
    SystemEvent,
    PresenceEvent,
    AbilityChangeEvent,
    DataChangeEvent,
)
from crownstone_sse import (
    EVENT_ABILITY_CHANGE,
    EVENT_DATA_CHANGE,
    EVENT_PRESENCE,
    EVENT_PRESENCE_ENTER_LOCATION,
    EVENT_SWITCH_STATE_UPDATE,
    EVENT_SWITCH_STATE_UPDATE_CROWNSTONE,
    EVENT_SYSTEM,
    OPERATION_CREATE,
    OPERATION_DELETE,
    OPERATION_UPDATE,
)
# enable logging.
logging.basicConfig(format='%(levelname)s :%(message)s', level=logging.DEBUG)
def switch_update(event: SwitchStateUpdateEvent):
    if event.sub_type == EVENT_SWITCH_STATE_UPDATE_CROWNSTONE:
        print("Crownstone {} switch state changed to {}".format(event.cloud_id, event.switch_state))


def notify_stream_start(event: SystemEvent):
    print(event.message)


def notify_presence_changed(event: PresenceEvent):
    if event.sub_type == EVENT_PRESENCE_ENTER_LOCATION:
        print("User {} has entered location {}".format(event.user_id, event.location_id))


def notify_ability_changed(event: AbilityChangeEvent):
    print("Ability {} changed to {}".format(event.ability_type, event.ability_enabled))


def notify_data_changed(event: DataChangeEvent):
    if event.operation == OPERATION_CREATE:
        print("New data is available: {}".format(event.changed_item_name))
    if event.operation == OPERATION_UPDATE:
        print("Name of id {} has been updated to {}".format(event.changed_item_id, event.changed_item_name))
    if event.operation == OPERATION_DELETE:
        print("Data {} has been deleted".format(event.changed_item_name))
# Create a new instance of Crownstone SSE client.
# email (string): your Crownstone account email.
# password (string): your Crownstone account password.
# access_token (string) [optional]: Access token from a previous login to skip the login step.
# reconnection_time (int): time to wait before reconnection on connection loss.
# project_name (string) [optional]: name of the project this is integrated in. This provides context to SSE logs in case of an error.
sse_client = CrownstoneSSE(
    email="[email protected]",
    password="CrownstoneRocks",
    project_name="MyProject"
)
# Add listeners for event types of your liking, and the desired callback to be executed. see above.
sse_client.add_event_listener(EVENT_SYSTEM, notify_stream_start)
sse_client.add_event_listener(EVENT_SWITCH_STATE_UPDATE, switch_update)
sse_client.add_event_listener(EVENT_PRESENCE, notify_presence_changed)
sse_client.add_event_listener(EVENT_ABILITY_CHANGE, notify_ability_changed)
sse_client.add_event_listener(EVENT_DATA_CHANGE, notify_data_changed)
# Wait until the thread finishes.
# You can terminate the thread by using SIGINT (ctrl + c or stop button in IDE).
try:
    sse_client.join()
except KeyboardInterrupt:
    sse_client.stop()
This library can be used in a synchronous context, and the example above will likely be the go-to option for most users.
The client runs in its own thread, so you can use it alongside other synchronous Python code.
If you want to let the client run forever, you can use:
sse_client.join()
as shown above. This makes the main thread wait until the sse_client thread has finished.
You should, however, always build in a way to stop the client. You can do so by calling stop() on KeyboardInterrupt,
as shown above.
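The start, wait and stop life cycle can be sketched with the standard library alone. `StoppableWorker` below is a hypothetical stand-in, not the library's `CrownstoneSSE` class, and it is started explicitly with `start()`. It only shows one common way to make a thread stoppable: a `threading.Event` that the loop checks on every iteration.

```python
import threading
import time


class StoppableWorker(threading.Thread):
    """Hypothetical stand-in for a threaded client with a stop() method."""

    def __init__(self) -> None:
        super().__init__(daemon=True)
        self._stop_requested = threading.Event()
        self.iterations = 0

    def run(self) -> None:
        # Loop until stop() is called; wait() doubles as an interruptible sleep.
        while not self._stop_requested.wait(timeout=0.01):
            self.iterations += 1

    def stop(self) -> None:
        # Ask the loop to exit, then wait for the thread to finish.
        self._stop_requested.set()
        self.join()


def run_for(seconds: float) -> StoppableWorker:
    worker = StoppableWorker()
    worker.start()
    try:
        # The main thread is free to do other synchronous work here.
        time.sleep(seconds)
    except KeyboardInterrupt:
        pass
    finally:
        # Always stop the worker, also when interrupted with ctrl + c.
        worker.stop()
    return worker


if __name__ == "__main__":
    finished = run_for(0.1)
    print("worker alive:", finished.is_alive())
```

Putting the stop call in a `finally` block means the thread is shut down both on a normal exit and on `KeyboardInterrupt`.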
Callbacks are functions that are executed every time an event of a specific event type comes in.
The standard format for a callback is:
def callback(event: EventTypeClass):
    # do something
    pass
Each event has its own fields. It is recommended to provide the event class as a type hint, to keep better track of which event it is and what fields it has.
For example:
def callback(event: PresenceEvent):
    print(event.user_id)
    print(event.location_id)
You can add the listener like so:
unsub = sse_client.add_event_listener(event_type, callback)
This returns an unsubscribe function in case you want to remove the listener again. To do that, simply call:
unsub()
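The unsubscribe pattern described above is usually built with a closure. The sketch below shows the idea with a hypothetical `ListenerRegistry` class and a made-up `"presence"` event type string. It is an assumption about how such a function can be implemented, not a copy of the library's code.

```python
from typing import Any, Callable, Dict, List

Listener = Callable[[Any], None]


class ListenerRegistry:
    """Hypothetical registry illustrating the unsubscribe pattern."""

    def __init__(self) -> None:
        self._listeners: Dict[str, List[Listener]] = {}

    def add_event_listener(self, event_type: str, listener: Listener) -> Callable[[], None]:
        self._listeners.setdefault(event_type, []).append(listener)

        def remove_listener() -> None:
            # Safe to call more than once.
            if listener in self._listeners.get(event_type, []):
                self._listeners[event_type].remove(listener)

        return remove_listener

    def fire(self, event_type: str, event: Any) -> None:
        # Iterate over a copy so listeners may unsubscribe while being called.
        for listener in list(self._listeners.get(event_type, [])):
            listener(event)


received: List[str] = []


def remember(event: str) -> None:
    received.append(event)


registry = ListenerRegistry()
unsub = registry.add_event_listener("presence", remember)
registry.fire("presence", "first")
unsub()
registry.fire("presence", "second")  # no listener left, nothing is recorded
print(received)
```

Because the returned function remembers both the event type and the listener, the caller does not need to keep track of either in order to unsubscribe later.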
Currently, there are 7 different event types:
- System event
- Command event
- Data change event
- Presence event
- Switch state update event
- Ability change event
- Ping event
A system event is represented as:
- type
- sub_type
- code
- message
A switch command event is represented as:
- type
- sub_type
- sphere_id
- cloud_id
- unique_id
- switch_val (as SwitchCommandValue)
A multi switch command event is represented as:
- type
- sub_type
- sphere_id
- crownstone_list
- cloud_id
- unique_id
- switch_val (as SwitchCommandValue)
A data change event is represented as:
- operation (update | delete | create)
- type
- sub_type
- sphere_id
- changed_item_id
- changed_item_name
A presence event is represented as:
- type
- sub_type
- sphere_id
- location_id
- user_id
A switch state update event is represented as:
- type
- sub_type
- sphere_id
- cloud_id
- unique_id
- switch_state (percentage)
An ability change event is represented as:
- type
- sub_type
- sphere_id
- cloud_id
- unique_id
- ability_type
- ability_enabled
- ability_synced_to_crownstone
A ping event is represented as:
- type
- counter
- elapsed_time (in seconds)
The ping event exists mainly so that the client can tell, internally, that the connection is still alive.
You can, however, also use it to check how long the connection has been alive.
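A callback for ping events could keep track of the connection age like this. `FakePingEvent` is a hypothetical stand-in that only carries the `counter` and `elapsed_time` fields listed above, and the sketch assumes that `elapsed_time` counts the seconds since the connection was established. The constant used to register a listener for ping events is not shown in this README, so registration is left out.

```python
from dataclasses import dataclass


@dataclass
class FakePingEvent:
    """Hypothetical stand-in with the two ping fields listed above."""
    counter: int
    elapsed_time: int  # assumed: seconds since the connection was established


class ConnectionMonitor:
    """Remembers the most recent ping so other code can report connection age."""

    def __init__(self) -> None:
        self.ping_count = 0
        self.alive_seconds = 0

    def on_ping(self, event: FakePingEvent) -> None:
        # Callback in the standard format: one argument, the event.
        self.ping_count = event.counter
        self.alive_seconds = event.elapsed_time

    def describe(self) -> str:
        minutes, seconds = divmod(self.alive_seconds, 60)
        return f"connection alive for {minutes}m {seconds}s ({self.ping_count} pings)"


if __name__ == "__main__":
    monitor = ConnectionMonitor()
    monitor.on_ping(FakePingEvent(counter=5, elapsed_time=150))
    print(monitor.describe())
```

A bound method such as `monitor.on_ping` fits the callback format described earlier, which makes it easy to keep state between events without global variables.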
Tests are not available yet for this version. The client has, however, been live-tested on the following:
- Logging in with Crownstone credentials.
- Establishing a connection to the Crownstone SSE server.
- Keeping the connection alive for longer than 10 minutes (no total timeout).
- Reconnecting the client after manually disabling the internet connection, waiting over 35 seconds, and turning it back on.
- Renewing the access token, by providing a short TTL on the access token when logging in.
- Safely closing the connection and exiting the loop after a manual stop is called, in both the running and the reconnecting state.
This software is provided under a noncontagious open-source license towards the open-source community. It's available under three open-source licenses:
- License: LGPL v3+, Apache, MIT
This software can also be provided under a commercial license. If you are not an open-source developer or are not planning to release adaptations to the code under one or multiple of the mentioned licenses, contact us to obtain a commercial license.
- License: Crownstone commercial license
For any questions, contact us at https://crownstone.rocks/contact/ or on our Discord server through https://crownstone.rocks/forum/.