Skip to content

Commit

Permalink
Refactor EventSubWSClient subscription handling
Browse files Browse the repository at this point in the history
  • Loading branch information
PredaaA committed Dec 23, 2023
1 parent 6ff42fe commit 64ff994
Showing 1 changed file with 34 additions and 40 deletions.
74 changes: 34 additions & 40 deletions twitchio/ext/eventsub/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,57 +223,51 @@ def __init__(self, client: Client):

async def _assign_subscription(self, sub: _Subscription) -> None:
if not self._sockets:
w = Websocket(self.client, self._http)
await w.connect()
await self._create_and_connect_socket()

self._sockets.append(w)
bad_sockets: set[Websocket] | None = None

success = False
bad_sockets: set[Websocket] | None = None # dont allocate unless we need it
while True:
s = self._get_socket(bad_sockets)
if s is None:
await self._create_and_connect_socket()
s = self._sockets[-1]

while not success:
s: Websocket | None = None # really it'll never be none after this point, but ok pyright
s.add_subscription(sub)

if bad_sockets is not None:
socks = filter(lambda sock: sock not in bad_sockets, self._sockets) # type: ignore
else:
socks = self._sockets

for s in socks:
if s.remaining_slots > 0:
s.add_subscription(sub)
break

else: # there are no sockets, create one and break
s = Websocket(self.client, self._http)
await s.connect()
self._sockets.append(s)

s.add_subscription(sub)
return

assert sub.created is not None # go away pyright
assert sub.created is not None

success, status = await sub.created

if not success and status == 400:
# can't be on that socket due to someone else being on it, try again on a different one
if bad_sockets is None:
bad_sockets = set()

bad_sockets.add(s)
sub.created = asyncio.Future()
continue
if success:
sub.created = None
break

elif not success and status in (401, 403):
if status == 400:
bad_sockets = self._handle_bad_socket(s, bad_sockets, sub)
elif status in (401, 403):
raise Unauthorized("You are not authorized to make this subscription", status=status)

elif not success:
else:
raise RuntimeError(f"Subscription failed, reason unknown. Status: {status}")

else:
sub.created = None # don't need that future to sit in memory
break
async def _create_and_connect_socket(self):
w = Websocket(self.client, self._http)
await w.connect()
self._sockets.append(w)

def _get_socket(self, bad_sockets: set[Websocket] | None) -> Optional[Websocket]:
socks = self._sockets if bad_sockets is None else filter(lambda sock: sock not in bad_sockets, self._sockets)
for s in socks:
if s.remaining_slots > 0:
return s
return None

def _handle_bad_socket(self, s: Websocket, bad_sockets: set[Websocket] | None, sub: _Subscription) -> set[Websocket]:
if bad_sockets is None:
bad_sockets = set()
bad_sockets.add(s)
sub.created = asyncio.Future()
return bad_sockets

async def subscribe_user_updated(self, user: Union[PartialUser, str, int], token: str):
if isinstance(user, PartialUser):
Expand Down

0 comments on commit 64ff994

Please sign in to comment.