diff --git a/conda_smithy/feedstock_io.py b/conda_smithy/feedstock_io.py index 473c894c0..dee595961 100644 --- a/conda_smithy/feedstock_io.py +++ b/conda_smithy/feedstock_io.py @@ -7,22 +7,23 @@ def get_repo(path, search_parent_directories=True): repo = None try: - import git - - repo = git.Repo( - path, search_parent_directories=search_parent_directories - ) + import pygit2 + + if search_parent_directories: + path = pygit2.discover_repository(path) + if path is not None: + repo = pygit2.Repository( + path, pygit2.enums.RepositoryOpenFlag.NO_SEARCH + ) except ImportError: pass - except git.InvalidGitRepositoryError: - pass return repo def get_repo_root(path): try: - return get_repo(path).working_tree_dir + return get_repo(path).workdir.rstrip(os.path.sep) except AttributeError: return None @@ -32,8 +33,13 @@ def set_exe_file(filename, set_exe=True): repo = get_repo(filename) if repo: - mode = "+x" if set_exe else "-x" - repo.git.execute(["git", "update-index", f"--chmod={mode}", filename]) + index_entry = repo.index[os.path.relpath(filename, repo.workdir)] + if set_exe: + index_entry.mode |= all_execute_permissions + else: + index_entry.mode &= ~all_execute_permissions + repo.index.add(index_entry) + repo.index.write() mode = os.stat(filename).st_mode if set_exe: @@ -54,7 +60,8 @@ def write_file(filename): repo = get_repo(filename) if repo: - repo.index.add([filename]) + repo.index.add(os.path.relpath(filename, repo.workdir)) + repo.index.write() def touch_file(filename): @@ -68,7 +75,8 @@ def remove_file_or_dir(filename): repo = get_repo(filename) if repo: - repo.index.remove([filename], r=True) + repo.index.remove_all([f"filename/**"]) + repo.index.write() shutil.rmtree(filename) @@ -77,7 +85,11 @@ def remove_file(filename): repo = get_repo(filename) if repo: - repo.index.remove([filename]) + try: + repo.index.remove(os.path.relpath(filename, repo.workdir)) + repo.index.write() + except IOError: # this is specifically "file not in index" + pass os.remove(filename) @@ -106,4 +118,5 @@ def copy_file(src, dst): repo = get_repo(dst) if repo: - repo.index.add([dst]) + repo.index.add(os.path.relpath(dst, repo.workdir)) + repo.index.write() diff --git a/environment.yml b/environment.yml index 2c14cd23c..322e291c3 100644 --- a/environment.yml +++ b/environment.yml @@ -22,6 +22,7 @@ dependencies: - requests - pycryptodome - gitpython + - pygit2 - pygithub >=2,<3 - ruamel.yaml - conda-forge-pinning diff --git a/tests/test_feedstock_io.py b/tests/test_feedstock_io.py index f471b6929..b8c188cac 100644 --- a/tests/test_feedstock_io.py +++ b/tests/test_feedstock_io.py @@ -8,8 +8,7 @@ import tempfile import unittest -import git -from git.index.typ import BlobFilter +import pygit2 import conda_smithy.feedstock_io as fio @@ -27,7 +26,7 @@ def parameterize(): ]: for get_repo in [ lambda tmp_dir: None, - lambda tmp_dir: git.Repo.init(tmp_dir), + lambda tmp_dir: pygit2.init_repository(tmp_dir), ]: try: tmp_dir = tempfile.mkdtemp() @@ -64,7 +63,7 @@ def test_repo(self): self.assertTrue(fio.get_repo(pathfunc(tmp_dir)) is None) else: self.assertIsInstance( - fio.get_repo(pathfunc(tmp_dir)), git.Repo + fio.get_repo(pathfunc(tmp_dir)), pygit2.Repository ) possible_repo_subdir = os.path.join( tmp_dir, @@ -85,34 +84,34 @@ def test_set_exe_file(self): for set_exe in [True, False]: for tmp_dir, repo, pathfunc in parameterize(): - filename = "test.txt" - filename = os.path.join(tmp_dir, filename) + basename = "test.txt" + filename = os.path.join(tmp_dir, basename) with open(filename, "w", encoding="utf-8", newline="\n") as fh: fh.write("") if repo is not None: - repo.index.add([filename]) + repo.index.add(basename) + repo.index.write() fio.set_exe_file(pathfunc(filename), set_exe) file_mode = os.stat(filename).st_mode self.assertEqual(file_mode & set_mode, int(set_exe) * set_mode) if repo is not None: - blob = next(repo.index.iter_blobs(BlobFilter(filename)))[1] + repo.index.read() + blob = repo.index[basename] self.assertEqual( blob.mode & set_mode, int(set_exe) * set_mode ) def test_write_file(self): for tmp_dir, repo, pathfunc in parameterize(): - for filename in ["test.txt", "dir1/dir2/test.txt"]: - filename = os.path.join(tmp_dir, filename) + for basename in ["test.txt", "dir1/dir2/test.txt"]: + filename = os.path.join(tmp_dir, basename) write_text = "text" with fio.write_file(pathfunc(filename)) as fh: fh.write(write_text) - if repo is not None: - repo.index.add([filename]) read_text = "" with open(filename, encoding="utf-8") as fh: @@ -121,15 +120,16 @@ def test_write_file(self): self.assertEqual(write_text, read_text) if repo is not None: - blob = next(repo.index.iter_blobs(BlobFilter(filename)))[1] - read_text = blob.data_stream[3].read().decode("utf-8") + repo.index.read() + blob = repo.index[basename] + read_text = repo[blob.id].data.decode("utf-8") self.assertEqual(write_text, read_text) def test_touch_file(self): for tmp_dir, repo, pathfunc in parameterize(): - for filename in ["test.txt", "dir1/dir2/test.txt"]: - filename = os.path.join(tmp_dir, filename) + for basename in ["test.txt", "dir1/dir2/test.txt"]: + filename = os.path.join(tmp_dir, basename) fio.touch_file(pathfunc(filename)) @@ -140,33 +140,33 @@ def test_touch_file(self): self.assertEqual("", read_text) if repo is not None: - blob = next(repo.index.iter_blobs(BlobFilter(filename)))[1] - read_text = blob.data_stream[3].read().decode("utf-8") + repo.index.read() + blob = repo.index[basename] + read_bytes = repo[blob.id].data - self.assertEqual("", read_text) + self.assertEqual(b"", read_bytes) def test_remove_file(self): for tmp_dir, repo, pathfunc in parameterize(): - for filename in ["test.txt", "dir1/dir2/test.txt"]: - dirname = os.path.dirname(filename) + for basename in ["test.txt", "dir1/dir2/test.txt"]: + dirname = os.path.dirname(basename) if dirname and not os.path.exists(dirname): os.makedirs(dirname) - filename = os.path.join(tmp_dir, filename) + filename = os.path.join(tmp_dir, basename) with open(filename, "w", encoding="utf-8", newline="\n") as fh: fh.write("") if repo is not None: - repo.index.add([filename]) + repo.index.add(basename) + repo.index.write() self.assertTrue(os.path.exists(filename)) if dirname: self.assertTrue(os.path.exists(dirname)) self.assertTrue(os.path.exists(os.path.dirname(dirname))) if repo is not None: - self.assertTrue( - list(repo.index.iter_blobs(BlobFilter(filename))) - ) + self.assertIsNotNone(repo.index[basename]) fio.remove_file(pathfunc(filename)) @@ -175,17 +175,16 @@ def test_remove_file(self): self.assertFalse(os.path.exists(dirname)) self.assertFalse(os.path.exists(os.path.dirname(dirname))) if repo is not None: - self.assertFalse( - list(repo.index.iter_blobs(BlobFilter(filename))) - ) + repo.index.read() + self.assertRaises(KeyError, lambda: repo.index[basename]) def test_copy_file(self): for tmp_dir, repo, pathfunc in parameterize(): - filename1 = "test1.txt" - filename2 = "test2.txt" + basename1 = "test1.txt" + basename2 = "test2.txt" - filename1 = os.path.join(tmp_dir, filename1) - filename2 = os.path.join(tmp_dir, filename2) + filename1 = os.path.join(tmp_dir, basename1) + filename2 = os.path.join(tmp_dir, basename2) write_text = "text" with open(filename1, "w", encoding="utf-8", newline="\n") as fh: @@ -194,18 +193,15 @@ def test_copy_file(self): self.assertTrue(os.path.exists(filename1)) self.assertFalse(os.path.exists(filename2)) if repo is not None: - self.assertFalse( - list(repo.index.iter_blobs(BlobFilter(filename2))) - ) + self.assertRaises(KeyError, lambda: repo.index[basename2]) fio.copy_file(pathfunc(filename1), pathfunc(filename2)) self.assertTrue(os.path.exists(filename1)) self.assertTrue(os.path.exists(filename2)) if repo is not None: - self.assertTrue( - list(repo.index.iter_blobs(BlobFilter(filename2))) - ) + repo.index.read() + self.assertIsNotNone(repo.index[basename2]) read_text = "" with open(filename2, encoding="utf-8") as fh: @@ -214,8 +210,8 @@ def test_copy_file(self): self.assertEqual(write_text, read_text) if repo is not None: - blob = next(repo.index.iter_blobs(BlobFilter(filename2)))[1] - read_text = blob.data_stream[3].read().decode("utf-8") + blob = repo.index[basename2] + read_text = repo[blob.id].data.decode("utf-8") self.assertEqual(write_text, read_text)