Skip to content

Commit

Permalink
Merge pull request #215 from pycompression/release_1.7.1
Browse files Browse the repository at this point in the history
Release 1.7.1
  • Loading branch information
rhpvorderman authored Sep 25, 2024
2 parents 2f2b749 + 63f3bcd commit 440949b
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 32 deletions.
1 change: 0 additions & 1 deletion .github/release_checklist.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ Release checklist
from CHANGELOG.rst.
- [ ] Push tag to remote. This triggers the wheel/sdist build on github CI.
- [ ] merge `main` branch back into `develop`.
- [ ] Add updated version number to develop. (`setup.py` and `src/isal/__init__.py`)
- [ ] Build the new tag on readthedocs. Only build the last patch version of
each minor version. So `1.1.1` and `1.2.0` but not `1.1.0`, `1.1.1` and `1.2.0`.
- [ ] Create a new release on github.
Expand Down
7 changes: 7 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ Changelog
.. This document is user facing. Please word the changes in such a way
.. that users understand how the changes affect the new version.
version 1.7.1
-----------------
+ Fix a bug where flushing files when writing in threaded mode did not work
properly.
+ Prevent threaded opening from blocking python exit when an error is thrown
in the calling thread.

version 1.7.0
-----------------
+ Include a patched ISA-L version 2.31. The applied patches make compilation
Expand Down
3 changes: 1 addition & 2 deletions requirements-docs.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
sphinx
sphinx-rtd-theme
# See https://github.com/sphinx-doc/sphinx-argparse/issues/56
sphinx-argparse <0.5.0
sphinx-argparse
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ def build_isa_l():
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: Implementation :: PyPy",
"Programming Language :: C",
Expand Down
61 changes: 32 additions & 29 deletions src/isal/igzip_threaded.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def open(filename, mode="rb", compresslevel=igzip._COMPRESS_LEVEL_TRADEOFF,
gzip_file = io.BufferedReader(
_ThreadedGzipReader(filename, block_size=block_size))
else:
gzip_file = io.BufferedWriter(
gzip_file = FlushableBufferedWriter(
_ThreadedGzipWriter(
filename,
mode.replace("t", "b"),
Expand Down Expand Up @@ -101,6 +101,7 @@ def __init__(self, filename, queue_size=2, block_size=1024 * 1024):
self.worker = threading.Thread(target=self._decompress)
self._closed = False
self.running = True
self._calling_thread = threading.current_thread()
self.worker.start()

def _check_closed(self, msg=None):
Expand All @@ -110,15 +111,15 @@ def _check_closed(self, msg=None):
def _decompress(self):
block_size = self.block_size
block_queue = self.queue
while self.running:
while self.running and self._calling_thread.is_alive():
try:
data = self.fileobj.read(block_size)
except Exception as e:
self.exception = e
return
if not data:
return
while self.running:
while self.running and self._calling_thread.is_alive():
try:
block_queue.put(data, timeout=0.05)
break
Expand Down Expand Up @@ -166,6 +167,12 @@ def closed(self) -> bool:
return self._closed


class FlushableBufferedWriter(io.BufferedWriter):
def flush(self):
super().flush()
self.raw.flush()


class _ThreadedGzipWriter(io.RawIOBase):
"""
Write a gzip file using multiple threads.
Expand Down Expand Up @@ -215,6 +222,7 @@ def __init__(self,
if "b" not in mode:
mode += "b"
self.lock = threading.Lock()
self._calling_thread = threading.current_thread()
self.exception: Optional[Exception] = None
self.level = level
self.previous_block = b""
Expand Down Expand Up @@ -308,30 +316,35 @@ def write(self, b) -> int:
self.input_queues[worker_index].put((data, zdict))
return len(data)

def flush(self):
def _end_gzip_stream(self):
self._check_closed()
# Wait for all data to be compressed
for in_q in self.input_queues:
in_q.join()
# Wait for all data to be written
for out_q in self.output_queues:
out_q.join()
# Write an empty deflate block with a lost block marker.
self.raw.write(isal_zlib.compress(b"", wbits=-15))
trailer = struct.pack("<II", self._crc, self._size & 0xFFFFFFFF)
self.raw.write(trailer)
self._crc = 0
self._size = 0
self.raw.flush()

def flush(self):
self._end_gzip_stream()
self._write_gzip_header()

def close(self) -> None:
if self._closed:
return
self.flush()
self._end_gzip_stream()
self.stop()
if self.exception:
self.raw.close()
self._closed = True
raise self.exception
# Write an empty deflate block with a lost block marker.
self.raw.write(isal_zlib.compress(b"", wbits=-15))
trailer = struct.pack("<II", self._crc, self._size & 0xFFFFFFFF)
self.raw.write(trailer)
self.raw.flush()
if self.closefd:
self.raw.close()
self._closed = True
Expand All @@ -348,7 +361,7 @@ def _compress(self, index: int):
try:
data, zdict = in_queue.get(timeout=0.05)
except queue.Empty:
if not self.running:
if not (self.running and self._calling_thread.is_alive()):
return
continue
try:
Expand All @@ -364,41 +377,31 @@ def _compress(self, index: int):
def _write(self):
index = 0
output_queues = self.output_queues
fp = self.raw
total_crc = 0
size = 0
while True:
out_index = index % self.threads
output_queue = output_queues[out_index]
try:
compressed, crc, data_length = output_queue.get(timeout=0.05)
except queue.Empty:
if not self.running:
self._crc = total_crc
self._size = size
if not (self.running and self._calling_thread.is_alive()):
return
continue
total_crc = isal_zlib.crc32_combine(total_crc, crc, data_length)
size += data_length
fp.write(compressed)
self._crc = isal_zlib.crc32_combine(self._crc, crc, data_length)
self._size += data_length
self.raw.write(compressed)
output_queue.task_done()
index += 1

def _compress_and_write(self):
if not self.threads == 1:
raise SystemError("Compress_and_write is for one thread only")
fp = self.raw
total_crc = 0
size = 0
in_queue = self.input_queues[0]
compressor = self.compressors[0]
while True:
try:
data, zdict = in_queue.get(timeout=0.05)
except queue.Empty:
if not self.running:
self._crc = total_crc
self._size = size
if not (self.running and self._calling_thread.is_alive()):
return
continue
try:
Expand All @@ -408,9 +411,9 @@ def _compress_and_write(self):
self._set_error_and_empty_queue(e, in_queue)
return
data_length = len(data)
total_crc = isal_zlib.crc32_combine(total_crc, crc, data_length)
size += data_length
fp.write(compressed)
self._crc = isal_zlib.crc32_combine(self._crc, crc, data_length)
self._size += data_length
self.raw.write(compressed)
in_queue.task_done()

def _set_error_and_empty_queue(self, error, q):
Expand Down
37 changes: 37 additions & 0 deletions tests/test_igzip_threaded.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import io
import itertools
import os
import subprocess
import sys
import tempfile
from pathlib import Path

Expand Down Expand Up @@ -218,3 +220,38 @@ def test_threaded_writer_does_not_close_stream():
assert not test_stream.closed
test_stream.seek(0)
assert gzip.decompress(test_stream.read()) == b"thisisatest"


@pytest.mark.timeout(5)
@pytest.mark.parametrize(
["mode", "threads"], itertools.product(["rb", "wb"], [1, 2]))
def test_threaded_program_can_exit_on_error(tmp_path, mode, threads):
program = tmp_path / "no_context_manager.py"
test_file = tmp_path / "output.gz"
# Write 40 mb input data to saturate read buffer. Because of the repetitive
# nature the resulting gzip file is very small (~40 KiB).
test_file.write_bytes(gzip.compress(b"test" * (10 * 1024 * 1024)))
with open(program, "wt") as f:
f.write("from isal import igzip_threaded\n")
f.write(
f"f = igzip_threaded.open('{test_file}', "
f"mode='{mode}', threads={threads})\n"
)
f.write("raise Exception('Error')\n")
subprocess.run([sys.executable, str(program)])


@pytest.mark.parametrize("threads", [1, 2])
def test_flush(tmp_path, threads):
test_file = tmp_path / "output.gz"
with igzip_threaded.open(test_file, "wb", threads=threads) as f:
f.write(b"1")
f.flush()
assert gzip.decompress(test_file.read_bytes()) == b"1"
f.write(b"2")
f.flush()
assert gzip.decompress(test_file.read_bytes()) == b"12"
f.write(b"3")
f.flush()
assert gzip.decompress(test_file.read_bytes()) == b"123"
assert gzip.decompress(test_file.read_bytes()) == b"123"

0 comments on commit 440949b

Please sign in to comment.