Skip to content

Commit

Permalink
Update event scanner chunk logic (#127)
Browse files Browse the repository at this point in the history
* Update event scanner chunk logic

* Add test for event scanner chunks

* Remove execution client retries

* Add chunk comments

* Fix comment typo

* Update scanner test
  • Loading branch information
cyc60 authored Nov 18, 2024
1 parent 4c9dc35 commit a72fd95
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 19 deletions.
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "sw-utils"
version = "v0.6.30"
version = "v0.6.31"
description = "StakeWise Python utils"
authors = ["StakeWise Labs <[email protected]>"]
license = "GPL-3.0-or-later"
Expand Down Expand Up @@ -36,6 +36,7 @@ exclude_dirs = ["sw_utils/tests/"]

[tool.pylint."pre-commit-hook"]
disable=["C0103", "C0114", "C0115", "C0116", "R0801", "R0903", "W0703", "W1514", "W0511"]
ignore-paths=["sw_utils/tests/.*"]

[tool.pylint."BASIC"]
good-names = ["i", "el", "e", "w", "f", "w3"]
Expand Down
35 changes: 17 additions & 18 deletions sw_utils/event_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,26 +51,26 @@ def __init__(
self,
processor: EventProcessor,
argument_filters: dict[str, Any] | None = None,
chunk_size: int | None = None,
):
self.processor = processor
self.argument_filters = argument_filters
self._contract_call = lambda from_block, to_block: getattr(
processor.contract.events, processor.contract_event
).get_logs(argument_filters=argument_filters, fromBlock=from_block, toBlock=to_block)

# Start with half of the max chunk size. A full 1,000,000-block chunk works only with powerful nodes.
start_chunk_size = self.max_scan_chunk_size // 2
# Scan in chunks, commit between.
self.chunk_size = chunk_size or start_chunk_size

async def process_new_events(self, to_block: BlockNumber) -> None:
current_from_block = await self.processor.get_from_block()
if current_from_block >= to_block:
return

# Scan in chunks, commit between
chunk_size = self.max_scan_chunk_size

while current_from_block < to_block:
estimated_end_block = min(to_block, BlockNumber(current_from_block + chunk_size))
current_to_block, new_events = await self._scan_chunk(
current_from_block, estimated_end_block
)
current_to_block, new_events = await self._scan_chunk(current_from_block, to_block)
await self.processor.process_events(new_events, to_block=current_to_block)

if new_events:
Expand All @@ -83,14 +83,13 @@ async def process_new_events(self, to_block: BlockNumber) -> None:
to_block,
)

# Try to guess how many blocks to fetch over `eth_getLogs` API next time
chunk_size = self._estimate_next_chunk_size(chunk_size)

# Try to increase blocks range for the next time
self._estimate_next_chunk_size()
# Set where the next chunk starts
current_from_block = BlockNumber(current_to_block + 1)

async def _scan_chunk(
self, from_block: BlockNumber, to_block: BlockNumber
self, from_block: BlockNumber, last_block: BlockNumber
) -> tuple[BlockNumber, list[EventData]]:
"""
Read and process events between block numbers.
Expand All @@ -99,22 +98,22 @@ async def _scan_chunk(
"""
retries = self.max_request_retries
for i in range(retries):
to_block = min(last_block, BlockNumber(from_block + self.chunk_size))
try:
return to_block, await self._contract_call(from_block, to_block)
except Exception as e:
if i < retries - 1:
# Decrease the `eth_getBlocks` range
to_block = BlockNumber(from_block + ((to_block - from_block) // 2))
self.chunk_size = self.chunk_size // 2
# Let the JSON-RPC to recover e.g. from restart
await sleep(self.request_retry_seconds)
continue

raise e

raise RuntimeError(f'Failed to sync chunk: from block={from_block}, to block={to_block}')
raise RuntimeError(f'Failed to sync chunk: from block={from_block}, to block={last_block}')

def _estimate_next_chunk_size(self, current_chuck_size: int) -> int:
current_chuck_size *= self.chunk_size_multiplier
current_chuck_size = max(self.min_scan_chunk_size, current_chuck_size)
current_chuck_size = min(self.max_scan_chunk_size, current_chuck_size)
return current_chuck_size
def _estimate_next_chunk_size(self) -> None:
    """Grow ``self.chunk_size`` for the next scan iteration, clamped to bounds.

    Multiplies the current chunk size by ``chunk_size_multiplier`` and then
    clamps the result into [``min_scan_chunk_size``, ``max_scan_chunk_size``].
    Mutates ``self.chunk_size`` in place and returns nothing.
    """
    self.chunk_size *= self.chunk_size_multiplier
    # Clamp from below first, then from above, so the final value always lies
    # within [min_scan_chunk_size, max_scan_chunk_size].
    self.chunk_size = max(self.min_scan_chunk_size, self.chunk_size)
    self.chunk_size = min(self.max_scan_chunk_size, self.chunk_size)
49 changes: 49 additions & 0 deletions sw_utils/tests/test_event_scanner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import pytest
from eth_typing import BlockNumber
from web3.types import EventData

from sw_utils.event_scanner import EventProcessor, EventScanner


class MockedEventProcessor(EventProcessor):
    """Minimal :class:`EventProcessor` stub for the scanner tests.

    Reports a fixed starting block and silently discards any events it is
    asked to process — the tests only assert on chunk-size behaviour.
    """

    @staticmethod
    async def get_from_block() -> BlockNumber:
        # Every scan in the tests starts from this fixed block.
        return BlockNumber(777)

    @staticmethod
    async def process_events(events: list[EventData], to_block: BlockNumber) -> None:
        # Intentionally a no-op: the received events are irrelevant here.
        return None


async def fetch_events(a, b):
    """Stand-in for ``EventScanner._contract_call`` that never finds events.

    Accepts the same two positional block-range arguments and always returns
    an empty list.
    """
    no_events: list = []
    return no_events


async def fetch_events_broken(a, b):
    """Stand-in contract call that always fails.

    Simulates an unreachable JSON-RPC node by raising ``ConnectionError``
    (with no message, matching the original stub) regardless of arguments.
    """
    raise ConnectionError


class TestExitSignatureCrud:
    # NOTE(review): the class name looks copy-pasted from an unrelated test
    # module — this suite actually exercises EventScanner chunk sizing.
    # Consider renaming to TestEventScannerChunks.

    async def test_basic(self):
        """Verify the scanner halves its chunk size on each failed RPC retry."""
        # Expected default: half of the scanner's max chunk size — presumably
        # max_scan_chunk_size is 1_000_000; confirm against EventScanner.
        default_chunk_size = 500000
        p = MockedEventProcessor()
        scanner = EventScanner(processor=p)
        scanner._contract_call = fetch_events
        assert scanner.chunk_size == default_chunk_size

        # With a single attempt allowed there is no retry, so a failure must
        # propagate without shrinking the chunk size.
        scanner._contract_call = fetch_events_broken
        scanner.request_retry_seconds = 0
        scanner.max_request_retries = 1
        with pytest.raises(ConnectionError):
            await scanner.process_new_events(888)
        assert scanner.chunk_size == default_chunk_size

        # Two attempts: the chunk size is halved once before the final failure.
        scanner.max_request_retries = 2
        with pytest.raises(ConnectionError):
            await scanner.process_new_events(888)
        assert scanner.chunk_size == default_chunk_size // 2

        # Three attempts halve it twice more (starting from the // 2 value
        # left by the previous run), giving default // 8 overall.
        scanner.max_request_retries = 3
        with pytest.raises(ConnectionError):
            await scanner.process_new_events(888)
        assert scanner.chunk_size == default_chunk_size // 8

0 comments on commit a72fd95

Please sign in to comment.