Skip to content

Commit

Permalink
Update event scanner chunk logic
Browse files Browse the repository at this point in the history
  • Loading branch information
cyc60 committed Nov 15, 2024
1 parent 4c9dc35 commit 9e56318
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 20 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "sw-utils"
version = "v0.6.30"
version = "v0.6.31"
description = "StakeWise Python utils"
authors = ["StakeWise Labs <[email protected]>"]
license = "GPL-3.0-or-later"
Expand Down
38 changes: 19 additions & 19 deletions sw_utils/event_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,26 +51,25 @@ def __init__(
self,
processor: EventProcessor,
argument_filters: dict[str, Any] | None = None,
chunk_size: int | None = None,
):
self.processor = processor
self.argument_filters = argument_filters
self._contract_call = lambda from_block, to_block: getattr(
processor.contract.events, processor.contract_event
).get_logs(argument_filters=argument_filters, fromBlock=from_block, toBlock=to_block)
# todo: remove type ignore after move Contract wrapper to sw-utils
self.provider = self.processor.contract.contract.w3.provider # type: ignore
# Scan in chunks, commit between
self.chunk_size = chunk_size or self.max_scan_chunk_size // 2

async def process_new_events(self, to_block: BlockNumber) -> None:
current_from_block = await self.processor.get_from_block()
if current_from_block >= to_block:
return

# Scan in chunks, commit between
chunk_size = self.max_scan_chunk_size

while current_from_block < to_block:
estimated_end_block = min(to_block, BlockNumber(current_from_block + chunk_size))
current_to_block, new_events = await self._scan_chunk(
current_from_block, estimated_end_block
)
current_to_block, new_events = await self._scan_chunk(current_from_block, to_block)
await self.processor.process_events(new_events, to_block=current_to_block)

if new_events:
Expand All @@ -83,14 +82,13 @@ async def process_new_events(self, to_block: BlockNumber) -> None:
to_block,
)

# Try to guess how many blocks to fetch over `eth_getLogs` API next time
chunk_size = self._estimate_next_chunk_size(chunk_size)

# Try to increase blocks range for the next time
self._estimate_next_chunk_size()
# Set where the next chunk starts
current_from_block = BlockNumber(current_to_block + 1)

async def _scan_chunk(
self, from_block: BlockNumber, to_block: BlockNumber
self, from_block: BlockNumber, last_block: BlockNumber
) -> tuple[BlockNumber, list[EventData]]:
"""
Read and process events between block numbers.
Expand All @@ -99,22 +97,24 @@ async def _scan_chunk(
"""
retries = self.max_request_retries
for i in range(retries):
to_block = min(last_block, BlockNumber(from_block + self.chunk_size))
try:
return to_block, await self._contract_call(from_block, to_block)
with self.provider.disable_retries():
return to_block, await self._contract_call(from_block, to_block)
except Exception as e:
if i < retries - 1:
# Decrease the `eth_getBlocks` range
to_block = BlockNumber(from_block + ((to_block - from_block) // 2))
self.chunk_size = self.chunk_size // 2
to_block = BlockNumber(from_block + self.chunk_size)
# Let the JSON-RPC to recover e.g. from restart
await sleep(self.request_retry_seconds)
continue

raise e

raise RuntimeError(f'Failed to sync chunk: from block={from_block}, to block={to_block}')
raise RuntimeError(f'Failed to sync chunk: from block={from_block}, to block={last_block}')

def _estimate_next_chunk_size(self, current_chuck_size: int) -> int:
current_chuck_size *= self.chunk_size_multiplier
current_chuck_size = max(self.min_scan_chunk_size, current_chuck_size)
current_chuck_size = min(self.max_scan_chunk_size, current_chuck_size)
return current_chuck_size
def _estimate_next_chunk_size(self) -> None:
self.chunk_size *= self.chunk_size_multiplier
self.chunk_size = max(self.min_scan_chunk_size, self.chunk_size)
self.chunk_size = min(self.max_scan_chunk_size, self.chunk_size)
9 changes: 9 additions & 0 deletions sw_utils/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,15 @@ def lock_endpoint(self, endpoint_uri: URI | str) -> Iterator:
finally:
self._locker_provider = None

@contextlib.contextmanager
def disable_retries(self) -> Iterator:
cache = self.retry_timeout
self.retry_timeout = 0
try:
yield
finally:
self.retry_timeout = cache

def set_retry_timeout(self, retry_timeout: int) -> None:
self.retry_timeout = retry_timeout

Expand Down

0 comments on commit 9e56318

Please sign in to comment.