Skip to content

Commit c942167

Browse files
authored
fix sm (#36)
* fix sm * fix advisories
1 parent 88bf6de commit c942167

File tree

5 files changed

+30
-58
lines changed

5 files changed

+30
-58
lines changed

Cargo.lock

Lines changed: 12 additions & 56 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

python/yellowstone-fumarole-client/CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@ The minor version will be incremented upon a breaking change and the patch versi
1616

1717
### Breaking
1818

19+
## [0.2.2]
20+
21+
- Fixed `FumaroleSM.need_new_blockchain_events` to follow rust's implementation [#35](https://github.com/rpcpool/yellowstone-fumarole/issues/35).
22+
1923
## [0.2.1]
2024

2125
- Fixed `OrderedSet.popfirst` + missing `await self.aclose()` in `run` method [PR](https://github.com/rpcpool/yellowstone-fumarole/pull/33) [@Yolley](https://github.com/Yolley)

python/yellowstone-fumarole-client/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "yellowstone-fumarole-client"
3-
version = "0.2.1"
3+
version = "0.2.2"
44
homepage = "https://github.com/rpcpool/yellowstone-fumarole"
55
repository = "https://github.com/rpcpool/yellowstone-fumarole"
66
description = "Yellowstone Fumarole Python Client"

python/yellowstone-fumarole-client/yellowstone_fumarole_client/runtime/aio.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,8 @@ def __init__(
133133
self.commit_interval = commit_interval
134134
self.gc_interval = gc_interval
135135
self.max_concurrent_download = max_concurrent_download
136+
self.poll_hist_inflight = False
137+
self.commit_offset_inflight = False
136138

137139
# holds metadata about the download task
138140
self.download_tasks = dict()
@@ -167,10 +169,12 @@ def _handle_control_response(self, control_response: ControlResponse):
167169

168170
match response_field:
169171
case "poll_hist":
172+
self.poll_hist_inflight = False
170173
poll_hist = control_response.poll_hist
171174
LOGGER.debug(f"Received poll history {len(poll_hist.events)} events")
172175
self.sm.queue_blockchain_event(poll_hist.events)
173176
case "commit_offset":
177+
self.commit_offset_inflight = False
174178
commit_offset = control_response.commit_offset
175179
LOGGER.debug(f"Received commit offset: {commit_offset}")
176180
self.sm.update_committed_offset(commit_offset.offset)
@@ -181,9 +185,12 @@ def _handle_control_response(self, control_response: ControlResponse):
181185

182186
async def poll_history_if_needed(self):
183187
"""Poll the history if the state machine needs new events."""
188+
if self.poll_hist_inflight:
189+
return
184190
if self.sm.need_new_blockchain_events():
185191
cmd = self._build_poll_history_cmd(self.sm.committable_offset)
186192
await self.control_plane_tx.put(cmd)
193+
self.poll_hist_inflight = True
187194

188195
def commitment_level(self):
189196
"""Gets the commitment level from the subscribe request."""
@@ -241,9 +248,12 @@ async def _force_commit_offset(self):
241248

242249
async def _commit_offset(self):
243250
self.last_commit = time.time()
251+
if self.commit_offset_inflight:
252+
return
244253
if self.sm.last_committed_offset < self.sm.committable_offset:
245254
LOGGER.debug(f"Committing offset {self.sm.committable_offset}")
246255
await self._force_commit_offset()
256+
self.commit_offset_inflight = True
247257

248258
async def _drain_slot_status(self):
249259
"""Drains the slot status from the state machine and sends updates to the Dragonsmouth outlet."""

python/yellowstone-fumarole-client/yellowstone_fumarole_client/runtime/state_machine.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -322,4 +322,6 @@ def processed_offset_queue_len(self) -> int:
322322

323323
def need_new_blockchain_events(self) -> bool:
324324
"""Check if new blockchain events are needed."""
325-
return not self.slot_status_update_queue and not self.blocked_slot_status_update
325+
MINIMUM_UNPROCESSED_BLOCKCHAIN_EVENT = 10
326+
return len(self.unprocessed_blockchain_event) < MINIMUM_UNPROCESSED_BLOCKCHAIN_EVENT \
327+
or (not self.slot_status_update_queue and not self.blocked_slot_status_update)

0 commit comments

Comments
 (0)