Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework multiplexer to use a deque PoC #304

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

bdraco
Copy link

@bdraco bdraco commented Oct 24, 2024

Since there is only one consumer of the queue, an asyncio.Queue is not needed and simple deque can be used

Since there is only one consumer of the queue, an
asyncio.Queue is not needed and simple deque can be used
@@ -133,13 +133,10 @@ async def _runner(self) -> None:
if not from_peer:
Copy link
Author

@bdraco bdraco Oct 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The loop could be refactored in the future to be something like this as well to avoid creating tasks

        self._healthy.set()
        from_peer_data: bytes | None = None
        try:
            while not transport.is_closing():
                # Wait until data need to be processed
                from_peer_data = None          
                try:
                   async with asyncio.timeout(PEER_TCP_TIMEOUT), async_interrupt.interrupt(self._queue_ready_future, WriterReady, None):
                        from_peer_data = await self._reader.readexactly(32)
                except WriterReady:
                    # To peer
                    message = self._queue.popleft()
                    if not self._queue:
                        self._queue_ready_future = self._loop.create_future()
                    self._write_message(message)

                    # Flush buffer
                    await self._writer.drain()
                else:
                    # From peer
                    await self._read_message(from_peer_data)

                # throttling
                if not self._throttling:
                    continue
                await asyncio.sleep(self._throttling)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would need to benchmark that to verify its faster but the profile looks like its ~30% better even with raising the exception to control flow

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant