Skip to content

Commit c9f7924

Browse files
authored
Merge pull request #239 from opentensor/release/1.5.12
Release/1.5.12
2 parents e3e055b + b6472d2 commit c9f7924

File tree

6 files changed

+97
-19
lines changed

6 files changed

+97
-19
lines changed

.github/workflows/unit-and-integration-test.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,12 @@ jobs:
4848
PYTHONUNBUFFERED: "1"
4949
run: |
5050
source venv/bin/activate
51-
python -m uv run pytest -n 2 tests/unit_tests/ --reruns 3
51+
python -m uv run pytest tests/unit_tests/ --reruns 3 -s
5252
5353
- name: Integration tests
5454
timeout-minutes: 20
5555
env:
5656
PYTHONUNBUFFERED: "1"
5757
run: |
5858
source venv/bin/activate
59-
python -m uv run pytest -n 2 tests/integration_tests/ --reruns 3
59+
python -m uv run pytest tests/integration_tests/ --reruns 3 -s --forked

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,11 @@
11
# Changelog
2+
## 1.5.12 /2025-11-167
3+
* RecursionError in `_wait_with_activity_timeout` with concurrent tasks by @Arthurdw in https://github.com/opentensor/async-substrate-interface/pull/238
4+
* Improved Test Running + Race Condition Catch by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/236
5+
6+
7+
**Full Changelog**: https://github.com/opentensor/async-substrate-interface/compare/v1.5.11...v1.5.12
8+
29
## 1.5.11 /2025-11-14
310
* Race Condition Bug fixes by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/234
411

async_substrate_interface/async_substrate.py

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -694,9 +694,17 @@ async def _cancel(self):
694694

695695
async def connect(self, force=False):
696696
if not force:
697-
await self._lock.acquire()
697+
async with self._lock:
698+
return await self._connect_internal(force)
698699
else:
699700
logger.debug("Proceeding without acquiring lock.")
701+
return await self._connect_internal(force)
702+
703+
async def _connect_internal(self, force):
704+
# Check state again after acquiring lock to avoid duplicate connections
705+
if not force and self.state in (State.OPEN, State.CONNECTING):
706+
return None
707+
700708
logger.debug(f"Websocket connecting to {self.ws_url}")
701709
if self._sending is None or self._sending.empty():
702710
self._sending = asyncio.Queue()
@@ -725,17 +733,13 @@ async def connect(self, force=False):
725733
except socket.gaierror:
726734
logger.debug(f"Hostname not known (this is just for testing")
727735
await asyncio.sleep(10)
728-
if self._lock.locked():
729-
self._lock.release()
730736
return await self.connect(force=force)
731737
logger.debug("Connection established")
732738
self.ws = connection
733739
if self._send_recv_task is None or self._send_recv_task.done():
734740
self._send_recv_task = asyncio.get_running_loop().create_task(
735741
self._handler(self.ws)
736742
)
737-
if self._lock.locked():
738-
self._lock.release()
739743
return None
740744

741745
async def _handler(self, ws: ClientConnection) -> Union[None, Exception]:
@@ -838,9 +842,15 @@ async def _exit_with_timer(self):
838842
"""
839843
try:
840844
if self.shutdown_timer is not None:
845+
logger.debug("Exiting with timer")
841846
await asyncio.sleep(self.shutdown_timer)
842-
logger.debug("Exiting with timer")
843-
await self.shutdown()
847+
if (
848+
self.state != State.CONNECTING
849+
and self._sending.qsize() == 0
850+
and not self._received_subscriptions
851+
and self._waiting_for_response <= 0
852+
):
853+
await self.shutdown()
844854
except asyncio.CancelledError:
845855
pass
846856

@@ -981,6 +991,7 @@ async def unsubscribe(
981991
original_id = get_next_id()
982992
while original_id in self._in_use_ids:
983993
original_id = get_next_id()
994+
logger.debug(f"Unwatched extrinsic subscription {subscription_id}")
984995
self._received_subscriptions.pop(subscription_id, None)
985996

986997
to_send = {
@@ -2512,6 +2523,7 @@ async def _make_rpc_request(
25122523
subscription_added = False
25132524

25142525
async with self.ws as ws:
2526+
await ws.mark_waiting_for_response()
25152527
for payload in payloads:
25162528
item_id = await ws.send(payload["payload"])
25172529
request_manager.add_request(item_id, payload["id"])
@@ -2523,7 +2535,6 @@ async def _make_rpc_request(
25232535
logger.debug(
25242536
f"Submitted payload ID {payload['id']} with websocket ID {item_id}: {output_payload}"
25252537
)
2526-
await ws.mark_waiting_for_response()
25272538

25282539
while True:
25292540
for item_id in request_manager.unresponded():

pyproject.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "async-substrate-interface"
3-
version = "1.5.11"
3+
version = "1.5.12"
44
description = "Asyncio library for interacting with substrate. Mostly API-compatible with py-substrate-interface"
55
readme = "README.md"
66
license = { file = "LICENSE" }
@@ -56,5 +56,6 @@ dev = [
5656
"pytest-split==0.10.0",
5757
"pytest-xdist==3.6.1",
5858
"pytest-rerunfailures==10.2",
59-
"bittensor-wallet>=4.0.0"
59+
"bittensor-wallet>=4.0.0",
60+
"pytest-forked"
6061
]

tests/helpers/proxy_server.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,21 @@
99

1010

1111
class ProxyServer:
12-
def __init__(self, upstream: str, time_til_pause: float, time_til_resume: float):
12+
def __init__(
13+
self,
14+
upstream: str,
15+
time_til_pause: float,
16+
time_til_resume: float,
17+
port: int = 8080,
18+
):
1319
self.upstream_server = upstream
1420
self.time_til_pause = time_til_pause
1521
self.time_til_resume = time_til_resume
1622
self.upstream_connection = None
1723
self.connection_time = 0
1824
self.shutdown_time = 0
1925
self.resume_time = 0
26+
self.port = port
2027

2128
def connect(self):
2229
self.upstream_connection = connect(self.upstream_server)
@@ -41,7 +48,7 @@ def proxy_request(self, websocket: ServerConnection):
4148
websocket.send(recd)
4249

4350
def serve(self):
44-
with serve(self.proxy_request, "localhost", 8080) as self.server:
51+
with serve(self.proxy_request, "localhost", self.port) as self.server:
4552
self.server.serve_forever()
4653

4754
def connect_and_serve(self):

tests/integration_tests/test_async_substrate_interface.py

Lines changed: 57 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import os.path
44
import time
55
import threading
6+
import socket
67

78
import bittensor_wallet
89
import pytest
@@ -195,8 +196,16 @@ async def test_query_map_with_odd_number_of_params():
195196
print("test_query_map_with_odd_number_of_params succeeded")
196197

197198

199+
@pytest.mark.skip("Weird issue with the GitHub Actions runner")
198200
@pytest.mark.asyncio
199201
async def test_improved_reconnection():
202+
def get_free_port():
203+
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
204+
s.bind(("", 0)) # Bind to port 0 = OS picks free port
205+
s.listen(1)
206+
port_ = s.getsockname()[1]
207+
return port_
208+
200209
print("Testing test_improved_reconnection")
201210
ws_logger_path = "/tmp/websockets-proxy-test"
202211
ws_logger = logging.getLogger("websockets.proxy")
@@ -210,14 +219,15 @@ async def test_improved_reconnection():
210219
os.remove(asi_logger_path)
211220
logger.setLevel(logging.DEBUG)
212221
logger.addHandler(logging.FileHandler(asi_logger_path))
222+
port = get_free_port()
223+
print(f"Testing using server on port {port}")
224+
proxy = ProxyServer("wss://archive.sub.latent.to", 10, 20, port=port)
213225

214-
proxy = ProxyServer("wss://archive.sub.latent.to", 10, 20)
215-
216-
server_thread = threading.Thread(target=proxy.connect_and_serve)
226+
server_thread = threading.Thread(target=proxy.connect_and_serve, daemon=True)
217227
server_thread.start()
218228
await asyncio.sleep(3) # give the server start up time
219229
async with AsyncSubstrateInterface(
220-
"ws://localhost:8080",
230+
f"ws://localhost:{port}",
221231
ss58_format=42,
222232
chain_name="Bittensor",
223233
retry_timeout=10.0,
@@ -247,7 +257,7 @@ async def test_improved_reconnection():
247257
assert "Pausing" in f.read()
248258
with open(asi_logger_path, "r") as f:
249259
assert "Timeout/ConnectionClosed occurred." in f.read()
250-
shutdown_thread = threading.Thread(target=proxy.close)
260+
shutdown_thread = threading.Thread(target=proxy.close, daemon=True)
251261
shutdown_thread.start()
252262
shutdown_thread.join(timeout=5)
253263
server_thread.join(timeout=5)
@@ -293,3 +303,45 @@ async def test_get_payment_info():
293303
assert partial_fee_all_options > partial_fee_no_era
294304
assert partial_fee_all_options > partial_fee_era
295305
print("test_get_payment_info succeeded")
306+
307+
308+
@pytest.mark.asyncio
309+
async def test_concurrent_rpc_requests():
310+
"""
311+
Test that multiple concurrent RPC requests on a shared connection work correctly.
312+
313+
This test verifies the fix for the issue where multiple concurrent tasks
314+
re-initializing the WebSocket connection caused requests to hang.
315+
"""
316+
print("Testing test_concurrent_rpc_requests")
317+
318+
async def concurrent_task(substrate_, task_id):
319+
"""Make multiple RPC calls from a single task."""
320+
for i in range(5):
321+
result = await substrate_.get_block_number(None)
322+
assert isinstance(result, int)
323+
assert result > 0
324+
325+
async with AsyncSubstrateInterface(LATENT_LITE_ENTRYPOINT) as substrate:
326+
# Run 5 concurrent tasks, each making 5 RPC calls (25 total)
327+
# This tests that the connection is properly shared without re-initialization
328+
tasks = [concurrent_task(substrate, i) for i in range(5)]
329+
await asyncio.gather(*tasks)
330+
331+
print("test_concurrent_rpc_requests succeeded")
332+
333+
334+
@pytest.mark.asyncio
335+
async def test_wait_for_block():
336+
async def handler(_):
337+
return True
338+
339+
substrate = AsyncSubstrateInterface(
340+
LATENT_LITE_ENTRYPOINT, ss58_format=42, chain_name="Bittensor"
341+
)
342+
await substrate.initialize()
343+
current_block = await substrate.get_block_number(None)
344+
result = await substrate.wait_for_block(
345+
current_block + 3, result_handler=handler, task_return=False
346+
)
347+
assert result is True

0 commit comments

Comments
 (0)