From 21ec529987d10e0010badd37f8da3274167d436f Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 18 May 2022 07:43:53 +0800 Subject: [PATCH 01/10] Run everything through 'black' That way people who use it won't be deterred, while it unifies style everywhere. --- doc/source/conf.py | 92 ++--- git/__init__.py | 54 +-- git/cmd.py | 592 +++++++++++++++++++------------ git/compat.py | 37 +- git/config.py | 274 +++++++++----- git/db.py | 15 +- git/diff.py | 377 +++++++++++++------- git/exc.py | 78 ++-- git/ext/gitdb | 2 +- git/index/base.py | 479 ++++++++++++++++--------- git/index/fun.py | 185 ++++++---- git/index/typ.py | 67 ++-- git/index/util.py | 35 +- git/objects/__init__.py | 10 +- git/objects/base.py | 61 ++-- git/objects/blob.py | 7 +- git/objects/commit.py | 339 +++++++++++------- git/objects/fun.py | 75 ++-- git/objects/submodule/base.py | 536 +++++++++++++++++++--------- git/objects/submodule/root.py | 204 ++++++++--- git/objects/submodule/util.py | 40 ++- git/objects/tag.py | 50 ++- git/objects/tree.py | 158 ++++++--- git/objects/util.py | 362 ++++++++++++------- git/refs/head.py | 87 +++-- git/refs/log.py | 117 +++--- git/refs/reference.py | 58 +-- git/refs/remote.py | 21 +- git/refs/symbolic.py | 266 +++++++++----- git/refs/tag.py | 42 ++- git/remote.py | 538 ++++++++++++++++++---------- git/repo/base.py | 582 +++++++++++++++++++----------- git/repo/fun.py | 131 ++++--- git/types.py | 61 +++- git/util.py | 453 ++++++++++++++--------- setup.py | 34 +- test/lib/__init__.py | 7 +- test/lib/helper.py | 146 +++++--- test/performance/lib.py | 43 +-- test/performance/test_commit.py | 52 ++- test/performance/test_odb.py | 39 +- test/performance/test_streams.py | 87 +++-- test/test_actor.py | 1 - test/test_base.py | 55 ++- test/test_blob.py | 9 +- test/test_clone.py | 17 +- test/test_commit.py | 252 ++++++++----- test/test_config.py | 289 ++++++++------- test/test_db.py | 3 +- test/test_diff.py | 236 +++++++----- test/test_docs.py | 433 ++++++++++++++-------- test/test_exc.py | 83 +++-- test/test_fun.py | 103 +++--- test/test_git.py | 173 +++++---- test/test_index.py | 311 +++++++++------- test/test_installation.py | 61 +++- test/test_reflog.py | 37 +- test/test_refs.py | 137 ++++--- test/test_remote.py | 218 +++++++----- test/test_repo.py | 477 +++++++++++++++---------- test/test_stats.py | 24 +- test/test_submodule.py | 567 ++++++++++++++++++----------- test/test_tree.py | 38 +- test/test_util.py | 172 +++++---- test/tstrunner.py | 3 +- 65 files changed, 6674 insertions(+), 3918 deletions(-) diff --git a/doc/source/conf.py b/doc/source/conf.py index 286058fdc..d2803a826 100644 --- a/doc/source/conf.py +++ b/doc/source/conf.py @@ -20,38 +20,40 @@ # If your extensions are in another directory, add it here. If the directory # is relative to the documentation root, use os.path.abspath to make it # absolute, like shown here. -#sys.path.append(os.path.abspath('.')) -sys.path.insert(0, os.path.abspath('../..')) +# sys.path.append(os.path.abspath('.')) +sys.path.insert(0, os.path.abspath("../..")) # General configuration # --------------------- # Add any Sphinx extension module names here, as strings. They can be extensions # coming with Sphinx (named 'sphinx.ext.*') or your custom ones. -extensions = ['sphinx.ext.autodoc', 'sphinx.ext.doctest'] +extensions = ["sphinx.ext.autodoc", "sphinx.ext.doctest"] # Add any paths that contain templates here, relative to this directory. templates_path = [] # The suffix of source filenames. -source_suffix = '.rst' +source_suffix = ".rst" # The encoding of source files. -#source_encoding = 'utf-8' +# source_encoding = 'utf-8' # The master toctree document. -master_doc = 'index' +master_doc = "index" # General information about the project. -project = 'GitPython' -copyright = 'Copyright (C) 2008, 2009 Michael Trier and contributors, 2010-2015 Sebastian Thiel' +project = "GitPython" +copyright = ( + "Copyright (C) 2008, 2009 Michael Trier and contributors, 2010-2015 Sebastian Thiel" +) # The version info for the project you're documenting, acts as replacement for # |version| and |release|, also used in various other places throughout the # built documents. # # The short X.Y version. -with open(os.path.join(os.path.dirname(__file__), "..", "..", 'VERSION')) as fd: +with open(os.path.join(os.path.dirname(__file__), "..", "..", "VERSION")) as fd: VERSION = fd.readline().strip() version = VERSION # The full version, including alpha/beta/rc tags. @@ -59,61 +61,60 @@ # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. -#language = None +# language = None # There are two options for replacing |today|: either, you set today to some # non-false value, then it is used: -#today = '' +# today = '' # Else, today_fmt is used as the format for a strftime call. -#today_fmt = '%B %d, %Y' +# today_fmt = '%B %d, %Y' # List of documents that shouldn't be included in the build. -#unused_docs = [] +# unused_docs = [] # List of directories, relative to source directory, that shouldn't be searched # for source files. -exclude_trees = ['build'] +exclude_trees = ["build"] # The reST default role (used for this markup: `text`) to use for all documents. -#default_role = None +# default_role = None # If true, '()' will be appended to :func: etc. cross-reference text. -#add_function_parentheses = True +# add_function_parentheses = True # If true, the current module name will be prepended to all description # unit titles (such as .. function::). -#add_module_names = True +# add_module_names = True # If true, sectionauthor and moduleauthor directives will be shown in the # output. They are ignored by default. -#show_authors = False +# show_authors = False # The name of the Pygments (syntax highlighting) style to use. -pygments_style = 'sphinx' +pygments_style = "sphinx" # Options for HTML output # ----------------------- -html_theme = 'sphinx_rtd_theme' -html_theme_options = { -} +html_theme = "sphinx_rtd_theme" +html_theme_options = {} # The name for this set of Sphinx documents. If None, it defaults to # " v documentation". -#html_title = None +# html_title = None # A shorter title for the navigation bar. Default is the same as html_title. -#html_short_title = None +# html_short_title = None # The name of an image file (relative to this directory) to place at the top # of the sidebar. -#html_logo = None +# html_logo = None # The name of an image file (within the static path) to use as favicon of the # docs. This file should be a Windows icon file (.ico) being 16x16 or 32x32 # pixels large. -#html_favicon = None +# html_favicon = None # Add any paths that contain custom static files (such as style sheets) here, # relative to this directory. They are copied after the builtin static files, @@ -122,72 +123,71 @@ # If not '', a 'Last updated on:' timestamp is inserted at every page bottom, # using the given strftime format. -#html_last_updated_fmt = '%b %d, %Y' +# html_last_updated_fmt = '%b %d, %Y' # If true, SmartyPants will be used to convert quotes and dashes to # typographically correct entities. -#html_use_smartypants = True +# html_use_smartypants = True # Custom sidebar templates, maps document names to template names. -#html_sidebars = {} +# html_sidebars = {} # Additional templates that should be rendered to pages, maps page names to # template names. -#html_additional_pages = {} +# html_additional_pages = {} # If false, no module index is generated. -#html_use_modindex = True +# html_use_modindex = True # If false, no index is generated. -#html_use_index = True +# html_use_index = True # If true, the index is split into individual pages for each letter. -#html_split_index = False +# html_split_index = False # If true, the reST sources are included in the HTML build as _sources/. -#html_copy_source = True +# html_copy_source = True # If true, an OpenSearch description file will be output, and all pages will # contain a tag referring to it. The value of this option must be the # base URL from which the finished HTML is served. -#html_use_opensearch = '' +# html_use_opensearch = '' # If nonempty, this is the file name suffix for HTML files (e.g. ".xhtml"). -#html_file_suffix = '' +# html_file_suffix = '' # Output file base name for HTML help builder. -htmlhelp_basename = 'gitpythondoc' +htmlhelp_basename = "gitpythondoc" # Options for LaTeX output # ------------------------ # The paper size ('letter' or 'a4'). -#latex_paper_size = 'letter' +# latex_paper_size = 'letter' # The font size ('10pt', '11pt' or '12pt'). -#latex_font_size = '10pt' +# latex_font_size = '10pt' # Grouping the document tree into LaTeX files. List of tuples # (source start file, target name, title, author, document class [howto/manual]). latex_documents = [ - ('index', 'GitPython.tex', r'GitPython Documentation', - r'Michael Trier', 'manual'), + ("index", "GitPython.tex", r"GitPython Documentation", r"Michael Trier", "manual"), ] # The name of an image file (relative to this directory) to place at the top of # the title page. -#latex_logo = None +# latex_logo = None # For "manual" documents, if this is true, then toplevel headings are parts, # not chapters. -#latex_use_parts = False +# latex_use_parts = False # Additional stuff for the LaTeX preamble. -#latex_preamble = '' +# latex_preamble = '' # Documents to append as an appendix to all manuals. -#latex_appendices = [] +# latex_appendices = [] # If false, no module index is generated. -#latex_use_modindex = True +# latex_use_modindex = True diff --git a/git/__init__.py b/git/__init__.py index ae9254a26..3f26886f7 100644 --- a/git/__init__.py +++ b/git/__init__.py @@ -4,8 +4,8 @@ # This module is part of GitPython and is released under # the BSD License: http://www.opensource.org/licenses/bsd-license.php # flake8: noqa -#@PydevCodeAnalysisIgnore -from git.exc import * # @NoMove @IgnorePep8 +# @PydevCodeAnalysisIgnore +from git.exc import * # @NoMove @IgnorePep8 import inspect import os import sys @@ -14,14 +14,14 @@ from typing import Optional from git.types import PathLike -__version__ = 'git' +__version__ = "git" -#{ Initialization +# { Initialization def _init_externals() -> None: """Initialize external projects by putting them into the path""" - if __version__ == 'git' and 'PYOXIDIZER' not in os.environ: - sys.path.insert(1, osp.join(osp.dirname(__file__), 'ext', 'gitdb')) + if __version__ == "git" and "PYOXIDIZER" not in os.environ: + sys.path.insert(1, osp.join(osp.dirname(__file__), "ext", "gitdb")) try: import gitdb @@ -29,26 +29,27 @@ def _init_externals() -> None: raise ImportError("'gitdb' could not be found in your PYTHONPATH") from e # END verify import -#} END initialization + +# } END initialization ################# _init_externals() ################# -#{ Imports +# { Imports try: from git.config import GitConfigParser # @NoMove @IgnorePep8 - from git.objects import * # @NoMove @IgnorePep8 - from git.refs import * # @NoMove @IgnorePep8 - from git.diff import * # @NoMove @IgnorePep8 - from git.db import * # @NoMove @IgnorePep8 - from git.cmd import Git # @NoMove @IgnorePep8 - from git.repo import Repo # @NoMove @IgnorePep8 - from git.remote import * # @NoMove @IgnorePep8 - from git.index import * # @NoMove @IgnorePep8 - from git.util import ( # @NoMove @IgnorePep8 + from git.objects import * # @NoMove @IgnorePep8 + from git.refs import * # @NoMove @IgnorePep8 + from git.diff import * # @NoMove @IgnorePep8 + from git.db import * # @NoMove @IgnorePep8 + from git.cmd import Git # @NoMove @IgnorePep8 + from git.repo import Repo # @NoMove @IgnorePep8 + from git.remote import * # @NoMove @IgnorePep8 + from git.index import * # @NoMove @IgnorePep8 + from git.util import ( # @NoMove @IgnorePep8 LockFile, BlockingLockFile, Stats, @@ -56,15 +57,18 @@ def _init_externals() -> None: rmtree, ) except GitError as exc: - raise ImportError('%s: %s' % (exc.__class__.__name__, exc)) from exc + raise ImportError("%s: %s" % (exc.__class__.__name__, exc)) from exc -#} END imports +# } END imports -__all__ = [name for name, obj in locals().items() - if not (name.startswith('_') or inspect.ismodule(obj))] +__all__ = [ + name + for name, obj in locals().items() + if not (name.startswith("_") or inspect.ismodule(obj)) +] -#{ Initialize git executable path +# { Initialize git executable path GIT_OK = None @@ -79,12 +83,14 @@ def refresh(path: Optional[PathLike] = None) -> None: return GIT_OK = True -#} END initialize git executable path + + +# } END initialize git executable path ################# try: refresh() except Exception as exc: - raise ImportError('Failed to initialize: {0}'.format(exc)) from exc + raise ImportError("Failed to initialize: {0}".format(exc)) from exc ################# diff --git a/git/cmd.py b/git/cmd.py index 1ddf9e03f..12409b0c8 100644 --- a/git/cmd.py +++ b/git/cmd.py @@ -9,12 +9,7 @@ import logging import os import signal -from subprocess import ( - call, - Popen, - PIPE, - DEVNULL -) +from subprocess import call, Popen, PIPE, DEVNULL import subprocess import threading from textwrap import dedent @@ -29,10 +24,7 @@ from git.exc import CommandError from git.util import is_cygwin_git, cygpath, expand_path, remove_password_if_present -from .exc import ( - GitCommandError, - GitCommandNotFound -) +from .exc import GitCommandError, GitCommandNotFound from .util import ( LazyMixin, stream_copy, @@ -40,8 +32,24 @@ # typing --------------------------------------------------------------------------- -from typing import (Any, AnyStr, BinaryIO, Callable, Dict, IO, Iterator, List, Mapping, - Sequence, TYPE_CHECKING, TextIO, Tuple, Union, cast, overload) +from typing import ( + Any, + AnyStr, + BinaryIO, + Callable, + Dict, + IO, + Iterator, + List, + Mapping, + Sequence, + TYPE_CHECKING, + TextIO, + Tuple, + Union, + cast, + overload, +) from git.types import PathLike, Literal, TBD @@ -52,15 +60,26 @@ # --------------------------------------------------------------------------------- -execute_kwargs = {'istream', 'with_extended_output', - 'with_exceptions', 'as_process', 'stdout_as_string', - 'output_stream', 'with_stdout', 'kill_after_timeout', - 'universal_newlines', 'shell', 'env', 'max_chunk_size', 'strip_newline_in_stdout'} +execute_kwargs = { + "istream", + "with_extended_output", + "with_exceptions", + "as_process", + "stdout_as_string", + "output_stream", + "with_stdout", + "kill_after_timeout", + "universal_newlines", + "shell", + "env", + "max_chunk_size", + "strip_newline_in_stdout", +} log = logging.getLogger(__name__) log.addHandler(logging.NullHandler()) -__all__ = ('Git',) +__all__ = ("Git",) # ============================================================================== @@ -69,18 +88,24 @@ # Documentation ## @{ -def handle_process_output(process: 'Git.AutoInterrupt' | Popen, - stdout_handler: Union[None, - Callable[[AnyStr], None], - Callable[[List[AnyStr]], None], - Callable[[bytes, 'Repo', 'DiffIndex'], None]], - stderr_handler: Union[None, - Callable[[AnyStr], None], - Callable[[List[AnyStr]], None]], - finalizer: Union[None, - Callable[[Union[subprocess.Popen, 'Git.AutoInterrupt']], None]] = None, - decode_streams: bool = True, - kill_after_timeout: Union[None, float] = None) -> None: + +def handle_process_output( + process: "Git.AutoInterrupt" | Popen, + stdout_handler: Union[ + None, + Callable[[AnyStr], None], + Callable[[List[AnyStr]], None], + Callable[[bytes, "Repo", "DiffIndex"], None], + ], + stderr_handler: Union[ + None, Callable[[AnyStr], None], Callable[[List[AnyStr]], None] + ], + finalizer: Union[ + None, Callable[[Union[subprocess.Popen, "Git.AutoInterrupt"]], None] + ] = None, + decode_streams: bool = True, + kill_after_timeout: Union[None, float] = None, +) -> None: """Registers for notifications to learn that process output is ready to read, and dispatches lines to the respective line handlers. This function returns once the finalizer returns @@ -101,8 +126,13 @@ def handle_process_output(process: 'Git.AutoInterrupt' | Popen, should be killed. """ # Use 2 "pump" threads and wait for both to finish. - def pump_stream(cmdline: List[str], name: str, stream: Union[BinaryIO, TextIO], is_decode: bool, - handler: Union[None, Callable[[Union[bytes, str]], None]]) -> None: + def pump_stream( + cmdline: List[str], + name: str, + stream: Union[BinaryIO, TextIO], + is_decode: bool, + handler: Union[None, Callable[[Union[bytes, str]], None]], + ) -> None: try: for line in stream: if handler: @@ -114,21 +144,25 @@ def pump_stream(cmdline: List[str], name: str, stream: Union[BinaryIO, TextIO], handler(line) except Exception as ex: - log.error(f"Pumping {name!r} of cmd({remove_password_if_present(cmdline)}) failed due to: {ex!r}") + log.error( + f"Pumping {name!r} of cmd({remove_password_if_present(cmdline)}) failed due to: {ex!r}" + ) if "I/O operation on closed file" not in str(ex): # Only reraise if the error was not due to the stream closing - raise CommandError([f'<{name}-pump>'] + remove_password_if_present(cmdline), ex) from ex + raise CommandError( + [f"<{name}-pump>"] + remove_password_if_present(cmdline), ex + ) from ex finally: stream.close() - if hasattr(process, 'proc'): - process = cast('Git.AutoInterrupt', process) - cmdline: str | Tuple[str, ...] | List[str] = getattr(process.proc, 'args', '') + if hasattr(process, "proc"): + process = cast("Git.AutoInterrupt", process) + cmdline: str | Tuple[str, ...] | List[str] = getattr(process.proc, "args", "") p_stdout = process.proc.stdout if process.proc else None p_stderr = process.proc.stderr if process.proc else None else: process = cast(Popen, process) - cmdline = getattr(process, 'args', '') + cmdline = getattr(process, "args", "") p_stdout = process.stdout p_stderr = process.stderr @@ -137,15 +171,16 @@ def pump_stream(cmdline: List[str], name: str, stream: Union[BinaryIO, TextIO], pumps: List[Tuple[str, IO, Callable[..., None] | None]] = [] if p_stdout: - pumps.append(('stdout', p_stdout, stdout_handler)) + pumps.append(("stdout", p_stdout, stdout_handler)) if p_stderr: - pumps.append(('stderr', p_stderr, stderr_handler)) + pumps.append(("stderr", p_stderr, stderr_handler)) threads: List[threading.Thread] = [] for name, stream, handler in pumps: - t = threading.Thread(target=pump_stream, - args=(cmdline, name, stream, decode_streams, handler)) + t = threading.Thread( + target=pump_stream, args=(cmdline, name, stream, decode_streams, handler) + ) t.daemon = True t.start() threads.append(t) @@ -158,12 +193,15 @@ def pump_stream(cmdline: List[str], name: str, stream: Union[BinaryIO, TextIO], if isinstance(process, Git.AutoInterrupt): process._terminate() else: # Don't want to deal with the other case - raise RuntimeError("Thread join() timed out in cmd.handle_process_output()." - f" kill_after_timeout={kill_after_timeout} seconds") + raise RuntimeError( + "Thread join() timed out in cmd.handle_process_output()." + f" kill_after_timeout={kill_after_timeout} seconds" + ) if stderr_handler: error_str: Union[str, bytes] = ( "error: process killed because it timed out." - f" kill_after_timeout={kill_after_timeout} seconds") + f" kill_after_timeout={kill_after_timeout} seconds" + ) if not decode_streams and isinstance(p_stderr, BinaryIO): # Assume stderr_handler needs binary input error_str = cast(str, error_str) @@ -179,19 +217,22 @@ def pump_stream(cmdline: List[str], name: str, stream: Union[BinaryIO, TextIO], def dashify(string: str) -> str: - return string.replace('_', '-') + return string.replace("_", "-") def slots_to_dict(self: object, exclude: Sequence[str] = ()) -> Dict[str, Any]: return {s: getattr(self, s) for s in self.__slots__ if s not in exclude} -def dict_to_slots_and__excluded_are_none(self: object, d: Mapping[str, Any], excluded: Sequence[str] = ()) -> None: +def dict_to_slots_and__excluded_are_none( + self: object, d: Mapping[str, Any], excluded: Sequence[str] = () +) -> None: for k, v in d.items(): setattr(self, k, v) for k in excluded: setattr(self, k, None) + ## -- End Utilities -- @} @@ -200,8 +241,11 @@ def dict_to_slots_and__excluded_are_none(self: object, d: Mapping[str, Any], exc ## CREATE_NEW_PROCESS_GROUP is needed to allow killing it afterwards, # see https://docs.python.org/3/library/subprocess.html#subprocess.Popen.send_signal -PROC_CREATIONFLAGS = (CREATE_NO_WINDOW | subprocess.CREATE_NEW_PROCESS_GROUP # type: ignore[attr-defined] - if is_win else 0) # mypy error if not windows +PROC_CREATIONFLAGS = ( + CREATE_NO_WINDOW | subprocess.CREATE_NEW_PROCESS_GROUP # type: ignore[attr-defined] + if is_win + else 0 +) # mypy error if not windows class Git(LazyMixin): @@ -220,10 +264,18 @@ class Git(LazyMixin): of the command to stdout. Set its value to 'full' to see details about the returned values. """ - __slots__ = ("_working_dir", "cat_file_all", "cat_file_header", "_version_info", - "_git_options", "_persistent_git_options", "_environment") - _excluded_ = ('cat_file_all', 'cat_file_header', '_version_info') + __slots__ = ( + "_working_dir", + "cat_file_all", + "cat_file_header", + "_version_info", + "_git_options", + "_persistent_git_options", + "_environment", + ) + + _excluded_ = ("cat_file_all", "cat_file_header", "_version_info") def __getstate__(self) -> Dict[str, Any]: return slots_to_dict(self, exclude=self._excluded_) @@ -233,7 +285,7 @@ def __setstate__(self, d: Dict[str, Any]) -> None: # CONFIGURATION - git_exec_name = "git" # default that should work on linux and windows + git_exec_name = "git" # default that should work on linux and windows # Enables debugging of GitPython's git commands GIT_PYTHON_TRACE = os.environ.get("GIT_PYTHON_TRACE", False) @@ -282,13 +334,18 @@ def refresh(cls, path: Union[None, PathLike] = None) -> bool: # warn or raise exception if test failed if not has_git: - err = dedent("""\ + err = ( + dedent( + """\ Bad git executable. The git executable must be specified in one of the following ways: - be included in your $PATH - be set via $%s - explicitly set via git.refresh() - """) % cls._git_exec_env_var + """ + ) + % cls._git_exec_env_var + ) # revert to whatever the old_git was cls.GIT_PYTHON_GIT_EXECUTABLE = old_git @@ -314,7 +371,9 @@ def refresh(cls, path: Union[None, PathLike] = None) -> bool: if mode in quiet: pass elif mode in warn or mode in error: - err = dedent("""\ + err = ( + dedent( + """\ %s All git commands will error until this is rectified. @@ -326,32 +385,42 @@ def refresh(cls, path: Union[None, PathLike] = None) -> bool: Example: export %s=%s - """) % ( - err, - cls._refresh_env_var, - "|".join(quiet), - "|".join(warn), - "|".join(error), - cls._refresh_env_var, - quiet[0]) + """ + ) + % ( + err, + cls._refresh_env_var, + "|".join(quiet), + "|".join(warn), + "|".join(error), + cls._refresh_env_var, + quiet[0], + ) + ) if mode in warn: print("WARNING: %s" % err) else: raise ImportError(err) else: - err = dedent("""\ + err = ( + dedent( + """\ %s environment variable has been set but it has been set with an invalid value. Use only the following values: - %s: for no warning or exception - %s: for a printed warning - %s: for a raised exception - """) % ( - cls._refresh_env_var, - "|".join(quiet), - "|".join(warn), - "|".join(error)) + """ + ) + % ( + cls._refresh_env_var, + "|".join(quiet), + "|".join(warn), + "|".join(error), + ) + ) raise ImportError(err) # we get here if this was the init refresh and the refresh mode @@ -395,7 +464,7 @@ def polish_url(cls, url: str, is_cygwin: Union[None, bool] = None) -> PathLike: Hence we undo the escaping just to be sure. """ url = os.path.expandvars(url) - if url.startswith('~'): + if url.startswith("~"): url = os.path.expanduser(url) url = url.replace("\\\\", "\\").replace("\\", "/") return url @@ -441,7 +510,7 @@ def _terminate(self) -> None: log.info("Ignored error after process had died: %r", ex) # can be that nothing really exists anymore ... - if os is None or getattr(os, 'kill', None) is None: + if os is None or getattr(os, "kill", None) is None: return None # try to kill it @@ -458,7 +527,10 @@ def _terminate(self) -> None: # we simply use the shell and redirect to nul. Its slower than CreateProcess, question # is whether we really want to see all these messages. Its annoying no matter what. if is_win: - call(("TASKKILL /F /T /PID %s 2>nul 1>nul" % str(proc.pid)), shell=True) + call( + ("TASKKILL /F /T /PID %s 2>nul 1>nul" % str(proc.pid)), + shell=True, + ) # END exception handling def __del__(self) -> None: @@ -468,15 +540,15 @@ def __getattr__(self, attr: str) -> Any: return getattr(self.proc, attr) # TODO: Bad choice to mimic `proc.wait()` but with different args. - def wait(self, stderr: Union[None, str, bytes] = b'') -> int: + def wait(self, stderr: Union[None, str, bytes] = b"") -> int: """Wait for the process and return its status code. :param stderr: Previously read value of stderr, in case stderr is already closed. :warn: may deadlock if output or error pipes are used and not handled separately. :raise GitCommandError: if the return status is not 0""" if stderr is None: - stderr_b = b'' - stderr_b = force_bytes(data=stderr, encoding='utf-8') + stderr_b = b"" + stderr_b = force_bytes(data=stderr, encoding="utf-8") status: Union[int, None] if self.proc is not None: status = self.proc.wait() @@ -485,21 +557,25 @@ def wait(self, stderr: Union[None, str, bytes] = b'') -> int: status = self.status p_stderr = None - def read_all_from_possibly_closed_stream(stream: Union[IO[bytes], None]) -> bytes: + def read_all_from_possibly_closed_stream( + stream: Union[IO[bytes], None] + ) -> bytes: if stream: try: return stderr_b + force_bytes(stream.read()) except ValueError: - return stderr_b or b'' + return stderr_b or b"" else: - return stderr_b or b'' + return stderr_b or b"" # END status handling if status != 0: errstr = read_all_from_possibly_closed_stream(p_stderr) - log.debug('AutoInterrupt wait stderr: %r' % (errstr,)) - raise GitCommandError(remove_password_if_present(self.args), status, errstr) + log.debug("AutoInterrupt wait stderr: %r" % (errstr,)) + raise GitCommandError( + remove_password_if_present(self.args), status, errstr + ) return status # END auto interrupt @@ -513,12 +589,12 @@ class CatFileContentStream(object): If not all data is read to the end of the objects's lifetime, we read the rest to assure the underlying stream continues to work""" - __slots__: Tuple[str, ...] = ('_stream', '_nbr', '_size') + __slots__: Tuple[str, ...] = ("_stream", "_nbr", "_size") def __init__(self, size: int, stream: IO[bytes]) -> None: self._stream = stream self._size = size - self._nbr = 0 # num bytes read + self._nbr = 0 # num bytes read # special case: if the object is empty, has null bytes, get the # final newline right away. @@ -529,7 +605,7 @@ def __init__(self, size: int, stream: IO[bytes]) -> None: def read(self, size: int = -1) -> bytes: bytes_left = self._size - self._nbr if bytes_left == 0: - return b'' + return b"" if size > -1: # assure we don't try to read past our limit size = min(bytes_left, size) @@ -542,13 +618,13 @@ def read(self, size: int = -1) -> bytes: # check for depletion, read our final byte to make the stream usable by others if self._size - self._nbr == 0: - self._stream.read(1) # final newline + self._stream.read(1) # final newline # END finish reading return data def readline(self, size: int = -1) -> bytes: if self._nbr == self._size: - return b'' + return b"" # clamp size to lowest allowed value bytes_left = self._size - self._nbr @@ -589,7 +665,7 @@ def readlines(self, size: int = -1) -> List[bytes]: return out # skipcq: PYL-E0301 - def __iter__(self) -> 'Git.CatFileContentStream': + def __iter__(self) -> "Git.CatFileContentStream": return self def __next__(self) -> bytes: @@ -634,7 +710,7 @@ def __getattr__(self, name: str) -> Any: """A convenience method as it allows to call the command as if it was an object. :return: Callable object that will execute call _call_process with your arguments.""" - if name[0] == '_': + if name[0] == "_": return LazyMixin.__getattr__(self, name) return lambda *args, **kwargs: self._call_process(name, *args, **kwargs) @@ -650,27 +726,31 @@ def set_persistent_git_options(self, **kwargs: Any) -> None: """ self._persistent_git_options = self.transform_kwargs( - split_single_char_options=True, **kwargs) + split_single_char_options=True, **kwargs + ) def _set_cache_(self, attr: str) -> None: - if attr == '_version_info': + if attr == "_version_info": # We only use the first 4 numbers, as everything else could be strings in fact (on windows) - process_version = self._call_process('version') # should be as default *args and **kwargs used - version_numbers = process_version.split(' ')[2] - - self._version_info = cast(Tuple[int, int, int, int], - tuple(int(n) for n in version_numbers.split('.')[:4] if n.isdigit()) - ) + process_version = self._call_process( + "version" + ) # should be as default *args and **kwargs used + version_numbers = process_version.split(" ")[2] + + self._version_info = cast( + Tuple[int, int, int, int], + tuple(int(n) for n in version_numbers.split(".")[:4] if n.isdigit()), + ) else: super(Git, self)._set_cache_(attr) # END handle version info - @ property + @property def working_dir(self) -> Union[None, PathLike]: """:return: Git directory we are working on""" return self._working_dir - @ property + @property def version_info(self) -> Tuple[int, int, int, int]: """ :return: tuple(int, int, int, int) tuple with integers representing the major, minor @@ -678,69 +758,72 @@ def version_info(self) -> Tuple[int, int, int, int]: This value is generated on demand and is cached""" return self._version_info - @ overload - def execute(self, - command: Union[str, Sequence[Any]], - *, - as_process: Literal[True] - ) -> 'AutoInterrupt': + @overload + def execute( + self, command: Union[str, Sequence[Any]], *, as_process: Literal[True] + ) -> "AutoInterrupt": ... - @ overload - def execute(self, - command: Union[str, Sequence[Any]], - *, - as_process: Literal[False] = False, - stdout_as_string: Literal[True] - ) -> Union[str, Tuple[int, str, str]]: + @overload + def execute( + self, + command: Union[str, Sequence[Any]], + *, + as_process: Literal[False] = False, + stdout_as_string: Literal[True], + ) -> Union[str, Tuple[int, str, str]]: ... - @ overload - def execute(self, - command: Union[str, Sequence[Any]], - *, - as_process: Literal[False] = False, - stdout_as_string: Literal[False] = False - ) -> Union[bytes, Tuple[int, bytes, str]]: + @overload + def execute( + self, + command: Union[str, Sequence[Any]], + *, + as_process: Literal[False] = False, + stdout_as_string: Literal[False] = False, + ) -> Union[bytes, Tuple[int, bytes, str]]: ... - @ overload - def execute(self, - command: Union[str, Sequence[Any]], - *, - with_extended_output: Literal[False], - as_process: Literal[False], - stdout_as_string: Literal[True] - ) -> str: + @overload + def execute( + self, + command: Union[str, Sequence[Any]], + *, + with_extended_output: Literal[False], + as_process: Literal[False], + stdout_as_string: Literal[True], + ) -> str: ... - @ overload - def execute(self, - command: Union[str, Sequence[Any]], - *, - with_extended_output: Literal[False], - as_process: Literal[False], - stdout_as_string: Literal[False] - ) -> bytes: + @overload + def execute( + self, + command: Union[str, Sequence[Any]], + *, + with_extended_output: Literal[False], + as_process: Literal[False], + stdout_as_string: Literal[False], + ) -> bytes: ... - def execute(self, - command: Union[str, Sequence[Any]], - istream: Union[None, BinaryIO] = None, - with_extended_output: bool = False, - with_exceptions: bool = True, - as_process: bool = False, - output_stream: Union[None, BinaryIO] = None, - stdout_as_string: bool = True, - kill_after_timeout: Union[None, float] = None, - with_stdout: bool = True, - universal_newlines: bool = False, - shell: Union[None, bool] = None, - env: Union[None, Mapping[str, str]] = None, - max_chunk_size: int = io.DEFAULT_BUFFER_SIZE, - strip_newline_in_stdout: bool = True, - **subprocess_kwargs: Any - ) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], AutoInterrupt]: + def execute( + self, + command: Union[str, Sequence[Any]], + istream: Union[None, BinaryIO] = None, + with_extended_output: bool = False, + with_exceptions: bool = True, + as_process: bool = False, + output_stream: Union[None, BinaryIO] = None, + stdout_as_string: bool = True, + kill_after_timeout: Union[None, float] = None, + with_stdout: bool = True, + universal_newlines: bool = False, + shell: Union[None, bool] = None, + env: Union[None, Mapping[str, str]] = None, + max_chunk_size: int = io.DEFAULT_BUFFER_SIZE, + strip_newline_in_stdout: bool = True, + **subprocess_kwargs: Any, + ) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], AutoInterrupt]: """Handles executing the command on the shell and consumes and returns the returned information (stdout) @@ -831,8 +914,8 @@ def execute(self, you must update the execute_kwargs tuple housed in this module.""" # Remove password for the command if present redacted_command = remove_password_if_present(command) - if self.GIT_PYTHON_TRACE and (self.GIT_PYTHON_TRACE != 'full' or as_process): - log.info(' '.join(redacted_command)) + if self.GIT_PYTHON_TRACE and (self.GIT_PYTHON_TRACE != "full" or as_process): + log.info(" ".join(redacted_command)) # Allow the user to have the command executed in their working dir. try: @@ -858,33 +941,47 @@ def execute(self, if is_win: cmd_not_found_exception = OSError if kill_after_timeout is not None: - raise GitCommandError(redacted_command, '"kill_after_timeout" feature is not supported on Windows.') + raise GitCommandError( + redacted_command, + '"kill_after_timeout" feature is not supported on Windows.', + ) else: - cmd_not_found_exception = FileNotFoundError # NOQA # exists, flake8 unknown @UndefinedVariable + cmd_not_found_exception = ( + FileNotFoundError # NOQA # exists, flake8 unknown @UndefinedVariable + ) # end handle - stdout_sink = (PIPE - if with_stdout - else getattr(subprocess, 'DEVNULL', None) or open(os.devnull, 'wb')) + stdout_sink = ( + PIPE + if with_stdout + else getattr(subprocess, "DEVNULL", None) or open(os.devnull, "wb") + ) istream_ok = "None" if istream: istream_ok = "" - log.debug("Popen(%s, cwd=%s, universal_newlines=%s, shell=%s, istream=%s)", - redacted_command, cwd, universal_newlines, shell, istream_ok) + log.debug( + "Popen(%s, cwd=%s, universal_newlines=%s, shell=%s, istream=%s)", + redacted_command, + cwd, + universal_newlines, + shell, + istream_ok, + ) try: - proc = Popen(command, - env=env, - cwd=cwd, - bufsize=-1, - stdin=istream or DEVNULL, - stderr=PIPE, - stdout=stdout_sink, - shell=shell is not None and shell or self.USE_SHELL, - close_fds=is_posix, # unsupported on windows - universal_newlines=universal_newlines, - creationflags=PROC_CREATIONFLAGS, - **subprocess_kwargs - ) + proc = Popen( + command, + env=env, + cwd=cwd, + bufsize=-1, + stdin=istream or DEVNULL, + stderr=PIPE, + stdout=stdout_sink, + shell=shell is not None and shell or self.USE_SHELL, + close_fds=is_posix, # unsupported on windows + universal_newlines=universal_newlines, + creationflags=PROC_CREATIONFLAGS, + **subprocess_kwargs, + ) except cmd_not_found_exception as err: raise GitCommandNotFound(redacted_command, err) from err @@ -897,9 +994,12 @@ def execute(self, return self.AutoInterrupt(proc, command) def _kill_process(pid: int) -> None: - """ Callback method to kill a process. """ - p = Popen(['ps', '--ppid', str(pid)], stdout=PIPE, - creationflags=PROC_CREATIONFLAGS) + """Callback method to kill a process.""" + p = Popen( + ["ps", "--ppid", str(pid)], + stdout=PIPE, + creationflags=PROC_CREATIONFLAGS, + ) child_pids = [] if p.stdout is not None: for line in p.stdout: @@ -909,29 +1009,32 @@ def _kill_process(pid: int) -> None: child_pids.append(int(local_pid)) try: # Windows does not have SIGKILL, so use SIGTERM instead - sig = getattr(signal, 'SIGKILL', signal.SIGTERM) + sig = getattr(signal, "SIGKILL", signal.SIGTERM) os.kill(pid, sig) for child_pid in child_pids: try: os.kill(child_pid, sig) except OSError: pass - kill_check.set() # tell the main routine that the process was killed + kill_check.set() # tell the main routine that the process was killed except OSError: # It is possible that the process gets completed in the duration after timeout # happens and before we try to kill the process. pass return + # end if kill_after_timeout is not None: kill_check = threading.Event() - watchdog = threading.Timer(kill_after_timeout, _kill_process, args=(proc.pid,)) + watchdog = threading.Timer( + kill_after_timeout, _kill_process, args=(proc.pid,) + ) # Wait for the process to return status = 0 - stdout_value: Union[str, bytes] = b'' - stderr_value: Union[str, bytes] = b'' + stdout_value: Union[str, bytes] = b"" + stderr_value: Union[str, bytes] = b"" newline = "\n" if universal_newlines else b"\n" try: if output_stream is None: @@ -941,8 +1044,10 @@ def _kill_process(pid: int) -> None: if kill_after_timeout is not None: watchdog.cancel() if kill_check.is_set(): - stderr_value = ('Timeout: the command "%s" did not complete in %d ' - 'secs.' % (" ".join(redacted_command), kill_after_timeout)) + stderr_value = ( + 'Timeout: the command "%s" did not complete in %d ' + "secs." % (" ".join(redacted_command), kill_after_timeout) + ) if not universal_newlines: stderr_value = stderr_value.encode(defenc) # strip trailing "\n" @@ -953,12 +1058,16 @@ def _kill_process(pid: int) -> None: status = proc.returncode else: - max_chunk_size = max_chunk_size if max_chunk_size and max_chunk_size > 0 else io.DEFAULT_BUFFER_SIZE + max_chunk_size = ( + max_chunk_size + if max_chunk_size and max_chunk_size > 0 + else io.DEFAULT_BUFFER_SIZE + ) stream_copy(proc.stdout, output_stream, max_chunk_size) stdout_value = proc.stdout.read() stderr_value = proc.stderr.read() # strip trailing "\n" - if stderr_value.endswith(newline): # type: ignore + if stderr_value.endswith(newline): # type: ignore stderr_value = stderr_value[:-1] status = proc.wait() # END stdout handling @@ -966,18 +1075,28 @@ def _kill_process(pid: int) -> None: proc.stdout.close() proc.stderr.close() - if self.GIT_PYTHON_TRACE == 'full': + if self.GIT_PYTHON_TRACE == "full": cmdstr = " ".join(redacted_command) def as_text(stdout_value: Union[bytes, str]) -> str: - return not output_stream and safe_decode(stdout_value) or '' + return ( + not output_stream and safe_decode(stdout_value) or "" + ) + # end if stderr_value: - log.info("%s -> %d; stdout: '%s'; stderr: '%s'", - cmdstr, status, as_text(stdout_value), safe_decode(stderr_value)) + log.info( + "%s -> %d; stdout: '%s'; stderr: '%s'", + cmdstr, + status, + as_text(stdout_value), + safe_decode(stderr_value), + ) elif stdout_value: - log.info("%s -> %d; stdout: '%s'", cmdstr, status, as_text(stdout_value)) + log.info( + "%s -> %d; stdout: '%s'", cmdstr, status, as_text(stdout_value) + ) else: log.info("%s -> %d", cmdstr, status) # END handle debug printing @@ -985,7 +1104,9 @@ def as_text(stdout_value: Union[bytes, str]) -> str: if with_exceptions and status != 0: raise GitCommandError(redacted_command, status, stderr_value, stdout_value) - if isinstance(stdout_value, bytes) and stdout_as_string: # could also be output_stream + if ( + isinstance(stdout_value, bytes) and stdout_as_string + ): # could also be output_stream stdout_value = safe_decode(stdout_value) # Allow access to the command's status code @@ -1042,7 +1163,9 @@ def custom_environment(self, **kwargs: Any) -> Iterator[None]: finally: self.update_environment(**old_env) - def transform_kwarg(self, name: str, value: Any, split_single_char_options: bool) -> List[str]: + def transform_kwarg( + self, name: str, value: Any, split_single_char_options: bool + ) -> List[str]: if len(name) == 1: if value is True: return ["-%s" % name] @@ -1058,7 +1181,9 @@ def transform_kwarg(self, name: str, value: Any, split_single_char_options: bool return ["--%s=%s" % (dashify(name), value)] return [] - def transform_kwargs(self, split_single_char_options: bool = True, **kwargs: Any) -> List[str]: + def transform_kwargs( + self, split_single_char_options: bool = True, **kwargs: Any + ) -> List[str]: """Transforms Python style kwargs into git command line options.""" args = [] for k, v in kwargs.items(): @@ -1081,7 +1206,7 @@ def __unpack_args(cls, arg_list: Sequence[str]) -> List[str]: return outlist - def __call__(self, **kwargs: Any) -> 'Git': + def __call__(self, **kwargs: Any) -> "Git": """Specify command line options to the git executable for a subcommand call @@ -1094,28 +1219,34 @@ def __call__(self, **kwargs: Any) -> 'Git': ``Examples``:: git(work_tree='/tmp').difftool()""" self._git_options = self.transform_kwargs( - split_single_char_options=True, **kwargs) + split_single_char_options=True, **kwargs + ) return self @overload - def _call_process(self, method: str, *args: None, **kwargs: None - ) -> str: + def _call_process(self, method: str, *args: None, **kwargs: None) -> str: ... # if no args given, execute called with all defaults @overload - def _call_process(self, method: str, - istream: int, - as_process: Literal[True], - *args: Any, **kwargs: Any - ) -> 'Git.AutoInterrupt': ... + def _call_process( + self, + method: str, + istream: int, + as_process: Literal[True], + *args: Any, + **kwargs: Any, + ) -> "Git.AutoInterrupt": + ... @overload - def _call_process(self, method: str, *args: Any, **kwargs: Any - ) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], 'Git.AutoInterrupt']: + def _call_process( + self, method: str, *args: Any, **kwargs: Any + ) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], "Git.AutoInterrupt"]: ... - def _call_process(self, method: str, *args: Any, **kwargs: Any - ) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], 'Git.AutoInterrupt']: + def _call_process( + self, method: str, *args: Any, **kwargs: Any + ) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], "Git.AutoInterrupt"]: """Run the given git command with the specified arguments and return the result as a String @@ -1145,13 +1276,13 @@ def _call_process(self, method: str, *args: Any, **kwargs: Any :return: Same as ``execute`` if no args given used execute default (esp. as_process = False, stdout_as_string = True) - and return str """ + and return str""" # Handle optional arguments prior to calling transform_kwargs # otherwise these'll end up in args, which is bad. exec_kwargs = {k: v for k, v in kwargs.items() if k in execute_kwargs} opts_kwargs = {k: v for k, v in kwargs.items() if k not in execute_kwargs} - insert_after_this_arg = opts_kwargs.pop('insert_kwargs_after', None) + insert_after_this_arg = opts_kwargs.pop("insert_kwargs_after", None) # Prepare the argument list @@ -1164,10 +1295,12 @@ def _call_process(self, method: str, *args: Any, **kwargs: Any try: index = ext_args.index(insert_after_this_arg) except ValueError as err: - raise ValueError("Couldn't find argument '%s' in args %s to insert cmd options after" - % (insert_after_this_arg, str(ext_args))) from err + raise ValueError( + "Couldn't find argument '%s' in args %s to insert cmd options after" + % (insert_after_this_arg, str(ext_args)) + ) from err # end handle error - args_list = ext_args[:index + 1] + opt_args + ext_args[index + 1:] + args_list = ext_args[: index + 1] + opt_args + ext_args[index + 1 :] # end handle opts_kwargs call = [self.GIT_PYTHON_GIT_EXECUTABLE] @@ -1197,9 +1330,15 @@ def _parse_object_header(self, header_line: str) -> Tuple[str, str, int]: tokens = header_line.split() if len(tokens) != 3: if not tokens: - raise ValueError("SHA could not be resolved, git returned: %r" % (header_line.strip())) + raise ValueError( + "SHA could not be resolved, git returned: %r" + % (header_line.strip()) + ) else: - raise ValueError("SHA %s could not be resolved, git returned: %r" % (tokens[0], header_line.strip())) + raise ValueError( + "SHA %s could not be resolved, git returned: %r" + % (tokens[0], header_line.strip()) + ) # END handle actual return value # END error handling @@ -1211,9 +1350,9 @@ def _prepare_ref(self, ref: AnyStr) -> bytes: # required for command to separate refs on stdin, as bytes if isinstance(ref, bytes): # Assume 40 bytes hexsha - bin-to-ascii for some reason returns bytes, not text - refstr: str = ref.decode('ascii') + refstr: str = ref.decode("ascii") elif not isinstance(ref, str): - refstr = str(ref) # could be ref-object + refstr = str(ref) # could be ref-object else: refstr = ref @@ -1221,8 +1360,9 @@ def _prepare_ref(self, ref: AnyStr) -> bytes: refstr += "\n" return refstr.encode(defenc) - def _get_persistent_cmd(self, attr_name: str, cmd_name: str, *args: Any, **kwargs: Any - ) -> 'Git.AutoInterrupt': + def _get_persistent_cmd( + self, attr_name: str, cmd_name: str, *args: Any, **kwargs: Any + ) -> "Git.AutoInterrupt": cur_val = getattr(self, attr_name) if cur_val is not None: return cur_val @@ -1232,10 +1372,12 @@ def _get_persistent_cmd(self, attr_name: str, cmd_name: str, *args: Any, **kwarg cmd = self._call_process(cmd_name, *args, **options) setattr(self, attr_name, cmd) - cmd = cast('Git.AutoInterrupt', cmd) + cmd = cast("Git.AutoInterrupt", cmd) return cmd - def __get_object_header(self, cmd: 'Git.AutoInterrupt', ref: AnyStr) -> Tuple[str, str, int]: + def __get_object_header( + self, cmd: "Git.AutoInterrupt", ref: AnyStr + ) -> Tuple[str, str, int]: if cmd.stdin and cmd.stdout: cmd.stdin.write(self._prepare_ref(ref)) cmd.stdin.flush() @@ -1244,7 +1386,7 @@ def __get_object_header(self, cmd: 'Git.AutoInterrupt', ref: AnyStr) -> Tuple[st raise ValueError("cmd stdin was empty") def get_object_header(self, ref: str) -> Tuple[str, str, int]: - """ Use this method to quickly examine the type and size of the object behind + """Use this method to quickly examine the type and size of the object behind the given ref. :note: The method will only suffer from the costs of command invocation @@ -1255,16 +1397,18 @@ def get_object_header(self, ref: str) -> Tuple[str, str, int]: return self.__get_object_header(cmd, ref) def get_object_data(self, ref: str) -> Tuple[str, str, int, bytes]: - """ As get_object_header, but returns object data as well + """As get_object_header, but returns object data as well :return: (hexsha, type_string, size_as_int,data_string) :note: not threadsafe""" hexsha, typename, size, stream = self.stream_object_data(ref) data = stream.read(size) - del(stream) + del stream return (hexsha, typename, size, data) - def stream_object_data(self, ref: str) -> Tuple[str, str, int, 'Git.CatFileContentStream']: - """ As get_object_header, but returns the data as a stream + def stream_object_data( + self, ref: str + ) -> Tuple[str, str, int, "Git.CatFileContentStream"]: + """As get_object_header, but returns the data as a stream :return: (hexsha, type_string, size_as_int, stream) :note: This method is not threadsafe, you need one independent Command instance per thread to be safe !""" @@ -1273,7 +1417,7 @@ def stream_object_data(self, ref: str) -> Tuple[str, str, int, 'Git.CatFileConte cmd_stdout = cmd.stdout if cmd.stdout is not None else io.BytesIO() return (hexsha, typename, size, self.CatFileContentStream(size, cmd_stdout)) - def clear_cache(self) -> 'Git': + def clear_cache(self) -> "Git": """Clear all kinds of internal caches to release resources. Currently persistent commands will be interrupted. diff --git a/git/compat.py b/git/compat.py index 988c04eff..e7ef28c30 100644 --- a/git/compat.py +++ b/git/compat.py @@ -12,8 +12,8 @@ import sys from gitdb.utils.encoding import ( - force_bytes, # @UnusedImport - force_text # @UnusedImport + force_bytes, # @UnusedImport + force_text, # @UnusedImport ) # typing -------------------------------------------------------------------- @@ -29,21 +29,24 @@ Union, overload, ) + # --------------------------------------------------------------------------- -is_win: bool = (os.name == 'nt') -is_posix = (os.name == 'posix') -is_darwin = (os.name == 'darwin') +is_win: bool = os.name == "nt" +is_posix = os.name == "posix" +is_darwin = os.name == "darwin" defenc = sys.getfilesystemencoding() @overload -def safe_decode(s: None) -> None: ... +def safe_decode(s: None) -> None: + ... @overload -def safe_decode(s: AnyStr) -> str: ... +def safe_decode(s: AnyStr) -> str: + ... def safe_decode(s: Union[AnyStr, None]) -> Optional[str]: @@ -51,19 +54,21 @@ def safe_decode(s: Union[AnyStr, None]) -> Optional[str]: if isinstance(s, str): return s elif isinstance(s, bytes): - return s.decode(defenc, 'surrogateescape') + return s.decode(defenc, "surrogateescape") elif s is None: return None else: - raise TypeError('Expected bytes or text, but got %r' % (s,)) + raise TypeError("Expected bytes or text, but got %r" % (s,)) @overload -def safe_encode(s: None) -> None: ... +def safe_encode(s: None) -> None: + ... @overload -def safe_encode(s: AnyStr) -> bytes: ... +def safe_encode(s: AnyStr) -> bytes: + ... def safe_encode(s: Optional[AnyStr]) -> Optional[bytes]: @@ -75,15 +80,17 @@ def safe_encode(s: Optional[AnyStr]) -> Optional[bytes]: elif s is None: return None else: - raise TypeError('Expected bytes or text, but got %r' % (s,)) + raise TypeError("Expected bytes or text, but got %r" % (s,)) @overload -def win_encode(s: None) -> None: ... +def win_encode(s: None) -> None: + ... @overload -def win_encode(s: AnyStr) -> bytes: ... +def win_encode(s: AnyStr) -> bytes: + ... def win_encode(s: Optional[AnyStr]) -> Optional[bytes]: @@ -93,5 +100,5 @@ def win_encode(s: Optional[AnyStr]) -> Optional[bytes]: elif isinstance(s, bytes): return s elif s is not None: - raise TypeError('Expected bytes or text, but got %r' % (s,)) + raise TypeError("Expected bytes or text, but got %r" % (s,)) return None diff --git a/git/config.py b/git/config.py index 1ac3c9cec..24c2b2013 100644 --- a/git/config.py +++ b/git/config.py @@ -30,8 +30,20 @@ # typing------------------------------------------------------- -from typing import (Any, Callable, Generic, IO, List, Dict, Sequence, - TYPE_CHECKING, Tuple, TypeVar, Union, cast) +from typing import ( + Any, + Callable, + Generic, + IO, + List, + Dict, + Sequence, + TYPE_CHECKING, + Tuple, + TypeVar, + Union, + cast, +) from git.types import Lit_config_levels, ConfigLevels_Tup, PathLike, assert_never, _T @@ -39,23 +51,25 @@ from git.repo.base import Repo from io import BytesIO -T_ConfigParser = TypeVar('T_ConfigParser', bound='GitConfigParser') -T_OMD_value = TypeVar('T_OMD_value', str, bytes, int, float, bool) +T_ConfigParser = TypeVar("T_ConfigParser", bound="GitConfigParser") +T_OMD_value = TypeVar("T_OMD_value", str, bytes, int, float, bool) if sys.version_info[:3] < (3, 7, 2): # typing.Ordereddict not added until py 3.7.2 from collections import OrderedDict + OrderedDict_OMD = OrderedDict else: from typing import OrderedDict + OrderedDict_OMD = OrderedDict[str, List[T_OMD_value]] # type: ignore[assignment, misc] # ------------------------------------------------------------- -__all__ = ('GitConfigParser', 'SectionConstraint') +__all__ = ("GitConfigParser", "SectionConstraint") -log = logging.getLogger('git.config') +log = logging.getLogger("git.config") log.addHandler(logging.NullHandler()) # invariants @@ -67,26 +81,37 @@ # Section pattern to detect conditional includes. # https://git-scm.com/docs/git-config#_conditional_includes -CONDITIONAL_INCLUDE_REGEXP = re.compile(r"(?<=includeIf )\"(gitdir|gitdir/i|onbranch):(.+)\"") +CONDITIONAL_INCLUDE_REGEXP = re.compile( + r"(?<=includeIf )\"(gitdir|gitdir/i|onbranch):(.+)\"" +) class MetaParserBuilder(abc.ABCMeta): """Utility class wrapping base-class methods into decorators that assure read-only properties""" - def __new__(cls, name: str, bases: Tuple, clsdict: Dict[str, Any]) -> 'MetaParserBuilder': + + def __new__( + cls, name: str, bases: Tuple, clsdict: Dict[str, Any] + ) -> "MetaParserBuilder": """ Equip all base-class methods with a needs_values decorator, and all non-const methods with a set_dirty_and_flush_changes decorator in addition to that.""" - kmm = '_mutating_methods_' + kmm = "_mutating_methods_" if kmm in clsdict: mutating_methods = clsdict[kmm] for base in bases: - methods = (t for t in inspect.getmembers(base, inspect.isroutine) if not t[0].startswith("_")) + methods = ( + t + for t in inspect.getmembers(base, inspect.isroutine) + if not t[0].startswith("_") + ) for name, method in methods: if name in clsdict: continue method_with_values = needs_values(method) if name in mutating_methods: - method_with_values = set_dirty_and_flush_changes(method_with_values) + method_with_values = set_dirty_and_flush_changes( + method_with_values + ) # END mutating methods handling clsdict[name] = method_with_values @@ -102,9 +127,10 @@ def needs_values(func: Callable[..., _T]) -> Callable[..., _T]: """Returns method assuring we read values (on demand) before we try to access them""" @wraps(func) - def assure_data_present(self: 'GitConfigParser', *args: Any, **kwargs: Any) -> _T: + def assure_data_present(self: "GitConfigParser", *args: Any, **kwargs: Any) -> _T: self.read() return func(self, *args, **kwargs) + # END wrapper method return assure_data_present @@ -114,11 +140,12 @@ def set_dirty_and_flush_changes(non_const_func: Callable[..., _T]) -> Callable[. If so, the instance will be set dirty. Additionally, we flush the changes right to disk""" - def flush_changes(self: 'GitConfigParser', *args: Any, **kwargs: Any) -> _T: + def flush_changes(self: "GitConfigParser", *args: Any, **kwargs: Any) -> _T: rval = non_const_func(self, *args, **kwargs) self._dirty = True self.write() return rval + # END wrapper method flush_changes.__name__ = non_const_func.__name__ return flush_changes @@ -133,9 +160,21 @@ class SectionConstraint(Generic[T_ConfigParser]): :note: If used as a context manager, will release the wrapped ConfigParser.""" + __slots__ = ("_config", "_section_name") - _valid_attrs_ = ("get_value", "set_value", "get", "set", "getint", "getfloat", "getboolean", "has_option", - "remove_section", "remove_option", "options") + _valid_attrs_ = ( + "get_value", + "set_value", + "get", + "set", + "getint", + "getfloat", + "getboolean", + "has_option", + "remove_section", + "remove_option", + "options", + ) def __init__(self, config: T_ConfigParser, section: str) -> None: self._config = config @@ -166,11 +205,13 @@ def release(self) -> None: """Equivalent to GitConfigParser.release(), which is called on our underlying parser instance""" return self._config.release() - def __enter__(self) -> 'SectionConstraint[T_ConfigParser]': + def __enter__(self) -> "SectionConstraint[T_ConfigParser]": self._config.__enter__() return self - def __exit__(self, exception_type: str, exception_value: str, traceback: str) -> None: + def __exit__( + self, exception_type: str, exception_value: str, traceback: str + ) -> None: self._config.__exit__(exception_type, exception_value, traceback) @@ -228,16 +269,22 @@ def get_config_path(config_level: Lit_config_levels) -> str: if config_level == "system": return "/etc/gitconfig" elif config_level == "user": - config_home = os.environ.get("XDG_CONFIG_HOME") or osp.join(os.environ.get("HOME", '~'), ".config") + config_home = os.environ.get("XDG_CONFIG_HOME") or osp.join( + os.environ.get("HOME", "~"), ".config" + ) return osp.normpath(osp.expanduser(osp.join(config_home, "git", "config"))) elif config_level == "global": return osp.normpath(osp.expanduser("~/.gitconfig")) elif config_level == "repository": - raise ValueError("No repo to get repository configuration from. Use Repo._get_config_path") + raise ValueError( + "No repo to get repository configuration from. Use Repo._get_config_path" + ) else: # Should not reach here. Will raise ValueError if does. Static typing will warn missing elifs - assert_never(config_level, # type: ignore[unreachable] - ValueError(f"Invalid configuration level: {config_level!r}")) + assert_never( + config_level, # type: ignore[unreachable] + ValueError(f"Invalid configuration level: {config_level!r}"), + ) class GitConfigParser(cp.RawConfigParser, metaclass=MetaParserBuilder): @@ -258,30 +305,36 @@ class GitConfigParser(cp.RawConfigParser, metaclass=MetaParserBuilder): must match perfectly. If used as a context manager, will release the locked file.""" - #{ Configuration + # { Configuration # The lock type determines the type of lock to use in new configuration readers. # They must be compatible to the LockFile interface. # A suitable alternative would be the BlockingLockFile t_lock = LockFile - re_comment = re.compile(r'^\s*[#;]') + re_comment = re.compile(r"^\s*[#;]") - #} END configuration + # } END configuration - optvalueonly_source = r'\s*(?P