From 08132b29a68de73694353fe1c503e1bbe8556bc5 Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Fri, 19 Jan 2024 17:05:28 +0100 Subject: [PATCH] Refactor init function for PipedCompressionProgram --- src/xopen/__init__.py | 74 +++++++++++++++++++------------------------ 1 file changed, 33 insertions(+), 41 deletions(-) diff --git a/src/xopen/__init__.py b/src/xopen/__init__.py index ce56f08..12cc4ed 100644 --- a/src/xopen/__init__.py +++ b/src/xopen/__init__.py @@ -190,6 +190,7 @@ def __init__( used. At the moment, this means that the number of available CPU cores is used, capped at four to avoid creating too many threads. Use 0 to use all available cores. """ + self._program_args = program_args[:] if mode not in ("r", "rb", "w", "wb", "a", "ab"): raise ValueError( "Mode is '{}', but it must be 'r', 'rb', 'w', 'wb', 'a', or 'ab'".format( @@ -200,10 +201,8 @@ def __init__( if isinstance(path, bytes) and sys.platform == "win32": path = path.decode() self.name: str = str(path) - self._path = path self._mode: str = mode self._stderr = tempfile.TemporaryFile("w+b") - self._program_args: List[str] = program_args self._threads_flag: Optional[str] = threads_flag if threads is None: @@ -214,23 +213,45 @@ def __init__( else: threads = min(_available_cpu_count(), 4) self._threads = threads - if "r" not in mode: - self.outfile: Optional[BinaryIO] = open(path, mode[0] + "b") - else: - self.outfile = None - try: - self.process = self._open_process(mode, compresslevel, threads) - except OSError: - if self.outfile: - self.outfile.close() - raise + + if threads != 0 and self._threads_flag is not None: + self._program_args += [f"{self._threads_flag}{threads}"] + + # Setting close_fds to True in the Popen arguments is necessary due to + # . + # However, close_fds is not supported on Windows. See + # . + kwargs = {} + if sys.platform != "win32": + kwargs["close_fds"] = True + if "r" in mode: + self._program_args += ["-c", "-d", path] # type: ignore + self.outfile = None + self.process = subprocess.Popen( + self._program_args, stderr=self._stderr, stdout=PIPE, **kwargs + ) # type: ignore self._file: BinaryIO = self.process.stdout # type: ignore self._wait_for_output_or_process_exit() self._raise_if_error() else: + if compresslevel is not None: + self._program_args += ["-" + str(compresslevel)] + self.outfile = open(path, mode[0] + "b") + try: + self.process = Popen( + self._program_args, + stderr=self._stderr, + stdin=PIPE, + stdout=self.outfile, + **kwargs, + ) # type: ignore + except OSError: + self.outfile.close() + raise assert self.process.stdin is not None self._file = self.process.stdin # type: ignore + _set_pipe_size_to_max(self._file.fileno()) def __repr__(self): @@ -242,35 +263,6 @@ def __repr__(self): self._threads, ) - def _open_process( - self, - mode: str, - compresslevel: Optional[int], - threads: int, - ) -> Popen: - program_args: List[str] = self._program_args[:] # prevent list aliasing - if threads != 0 and self._threads_flag is not None: - program_args += [f"{self._threads_flag}{threads}"] - - if ("w" or "a") in mode and compresslevel is not None: - program_args += ["-" + str(compresslevel)] - - if "r" in mode: - program_args += ["-c", "-d", self._path] # type: ignore - kwargs = dict(stdout=PIPE) - else: - kwargs = dict(stdin=PIPE, stdout=self.outfile) # type: ignore - - # Setting close_fds to True in the Popen arguments is necessary due to - # . - # However, close_fds is not supported on Windows. See - # . - if sys.platform != "win32": - kwargs["close_fds"] = True - - process = Popen(program_args, stderr=self._stderr, **kwargs) # type: ignore - return process - def write(self, arg: bytes) -> int: return self._file.write(arg)