drain_events and publish channel using single connection in different threads #32

Open
khomyakov42 opened this issue Aug 9, 2016 · 4 comments

khomyakov42 commented Aug 9, 2016

When I call drain_events in a first thread and try to publish a message on a channel obtained from the same connection in a second thread, the second thread never gets the channel. Is this a deadlock, or should I be using a separate connection for publishing and for drain_events?

veegee (Owner) commented Aug 9, 2016

That sounds like a deadlock. It's best to use a different connection to publish, but I don't think there's anything wrong with publishing on the same connection in principle. If it's deadlocking, it's a bug. I'll look into it; thanks for the report.
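
For reference, a minimal sketch of the separate-connection workaround, using the same amqpy calls that appear later in this thread ('my-queue' and 'my-exchange' are placeholders and are assumed to be declared already; error handling omitted):

import amqpy

# One connection dedicated to consuming; drain_events() blocks only here.
consume_conn = amqpy.Connection()
consume_ch = consume_conn.channel()
consume_ch.basic_consume('my-queue', callback=lambda msg: msg.ack())

# A second connection used only for publishing, so the publisher never
# contends with the drain loop for the socket.
publish_conn = amqpy.Connection()
publish_ch = publish_conn.channel()
publish_ch.basic_publish(amqpy.Message('hello'), 'my-exchange')

consume_conn.drain_events(timeout=2)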

khomyakov42 (Author) commented Aug 10, 2016

Example

import amqpy
import traceback
import os
import threading
from multiprocessing.pool import ThreadPool

NUMBER_MESSAGE = 10000
NUMBER_EXCHANGE = 10

EXCHANGE_NAMES = ['exchange-%s' % i for i in range(NUMBER_EXCHANGE)]



class Consumer(threading.Thread):
    def __init__(self, conn, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.messages = []
        self.conn = conn

    def on_message(self, msg):
        self.messages.append(msg.body)
        print('[%d]Receive %s from %s' % (len(self.messages), msg.body, msg.delivery_info['exchange']))
        msg.ack()

    def declare(self):
        for exchange_name in EXCHANGE_NAMES:
            queue_name = 'queue-to-%s' % exchange_name
            self.conn.channel().queue_declare(queue_name)
            self.conn.channel().queue_bind(queue_name, exchange_name)
            self.conn.channel().basic_consume(queue_name, callback=self.on_message)

    def run(self):
        # Drain events until every published message has been received.
        while len(self.messages) != NUMBER_MESSAGE:
            try:
                self.conn.drain_events(timeout=2)
            except amqpy.Timeout:
                pass
            except Exception as e:
                traceback.print_exc()
                os._exit(666)


class Producer(threading.Thread):
    def __init__(self, conn, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.messages = []
        self.conn = conn

    def run(self):
        messages = []
        conn = self.conn
        try:
            with ThreadPool(processes=len(EXCHANGE_NAMES)) as pool:
                free_exchange_names = set(EXCHANGE_NAMES)
                thread_exchange_name = {}

                def sender(i):
                    # Pin each pool thread to its own exchange.
                    ident = threading.get_ident()
                    if ident not in thread_exchange_name:
                        thread_exchange_name[ident] = free_exchange_names.pop()
                    exchange_name = thread_exchange_name[ident]

                    channel = conn.channel()
                    body = str(i)
                    message = amqpy.Message(body, delivery_mode=2)
                    channel.basic_publish(message, exchange_name)
                    messages.append(body)
                    print('[%d]Send %s to %s' % (len(messages), body, exchange_name))
                pool.map(sender, range(NUMBER_MESSAGE))
            self.messages = messages
        except Exception as e:
            traceback.print_exc()
            os._exit(666)


def declare_exchanges():
    c = amqpy.Connection()
    for exchange_name in EXCHANGE_NAMES:
        c.channel().exchange_declare(exchange_name, 'fanout')

CASE = 3


if CASE == 1:  # deadlock or timeout
    declare_exchanges()

    conn = amqpy.Connection()
    c = Consumer(conn=conn)
    c.declare()
    c.start()

    p = Producer(conn=conn)
    p.start()

    p.join()
    c.join()

    assert len(c.messages) == len(p.messages)
    assert set(c.messages) == set(p.messages)
elif CASE == 2:  # timeout
    declare_exchanges()

    conn = amqpy.Connection()

    p = Producer(conn=conn)
    p.start()

    c = Consumer(conn=conn)
    c.declare()
    c.start()

    p.join()
    c.join()
elif CASE == 3:  # success
    declare_exchanges()

    c = Consumer(conn=amqpy.Connection())
    c.declare()
    c.start()

    p = Producer(conn=amqpy.Connection())
    p.start()

    p.join()
    c.join()

    assert len(c.messages) == len(p.messages)
    assert set(c.messages) == set(p.messages)

denis-ryzhkov commented

conn.drain_events() also makes it impossible to create new consumers later, on demand.

How to reproduce:

import gevent.monkey
gevent.monkey.patch_all()

import amqpy, gevent, logging, time

# Init:

conn = amqpy.Connection()
ch = conn.channel()
ch.exchange_declare('test.exchange', 'direct')
ch.queue_declare('test.q')
ch.queue_bind('test.q', exchange='test.exchange', routing_key='test.q')
ch.close()

def consume():
    ch = conn.channel()
    ch.basic_consume(queue='test.q', callback=on_msg)
    while True:
        time.sleep(10)

def on_msg(msg):
    print(msg.body)

# Start logging "amqpy":

logger = logging.getLogger('amqpy')
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())

# Start "drain_events" loop in a background thread:

def drain_events():
    while True:
        conn.drain_events(timeout=None)

gevent.spawn(drain_events)

# Main runtime:

time.sleep(1)  # Emulate that we need a new consumer on some event in runtime.
gevent.spawn(consume)

# Block main thread:

while True:
    time.sleep(10)

There is no basic.consume in the log:

Write:  channel: 1 method_t(class_id=20, method_id=10) channel.open
Read:   channel: 1 method_t(class_id=20, method_id=11) channel.open-ok
Channel open

But if we comment out gevent.spawn(drain_events),
then basic.consume is sent and we get basic.consume-ok as expected:

Write:  channel: 1 method_t(class_id=20, method_id=10) channel.open
Read:   channel: 1 method_t(class_id=20, method_id=11) channel.open-ok
Channel open
Write:  channel: 1 method_t(class_id=60, method_id=20) basic.consume
Read:   channel: 1 method_t(class_id=60, method_id=21) basic.consume-ok

So we need a separate connection for each consumer created on demand,
even though this is exactly what channels are for: complete isolation inside the same connection.

@veegee, please consider adding a per-channel drain_events() on Channel.

It could also fix the original issue with publishing and consuming using the same connection.
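
Until something like that exists, the workaround looks roughly like this: a dedicated connection per on-demand consumer (a sketch only; the helper name start_consumer_on_demand is hypothetical):

import amqpy, gevent

def start_consumer_on_demand(queue_name, callback):
    # Workaround: give each on-demand consumer its own connection, so its
    # basic.consume is not stuck behind another connection's drain_events() loop.
    conn = amqpy.Connection()
    ch = conn.channel()
    ch.basic_consume(queue=queue_name, callback=callback)

    def drain():
        while True:
            conn.drain_events(timeout=None)

    gevent.spawn(drain)
    return conn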

veegee (Owner) commented Sep 2, 2016

I was in the middle of reworking the underlying concurrency mechanism. Python is not a very friendly language for this kind of concurrency. Supporting gevent properly is tricky as well.

One option is to remove all locking except the very lowest-level frame locks and shift the burden of synchronization to the user, which means the user becomes responsible for locking the connection when doing concurrent writes from multiple threads.
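
From the caller's side, that first option would look roughly like this (conn_lock and publish are illustrative, not part of amqpy's API):

import threading
import amqpy

conn = amqpy.Connection()
conn_lock = threading.Lock()  # user-supplied lock guarding all writes on conn

def publish(ch, body, exchange_name):
    # Under this scheme the caller, not the library, serializes concurrent
    # writes to the shared connection.
    with conn_lock:
        ch.basic_publish(amqpy.Message(body), exchange_name)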

Another option is to do away with locks entirely and go for a Go-style "channel" mechanism. I think this will be a good option and I'll be working on it shortly.
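
A rough Python analogue of that second design, with a single I/O thread owning the connection and a queue.Queue standing in for the Go-style channel (all names here are illustrative):

import queue, threading
import amqpy

publish_q = queue.Queue()  # workers put publish requests; a single owner consumes them

def io_loop(conn):
    # Only this thread ever touches the connection, so no connection-level
    # locks are needed; other threads communicate by message passing instead.
    ch = conn.channel()
    while True:
        body, exchange_name = publish_q.get()
        ch.basic_publish(amqpy.Message(body), exchange_name)

threading.Thread(target=io_loop, args=(amqpy.Connection(),), daemon=True).start()

# Any thread can now "send on the channel":
publish_q.put(('hello', 'exchange-0'))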

I would really like to support the Python 3 async/await stuff, especially now that PyPy3 has gotten funding from Mozilla.
