diff --git a/src/xopen/__init__.py b/src/xopen/__init__.py index 62d47a0..d27d564 100644 --- a/src/xopen/__init__.py +++ b/src/xopen/__init__.py @@ -38,7 +38,6 @@ Optional, Union, TextIO, - AnyStr, IO, List, Set, @@ -193,9 +192,7 @@ def __init__( """ if mode not in ("w", "wb", "a", "ab"): raise ValueError( - "Mode is '{}', but it must be 'w', 'wb', 'a', or 'ab'".format( - mode - ) + "Mode is '{}', but it must be 'w', 'wb', 'a', or 'ab'".format(mode) ) self.name: str = str(os.fspath(path)) @@ -470,11 +467,10 @@ class PipedGzipReader(PipedCompressionReader): """ def __init__( - self, path, mode: str = "r", *, encoding="utf-8", errors=None, newline=None + self, + path, ): - super().__init__( - path, ["gzip"] - ) + super().__init__(path, ["gzip"]) class PipedGzipWriter(PipedCompressionWriter): @@ -618,17 +614,8 @@ class PipedXzReader(PipedCompressionReader): master branch.) """ - def __init__( - self, - path, - threads: Optional[int] = None - ): - super().__init__( - path, - ["xz"], - "-T", - threads - ) + def __init__(self, path, threads: Optional[int] = None): + super().__init__(path, ["xz"], "-T", threads) class PipedXzWriter(PipedCompressionWriter): @@ -658,14 +645,7 @@ def __init__( and compresslevel not in self._accepted_compression_levels ): raise ValueError("compresslevel must be between 0 and 9") - super().__init__( - path, - ["xz"], - mode, - compresslevel, - "-T", - threads - ) + super().__init__(path, ["xz"], mode, compresslevel, "-T", threads) class PipedIGzipReader(PipedCompressionReader): @@ -676,9 +656,7 @@ class PipedIGzipReader(PipedCompressionReader): architecture-specific optimizations as a result. """ - def __init__( - self, path - ): + def __init__(self, path): if not _can_read_concatenated_gz("igzip"): # Instead of elaborate version string checking once the problem is # fixed, it is much easier to use this, "proof in the pudding" type @@ -688,9 +666,7 @@ def __init__( "concatenated gzip files and is therefore not " "safe to use. See: https://github.com/intel/isa-l/issues/143" ) - super().__init__( - path, ["igzip"] - ) + super().__init__(path, ["igzip"]) class PipedZstdReader(PipedCompressionReader): @@ -778,21 +754,14 @@ def __init__( class PipedPythonIsalReader(PipedCompressionReader): def __init__( - self, path, + self, + path, ): - super().__init__( - path, - [sys.executable, "-m", "isal.igzip"] - ) + super().__init__(path, [sys.executable, "-m", "isal.igzip"]) class PipedPythonIsalWriter(PipedCompressionWriter): - def __init__( - self, - path, - mode: str = "wb", - compresslevel: Optional[int] = None - ): + def __init__(self, path, mode: str = "wb", compresslevel: Optional[int] = None): if compresslevel is not None and compresslevel not in range(0, 4): raise ValueError("compresslevel must be between 0 and 3") super().__init__( @@ -803,24 +772,24 @@ def __init__( ) -def _open_stdin_or_out(mode: str, **text_mode_kwargs) -> IO: - # Do not return sys.stdin or sys.stdout directly as we want the returned object - # to be closable without closing sys.stdout. - std = dict(r=sys.stdin, w=sys.stdout)[mode[0]] - return open(std.fileno(), mode=mode, closefd=False, **text_mode_kwargs) +def _open_stdin_or_out(mode: str): + assert "b" in mode + std = sys.stdout if "w" in mode else sys.stdin + return open(std.fileno(), mode=mode, closefd=False) -def _open_bz2(filename, mode: str, threads: Optional[int], **text_mode_kwargs): +def _open_bz2(filename, mode: str, threads: Optional[int]): + assert "b" in mode if threads != 0: try: if "r" in mode: - return PipedPBzip2Reader(filename, mode, threads, **text_mode_kwargs) + return PipedPBzip2Reader(filename, threads) else: - return PipedPBzip2Writer(filename, mode, threads, **text_mode_kwargs) + return PipedPBzip2Writer(filename, mode, threads) except OSError: pass # We try without threads. - return bz2.open(filename, mode, **text_mode_kwargs) + return bz2.open(filename, mode) def _open_xz( @@ -828,7 +797,6 @@ def _open_xz( mode: str, compresslevel: Optional[int], threads: Optional[int], - **text_mode_kwargs, ): if compresslevel is None: compresslevel = 6 @@ -836,11 +804,9 @@ def _open_xz( if threads != 0: try: if "r" in mode: - return PipedXzReader(filename, mode, threads, **text_mode_kwargs) + return PipedXzReader(filename, threads) else: - return PipedXzWriter( - filename, mode, compresslevel, threads, **text_mode_kwargs - ) + return PipedXzWriter(filename, mode, compresslevel, threads) except OSError: pass # We try without threads. @@ -848,7 +814,6 @@ def _open_xz( filename, mode, preset=compresslevel if "w" in mode else None, - **text_mode_kwargs, ) @@ -857,19 +822,17 @@ def _open_zst( # noqa: C901 mode: str, compresslevel: Optional[int], threads: Optional[int], - **text_mode_kwargs, ): + assert "b" in mode assert compresslevel != 0 if compresslevel is None: compresslevel = 3 if threads != 0: try: if "r" in mode: - return PipedZstdReader(filename, mode, **text_mode_kwargs) + return PipedZstdReader(filename) else: - return PipedZstdWriter( - filename, mode, compresslevel, threads, **text_mode_kwargs - ) + return PipedZstdWriter(filename, mode, compresslevel, threads) except OSError: if zstandard is None: # No fallback available @@ -881,12 +844,7 @@ def _open_zst( # noqa: C901 cctx = zstandard.ZstdCompressor(level=compresslevel) else: cctx = None - f = zstandard.open( - filename, - mode, - cctx=cctx, - **text_mode_kwargs, - ) + f = zstandard.open(filename, mode, cctx=cctx) if mode == "rb": return io.BufferedReader(f) elif mode == "wb": @@ -897,7 +855,7 @@ def _open_zst( # noqa: C901 def _open_gz( # noqa: C901 filename, mode: str, compresslevel, threads, **text_mode_kwargs ): - assert mode in ("rt", "rb", "wt", "wb", "at", "ab") + assert "b" in mode if compresslevel is None: # Force the same compression level on every tool regardless of # library defaults @@ -911,7 +869,6 @@ def _open_gz( # noqa: C901 filename, mode, compresslevel, - **text_mode_kwargs, threads=1, ) if gzip_ng_threaded and zlib_ng: @@ -922,7 +879,6 @@ def _open_gz( # noqa: C901 # zlib-ng level 1 is 50% bigger than zlib level 1. Level # 2 gives a size close to expectations. compresslevel=2 if compresslevel == 1 else compresslevel, - **text_mode_kwargs, threads=threads or max(_available_cpu_count(), 4), ) except zlib_ng.error: # Bad compression level @@ -930,35 +886,24 @@ def _open_gz( # noqa: C901 try: if "r" in mode: try: - return PipedPigzReader( - filename, mode, threads=threads, **text_mode_kwargs - ) + return PipedPigzReader(filename, threads=threads) except OSError: - return PipedGzipReader(filename, mode, **text_mode_kwargs) + return PipedGzipReader(filename) else: try: return PipedPigzWriter( - filename, - mode, - compresslevel, - threads=threads, - **text_mode_kwargs, + filename, mode, compresslevel, threads=threads ) except OSError: - return PipedGzipWriter( - filename, mode, compresslevel, **text_mode_kwargs - ) + return PipedGzipWriter(filename, mode, compresslevel) except OSError: pass # We try without threads. - g = _open_reproducible_gzip( + return _open_reproducible_gzip( filename, - mode=mode[0] + "b", + mode=mode, compresslevel=compresslevel, ) - if "t" in mode: - return io.TextIOWrapper(g, **text_mode_kwargs) - return g def _open_reproducible_gzip(filename, mode: str, compresslevel: int): @@ -1126,18 +1071,13 @@ def xopen( # noqa: C901 # The function is complex, but readable. """ if mode in ("r", "w", "a"): mode += "t" # type: ignore + binary_mode = mode + "b" + else: + binary_mode = mode if mode not in ("rt", "rb", "wt", "wb", "at", "ab"): raise ValueError("Mode '{}' not supported".format(mode)) filename = os.fspath(filename) - if "b" in mode: - # Do not pass encoding etc. in binary mode as this raises errors. - text_mode_kwargs = dict() - else: - text_mode_kwargs = dict(encoding=encoding, errors=errors, newline=newline) - if filename == "-": - return _open_stdin_or_out(mode, **text_mode_kwargs) - if format not in (None, "gz", "xz", "bz2", "zst"): raise ValueError( f"Format not supported: {format}. " @@ -1147,22 +1087,18 @@ def xopen( # noqa: C901 # The function is complex, but readable. if detected_format is None and "w" not in mode: detected_format = _detect_format_from_content(filename) - if detected_format == "gz": - opened_file = _open_gz( - filename, mode, compresslevel, threads, **text_mode_kwargs - ) + if filename == "-": + opened_file = _open_stdin_or_out(binary_mode) + elif detected_format == "gz": + opened_file = _open_gz(filename, binary_mode, compresslevel, threads) elif detected_format == "xz": - opened_file = _open_xz( - filename, mode, compresslevel, threads, **text_mode_kwargs - ) + opened_file = _open_xz(filename, binary_mode, compresslevel, threads) elif detected_format == "bz2": - opened_file = _open_bz2(filename, mode, threads, **text_mode_kwargs) + opened_file = _open_bz2(filename, binary_mode, threads) elif detected_format == "zst": - opened_file = _open_zst( - filename, mode, compresslevel, threads, **text_mode_kwargs - ) + opened_file = _open_zst(filename, binary_mode, compresslevel, threads) else: - opened_file = open(filename, mode, **text_mode_kwargs) # type: ignore + opened_file = open(filename, binary_mode) # type: ignore # The "write" method for GzipFile is very costly. Lots of python calls are # made. To a lesser extent this is true for LzmaFile and BZ2File. By @@ -1176,4 +1112,6 @@ def xopen( # noqa: C901 # The function is complex, but readable. opened_file = io.BufferedWriter( opened_file, buffer_size=BUFFER_SIZE # type: ignore ) + if "t" in mode: + return io.TextIOWrapper(opened_file, encoding, errors, newline) return opened_file