Skip to content

Commit

Permalink
Merge pull request #140 from pycompression/iobase
Browse files Browse the repository at this point in the history
Let PipedCompressionWriter/-Reader derive from IOBase
  • Loading branch information
rhpvorderman authored Jan 17, 2024
2 parents 9afe371 + 3fb78b2 commit ce44be6
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 40 deletions.
55 changes: 18 additions & 37 deletions src/xopen/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import subprocess
import tempfile
import time
from abc import ABC, abstractmethod
from subprocess import Popen, PIPE, DEVNULL
from typing import (
Optional,
Expand Down Expand Up @@ -169,30 +168,7 @@ def _can_read_concatenated_gz(program: str) -> bool:
os.remove(temp_path)


class Closing(ABC):
"""
Inherit from this class and implement a close() method to offer context
manager functionality.
"""

def __enter__(self):
return self

def __exit__(self, *exc_info):
self.close()

def __del__(self):
try:
self.close()
except Exception:
pass

@abstractmethod
def close(self):
"""Called when exiting the context manager"""


class PipedCompressionWriter(Closing):
class PipedCompressionWriter(io.IOBase):
"""
Write Compressed files by running an external process and piping into it.
"""
Expand Down Expand Up @@ -226,9 +202,6 @@ def __init__(
)
)

# TODO use a context manager
self.outfile = open(path, mode[0] + "b")
self.closed: bool = False
self.name: str = str(os.fspath(path))
self._mode: str = mode
self._program_args: List[str] = program_args
Expand All @@ -237,6 +210,7 @@ def __init__(
if threads is None:
threads = min(_available_cpu_count(), 4)
self._threads = threads
self.outfile = open(path, mode[0] + "b")
try:
self.process = self._open_process(
mode, compresslevel, threads, self.outfile
Expand Down Expand Up @@ -295,7 +269,10 @@ def write(self, arg: AnyStr) -> int:
def close(self) -> None:
if self.closed:
return
self.closed = True
super().close()
if not hasattr(self, "process"):
# Exception was raised during __init__
return
self._file.close()
retcode = self.process.wait()
self.outfile.close()
Expand Down Expand Up @@ -323,7 +300,7 @@ def __next__(self):
raise io.UnsupportedOperation("not readable")


class PipedCompressionReader(Closing):
class PipedCompressionReader(io.IOBase):
"""
Open a pipe to a process for reading a compressed file.
"""
Expand All @@ -347,7 +324,7 @@ def __init__(
newline=None,
):
"""
Raise an OSError when pigz could not be found.
Raise an OSError when the binary could not be found.
"""
if mode not in ("r", "rt", "rb"):
raise ValueError(
Expand All @@ -370,20 +347,19 @@ def __init__(
threads = 1
program_args += [f"{threads_flag}{threads}"]
self._threads = threads
self.process = Popen(program_args, stdout=PIPE, stderr=PIPE)
self.name = path
self._mode = mode
self.process = Popen(program_args, stdout=PIPE, stderr=PIPE)

assert self.process.stdout is not None
_set_pipe_size_to_max(self.process.stdout.fileno())

self._mode = mode
if "b" not in mode:
self._file: IO = io.TextIOWrapper(
self.process.stdout, encoding=encoding, errors=errors, newline=newline
)
else:
self._file = self.process.stdout
self.closed = False
self._wait_for_output_or_process_exit()
self._raise_if_error()

Expand All @@ -399,7 +375,10 @@ def __repr__(self):
def close(self) -> None:
if self.closed:
return
self.closed = True
super().close()
if not hasattr(self, "process"):
# Exception was raised during __init__
return
retcode = self.process.poll()
check_allowed_code_and_message = False
if retcode is None:
Expand All @@ -413,7 +392,7 @@ def close(self) -> None:
def __iter__(self):
return self

def __next__(self) -> Union[str, bytes]:
def __next__(self) -> Union[str, bytes]: # type: ignore # incompatible with type in IOBase
return self._file.__next__()

def _wait_for_output_or_process_exit(self):
Expand Down Expand Up @@ -471,6 +450,8 @@ def _raise_if_error(

assert self.process.stderr is not None
if not stderr_message:
if self.process.stderr.closed:
return
stderr_message = self.process.stderr.read()

self._file.close()
Expand All @@ -482,7 +463,7 @@ def read(self, *args) -> Union[str, bytes]:
def readinto(self, *args):
return self._file.readinto(*args)

def readline(self, *args) -> Union[str, bytes]:
def readline(self, *args) -> Union[str, bytes]: # type: ignore # incompatible w/type in IOBase
return self._file.readline(*args)

def seekable(self) -> bool:
Expand Down
16 changes: 13 additions & 3 deletions tests/test_piped.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,12 @@ def test_reader_readline_text(reader):
assert f.readline() == CONTENT_LINES[0]


def test_reader_readlines(reader):
opener, extension = reader
with opener(TEST_DIR / f"file.txt{extension}", "r") as f:
assert f.readlines() == CONTENT_LINES


@pytest.mark.parametrize("threads", [None, 1, 2])
def test_piped_reader_iter(threads, threaded_reader):
opener, extension = threaded_reader
Expand All @@ -197,15 +203,17 @@ def test_writer(tmp_path, writer):

def test_writer_has_iter_method(tmp_path, writer):
opener, extension = writer
with opener(tmp_path / f"out.{extension}") as f:
with opener(tmp_path / f"out{extension}") as f:
f.write("hello")
assert hasattr(f, "__iter__")


def test_reader_iter_without_with(reader):
opener, extension = reader
it = iter(opener(TEST_DIR / f"file.txt{extension}"))
f = opener(TEST_DIR / f"file.txt{extension}")
it = iter(f)
assert CONTENT_LINES[0] == next(it)
f.close()


@pytest.mark.parametrize("mode", ["rb", "rt"])
Expand Down Expand Up @@ -289,14 +297,16 @@ def test_iter_method_writers(writer, tmp_path):
opener, extension = writer
writer = opener(tmp_path / f"test{extension}", "wb")
assert iter(writer) == writer
writer.close()


def test_next_method_writers(writer, tmp_path):
opener, extension = writer
writer = opener(tmp_path / f"test.{extension}", "wb")
writer = opener(tmp_path / f"test{extension}", "wb")
with pytest.raises(io.UnsupportedOperation) as error:
next(writer)
error.match("not readable")
writer.close()


def test_pipedcompressionreader_wrong_mode():
Expand Down

0 comments on commit ce44be6

Please sign in to comment.