From 76337fe3b0127b95c5ecf8f2098afd1677aea313 Mon Sep 17 00:00:00 2001 From: Pablo Cachafeiro Date: Tue, 2 Dec 2025 18:22:41 +0100 Subject: [PATCH 1/2] chore: initial commit From b51f9ddc4af539f31d41d693694e6f548ce78cc8 Mon Sep 17 00:00:00 2001 From: Pablo Cachafeiro Date: Tue, 2 Dec 2025 18:23:51 +0100 Subject: [PATCH 2/2] =?UTF-8?q?feat(downloader):=20=E2=9C=A8=20Improve=20U?= =?UTF-8?q?UID=20replacement=20logic=20for=20playbooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Added a method to preserve task UUIDs (`taskid` and `task.id`) during UUID replacement in playbooks. - Implemented parallel processing for downloading and parsing custom content, enhancing performance. - Updated tests to verify that task UUIDs remain unchanged while playbook IDs are correctly replaced. --- demisto_sdk/commands/download/downloader.py | 576 +++++++++++++----- .../download/tests/downloader_test.py | 73 ++- 2 files changed, 510 insertions(+), 139 deletions(-) diff --git a/demisto_sdk/commands/download/downloader.py b/demisto_sdk/commands/download/downloader.py index 00ddb5e3a2f..d3f1778e1a7 100644 --- a/demisto_sdk/commands/download/downloader.py +++ b/demisto_sdk/commands/download/downloader.py @@ -5,6 +5,7 @@ import tarfile import traceback from collections import defaultdict +from concurrent.futures import ThreadPoolExecutor, as_completed from enum import Enum from io import BytesIO, StringIO from pathlib import Path @@ -407,6 +408,67 @@ def create_uuid_to_name_mapping( logger.debug("Custom content IDs mapping created successfully.") return mapping + def should_download_file(self, file_name: str) -> bool: + """ + Check if the file should be downloaded based on the input parameters. + This is a heuristic to avoid extracting and parsing all files. + + Note: This is a pre-filter based on filename. The actual content filtering + happens later in filter_custom_content() using the parsed content name. + """ + if self.download_all_custom_content: + return True + + if self.regex: + # Try to apply regex to filename as a heuristic pre-filter. + # The actual filtering will be done on content name later, but this can + # reduce the number of files we need to parse. 
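+            # In practice, bundle filenames are only sanitized approximations of
+            # content names (for instance, a script named "My Script!" could
+            # plausibly be stored as "automation-My_Script_.yml"), so a filename
+            # mismatch does not guarantee a content-name mismatch.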
+            # To avoid false negatives, every file is therefore let through at
+            # this stage; the authoritative regex filtering happens later in
+            # filter_custom_content(), against the parsed content names.
+            return True
+
+        if self.input_files:
+            # Check if any input file matches the filename
+            for input_file in self.input_files:
+                # Check for exact match or sanitized match (spaces to underscores)
+                if input_file in file_name:
+                    return True
+                if input_file.replace(" ", "_") in file_name:
+                    return True
+                # Also check lowercase variants
+                if input_file.lower() in file_name.lower():
+                    return True
+            return False
+
+        return True
+
     def download_custom_content(self) -> dict[str, StringIO]:
         """
         Download custom content bundle using server's API,
@@ -449,26 +511,83 @@ def download_custom_content(self) -> dict[str, StringIO]:
             raise HandledError from e
 
         logger.debug("Custom content bundle fetched successfully.")
-        logger.debug(
-            f"Downloaded content bundle size (bytes): {len(api_response.data)}"
-        )
 
-        loaded_files: dict[str, StringIO] = {}
+        # Get the bundle data - handle both raw bytes and HTTPResponse-like objects
+        bundle_data = api_response.data if hasattr(api_response, "data") else api_response.read()
+        logger.debug(f"Downloaded content bundle size (bytes): {len(bundle_data)}")
 
-        with tarfile.open(fileobj=BytesIO(api_response.data), mode="r") as tar:
+        # First pass: extract raw bytes from the tar bundle
+        raw_files: list[tuple[str, bytes]] = []
+        files_skipped = 0
+
+        with tarfile.open(fileobj=BytesIO(bundle_data), mode="r") as tar:
             tar_members = tar.getmembers()
             logger.debug(f"Custom content bundle contains {len(tar_members)} items.")
 
-            for file in tar_members:
+            for i, file in enumerate(tar_members):
                 file_name = file.name.lstrip("/")
+
+                if not self.should_download_file(file_name):
+                    files_skipped += 1
+                    continue
+
                 if extracted_file := tar.extractfile(file):
-                    file_data = StringIO(safe_read_unicode(extracted_file.read()))
-                    loaded_files[file_name] = file_data
+                    raw_files.append((file_name, extracted_file.read()))
+
+                # Log progress every 100 members for large bundles
+                if (i + 1) % 100 == 0:
+                    logger.debug(f"Processed {i + 1}/{len(tar_members)} bundle members...")
+
+        logger.debug(
+            f"Tar extraction complete: {len(raw_files)} files extracted, {files_skipped} files skipped."
+ ) - logger.debug("Custom content items loaded to memory successfully.") + # Second pass: Decode bytes to StringIO in parallel (CPU-bound) + loaded_files: dict[str, StringIO] = {} + + if len(raw_files) > 10: # Only parallelize if worth the overhead + max_workers = min(8, len(raw_files)) + logger.debug(f"Using {max_workers} parallel workers for content decoding") + + with ThreadPoolExecutor(max_workers=max_workers) as executor: + future_to_file = { + executor.submit(self._decode_file_content, file_name, raw_bytes): file_name + for file_name, raw_bytes in raw_files + } + + for future in as_completed(future_to_file): + file_name = future_to_file[future] + try: + result = future.result() + if result: + loaded_files[file_name] = result + except Exception as e: + logger.warning(f"Failed to decode '{file_name}': {e}") + else: + # Sequential processing for small bundles + for file_name, raw_bytes in raw_files: + try: + loaded_files[file_name] = self._decode_file_content(file_name, raw_bytes) + except Exception as e: + logger.warning(f"Failed to decode '{file_name}': {e}") + + logger.debug(f"Custom content items loaded to memory successfully ({len(loaded_files)} files).") return loaded_files + @staticmethod + def _decode_file_content(file_name: str, raw_bytes: bytes) -> StringIO: + """ + Decode raw bytes to StringIO. Helper method for parallel processing. + + Args: + file_name (str): The file name (for logging purposes). + raw_bytes (bytes): The raw file content bytes. + + Returns: + StringIO: The decoded file content. + """ + return StringIO(safe_read_unicode(raw_bytes)) + def replace_uuid_ids( self, custom_content_objects: dict[str, dict], uuid_mapping: dict[str, str] ): @@ -483,23 +602,37 @@ def replace_uuid_ids( to their corresponding objects. uuid_mapping (dict[str, str]): A dictionary mapping UUID IDs to corresponding names of custom content. """ + if not custom_content_objects: + return + changed_uuids_count = 0 failed_content_items = set() - for original_file_name, file_object in custom_content_objects.items(): - try: - if self.replace_uuid_ids_for_item( - custom_content_object=file_object, uuid_mapping=uuid_mapping - ): - changed_uuids_count += 1 - - except Exception as e: - # If UUID replacement failed, we skip the file - logger.warning( - f"Could not replace UUID IDs in '{file_object['name']}'. " - f"Content item will be skipped.\nError: {e}" - ) - failed_content_items.add(original_file_name) + # Use parallel processing for better performance + max_workers = min(8, len(custom_content_objects)) # Limit to 8 workers max + if max_workers > 1: + logger.debug(f"Using {max_workers} parallel workers for UUID replacement") + + with ThreadPoolExecutor(max_workers=max_workers) as executor: + # Submit all UUID replacement tasks + future_to_file = { + executor.submit(self.replace_uuid_ids_for_item, file_object, uuid_mapping): original_file_name + for original_file_name, file_object in custom_content_objects.items() + } + + # Collect results as they complete + for future in as_completed(future_to_file): + original_file_name = future_to_file[future] + try: + if future.result(): + changed_uuids_count += 1 + except Exception as e: + # If UUID replacement failed, we skip the file + logger.warning( + f"Could not replace UUID IDs in '{custom_content_objects[original_file_name]['name']}'. 
" + f"Content item will be skipped.\nError: {e}" + ) + failed_content_items.add(original_file_name) for failed_content_item in failed_content_items: custom_content_objects.pop(failed_content_item) @@ -509,6 +642,31 @@ def replace_uuid_ids( f"Replaced UUID IDs with names in {changed_uuids_count} custom content items." ) + def _get_playbook_task_uuids(self, data: dict) -> set[str]: + """ + Extract all taskid and task.id UUIDs from a playbook's tasks. + These UUIDs should be preserved and not replaced with names. + + Args: + data (dict): The playbook data dictionary. + + Returns: + set[str]: A set of UUIDs that belong to task identifiers. + """ + task_uuids: set[str] = set() + tasks = data.get("tasks", {}) + if isinstance(tasks, dict): + for task in tasks.values(): + # Get taskid (the UUID at the task level) + taskid = str(task.get("taskid", "")) + if re.match(UUID_REGEX, taskid): + task_uuids.add(taskid) + # Get task.id (the UUID inside the nested task object) + task_inner_id = str(task.get("task", {}).get("id", "")) + if re.match(UUID_REGEX, task_inner_id): + task_uuids.add(task_inner_id) + return task_uuids + def replace_uuid_ids_for_item( self, custom_content_object: dict, uuid_mapping: dict[str, str] ) -> bool: @@ -516,6 +674,10 @@ def replace_uuid_ids_for_item( Find and replace UUID IDs of custom content items with their names. The method first creates a mapping of a UUID to a name, and then replaces all UUIDs using this mapping. + Note: + For playbooks, taskid and task.id fields are excluded from replacement to prevent + the format command from regenerating new UUIDs for every task on each download. + Args: custom_content_object (dict): A single custom content object to update UUIDs in. uuid_mapping (dict[str, str]): A dictionary mapping UUID IDs to corresponding names of custom content. @@ -527,7 +689,26 @@ def replace_uuid_ids_for_item( uuid_matches = re.findall(UUID_REGEX, content_item_file_content) if uuid_matches: - for uuid in set(uuid_matches).intersection(uuid_mapping): + # Get unique UUIDs that need replacement + uuids_to_replace = set(uuid_matches).intersection(uuid_mapping) + + if not uuids_to_replace: + return False + + # For playbooks, exclude task UUIDs (taskid and task.id) from replacement + # to prevent the format command from regenerating new UUIDs for every task + if custom_content_object["type"] in (FileType.PLAYBOOK, FileType.TEST_PLAYBOOK): + task_uuids = self._get_playbook_task_uuids(custom_content_object.get("data", {})) + uuids_to_replace = uuids_to_replace - task_uuids + if task_uuids: + logger.debug( + f"Preserving {len(task_uuids)} task UUIDs in playbook '{custom_content_object['name']}'" + ) + + if not uuids_to_replace: + return False + + for uuid in uuids_to_replace: logger.debug( f"Replacing UUID '{uuid}' with '{uuid_mapping[uuid]}' in " f"'{custom_content_object['name']}'" @@ -592,7 +773,7 @@ def build_request_params( def get_system_automations(self, content_items: list[str]) -> dict[str, dict]: """ - Fetch system automations from server. + Fetch system automations from server in parallel. Args: content_items (list[str]): A list of system automation names to fetch. @@ -601,33 +782,54 @@ def get_system_automations(self, content_items: list[str]) -> dict[str, dict]: dict[str, dict]: A dictionary mapping downloaded automations file names, to corresponding dictionaries containing metadata and content. """ - downloaded_automations: list[bytes] = [] logger.info( - f"Fetching system automations from server ({self.client.api_client.configuration.host})..." 
+            f"Fetching {len(content_items)} system automations from server ({self.client.api_client.configuration.host})..."
         )
 
+        # Validate automation names first. The '/' check is required due to a
+        # server issue where the character is considered a path separator in
+        # the request endpoint.
+        valid_automations = []
         for automation in content_items:
-            try:
-                # This is required due to a server issue where the '/' character
-                # is considered a path separator for the expected_endpoint.
-                if "/" in automation:
-                    raise ValueError(
-                        f"Automation name '{automation}' is invalid. "
-                        f"Automation names cannot contain the '/' character."
-                    )
-
-                endpoint = f"automation/load/{automation}"
-                api_response = demisto_client.generic_request_func(
-                    self.client,
-                    endpoint,
-                    "POST",
-                    _preload_content=False,
-                )[0]
-
-                downloaded_automations.append(api_response.data)
-
-            except Exception as e:
-                logger.error(f"Failed to fetch system automation '{automation}': {e}")
+            if "/" in automation:
+                logger.error(
+                    f"Automation name '{automation}' is invalid. "
+                    f"Automation names cannot contain the '/' character."
+                )
+            else:
+                valid_automations.append(automation)
+
+        if not valid_automations:
+            return {}
+
+        # Fetch automations in parallel
+        downloaded_automations: list[tuple[str, bytes]] = []
+
+        if len(valid_automations) > 1:
+            max_workers = min(4, len(valid_automations))  # Limit concurrent API calls
+            logger.debug(f"Using {max_workers} parallel workers for automation downloads")
+
+            with ThreadPoolExecutor(max_workers=max_workers) as executor:
+                future_to_automation = {
+                    executor.submit(self._fetch_single_automation, automation): automation
+                    for automation in valid_automations
+                }
+
+                for future in as_completed(future_to_automation):
+                    automation = future_to_automation[future]
+                    try:
+                        result = future.result()
+                        if result:
+                            downloaded_automations.append((automation, result))
+                    except Exception as e:
+                        logger.error(f"Failed to fetch system automation '{automation}': {e}")
+        else:
+            # Single automation - no need for parallelization
+            for automation in valid_automations:
+                try:
+                    result = self._fetch_single_automation(automation)
+                    if result:
+                        downloaded_automations.append((automation, result))
+                except Exception as e:
+                    logger.error(f"Failed to fetch system automation '{automation}': {e}")
 
         logger.debug(
             f"Successfully fetched {len(downloaded_automations)} system automations."
@@ -635,7 +837,7 @@ def get_system_automations(self, content_items: list[str]) -> dict[str, dict]:
 
         content_items_objects: dict[str, dict] = {}
 
-        for downloaded_automation in downloaded_automations:
+        for automation_name, downloaded_automation in downloaded_automations:
             automation_bytes_data = StringIO(safe_read_unicode(downloaded_automation))
             automation_data = json.load(automation_bytes_data)
 
@@ -652,9 +854,28 @@ def get_system_automations(self, content_items: list[str]) -> dict[str, dict]:
 
         return content_items_objects
 
+    def _fetch_single_automation(self, automation: str) -> bytes:
+        """
+        Fetch a single automation from the server. Helper method for parallel processing.
+
+        Args:
+            automation (str): The automation name to fetch.
+
+        Returns:
+            bytes: The automation data. Errors raised by the API call propagate
+                to the caller, which logs them and skips the item.
+        """
+        endpoint = f"automation/load/{automation}"
+        api_response = demisto_client.generic_request_func(
+            self.client,
+            endpoint,
+            "POST",
+            _preload_content=False,
+        )[0]
+        return api_response.data
+
     def get_system_playbooks(self, content_items: list[str]) -> dict[str, dict]:
         """
-        Fetch system playbooks from server.
+        Fetch system playbooks from server in parallel.
 
         Args:
             content_items (list[str]): A list of names of system playbook to fetch.
@@ -663,73 +884,66 @@ def get_system_playbooks(self, content_items: list[str]) -> dict[str, dict]:
 
         Returns:
             dict[str, dict]: A dictionary mapping downloaded playbooks file names,
             to corresponding dictionaries containing metadata and content.
         """
-        downloaded_playbooks: list[bytes] = []
 
         logger.info(
-            f"Fetching system playbooks from server ({self.client.api_client.configuration.host})..."
+            f"Fetching {len(content_items)} system playbooks from server ({self.client.api_client.configuration.host})..."
         )
 
+        # Validate playbook names first. The '/' check is required due to a
+        # server issue where the character is considered a path separator in
+        # the request endpoint.
+        valid_playbooks = []
         for playbook in content_items:
-            try:
-                # This is required due to a server issue where the '/' character
-                # is considered a path separator for the expected_endpoint.
-                if "/" in playbook:
-                    raise ValueError(
-                        f"Playbook name '{playbook}' is invalid. "
-                        f"Playbook names cannot contain the '/' character."
-                    )
+            if "/" in playbook:
+                logger.error(
+                    f"Playbook name '{playbook}' is invalid. "
+                    f"Playbook names cannot contain the '/' character."
+                )
+            else:
+                valid_playbooks.append(playbook)
 
-                endpoint = f"/playbook/{playbook}/yaml"
+        if not valid_playbooks:
+            logger.info("No system playbooks were downloaded.")
+            return {}
+
+        # Fetch playbooks in parallel
+        downloaded_playbooks: list[tuple[str, bytes]] = []
+
+        if len(valid_playbooks) > 1:
+            max_workers = min(4, len(valid_playbooks))  # Limit concurrent API calls
+            logger.debug(f"Using {max_workers} parallel workers for playbook downloads")
+
+            with ThreadPoolExecutor(max_workers=max_workers) as executor:
+                future_to_playbook = {
+                    executor.submit(self._fetch_single_playbook, playbook): playbook
+                    for playbook in valid_playbooks
+                }
+
+                for future in as_completed(future_to_playbook):
+                    playbook = future_to_playbook[future]
+                    try:
+                        result = future.result()
+                        if result:
+                            downloaded_playbooks.append((playbook, result))
+                    except Exception as e:
+                        logger.error(f"Failed to fetch system playbook '{playbook}': {e}")
+        else:
+            # Single playbook - no need for parallelization
+            for playbook in valid_playbooks:
                 try:
-                    api_response = demisto_client.generic_request_func(
-                        self.client,
-                        endpoint,
-                        "GET",
-                        _preload_content=False,
-                    )[0]
-
-                except ApiException as err:
-                    # handling in case the id and name are not the same,
-                    # trying to get the id by the name through a different api call
-                    logger.debug(
-                        f"API call using playbook's name failed:\n{err}\n"
-                        f"Attempting to fetch using playbook's ID..."
-                    )
-
-                    playbook_id = self.get_playbook_id_by_playbook_name(playbook)
-
-                    if not playbook_id:
-                        logger.debug(f"No matching ID found for playbook '{playbook}'.")
-                        raise
-
-                    logger.debug(
-                        f"Found matching ID for '{playbook}' - {playbook_id}.\n"
-                        f"Attempting to fetch playbook's YAML file using the ID."
-                    )
-
-                    endpoint = f"/playbook/{playbook_id}/yaml"
-                    api_response = demisto_client.generic_request_func(
-                        self.client,
-                        endpoint,
-                        "GET",
-                        _preload_content=False,
-                    )[0]
-
-                downloaded_playbooks.append(api_response.data)
-
-            except Exception as e:
-                logger.error(f"Failed to fetch system playbook '{playbook}': {e}")
+                    result = self._fetch_single_playbook(playbook)
+                    if result:
+                        downloaded_playbooks.append((playbook, result))
+                except Exception as e:
+                    logger.error(f"Failed to fetch system playbook '{playbook}': {e}")
 
-        if len(downloaded_playbooks):
+        if downloaded_playbooks:
             logger.debug(
                 f"Successfully fetched {len(downloaded_playbooks)} system playbooks."
             )
-
         else:
             logger.info("No system playbooks were downloaded.")
 
         content_objects: dict[str, dict] = {}
 
-        for downloaded_playbook in downloaded_playbooks:
+        for playbook_name, downloaded_playbook in downloaded_playbooks:
             playbook_bytes_data = StringIO(safe_read_unicode(downloaded_playbook))
             playbook_data = yaml.load(playbook_bytes_data)
 
@@ -746,6 +960,54 @@ def get_system_playbooks(self, content_items: list[str]) -> dict[str, dict]:
 
         return content_objects
 
+    def _fetch_single_playbook(self, playbook: str) -> bytes:
+        """
+        Fetch a single playbook from the server. Helper method for parallel processing.
+
+        Args:
+            playbook (str): The playbook name to fetch.
+
+        Returns:
+            bytes: The playbook data. If fetching by name fails, a fetch by ID
+                is attempted; any remaining error propagates to the caller,
+                which logs it and skips the item.
+        """
+        endpoint = f"/playbook/{playbook}/yaml"
+        try:
+            api_response = demisto_client.generic_request_func(
+                self.client,
+                endpoint,
+                "GET",
+                _preload_content=False,
+            )[0]
+            return api_response.data
+
+        except ApiException as err:
+            # Handling in case the ID and name are not the same -
+            # try to get the ID by the name through a different API call.
+            logger.debug(
+                f"API call using playbook's name failed:\n{err}\n"
+                f"Attempting to fetch using playbook's ID..."
+            )
+
+            playbook_id = self.get_playbook_id_by_playbook_name(playbook)
+
+            if not playbook_id:
+                logger.debug(f"No matching ID found for playbook '{playbook}'.")
+                raise
+
+            logger.debug(
+                f"Found matching ID for '{playbook}' - {playbook_id}.\n"
+                f"Attempting to fetch playbook's YAML file using the ID."
+            )
+
+            endpoint = f"/playbook/{playbook_id}/yaml"
+            api_response = demisto_client.generic_request_func(
+                self.client,
+                endpoint,
+                "GET",
+                _preload_content=False,
+            )[0]
+            return api_response.data
+
     def generate_system_content_file_name(
         self, content_item_type: ContentItemType, content_item: dict
     ) -> str:
@@ -883,48 +1145,86 @@ def parse_custom_content_data(
         logger.info("Parsing downloaded custom content data...")
 
         custom_content_objects: dict[str, dict] = {}
 
-        for file_name, file_data in file_name_to_content_item_data.items():
-            try:
-                logger.debug(f"Parsing '{file_name}'...")
-                custom_content_object: Dict = self.create_content_item_object(
-                    file_name=file_name, file_data=file_data
-                )
+        # Guard the empty case - ThreadPoolExecutor requires max_workers > 0
+        if not file_name_to_content_item_data:
+            return custom_content_objects
+
+        # Use parallel processing for better performance
+        max_workers = min(8, len(file_name_to_content_item_data))  # Limit to 8 workers max
+        if max_workers > 1:
+            logger.debug(f"Using {max_workers} parallel workers for content parsing")
+
+        with ThreadPoolExecutor(max_workers=max_workers) as executor:
+            # Submit all parsing tasks
+            future_to_file = {
+                executor.submit(self._parse_single_content_item, file_name, file_data): file_name
+                for file_name, file_data in file_name_to_content_item_data.items()
+            }
+
+            # Collect results as they complete
+            results = []
+            for future in as_completed(future_to_file):
+                file_name = future_to_file[future]
+                try:
+                    result = future.result()
+                    if result:
+                        results.append((file_name, result))
+                except Exception as e:
+                    # We fail the whole download process, since we might miss UUIDs to replace if not.
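+                    # NOTE: raising here propagates out of the `with` block,
+                    # which waits for the remaining submitted tasks to finish
+                    # (on Python 3.9+, executor.shutdown(cancel_futures=True)
+                    # could drop still-queued tasks instead, if ever needed).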
+ logger.error(f"Error while parsing '{file_name}': {e}") + raise + + # Sort results by filename for consistent ordering + results.sort(key=lambda x: x[0]) + for file_name, content_object in results: + custom_content_objects[file_name] = content_object - # Check if all required fields are present - missing_field = False - for _field in ("id", "name", "entity", "type"): - if not custom_content_object.get(_field): - logger.warning( - f"'{file_name}' will be skipped as its {_field} could not be detected." - ) - missing_field = True - break + logger.info( + f"Successfully parsed {len(custom_content_objects)} custom content objects." + ) + return custom_content_objects - # If the content is missing a required field, skip it - if missing_field: - continue + def _parse_single_content_item(self, file_name: str, file_data: StringIO) -> dict | None: + """ + Parse a single content item. Helper method for parallel processing. + + Args: + file_name (str): The file name of the custom content item. + file_data (StringIO): The file data of the custom content item. + + Returns: + dict | None: The custom content object or None if parsing failed. + """ + try: + custom_content_object: Dict = self.create_content_item_object( + file_name=file_name, file_data=file_data + ) - # If the content is written in JavaScript (not supported), skip it - if custom_content_object["type"] in ( - FileType.INTEGRATION, - FileType.SCRIPT, - ) and custom_content_object.get("code_lang") in (None, "javascript"): + # Check if all required fields are present + missing_field = False + for _field in ("id", "name", "entity", "type"): + if not custom_content_object.get(_field): logger.warning( - f"Skipping '{file_name}' as JavaScript content is not supported." + f"'{file_name}' will be skipped as its {_field} could not be detected." ) - continue + missing_field = True + break + + # If the content is missing a required field, skip it + if missing_field: + return None + + # If the content is written in JavaScript (not supported), skip it + if custom_content_object["type"] in ( + FileType.INTEGRATION, + FileType.SCRIPT, + ) and custom_content_object.get("code_lang") in (None, "javascript"): + logger.warning( + f"Skipping '{file_name}' as JavaScript content is not supported." + ) + return None - custom_content_objects[file_name] = custom_content_object + return custom_content_object - except Exception as e: - # We fail the whole download process, since we might miss UUIDs to replace if not. - logger.error(f"Error while parsing '{file_name}': {e}") - raise - - logger.info( - f"Successfully parsed {len(custom_content_objects)} custom content objects." 
-        )
-        return custom_content_objects
+        except Exception:
+            # Re-raise to be handled by the calling parallel processing code
+            raise
 
     def create_custom_content_table(
         self, custom_content_objects: dict[str, dict]
diff --git a/demisto_sdk/commands/download/tests/downloader_test.py b/demisto_sdk/commands/download/tests/downloader_test.py
index 181b8644062..a9694d6694c 100644
--- a/demisto_sdk/commands/download/tests/downloader_test.py
+++ b/demisto_sdk/commands/download/tests/downloader_test.py
@@ -1197,7 +1197,7 @@ def test_uuids_replacement_in_content_items(mocker):
         ):
             changed_uuids_count += 1
 
-    assert changed_uuids_count == 7
+    assert changed_uuids_count == 6
 
 
 @pytest.mark.parametrize("content_item_name", ("Test: Test", "[Test] Test"))
@@ -1289,6 +1289,77 @@ def test_uuids_replacement_in_content_items_with_quoted_id_field(
     )
 
 
+def test_playbook_task_uuids_preserved_during_uuid_replacement(repo):
+    """
+    Given:
+        A playbook whose tasks have taskid and task.id UUIDs, and a playbook ID that is also a UUID.
+    When:
+        Calling the 'replace_uuid_ids' method with a mapping that includes the playbook ID.
+    Then:
+        - Ensure the playbook ID is replaced with the playbook name.
+        - Ensure the taskid and task.id UUIDs are NOT replaced and remain as-is.
+        This prevents the format command from regenerating new UUIDs for every task on each download.
+    """
+    playbook_uuid = "d470522f-0a68-43c7-a62f-224f04b2e0c9"
+    task_uuid = "a1234567-1234-1234-1234-123456789012"
+    playbook_name = "Test Playbook"
+
+    pack = repo.create_pack()
+    playbook_data = {
+        "name": playbook_name,
+        "id": playbook_uuid,
+        "tasks": {
+            "0": {
+                "id": "0",
+                "taskid": task_uuid,
+                "type": "start",
+                "task": {
+                    "id": task_uuid,
+                    "version": -1,
+                    "name": "",
+                },
+            }
+        },
+    }
+    playbook: Playbook = pack.create_playbook(yml=playbook_data)
+
+    downloader = Downloader(
+        all_custom_content=True,
+        auto_replace_uuids=True,
+    )
+
+    file_name = playbook.obj_path.name
+    playbook_content = playbook.obj_path.read_bytes()
+    file_object = downloader.create_content_item_object(
+        file_name=file_name,
+        file_data=StringIO(safe_read_unicode(playbook_content)),
+        _loaded_data=playbook_data,
+    )
+    custom_content_objects = {file_name: file_object}
+
+    # Create a uuid_mapping that includes both the playbook ID and the task ID
+    uuid_mapping = {
+        playbook_uuid: playbook_name,
+        task_uuid: "Should Not Be Replaced",  # Must not be applied to task UUIDs
+    }
+
+    downloader.replace_uuid_ids(
+        custom_content_objects=custom_content_objects, uuid_mapping=uuid_mapping
+    )
+
+    updated_content = file_object["file"].getvalue()
+    updated_data = file_object["data"]
+
+    # Assert the playbook ID was replaced
+    assert f"id: '{playbook_name}'" in updated_content
+    assert file_object["id"] == playbook_name
+
+    # Assert the task UUIDs were NOT replaced (they should remain the original UUIDs)
+    assert task_uuid in updated_content
+    assert updated_data["tasks"]["0"]["taskid"] == task_uuid
+    assert updated_data["tasks"]["0"]["task"]["id"] == task_uuid
+
+
 def test_get_system_playbooks(mocker):
     """
     Given: