Skip to content

Commit f4eb378

Browse files
authored
Merge pull request #59 from pycompression/fixflush
Fix flushing behaviour in threaded mode.
2 parents b0728bb + 31125b2 commit f4eb378

File tree

3 files changed

+43
-24
lines changed

3 files changed

+43
-24
lines changed

CHANGELOG.rst

+2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ Changelog
99
1010
version 0.5.1-dev
1111
-----------------
12+
+ Fix a bug where flushing in threaded mode did not write the data to the
13+
output file.
1214
+ Threaded reading and writing no longer block exiting when an exception
1315
occurs in the main thread.
1416

src/zlib_ng/gzip_ng_threaded.py

+25-24
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ def open(filename, mode="rb", compresslevel=gzip_ng._COMPRESS_LEVEL_TRADEOFF,
6060
gzip_file = io.BufferedReader(
6161
_ThreadedGzipReader(filename, block_size=block_size))
6262
else:
63-
gzip_file = io.BufferedWriter(
63+
gzip_file = FlushableBufferedWriter(
6464
_ThreadedGzipWriter(
6565
filename,
6666
mode.replace("t", "b"),
@@ -167,6 +167,12 @@ def closed(self) -> bool:
167167
return self._closed
168168

169169

170+
class FlushableBufferedWriter(io.BufferedWriter):
171+
def flush(self):
172+
super().flush()
173+
self.raw.flush()
174+
175+
170176
class _ThreadedGzipWriter(io.RawIOBase):
171177
"""
172178
Write a gzip file using multiple threads.
@@ -315,30 +321,35 @@ def write(self, b) -> int:
315321
self.input_queues[worker_index].put((data, zdict))
316322
return len(data)
317323

318-
def flush(self):
324+
def _end_gzip_stream(self):
319325
self._check_closed()
320326
# Wait for all data to be compressed
321327
for in_q in self.input_queues:
322328
in_q.join()
323329
# Wait for all data to be written
324330
for out_q in self.output_queues:
325331
out_q.join()
332+
# Write an empty deflate block with a last block marker.
333+
self.raw.write(zlib_ng.compress(b"", wbits=-15))
334+
trailer = struct.pack("<II", self._crc, self._size & 0xFFFFFFFF)
335+
self.raw.write(trailer)
336+
self._crc = 0
337+
self._size = 0
326338
self.raw.flush()
327339

340+
def flush(self):
341+
self._end_gzip_stream()
342+
self._write_gzip_header()
343+
328344
def close(self) -> None:
329345
if self._closed:
330346
return
331-
self.flush()
347+
self._end_gzip_stream()
332348
self.stop()
333349
if self.exception:
334350
self.raw.close()
335351
self._closed = True
336352
raise self.exception
337-
# Write an empty deflate block with a last block marker.
338-
self.raw.write(zlib_ng.compress(b"", wbits=-15))
339-
trailer = struct.pack("<II", self._crc, self._size & 0xFFFFFFFF)
340-
self.raw.write(trailer)
341-
self.raw.flush()
342353
if self.closefd:
343354
self.raw.close()
344355
self._closed = True
@@ -371,41 +382,31 @@ def _compress(self, index: int):
371382
def _write(self):
372383
index = 0
373384
output_queues = self.output_queues
374-
fp = self.raw
375-
total_crc = 0
376-
size = 0
377385
while self._calling_thread.is_alive():
378386
out_index = index % self.threads
379387
output_queue = output_queues[out_index]
380388
try:
381389
compressed, crc, data_length = output_queue.get(timeout=0.05)
382390
except queue.Empty:
383391
if not self.running:
384-
self._crc = total_crc
385-
self._size = size
386392
return
387393
continue
388-
total_crc = zlib_ng.crc32_combine(total_crc, crc, data_length)
389-
size += data_length
390-
fp.write(compressed)
394+
self._crc = zlib_ng.crc32_combine(self._crc, crc, data_length)
395+
self._size += data_length
396+
self.raw.write(compressed)
391397
output_queue.task_done()
392398
index += 1
393399

394400
def _compress_and_write(self):
395401
if not self.threads == 1:
396402
raise SystemError("Compress_and_write is for one thread only")
397-
fp = self.raw
398-
total_crc = 0
399-
size = 0
400403
in_queue = self.input_queues[0]
401404
compressor = self.compressors[0]
402405
while self._calling_thread.is_alive():
403406
try:
404407
data, zdict = in_queue.get(timeout=0.05)
405408
except queue.Empty:
406409
if not self.running:
407-
self._crc = total_crc
408-
self._size = size
409410
return
410411
continue
411412
try:
@@ -415,9 +416,9 @@ def _compress_and_write(self):
415416
self._set_error_and_empty_queue(e, in_queue)
416417
return
417418
data_length = len(data)
418-
total_crc = zlib_ng.crc32_combine(total_crc, crc, data_length)
419-
size += data_length
420-
fp.write(compressed)
419+
self._crc = zlib_ng.crc32_combine(self._crc, crc, data_length)
420+
self._size += data_length
421+
self.raw.write(compressed)
421422
in_queue.task_done()
422423

423424
def _set_error_and_empty_queue(self, error, q):

tests/test_gzip_ng_threaded.py

+16
Original file line numberDiff line numberDiff line change
@@ -230,3 +230,19 @@ def test_threaded_program_can_exit_on_error(tmp_path, mode, threads):
230230
)
231231
f.write("raise Exception('Error')\n")
232232
subprocess.run([sys.executable, str(program)])
233+
234+
235+
@pytest.mark.parametrize("threads", [1, 2])
236+
def test_flush(tmp_path, threads):
237+
test_file = tmp_path / "output.gz"
238+
with gzip_ng_threaded.open(test_file, "wb", threads=threads) as f:
239+
f.write(b"1")
240+
f.flush()
241+
assert gzip.decompress(test_file.read_bytes()) == b"1"
242+
f.write(b"2")
243+
f.flush()
244+
assert gzip.decompress(test_file.read_bytes()) == b"12"
245+
f.write(b"3")
246+
f.flush()
247+
assert gzip.decompress(test_file.read_bytes()) == b"123"
248+
assert gzip.decompress(test_file.read_bytes()) == b"123"

0 commit comments

Comments
 (0)