Add FIFO queue #61
Bump pyworker version
def advance_queue_after_completion(event: asyncio.Event):
    """Pop current head and wake next waiter, if any."""
    if self.queue and self.queue[0] is event:
Do we need to check if [0] is an event? Small nit.
Not checking if it is an event, but verifying that the event we pass in is in fact the current head of the queue.
lib/backend.py
Outdated
if disconnect_task in first_done and not event.is_set():
    was_head = (self.queue and self.queue[0] is event)
    try:
        self.queue.remove(event)
    except ValueError:
        pass
    if was_head and self.queue:
        self.queue[0].set()

    for t in first_pending:
        t.cancel()
    await asyncio.gather(*first_pending, return_exceptions=True)
    return web.Response(status=499)
Isn't this duplicating advance_queue_after_completion?
lib/backend.py
Outdated
except asyncio.CancelledError:
    # Cleanup if request was cancelled
    was_head = (self.queue and self.queue[0] is event)
    try:
        self.queue.remove(event)
    except ValueError:
        pass
    if was_head and self.queue:
        self.queue[0].set()

    return web.Response(status=499)
same here?
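The duplication flagged in the two comments above could be folded into one helper. The following is only a sketch: `remove_and_advance` is a hypothetical name, and the queue is assumed to be a `collections.deque` of `asyncio.Event` objects (the diff only shows that it supports `[0]` and `.remove`).

```python
import asyncio
from collections import deque


def remove_and_advance(queue: deque, event: asyncio.Event) -> bool:
    """Remove `event` from the queue; if it was the head, wake the new head.

    Returns True only when `event` was the head at the time of removal.
    """
    # Identity check: is the event we were given the current head?
    was_head = bool(queue) and queue[0] is event
    try:
        queue.remove(event)
    except ValueError:
        # Already removed elsewhere; nothing to advance.
        return False
    if was_head and queue:
        queue[0].set()
    return was_head


async def demo():
    a, b, c = asyncio.Event(), asyncio.Event(), asyncio.Event()
    queue = deque([a, b, c])
    mid = remove_and_advance(queue, b)   # b was not the head: nobody is woken
    woke_after_mid = c.is_set()
    head = remove_and_advance(queue, a)  # a was the head: c becomes head and is woken
    return mid, woke_after_mid, head, c.is_set(), len(queue)


result = asyncio.run(demo())
```

Both the disconnect branch and the `except asyncio.CancelledError` branch could then call this one function before returning the 499 response.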
From gpt 5.1 =)

The main one is how cancel_api_call_if_disconnected is used. That function does await request.wait_for_disconnection(), logs, marks the request as canceled in metrics, and then raises asyncio.CancelledError. You always run it in a background task, disconnect_task = create_task(cancel_api_call_if_disconnected()), and then you race that task against other tasks using asyncio.wait.

However, whenever disconnect_task “wins” the race, you never actually await it or gather it. In both the first race and the second race you only ever call gather on the “pending” tasks, not on the “done” set. If cancel_api_call_if_disconnected completes by raising CancelledError, that exception will just sit on the finished task, and in CPython that usually produces “Task exception was never retrieved” warnings.

Your outer except asyncio.CancelledError around the handler does not help, because that only catches cancellation of the handler coroutine itself, not exceptions inside a separate Task.
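One way to address this is to inspect the "done" set as well as the "pending" set after each race. The sketch below is illustrative only: `watch_disconnect`, `do_work`, and `drain` are hypothetical stand-ins, not functions from this PR. Note that a task whose coroutine raises `asyncio.CancelledError` itself finishes in the cancelled state, so the sketch checks `task.cancelled()` before calling `task.exception()` (which would otherwise raise).

```python
import asyncio


async def watch_disconnect(disconnected: asyncio.Event) -> None:
    # Hypothetical stand-in for cancel_api_call_if_disconnected.
    await disconnected.wait()
    raise asyncio.CancelledError("client disconnected")


async def do_work() -> str:
    # Hypothetical stand-in for the real backend call.
    await asyncio.sleep(10)
    return "done"


def drain(done) -> list:
    """Retrieve the outcome of every finished task so nothing is left unobserved."""
    outcomes = []
    for task in done:
        if task.cancelled():
            outcomes.append("cancelled")
        elif task.exception() is not None:
            outcomes.append(f"error: {task.exception()!r}")
        else:
            outcomes.append("ok")
    return outcomes


async def race() -> list:
    disconnected = asyncio.Event()
    disconnect_task = asyncio.create_task(watch_disconnect(disconnected))
    work_task = asyncio.create_task(do_work())
    disconnected.set()  # simulate the client going away immediately

    done, pending = await asyncio.wait(
        {disconnect_task, work_task}, return_when=asyncio.FIRST_COMPLETED
    )
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return drain(done)  # also look at the winners, not only the losers


result = asyncio.run(race())
```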
When parallel_requests = False, we implement a FIFO queue in the PyWorker backend to ensure requests are handled sequentially. This replaces the semaphore-based pseudo-queue we had before, which released waiters in effectively random order (first in, random out).
This has been tested with the default serverless templates (comfy, vLLM, TGI) on prod.
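As a rough illustration of the FIFO behaviour described above, here is a minimal sketch assuming each request registers an `asyncio.Event` in a shared deque and waits until its own event is set. `FifoGate`, `enter`, `leave`, and `handle` are hypothetical names for illustration and are not the PyWorker API.

```python
import asyncio
from collections import deque


class FifoGate:
    """Lets one waiter through at a time, strictly in arrival order."""

    def __init__(self) -> None:
        self.queue: deque = deque()

    def enter(self) -> asyncio.Event:
        event = asyncio.Event()
        self.queue.append(event)
        if self.queue[0] is event:
            event.set()  # queue was empty: proceed immediately
        return event

    def leave(self, event: asyncio.Event) -> None:
        was_head = bool(self.queue) and self.queue[0] is event
        try:
            self.queue.remove(event)
        except ValueError:
            return
        if was_head and self.queue:
            self.queue[0].set()  # wake the next waiter in line


async def handle(gate: FifoGate, name: str, log: list) -> None:
    event = gate.enter()
    try:
        await event.wait()
        log.append(f"start {name}")
        await asyncio.sleep(0.01)  # simulated request processing
        log.append(f"end {name}")
    finally:
        gate.leave(event)  # runs on success, error, or cancellation


async def main() -> list:
    gate, log = FifoGate(), []
    await asyncio.gather(*(handle(gate, n, log) for n in "abc"))
    return log


log = asyncio.run(main())
```

Unlike a semaphore, where the order in which waiters are released is not something the caller controls, each waiter here is woken only when it reaches the head of the deque.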