Skip to content

Commit 177cf27

Browse files
[bug] Address proactive reconnect codepaths (#153)
1 parent 7bcc717 commit 177cf27

File tree

4 files changed

+322
-122
lines changed

4 files changed

+322
-122
lines changed

src/replit_river/rate_limiter.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import logging
33
import random
44
from contextvars import Context
5+
from typing import Protocol
56

67
from replit_river.error_schema import RiverException
78
from replit_river.transport_options import ConnectionRetryOptions
@@ -15,6 +16,13 @@ def __init__(self, code: str, message: str, client_id: str) -> None:
1516
self.client_id = client_id
1617

1718

19+
class RateLimiter(Protocol):
20+
def start_restoring_budget(self, user: str) -> None: ...
21+
def get_backoff_ms(self, user: str) -> float: ...
22+
def has_budget(self, user: str) -> bool: ...
23+
def consume_budget(self, user: str) -> None: ...
24+
25+
1826
class LeakyBucketRateLimit:
1927
"""Asynchronous leaky bucket rate limiter.
2028

src/replit_river/v2/client_transport.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,10 @@ async def get_or_create_session(self) -> Session:
5454
call ensure_connected on whatever session is active.
5555
"""
5656
existing_session = self._session
57-
if not existing_session or existing_session.is_closed():
57+
if not existing_session or existing_session.is_terminal():
5858
logger.info("Creating new session")
59+
if existing_session:
60+
await existing_session.close()
5961
new_session = Session(
6062
client_id=self._client_id,
6163
server_id=self._server_id,
@@ -80,7 +82,7 @@ async def _retry_connection(self) -> Session:
8082
logger.debug("Triggering get_or_create_session")
8183
return await self.get_or_create_session()
8284

83-
async def _delete_session(self, session: Session) -> None:
85+
def _delete_session(self, session: Session) -> None:
8486
if self._session is session:
8587
self._session = None
8688
else:

0 commit comments

Comments
 (0)