Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

If not all DB records are received, try one-by-one #531

Merged
merged 1 commit into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion insteon_mqtt/db/Device.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,23 @@ def is_current(self, delta):
"""
return delta == self.delta

#-----------------------------------------------------------------------
def is_complete(self):
"""See if the database is complete.

Check that all DB entries between START_MEM_LOC and self.last
(inclusive) are present in either self.entries or self.unused.

Returns:
(bool) Returns True if the database is complete.
"""
if (self.last not in self.entries.values() and
self.last not in self.unused.values()):
# Last entry hasn't been added/downloaded yet
return False
expected_entries = ((START_MEM_LOC - self.last.mem_loc) / 8) + 1
return len(self.entries) + len(self.unused) == expected_entries

#-----------------------------------------------------------------------
def increment_delta(self):
"""Increments the current database delta by 1
Expand Down Expand Up @@ -770,7 +787,7 @@ def _add_using_unused(self, addr, group, is_controller, data,
#-----------------------------------------------------------------------
def _add_using_new(self, addr, group, is_controller, data,
on_done):
"""Add a anew entry at the end of the database.
"""Add a new entry at the end of the database.

First we send the new entry to the remote device. If that works,
then we update the previously last entry to mart it as "not the last
Expand Down
121 changes: 121 additions & 0 deletions insteon_mqtt/db/DeviceScanManagerI2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
#===========================================================================
#
# Device Scan Manager for i2 or newer Devices
#
#===========================================================================
from .. import log
from .. import message as Msg
from .. import util
from .. import handler
from .DeviceEntry import DeviceEntry
from .Device import START_MEM_LOC

LOG = log.get_logger()


class DeviceScanManagerI2:
"""Manager for scaning the link database of an i2 or newer device.

This class can be used to download any entries that failed to download
using DeviceDbGet.py (or all entries, if started with a cleared DB).
"""
def __init__(self, device, device_db, on_done=None, *, num_retry=3,
mem_addr: int = START_MEM_LOC):
"""Constructor

Args
device: (Device) The Insteon Device object
device_db: (db.Device) The device database being retrieved.
on_done: Finished callback. Will be called when the scan
operation is done.
Keyword-only Args:
num_retry: (int) The number of times to retry each message if the
handler times out without returning Msg.FINISHED.
This count does include the initial sending so a
retry of 3 will send once and then retry 2 more times.
mem_addr: (int) Address at which to start downloading.
"""
self.db = device_db
self.device = device
self._mem_addr = mem_addr
self.on_done = util.make_callback(on_done)
self._num_retry = num_retry

#-------------------------------------------------------------------
def start_scan(self):
"""Start a managed scan of a i2 (or newer) device database."""
self._request_next_record(self.on_done)

#-------------------------------------------------------------------
def _request_next_record(self, on_done):
"""Request the next missing DB record.

Args:
on_done: (callback) a callback that is passed around and run on the
completion of the scan
"""

done, last_entry = self._calculate_next_addr()
if done:
if self.db.is_complete():
on_done(True, "Database received", last_entry)
else:
on_done(False, "Database incomplete", last_entry)
return

data = bytes([
0x00,
0x00, # ALDB record request
self._mem_addr >> 8, # Address MSB
self._mem_addr & 0xff, # Address LSB
0x01, # Read one record
] + [0x00] * 9)
msg = Msg.OutExtended.direct(self.device.addr, 0x2f, 0x00, data)
msg_handler = handler.ExtendedCmdResponse(msg, self.handle_record,
on_done=on_done,
num_retry=self._num_retry)
self.device.send(msg, msg_handler)

#-------------------------------------------------------------------
def handle_record(self, msg, on_done):
"""Handle an ALDB record response by adding an entry to the DB and
fetching the next entry.

Args:
msg: (message.InpExtended) The ALDB record response.
on_done: (callback) a callback that is passed around and run on the
completion of the scan
"""

# Convert the message to a database device entry.
entry = DeviceEntry.from_bytes(msg.data, db=self.db)
LOG.ui("Entry: %s", entry)

# Skip entries w/ a null memory location.
if entry.mem_loc:
self.db.add_entry(entry)

self._request_next_record(on_done)

#-------------------------------------------------------------------
def _calculate_next_addr(self) -> (bool, DeviceEntry):
"""Calculate the memory address of the next missing record.

Returns:
(bool) True if no more records to read
(DeviceEntry) Last entry or closest-to-last (if not received yet)
"""
done = False
last = None
addr = self._mem_addr
entry = self.db.entries.get(addr, self.db.unused.get(addr, None))
while entry is not None:
last = entry
if self.db.last.identical(entry):
# This is the last record (and we already have it)
done = True
break
addr -= 0x8
entry = self.db.entries.get(addr, self.db.unused.get(addr, None))
self._mem_addr = addr
return done, last
1 change: 1 addition & 0 deletions insteon_mqtt/db/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@
from .DeviceEntry import DeviceEntry
from .DeviceModifyManagerI1 import DeviceModifyManagerI1
from .DeviceScanManagerI1 import DeviceScanManagerI1
from .DeviceScanManagerI2 import DeviceScanManagerI2
from .Modem import Modem
from .ModemEntry import ModemEntry
5 changes: 4 additions & 1 deletion insteon_mqtt/handler/DeviceDbGet.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,10 @@ def msg_received(self, protocol, msg):
# Note that if the entry is a null entry (all zeros), then
# is_last_rec will be True as well.
if entry.db_flags.is_last_rec:
self.on_done(True, "Database received", entry)
if self.db.is_complete():
self.on_done(True, "Database received", entry)
else:
self.on_done(False, "Database incomplete", entry)
return Msg.FINISHED

# Otherwise keep processing records as they arrive.
Expand Down
22 changes: 19 additions & 3 deletions insteon_mqtt/handler/DeviceRefresh.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def msg_received(self, protocol, msg):
# Clear the current database values.
self.device.db.clear()

# When the update message below ends, update the db delta
# When database download is complete, update the db delta
# w/ the current value and save the database.
def on_done(success, message, data):
if success:
Expand All @@ -116,7 +116,22 @@ def on_done(success, message, data):
self.addr, self.device.db)
self.on_done(success, message, data)

# Request that the device send us all of it's database
# Called after DeviceDbGet finishes trying a bulk download
def on_done_dbget(success, message, data):
if success:
# Skip gap-filling step & call above-defined func
on_done(success, message, data)
else:
LOG.warning("%s database bulk download error: %s",
self.addr, message)
# Try filling-in gaps one entry at a time
manager = db.DeviceScanManagerI2(self.device,
self.device.db,
on_done=on_done,
num_retry=3)
manager.start_scan()

# Request that the device send us all of its database
# records. These will be streamed as fast as possible to
# us and the handler will update the database. We need a
# retry count here because battery powered devices don't
Expand All @@ -130,7 +145,8 @@ def on_done(success, message, data):
else:
db_msg = Msg.OutExtended.direct(self.addr, 0x2f, 0x00,
bytes(14))
msg_handler = DeviceDbGet(self.device.db, on_done,
msg_handler = DeviceDbGet(self.device.db,
on_done_dbget,
num_retry=3)
self.device.send(db_msg, msg_handler)
# Either way - this transaction is complete.
Expand Down
155 changes: 155 additions & 0 deletions tests/db/test_DeviceScanManagerI2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
#===========================================================================
#
# Tests for: insteont_mqtt/db/DeviceScanManagerI2.py
#
#===========================================================================
import pytest
import insteon_mqtt as IM
import insteon_mqtt.message as Msg
from insteon_mqtt.db.Device import START_MEM_LOC

class Test_DeviceScanManagerI2:
def test_start_scan(self):
dev_addr = IM.Address('0a.12.34')
device = MockDevice(dev_addr, 0)
manager = IM.db.DeviceScanManagerI2(device, device.db)

first_mem_addr = START_MEM_LOC
data = bytes([
0x00,
0x00, # ALDB record request
first_mem_addr >> 8, # Address MSB
first_mem_addr & 0xff, # Address LSB
0x01, # Read one record
] + [0x00] * 9)
db_msg = Msg.OutExtended.direct(dev_addr, 0x2f, 0x00, data)

manager.start_scan()
assert device.sent[0].to_bytes() == db_msg.to_bytes()

#-------------------------------------------------------------------
@pytest.mark.parametrize("init_db,mem_addr,recv,exp_calls,next_addr", [
# Have all but last record. Receive it, making DB complete.
( [ [ 0x00, 0x00, 0x0f, 0xff, 0xff,
0xff, 0x01, 0x0a, 0x12, 0x34, 0x00, 0x00, 0x00, 0x00 ],
[ 0x00, 0x00, 0x0f, 0xf7, 0xff,
0xff, 0x01, 0x0a, 0x12, 0x34, 0x00, 0x00, 0x00, 0x00 ],
[ 0x00, 0x00, 0x0f, 0xef, 0xff,
0xff, 0x01, 0x0a, 0x12, 0x34, 0x00, 0x00, 0x00, 0x00 ] ],
None,
[ 0x00, 0x01, 0x0f, 0xe7, 0xff,
0x00, 0x01, 0x0a, 0x12, 0x34, 0x00, 0x00, 0x00, 0x00 ],
[ "Database received" ],
None ),
# Have all but third record. Receive it, making DB complete.
( [ [ 0x00, 0x00, 0x0f, 0xff, 0xff,
0xff, 0x01, 0x0a, 0x12, 0x34, 0x00, 0x00, 0x00, 0x00 ],
[ 0x00, 0x00, 0x0f, 0xf7, 0xff,
0xff, 0x01, 0x0a, 0x12, 0x34, 0x00, 0x00, 0x00, 0x00 ],
[ 0x00, 0x00, 0x0f, 0xe7, 0xff,
0x00, 0x01, 0x0a, 0x12, 0x34, 0x00, 0x00, 0x00, 0x00 ] ],
None,
[ 0x00, 0x01, 0x0f, 0xef, 0xff,
0xff, 0x01, 0x0a, 0x12, 0x34, 0x00, 0x00, 0x00, 0x00 ],
[ "Database received" ],
None ),
# Have first two records. Receive third and request fourth.
( [ [ 0x00, 0x00, 0x0f, 0xff, 0xff,
0xff, 0x01, 0x0a, 0x12, 0x34, 0x00, 0x00, 0x00, 0x00 ],
[ 0x00, 0x00, 0x0f, 0xef, 0xff,
0xff, 0x01, 0x0a, 0x12, 0x34, 0x00, 0x00, 0x00, 0x00 ] ],
None,
[ 0x00, 0x01, 0x0f, 0xf7, 0xff,
0xff, 0x01, 0x0a, 0x12, 0x34, 0x00, 0x00, 0x00, 0x00 ],
[ ],
0xfe7 ),
# Have first two records. Receive last. DB still incomplete.
( [ [ 0x00, 0x00, 0x0f, 0xff, 0xff,
0xff, 0x01, 0x0a, 0x12, 0x34, 0x00, 0x00, 0x00, 0x00 ],
[ 0x00, 0x00, 0x0f, 0xf7, 0xff,
0xff, 0x01, 0x0a, 0x12, 0x34, 0x00, 0x00, 0x00, 0x00 ] ],
None,
[ 0x00, 0x01, 0x0f, 0xe7, 0xff,
0x00, 0x01, 0x0a, 0x12, 0x34, 0x00, 0x00, 0x00, 0x00 ],
[ "Database incomplete" ],
None ),
# Have first and last records. Receive third. DB still incomplete.
( [ [ 0x00, 0x00, 0x0f, 0xff, 0xff,
0xff, 0x01, 0x0a, 0x12, 0x34, 0x00, 0x00, 0x00, 0x00 ],
[ 0x00, 0x00, 0x0f, 0xe7, 0xff,
0x00, 0x01, 0x0a, 0x12, 0x34, 0x00, 0x00, 0x00, 0x00 ] ],
None,
[ 0x00, 0x01, 0x0f, 0xef, 0xff,
0xff, 0x01, 0x0a, 0x12, 0x34, 0x00, 0x00, 0x00, 0x00 ],
[ "Database incomplete" ],
None ),
# Have first and last records. Receive second and request third.
( [ [ 0x00, 0x00, 0x0f, 0xff, 0xff,
0xff, 0x01, 0x0a, 0x12, 0x34, 0x00, 0x00, 0x00, 0x00 ],
[ 0x00, 0x00, 0x0f, 0xe7, 0xff,
0x00, 0x01, 0x0a, 0x12, 0x34, 0x00, 0x00, 0x00, 0x00 ] ],
None,
[ 0x00, 0x01, 0x0f, 0xf7, 0xff,
0xff, 0x01, 0x0a, 0x12, 0x34, 0x00, 0x00, 0x00, 0x00 ],
[ ],
0xfef ),
# Have first and last records. Expecting second, but receive third.
# Request second.
( [ [ 0x00, 0x00, 0x0f, 0xff, 0xff,
0xff, 0x01, 0x0a, 0x12, 0x34, 0x00, 0x00, 0x00, 0x00 ],
[ 0x00, 0x00, 0x0f, 0xe7, 0xff,
0x00, 0x01, 0x0a, 0x12, 0x34, 0x00, 0x00, 0x00, 0x00 ] ],
0xff7,
[ 0x00, 0x01, 0x0f, 0xef, 0xff,
0xff, 0x01, 0x0a, 0x12, 0x34, 0x00, 0x00, 0x00, 0x00 ],
[ ],
0xff7 )
])
def test_handle_record(self, init_db, mem_addr, recv, exp_calls,
next_addr):
calls = []

def callback(success, msg, value):
calls.append(msg)

modem_addr = IM.Address('09.12.34')
dev_addr = IM.Address('0a.12.34')
device = MockDevice(dev_addr, 0)
if mem_addr is None:
# Expecting the same address that we're about to receive
mem_addr = (recv[2] << 8) + recv[3]
manager = IM.db.DeviceScanManagerI2(device, device.db, callback,
mem_addr=mem_addr)

# Initialize DB starting state
for data in init_db:
entry = IM.db.DeviceEntry.from_bytes(data, db=device.db)
device.db.add_entry(entry)

# Handle received message, check callbacks
flags = Msg.Flags(Msg.Flags.Type.DIRECT, True)
msg = Msg.InpExtended(dev_addr, modem_addr, flags, 0x2f, 0x00, recv)
manager.handle_record(msg, callback)
assert len(calls) == len(exp_calls)
for idx, call in enumerate(exp_calls):
assert calls[idx] == call

# Check address of next requested entry (if any expected)
if next_addr is not None:
assert len(device.sent) == 1
sent_data = device.sent[0].data
requested_addr = (sent_data[2] << 8) + sent_data[3]
assert requested_addr == next_addr

#===========================================================================
class MockDevice:
"""Mock insteon_mqtt/Device class
"""
def __init__(self, addr, db_delta):
self.sent = []
self.addr = addr
self.db = IM.db.Device(addr, None, self)
self.db.delta = db_delta

def send(self, msg, handler, priority=None, after=None):
self.sent.append(msg)
Loading
Loading