Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Investigate janus to make the main program less blocking and cleaner #3

Open
AlanCoding opened this issue Oct 21, 2024 · 1 comment
Labels
refactoring code build-out and clean-up

Comments

@AlanCoding
Copy link
Member

This library offers a Queue type which can go async or non-async.

https://github.com/aio-libs/janus

import asyncio
import janus


def threaded(sync_q: janus.SyncQueue[int]) -> None:
    for i in range(100):
        sync_q.put(i)
    sync_q.join()


async def async_coro(async_q: janus.AsyncQueue[int]) -> None:
    for i in range(100):
        val = await async_q.get()
        assert val == i
        async_q.task_done()


async def main() -> None:
    queue: janus.Queue[int] = janus.Queue()
    fut = loop.run_in_executor(None, threaded, queue.sync_q)
    await async_coro(queue.async_q)
    await fut
    queue.close()
    await queue.wait_closed()

This could replace specifically the following lines here:

self.message_queue = multiprocessing.Queue()
self.process = multiprocessing.Process(target=work_loop, args=(self.worker_id, self.message_queue, finished_queue))

worker.message_queue.put(message)

message = await loop.run_in_executor(None, self.finished_queue.get)

I would like some research to determine if this works as a drop-in replacement now. If so, this might be a better long-term pattern.

Tip from @webknjaz

@AlanCoding
Copy link
Member Author

AlanCoding commented Oct 21, 2024

https://github.com/ansible/dispatcher/compare/main...AlanCoding:janus?expand=1

This works in that it will start up, run schedules, and process messages. But it won't shut down anymore.

INFO:dispatcher.producers.brokered:Received message from channel 'test_channel': lambda: 13, sending to worker
WARNING:dispatcher.pool:Ran out of available workers, queueing up next task, current queued 19
DEBUG:dispatcher.brokers.pg_notify:Received notification: test_channel - lambda: 14
INFO:dispatcher.producers.brokered:Received message from channel 'test_channel': lambda: 14, sending to worker
WARNING:dispatcher.pool:Ran out of available workers, queueing up next task, current queued 20
INFO:dispatcher.producers.scheduled:Sending scheduled task to worker: lambda: __import__("time").sleep(1)
WARNING:dispatcher.pool:Ran out of available workers, queueing up next task, current queued 21
INFO:dispatcher.producers.scheduled:Sending scheduled task to worker: lambda: __import__("time").sleep(2)
WARNING:dispatcher.pool:Ran out of available workers, queueing up next task, current queued 22
^CINFO:root:Received exit signal SIGTERM...
DEBUG:root:Shutting down, starting with producers.
INFO:dispatcher.worker.task:Received worker process exit signal
INFO:dispatcher.worker.task:Received worker process exit signal
INFO:dispatcher.worker.task:Received worker process exit signal
INFO:dispatcher.producers.brokered:Successfully canceled production from pg_notify
INFO:dispatcher.producers.scheduled:Stopping scheduled tasks
DEBUG:dispatcher.main:Gracefully shutting down worker pool
^CINFO:dispatcher.worker.task:Received worker process exit signal
INFO:dispatcher.worker.task:Received worker process exit signal
INFO:dispatcher.worker.task:Received worker process exit signal
^CINFO:dispatcher.worker.task:Received worker process exit signal
INFO:dispatcher.worker.task:Received worker process exit signal
INFO:dispatcher.worker.task:Received worker process exit signal
^CINFO:dispatcher.worker.task:Received worker process exit signal
INFO:dispatcher.worker.task:Received worker process exit signal
INFO:dispatcher.worker.task:Received worker process exit signal
^CINFO:dispatcher.worker.task:Received worker process exit signal
INFO:dispatcher.worker.task:Received worker process exit signal
INFO:dispatcher.worker.task:Received worker process exit signal
^CINFO:dispatcher.worker.task:Received worker process exit signal
INFO:dispatcher.worker.task:Received worker process exit signal
INFO:dispatcher.worker.task:Received worker process exit signal

It appears that something isn't cleaned up quite right, and it hangs on the worker side somehow.

@AlanCoding AlanCoding added the refactoring code build-out and clean-up label Jan 16, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
refactoring code build-out and clean-up
Projects
None yet
Development

No branches or pull requests

1 participant