-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
PMIPA-5501 Refactor repo_setup to be used by SAL (#973)
- Loading branch information
1 parent
700f51a
commit 8a39d41
Showing
1 changed file
with
176 additions
and
152 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -70,36 +70,14 @@ def install_package(package): | |
install_package("requests") | ||
|
||
|
||
def get_sorted_releases(path_to_run): | ||
from packaging import version | ||
|
||
command_list = ["git", "ls-remote", "--tags"] | ||
|
||
output = subprocess.check_output(command_list, cwd=path_to_run).decode() | ||
versions = [line.split("/")[-1] for line in output.split("\n") if line] | ||
versions = [v for v in versions if version.parse(v).is_devrelease == False] | ||
versions.sort(key=version.parse) | ||
|
||
return versions | ||
|
||
|
||
def get_latest_same_major(versions, base_version): | ||
from packaging import version | ||
|
||
base_major = version.parse(base_version).major | ||
same_major_versions = [v for v in versions if version.parse(v).major == base_major] | ||
|
||
if not same_major_versions: | ||
return None | ||
|
||
return max(same_major_versions, key=version.parse) | ||
|
||
|
||
class PosMambaRepoSetup: | ||
from enum import Enum | ||
|
||
repo_settings_file_name = "repo_settings.json" | ||
|
||
# Get the absolute path of the script | ||
script_dir = os.path.dirname(os.path.abspath(__file__)) | ||
message_check = "Check your settings on repo_settings.json" | ||
|
||
class CloneType(Enum): | ||
SSH = "ssh" | ||
|
@@ -130,9 +108,17 @@ def run_command(self, cmd, repo=None): | |
cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL | ||
) | ||
|
||
def update_repo(self, submodule): | ||
message_check = "Check your settings on repo_settings.json" | ||
def remove_repo(self, path): | ||
if self.force or self.clone_type == PosMambaRepoSetup.CloneType.HTTPS: | ||
os.chdir(self.script_dir) | ||
print_warning(f"Removing {path} to try again...") | ||
shutil.rmtree(os.path.join(self.script_dir, path)) | ||
else: | ||
print_warning( | ||
f"Re-run repo_setup.py with -f param to fix this or remove {path} submodule dir..." | ||
) | ||
|
||
def checkout_and_pull(self, path, target_type, target, repo_cloned=True) -> bool: | ||
def add_to_gitignore(filename): | ||
""" | ||
Add a file to gitignore if it not already defined | ||
|
@@ -150,148 +136,185 @@ def add_to_gitignore(filename): | |
gitignore.write(filename + "\n") | ||
print(f"{filename} added to .gitignore.") | ||
|
||
def remove_repo(path): | ||
if self.force or self.clone_type == PosMambaRepoSetup.CloneType.HTTPS: | ||
os.chdir(self.script_dir) | ||
print_warning(f"Removing {path} to try again...") | ||
shutil.rmtree(os.path.join(self.script_dir, path)) | ||
# Changing to the repository directory | ||
full_repo_path = os.path.join(self.script_dir, path) | ||
os.chdir(full_repo_path) | ||
|
||
stash_applied = False | ||
if not repo_cloned: | ||
status_output = subprocess.check_output( | ||
["git", "status", "--porcelain"] | ||
).decode("utf-8") | ||
if status_output: | ||
_result = self.run_command( | ||
[ | ||
"git", | ||
"stash", | ||
"push", | ||
"-m", | ||
'"STASHED BY REPO_SETUP.PY"', | ||
"--include-untracked", | ||
], | ||
path, | ||
) | ||
if _result.returncode == 0: | ||
stash_applied = True | ||
|
||
fetch_command = ["git", "fetch", "origin"] | ||
result = self.run_command(fetch_command, path) | ||
|
||
if result.returncode == 0: | ||
if target_type == "tag": | ||
result = self.run_command(["git", "checkout", target, "--force"], path) | ||
elif target_type == "branch": | ||
result = self.run_command( | ||
[ | ||
"git", | ||
"checkout", | ||
"-B", | ||
target, | ||
f"origin/{target}", | ||
"--force", | ||
], | ||
path, | ||
) | ||
if result.returncode == 0: | ||
self.run_command( | ||
["git", "reset", "--hard", f"origin/{target}"], path | ||
) | ||
result = self.run_command(["git", "pull", "--force"], path) | ||
|
||
if result.returncode == 0: | ||
if stash_applied: | ||
_result = self.run_command(["git", "stash", "pop"], path) | ||
if _result.returncode: | ||
print_error("Error applying stash") | ||
|
||
print_color( | ||
f"Repo {path} updated with {target_type} {target} successfully!", | ||
GREEN, | ||
) | ||
add_to_gitignore(path) | ||
return True | ||
else: | ||
print_warning( | ||
f"Re-run repo_setup.py with -f param to fix this or remove {path} submodule dir..." | ||
print_error( | ||
f"Repo {path} update attempt with {target_type} {target} FAILED!" | ||
) | ||
self.remove_repo(path) | ||
else: | ||
print_error( | ||
f"Failed to fetch {target_type} {target} on {path}! {self.message_check}" | ||
) | ||
|
||
return False | ||
|
||
@staticmethod | ||
def get_info_by_submodule(submodule, full_repo_path=None) -> tuple: | ||
def get_latest_same_major(versions, base_version): | ||
from packaging import version | ||
|
||
base_major = version.parse(base_version).major | ||
same_major_versions = [ | ||
v for v in versions if version.parse(v).major == base_major | ||
] | ||
|
||
if not same_major_versions: | ||
return None | ||
|
||
return max(same_major_versions, key=version.parse) | ||
|
||
def get_sorted_releases(path_to_run): | ||
from packaging import version | ||
|
||
command_list = ["git", "ls-remote", "--tags"] | ||
|
||
output = subprocess.check_output(command_list, cwd=path_to_run).decode() | ||
versions = [line.split("/")[-1] for line in output.split("\n") if line] | ||
versions = [v for v in versions if version.parse(v).is_devrelease == False] | ||
versions.sort(key=version.parse) | ||
|
||
_repo_url: str = submodule["url"] | ||
_path = submodule["path"] | ||
return versions | ||
|
||
repo_url: str = submodule["url"] | ||
path = submodule["path"] | ||
_version = submodule.get("version") | ||
_minimal_version = submodule.get("minimal_version") | ||
_branch = submodule.get("branch") | ||
target = None | ||
target_type = "tag" | ||
|
||
if self.clone_type == PosMambaRepoSetup.CloneType.HTTPS: | ||
_repo_url = _repo_url.replace("[email protected]:", "https://github.com/") | ||
|
||
exec_count = 0 | ||
while exec_count < 3: | ||
repo_cloned = False | ||
# Change the current working directory to the script directory | ||
os.chdir(self.script_dir) | ||
full_repo_path = os.path.join(self.script_dir, _path) | ||
full_repo_path = ( | ||
os.path.join(PosMambaRepoSetup.script_dir, path) | ||
if not full_repo_path | ||
else full_repo_path | ||
) | ||
|
||
# Checking if the repository already exists | ||
if not os.path.exists(_path): | ||
print_color(f"Initalizing submodule {_path}", BLUE) | ||
result = self.run_command( | ||
["git", "clone", "--no-checkout", _repo_url, full_repo_path], _path | ||
if _version: | ||
target = _version | ||
elif _minimal_version: | ||
target = get_latest_same_major( | ||
get_sorted_releases(full_repo_path), _minimal_version | ||
) | ||
if target is None: | ||
print_error( | ||
f"Minimal version {_minimal_version} not found on {path}! {PosMambaRepoSetup.message_check}" | ||
) | ||
if result.returncode != 0: | ||
print_error(f"Git clone failed on {_path}") | ||
return | ||
repo_cloned = True | ||
else: | ||
print_color(f"Updating submodule {_path}", BLUE) | ||
exit(1) | ||
elif _branch: | ||
target = _branch | ||
target_type = "branch" | ||
else: | ||
print_error( | ||
f"ERROR: No version, minimum_version, or branch was specified for the repository: {path}" | ||
) | ||
return None | ||
|
||
# Changing to the repository directory | ||
os.chdir(full_repo_path) | ||
return repo_url, path, target_type, target | ||
|
||
if not os.path.exists(os.path.join(full_repo_path, ".git")): | ||
print_warning(f".git not found on {_path}") | ||
remove_repo(_path) | ||
exec_count += 1 | ||
continue | ||
def update_repo(self, submodule): | ||
submodule_info = self.get_info_by_submodule(submodule) | ||
|
||
if _version: | ||
target = _version | ||
elif _minimal_version: | ||
target = get_latest_same_major( | ||
get_sorted_releases(full_repo_path), _minimal_version | ||
) | ||
if target is None: | ||
print_error( | ||
f"Minimal version {_minimal_version} not found on {_path}! {message_check}" | ||
) | ||
exit(1) | ||
elif _branch: | ||
target = _branch | ||
target_type = "branch" | ||
else: | ||
print_error( | ||
f"ERROR: No version, minimum_version, or branch was specified for the repository: {_path}" | ||
) | ||
return | ||
|
||
stash_applied = False | ||
if not repo_cloned: | ||
status_output = subprocess.check_output( | ||
["git", "status", "--porcelain"] | ||
).decode("utf-8") | ||
if status_output: | ||
_result = self.run_command( | ||
[ | ||
"git", | ||
"stash", | ||
"push", | ||
"-m", | ||
'"STASHED BY REPO_SETUP.PY"', | ||
"--include-untracked", | ||
], | ||
_path, | ||
) | ||
if _result.returncode == 0: | ||
stash_applied = True | ||
if submodule_info: | ||
repo_url, path, target_type, target = submodule_info | ||
|
||
fetch_command = ["git", "fetch", "origin"] | ||
# if target_type == "tag": | ||
# fetch_command += ["tag"] | ||
# fetch_command += [target, "--force"] | ||
result = self.run_command(fetch_command, _path) | ||
if self.clone_type == self.CloneType.HTTPS: | ||
repo_url = repo_url.replace("[email protected]:", "https://github.com/") | ||
|
||
if result.returncode == 0: | ||
if target_type == "tag": | ||
result = self.run_command( | ||
["git", "checkout", target, "--force"], _path | ||
) | ||
elif target_type == "branch": | ||
result = self.run_command( | ||
[ | ||
"git", | ||
"checkout", | ||
"-B", | ||
_branch, | ||
f"origin/{_branch}", | ||
"--force", | ||
], | ||
_path, | ||
) | ||
if result.returncode == 0: | ||
self.run_command( | ||
["git", "reset", "--hard", f"origin/{_branch}"], _path | ||
) | ||
result = self.run_command(["git", "pull", "--force"], _path) | ||
exec_count = 0 | ||
while exec_count < 3: | ||
repo_cloned = False | ||
# Change the current working directory to the script directory | ||
os.chdir(self.script_dir) | ||
full_repo_path = os.path.join(self.script_dir, path) | ||
|
||
if result.returncode == 0: | ||
if stash_applied: | ||
_result = self.run_command(["git", "stash", "pop"], _path) | ||
if _result.returncode: | ||
print_error("Error applying stash") | ||
|
||
print_color( | ||
f"Repo {_path} updated with {target_type} {target} successfully!", | ||
GREEN, | ||
# Checking if the repository already exists | ||
if not os.path.exists(path): | ||
print_color(f"Initalizing submodule {path}", BLUE) | ||
result = self.run_command( | ||
["git", "clone", "--no-checkout", repo_url, full_repo_path], | ||
path, | ||
) | ||
add_to_gitignore(_path) | ||
break | ||
if result.returncode != 0: | ||
print_error(f"Git clone failed on {path}") | ||
return | ||
repo_cloned = True | ||
else: | ||
print_error( | ||
f"Repo {_path} update attempt with {target_type} {target} FAILED!" | ||
) | ||
remove_repo(_path) | ||
else: | ||
print_error( | ||
f"Failed to fetch {target_type} {target} on {_path}! {message_check}" | ||
) | ||
print_color(f"Updating submodule {path}", BLUE) | ||
|
||
exec_count += 1 | ||
if not os.path.exists(os.path.join(full_repo_path, ".git")): | ||
print_warning(f".git not found on {path}") | ||
self.remove_repo(path) | ||
exec_count += 1 | ||
continue | ||
|
||
result = self.checkout_and_pull(path, target_type, target, repo_cloned) | ||
|
||
if result: | ||
break | ||
|
||
exec_count += 1 | ||
else: | ||
print_error("Error loading submodule info") | ||
|
||
|
||
def main(): | ||
|
@@ -352,7 +375,7 @@ def get_latest_sdk_commit() -> str: | |
or not sdk_commit | ||
or sdk_commit.lower() == repo_setup_commit.lower() | ||
): | ||
with open("repo_settings.json", "r") as f: | ||
with open(PosMambaRepoSetup.repo_settings_file_name, "r") as f: | ||
repo_settings = json.load(f) | ||
|
||
submodules = repo_settings["submodules"] | ||
|
@@ -366,9 +389,9 @@ def get_latest_sdk_commit() -> str: | |
|
||
if repo_list: | ||
filtered_submodules = [] | ||
for string1 in repo_list: | ||
for repo in repo_list: | ||
for submodule in submodules: | ||
if string1.lower() in submodule["path"].lower(): | ||
if repo.lower() in submodule["path"].lower(): | ||
filtered_submodules.append(submodule) | ||
|
||
submodules = filtered_submodules | ||
|
@@ -389,3 +412,4 @@ def get_latest_sdk_commit() -> str: | |
|
||
if __name__ == "__main__": | ||
main() | ||
|