diff --git a/.flake8 b/.flake8 index c55fe35d4..08001ffac 100644 --- a/.flake8 +++ b/.flake8 @@ -25,7 +25,6 @@ ignore = E265,E266,E731,E704, A, D, RST, RST3 -max-line-length = 120 exclude = .tox,.venv,build,dist,doc,git/ext/,test @@ -33,3 +32,7 @@ rst-roles = # for flake8-RST-docstrings attr,class,func,meth,mod,obj,ref,term,var # used by sphinx min-python-version = 3.7.0 + +# for `black` compatibility +max-line-length = 120 +extend-ignore = E203,W503 diff --git a/doc/source/conf.py b/doc/source/conf.py index 286058fdc..54f1f4723 100644 --- a/doc/source/conf.py +++ b/doc/source/conf.py @@ -20,38 +20,38 @@ # If your extensions are in another directory, add it here. If the directory # is relative to the documentation root, use os.path.abspath to make it # absolute, like shown here. -#sys.path.append(os.path.abspath('.')) -sys.path.insert(0, os.path.abspath('../..')) +# sys.path.append(os.path.abspath('.')) +sys.path.insert(0, os.path.abspath("../..")) # General configuration # --------------------- # Add any Sphinx extension module names here, as strings. They can be extensions # coming with Sphinx (named 'sphinx.ext.*') or your custom ones. -extensions = ['sphinx.ext.autodoc', 'sphinx.ext.doctest'] +extensions = ["sphinx.ext.autodoc", "sphinx.ext.doctest"] # Add any paths that contain templates here, relative to this directory. templates_path = [] # The suffix of source filenames. -source_suffix = '.rst' +source_suffix = ".rst" # The encoding of source files. -#source_encoding = 'utf-8' +# source_encoding = 'utf-8' # The master toctree document. -master_doc = 'index' +master_doc = "index" # General information about the project. -project = 'GitPython' -copyright = 'Copyright (C) 2008, 2009 Michael Trier and contributors, 2010-2015 Sebastian Thiel' +project = "GitPython" +copyright = "Copyright (C) 2008, 2009 Michael Trier and contributors, 2010-2015 Sebastian Thiel" # The version info for the project you're documenting, acts as replacement for # |version| and |release|, also used in various other places throughout the # built documents. # # The short X.Y version. -with open(os.path.join(os.path.dirname(__file__), "..", "..", 'VERSION')) as fd: +with open(os.path.join(os.path.dirname(__file__), "..", "..", "VERSION")) as fd: VERSION = fd.readline().strip() version = VERSION # The full version, including alpha/beta/rc tags. @@ -59,61 +59,60 @@ # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. -#language = None +# language = None # There are two options for replacing |today|: either, you set today to some # non-false value, then it is used: -#today = '' +# today = '' # Else, today_fmt is used as the format for a strftime call. -#today_fmt = '%B %d, %Y' +# today_fmt = '%B %d, %Y' # List of documents that shouldn't be included in the build. -#unused_docs = [] +# unused_docs = [] # List of directories, relative to source directory, that shouldn't be searched # for source files. -exclude_trees = ['build'] +exclude_trees = ["build"] # The reST default role (used for this markup: `text`) to use for all documents. -#default_role = None +# default_role = None # If true, '()' will be appended to :func: etc. cross-reference text. -#add_function_parentheses = True +# add_function_parentheses = True # If true, the current module name will be prepended to all description # unit titles (such as .. function::). -#add_module_names = True +# add_module_names = True # If true, sectionauthor and moduleauthor directives will be shown in the # output. They are ignored by default. -#show_authors = False +# show_authors = False # The name of the Pygments (syntax highlighting) style to use. -pygments_style = 'sphinx' +pygments_style = "sphinx" # Options for HTML output # ----------------------- -html_theme = 'sphinx_rtd_theme' -html_theme_options = { -} +html_theme = "sphinx_rtd_theme" +html_theme_options = {} # The name for this set of Sphinx documents. If None, it defaults to # " v documentation". -#html_title = None +# html_title = None # A shorter title for the navigation bar. Default is the same as html_title. -#html_short_title = None +# html_short_title = None # The name of an image file (relative to this directory) to place at the top # of the sidebar. -#html_logo = None +# html_logo = None # The name of an image file (within the static path) to use as favicon of the # docs. This file should be a Windows icon file (.ico) being 16x16 or 32x32 # pixels large. -#html_favicon = None +# html_favicon = None # Add any paths that contain custom static files (such as style sheets) here, # relative to this directory. They are copied after the builtin static files, @@ -122,72 +121,71 @@ # If not '', a 'Last updated on:' timestamp is inserted at every page bottom, # using the given strftime format. -#html_last_updated_fmt = '%b %d, %Y' +# html_last_updated_fmt = '%b %d, %Y' # If true, SmartyPants will be used to convert quotes and dashes to # typographically correct entities. -#html_use_smartypants = True +# html_use_smartypants = True # Custom sidebar templates, maps document names to template names. -#html_sidebars = {} +# html_sidebars = {} # Additional templates that should be rendered to pages, maps page names to # template names. -#html_additional_pages = {} +# html_additional_pages = {} # If false, no module index is generated. -#html_use_modindex = True +# html_use_modindex = True # If false, no index is generated. -#html_use_index = True +# html_use_index = True # If true, the index is split into individual pages for each letter. -#html_split_index = False +# html_split_index = False # If true, the reST sources are included in the HTML build as _sources/. -#html_copy_source = True +# html_copy_source = True # If true, an OpenSearch description file will be output, and all pages will # contain a tag referring to it. The value of this option must be the # base URL from which the finished HTML is served. -#html_use_opensearch = '' +# html_use_opensearch = '' # If nonempty, this is the file name suffix for HTML files (e.g. ".xhtml"). -#html_file_suffix = '' +# html_file_suffix = '' # Output file base name for HTML help builder. -htmlhelp_basename = 'gitpythondoc' +htmlhelp_basename = "gitpythondoc" # Options for LaTeX output # ------------------------ # The paper size ('letter' or 'a4'). -#latex_paper_size = 'letter' +# latex_paper_size = 'letter' # The font size ('10pt', '11pt' or '12pt'). -#latex_font_size = '10pt' +# latex_font_size = '10pt' # Grouping the document tree into LaTeX files. List of tuples # (source start file, target name, title, author, document class [howto/manual]). latex_documents = [ - ('index', 'GitPython.tex', r'GitPython Documentation', - r'Michael Trier', 'manual'), + ("index", "GitPython.tex", r"GitPython Documentation", r"Michael Trier", "manual"), ] # The name of an image file (relative to this directory) to place at the top of # the title page. -#latex_logo = None +# latex_logo = None # For "manual" documents, if this is true, then toplevel headings are parts, # not chapters. -#latex_use_parts = False +# latex_use_parts = False # Additional stuff for the LaTeX preamble. -#latex_preamble = '' +# latex_preamble = '' # Documents to append as an appendix to all manuals. -#latex_appendices = [] +# latex_appendices = [] # If false, no module index is generated. -#latex_use_modindex = True +# latex_use_modindex = True diff --git a/git/__init__.py b/git/__init__.py index ae9254a26..f746e1fca 100644 --- a/git/__init__.py +++ b/git/__init__.py @@ -4,8 +4,8 @@ # This module is part of GitPython and is released under # the BSD License: http://www.opensource.org/licenses/bsd-license.php # flake8: noqa -#@PydevCodeAnalysisIgnore -from git.exc import * # @NoMove @IgnorePep8 +# @PydevCodeAnalysisIgnore +from git.exc import * # @NoMove @IgnorePep8 import inspect import os import sys @@ -14,14 +14,14 @@ from typing import Optional from git.types import PathLike -__version__ = 'git' +__version__ = "git" -#{ Initialization +# { Initialization def _init_externals() -> None: """Initialize external projects by putting them into the path""" - if __version__ == 'git' and 'PYOXIDIZER' not in os.environ: - sys.path.insert(1, osp.join(osp.dirname(__file__), 'ext', 'gitdb')) + if __version__ == "git" and "PYOXIDIZER" not in os.environ: + sys.path.insert(1, osp.join(osp.dirname(__file__), "ext", "gitdb")) try: import gitdb @@ -29,26 +29,27 @@ def _init_externals() -> None: raise ImportError("'gitdb' could not be found in your PYTHONPATH") from e # END verify import -#} END initialization + +# } END initialization ################# _init_externals() ################# -#{ Imports +# { Imports try: from git.config import GitConfigParser # @NoMove @IgnorePep8 - from git.objects import * # @NoMove @IgnorePep8 - from git.refs import * # @NoMove @IgnorePep8 - from git.diff import * # @NoMove @IgnorePep8 - from git.db import * # @NoMove @IgnorePep8 - from git.cmd import Git # @NoMove @IgnorePep8 - from git.repo import Repo # @NoMove @IgnorePep8 - from git.remote import * # @NoMove @IgnorePep8 - from git.index import * # @NoMove @IgnorePep8 - from git.util import ( # @NoMove @IgnorePep8 + from git.objects import * # @NoMove @IgnorePep8 + from git.refs import * # @NoMove @IgnorePep8 + from git.diff import * # @NoMove @IgnorePep8 + from git.db import * # @NoMove @IgnorePep8 + from git.cmd import Git # @NoMove @IgnorePep8 + from git.repo import Repo # @NoMove @IgnorePep8 + from git.remote import * # @NoMove @IgnorePep8 + from git.index import * # @NoMove @IgnorePep8 + from git.util import ( # @NoMove @IgnorePep8 LockFile, BlockingLockFile, Stats, @@ -56,15 +57,14 @@ def _init_externals() -> None: rmtree, ) except GitError as exc: - raise ImportError('%s: %s' % (exc.__class__.__name__, exc)) from exc + raise ImportError("%s: %s" % (exc.__class__.__name__, exc)) from exc -#} END imports +# } END imports -__all__ = [name for name, obj in locals().items() - if not (name.startswith('_') or inspect.ismodule(obj))] +__all__ = [name for name, obj in locals().items() if not (name.startswith("_") or inspect.ismodule(obj))] -#{ Initialize git executable path +# { Initialize git executable path GIT_OK = None @@ -79,12 +79,14 @@ def refresh(path: Optional[PathLike] = None) -> None: return GIT_OK = True -#} END initialize git executable path + + +# } END initialize git executable path ################# try: refresh() except Exception as exc: - raise ImportError('Failed to initialize: {0}'.format(exc)) from exc + raise ImportError("Failed to initialize: {0}".format(exc)) from exc ################# diff --git a/git/cmd.py b/git/cmd.py index 1ddf9e03f..0d2913678 100644 --- a/git/cmd.py +++ b/git/cmd.py @@ -9,12 +9,7 @@ import logging import os import signal -from subprocess import ( - call, - Popen, - PIPE, - DEVNULL -) +from subprocess import call, Popen, PIPE, DEVNULL import subprocess import threading from textwrap import dedent @@ -29,10 +24,7 @@ from git.exc import CommandError from git.util import is_cygwin_git, cygpath, expand_path, remove_password_if_present -from .exc import ( - GitCommandError, - GitCommandNotFound -) +from .exc import GitCommandError, GitCommandNotFound from .util import ( LazyMixin, stream_copy, @@ -40,8 +32,24 @@ # typing --------------------------------------------------------------------------- -from typing import (Any, AnyStr, BinaryIO, Callable, Dict, IO, Iterator, List, Mapping, - Sequence, TYPE_CHECKING, TextIO, Tuple, Union, cast, overload) +from typing import ( + Any, + AnyStr, + BinaryIO, + Callable, + Dict, + IO, + Iterator, + List, + Mapping, + Sequence, + TYPE_CHECKING, + TextIO, + Tuple, + Union, + cast, + overload, +) from git.types import PathLike, Literal, TBD @@ -52,15 +60,26 @@ # --------------------------------------------------------------------------------- -execute_kwargs = {'istream', 'with_extended_output', - 'with_exceptions', 'as_process', 'stdout_as_string', - 'output_stream', 'with_stdout', 'kill_after_timeout', - 'universal_newlines', 'shell', 'env', 'max_chunk_size', 'strip_newline_in_stdout'} +execute_kwargs = { + "istream", + "with_extended_output", + "with_exceptions", + "as_process", + "stdout_as_string", + "output_stream", + "with_stdout", + "kill_after_timeout", + "universal_newlines", + "shell", + "env", + "max_chunk_size", + "strip_newline_in_stdout", +} log = logging.getLogger(__name__) log.addHandler(logging.NullHandler()) -__all__ = ('Git',) +__all__ = ("Git",) # ============================================================================== @@ -69,18 +88,20 @@ # Documentation ## @{ -def handle_process_output(process: 'Git.AutoInterrupt' | Popen, - stdout_handler: Union[None, - Callable[[AnyStr], None], - Callable[[List[AnyStr]], None], - Callable[[bytes, 'Repo', 'DiffIndex'], None]], - stderr_handler: Union[None, - Callable[[AnyStr], None], - Callable[[List[AnyStr]], None]], - finalizer: Union[None, - Callable[[Union[subprocess.Popen, 'Git.AutoInterrupt']], None]] = None, - decode_streams: bool = True, - kill_after_timeout: Union[None, float] = None) -> None: + +def handle_process_output( + process: "Git.AutoInterrupt" | Popen, + stdout_handler: Union[ + None, + Callable[[AnyStr], None], + Callable[[List[AnyStr]], None], + Callable[[bytes, "Repo", "DiffIndex"], None], + ], + stderr_handler: Union[None, Callable[[AnyStr], None], Callable[[List[AnyStr]], None]], + finalizer: Union[None, Callable[[Union[subprocess.Popen, "Git.AutoInterrupt"]], None]] = None, + decode_streams: bool = True, + kill_after_timeout: Union[None, float] = None, +) -> None: """Registers for notifications to learn that process output is ready to read, and dispatches lines to the respective line handlers. This function returns once the finalizer returns @@ -101,8 +122,13 @@ def handle_process_output(process: 'Git.AutoInterrupt' | Popen, should be killed. """ # Use 2 "pump" threads and wait for both to finish. - def pump_stream(cmdline: List[str], name: str, stream: Union[BinaryIO, TextIO], is_decode: bool, - handler: Union[None, Callable[[Union[bytes, str]], None]]) -> None: + def pump_stream( + cmdline: List[str], + name: str, + stream: Union[BinaryIO, TextIO], + is_decode: bool, + handler: Union[None, Callable[[Union[bytes, str]], None]], + ) -> None: try: for line in stream: if handler: @@ -117,18 +143,18 @@ def pump_stream(cmdline: List[str], name: str, stream: Union[BinaryIO, TextIO], log.error(f"Pumping {name!r} of cmd({remove_password_if_present(cmdline)}) failed due to: {ex!r}") if "I/O operation on closed file" not in str(ex): # Only reraise if the error was not due to the stream closing - raise CommandError([f'<{name}-pump>'] + remove_password_if_present(cmdline), ex) from ex + raise CommandError([f"<{name}-pump>"] + remove_password_if_present(cmdline), ex) from ex finally: stream.close() - if hasattr(process, 'proc'): - process = cast('Git.AutoInterrupt', process) - cmdline: str | Tuple[str, ...] | List[str] = getattr(process.proc, 'args', '') + if hasattr(process, "proc"): + process = cast("Git.AutoInterrupt", process) + cmdline: str | Tuple[str, ...] | List[str] = getattr(process.proc, "args", "") p_stdout = process.proc.stdout if process.proc else None p_stderr = process.proc.stderr if process.proc else None else: process = cast(Popen, process) - cmdline = getattr(process, 'args', '') + cmdline = getattr(process, "args", "") p_stdout = process.stdout p_stderr = process.stderr @@ -137,15 +163,14 @@ def pump_stream(cmdline: List[str], name: str, stream: Union[BinaryIO, TextIO], pumps: List[Tuple[str, IO, Callable[..., None] | None]] = [] if p_stdout: - pumps.append(('stdout', p_stdout, stdout_handler)) + pumps.append(("stdout", p_stdout, stdout_handler)) if p_stderr: - pumps.append(('stderr', p_stderr, stderr_handler)) + pumps.append(("stderr", p_stderr, stderr_handler)) threads: List[threading.Thread] = [] for name, stream, handler in pumps: - t = threading.Thread(target=pump_stream, - args=(cmdline, name, stream, decode_streams, handler)) + t = threading.Thread(target=pump_stream, args=(cmdline, name, stream, decode_streams, handler)) t.daemon = True t.start() threads.append(t) @@ -158,12 +183,14 @@ def pump_stream(cmdline: List[str], name: str, stream: Union[BinaryIO, TextIO], if isinstance(process, Git.AutoInterrupt): process._terminate() else: # Don't want to deal with the other case - raise RuntimeError("Thread join() timed out in cmd.handle_process_output()." - f" kill_after_timeout={kill_after_timeout} seconds") + raise RuntimeError( + "Thread join() timed out in cmd.handle_process_output()." + f" kill_after_timeout={kill_after_timeout} seconds" + ) if stderr_handler: error_str: Union[str, bytes] = ( - "error: process killed because it timed out." - f" kill_after_timeout={kill_after_timeout} seconds") + "error: process killed because it timed out." f" kill_after_timeout={kill_after_timeout} seconds" + ) if not decode_streams and isinstance(p_stderr, BinaryIO): # Assume stderr_handler needs binary input error_str = cast(str, error_str) @@ -179,7 +206,7 @@ def pump_stream(cmdline: List[str], name: str, stream: Union[BinaryIO, TextIO], def dashify(string: str) -> str: - return string.replace('_', '-') + return string.replace("_", "-") def slots_to_dict(self: object, exclude: Sequence[str] = ()) -> Dict[str, Any]: @@ -192,6 +219,7 @@ def dict_to_slots_and__excluded_are_none(self: object, d: Mapping[str, Any], exc for k in excluded: setattr(self, k, None) + ## -- End Utilities -- @} @@ -200,8 +228,9 @@ def dict_to_slots_and__excluded_are_none(self: object, d: Mapping[str, Any], exc ## CREATE_NEW_PROCESS_GROUP is needed to allow killing it afterwards, # see https://docs.python.org/3/library/subprocess.html#subprocess.Popen.send_signal -PROC_CREATIONFLAGS = (CREATE_NO_WINDOW | subprocess.CREATE_NEW_PROCESS_GROUP # type: ignore[attr-defined] - if is_win else 0) # mypy error if not windows +PROC_CREATIONFLAGS = ( + CREATE_NO_WINDOW | subprocess.CREATE_NEW_PROCESS_GROUP if is_win else 0 # type: ignore[attr-defined] +) # mypy error if not windows class Git(LazyMixin): @@ -220,10 +249,18 @@ class Git(LazyMixin): of the command to stdout. Set its value to 'full' to see details about the returned values. """ - __slots__ = ("_working_dir", "cat_file_all", "cat_file_header", "_version_info", - "_git_options", "_persistent_git_options", "_environment") - _excluded_ = ('cat_file_all', 'cat_file_header', '_version_info') + __slots__ = ( + "_working_dir", + "cat_file_all", + "cat_file_header", + "_version_info", + "_git_options", + "_persistent_git_options", + "_environment", + ) + + _excluded_ = ("cat_file_all", "cat_file_header", "_version_info") def __getstate__(self) -> Dict[str, Any]: return slots_to_dict(self, exclude=self._excluded_) @@ -233,7 +270,7 @@ def __setstate__(self, d: Dict[str, Any]) -> None: # CONFIGURATION - git_exec_name = "git" # default that should work on linux and windows + git_exec_name = "git" # default that should work on linux and windows # Enables debugging of GitPython's git commands GIT_PYTHON_TRACE = os.environ.get("GIT_PYTHON_TRACE", False) @@ -282,13 +319,18 @@ def refresh(cls, path: Union[None, PathLike] = None) -> bool: # warn or raise exception if test failed if not has_git: - err = dedent("""\ + err = ( + dedent( + """\ Bad git executable. The git executable must be specified in one of the following ways: - be included in your $PATH - be set via $%s - explicitly set via git.refresh() - """) % cls._git_exec_env_var + """ + ) + % cls._git_exec_env_var + ) # revert to whatever the old_git was cls.GIT_PYTHON_GIT_EXECUTABLE = old_git @@ -314,7 +356,9 @@ def refresh(cls, path: Union[None, PathLike] = None) -> bool: if mode in quiet: pass elif mode in warn or mode in error: - err = dedent("""\ + err = ( + dedent( + """\ %s All git commands will error until this is rectified. @@ -326,32 +370,42 @@ def refresh(cls, path: Union[None, PathLike] = None) -> bool: Example: export %s=%s - """) % ( - err, - cls._refresh_env_var, - "|".join(quiet), - "|".join(warn), - "|".join(error), - cls._refresh_env_var, - quiet[0]) + """ + ) + % ( + err, + cls._refresh_env_var, + "|".join(quiet), + "|".join(warn), + "|".join(error), + cls._refresh_env_var, + quiet[0], + ) + ) if mode in warn: print("WARNING: %s" % err) else: raise ImportError(err) else: - err = dedent("""\ + err = ( + dedent( + """\ %s environment variable has been set but it has been set with an invalid value. Use only the following values: - %s: for no warning or exception - %s: for a printed warning - %s: for a raised exception - """) % ( - cls._refresh_env_var, - "|".join(quiet), - "|".join(warn), - "|".join(error)) + """ + ) + % ( + cls._refresh_env_var, + "|".join(quiet), + "|".join(warn), + "|".join(error), + ) + ) raise ImportError(err) # we get here if this was the init refresh and the refresh mode @@ -395,7 +449,7 @@ def polish_url(cls, url: str, is_cygwin: Union[None, bool] = None) -> PathLike: Hence we undo the escaping just to be sure. """ url = os.path.expandvars(url) - if url.startswith('~'): + if url.startswith("~"): url = os.path.expanduser(url) url = url.replace("\\\\", "\\").replace("\\", "/") return url @@ -441,7 +495,7 @@ def _terminate(self) -> None: log.info("Ignored error after process had died: %r", ex) # can be that nothing really exists anymore ... - if os is None or getattr(os, 'kill', None) is None: + if os is None or getattr(os, "kill", None) is None: return None # try to kill it @@ -458,7 +512,10 @@ def _terminate(self) -> None: # we simply use the shell and redirect to nul. Its slower than CreateProcess, question # is whether we really want to see all these messages. Its annoying no matter what. if is_win: - call(("TASKKILL /F /T /PID %s 2>nul 1>nul" % str(proc.pid)), shell=True) + call( + ("TASKKILL /F /T /PID %s 2>nul 1>nul" % str(proc.pid)), + shell=True, + ) # END exception handling def __del__(self) -> None: @@ -468,15 +525,15 @@ def __getattr__(self, attr: str) -> Any: return getattr(self.proc, attr) # TODO: Bad choice to mimic `proc.wait()` but with different args. - def wait(self, stderr: Union[None, str, bytes] = b'') -> int: + def wait(self, stderr: Union[None, str, bytes] = b"") -> int: """Wait for the process and return its status code. :param stderr: Previously read value of stderr, in case stderr is already closed. :warn: may deadlock if output or error pipes are used and not handled separately. :raise GitCommandError: if the return status is not 0""" if stderr is None: - stderr_b = b'' - stderr_b = force_bytes(data=stderr, encoding='utf-8') + stderr_b = b"" + stderr_b = force_bytes(data=stderr, encoding="utf-8") status: Union[int, None] if self.proc is not None: status = self.proc.wait() @@ -490,15 +547,15 @@ def read_all_from_possibly_closed_stream(stream: Union[IO[bytes], None]) -> byte try: return stderr_b + force_bytes(stream.read()) except ValueError: - return stderr_b or b'' + return stderr_b or b"" else: - return stderr_b or b'' + return stderr_b or b"" # END status handling if status != 0: errstr = read_all_from_possibly_closed_stream(p_stderr) - log.debug('AutoInterrupt wait stderr: %r' % (errstr,)) + log.debug("AutoInterrupt wait stderr: %r" % (errstr,)) raise GitCommandError(remove_password_if_present(self.args), status, errstr) return status @@ -513,12 +570,12 @@ class CatFileContentStream(object): If not all data is read to the end of the objects's lifetime, we read the rest to assure the underlying stream continues to work""" - __slots__: Tuple[str, ...] = ('_stream', '_nbr', '_size') + __slots__: Tuple[str, ...] = ("_stream", "_nbr", "_size") def __init__(self, size: int, stream: IO[bytes]) -> None: self._stream = stream self._size = size - self._nbr = 0 # num bytes read + self._nbr = 0 # num bytes read # special case: if the object is empty, has null bytes, get the # final newline right away. @@ -529,7 +586,7 @@ def __init__(self, size: int, stream: IO[bytes]) -> None: def read(self, size: int = -1) -> bytes: bytes_left = self._size - self._nbr if bytes_left == 0: - return b'' + return b"" if size > -1: # assure we don't try to read past our limit size = min(bytes_left, size) @@ -542,13 +599,13 @@ def read(self, size: int = -1) -> bytes: # check for depletion, read our final byte to make the stream usable by others if self._size - self._nbr == 0: - self._stream.read(1) # final newline + self._stream.read(1) # final newline # END finish reading return data def readline(self, size: int = -1) -> bytes: if self._nbr == self._size: - return b'' + return b"" # clamp size to lowest allowed value bytes_left = self._size - self._nbr @@ -589,7 +646,7 @@ def readlines(self, size: int = -1) -> List[bytes]: return out # skipcq: PYL-E0301 - def __iter__(self) -> 'Git.CatFileContentStream': + def __iter__(self) -> "Git.CatFileContentStream": return self def __next__(self) -> bytes: @@ -634,7 +691,7 @@ def __getattr__(self, name: str) -> Any: """A convenience method as it allows to call the command as if it was an object. :return: Callable object that will execute call _call_process with your arguments.""" - if name[0] == '_': + if name[0] == "_": return LazyMixin.__getattr__(self, name) return lambda *args, **kwargs: self._call_process(name, *args, **kwargs) @@ -649,28 +706,28 @@ def set_persistent_git_options(self, **kwargs: Any) -> None: the subcommand. """ - self._persistent_git_options = self.transform_kwargs( - split_single_char_options=True, **kwargs) + self._persistent_git_options = self.transform_kwargs(split_single_char_options=True, **kwargs) def _set_cache_(self, attr: str) -> None: - if attr == '_version_info': + if attr == "_version_info": # We only use the first 4 numbers, as everything else could be strings in fact (on windows) - process_version = self._call_process('version') # should be as default *args and **kwargs used - version_numbers = process_version.split(' ')[2] + process_version = self._call_process("version") # should be as default *args and **kwargs used + version_numbers = process_version.split(" ")[2] - self._version_info = cast(Tuple[int, int, int, int], - tuple(int(n) for n in version_numbers.split('.')[:4] if n.isdigit()) - ) + self._version_info = cast( + Tuple[int, int, int, int], + tuple(int(n) for n in version_numbers.split(".")[:4] if n.isdigit()), + ) else: super(Git, self)._set_cache_(attr) # END handle version info - @ property + @property def working_dir(self) -> Union[None, PathLike]: """:return: Git directory we are working on""" return self._working_dir - @ property + @property def version_info(self) -> Tuple[int, int, int, int]: """ :return: tuple(int, int, int, int) tuple with integers representing the major, minor @@ -678,69 +735,70 @@ def version_info(self) -> Tuple[int, int, int, int]: This value is generated on demand and is cached""" return self._version_info - @ overload - def execute(self, - command: Union[str, Sequence[Any]], - *, - as_process: Literal[True] - ) -> 'AutoInterrupt': + @overload + def execute(self, command: Union[str, Sequence[Any]], *, as_process: Literal[True]) -> "AutoInterrupt": ... - @ overload - def execute(self, - command: Union[str, Sequence[Any]], - *, - as_process: Literal[False] = False, - stdout_as_string: Literal[True] - ) -> Union[str, Tuple[int, str, str]]: + @overload + def execute( + self, + command: Union[str, Sequence[Any]], + *, + as_process: Literal[False] = False, + stdout_as_string: Literal[True], + ) -> Union[str, Tuple[int, str, str]]: ... - @ overload - def execute(self, - command: Union[str, Sequence[Any]], - *, - as_process: Literal[False] = False, - stdout_as_string: Literal[False] = False - ) -> Union[bytes, Tuple[int, bytes, str]]: + @overload + def execute( + self, + command: Union[str, Sequence[Any]], + *, + as_process: Literal[False] = False, + stdout_as_string: Literal[False] = False, + ) -> Union[bytes, Tuple[int, bytes, str]]: ... - @ overload - def execute(self, - command: Union[str, Sequence[Any]], - *, - with_extended_output: Literal[False], - as_process: Literal[False], - stdout_as_string: Literal[True] - ) -> str: + @overload + def execute( + self, + command: Union[str, Sequence[Any]], + *, + with_extended_output: Literal[False], + as_process: Literal[False], + stdout_as_string: Literal[True], + ) -> str: ... - @ overload - def execute(self, - command: Union[str, Sequence[Any]], - *, - with_extended_output: Literal[False], - as_process: Literal[False], - stdout_as_string: Literal[False] - ) -> bytes: + @overload + def execute( + self, + command: Union[str, Sequence[Any]], + *, + with_extended_output: Literal[False], + as_process: Literal[False], + stdout_as_string: Literal[False], + ) -> bytes: ... - def execute(self, - command: Union[str, Sequence[Any]], - istream: Union[None, BinaryIO] = None, - with_extended_output: bool = False, - with_exceptions: bool = True, - as_process: bool = False, - output_stream: Union[None, BinaryIO] = None, - stdout_as_string: bool = True, - kill_after_timeout: Union[None, float] = None, - with_stdout: bool = True, - universal_newlines: bool = False, - shell: Union[None, bool] = None, - env: Union[None, Mapping[str, str]] = None, - max_chunk_size: int = io.DEFAULT_BUFFER_SIZE, - strip_newline_in_stdout: bool = True, - **subprocess_kwargs: Any - ) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], AutoInterrupt]: + def execute( + self, + command: Union[str, Sequence[Any]], + istream: Union[None, BinaryIO] = None, + with_extended_output: bool = False, + with_exceptions: bool = True, + as_process: bool = False, + output_stream: Union[None, BinaryIO] = None, + stdout_as_string: bool = True, + kill_after_timeout: Union[None, float] = None, + with_stdout: bool = True, + universal_newlines: bool = False, + shell: Union[None, bool] = None, + env: Union[None, Mapping[str, str]] = None, + max_chunk_size: int = io.DEFAULT_BUFFER_SIZE, + strip_newline_in_stdout: bool = True, + **subprocess_kwargs: Any, + ) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], AutoInterrupt]: """Handles executing the command on the shell and consumes and returns the returned information (stdout) @@ -831,8 +889,8 @@ def execute(self, you must update the execute_kwargs tuple housed in this module.""" # Remove password for the command if present redacted_command = remove_password_if_present(command) - if self.GIT_PYTHON_TRACE and (self.GIT_PYTHON_TRACE != 'full' or as_process): - log.info(' '.join(redacted_command)) + if self.GIT_PYTHON_TRACE and (self.GIT_PYTHON_TRACE != "full" or as_process): + log.info(" ".join(redacted_command)) # Allow the user to have the command executed in their working dir. try: @@ -858,33 +916,41 @@ def execute(self, if is_win: cmd_not_found_exception = OSError if kill_after_timeout is not None: - raise GitCommandError(redacted_command, '"kill_after_timeout" feature is not supported on Windows.') + raise GitCommandError( + redacted_command, + '"kill_after_timeout" feature is not supported on Windows.', + ) else: cmd_not_found_exception = FileNotFoundError # NOQA # exists, flake8 unknown @UndefinedVariable # end handle - stdout_sink = (PIPE - if with_stdout - else getattr(subprocess, 'DEVNULL', None) or open(os.devnull, 'wb')) + stdout_sink = PIPE if with_stdout else getattr(subprocess, "DEVNULL", None) or open(os.devnull, "wb") istream_ok = "None" if istream: istream_ok = "" - log.debug("Popen(%s, cwd=%s, universal_newlines=%s, shell=%s, istream=%s)", - redacted_command, cwd, universal_newlines, shell, istream_ok) + log.debug( + "Popen(%s, cwd=%s, universal_newlines=%s, shell=%s, istream=%s)", + redacted_command, + cwd, + universal_newlines, + shell, + istream_ok, + ) try: - proc = Popen(command, - env=env, - cwd=cwd, - bufsize=-1, - stdin=istream or DEVNULL, - stderr=PIPE, - stdout=stdout_sink, - shell=shell is not None and shell or self.USE_SHELL, - close_fds=is_posix, # unsupported on windows - universal_newlines=universal_newlines, - creationflags=PROC_CREATIONFLAGS, - **subprocess_kwargs - ) + proc = Popen( + command, + env=env, + cwd=cwd, + bufsize=-1, + stdin=istream or DEVNULL, + stderr=PIPE, + stdout=stdout_sink, + shell=shell is not None and shell or self.USE_SHELL, + close_fds=is_posix, # unsupported on windows + universal_newlines=universal_newlines, + creationflags=PROC_CREATIONFLAGS, + **subprocess_kwargs, + ) except cmd_not_found_exception as err: raise GitCommandNotFound(redacted_command, err) from err @@ -897,9 +963,12 @@ def execute(self, return self.AutoInterrupt(proc, command) def _kill_process(pid: int) -> None: - """ Callback method to kill a process. """ - p = Popen(['ps', '--ppid', str(pid)], stdout=PIPE, - creationflags=PROC_CREATIONFLAGS) + """Callback method to kill a process.""" + p = Popen( + ["ps", "--ppid", str(pid)], + stdout=PIPE, + creationflags=PROC_CREATIONFLAGS, + ) child_pids = [] if p.stdout is not None: for line in p.stdout: @@ -909,19 +978,20 @@ def _kill_process(pid: int) -> None: child_pids.append(int(local_pid)) try: # Windows does not have SIGKILL, so use SIGTERM instead - sig = getattr(signal, 'SIGKILL', signal.SIGTERM) + sig = getattr(signal, "SIGKILL", signal.SIGTERM) os.kill(pid, sig) for child_pid in child_pids: try: os.kill(child_pid, sig) except OSError: pass - kill_check.set() # tell the main routine that the process was killed + kill_check.set() # tell the main routine that the process was killed except OSError: # It is possible that the process gets completed in the duration after timeout # happens and before we try to kill the process. pass return + # end if kill_after_timeout is not None: @@ -930,8 +1000,8 @@ def _kill_process(pid: int) -> None: # Wait for the process to return status = 0 - stdout_value: Union[str, bytes] = b'' - stderr_value: Union[str, bytes] = b'' + stdout_value: Union[str, bytes] = b"" + stderr_value: Union[str, bytes] = b"" newline = "\n" if universal_newlines else b"\n" try: if output_stream is None: @@ -941,8 +1011,10 @@ def _kill_process(pid: int) -> None: if kill_after_timeout is not None: watchdog.cancel() if kill_check.is_set(): - stderr_value = ('Timeout: the command "%s" did not complete in %d ' - 'secs.' % (" ".join(redacted_command), kill_after_timeout)) + stderr_value = 'Timeout: the command "%s" did not complete in %d ' "secs." % ( + " ".join(redacted_command), + kill_after_timeout, + ) if not universal_newlines: stderr_value = stderr_value.encode(defenc) # strip trailing "\n" @@ -958,7 +1030,7 @@ def _kill_process(pid: int) -> None: stdout_value = proc.stdout.read() stderr_value = proc.stderr.read() # strip trailing "\n" - if stderr_value.endswith(newline): # type: ignore + if stderr_value.endswith(newline): # type: ignore stderr_value = stderr_value[:-1] status = proc.wait() # END stdout handling @@ -966,16 +1038,22 @@ def _kill_process(pid: int) -> None: proc.stdout.close() proc.stderr.close() - if self.GIT_PYTHON_TRACE == 'full': + if self.GIT_PYTHON_TRACE == "full": cmdstr = " ".join(redacted_command) def as_text(stdout_value: Union[bytes, str]) -> str: - return not output_stream and safe_decode(stdout_value) or '' + return not output_stream and safe_decode(stdout_value) or "" + # end if stderr_value: - log.info("%s -> %d; stdout: '%s'; stderr: '%s'", - cmdstr, status, as_text(stdout_value), safe_decode(stderr_value)) + log.info( + "%s -> %d; stdout: '%s'; stderr: '%s'", + cmdstr, + status, + as_text(stdout_value), + safe_decode(stderr_value), + ) elif stdout_value: log.info("%s -> %d; stdout: '%s'", cmdstr, status, as_text(stdout_value)) else: @@ -1081,7 +1159,7 @@ def __unpack_args(cls, arg_list: Sequence[str]) -> List[str]: return outlist - def __call__(self, **kwargs: Any) -> 'Git': + def __call__(self, **kwargs: Any) -> "Git": """Specify command line options to the git executable for a subcommand call @@ -1093,29 +1171,33 @@ def __call__(self, **kwargs: Any) -> 'Git': ``Examples``:: git(work_tree='/tmp').difftool()""" - self._git_options = self.transform_kwargs( - split_single_char_options=True, **kwargs) + self._git_options = self.transform_kwargs(split_single_char_options=True, **kwargs) return self @overload - def _call_process(self, method: str, *args: None, **kwargs: None - ) -> str: + def _call_process(self, method: str, *args: None, **kwargs: None) -> str: ... # if no args given, execute called with all defaults @overload - def _call_process(self, method: str, - istream: int, - as_process: Literal[True], - *args: Any, **kwargs: Any - ) -> 'Git.AutoInterrupt': ... + def _call_process( + self, + method: str, + istream: int, + as_process: Literal[True], + *args: Any, + **kwargs: Any, + ) -> "Git.AutoInterrupt": + ... @overload - def _call_process(self, method: str, *args: Any, **kwargs: Any - ) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], 'Git.AutoInterrupt']: + def _call_process( + self, method: str, *args: Any, **kwargs: Any + ) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], "Git.AutoInterrupt"]: ... - def _call_process(self, method: str, *args: Any, **kwargs: Any - ) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], 'Git.AutoInterrupt']: + def _call_process( + self, method: str, *args: Any, **kwargs: Any + ) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], "Git.AutoInterrupt"]: """Run the given git command with the specified arguments and return the result as a String @@ -1145,13 +1227,13 @@ def _call_process(self, method: str, *args: Any, **kwargs: Any :return: Same as ``execute`` if no args given used execute default (esp. as_process = False, stdout_as_string = True) - and return str """ + and return str""" # Handle optional arguments prior to calling transform_kwargs # otherwise these'll end up in args, which is bad. exec_kwargs = {k: v for k, v in kwargs.items() if k in execute_kwargs} opts_kwargs = {k: v for k, v in kwargs.items() if k not in execute_kwargs} - insert_after_this_arg = opts_kwargs.pop('insert_kwargs_after', None) + insert_after_this_arg = opts_kwargs.pop("insert_kwargs_after", None) # Prepare the argument list @@ -1164,10 +1246,12 @@ def _call_process(self, method: str, *args: Any, **kwargs: Any try: index = ext_args.index(insert_after_this_arg) except ValueError as err: - raise ValueError("Couldn't find argument '%s' in args %s to insert cmd options after" - % (insert_after_this_arg, str(ext_args))) from err + raise ValueError( + "Couldn't find argument '%s' in args %s to insert cmd options after" + % (insert_after_this_arg, str(ext_args)) + ) from err # end handle error - args_list = ext_args[:index + 1] + opt_args + ext_args[index + 1:] + args_list = ext_args[: index + 1] + opt_args + ext_args[index + 1 :] # end handle opts_kwargs call = [self.GIT_PYTHON_GIT_EXECUTABLE] @@ -1211,9 +1295,9 @@ def _prepare_ref(self, ref: AnyStr) -> bytes: # required for command to separate refs on stdin, as bytes if isinstance(ref, bytes): # Assume 40 bytes hexsha - bin-to-ascii for some reason returns bytes, not text - refstr: str = ref.decode('ascii') + refstr: str = ref.decode("ascii") elif not isinstance(ref, str): - refstr = str(ref) # could be ref-object + refstr = str(ref) # could be ref-object else: refstr = ref @@ -1221,8 +1305,7 @@ def _prepare_ref(self, ref: AnyStr) -> bytes: refstr += "\n" return refstr.encode(defenc) - def _get_persistent_cmd(self, attr_name: str, cmd_name: str, *args: Any, **kwargs: Any - ) -> 'Git.AutoInterrupt': + def _get_persistent_cmd(self, attr_name: str, cmd_name: str, *args: Any, **kwargs: Any) -> "Git.AutoInterrupt": cur_val = getattr(self, attr_name) if cur_val is not None: return cur_val @@ -1232,10 +1315,10 @@ def _get_persistent_cmd(self, attr_name: str, cmd_name: str, *args: Any, **kwarg cmd = self._call_process(cmd_name, *args, **options) setattr(self, attr_name, cmd) - cmd = cast('Git.AutoInterrupt', cmd) + cmd = cast("Git.AutoInterrupt", cmd) return cmd - def __get_object_header(self, cmd: 'Git.AutoInterrupt', ref: AnyStr) -> Tuple[str, str, int]: + def __get_object_header(self, cmd: "Git.AutoInterrupt", ref: AnyStr) -> Tuple[str, str, int]: if cmd.stdin and cmd.stdout: cmd.stdin.write(self._prepare_ref(ref)) cmd.stdin.flush() @@ -1244,7 +1327,7 @@ def __get_object_header(self, cmd: 'Git.AutoInterrupt', ref: AnyStr) -> Tuple[st raise ValueError("cmd stdin was empty") def get_object_header(self, ref: str) -> Tuple[str, str, int]: - """ Use this method to quickly examine the type and size of the object behind + """Use this method to quickly examine the type and size of the object behind the given ref. :note: The method will only suffer from the costs of command invocation @@ -1255,16 +1338,16 @@ def get_object_header(self, ref: str) -> Tuple[str, str, int]: return self.__get_object_header(cmd, ref) def get_object_data(self, ref: str) -> Tuple[str, str, int, bytes]: - """ As get_object_header, but returns object data as well + """As get_object_header, but returns object data as well :return: (hexsha, type_string, size_as_int,data_string) :note: not threadsafe""" hexsha, typename, size, stream = self.stream_object_data(ref) data = stream.read(size) - del(stream) + del stream return (hexsha, typename, size, data) - def stream_object_data(self, ref: str) -> Tuple[str, str, int, 'Git.CatFileContentStream']: - """ As get_object_header, but returns the data as a stream + def stream_object_data(self, ref: str) -> Tuple[str, str, int, "Git.CatFileContentStream"]: + """As get_object_header, but returns the data as a stream :return: (hexsha, type_string, size_as_int, stream) :note: This method is not threadsafe, you need one independent Command instance per thread to be safe !""" @@ -1273,7 +1356,7 @@ def stream_object_data(self, ref: str) -> Tuple[str, str, int, 'Git.CatFileConte cmd_stdout = cmd.stdout if cmd.stdout is not None else io.BytesIO() return (hexsha, typename, size, self.CatFileContentStream(size, cmd_stdout)) - def clear_cache(self) -> 'Git': + def clear_cache(self) -> "Git": """Clear all kinds of internal caches to release resources. Currently persistent commands will be interrupted. diff --git a/git/compat.py b/git/compat.py index 988c04eff..e7ef28c30 100644 --- a/git/compat.py +++ b/git/compat.py @@ -12,8 +12,8 @@ import sys from gitdb.utils.encoding import ( - force_bytes, # @UnusedImport - force_text # @UnusedImport + force_bytes, # @UnusedImport + force_text, # @UnusedImport ) # typing -------------------------------------------------------------------- @@ -29,21 +29,24 @@ Union, overload, ) + # --------------------------------------------------------------------------- -is_win: bool = (os.name == 'nt') -is_posix = (os.name == 'posix') -is_darwin = (os.name == 'darwin') +is_win: bool = os.name == "nt" +is_posix = os.name == "posix" +is_darwin = os.name == "darwin" defenc = sys.getfilesystemencoding() @overload -def safe_decode(s: None) -> None: ... +def safe_decode(s: None) -> None: + ... @overload -def safe_decode(s: AnyStr) -> str: ... +def safe_decode(s: AnyStr) -> str: + ... def safe_decode(s: Union[AnyStr, None]) -> Optional[str]: @@ -51,19 +54,21 @@ def safe_decode(s: Union[AnyStr, None]) -> Optional[str]: if isinstance(s, str): return s elif isinstance(s, bytes): - return s.decode(defenc, 'surrogateescape') + return s.decode(defenc, "surrogateescape") elif s is None: return None else: - raise TypeError('Expected bytes or text, but got %r' % (s,)) + raise TypeError("Expected bytes or text, but got %r" % (s,)) @overload -def safe_encode(s: None) -> None: ... +def safe_encode(s: None) -> None: + ... @overload -def safe_encode(s: AnyStr) -> bytes: ... +def safe_encode(s: AnyStr) -> bytes: + ... def safe_encode(s: Optional[AnyStr]) -> Optional[bytes]: @@ -75,15 +80,17 @@ def safe_encode(s: Optional[AnyStr]) -> Optional[bytes]: elif s is None: return None else: - raise TypeError('Expected bytes or text, but got %r' % (s,)) + raise TypeError("Expected bytes or text, but got %r" % (s,)) @overload -def win_encode(s: None) -> None: ... +def win_encode(s: None) -> None: + ... @overload -def win_encode(s: AnyStr) -> bytes: ... +def win_encode(s: AnyStr) -> bytes: + ... def win_encode(s: Optional[AnyStr]) -> Optional[bytes]: @@ -93,5 +100,5 @@ def win_encode(s: Optional[AnyStr]) -> Optional[bytes]: elif isinstance(s, bytes): return s elif s is not None: - raise TypeError('Expected bytes or text, but got %r' % (s,)) + raise TypeError("Expected bytes or text, but got %r" % (s,)) return None diff --git a/git/config.py b/git/config.py index 1ac3c9cec..5f07cb002 100644 --- a/git/config.py +++ b/git/config.py @@ -30,8 +30,20 @@ # typing------------------------------------------------------- -from typing import (Any, Callable, Generic, IO, List, Dict, Sequence, - TYPE_CHECKING, Tuple, TypeVar, Union, cast) +from typing import ( + Any, + Callable, + Generic, + IO, + List, + Dict, + Sequence, + TYPE_CHECKING, + Tuple, + TypeVar, + Union, + cast, +) from git.types import Lit_config_levels, ConfigLevels_Tup, PathLike, assert_never, _T @@ -39,23 +51,25 @@ from git.repo.base import Repo from io import BytesIO -T_ConfigParser = TypeVar('T_ConfigParser', bound='GitConfigParser') -T_OMD_value = TypeVar('T_OMD_value', str, bytes, int, float, bool) +T_ConfigParser = TypeVar("T_ConfigParser", bound="GitConfigParser") +T_OMD_value = TypeVar("T_OMD_value", str, bytes, int, float, bool) if sys.version_info[:3] < (3, 7, 2): # typing.Ordereddict not added until py 3.7.2 from collections import OrderedDict + OrderedDict_OMD = OrderedDict else: from typing import OrderedDict + OrderedDict_OMD = OrderedDict[str, List[T_OMD_value]] # type: ignore[assignment, misc] # ------------------------------------------------------------- -__all__ = ('GitConfigParser', 'SectionConstraint') +__all__ = ("GitConfigParser", "SectionConstraint") -log = logging.getLogger('git.config') +log = logging.getLogger("git.config") log.addHandler(logging.NullHandler()) # invariants @@ -72,11 +86,12 @@ class MetaParserBuilder(abc.ABCMeta): """Utility class wrapping base-class methods into decorators that assure read-only properties""" - def __new__(cls, name: str, bases: Tuple, clsdict: Dict[str, Any]) -> 'MetaParserBuilder': + + def __new__(cls, name: str, bases: Tuple, clsdict: Dict[str, Any]) -> "MetaParserBuilder": """ Equip all base-class methods with a needs_values decorator, and all non-const methods with a set_dirty_and_flush_changes decorator in addition to that.""" - kmm = '_mutating_methods_' + kmm = "_mutating_methods_" if kmm in clsdict: mutating_methods = clsdict[kmm] for base in bases: @@ -102,9 +117,10 @@ def needs_values(func: Callable[..., _T]) -> Callable[..., _T]: """Returns method assuring we read values (on demand) before we try to access them""" @wraps(func) - def assure_data_present(self: 'GitConfigParser', *args: Any, **kwargs: Any) -> _T: + def assure_data_present(self: "GitConfigParser", *args: Any, **kwargs: Any) -> _T: self.read() return func(self, *args, **kwargs) + # END wrapper method return assure_data_present @@ -114,11 +130,12 @@ def set_dirty_and_flush_changes(non_const_func: Callable[..., _T]) -> Callable[. If so, the instance will be set dirty. Additionally, we flush the changes right to disk""" - def flush_changes(self: 'GitConfigParser', *args: Any, **kwargs: Any) -> _T: + def flush_changes(self: "GitConfigParser", *args: Any, **kwargs: Any) -> _T: rval = non_const_func(self, *args, **kwargs) self._dirty = True self.write() return rval + # END wrapper method flush_changes.__name__ = non_const_func.__name__ return flush_changes @@ -133,9 +150,21 @@ class SectionConstraint(Generic[T_ConfigParser]): :note: If used as a context manager, will release the wrapped ConfigParser.""" + __slots__ = ("_config", "_section_name") - _valid_attrs_ = ("get_value", "set_value", "get", "set", "getint", "getfloat", "getboolean", "has_option", - "remove_section", "remove_option", "options") + _valid_attrs_ = ( + "get_value", + "set_value", + "get", + "set", + "getint", + "getfloat", + "getboolean", + "has_option", + "remove_section", + "remove_option", + "options", + ) def __init__(self, config: T_ConfigParser, section: str) -> None: self._config = config @@ -166,7 +195,7 @@ def release(self) -> None: """Equivalent to GitConfigParser.release(), which is called on our underlying parser instance""" return self._config.release() - def __enter__(self) -> 'SectionConstraint[T_ConfigParser]': + def __enter__(self) -> "SectionConstraint[T_ConfigParser]": self._config.__enter__() return self @@ -228,7 +257,7 @@ def get_config_path(config_level: Lit_config_levels) -> str: if config_level == "system": return "/etc/gitconfig" elif config_level == "user": - config_home = os.environ.get("XDG_CONFIG_HOME") or osp.join(os.environ.get("HOME", '~'), ".config") + config_home = os.environ.get("XDG_CONFIG_HOME") or osp.join(os.environ.get("HOME", "~"), ".config") return osp.normpath(osp.expanduser(osp.join(config_home, "git", "config"))) elif config_level == "global": return osp.normpath(osp.expanduser("~/.gitconfig")) @@ -236,8 +265,10 @@ def get_config_path(config_level: Lit_config_levels) -> str: raise ValueError("No repo to get repository configuration from. Use Repo._get_config_path") else: # Should not reach here. Will raise ValueError if does. Static typing will warn missing elifs - assert_never(config_level, # type: ignore[unreachable] - ValueError(f"Invalid configuration level: {config_level!r}")) + assert_never( + config_level, # type: ignore[unreachable] + ValueError(f"Invalid configuration level: {config_level!r}"), + ) class GitConfigParser(cp.RawConfigParser, metaclass=MetaParserBuilder): @@ -258,30 +289,34 @@ class GitConfigParser(cp.RawConfigParser, metaclass=MetaParserBuilder): must match perfectly. If used as a context manager, will release the locked file.""" - #{ Configuration + # { Configuration # The lock type determines the type of lock to use in new configuration readers. # They must be compatible to the LockFile interface. # A suitable alternative would be the BlockingLockFile t_lock = LockFile - re_comment = re.compile(r'^\s*[#;]') + re_comment = re.compile(r"^\s*[#;]") - #} END configuration + # } END configuration - optvalueonly_source = r'\s*(?P