Skip to content

Commit 6b942fb

Browse files
authored
Merge pull request #55 from y5c4l3/issue53-workaround
Start threads only after calling read and write on ThreadedGzip
2 parents 12b0703 + 1c3f210 commit 6b942fb

File tree

3 files changed

+53
-14
lines changed

3 files changed

+53
-14
lines changed

CHANGELOG.rst

+6
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,12 @@ Changelog
77
.. This document is user facing. Please word the changes in such a way
88
.. that users understand how the changes affect the new version.
99
10+
version 0.5.1
11+
-----------------
12+
+ Fix a bug where ``gzip_ng_threaded.open`` could
13+
cause a hang when the program exited and the program was not used with a
14+
context manager.
15+
1016
version 0.5.0
1117
-----------------
1218
+ Wheels are now build for MacOS arm64 architectures.

src/zlib_ng/gzip_ng_threaded.py

+25-14
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,7 @@ def __init__(self, filename, queue_size=2, block_size=1024 * 1024):
100100
self.block_size = block_size
101101
self.worker = threading.Thread(target=self._decompress)
102102
self._closed = False
103-
self.running = True
104-
self.worker.start()
103+
self.running = False
105104

106105
def _check_closed(self, msg=None):
107106
if self._closed:
@@ -125,8 +124,19 @@ def _decompress(self):
125124
except queue.Full:
126125
pass
127126

127+
def _start(self):
128+
if not self.running:
129+
self.running = True
130+
self.worker.start()
131+
132+
def _stop(self):
133+
if self.running:
134+
self.running = False
135+
self.worker.join()
136+
128137
def readinto(self, b):
129138
self._check_closed()
139+
self._start()
130140
result = self.buffer.readinto(b)
131141
if result == 0:
132142
while True:
@@ -154,8 +164,7 @@ def tell(self) -> int:
154164
def close(self) -> None:
155165
if self._closed:
156166
return
157-
self.running = False
158-
self.worker.join()
167+
self._stop()
159168
self.fileobj.close()
160169
if self.closefd:
161170
self.raw.close()
@@ -252,7 +261,6 @@ def __init__(self,
252261
self.raw, self.closefd = open_as_binary_stream(filename, mode)
253262
self._closed = False
254263
self._write_gzip_header()
255-
self.start()
256264

257265
def _check_closed(self, msg=None):
258266
if self._closed:
@@ -275,21 +283,24 @@ def _write_gzip_header(self):
275283
self.raw.write(struct.pack(
276284
"BBBBIBB", magic1, magic2, method, flags, mtime, os, xfl))
277285

278-
def start(self):
279-
self.running = True
280-
self.output_worker.start()
281-
for worker in self.compression_workers:
282-
worker.start()
286+
def _start(self):
287+
if not self.running:
288+
self.running = True
289+
self.output_worker.start()
290+
for worker in self.compression_workers:
291+
worker.start()
283292

284293
def stop(self):
285294
"""Stop, but do not care for remaining work"""
286-
self.running = False
287-
for worker in self.compression_workers:
288-
worker.join()
289-
self.output_worker.join()
295+
if self.running:
296+
self.running = False
297+
for worker in self.compression_workers:
298+
worker.join()
299+
self.output_worker.join()
290300

291301
def write(self, b) -> int:
292302
self._check_closed()
303+
self._start()
293304
with self.lock:
294305
if self.exception:
295306
raise self.exception

tests/test_gzip_ng_threaded.py

+22
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
import io
1010
import itertools
1111
import os
12+
import subprocess
13+
import sys
1214
import tempfile
1315
from pathlib import Path
1416

@@ -103,6 +105,7 @@ def test_threaded_write_error(threads):
103105
threads=threads, block_size=8 * 1024)
104106
# Bypass the write method which should not allow blocks larger than
105107
# block_size.
108+
f._start()
106109
f.input_queues[0].put((os.urandom(1024 * 64), b""))
107110
with pytest.raises(OverflowError) as error:
108111
f.close()
@@ -209,3 +212,22 @@ def test_threaded_writer_does_not_close_stream():
209212
assert not test_stream.closed
210213
test_stream.seek(0)
211214
assert gzip.decompress(test_stream.read()) == b"thisisatest"
215+
216+
217+
@pytest.mark.timeout(5)
218+
@pytest.mark.parametrize(
219+
["mode", "threads"], itertools.product(["rb", "wb"], [1, 2]))
220+
def test_threaded_program_can_exit_on_error(tmp_path, mode, threads):
221+
program = tmp_path / "no_context_manager.py"
222+
test_file = tmp_path / "output.gz"
223+
# Write 40 mb input data to saturate read buffer. Because of the repetitive
224+
# nature the resulting gzip file is very small (~40 KiB).
225+
test_file.write_bytes(gzip.compress(b"test" * (10 * 1024 * 1024)))
226+
with open(program, "wt") as f:
227+
f.write("from zlib_ng import gzip_ng_threaded\n")
228+
f.write(
229+
f"f = gzip_ng_threaded.open('{test_file}', "
230+
f"mode='{mode}', threads={threads})\n"
231+
)
232+
f.write("raise Exception('Error')\n")
233+
subprocess.run([sys.executable, str(program)])

0 commit comments

Comments
 (0)