Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PMIPA-5501 Refactor repo_setup to be used by other scripts #973

Merged
merged 1 commit into from
Jun 24, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
328 changes: 176 additions & 152 deletions tools/repo_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,36 +70,14 @@ def install_package(package):
install_package("requests")


def get_sorted_releases(path_to_run):
from packaging import version

command_list = ["git", "ls-remote", "--tags"]

output = subprocess.check_output(command_list, cwd=path_to_run).decode()
versions = [line.split("/")[-1] for line in output.split("\n") if line]
versions = [v for v in versions if version.parse(v).is_devrelease == False]
versions.sort(key=version.parse)

return versions


def get_latest_same_major(versions, base_version):
from packaging import version

base_major = version.parse(base_version).major
same_major_versions = [v for v in versions if version.parse(v).major == base_major]

if not same_major_versions:
return None

return max(same_major_versions, key=version.parse)


class PosMambaRepoSetup:
from enum import Enum

repo_settings_file_name = "repo_settings.json"

# Get the absolute path of the script
script_dir = os.path.dirname(os.path.abspath(__file__))
message_check = "Check your settings on repo_settings.json"

class CloneType(Enum):
SSH = "ssh"
Expand Down Expand Up @@ -130,9 +108,17 @@ def run_command(self, cmd, repo=None):
cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
)

def update_repo(self, submodule):
message_check = "Check your settings on repo_settings.json"
def remove_repo(self, path):
if self.force or self.clone_type == PosMambaRepoSetup.CloneType.HTTPS:
os.chdir(self.script_dir)
print_warning(f"Removing {path} to try again...")
shutil.rmtree(os.path.join(self.script_dir, path))
else:
print_warning(
f"Re-run repo_setup.py with -f param to fix this or remove {path} submodule dir..."
)

def checkout_and_pull(self, path, target_type, target, repo_cloned=True) -> bool:
def add_to_gitignore(filename):
"""
Add a file to gitignore if it not already defined
Expand All @@ -150,148 +136,185 @@ def add_to_gitignore(filename):
gitignore.write(filename + "\n")
print(f"{filename} added to .gitignore.")

def remove_repo(path):
if self.force or self.clone_type == PosMambaRepoSetup.CloneType.HTTPS:
os.chdir(self.script_dir)
print_warning(f"Removing {path} to try again...")
shutil.rmtree(os.path.join(self.script_dir, path))
# Changing to the repository directory
full_repo_path = os.path.join(self.script_dir, path)
os.chdir(full_repo_path)

stash_applied = False
if not repo_cloned:
status_output = subprocess.check_output(
["git", "status", "--porcelain"]
).decode("utf-8")
if status_output:
_result = self.run_command(
[
"git",
"stash",
"push",
"-m",
'"STASHED BY REPO_SETUP.PY"',
"--include-untracked",
],
path,
)
if _result.returncode == 0:
stash_applied = True

fetch_command = ["git", "fetch", "origin"]
result = self.run_command(fetch_command, path)

if result.returncode == 0:
if target_type == "tag":
result = self.run_command(["git", "checkout", target, "--force"], path)
elif target_type == "branch":
result = self.run_command(
[
"git",
"checkout",
"-B",
target,
f"origin/{target}",
"--force",
],
path,
)
if result.returncode == 0:
self.run_command(
["git", "reset", "--hard", f"origin/{target}"], path
)
result = self.run_command(["git", "pull", "--force"], path)

if result.returncode == 0:
if stash_applied:
_result = self.run_command(["git", "stash", "pop"], path)
if _result.returncode:
print_error("Error applying stash")

print_color(
f"Repo {path} updated with {target_type} {target} successfully!",
GREEN,
)
add_to_gitignore(path)
return True
else:
print_warning(
f"Re-run repo_setup.py with -f param to fix this or remove {path} submodule dir..."
print_error(
f"Repo {path} update attempt with {target_type} {target} FAILED!"
)
self.remove_repo(path)
else:
print_error(
f"Failed to fetch {target_type} {target} on {path}! {self.message_check}"
)

return False

@staticmethod
def get_info_by_submodule(submodule, full_repo_path=None) -> tuple:
def get_latest_same_major(versions, base_version):
from packaging import version

base_major = version.parse(base_version).major
same_major_versions = [
v for v in versions if version.parse(v).major == base_major
]

if not same_major_versions:
return None

return max(same_major_versions, key=version.parse)

def get_sorted_releases(path_to_run):
from packaging import version

command_list = ["git", "ls-remote", "--tags"]

output = subprocess.check_output(command_list, cwd=path_to_run).decode()
versions = [line.split("/")[-1] for line in output.split("\n") if line]
versions = [v for v in versions if version.parse(v).is_devrelease == False]
versions.sort(key=version.parse)

_repo_url: str = submodule["url"]
_path = submodule["path"]
return versions

repo_url: str = submodule["url"]
path = submodule["path"]
_version = submodule.get("version")
_minimal_version = submodule.get("minimal_version")
_branch = submodule.get("branch")
target = None
target_type = "tag"

if self.clone_type == PosMambaRepoSetup.CloneType.HTTPS:
_repo_url = _repo_url.replace("[email protected]:", "https://github.com/")

exec_count = 0
while exec_count < 3:
repo_cloned = False
# Change the current working directory to the script directory
os.chdir(self.script_dir)
full_repo_path = os.path.join(self.script_dir, _path)
full_repo_path = (
os.path.join(PosMambaRepoSetup.script_dir, path)
if not full_repo_path
else full_repo_path
)

# Checking if the repository already exists
if not os.path.exists(_path):
print_color(f"Initalizing submodule {_path}", BLUE)
result = self.run_command(
["git", "clone", "--no-checkout", _repo_url, full_repo_path], _path
if _version:
target = _version
elif _minimal_version:
target = get_latest_same_major(
get_sorted_releases(full_repo_path), _minimal_version
)
if target is None:
print_error(
f"Minimal version {_minimal_version} not found on {path}! {PosMambaRepoSetup.message_check}"
)
if result.returncode != 0:
print_error(f"Git clone failed on {_path}")
return
repo_cloned = True
else:
print_color(f"Updating submodule {_path}", BLUE)
exit(1)
elif _branch:
target = _branch
target_type = "branch"
else:
print_error(
f"ERROR: No version, minimum_version, or branch was specified for the repository: {path}"
)
return None

# Changing to the repository directory
os.chdir(full_repo_path)
return repo_url, path, target_type, target

if not os.path.exists(os.path.join(full_repo_path, ".git")):
print_warning(f".git not found on {_path}")
remove_repo(_path)
exec_count += 1
continue
def update_repo(self, submodule):
submodule_info = self.get_info_by_submodule(submodule)

if _version:
target = _version
elif _minimal_version:
target = get_latest_same_major(
get_sorted_releases(full_repo_path), _minimal_version
)
if target is None:
print_error(
f"Minimal version {_minimal_version} not found on {_path}! {message_check}"
)
exit(1)
elif _branch:
target = _branch
target_type = "branch"
else:
print_error(
f"ERROR: No version, minimum_version, or branch was specified for the repository: {_path}"
)
return

stash_applied = False
if not repo_cloned:
status_output = subprocess.check_output(
["git", "status", "--porcelain"]
).decode("utf-8")
if status_output:
_result = self.run_command(
[
"git",
"stash",
"push",
"-m",
'"STASHED BY REPO_SETUP.PY"',
"--include-untracked",
],
_path,
)
if _result.returncode == 0:
stash_applied = True
if submodule_info:
repo_url, path, target_type, target = submodule_info

fetch_command = ["git", "fetch", "origin"]
# if target_type == "tag":
# fetch_command += ["tag"]
# fetch_command += [target, "--force"]
result = self.run_command(fetch_command, _path)
if self.clone_type == self.CloneType.HTTPS:
repo_url = repo_url.replace("[email protected]:", "https://github.com/")

if result.returncode == 0:
if target_type == "tag":
result = self.run_command(
["git", "checkout", target, "--force"], _path
)
elif target_type == "branch":
result = self.run_command(
[
"git",
"checkout",
"-B",
_branch,
f"origin/{_branch}",
"--force",
],
_path,
)
if result.returncode == 0:
self.run_command(
["git", "reset", "--hard", f"origin/{_branch}"], _path
)
result = self.run_command(["git", "pull", "--force"], _path)
exec_count = 0
while exec_count < 3:
repo_cloned = False
# Change the current working directory to the script directory
os.chdir(self.script_dir)
full_repo_path = os.path.join(self.script_dir, path)

if result.returncode == 0:
if stash_applied:
_result = self.run_command(["git", "stash", "pop"], _path)
if _result.returncode:
print_error("Error applying stash")

print_color(
f"Repo {_path} updated with {target_type} {target} successfully!",
GREEN,
# Checking if the repository already exists
if not os.path.exists(path):
print_color(f"Initalizing submodule {path}", BLUE)
result = self.run_command(
["git", "clone", "--no-checkout", repo_url, full_repo_path],
path,
)
add_to_gitignore(_path)
break
if result.returncode != 0:
print_error(f"Git clone failed on {path}")
return
repo_cloned = True
else:
print_error(
f"Repo {_path} update attempt with {target_type} {target} FAILED!"
)
remove_repo(_path)
else:
print_error(
f"Failed to fetch {target_type} {target} on {_path}! {message_check}"
)
print_color(f"Updating submodule {path}", BLUE)

exec_count += 1
if not os.path.exists(os.path.join(full_repo_path, ".git")):
print_warning(f".git not found on {path}")
self.remove_repo(path)
exec_count += 1
continue

result = self.checkout_and_pull(path, target_type, target, repo_cloned)

if result:
break

exec_count += 1
else:
print_error("Error loading submodule info")


def main():
Expand Down Expand Up @@ -352,7 +375,7 @@ def get_latest_sdk_commit() -> str:
or not sdk_commit
or sdk_commit.lower() == repo_setup_commit.lower()
):
with open("repo_settings.json", "r") as f:
with open(PosMambaRepoSetup.repo_settings_file_name, "r") as f:
repo_settings = json.load(f)

submodules = repo_settings["submodules"]
Expand All @@ -366,9 +389,9 @@ def get_latest_sdk_commit() -> str:

if repo_list:
filtered_submodules = []
for string1 in repo_list:
for repo in repo_list:
for submodule in submodules:
if string1.lower() in submodule["path"].lower():
if repo.lower() in submodule["path"].lower():
filtered_submodules.append(submodule)

submodules = filtered_submodules
Expand All @@ -389,3 +412,4 @@ def get_latest_sdk_commit() -> str:

if __name__ == "__main__":
main()

Loading