Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "neosmartblue.py"
version = "0.1.3"
version = "0.1.4"
description = "A Python library for controlling Neo Smart Blinds via BlueLink Bluetooth connection"
authors = [
{name = "ikifar2012", email = "[email protected]"}
Expand Down
61 changes: 44 additions & 17 deletions src/neosmartblue/py/device.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import asyncio
import logging
from typing import Optional, Dict, Any
from bleak import BleakClient
from bleak.backends.characteristic import BleakGATTCharacteristic
from .parse_status import parse_status_data

from .const import _RX_UUID, _TX_UUID
Expand Down Expand Up @@ -89,28 +92,52 @@ async def ping(self):
command = generate_ping_command()
await self.send_command(command)

async def receive_data(self, timeout: float = 5.0) -> bytes:
"""
Listen for a notification on the TX characteristic.

async def receive_data(self, timeout: float = 5.0) -> Optional[Dict[str, Any]]:
"""Listen for a notification on the TX characteristic.

Parameters:
timeout (float): The number of seconds to wait for a notification.

Returns:
bytes: The data received, or None if no data is received within the timeout.
Optional[Dict[str, Any]]: Parsed status dict from parse_status_data, or None if timeout/no notification.
"""
data = None
logger = logging.getLogger(__name__)
data: Optional[Dict[str, Any]] = None
data_event = asyncio.Event()

def notification_handler(sender: int, received_data: bytes):
def notification_handler(sender: BleakGATTCharacteristic, received_data: bytearray):
nonlocal data
print("Notification from", sender, ":", received_data.hex().upper())
# extract the status payload from the received data
status_payload = received_data[4:9]
status = parse_status_data(status_payload)
data = status
print("Parsed status:", status)
try:
printable = bytes(received_data)
logger.info(
"Notification from characteristic %s: %s",
getattr(sender, 'uuid', sender),
printable.hex().upper(),
)
if len(printable) >= 9:
status_payload = printable[4:9]
status = parse_status_data(status_payload)
data = status
logger.info("Parsed status: %s", status)
data_event.set()
else:
logger.info(
"Received data too short to parse status payload (len=%d)",
len(printable),
)
except Exception: # pragma: no cover - defensive
logger.exception("Error handling notification")

await self.client.start_notify(_TX_UUID, notification_handler)
await asyncio.sleep(timeout)
await self.client.stop_notify(_TX_UUID)
return data
try:
try:
await asyncio.wait_for(data_event.wait(), timeout=timeout)
except asyncio.TimeoutError:
# No data within timeout; return None
pass
return data
finally:
try:
await self.client.stop_notify(_TX_UUID)
except Exception: # pragma: no cover - defensive cleanup
logger.exception("Failed to stop notifications")