Skip to content

Commit

Permalink
Maybe this will be more safe
Browse files Browse the repository at this point in the history
  • Loading branch information
PredaaA committed Dec 23, 2023
1 parent 64ff994 commit 5cd8d56
Showing 1 changed file with 17 additions and 9 deletions.
26 changes: 17 additions & 9 deletions twitchio/ext/eventsub/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,15 @@ class Websocket:
def __init__(self, client: Client, http: http.EventSubHTTP):
self.client = client
self._http = http
self._subscription_pool = _WakeupList[_Subscription]()
self._subscription_pool.add_append_callback(self._wakeup_and_connect)
self._subscription_pool = []
# self._subscription_pool.add_append_callback(self._wakeup_and_connect)
self._sock: Optional[aiohttp.ClientWebSocketResponse] = None
self._pump_task: Optional[asyncio.Task] = None
self._timeout: Optional[int] = None
self._session_id: Optional[str] = None
self._target_user_id: int | None = None # each websocket can only have one authenticated user on it for some bizzare reason, but this isnt documented anywhere
self.remaining_slots: int = 300 # default to 300
self._ready: asyncio.Event() = asyncio.Event()

def __hash__(self) -> int:
return hash(self.session_id)
Expand All @@ -113,6 +114,12 @@ def session_id(self) -> Optional[str]:
def is_connected(self) -> bool:
return self._sock is not None and not self._sock.closed

def is_ready(self) -> bool:
return self._ready.is_set()

def wait_until_ready(self) -> Awaitable[None]:
return self._ready.wait()

async def _subscribe(self, obj: _Subscription) -> dict | None:
try:
resp = await self._http.create_websocket_subscription(obj.event, obj.condition, self._session_id, obj.token)
Expand All @@ -132,13 +139,11 @@ async def _subscribe(self, obj: _Subscription) -> dict | None:

return data

def add_subscription(self, sub: _Subscription) -> None:
self._subscription_pool.append(sub)
async def wait_and_subscribe(self, obj: _Subscription):
await self.wait_until_ready()

async def _wakeup_and_connect(self, obj: _Subscription):
if self.is_connected:
await self._subscribe(obj)
return
self._subscription_pool.append(obj)
await self._subscribe(obj)

async def connect(self, reconnect_url: Optional[str] = None):
async with aiohttp.ClientSession() as session:
Expand All @@ -152,6 +157,7 @@ async def connect(self, reconnect_url: Optional[str] = None):

logger.debug("Created websocket connection with session ID: %s and timeout %s", self._session_id, self._timeout)

self._ready.set()
self._pump_task = self.client.loop.create_task(self.pump())

if reconnect_url: # don't resubscribe to events
Expand Down Expand Up @@ -189,6 +195,7 @@ async def pump(self) -> None:
elif isinstance(frame, models.ReconnectEvent):
self.client.run_event("eventsub_reconnect", frame)
self._sock = None
self._ready.clear()
await self.connect(frame.reconnect_url)
await sock.close(code=aiohttp.WSCloseCode.GOING_AWAY, message=b"reconnecting")
return
Expand All @@ -198,6 +205,7 @@ async def pump(self) -> None:
await cast(aiohttp.ClientWebSocketResponse, self._sock).close(
code=aiohttp.WSCloseCode.ABNORMAL_CLOSURE, message=b"timeout surpassed"
)
self._ready.clear()
await self.connect()
return

Expand Down Expand Up @@ -233,7 +241,7 @@ async def _assign_subscription(self, sub: _Subscription) -> None:
await self._create_and_connect_socket()
s = self._sockets[-1]

s.add_subscription(sub)
s.wait_and_subscribe(sub)

assert sub.created is not None

Expand Down

0 comments on commit 5cd8d56

Please sign in to comment.