191 changes: 187 additions & 4 deletions shared/python/utils.py
@@ -15,6 +15,8 @@
import secrets
import base64
import inspect
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path

from typing import Any, Optional, Tuple
@@ -31,10 +33,19 @@
BOLD_G = '\x1b[1;32m' # green
BOLD_R = '\x1b[1;31m' # red
BOLD_Y = '\x1b[1;33m' # yellow
BOLD_C = '\x1b[1;36m' # cyan
BOLD_M = '\x1b[1;35m' # magenta
BOLD_W = '\x1b[1;37m' # white
RESET = '\x1b[0m'

# Thread colors for parallel operations
THREAD_COLORS = [BOLD_B, BOLD_G, BOLD_Y, BOLD_C, BOLD_M, BOLD_W]

CONSOLE_WIDTH = 175

# Thread-safe print lock
_print_lock = threading.Lock()


# ------------------------------
# HELPER FUNCTIONS
@@ -1212,10 +1223,111 @@ def read_policy_xml(policy_xml_filepath_or_filename: str, named_values: dict[str
return policy_template_xml


def _cleanup_resources_thread_safe(deployment_name: str, rg_name: str, thread_prefix: str, thread_color: str) -> tuple[bool, str]:
"""
Thread-safe wrapper for _cleanup_resources with formatted output.

Args:
deployment_name (str): The deployment name.
rg_name (str): The resource group name.
thread_prefix (str): The thread prefix for output formatting.
thread_color (str): ANSI color code for this thread.

Returns:
tuple[bool, str]: (success, error_message)
"""
try:
with _print_lock:
_print_log(f"{thread_prefix}Starting cleanup for resource group: {rg_name}", '👉🏽 ', thread_color)

# Create a modified version of _cleanup_resources that uses thread-safe printing
_cleanup_resources_with_thread_safe_printing(deployment_name, rg_name, thread_prefix, thread_color)

with _print_lock:
_print_log(f"{thread_prefix}Completed cleanup for resource group: {rg_name}", '👉🏽 ', thread_color)

return True, ""

except Exception as e:
error_msg = f'An error occurred during cleanup of {rg_name}: {str(e)}'
with _print_lock:
_print_log(f"{thread_prefix}{error_msg}", '⛔ ', BOLD_R, show_time=True)
traceback.print_exc()
return False, error_msg


def _cleanup_resources_with_thread_safe_printing(deployment_name: str, rg_name: str, thread_prefix: str, thread_color: str) -> None:
"""
Clean up resources with thread-safe printing (internal implementation for parallel execution).
This is a modified version of _cleanup_resources that uses thread-safe output.
"""
if not deployment_name:
with _print_lock:
_print_log(f"{thread_prefix}Missing deployment name parameter.", '⛔ ', BOLD_R)
return

if not rg_name:
with _print_lock:
_print_log(f"{thread_prefix}Missing resource group name parameter.", '⛔ ', BOLD_R)
return

try:
with _print_lock:
_print_log(f"{thread_prefix}Resource group : {rg_name}", '👉🏽 ', thread_color)

# Show the deployment details
output = run(f'az deployment group show --name {deployment_name} -g {rg_name} -o json', 'Deployment retrieved', 'Failed to retrieve the deployment', print_command_to_run = False)

if output.success and output.json_data:
# Delete and purge Cognitive Services accounts
output = run(f'az cognitiveservices account list -g {rg_name}', 'Listed Cognitive Services accounts', 'Failed to list Cognitive Services accounts', print_command_to_run = False)

if output.success and output.json_data:
for resource in output.json_data:
with _print_lock:
_print_log(f"{thread_prefix}Deleting and purging Cognitive Service Account '{resource['name']}'...", '👉🏽 ', thread_color)
output = run(f"az cognitiveservices account delete -g {rg_name} -n {resource['name']}", f"Cognitive Services '{resource['name']}' deleted", f"Failed to delete Cognitive Services '{resource['name']}'", print_command_to_run = False)
output = run(f"az cognitiveservices account purge -g {rg_name} -n {resource['name']} --location \"{resource['location']}\"", f"Cognitive Services '{resource['name']}' purged", f"Failed to purge Cognitive Services '{resource['name']}'", print_command_to_run = False)

# Delete and purge APIM resources
output = run(f'az apim list -g {rg_name}', 'Listed APIM resources', 'Failed to list APIM resources', print_command_to_run = False)

if output.success and output.json_data:
for resource in output.json_data:
with _print_lock:
_print_log(f"{thread_prefix}Deleting and purging API Management '{resource['name']}'...", '👉🏽 ', thread_color)
output = run(f"az apim delete -n {resource['name']} -g {rg_name} -y", f"API Management '{resource['name']}' deleted", f"Failed to delete API Management '{resource['name']}'", print_command_to_run = False)
output = run(f"az apim deletedservice purge --service-name {resource['name']} --location \"{resource['location']}\"", f"API Management '{resource['name']}' purged", f"Failed to purge API Management '{resource['name']}'", print_command_to_run = False)

# Delete and purge Key Vault resources
output = run(f'az keyvault list -g {rg_name}', 'Listed Key Vault resources', 'Failed to list Key Vault resources', print_command_to_run = False)

if output.success and output.json_data:
for resource in output.json_data:
with _print_lock:
_print_log(f"{thread_prefix}Deleting and purging Key Vault '{resource['name']}'...", '👉🏽 ', thread_color)
output = run(f"az keyvault delete -n {resource['name']} -g {rg_name}", f"Key Vault '{resource['name']}' deleted", f"Failed to delete Key Vault '{resource['name']}'", print_command_to_run = False)
output = run(f"az keyvault purge -n {resource['name']} --location \"{resource['location']}\"", f"Key Vault '{resource['name']}' purged", f"Failed to purge Key Vault '{resource['name']}'", print_command_to_run = False)

# Delete the resource group last
with _print_lock:
_print_log(f"{thread_prefix}Deleting resource group '{rg_name}'...", 'ℹ️ ', thread_color, show_time=True)
output = run(f'az group delete --name {rg_name} -y', f"Resource group '{rg_name}' deleted", f"Failed to delete resource group '{rg_name}'", print_command_to_run = False)

with _print_lock:
_print_log(f"{thread_prefix}Cleanup completed.", 'ℹ️ ', thread_color, show_time=True)

except Exception as e:
with _print_lock:
_print_log(f"{thread_prefix}An error occurred during cleanup: {e}", '⛔ ', BOLD_R)
traceback.print_exc()


def cleanup_infra_deployments(deployment: INFRASTRUCTURE, indexes: int | list[int] | None = None) -> None:
"""
Clean up infrastructure deployments by deployment enum and index/indexes.
Obtains the infra resource group name for each index and calls the private cleanup method.
For multiple indexes, runs cleanup operations in parallel for better performance.

Args:
deployment (INFRASTRUCTURE): The infrastructure deployment enum value.
@@ -1229,13 +1341,84 @@ def cleanup_infra_deployments(deployment: INFRASTRUCTURE, indexes: int | list[in
else:
indexes_list = [indexes]

# If only one index, run sequentially (no need for threading overhead)
if len(indexes_list) <= 1:
idx = indexes_list[0] if indexes_list else None
print_info(f'Cleaning up resources for {deployment.value} - {idx}', True)
rg_name = get_infra_rg_name(deployment, idx)
_cleanup_resources(deployment.value, rg_name)
print_ok('Cleanup completed!')
return

# For multiple indexes, run in parallel
print_info(f'Starting parallel cleanup for {len(indexes_list)} infrastructure instances', True)
print_info(f'Infrastructure: {deployment.value}')
print_info(f'Indexes: {indexes_list}')
print()

# Determine max workers (reasonable limit to avoid overwhelming the system)
max_workers = min(len(indexes_list), 4) # Cap at 4 concurrent threads

cleanup_tasks = []
for i, idx in enumerate(indexes_list):
rg_name = get_infra_rg_name(deployment, idx)
thread_color = THREAD_COLORS[i % len(THREAD_COLORS)]
thread_prefix = f"{thread_color}[{deployment.value}-{idx}]{RESET}: "

cleanup_tasks.append({
'deployment_name': deployment.value,
'rg_name': rg_name,
'thread_prefix': thread_prefix,
'thread_color': thread_color,
'index': idx
})

# Execute cleanup tasks in parallel
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submit all tasks
future_to_task = {
executor.submit(
_cleanup_resources_thread_safe,
task['deployment_name'],
task['rg_name'],
task['thread_prefix'],
task['thread_color']
): task for task in cleanup_tasks
}

# Track results
completed_count = 0
failed_count = 0

# Wait for completion and handle results
for future in as_completed(future_to_task):
task = future_to_task[future]
try:
success, error_msg = future.result()

if success:
completed_count += 1
with _print_lock:
print_ok(f"Completed cleanup for {deployment.value}-{task['index']} ({completed_count}/{len(indexes_list)})")
else:
failed_count += 1
with _print_lock:
print_error(f"❌ Failed cleanup for {deployment.value}-{task['index']}: {error_msg}")

except Exception as e:
failed_count += 1
with _print_lock:
print_error(f"❌ Exception during cleanup for {deployment.value}-{task['index']}: {str(e)}")

# Final summary
print()
if failed_count == 0:
print_ok(f'All {len(indexes_list)} infrastructure cleanups completed successfully!')
else:
print_warning(f'Completed with {failed_count} failures out of {len(indexes_list)} total cleanups.')
if completed_count > 0:
print_info(f'{completed_count} cleanups succeeded.')

print_ok('All done!')

def extract_json(text: str) -> Any:
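For reviewers who want to exercise the new parallel path, a minimal sketch of how the updated function might be invoked follows. The enum member APIM_ACA and the index values are illustrative assumptions, not taken from this PR; it also assumes both names are importable from shared/python/utils.py.

from shared.python.utils import INFRASTRUCTURE, cleanup_infra_deployments

# Hypothetical infrastructure and indexes -- substitute values that exist
# in your environment. Passing more than one index triggers the parallel
# code path, which caps the pool at four worker threads.
cleanup_infra_deployments(INFRASTRUCTURE.APIM_ACA, indexes=[1, 2, 3])
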
4 changes: 2 additions & 2 deletions tests/python/test_infrastructures.py
@@ -170,7 +170,7 @@ def test_infrastructure_base_policy_fragments_creation(mock_utils):
)

# Initialize policy fragments
infra._define_policy_fragments()

# Check that all base policy fragments are created
expected_fragment_names = [
@@ -195,7 +195,7 @@ def test_infrastructure_base_apis_creation(mock_utils):
)

# Initialize APIs
infra._define_apis()

# Check that hello-world API is created
assert len(infra.base_apis) == 1
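To verify the test changes locally, the two touched tests can presumably be run via pytest's Python API; this assumes pytest is the project's test runner and is installed in the current environment.

import pytest

# Run only the two tests modified by this PR, filtered by name.
pytest.main(["tests/python/test_infrastructures.py", "-k", "base_policy_fragments or base_apis"])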