ZeroMQ with scheduler doesn't work since 0.11.8 #401

Open
vunhatchuong opened this issue Jan 16, 2025 · 2 comments

@vunhatchuong

Version 0.11.7 is the last working release for this code.

from taskiq import TaskiqScheduler, ZeroMQBroker
from taskiq.schedule_sources import LabelScheduleSource

broker = ZeroMQBroker()
scheduler = TaskiqScheduler(
    broker=broker,
    sources=[LabelScheduleSource(broker)],
)

@broker.task(schedule=[{"cron": "* * * * *"}])
async def hi() -> None:
    print("Hello")

Then run:

taskiq worker module:broker -w 1

taskiq scheduler module:scheduler

Expected output:

[2025-01-16 16:21:17,802][taskiq.worker][INFO   ][MainProcess] Pid of a main process: 91204
[2025-01-16 16:21:17,803][taskiq.worker][INFO   ][MainProcess] Starting 1 worker processes.
[2025-01-16 16:21:17,805][taskiq.process-manager][INFO   ][MainProcess] Started process worker-0 with pid 91205
[2025-01-16 16:21:17,809][taskiq.receiver.receiver][INFO   ][worker-0] Listening started.
[2025-01-16 16:22:00,041][taskiq.receiver.receiver][INFO   ][worker-0] Executing task module:hi with ID: a529cfa06fa246e0b3181d7bf468746f
Hello

Actual behavior:

The worker never executes the task.

@s3rius
Member

s3rius commented Jan 16, 2025

I guess it might be caused by this line. Try removing the with statement and leaving only this call:

		await self.socket.send_multipart(parts)

If it works, you can make a PR, but I'm not sure if it's a correct solution.

@s3rius
Member

s3rius commented Jan 16, 2025

Actually, I guess this will work better.

import math
from logging import getLogger
from typing import AsyncGenerator, Callable, Optional, TypeVar

from taskiq.abc.broker import AsyncBroker
from taskiq.abc.result_backend import AsyncResultBackend
from taskiq.message import BrokerMessage

try:
    import zmq
    from zmq.asyncio import Context, Socket
except ImportError:
    zmq = None  # type: ignore

_T = TypeVar("_T")

logger = getLogger(__name__)


class ZeroMQBroker(AsyncBroker):
    """
    ZeroMQ broker.

    This broker starts a socket ON A CLIENT SIDE,
    and all workers connect to this socket using sub_host.

    If you're using this socket you have to be sure,
    that your workers start after the client is ready.
    """

    def __init__(
        self,
        zmq_pub_host: str = "tcp://localhost:5555",
        zmq_sub_host: str = "tcp://0.0.0.0:5555",
        result_backend: "Optional[AsyncResultBackend[_T]]" = None,
        task_id_generator: Optional[Callable[[], str]] = None,
    ) -> None:
        if zmq is None:
            raise RuntimeError(
                "To use ZMQ broker please install pyzmq lib or taskiq[zmq].",
            )
        super().__init__(result_backend, task_id_generator)
        self.context = Context()
        self.pub_host = zmq_pub_host
        self.sub_host = zmq_sub_host
        self.socket: Socket

    async def startup(self) -> None:
        """
        Startup for zmq broker.

        This function creates the actual socket.
        If the current process is a worker, it creates
        a PULL socket; otherwise it creates a PUSH socket.
        """
        if self.is_worker_process:
            self.socket = self.context.socket(zmq.PULL)
        else:
            self.socket = self.context.socket(zmq.PUSH)

        await super().startup()

    async def kick(self, message: BrokerMessage) -> None:
        """
        Kicking message.

        This method is used to publish message
        via socket.

        :param message: message to publish.
        """
        part_len = 100
        parts = [
            message.message[
                idx * part_len : min(idx * part_len + part_len, len(message.message))
            ]
            for idx in range(math.ceil(len(message.message) / part_len))
        ]
        with self.socket.connect(self.pub_host) as sock:
            await sock.send_multipart(parts)

    async def listen(self) -> AsyncGenerator[bytes, None]:
        """
        Start accepting new messages.

        :yields: incoming messages.
        """
        with self.socket.bind(self.sub_host) as sock:
            while True:
                data = await sock.recv_multipart()
                yield b"".join(data)
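
The chunking in kick and the reassembly in listen can be checked in isolation. The following is a minimal sketch in plain Python with no ZMQ involved; the helper names split_parts and join_parts are hypothetical and are not part of taskiq. It assumes the same 100-byte part size as the code above.

```python
import math
from typing import List

PART_LEN = 100  # assumed to match `part_len` in `kick` above


def split_parts(payload: bytes, part_len: int = PART_LEN) -> List[bytes]:
    """Split a payload into consecutive chunks of at most `part_len` bytes."""
    return [
        payload[idx * part_len : idx * part_len + part_len]
        for idx in range(math.ceil(len(payload) / part_len))
    ]


def join_parts(parts: List[bytes]) -> bytes:
    """Reassemble the chunks, as `listen` does with b"".join(data)."""
    return b"".join(parts)


if __name__ == "__main__":
    payload = b"x" * 250
    parts = split_parts(payload)
    print([len(part) for part in parts])  # → [100, 100, 50]
    print(join_parts(parts) == payload)   # → True
```

Python slicing already clamps at the end of the bytes object, so the min(...) call in kick is not strictly required for correctness.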

In this version, the ZMQ broker architecture is reversed compared to the previous one: the socket is bound on the worker side, and all publishers send to it. That makes sense, but it means you can only run one worker at a time.

I could have allowed multiple workers to listen on the same port by setting the SO_REUSEPORT socket option, but it seems ZMQ doesn't support it.

zeromq/libzmq#1443
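
To illustrate the single-worker limitation, here is a minimal sketch that uses plain TCP sockets from Python's standard library rather than pyzmq (an assumption made so that it runs without extra dependencies). The helper name second_bind_fails is hypothetical. It only shows the underlying OS behaviour that a ZMQ bind on a TCP endpoint is also subject to: once one socket is listening on a port, a second plain bind to the same address and port is rejected.

```python
import errno
import socket


def second_bind_fails(host: str = "127.0.0.1") -> bool:
    """Return True if binding a second socket to an in-use port is rejected."""
    first = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    second = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # Port 0 lets the OS pick a free port; this stands in for the
        # worker's bind address (plain TCP here, not a ZMQ socket).
        first.bind((host, 0))
        first.listen()
        port = first.getsockname()[1]
        try:
            # A second "worker" tries to bind the same address and port.
            second.bind((host, port))
        except OSError as exc:
            return exc.errno == errno.EADDRINUSE
        return False
    finally:
        first.close()
        second.close()


if __name__ == "__main__":
    print(second_bind_fails())
```

This is why the reversed architecture allows only one bound worker socket per endpoint unless the transport offers a port-sharing option.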
