Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 18 additions & 37 deletions src/xopen/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import subprocess
import tempfile
import time
from abc import ABC, abstractmethod
from subprocess import Popen, PIPE, DEVNULL
from typing import (
Optional,
Expand Down Expand Up @@ -169,30 +168,7 @@ def _can_read_concatenated_gz(program: str) -> bool:
os.remove(temp_path)


class Closing(ABC):
    """
    Mixin that gives any class with a close() method context manager
    support.

    Subclasses must implement close(). It is called when a ``with`` block is
    left and, on a best-effort basis, when the object is finalized.
    """

    @abstractmethod
    def close(self):
        """Called when exiting the context manager"""

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        # Nothing is returned, so an exception raised inside the ``with``
        # block is not suppressed.
        self.close()

    def __del__(self):
        # Best-effort cleanup: errors raised by close() are ignored here.
        try:
            self.close()
        except Exception:
            pass


class PipedCompressionWriter(Closing):
class PipedCompressionWriter(io.IOBase):
"""
Write Compressed files by running an external process and piping into it.
"""
Expand Down Expand Up @@ -226,9 +202,6 @@ def __init__(
)
)

# TODO use a context manager
self.outfile = open(path, mode[0] + "b")
self.closed: bool = False
self.name: str = str(os.fspath(path))
self._mode: str = mode
self._program_args: List[str] = program_args
Expand All @@ -237,6 +210,7 @@ def __init__(
if threads is None:
threads = min(_available_cpu_count(), 4)
self._threads = threads
self.outfile = open(path, mode[0] + "b")
try:
self.process = self._open_process(
mode, compresslevel, threads, self.outfile
Expand Down Expand Up @@ -295,7 +269,10 @@ def write(self, arg: AnyStr) -> int:
def close(self) -> None:
if self.closed:
return
self.closed = True
super().close()
if not hasattr(self, "process"):
# Exception was raised during __init__
return
self._file.close()
retcode = self.process.wait()
self.outfile.close()
Expand Down Expand Up @@ -323,7 +300,7 @@ def __next__(self):
raise io.UnsupportedOperation("not readable")


class PipedCompressionReader(Closing):
class PipedCompressionReader(io.IOBase):
"""
Open a pipe to a process for reading a compressed file.
"""
Expand All @@ -347,7 +324,7 @@ def __init__(
newline=None,
):
"""
Raise an OSError when pigz could not be found.
Raise an OSError when the binary could not be found.
"""
if mode not in ("r", "rt", "rb"):
raise ValueError(
Expand All @@ -370,20 +347,19 @@ def __init__(
threads = 1
program_args += [f"{threads_flag}{threads}"]
self._threads = threads
self.process = Popen(program_args, stdout=PIPE, stderr=PIPE)
self.name = path
self._mode = mode
self.process = Popen(program_args, stdout=PIPE, stderr=PIPE)

assert self.process.stdout is not None
_set_pipe_size_to_max(self.process.stdout.fileno())

self._mode = mode
if "b" not in mode:
self._file: IO = io.TextIOWrapper(
self.process.stdout, encoding=encoding, errors=errors, newline=newline
)
else:
self._file = self.process.stdout
self.closed = False
self._wait_for_output_or_process_exit()
self._raise_if_error()

Expand All @@ -399,7 +375,10 @@ def __repr__(self):
def close(self) -> None:
if self.closed:
return
self.closed = True
super().close()
if not hasattr(self, "process"):
# Exception was raised during __init__
return
retcode = self.process.poll()
check_allowed_code_and_message = False
if retcode is None:
Expand All @@ -413,7 +392,7 @@ def close(self) -> None:
def __iter__(self):
return self

def __next__(self) -> Union[str, bytes]:
def __next__(self) -> Union[str, bytes]: # type: ignore # incompatible with type in IOBase
Copy link
Collaborator

@rhpvorderman rhpvorderman Jan 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is something I ran into during my refactoring in #138. The externally opened classes are slightly different in that they both support `t` and `b` modes. It would be much more in line with the rest of Python if they were binary-only and a `TextIOWrapper` were used around the `PipedCompressionReader`/`PipedCompressionWriter` objects, rather than embedding a `TextIOWrapper` inside the object as is done now. This prevented me from simplifying the gzip opening further.

But that is something for later. I will make an issue about it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TextIOWrapper derives from TextIOBase, which derives from IOBase, so the problem that the return type changes in a derived class is still there. typeshed "solved" the issue the same way:
https://github.com/python/typeshed/blob/770724013de34af6f75fa444cdbb76d187b41875/stdlib/io.pyi#L145
I read a comment somewhere on the typeshed issue tracker stating that Python’s IO classes are just weird, and I agree.

It’s true we won’t have to deal with this when we make the Piped... classes work with binary data only. I agree and like the idea.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good to know that the issue is not something that is necessarily wrong on xopen's end. Thank you for the explanation.

return self._file.__next__()

def _wait_for_output_or_process_exit(self):
Expand Down Expand Up @@ -471,6 +450,8 @@ def _raise_if_error(

assert self.process.stderr is not None
if not stderr_message:
if self.process.stderr.closed:
return
stderr_message = self.process.stderr.read()

self._file.close()
Expand All @@ -482,7 +463,7 @@ def read(self, *args) -> Union[str, bytes]:
def readinto(self, *args):
return self._file.readinto(*args)

def readline(self, *args) -> Union[str, bytes]:
def readline(self, *args) -> Union[str, bytes]: # type: ignore # incompatible w/type in IOBase
return self._file.readline(*args)

def seekable(self) -> bool:
Expand Down
16 changes: 13 additions & 3 deletions tests/test_piped.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,12 @@ def test_reader_readline_text(reader):
assert f.readline() == CONTENT_LINES[0]


def test_reader_readlines(reader):
    """readlines() on a reader opened in text mode returns every content line."""
    open_func, suffix = reader
    path = TEST_DIR / f"file.txt{suffix}"
    with open_func(path, "r") as handle:
        lines = handle.readlines()
    assert lines == CONTENT_LINES


@pytest.mark.parametrize("threads", [None, 1, 2])
def test_piped_reader_iter(threads, threaded_reader):
opener, extension = threaded_reader
Expand All @@ -197,15 +203,17 @@ def test_writer(tmp_path, writer):

def test_writer_has_iter_method(tmp_path, writer):
opener, extension = writer
with opener(tmp_path / f"out.{extension}") as f:
with opener(tmp_path / f"out{extension}") as f:
f.write("hello")
assert hasattr(f, "__iter__")


def test_reader_iter_without_with(reader):
opener, extension = reader
it = iter(opener(TEST_DIR / f"file.txt{extension}"))
f = opener(TEST_DIR / f"file.txt{extension}")
it = iter(f)
assert CONTENT_LINES[0] == next(it)
f.close()


@pytest.mark.parametrize("mode", ["rb", "rt"])
Expand Down Expand Up @@ -289,14 +297,16 @@ def test_iter_method_writers(writer, tmp_path):
opener, extension = writer
writer = opener(tmp_path / f"test{extension}", "wb")
assert iter(writer) == writer
writer.close()


def test_next_method_writers(writer, tmp_path):
opener, extension = writer
writer = opener(tmp_path / f"test.{extension}", "wb")
writer = opener(tmp_path / f"test{extension}", "wb")
with pytest.raises(io.UnsupportedOperation) as error:
next(writer)
error.match("not readable")
writer.close()


def test_pipedcompressionreader_wrong_mode():
Expand Down