Skip to content

Fixdeadlock #58

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ Changelog
.. This document is user facing. Please word the changes in such a way
.. that users understand how the changes affect the new version.

version 0.5.1
version 0.5.1-dev
-----------------
+ Fix a bug where ``gzip_ng_threaded.open`` could
cause a hang when the program exited and the program was not used with a
context manager.
+ Threaded reading and writing do no longer block exiting when an exception
occurs in the main thread.

version 0.5.0
-----------------
Expand Down
51 changes: 21 additions & 30 deletions src/zlib_ng/gzip_ng_threaded.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ def __init__(self, filename, queue_size=2, block_size=1024 * 1024):
self.block_size = block_size
self.worker = threading.Thread(target=self._decompress)
self._closed = False
self.running = False
self.running = True
self._calling_thread = threading.current_thread()
self.worker.start()

def _check_closed(self, msg=None):
if self._closed:
Expand All @@ -109,34 +111,23 @@ def _check_closed(self, msg=None):
def _decompress(self):
block_size = self.block_size
block_queue = self.queue
while self.running:
while self.running and self._calling_thread.is_alive():
try:
data = self.fileobj.read(block_size)
except Exception as e:
self.exception = e
return
if not data:
return
while self.running:
while self.running and self._calling_thread.is_alive():
try:
block_queue.put(data, timeout=0.05)
break
except queue.Full:
pass

def _start(self):
if not self.running:
self.running = True
self.worker.start()

def _stop(self):
if self.running:
self.running = False
self.worker.join()

def readinto(self, b):
self._check_closed()
self._start()
result = self.buffer.readinto(b)
if result == 0:
while True:
Expand Down Expand Up @@ -164,7 +155,8 @@ def tell(self) -> int:
def close(self) -> None:
if self._closed:
return
self._stop()
self.running = False
self.worker.join()
self.fileobj.close()
if self.closefd:
self.raw.close()
Expand Down Expand Up @@ -224,6 +216,7 @@ def __init__(self,
if "b" not in mode:
mode += "b"
self.lock = threading.Lock()
self._calling_thread = threading.current_thread()
self.exception: Optional[Exception] = None
self.level = level
self.previous_block = b""
Expand Down Expand Up @@ -261,6 +254,7 @@ def __init__(self,
self.raw, self.closefd = open_as_binary_stream(filename, mode)
self._closed = False
self._write_gzip_header()
self.start()

def _check_closed(self, msg=None):
if self._closed:
Expand All @@ -283,24 +277,21 @@ def _write_gzip_header(self):
self.raw.write(struct.pack(
"BBBBIBB", magic1, magic2, method, flags, mtime, os, xfl))

def _start(self):
if not self.running:
self.running = True
self.output_worker.start()
for worker in self.compression_workers:
worker.start()
def start(self):
self.running = True
self.output_worker.start()
for worker in self.compression_workers:
worker.start()

def stop(self):
"""Stop, but do not care for remaining work"""
if self.running:
self.running = False
for worker in self.compression_workers:
worker.join()
self.output_worker.join()
self.running = False
for worker in self.compression_workers:
worker.join()
self.output_worker.join()

def write(self, b) -> int:
self._check_closed()
self._start()
with self.lock:
if self.exception:
raise self.exception
Expand Down Expand Up @@ -360,7 +351,7 @@ def _compress(self, index: int):
in_queue = self.input_queues[index]
out_queue = self.output_queues[index]
compressor: zlib_ng._ParallelCompress = self.compressors[index]
while True:
while self._calling_thread.is_alive():
try:
data, zdict = in_queue.get(timeout=0.05)
except queue.Empty:
Expand All @@ -383,7 +374,7 @@ def _write(self):
fp = self.raw
total_crc = 0
size = 0
while True:
while self._calling_thread.is_alive():
out_index = index % self.threads
output_queue = output_queues[out_index]
try:
Expand All @@ -408,7 +399,7 @@ def _compress_and_write(self):
size = 0
in_queue = self.input_queues[0]
compressor = self.compressors[0]
while True:
while self._calling_thread.is_alive():
try:
data, zdict = in_queue.get(timeout=0.05)
except queue.Empty:
Expand Down
1 change: 0 additions & 1 deletion tests/test_gzip_ng_threaded.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ def test_threaded_write_error(threads):
threads=threads, block_size=8 * 1024)
# Bypass the write method which should not allow blocks larger than
# block_size.
f._start()
f.input_queues[0].put((os.urandom(1024 * 64), b""))
with pytest.raises(OverflowError) as error:
f.close()
Expand Down
Loading