
File descriptor leak using multiprocessing.Queue #131794

Open
gduvalsc opened this issue Mar 27, 2025 · 11 comments
Labels
stdlib (Python modules in the Lib dir) · topic-multiprocessing · type-bug (An unexpected behavior, bug, or error)

Comments

@gduvalsc

gduvalsc commented Mar 27, 2025

Bug report

Bug description:

"Too many open files" can occur with multiprocessing.Queue, even if the protocol used to work with the queue is correct.

The file descriptor leak can be demonstrated with two small programs:

File "ko.py"

import os
import multiprocessing

pid = os.getpid()
cmd = f'lsof -p {pid} | grep -i pipe | wc -l'
cmdfull = f'lsof -p {pid} | grep -i pipe'
queues = {}

for i in range(100):
    queues[i] = multiprocessing.Queue()
    os.system(cmd)
    queues[i].close()
    queues[i].join_thread()
    os.system(cmd)

os.system(cmdfull)

File "ok.py"

import os
import multiprocessing

pid = os.getpid()
cmd = f'lsof -p {pid} | grep -i pipe | wc -l'
cmdfull = f'lsof -p {pid} | grep -i pipe'
queues = {}

for i in range(100):
    queues[i] = multiprocessing.Queue()
    queues[i].put('Hello')
    os.system(cmd)
    queues[i].close()
    queues[i].join_thread()
    os.system(cmd)

os.system(cmdfull)

A queue that remains empty is not abnormal (e.g., an error queue when no errors occur).

The workaround is to create a class extending multiprocessing.Queue, adding a close method that inserts an extra message into the queue before performing the actual close.
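
A minimal sketch of that workaround, assuming CPython's layout where multiprocessing.Queue() is a factory for multiprocessing.queues.Queue (the class and helper names here are illustrative, not from the original code):

import multiprocessing
import multiprocessing.queues

class ClosableQueue(multiprocessing.queues.Queue):
    def close(self):
        # Put a dummy item first so the feeder thread is started and the
        # subsequent close()/join_thread() actually tear down the pipe.
        self.put(None)
        super().close()

def make_queue():
    # The Queue class requires an explicit context when instantiated directly.
    return ClosableQueue(ctx=multiprocessing.get_context())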

The issue can be observed on Linux and macOS and does not appear to be specific to a particular operating system.

CPython versions tested on:

3.12

Operating systems tested on:

Linux

gduvalsc added the type-bug label on Mar 27, 2025
@tom-pytel
Contributor

The workaround is to create a class extending multiprocessing.Queue, adding a close method that inserts an extra message into the queue before performing the actual close.

Or you can just del queues[i] after use.

But seriously, this appears to be expected / desired behavior. So much so that there is even a test for it:

def test_closed_queue_empty_exceptions(self):
    # Assert that checking the emptiness of an unused closed queue
    # does not raise an OSError. The rationale is that q.close() is
    # a no-op upon construction and becomes effective once the queue
    # has been used (e.g., by calling q.put()).
    for q in multiprocessing.Queue(), multiprocessing.JoinableQueue():
        q.close()  # this is a no-op since the feeder thread is None
        q.join_thread()  # this is also a no-op
        self.assertTrue(q.empty())

    for q in multiprocessing.Queue(), multiprocessing.JoinableQueue():
        q.put('foo')  # make sure that the queue is 'used'
        q.close()  # close the feeder thread
        q.join_thread()  # make sure to join the feeder thread
        with self.assertRaisesRegex(OSError, 'is closed'):
            q.empty()

@HarryVasanth

Thanks for pointing out the test case, @tom-pytel. You're right that the current behavior is documented in the test suite, which shows that close() is indeed a no-op for unused queues.

However, I think the issue @gduvalsc is highlighting remains valid. There's a file descriptor leak occurring with empty queues that can lead to "Too many open files" errors in long-running applications. The test case actually confirms this behavior difference between used and unused queues.

While del queues[i] might help in some cases by triggering garbage collection, it doesn't guarantee immediate cleanup of file descriptors, especially in complex applications where references might be held elsewhere.

The workaround of adding a dummy message before closing seems like a practical solution, but perhaps this behavior should be reconsidered. Maybe close() should ensure file descriptors are properly cleaned up regardless of whether the queue was used or not?

This seems like a legitimate issue that could affect production applications, especially those that create many queues that might remain empty (like error queues in normal operation).

@tom-pytel
Contributor

However, I think the issue @gduvalsc is highlighting remains valid. There's a file descriptor leak occurring with empty queues that can lead to "Too many open files" errors in long-running applications.

No leak; the pipe used to communicate with the feeder thread is simply not being closed (on purpose). It's a one-line change to close it for an unused queue, but then you would break the tests and, in theory, any real-world applications that currently depend on this behavior, no? If you want to play with it, the change is:

self._jointhread = None

to:

self._jointhread = lambda: (self._reader.close(), self._writer.close())

@gduvalsc
Author

Thank you for your feedback on my issue! In my code, which is much more complex than the examples I have shown, I can't find a workaround that solves the problem.

In the function that creates and uses the queues, I store the references of the objects created by "multiprocessing.Queue" in a dictionary called "queues", and I have added a "del queues" at the end of the function, but the problem still persists...

In my function, there are subprocesses created using "multiprocessing.Process", and these subprocesses, of course, interact with the queues to read from and write to them.

The process operates on a tree with thousands of nodes and fails with "Too many open files" when the number of descriptors reaches 65,536.

@gduvalsc
Author

For your information, in my code I have implemented the following function:

def create_queue_workaround():
    q = multiprocessing.Queue()
    q._jointhread = lambda: (q._reader.close(), q._writer.close())
    return q

On the ko.py test case, the behavior becomes correct.

But ... in my real program, I still have the leak ...

@tom-pytel
Contributor

tom-pytel commented Mar 28, 2025

But ... in my real program, I still have the leak ...

Well, that should be closing unused queues, so the file descriptors could be coming from some other source (assuming you don't have 32,768 open Queues sitting around). Would multiprocessing.SimpleQueue work for you? Also double-check that the pipes are actually closed after use by checking _reader.closed and _writer.closed. You can also try closing them manually after use to confirm whether they are the cause.

You can also throw in a gc.collect() to make sure it's not Queues waiting to be cleaned up that weren't closed for some reason.
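
For what it's worth, a quick way to check both suggestions (this pokes at private attributes, so it is only a debugging aid; the expected output is based on the behavior described in this thread for a used queue):

import gc
import multiprocessing

q = multiprocessing.Queue()
q.put('hello')       # the queue is now "used", so close() becomes effective
q.close()
q.join_thread()
print(q._reader.closed, q._writer.closed)   # expected: True True

gc.collect()         # collect any unreferenced queues still holding descriptors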

@gduvalsc
Author

Could there be a connection with the subprocesses working on the queues?

My program is quite particular: the main function creates the queues and closes them, and within this main function, other functions are defined that are run by subprocesses. The queues are visible to those functions because they are closures defined inside the main function.

Example:

def main_function():
    ...
    queue = multiprocessing.Queue()
    ...
    def function_subprocess():
        ...
        queue.put(...)
        ...
    queue.close()
    queue.join_thread()

The queue's descriptors are used within the subprocess running function_subprocess. When the subprocess is joined by the main process, the subprocess disappears, but what happens to the queue?

@gduvalsc
Author

I'm afraid SimpleQueue won't work in my case.

In fact my code is the following:

a) I have a class "Parallel" encapsulating multiprocessing functions:

import multiprocessing

class Parallel:
    
    def __init__(self, action, workers=multiprocessing.cpu_count()):
        self.limit = int(workers)
        self.workers = dict()
        self.action = action

    def push(self, arg):
        if len(self.workers.keys()) < self.limit:
            p = multiprocessing.Process(target=self.action, args=(arg,))
            p.start()
            self.workers[p.sentinel] = p
        else:
            if len(self.workers.keys()) == self.limit:
                x = multiprocessing.connection.wait(self.workers.keys())
                for e in x:
                    self.workers[e].join()
                    del self.workers[e]
                p = multiprocessing.Process(target=self.action, args=(arg,))
                p.start()
                self.workers[p.sentinel] = p

    def join(self):
        while len(self.workers):
            x = multiprocessing.connection.wait(self.workers.keys())
            for e in x:
                self.workers[e].join()
                del self.workers[e]

b) One typical use is the following:

pr = Parallel(read_from_queue, workers=len(collections))
for collection in collections: pr.push(collection)
pw = Parallel(do, workers=multiprocessing.cpu_count())
for e in thezip.namelist(): pw.push(e)
pw.join()
thezip.close()
for collection in collections: queues[collection].put('KAIROS_DONE')
pr.join()

So there are several readers / workers in parallel.

Queues are created before the sequence above and joined after it.

@tom-pytel
Contributor

The queue's descriptors are used within the subprocess running function_subprocess

The descriptors in the subprocess don't count against your main process, and the queue captured in the closure should be decref'd when the function exits. The only other thing I can mention is that I've had problems in the past when closing Queues that still had unread items from the child process.
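
A hedged sketch of that last point, draining leftover items before closing (drain_and_close is a made-up helper, and there is still a race if a child is writing concurrently):

import queue

def drain_and_close(q):
    # Empty the queue so close() does not have to deal with unread items,
    # then shut down the feeder thread cleanly.
    try:
        while True:
            q.get_nowait()
    except queue.Empty:
        pass
    q.close()
    q.join_thread()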

picnixz added the stdlib label on Mar 28, 2025
@gduvalsc
Author

In my case, real closing occurs only on the ERROR_QUEUE (usually empty) and never on the other queues ...

In the same spirit as your workaround to force closing, I have implemented the workaround like this:

def create_queue_workaround(qname):

    q = multiprocessing.Queue()
    def f():
        logger.warning(f'Closing queue: {qname}, reader: {q._reader.fileno()}, writer:{q._writer.fileno()}')
        return (q._reader.close(), q._writer.close())
    q._jointhread = f
    logger.warning(f'Opening queue: {qname}, reader: {q._reader.fileno()}, writer:{q._writer.fileno()}')
    return q

In the application log, I see "Closing queue" messages only for the error queue.

For the other queues (normal queues where subprocesses read or write), only the "Opening queue" message is visible in the application log.

@gduvalsc
Author

The behavior I observed could be a "user error" if the join_thread function was never called on "normal" queues.

But the sequence in the application is:

            for collection in collections: 
                queues[collection].close()
                queues[collection].join_thread()
            queues['ERROR_QUEUE'].close()
            queues['ERROR_QUEUE'].join_thread()

I feel like the subprocesses reading or writing the queues play a role in this behavior.
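
A minimal sketch of that scenario (names are illustrative): the parent creates the queue but only a child process ever calls put(), so in the parent close()/join_thread() never release the pipe descriptors; they stay open until the Queue object itself is garbage collected.

import multiprocessing
import os

def child(q):
    q.put('from child')    # the feeder thread starts in the child only

if __name__ == '__main__':
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=child, args=(q,))
    p.start()
    print(q.get())         # read the item written by the child
    p.join()

    q.close()              # does not close the pipe: the parent never called put()
    q.join_thread()
    # The queue's pipe is still listed for the parent process:
    os.system(f'lsof -p {os.getpid()} | grep -i pipe')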
