diff --git a/.gitignore b/.gitignore index 46ff33d6..c910e24a 100644 --- a/.gitignore +++ b/.gitignore @@ -164,4 +164,5 @@ cython_debug/ # http traces should only be committed at the fixture root /http_trace.json +/http_trace.json.lock /http_trace.json.gz diff --git a/src/fabric_cicd/__init__.py b/src/fabric_cicd/__init__.py index 1b007ce8..53df2e13 100644 --- a/src/fabric_cicd/__init__.py +++ b/src/fabric_cicd/__init__.py @@ -9,6 +9,7 @@ import fabric_cicd.constants as constants from fabric_cicd._common._check_utils import check_version from fabric_cicd._common._logging import configure_logger, exception_handler +from fabric_cicd.constants import FeatureFlag, ItemType from fabric_cicd.fabric_workspace import FabricWorkspace from fabric_cicd.publish import deploy_with_config, publish_all_items, unpublish_all_orphan_items @@ -56,6 +57,8 @@ def change_log_level(level: str = "DEBUG") -> None: __all__ = [ "FabricWorkspace", + "FeatureFlag", + "ItemType", "append_feature_flag", "change_log_level", "deploy_with_config", diff --git a/src/fabric_cicd/_common/_exceptions.py b/src/fabric_cicd/_common/_exceptions.py index e9b4b792..6ae23c95 100644 --- a/src/fabric_cicd/_common/_exceptions.py +++ b/src/fabric_cicd/_common/_exceptions.py @@ -56,3 +56,24 @@ class FailedPublishedItemStatusError(BaseCustomError): class MissingFileError(BaseCustomError): pass + + +class PublishError(BaseCustomError): + """Exception raised when one or more publish operations fail. + + Attributes: + errors: List of (item_name, exception) tuples for all failed items. + """ + + def __init__(self, errors: list[tuple[str, Exception]], logger: Logger) -> None: + """Initialize with a list of (item_name, exception) tuples.""" + self.errors = errors + failed_names = [name for name, _ in errors] + message = f"Failed to publish {len(errors)} item(s): {failed_names}" + + additional_info_parts = [] + for item_name, exc in errors: + additional_info_parts.append(f"\n--- {item_name} ---\n{exc!s}") + additional_info = "\n".join(additional_info_parts) if additional_info_parts else None + + super().__init__(message, logger, additional_info) diff --git a/src/fabric_cicd/_common/_file_lock.py b/src/fabric_cicd/_common/_file_lock.py new file mode 100644 index 00000000..c89f8c08 --- /dev/null +++ b/src/fabric_cicd/_common/_file_lock.py @@ -0,0 +1,64 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. 
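For orientation, a minimal usage sketch of the FileLock helper this new module introduces (both the context-manager form and the run_with_lock wrapper defined just below); the trace-file path is only an example and not taken from the patch:

    from fabric_cicd._common._file_lock import FileLock

    # Context-manager form: an exclusive lock is held on "http_trace.json.lock"
    # for the duration of the block.
    with FileLock("http_trace.json"):
        pass  # read/modify/write http_trace.json safely across processes

    # Wrapper form: run a callable while holding the same lock.
    result = FileLock.run_with_lock("http_trace.json", lambda: "saved")
    print(result)  # -> "saved"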
+ +"""Cross-platform file locking.""" + +import sys +from pathlib import Path +from types import TracebackType +from typing import Callable, Optional, TypeVar + +T = TypeVar("T") + + +class FileLock: + """File lock context manager.""" + + def __init__(self, lock_file: str) -> None: + self.lock_path = Path(f"{lock_file}.lock") + self._lock_file: Optional[object] = None + + def __enter__(self) -> "FileLock": + self._lock_file = self.lock_path.open("w") + if sys.platform == "win32": + import msvcrt + + msvcrt.locking(self._lock_file.fileno(), msvcrt.LK_LOCK, 1) + else: + import fcntl + + fcntl.flock(self._lock_file.fileno(), fcntl.LOCK_EX) + return self + + def __exit__( + self, + exc_type: Optional[type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], + ) -> bool: + if self._lock_file: + if sys.platform == "win32": + import msvcrt + + msvcrt.locking(self._lock_file.fileno(), msvcrt.LK_UNLCK, 1) + else: + import fcntl + + fcntl.flock(self._lock_file.fileno(), fcntl.LOCK_UN) + self._lock_file.close() + return False + + @staticmethod + def run_with_lock(lock_file: str, func: Callable[[], T]) -> T: + """ + Execute a function while holding an exclusive file lock. + + Args: + lock_file: Path to the file to lock (a .lock suffix will be added) + func: The function to execute while holding the lock + + Returns: + The return value of the function + """ + with FileLock(lock_file): + return func() diff --git a/src/fabric_cicd/_common/_http_tracer.py b/src/fabric_cicd/_common/_http_tracer.py index 9e71082c..56d839e2 100644 --- a/src/fabric_cicd/_common/_http_tracer.py +++ b/src/fabric_cicd/_common/_http_tracer.py @@ -16,6 +16,7 @@ import requests +from fabric_cicd._common._file_lock import FileLock from fabric_cicd.constants import AUTHORIZATION_HEADER, EnvVar logger = logging.getLogger(__name__) @@ -192,40 +193,43 @@ def save(self) -> None: return try: - output_path = Path(self.output_file) - existing_traces: list[dict] = [] - if output_path.exists(): - with output_path.open("r") as f: - existing_data = json.load(f) - existing_traces = existing_data.get("traces", []) - - for capture in self.captures: - request_b64 = capture.get("request_b64", "") - response_b64 = capture.get("response_b64", "") - - request_data = None - response_data = None - - if request_b64: - request_data = json.loads(base64.b64decode(request_b64).decode()) - if response_b64: - response_data = json.loads(base64.b64decode(response_b64).decode()) - - existing_traces.append({"request": request_data, "response": response_data}) - - existing_traces.sort(key=lambda x: x["request"].get("timestamp", "") if x.get("request") else "") - output_data = { - "description": "HTTP trace data from Fabric API interactions", - "total_traces": len(existing_traces), - "traces": existing_traces, - } - - with output_path.open("w") as f: - json.dump(output_data, f, indent=2) - + FileLock.run_with_lock(self.output_file, self._flush_traces_to_file) except Exception as e: logger.warning(f"Failed to save HTTP trace: {e}") + def _flush_traces_to_file(self) -> None: + """Flush captured traces to the output file (called within lock).""" + output_path = Path(self.output_file) + existing_traces: list[dict] = [] + if output_path.exists() and output_path.stat().st_size > 0: + with output_path.open("r") as f: + existing_data = json.load(f) + existing_traces = existing_data.get("traces", []) + + for capture in self.captures: + request_b64 = capture.get("request_b64", "") + response_b64 = capture.get("response_b64", "") + + 
request_data = None + response_data = None + + if request_b64: + request_data = json.loads(base64.b64decode(request_b64).decode()) + if response_b64: + response_data = json.loads(base64.b64decode(response_b64).decode()) + + existing_traces.append({"request": request_data, "response": response_data}) + + existing_traces.sort(key=lambda x: x["request"].get("timestamp", "") if x.get("request") else "") + output_data = { + "description": "HTTP trace data from Fabric API interactions", + "total_traces": len(existing_traces), + "traces": existing_traces, + } + + with output_path.open("w") as f: + json.dump(output_data, f, indent=2) + class HTTPTracerFactory: """Factory class for creating HTTP tracer instances.""" diff --git a/src/fabric_cicd/_common/_validate_input.py b/src/fabric_cicd/_common/_validate_input.py index 4a32e692..cc9b411e 100644 --- a/src/fabric_cicd/_common/_validate_input.py +++ b/src/fabric_cicd/_common/_validate_input.py @@ -16,6 +16,7 @@ import fabric_cicd.constants as constants from fabric_cicd._common._exceptions import InputError +from fabric_cicd.constants import FeatureFlag, OperationType from fabric_cicd.fabric_workspace import FabricWorkspace logger = logging.getLogger(__name__) @@ -156,3 +157,98 @@ def validate_token_credential(input_value: TokenCredential) -> TokenCredential: validate_data_type("TokenCredential", "credential", input_value) return input_value + + +def validate_experimental_param( + param_value: Optional[str], + required_flag: "FeatureFlag", + warning_message: str, + risk_warning: str, +) -> None: + """ + Generic validation for optional parameters requiring experimental feature flags. + + Args: + param_value: The parameter value (None means skip validation). + required_flag: The specific feature flag required (in addition to experimental). + warning_message: Primary warning message when feature is enabled. + risk_warning: Risk/caution warning message. + + Raises: + InputError: If required feature flags are not enabled. + """ + from fabric_cicd.constants import FeatureFlag + + if param_value is None: + return + + if ( + FeatureFlag.ENABLE_EXPERIMENTAL_FEATURES.value not in constants.FEATURE_FLAG + or required_flag.value not in constants.FEATURE_FLAG + ): + msg = f"Feature flags 'enable_experimental_features' and '{required_flag.value}' must be set." + raise InputError(msg, logger) + + logger.warning(warning_message) + logger.warning(risk_warning) + + +def validate_items_to_include(items_to_include: Optional[list[str]], operation: "OperationType") -> None: + """ + Validate items_to_include parameter and check required feature flags. + + Args: + items_to_include: List of items in "item_name.item_type" format, or None. + operation: The type of operation being performed (publish or unpublish). + + Raises: + InputError: If required feature flags are not enabled. + """ + from fabric_cicd.constants import FeatureFlag + + validate_experimental_param( + param_value=items_to_include, + required_flag=FeatureFlag.ENABLE_ITEMS_TO_INCLUDE, + warning_message=f"Selective {operation.value} is enabled.", + risk_warning=f"Using items_to_include is risky as it can prevent needed dependencies from being {operation.value}. Use at your own risk.", + ) + + +def validate_folder_path_exclude_regex(folder_path_exclude_regex: Optional[str]) -> None: + """ + Validate folder_path_exclude_regex parameter and check required feature flags. + + Args: + folder_path_exclude_regex: Regex pattern to exclude items based on their folder path, or None. 
+ + Raises: + InputError: If required feature flags are not enabled. + """ + from fabric_cicd.constants import FeatureFlag + + validate_experimental_param( + param_value=folder_path_exclude_regex, + required_flag=FeatureFlag.ENABLE_EXCLUDE_FOLDER, + warning_message="Folder path exclusion is enabled.", + risk_warning="Using folder_path_exclude_regex is risky as it can prevent needed dependencies from being deployed. Use at your own risk.", + ) + + +def validate_shortcut_exclude_regex(shortcut_exclude_regex: Optional[str]) -> None: + """ + Validate shortcut_exclude_regex parameter and check required feature flags. + + Args: + shortcut_exclude_regex: Regex pattern to exclude specific shortcuts from being published, or None. + + Raises: + InputError: If required feature flags are not enabled. + """ + from fabric_cicd.constants import FeatureFlag + + validate_experimental_param( + param_value=shortcut_exclude_regex, + required_flag=FeatureFlag.ENABLE_SHORTCUT_EXCLUDE, + warning_message="Shortcut exclusion is enabled.", + risk_warning="Using shortcut_exclude_regex will selectively exclude shortcuts from being deployed to lakehouses. Use with caution.", + ) diff --git a/src/fabric_cicd/_items/__init__.py b/src/fabric_cicd/_items/__init__.py index b1dd8107..939d464a 100644 --- a/src/fabric_cicd/_items/__init__.py +++ b/src/fabric_cicd/_items/__init__.py @@ -1,60 +1,11 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. -from fabric_cicd._items._activator import publish_activators -from fabric_cicd._items._apacheairflowjob import publish_apacheairflowjobs -from fabric_cicd._items._copyjob import publish_copyjobs -from fabric_cicd._items._dataagent import publish_dataagents -from fabric_cicd._items._dataflowgen2 import publish_dataflows -from fabric_cicd._items._datapipeline import find_referenced_datapipelines, publish_datapipelines -from fabric_cicd._items._environment import check_environment_publish_state, publish_environments -from fabric_cicd._items._eventhouse import publish_eventhouses -from fabric_cicd._items._eventstream import publish_eventstreams -from fabric_cicd._items._graphqlapi import publish_graphqlapis -from fabric_cicd._items._kqldashboard import publish_kqldashboard -from fabric_cicd._items._kqldatabase import publish_kqldatabases -from fabric_cicd._items._kqlqueryset import publish_kqlquerysets -from fabric_cicd._items._lakehouse import publish_lakehouses -from fabric_cicd._items._manage_dependencies import set_unpublish_order -from fabric_cicd._items._mirroreddatabase import publish_mirroreddatabase -from fabric_cicd._items._mlexperiment import publish_mlexperiments -from fabric_cicd._items._mounteddatafactory import publish_mounteddatafactories -from fabric_cicd._items._notebook import publish_notebooks -from fabric_cicd._items._report import publish_reports -from fabric_cicd._items._semanticmodel import publish_semanticmodels -from fabric_cicd._items._sparkjobdefinition import publish_sparkjobdefinitions -from fabric_cicd._items._sqldatabase import publish_sqldatabases -from fabric_cicd._items._userdatafunction import publish_userdatafunctions -from fabric_cicd._items._variablelibrary import publish_variablelibraries -from fabric_cicd._items._warehouse import publish_warehouses +from fabric_cicd._common._exceptions import PublishError +from fabric_cicd._items._base_publisher import ItemPublisher, ParallelConfig __all__ = [ - "check_environment_publish_state", - "find_referenced_datapipelines", - "publish_activators", - "publish_apacheairflowjobs", - 
"publish_copyjobs", - "publish_dataagents", - "publish_dataflows", - "publish_datapipelines", - "publish_environments", - "publish_eventhouses", - "publish_eventstreams", - "publish_graphqlapis", - "publish_kqldashboard", - "publish_kqldatabases", - "publish_kqlquerysets", - "publish_lakehouses", - "publish_mirroreddatabase", - "publish_mlexperiments", - "publish_mounteddatafactories", - "publish_notebooks", - "publish_reports", - "publish_semanticmodels", - "publish_sparkjobdefinitions", - "publish_sqldatabases", - "publish_userdatafunctions", - "publish_variablelibraries", - "publish_warehouses", - "set_unpublish_order", + "ItemPublisher", + "ParallelConfig", + "PublishError", ] diff --git a/src/fabric_cicd/_items/_activator.py b/src/fabric_cicd/_items/_activator.py index 999ca150..1285ce44 100644 --- a/src/fabric_cicd/_items/_activator.py +++ b/src/fabric_cicd/_items/_activator.py @@ -3,21 +3,11 @@ """Functions to process and deploy Reflex item.""" -import logging +from fabric_cicd._items._base_publisher import ItemPublisher +from fabric_cicd.constants import ItemType -from fabric_cicd import FabricWorkspace -logger = logging.getLogger(__name__) +class ActivatorPublisher(ItemPublisher): + """Publisher for Reflex AKA Activator items.""" - -def publish_activators(fabric_workspace_obj: FabricWorkspace) -> None: - """ - Publishes all reflex items from the repository. - - Args: - fabric_workspace_obj: The FabricWorkspace object containing the items to be published - """ - item_type = "Reflex" - - for item_name in fabric_workspace_obj.repository_items.get(item_type, {}): - fabric_workspace_obj._publish_item(item_name=item_name, item_type=item_type) + item_type = ItemType.REFLEX.value diff --git a/src/fabric_cicd/_items/_apacheairflowjob.py b/src/fabric_cicd/_items/_apacheairflowjob.py index 6fce26cd..bb4f6fb5 100644 --- a/src/fabric_cicd/_items/_apacheairflowjob.py +++ b/src/fabric_cicd/_items/_apacheairflowjob.py @@ -3,21 +3,11 @@ """Functions to process and deploy Apache Airflow Job item.""" -import logging +from fabric_cicd._items._base_publisher import ItemPublisher +from fabric_cicd.constants import ItemType -from fabric_cicd import FabricWorkspace -logger = logging.getLogger(__name__) +class ApacheAirflowJobPublisher(ItemPublisher): + """Publisher for Apache Airflow Job items.""" - -def publish_apacheairflowjobs(fabric_workspace_obj: FabricWorkspace) -> None: - """ - Publishes all Apache Airflow job items from the repository. - - Args: - fabric_workspace_obj: The FabricWorkspace object containing the items to be published. - """ - item_type = "ApacheAirflowJob" - - for item_name in fabric_workspace_obj.repository_items.get(item_type, {}): - fabric_workspace_obj._publish_item(item_name=item_name, item_type=item_type) + item_type = ItemType.APACHE_AIRFLOW_JOB.value diff --git a/src/fabric_cicd/_items/_base_publisher.py b/src/fabric_cicd/_items/_base_publisher.py new file mode 100644 index 00000000..fbbc5ee8 --- /dev/null +++ b/src/fabric_cicd/_items/_base_publisher.py @@ -0,0 +1,488 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. 
+ +"""Base interface for all item publishers.""" + +import logging +from abc import ABC, abstractmethod +from concurrent.futures import ThreadPoolExecutor, as_completed +from dataclasses import dataclass +from typing import Callable, Optional + +from fabric_cicd._common._exceptions import PublishError +from fabric_cicd._common._item import Item +from fabric_cicd.constants import ItemType +from fabric_cicd.fabric_workspace import FabricWorkspace + +logger = logging.getLogger(__name__) + + +@dataclass +class ParallelConfig: + """Configuration for parallel execution behavior of a publisher. + + This dataclass controls how the base ItemPublisher.publish_all() method + executes publish_one() calls - either in parallel or sequentially. + + Attributes: + enabled: If True, publish_one calls run in parallel using ThreadPoolExecutor. + If False, items are published sequentially. Default is True. + max_workers: Maximum number of concurrent threads. None means use ThreadPoolExecutor default. + ordered_items_func: Optional callable that returns an ordered list of item names. + When provided, items are published sequentially in this order. + This takes precedence over `enabled=True`. + """ + + enabled: bool = True + max_workers: Optional[int] = None + ordered_items_func: Optional[Callable[["ItemPublisher"], list[str]]] = None + + +class Publisher(ABC): + """Base interface for all publishers.""" + + def __init__(self, fabric_workspace_obj: "FabricWorkspace") -> None: + """ + Initialize the publisher with a FabricWorkspace object. + + Args: + fabric_workspace_obj: The FabricWorkspace object containing items to be published. + """ + self.fabric_workspace_obj = fabric_workspace_obj + + @abstractmethod + def publish_one(self, name: str, obj: object) -> None: + """ + Publish a single object. + + Args: + name: The name of the object to publish. + obj: The object to publish. + """ + raise NotImplementedError + + @abstractmethod + def publish_all(self) -> None: + """Publish all objects.""" + raise NotImplementedError + + +class ItemPublisher(Publisher): + """ + Base interface for all item type publishers. + + Provides a default parallel publish_all() implementation that: + - Executes publish_one() calls in parallel using ThreadPoolExecutor + - Aggregates errors from all failed items into a single PublishError + - Supports pre/post hooks via pre_publish_all() and post_publish_all() + - Can be configured via the parallel_config class attribute + + Subclasses can customize behavior by: + - Setting parallel_config to control parallelization + - Overriding pre_publish_all() for setup before publishing + - Overriding post_publish_all() for cleanup after publishing + - Overriding get_items_to_publish() to filter or order items + - Overriding get_unpublish_order() for dependency-aware unpublishing + - Overriding post_publish_all_check() for async publish state verification + + Publish Lifecycle: + 1. pre_publish_all() + 2. get_items_to_publish() + 3. publish_one() - called for each item + 4. post_publish_all() + 5. 
post_publish_all_check() - if has_async_publish_check + + Unpublish Hook: + - get_unpublish_order() - if has_dependency_tracking + """ + + # region Class Attributes + + item_type: str + """Mandatory property to be set by each publisher subclass""" + + parallel_config: ParallelConfig = ParallelConfig() + """Configuration for parallel execution - subclasses can override with their own ParallelConfig""" + + has_async_publish_check: bool = False + """Set to True if this publisher implements post_publish_all_check() for async state verification""" + + has_dependency_tracking: bool = False + """Set to True if this publisher implements get_unpublish_order() for dependency ordering""" + + # endregion + + # region Initialization & Factory + + def __init__(self, fabric_workspace_obj: "FabricWorkspace") -> None: + """ + Initialize the publisher with a FabricWorkspace object. + + Args: + fabric_workspace_obj: The FabricWorkspace object containing items to be published. + """ + super().__init__(fabric_workspace_obj) + + @staticmethod + def create(item_type: ItemType, fabric_workspace_obj: "FabricWorkspace") -> "ItemPublisher": + """ + Factory method to create the appropriate publisher for a given item type. + + Args: + item_type: The ItemType enum value for which to create a publisher. + fabric_workspace_obj: The FabricWorkspace object containing items to be published. + + Returns: + An instance of the appropriate ItemPublisher subclass. + + Raises: + ValueError: If the item type is not supported. + """ + from fabric_cicd._items._activator import ActivatorPublisher + from fabric_cicd._items._apacheairflowjob import ApacheAirflowJobPublisher + from fabric_cicd._items._copyjob import CopyJobPublisher + from fabric_cicd._items._dataagent import DataAgentPublisher + from fabric_cicd._items._dataflowgen2 import DataflowPublisher + from fabric_cicd._items._datapipeline import DataPipelinePublisher + from fabric_cicd._items._environment import EnvironmentPublisher + from fabric_cicd._items._eventhouse import EventhousePublisher + from fabric_cicd._items._eventstream import EventstreamPublisher + from fabric_cicd._items._graphqlapi import GraphQLApiPublisher + from fabric_cicd._items._kqldashboard import KQLDashboardPublisher + from fabric_cicd._items._kqldatabase import KQLDatabasePublisher + from fabric_cicd._items._kqlqueryset import KQLQuerysetPublisher + from fabric_cicd._items._lakehouse import LakehousePublisher + from fabric_cicd._items._mirroreddatabase import MirroredDatabasePublisher + from fabric_cicd._items._mlexperiment import MLExperimentPublisher + from fabric_cicd._items._mounteddatafactory import MountedDataFactoryPublisher + from fabric_cicd._items._notebook import NotebookPublisher + from fabric_cicd._items._report import ReportPublisher + from fabric_cicd._items._semanticmodel import SemanticModelPublisher + from fabric_cicd._items._sparkjobdefinition import SparkJobDefinitionPublisher + from fabric_cicd._items._sqldatabase import SQLDatabasePublisher + from fabric_cicd._items._userdatafunction import UserDataFunctionPublisher + from fabric_cicd._items._variablelibrary import VariableLibraryPublisher + from fabric_cicd._items._warehouse import WarehousePublisher + + publisher_mapping = { + ItemType.VARIABLE_LIBRARY: VariableLibraryPublisher, + ItemType.WAREHOUSE: WarehousePublisher, + ItemType.MIRRORED_DATABASE: MirroredDatabasePublisher, + ItemType.LAKEHOUSE: LakehousePublisher, + ItemType.SQL_DATABASE: SQLDatabasePublisher, + ItemType.ENVIRONMENT: EnvironmentPublisher, + 
ItemType.USER_DATA_FUNCTION: UserDataFunctionPublisher, + ItemType.EVENTHOUSE: EventhousePublisher, + ItemType.SPARK_JOB_DEFINITION: SparkJobDefinitionPublisher, + ItemType.NOTEBOOK: NotebookPublisher, + ItemType.SEMANTIC_MODEL: SemanticModelPublisher, + ItemType.REPORT: ReportPublisher, + ItemType.COPY_JOB: CopyJobPublisher, + ItemType.KQL_DATABASE: KQLDatabasePublisher, + ItemType.KQL_QUERYSET: KQLQuerysetPublisher, + ItemType.REFLEX: ActivatorPublisher, + ItemType.EVENTSTREAM: EventstreamPublisher, + ItemType.KQL_DASHBOARD: KQLDashboardPublisher, + ItemType.DATAFLOW: DataflowPublisher, + ItemType.DATA_PIPELINE: DataPipelinePublisher, + ItemType.GRAPHQL_API: GraphQLApiPublisher, + ItemType.APACHE_AIRFLOW_JOB: ApacheAirflowJobPublisher, + ItemType.MOUNTED_DATA_FACTORY: MountedDataFactoryPublisher, + ItemType.DATA_AGENT: DataAgentPublisher, + ItemType.ML_EXPERIMENT: MLExperimentPublisher, + } + + publisher_class = publisher_mapping.get(item_type) + if publisher_class is None: + msg = f"No publisher found for item type: {item_type}" + raise ValueError(msg) + + return publisher_class(fabric_workspace_obj) + + @staticmethod + def get_item_types_to_publish(fabric_workspace_obj: "FabricWorkspace") -> list[tuple[int, ItemType]]: + """ + Get the ordered list of item types that should be published. + + Returns item types that are both in scope and have items in the repository, + ordered according to SERIAL_ITEM_PUBLISH_ORDER. + + Args: + fabric_workspace_obj: The FabricWorkspace object containing scope and repository info. + + Returns: + List of (order_num, ItemType) tuples for item types that should be published. + """ + from fabric_cicd import constants + + result = [] + for order_num, item_type in constants.SERIAL_ITEM_PUBLISH_ORDER.items(): + if ( + item_type.value in fabric_workspace_obj.item_type_in_scope + and item_type.value in fabric_workspace_obj.repository_items + ): + result.append((order_num, item_type)) + return result + + @staticmethod + def get_item_types_to_unpublish(fabric_workspace_obj: "FabricWorkspace") -> list[str]: + """ + Get the ordered list of item types that should be unpublished. + + Returns item types in reverse publish order that are in scope, have deployed items, + and meet feature flag requirements. Logs warnings for skipped item types. + + Args: + fabric_workspace_obj: The FabricWorkspace object containing scope and deployed items info. + + Returns: + List of item type strings in the order they should be unpublished. + """ + from fabric_cicd import constants + + unpublish_order = [] + for item_type in reversed(list(constants.SERIAL_ITEM_PUBLISH_ORDER.values())): + if ( + item_type.value in fabric_workspace_obj.item_type_in_scope + and item_type.value in fabric_workspace_obj.deployed_items + ): + unpublish_flag = constants.UNPUBLISH_FLAG_MAPPING.get(item_type.value) + # Append item_type if no feature flag is required or the corresponding flag is enabled + if not unpublish_flag or unpublish_flag in constants.FEATURE_FLAG: + unpublish_order.append(item_type.value) + elif unpublish_flag and unpublish_flag not in constants.FEATURE_FLAG: + # Log warning when unpublish is skipped due to missing feature flag + logger.warning( + f"Skipping unpublish for {item_type.value} items because the '{unpublish_flag}' feature flag is not enabled." 
+ ) + return unpublish_order + + @staticmethod + def get_orphaned_items( + fabric_workspace_obj: "FabricWorkspace", + item_type: str, + item_name_exclude_regex: Optional[str] = None, + items_to_include: Optional[list[str]] = None, + ) -> list[str]: + """ + Get the list of orphaned items that should be unpublished for a given item type. + + Orphaned items are those deployed but not present in the repository, + filtered by exclusion regex or items_to_include list. + + Args: + fabric_workspace_obj: The FabricWorkspace object containing deployed and repository items. + item_type: The item type string to check for orphans. + item_name_exclude_regex: Optional regex pattern to exclude items from unpublishing. + items_to_include: Optional list of items in "name.type" format to include for unpublishing. + + Returns: + List of item names that should be unpublished. + """ + import re + + deployed_names = set(fabric_workspace_obj.deployed_items.get(item_type, {}).keys()) + repository_names = set(fabric_workspace_obj.repository_items.get(item_type, {}).keys()) + to_delete_set = deployed_names - repository_names + + if items_to_include is not None: + # Filter to only items in the include list + return [name for name in to_delete_set if f"{name}.{item_type}" in items_to_include] + if item_name_exclude_regex: + # Filter out items matching the exclude regex + regex_pattern = re.compile(item_name_exclude_regex) + return [name for name in to_delete_set if not regex_pattern.match(name)] + return list(to_delete_set) + + # endregion + + # region Public Methods + + def publish_all(self) -> None: + """ + Execute the publish operation for this item type. + + 1. Calls pre_publish_all() for any setup operations + 2. Gets items via get_items_to_publish() + 3. Publishes items (parallel or sequential based on parallel_config) + 4. Calls post_publish_all() for any finalization + 5. Raises PublishError if any items failed + + The parallel_config class attribute controls execution: + - If ordered_items_func is set: publishes in that order sequentially + - If enabled=True: publishes in parallel + - If enabled=False: publishes sequentially + + Raises: + PublishError: If one or more items failed to publish. + """ + self.pre_publish_all() + items = self.get_items_to_publish() + if not items: + self.post_publish_all() + return + + config = getattr(self.__class__, "parallel_config", ParallelConfig()) + + if config.ordered_items_func is not None: + order = config.ordered_items_func(self) + errors = self._publish_items_ordered(items, order) + elif config.enabled: + errors = self._publish_items_parallel(items) + else: + errors = self._publish_items_sequential(items) + + self.post_publish_all() + + if errors: + raise PublishError(errors, logger) + + def publish_one(self, item_name: str, _item: "Item") -> None: + """ + Publish a single item. + + Args: + item_name: The name of the item to publish. + _item: The Item object to publish. + + Default implementation publishes the item using _publish_item. + Subclasses can override this method for custom publishing logic. + """ + self.fabric_workspace_obj._publish_item(item_name=item_name, item_type=self.item_type) + + def get_items_to_publish(self) -> dict[str, "Item"]: + """ + Get the items to publish for this item type. + + Returns: + Dictionary mapping item names to Item objects. + + Subclasses can override to filter or transform the items. 
+ """ + return self.fabric_workspace_obj.repository_items.get(self.item_type, {}) + + def get_unpublish_order(self, items_to_unpublish: list[str]) -> list[str]: + """ + Get the ordered list of item names based on dependencies for unpublishing. + + Args: + items_to_unpublish: List of item names to be unpublished. + + Returns: + List of item names in the order they should be unpublished (reverse dependency order). + + Default implementation returns items in their original order. + Subclasses with dependency tracking should override for proper ordering. + """ + return items_to_unpublish + + def pre_publish_all(self) -> None: + """ + Hook called before publishing any items. + + Subclasses can override to perform setup, validation, or refresh operations. + Default implementation does nothing. + """ + pass + + def post_publish_all(self) -> None: + """ + Hook called after all items have been published successfully. + + Subclasses can override to perform cleanup, binding, or finalization. + Default implementation does nothing. + """ + pass + + def post_publish_all_check(self) -> None: + """ + Hook called after publish_all completes to verify async publish state. + + Subclasses can override this to check the state of asynchronous publish + operations (e.g., Environment items that have async publish workflows). + Default implementation does nothing. + + This method is called separately from publish_all() and should be invoked + by the orchestration layer after all items of this type have been published. + """ + pass + + # endregion + + # region Publishing + + def _publish_items_parallel(self, items: dict[str, "Item"]) -> list[tuple[str, Exception]]: + """ + Publish items in parallel using ThreadPoolExecutor. + + Args: + items: Dictionary mapping item names to Item objects. + + Returns: + List of (item_name, exception) tuples for failed items. + """ + errors: list[tuple[str, Exception]] = [] + config = getattr(self.__class__, "parallel_config", ParallelConfig()) + + with ThreadPoolExecutor(max_workers=config.max_workers) as executor: + futures = { + executor.submit(self.publish_one, item_name, item): (item_name, item) + for item_name, item in items.items() + } + + for future in as_completed(futures): + item_name, _ = futures[future] + try: + future.result() + except Exception as e: + logger.error(f"Failed to publish {self.item_type} '{item_name}': {e}") + errors.append((item_name, e)) + + return errors + + def _publish_items_sequential(self, items: dict[str, "Item"]) -> list[tuple[str, Exception]]: + """ + Publish items sequentially. + + Args: + items: Dictionary mapping item names to Item objects. + + Returns: + List of (item_name, exception) tuples for failed items. + """ + errors: list[tuple[str, Exception]] = [] + + for item_name, item in items.items(): + try: + self.publish_one(item_name, item) + except Exception as e: + logger.error(f"Failed to publish {self.item_type} '{item_name}': {e}") + errors.append((item_name, e)) + + return errors + + def _publish_items_ordered(self, items: dict[str, "Item"], order: list[str]) -> list[tuple[str, Exception]]: + """ + Publish items in a specific order sequentially. + + Args: + items: Dictionary mapping item names to Item objects. + order: List of item names in the order they should be published. + + Returns: + List of (item_name, exception) tuples for failed items. 
+ """ + errors: list[tuple[str, Exception]] = [] + + for item_name in order: + if item_name in items: + item = items[item_name] + try: + self.publish_one(item_name, item) + except Exception as e: + logger.error(f"Failed to publish {self.item_type} '{item_name}': {e}") + errors.append((item_name, e)) + + return errors + + # endregion diff --git a/src/fabric_cicd/_items/_copyjob.py b/src/fabric_cicd/_items/_copyjob.py index fd191ea8..26080dbf 100644 --- a/src/fabric_cicd/_items/_copyjob.py +++ b/src/fabric_cicd/_items/_copyjob.py @@ -3,21 +3,11 @@ """Functions to process and deploy Copy Job item.""" -import logging +from fabric_cicd._items._base_publisher import ItemPublisher +from fabric_cicd.constants import ItemType -from fabric_cicd import FabricWorkspace -logger = logging.getLogger(__name__) +class CopyJobPublisher(ItemPublisher): + """Publisher for Copy Job items.""" - -def publish_copyjobs(fabric_workspace_obj: FabricWorkspace) -> None: - """ - Publishes all copyjob items from the repository. - - Args: - fabric_workspace_obj: The FabricWorkspace object containing the items to be published. - """ - item_type = "CopyJob" - - for item_name in fabric_workspace_obj.repository_items.get(item_type, {}): - fabric_workspace_obj._publish_item(item_name=item_name, item_type=item_type) + item_type = ItemType.COPY_JOB.value diff --git a/src/fabric_cicd/_items/_dataagent.py b/src/fabric_cicd/_items/_dataagent.py index 5f8a33f7..b7046bbc 100644 --- a/src/fabric_cicd/_items/_dataagent.py +++ b/src/fabric_cicd/_items/_dataagent.py @@ -5,20 +5,20 @@ import logging -from fabric_cicd import FabricWorkspace +from fabric_cicd._common._item import Item +from fabric_cicd._items._base_publisher import ItemPublisher +from fabric_cicd.constants import EXCLUDE_PATH_REGEX_MAPPING, ItemType logger = logging.getLogger(__name__) -def publish_dataagents(fabric_workspace_obj: FabricWorkspace) -> None: - """ - Publishes all data agent items from the repository. +class DataAgentPublisher(ItemPublisher): + """Publisher for Data Agent items.""" - Args: - fabric_workspace_obj: The FabricWorkspace object containing the items to be published. - """ - item_type = "DataAgent" + item_type = ItemType.DATA_AGENT.value - for item_name in fabric_workspace_obj.repository_items.get(item_type, {}): - exclude_path = r".*\.pbi[/\\].*" - fabric_workspace_obj._publish_item(item_name=item_name, item_type=item_type, exclude_path=exclude_path) + def publish_one(self, item_name: str, _item: Item) -> None: + """Publish a single Data Agent item.""" + self.fabric_workspace_obj._publish_item( + item_name=item_name, item_type=self.item_type, exclude_path=EXCLUDE_PATH_REGEX_MAPPING.get(self.item_type) + ) diff --git a/src/fabric_cicd/_items/_dataflowgen2.py b/src/fabric_cicd/_items/_dataflowgen2.py index f32be62b..7d38660f 100644 --- a/src/fabric_cicd/_items/_dataflowgen2.py +++ b/src/fabric_cicd/_items/_dataflowgen2.py @@ -10,34 +10,18 @@ from fabric_cicd._common._exceptions import ParsingError from fabric_cicd._common._file import File from fabric_cicd._common._item import Item +from fabric_cicd._items._base_publisher import ItemPublisher, ParallelConfig from fabric_cicd._parameter._utils import ( check_replacement, extract_find_value, extract_parameter_filters, extract_replace_value, ) +from fabric_cicd.constants import ItemType logger = logging.getLogger(__name__) -def publish_dataflows(fabric_workspace_obj: FabricWorkspace) -> None: - """ - Publishes all dataflow items from the repository. 
- - Args: - fabric_workspace_obj: The FabricWorkspace object containing the items to be published. - """ - item_type = "Dataflow" - - # Set the publish order based on dependencies (when dataflow references another dataflow) - publish_order = set_dataflow_publish_order(fabric_workspace_obj, item_type) - - for item_name in publish_order: - fabric_workspace_obj._publish_item( - item_name=item_name, item_type=item_type, func_process_file=func_process_file - ) - - def set_dataflow_publish_order(workspace_obj: FabricWorkspace, item_type: str) -> list[str]: """ Sets the publish order where the source dataflow, if present always proceeds the referencing dataflow. @@ -184,7 +168,9 @@ def get_source_dataflow_name( for param in workspace_obj.environment_parameter.get("find_replace", []): # Extract values from the parameter input_type, input_name, input_path = extract_parameter_filters(workspace_obj, param) - filter_match = check_replacement(input_type, input_name, input_path, "Dataflow", item_name, file_path) + filter_match = check_replacement( + input_type, input_name, input_path, ItemType.DATAFLOW.value, item_name, file_path + ) find_info = extract_find_value(param, file_content, filter_match) # Skip if this parameter doesn't match the dataflow ID @@ -241,7 +227,9 @@ def replace_source_dataflow_ids(workspace_obj: FabricWorkspace, item_obj: Item, source_dataflow_id = source_dataflow_info["source_id"] # Get the logical ID of the source dataflow from repository items - logical_id = workspace_obj.repository_items.get("Dataflow", {}).get(source_dataflow_name, {}).logical_id + logical_id = ( + workspace_obj.repository_items.get(ItemType.DATAFLOW.value, {}).get(source_dataflow_name, {}).logical_id + ) # Replace the dataflow ID with its logical ID and the workspace ID with the default workspace ID if logical_id: @@ -254,3 +242,23 @@ def replace_source_dataflow_ids(workspace_obj: FabricWorkspace, item_obj: Item, ) return file_obj.contents + + +def _get_dataflow_publish_order(publisher: "DataflowPublisher") -> list[str]: + """Get the ordered list of dataflow names based on dependencies.""" + return set_dataflow_publish_order(publisher.fabric_workspace_obj, publisher.item_type) + + +class DataflowPublisher(ItemPublisher): + """Publisher for Dataflow items.""" + + item_type = ItemType.DATAFLOW.value + + parallel_config = ParallelConfig(enabled=False, ordered_items_func=_get_dataflow_publish_order) + """Dataflows must be published in dependency order (sequential)""" + + def publish_one(self, item_name: str, _item: Item) -> None: + """Publish a single Dataflow item.""" + self.fabric_workspace_obj._publish_item( + item_name=item_name, item_type=self.item_type, func_process_file=func_process_file + ) diff --git a/src/fabric_cicd/_items/_datapipeline.py b/src/fabric_cicd/_items/_datapipeline.py index e7fd778a..eb7b5c66 100644 --- a/src/fabric_cicd/_items/_datapipeline.py +++ b/src/fabric_cicd/_items/_datapipeline.py @@ -9,30 +9,14 @@ import dpath from fabric_cicd import FabricWorkspace, constants -from fabric_cicd._items._manage_dependencies import set_publish_order +from fabric_cicd._common._item import Item +from fabric_cicd._items._base_publisher import ItemPublisher, ParallelConfig +from fabric_cicd._items._manage_dependencies import set_publish_order, set_unpublish_order +from fabric_cicd.constants import ItemType logger = logging.getLogger(__name__) -def publish_datapipelines(fabric_workspace_obj: FabricWorkspace) -> None: - """ - Publishes all data pipeline items from the repository in the correct order 
based on their dependencies. - - Args: - fabric_workspace_obj: The FabricWorkspace object containing the items to be published. - """ - item_type = "DataPipeline" - - # Set the order of data pipelines to be published based on their dependencies - publish_order = set_publish_order(fabric_workspace_obj, item_type, find_referenced_datapipelines) - - fabric_workspace_obj._refresh_deployed_items() - - # Publish - for item_name in publish_order: - fabric_workspace_obj._publish_item(item_name=item_name, item_type=item_type) - - def find_referenced_datapipelines(fabric_workspace_obj: FabricWorkspace, file_content: dict, lookup_type: str) -> list: """ Scan through pipeline file json dictionary and find pipeline references (including nested pipelines). @@ -42,7 +26,7 @@ def find_referenced_datapipelines(fabric_workspace_obj: FabricWorkspace, file_co file_content: Dict representation of the pipeline-content file. lookup_type: Finding references in deployed file or repo file (Deployed or Repository). """ - item_type = "DataPipeline" + item_type = ItemType.DATA_PIPELINE.value reference_list = [] guid_pattern = re.compile(constants.VALID_GUID_REGEX) @@ -61,3 +45,40 @@ def find_referenced_datapipelines(fabric_workspace_obj: FabricWorkspace, file_co reference_list.append(referenced_name) return reference_list + + +def _get_datapipeline_publish_order(publisher: "DataPipelinePublisher") -> list[str]: + """Get the ordered list of data pipeline names based on dependencies.""" + return set_publish_order(publisher.fabric_workspace_obj, publisher.item_type, find_referenced_datapipelines) + + +class DataPipelinePublisher(ItemPublisher): + """Publisher for Data Pipeline items.""" + + item_type = ItemType.DATA_PIPELINE.value + has_dependency_tracking = True + + parallel_config = ParallelConfig(enabled=False, ordered_items_func=_get_datapipeline_publish_order) + """Pipelines must be published in dependency order (sequential)""" + + def get_unpublish_order(self, items_to_unpublish: list[str]) -> list[str]: + """ + Get the ordered list of item names based on dependencies for unpublishing. + + Args: + items_to_unpublish: List of item names to be unpublished. + + Returns: + List of item names in the order they should be unpublished (reverse dependency order). + """ + return set_unpublish_order( + self.fabric_workspace_obj, self.item_type, items_to_unpublish, find_referenced_datapipelines + ) + + def publish_one(self, item_name: str, _item: Item) -> None: + """Publish a single Data Pipeline item.""" + self.fabric_workspace_obj._publish_item(item_name=item_name, item_type=self.item_type) + + def pre_publish_all(self) -> None: + """Refresh deployed items before publishing to resolve references.""" + self.fabric_workspace_obj._refresh_deployed_items() diff --git a/src/fabric_cicd/_items/_environment.py b/src/fabric_cicd/_items/_environment.py index 4556beed..284640fd 100644 --- a/src/fabric_cicd/_items/_environment.py +++ b/src/fabric_cicd/_items/_environment.py @@ -14,47 +14,12 @@ from fabric_cicd._common._exceptions import MissingFileError from fabric_cicd._common._fabric_endpoint import handle_retry from fabric_cicd._common._item import Item +from fabric_cicd._items._base_publisher import ItemPublisher +from fabric_cicd.constants import EXCLUDE_PATH_REGEX_MAPPING, ItemType logger = logging.getLogger(__name__) -def publish_environments(fabric_workspace_obj: FabricWorkspace) -> None: - """ - Publishes all environment items from the repository. 
- - Environments are deployed using the updateDefinition API, and then compute settings and libraries are published separately. - - Args: - fabric_workspace_obj: The FabricWorkspace object containing the items to be published. - """ - logger.warning("The underlying legacy Microsoft Fabric Environment APIs will be deprecated by March 1, 2026.") - logger.warning( - "Please upgrade to the latest fabric-cicd version before March 1, 2026 to prevent broken Environment item deployments." - ) - - # Check for ongoing publish - check_environment_publish_state(fabric_workspace_obj, True) - - item_type = "Environment" - for item_name, item in fabric_workspace_obj.repository_items.get(item_type, {}).items(): - # Only deploy the environment shell when it just contains spark compute settings - is_shell_only = set_environment_deployment_type(item) - logger.debug(f"Environment '{item_name}'; shell_only deployment: {is_shell_only}") - - # Exclude Sparkcompute.yml from environment definition deployment (requires special handling) - exclude_path = r"\Setting" - fabric_workspace_obj._publish_item( - item_name=item_name, - item_type=item_type, - exclude_path=exclude_path, - skip_publish_logging=True, - shell_only_publish=is_shell_only, - ) - if item.skip_publish: - continue - _publish_environment_metadata(fabric_workspace_obj, item_name) - - def set_environment_deployment_type(item: Item) -> bool: """ Return True if this Environment deployment should be treated as "shell-only". @@ -95,9 +60,9 @@ def set_environment_deployment_type(item: Item) -> bool: return shell_only -def check_environment_publish_state(fabric_workspace_obj: FabricWorkspace, initial_check: bool = False) -> None: +def _check_environment_publish_state(fabric_workspace_obj: FabricWorkspace, initial_check: bool = False) -> None: """ - Checks the publish state of environments after deployment + Checks the publish state of environments after deployment. Args: fabric_workspace_obj: The FabricWorkspace object. @@ -106,7 +71,7 @@ def check_environment_publish_state(fabric_workspace_obj: FabricWorkspace, initi ongoing_publish = True iteration = 1 - environments = fabric_workspace_obj.repository_items.get("Environment", {}) + environments = fabric_workspace_obj.repository_items.get(ItemType.ENVIRONMENT.value, {}) filtered_environments = [ k for k in environments @@ -179,7 +144,7 @@ def _publish_environment_metadata(fabric_workspace_obj: FabricWorkspace, item_na item_name: Name of the environment item whose compute settings are to be published. is_excluded: Flag indicating if Sparkcompute.yml was excluded from definition deployment. 
""" - item_type = "Environment" + item_type = ItemType.ENVIRONMENT.value item_guid = fabric_workspace_obj.repository_items[item_type][item_name].guid # Update compute settings @@ -208,7 +173,7 @@ def _update_compute_settings(fabric_workspace_obj: FabricWorkspace, item_guid: s # Get Setting/Sparkcompute.yml content from repository yaml_contents = None - item = fabric_workspace_obj.repository_items["Environment"][item_name] + item = fabric_workspace_obj.repository_items[ItemType.ENVIRONMENT.value][item_name] item_files = item.item_files for file in item_files: if file.file_path.name == "Sparkcompute.yml" and file.file_path.parent.name == "Setting": @@ -280,3 +245,34 @@ def _convert_environment_compute_to_camel(fabric_workspace_obj: FabricWorkspace, new_input_dict[new_key] = value return new_input_dict + + +class EnvironmentPublisher(ItemPublisher): + """Publisher for Environment items.""" + + item_type = ItemType.ENVIRONMENT.value + has_async_publish_check = True + + def publish_one(self, item_name: str, item: Item) -> None: + """Publish a single Environment item.""" + is_shell_only = set_environment_deployment_type(item) + logger.debug(f"Environment '{item_name}'; shell_only deployment: {is_shell_only}") + + self.fabric_workspace_obj._publish_item( + item_name=item_name, + item_type=self.item_type, + exclude_path=EXCLUDE_PATH_REGEX_MAPPING.get(self.item_type), + skip_publish_logging=True, + shell_only_publish=is_shell_only, + ) + if item.skip_publish: + return + _publish_environment_metadata(self.fabric_workspace_obj, item_name) + + def pre_publish_all(self) -> None: + """Check environment publish state before publishing.""" + _check_environment_publish_state(self.fabric_workspace_obj, True) + + def post_publish_all_check(self) -> None: + """Check environment publish state after all environments have been published.""" + _check_environment_publish_state(self.fabric_workspace_obj, False) diff --git a/src/fabric_cicd/_items/_eventhouse.py b/src/fabric_cicd/_items/_eventhouse.py index 6ca16900..11e33329 100644 --- a/src/fabric_cicd/_items/_eventhouse.py +++ b/src/fabric_cicd/_items/_eventhouse.py @@ -5,20 +5,20 @@ import logging -from fabric_cicd import FabricWorkspace +from fabric_cicd._common._item import Item +from fabric_cicd._items._base_publisher import ItemPublisher +from fabric_cicd.constants import EXCLUDE_PATH_REGEX_MAPPING, ItemType logger = logging.getLogger(__name__) -def publish_eventhouses(fabric_workspace_obj: FabricWorkspace) -> None: - """ - Publishes all eventhouse items from the repository. 
+class EventhousePublisher(ItemPublisher): + """Publisher for Eventhouse items.""" - Args: - fabric_workspace_obj: The FabricWorkspace object containing the items to be published - """ - item_type = "Eventhouse" + item_type = ItemType.EVENTHOUSE.value - for item_name in fabric_workspace_obj.repository_items.get(item_type, {}): - exclude_path = r".*\.children[/\\].*" - fabric_workspace_obj._publish_item(item_name=item_name, item_type=item_type, exclude_path=exclude_path) + def publish_one(self, item_name: str, _item: Item) -> None: + """Publish a single Eventhouse item.""" + self.fabric_workspace_obj._publish_item( + item_name=item_name, item_type=self.item_type, exclude_path=EXCLUDE_PATH_REGEX_MAPPING.get(self.item_type) + ) diff --git a/src/fabric_cicd/_items/_eventstream.py b/src/fabric_cicd/_items/_eventstream.py index 6ad43119..23c6dfbc 100644 --- a/src/fabric_cicd/_items/_eventstream.py +++ b/src/fabric_cicd/_items/_eventstream.py @@ -3,21 +3,11 @@ """Functions to process and deploy Eventstream item.""" -import logging +from fabric_cicd._items._base_publisher import ItemPublisher +from fabric_cicd.constants import ItemType -from fabric_cicd import FabricWorkspace -logger = logging.getLogger(__name__) +class EventstreamPublisher(ItemPublisher): + """Publisher for Eventstream items.""" - -def publish_eventstreams(fabric_workspace_obj: FabricWorkspace) -> None: - """ - Publishes all eventstream items from the repository. - - Args: - fabric_workspace_obj: The FabricWorkspace object containing the items to be published - """ - item_type = "Eventstream" - - for item_name in fabric_workspace_obj.repository_items.get(item_type, {}): - fabric_workspace_obj._publish_item(item_name=item_name, item_type=item_type) + item_type = ItemType.EVENTSTREAM.value diff --git a/src/fabric_cicd/_items/_graphqlapi.py b/src/fabric_cicd/_items/_graphqlapi.py index 45d1dfca..685259cb 100644 --- a/src/fabric_cicd/_items/_graphqlapi.py +++ b/src/fabric_cicd/_items/_graphqlapi.py @@ -3,21 +3,11 @@ """Functions to process and deploy API for GraphQL item.""" -import logging +from fabric_cicd._items._base_publisher import ItemPublisher +from fabric_cicd.constants import ItemType -from fabric_cicd import FabricWorkspace -logger = logging.getLogger(__name__) +class GraphQLApiPublisher(ItemPublisher): + """Publisher for GraphQL API items.""" - -def publish_graphqlapis(fabric_workspace_obj: FabricWorkspace) -> None: - """ - Publishes all graphqlapi items from the repository. - - Args: - fabric_workspace_obj: The FabricWorkspace object containing the items to be published. - """ - item_type = "GraphQLApi" - - for item_name in fabric_workspace_obj.repository_items.get(item_type, {}): - fabric_workspace_obj._publish_item(item_name=item_name, item_type=item_type) + item_type = ItemType.GRAPHQL_API.value diff --git a/src/fabric_cicd/_items/_kqldashboard.py b/src/fabric_cicd/_items/_kqldashboard.py index 2a6d2ee1..e8a023a8 100644 --- a/src/fabric_cicd/_items/_kqldashboard.py +++ b/src/fabric_cicd/_items/_kqldashboard.py @@ -10,27 +10,12 @@ from fabric_cicd._common._exceptions import ParsingError from fabric_cicd._common._file import File from fabric_cicd._common._item import Item +from fabric_cicd._items._base_publisher import ItemPublisher +from fabric_cicd.constants import ItemType logger = logging.getLogger(__name__) -def publish_kqldashboard(fabric_workspace_obj: FabricWorkspace) -> None: - """ - Publishes all Real-Time Dashboard items from the repository. 
- - Args: - fabric_workspace_obj: The FabricWorkspace object containing the items to be published. - """ - item_type = "KQLDashboard" - - fabric_workspace_obj._refresh_deployed_items() - - for item_name in fabric_workspace_obj.repository_items.get(item_type, {}): - fabric_workspace_obj._publish_item( - item_name=item_name, item_type=item_type, func_process_file=func_process_file - ) - - def func_process_file(workspace_obj: FabricWorkspace, item_obj: Item, file_obj: File) -> str: """ Custom file processing for KQL Dashboard items. @@ -41,7 +26,11 @@ def func_process_file(workspace_obj: FabricWorkspace, item_obj: Item, file_obj: file_obj: The file object. """ # For KQL Dashboard, we do not need to process the file content - return replace_cluster_uri(workspace_obj, file_obj) if item_obj.type == "KQLDashboard" else file_obj.contents + return ( + replace_cluster_uri(workspace_obj, file_obj) + if item_obj.type == ItemType.KQL_DASHBOARD.value + else file_obj.contents + ) def replace_cluster_uri(fabric_workspace_obj: FabricWorkspace, file_obj: File) -> str: @@ -59,7 +48,7 @@ def replace_cluster_uri(fabric_workspace_obj: FabricWorkspace, file_obj: File) - data_sources = json_content_dict.get("dataSources") # Get the KQL Database items from the deployed items - database_items = fabric_workspace_obj.deployed_items.get("KQLDatabase", {}) + database_items = fabric_workspace_obj.deployed_items.get(ItemType.KQL_DATABASE.value, {}) for data_source in data_sources: if not data_source: @@ -88,3 +77,19 @@ def replace_cluster_uri(fabric_workspace_obj: FabricWorkspace, file_obj: File) - data_source["clusterUri"] = kqldatabase_cluster_uri return json.dumps(json_content_dict, indent=2) + + +class KQLDashboardPublisher(ItemPublisher): + """Publisher for KQL Dashboard items.""" + + item_type = ItemType.KQL_DASHBOARD.value + + def publish_one(self, item_name: str, _item: Item) -> None: + """Publish a single KQL Dashboard item.""" + self.fabric_workspace_obj._publish_item( + item_name=item_name, item_type=self.item_type, func_process_file=func_process_file + ) + + def pre_publish_all(self) -> None: + """Refresh deployed items to get KQL Database cluster URIs.""" + self.fabric_workspace_obj._refresh_deployed_items() diff --git a/src/fabric_cicd/_items/_kqldatabase.py b/src/fabric_cicd/_items/_kqldatabase.py index 942c9ec3..24ff7d17 100644 --- a/src/fabric_cicd/_items/_kqldatabase.py +++ b/src/fabric_cicd/_items/_kqldatabase.py @@ -3,21 +3,11 @@ """Functions to process and deploy KQL Database item.""" -import logging +from fabric_cicd._items._base_publisher import ItemPublisher +from fabric_cicd.constants import ItemType -from fabric_cicd import FabricWorkspace -logger = logging.getLogger(__name__) +class KQLDatabasePublisher(ItemPublisher): + """Publisher for KQL Database items.""" - -def publish_kqldatabases(fabric_workspace_obj: FabricWorkspace) -> None: - """ - Publishes all KQL Database items from the repository. - - Args: - fabric_workspace_obj: The FabricWorkspace object containing the items to be published. 
- """ - item_type = "KQLDatabase" - - for item_name in fabric_workspace_obj.repository_items.get(item_type, {}): - fabric_workspace_obj._publish_item(item_name=item_name, item_type=item_type) + item_type = ItemType.KQL_DATABASE.value diff --git a/src/fabric_cicd/_items/_kqlqueryset.py b/src/fabric_cicd/_items/_kqlqueryset.py index 29a011ff..767249ae 100644 --- a/src/fabric_cicd/_items/_kqlqueryset.py +++ b/src/fabric_cicd/_items/_kqlqueryset.py @@ -10,27 +10,12 @@ from fabric_cicd._common._exceptions import ParsingError from fabric_cicd._common._file import File from fabric_cicd._common._item import Item +from fabric_cicd._items._base_publisher import ItemPublisher +from fabric_cicd.constants import ItemType logger = logging.getLogger(__name__) -def publish_kqlquerysets(fabric_workspace_obj: FabricWorkspace) -> None: - """ - Publishes all KQL Queryset items from the repository. - - Args: - fabric_workspace_obj: The FabricWorkspace object containing the items to be published. - """ - item_type = "KQLQueryset" - - fabric_workspace_obj._refresh_deployed_items() - - for item_name in fabric_workspace_obj.repository_items.get(item_type, {}): - fabric_workspace_obj._publish_item( - item_name=item_name, item_type=item_type, func_process_file=func_process_file - ) - - def func_process_file(workspace_obj: FabricWorkspace, item_obj: Item, file_obj: File) -> str: """ Custom file processing for kql queryset items. @@ -40,7 +25,11 @@ def func_process_file(workspace_obj: FabricWorkspace, item_obj: Item, file_obj: item_obj: The item object. file_obj: The file object. """ - return replace_cluster_uri(workspace_obj, file_obj) if item_obj.type == "KQLQueryset" else file_obj.contents + return ( + replace_cluster_uri(workspace_obj, file_obj) + if item_obj.type == ItemType.KQL_QUERYSET.value + else file_obj.contents + ) def replace_cluster_uri(fabric_workspace_obj: FabricWorkspace, file_obj: File) -> str: @@ -62,7 +51,7 @@ def replace_cluster_uri(fabric_workspace_obj: FabricWorkspace, file_obj: File) - return file_obj.contents # Get the KQL Database items from the deployed items - database_items = fabric_workspace_obj.deployed_items.get("KQLDatabase", {}) + database_items = fabric_workspace_obj.deployed_items.get(ItemType.KQL_DATABASE.value, {}) # If the cluster URI is empty, replace it with the cluster URI of the KQL database for data_source in data_sources: @@ -97,3 +86,19 @@ def replace_cluster_uri(fabric_workspace_obj: FabricWorkspace, file_obj: File) - logger.debug("Successfully updated all empty cluster URIs.") return json.dumps(json_content_dict, indent=2) + + +class KQLQuerysetPublisher(ItemPublisher): + """Publisher for KQL Queryset items.""" + + item_type = ItemType.KQL_QUERYSET.value + + def publish_one(self, item_name: str, _item: Item) -> None: + """Publish a single KQL Queryset item.""" + self.fabric_workspace_obj._publish_item( + item_name=item_name, item_type=self.item_type, func_process_file=func_process_file + ) + + def pre_publish_all(self) -> None: + """Refresh deployed items to get KQL Database cluster URIs.""" + self.fabric_workspace_obj._refresh_deployed_items() diff --git a/src/fabric_cicd/_items/_lakehouse.py b/src/fabric_cicd/_items/_lakehouse.py index 40ae1fd3..772a2c0f 100644 --- a/src/fabric_cicd/_items/_lakehouse.py +++ b/src/fabric_cicd/_items/_lakehouse.py @@ -12,53 +12,12 @@ from fabric_cicd._common._exceptions import FailedPublishedItemStatusError from fabric_cicd._common._fabric_endpoint import handle_retry from fabric_cicd._common._item import Item +from 
fabric_cicd._items._base_publisher import ItemPublisher, Publisher +from fabric_cicd.constants import FeatureFlag, ItemType logger = logging.getLogger(__name__) -def publish_lakehouses(fabric_workspace_obj: FabricWorkspace) -> None: - """ - Publishes all lakehouse items from the repository. - - Args: - fabric_workspace_obj: The FabricWorkspace object containing the items to be published - """ - item_type = "Lakehouse" - - for item_name, item in fabric_workspace_obj.repository_items.get(item_type, {}).items(): - creation_payload = next( - ( - {"enableSchemas": True} - for file in item.item_files - if file.name == "lakehouse.metadata.json" and "defaultSchema" in file.contents - ), - None, - ) - - fabric_workspace_obj._publish_item( - item_name=item_name, - item_type=item_type, - creation_payload=creation_payload, - skip_publish_logging=True, - ) - - # Check if the item is published to avoid any post publish actions - if item.skip_publish: - continue - - check_sqlendpoint_provision_status(fabric_workspace_obj, item) - - logger.info(f"{constants.INDENT}Published") - - # Need all lakehouses published first to protect interrelationships - if "enable_shortcut_publish" in constants.FEATURE_FLAG: - for item_obj in fabric_workspace_obj.repository_items.get(item_type, {}).values(): - # Check if the item is published to avoid any post publish actions - if item_obj.skip_publish: - continue - process_shortcuts(fabric_workspace_obj, item_obj) - - def check_sqlendpoint_provision_status(fabric_workspace_obj: FabricWorkspace, item_obj: Item) -> None: """ Check the SQL endpoint status of the published lakehouses @@ -98,100 +57,6 @@ def check_sqlendpoint_provision_status(fabric_workspace_obj: FabricWorkspace, it iteration += 1 -def process_shortcuts(fabric_workspace_obj: FabricWorkspace, item_obj: Item) -> None: - """ - Publishes all shortcuts for a lakehouse item. 
- - Args: - fabric_workspace_obj: The FabricWorkspace object containing the items to be published - item_obj: The item object to publish shortcuts for - """ - from fabric_cicd._common._check_utils import check_regex - - deployed_shortcuts = list_deployed_shortcuts(fabric_workspace_obj, item_obj) - - shortcut_file_obj = next((file for file in item_obj.item_files if file.name == "shortcuts.metadata.json"), None) - - if shortcut_file_obj: - shortcut_file_obj.contents = fabric_workspace_obj._replace_parameters(shortcut_file_obj, item_obj) - shortcut_file_obj.contents = fabric_workspace_obj._replace_logical_ids(shortcut_file_obj.contents) - shortcut_file_obj.contents = fabric_workspace_obj._replace_workspace_ids(shortcut_file_obj.contents) - - shortcuts = json.loads(shortcut_file_obj.contents) or [] - else: - logger.debug("No shortcuts.metadata.json found") - shortcuts = [] - - # Filter shortcuts based on exclude regex if provided - if fabric_workspace_obj.shortcut_exclude_regex: - regex_pattern = check_regex(fabric_workspace_obj.shortcut_exclude_regex) - original_count = len(shortcuts) - excluded_shortcuts = [s["name"] for s in shortcuts if "name" in s and regex_pattern.match(s["name"])] - shortcuts = [s for s in shortcuts if "name" in s and not regex_pattern.match(s["name"])] - excluded_count = original_count - len(shortcuts) - if excluded_count > 0: - logger.info( - f"{constants.INDENT}Excluded {excluded_count} shortcut(s) from {item_obj.name} deployment based on regex pattern" - ) - logger.info(f"{constants.INDENT}Excluded shortcuts: {excluded_shortcuts}") - - shortcuts_to_publish = {f"{shortcut['path']}/{shortcut['name']}": shortcut for shortcut in shortcuts} - - if shortcuts_to_publish: - logger.info(f"Publishing Lakehouse '{item_obj.name}' Shortcuts") - shortcut_paths_to_unpublish = [path for path in deployed_shortcuts if path not in shortcuts_to_publish] - unpublish_shortcuts(fabric_workspace_obj, item_obj, shortcut_paths_to_unpublish) - # Deploy and overwrite shortcuts - publish_shortcuts(fabric_workspace_obj, item_obj, shortcuts_to_publish) - - -def publish_shortcuts(fabric_workspace_obj: FabricWorkspace, item_obj: Item, shortcut_dict: dict) -> None: - """ - Publishes all shortcuts defined in the list. - - Args: - fabric_workspace_obj: The FabricWorkspace object containing the items to be published - item_obj: The item object to publish shortcuts for - shortcut_dict: The dict of shortcuts to publish - """ - for shortcut in shortcut_dict.values(): - shortcut = replace_default_lakehouse_id(shortcut, item_obj) - # https://learn.microsoft.com/en-us/rest/api/fabric/core/onelake-shortcuts/create-shortcut - try: - fabric_workspace_obj.endpoint.invoke( - method="POST", - url=f"{fabric_workspace_obj.base_api_url}/items/{item_obj.guid}/shortcuts?shortcutConflictPolicy=CreateOrOverwrite", - body=shortcut, - ) - logger.info(f"{constants.INDENT}{shortcut['name']} Shortcut Published") - except Exception as e: - if "continue_on_shortcut_failure" in constants.FEATURE_FLAG: - logger.warning( - f"Failed to publish '{shortcut['name']}'. This usually happens when the lakehouse containing the source for this shortcut is published as a shell and has no data yet." 
- ) - logger.info("The publish process will continue with the other items.") - continue - msg = f"Failed to publish '{shortcut['name']}' for lakehouse {item_obj.name}" - raise FailedPublishedItemStatusError(msg, logger) from e - - -def unpublish_shortcuts(fabric_workspace_obj: FabricWorkspace, item_obj: Item, shortcut_paths: list) -> None: - """ - Unpublishes all shortcuts defined in the list. - - Args: - fabric_workspace_obj: The FabricWorkspace object containing the items to be published - item_obj: The item object to publish shortcuts for - shortcut_paths: The list of shortcut paths to unpublish - """ - for deployed_shortcut_path in shortcut_paths: - # https://learn.microsoft.com/en-us/rest/api/fabric/core/onelake-shortcuts/delete-shortcut - fabric_workspace_obj.endpoint.invoke( - method="DELETE", - url=f"{fabric_workspace_obj.base_api_url}/items/{item_obj.guid}/shortcuts/{deployed_shortcut_path}", - ) - - def list_deployed_shortcuts(fabric_workspace_obj: FabricWorkspace, item_obj: Item) -> list: """ Lists all deployed shortcut paths @@ -229,3 +94,148 @@ def replace_default_lakehouse_id(shortcut: dict, item_obj: Item) -> dict: shortcut["target"]["oneLake"]["itemId"] = item_obj.guid return shortcut + + +class LakehousePublisher(ItemPublisher): + """Publisher for Lakehouse items.""" + + item_type = ItemType.LAKEHOUSE.value + + def publish_one(self, item_name: str, item: Item) -> None: + """Publish a single Lakehouse item.""" + creation_payload = next( + ( + {"enableSchemas": True} + for file in item.item_files + if file.name == "lakehouse.metadata.json" and "defaultSchema" in file.contents + ), + None, + ) + + self.fabric_workspace_obj._publish_item( + item_name=item_name, + item_type=self.item_type, + creation_payload=creation_payload, + skip_publish_logging=True, + ) + + # Check if the item is published to avoid any post publish actions + if item.skip_publish: + return + + check_sqlendpoint_provision_status(self.fabric_workspace_obj, item) + + logger.info(f"{constants.INDENT}Published Lakehouse '{item_name}'") + + def post_publish_all(self) -> None: + """Publish shortcuts after all lakehouses are published to protect interrelationships.""" + if FeatureFlag.ENABLE_SHORTCUT_PUBLISH.value in constants.FEATURE_FLAG: + for item_obj in self.fabric_workspace_obj.repository_items.get(self.item_type, {}).values(): + # Check if the item is published to avoid any post publish actions + if not item_obj.skip_publish: + shortcut_publisher = ShortcutPublisher(self.fabric_workspace_obj, item_obj) + shortcut_publisher.publish_all() + + +class ShortcutPublisher(Publisher): + """Publisher for Lakehouse shortcuts.""" + + def __init__(self, fabric_workspace_obj: FabricWorkspace, item_obj: Item) -> None: + """ + Initialize the shortcut publisher. + + Args: + fabric_workspace_obj: The FabricWorkspace object containing the items to be published. + item_obj: The lakehouse item object to publish shortcuts for. + """ + super().__init__(fabric_workspace_obj) + self.item_obj = item_obj + + def _unpublish_shortcuts(self, shortcut_paths: list) -> None: + """ + Unpublish shortcuts from the lakehouse. + + Args: + shortcut_paths: The list of shortcut paths to unpublish. 
+ """ + for deployed_shortcut_path in shortcut_paths: + # https://learn.microsoft.com/en-us/rest/api/fabric/core/onelake-shortcuts/delete-shortcut + self.fabric_workspace_obj.endpoint.invoke( + method="DELETE", + url=f"{self.fabric_workspace_obj.base_api_url}/items/{self.item_obj.guid}/shortcuts/{deployed_shortcut_path}", + ) + + def publish_one(self, _shortcut_name: str, shortcut: dict) -> None: + """ + Publish a single shortcut. + + Args: + _shortcut_name: The name/path of the shortcut to publish. + shortcut: The shortcut definition to publish. + """ + shortcut = replace_default_lakehouse_id(shortcut, self.item_obj) + # https://learn.microsoft.com/en-us/rest/api/fabric/core/onelake-shortcuts/create-shortcut + try: + self.fabric_workspace_obj.endpoint.invoke( + method="POST", + url=f"{self.fabric_workspace_obj.base_api_url}/items/{self.item_obj.guid}/shortcuts?shortcutConflictPolicy=CreateOrOverwrite", + body=shortcut, + ) + logger.info(f"{constants.INDENT}Published Shortcut '{shortcut['name']}'") + except Exception as e: + if FeatureFlag.CONTINUE_ON_SHORTCUT_FAILURE.value in constants.FEATURE_FLAG: + logger.warning( + f"Failed to publish Shortcut '{shortcut['name']}'. This usually happens when the lakehouse containing the source for this shortcut is published as a shell and has no data yet." + ) + logger.info("The publish process will continue with the other items.") + return + msg = f"Failed to publish '{shortcut['name']}' for lakehouse {self.item_obj.name}" + raise FailedPublishedItemStatusError(msg, logger) from e + + def publish_all(self) -> None: + """ + Publish all shortcuts for the lakehouse item. + + Loads shortcuts from metadata, filters based on exclude regex, + unpublishes orphaned shortcuts, and publishes all remaining shortcuts. + """ + from fabric_cicd._common._check_utils import check_regex + + deployed_shortcuts = list_deployed_shortcuts(self.fabric_workspace_obj, self.item_obj) + + shortcut_file_obj = next( + (file for file in self.item_obj.item_files if file.name == "shortcuts.metadata.json"), None + ) + + if shortcut_file_obj: + shortcut_file_obj.contents = self.fabric_workspace_obj._replace_parameters(shortcut_file_obj, self.item_obj) + shortcut_file_obj.contents = self.fabric_workspace_obj._replace_logical_ids(shortcut_file_obj.contents) + shortcut_file_obj.contents = self.fabric_workspace_obj._replace_workspace_ids(shortcut_file_obj.contents) + + shortcuts = json.loads(shortcut_file_obj.contents) or [] + else: + logger.debug("No shortcuts.metadata.json found") + shortcuts = [] + + # Filter shortcuts based on exclude regex if provided + if self.fabric_workspace_obj.shortcut_exclude_regex: + regex_pattern = check_regex(self.fabric_workspace_obj.shortcut_exclude_regex) + original_count = len(shortcuts) + excluded_shortcuts = [s["name"] for s in shortcuts if "name" in s and regex_pattern.match(s["name"])] + shortcuts = [s for s in shortcuts if "name" in s and not regex_pattern.match(s["name"])] + excluded_count = original_count - len(shortcuts) + if excluded_count > 0: + logger.info( + f"{constants.INDENT}Excluded {excluded_count} shortcut(s) from {self.item_obj.name} deployment based on regex pattern" + ) + logger.info(f"{constants.INDENT}Excluded shortcuts: {excluded_shortcuts}") + + shortcuts_to_publish = {f"{shortcut['path']}/{shortcut['name']}": shortcut for shortcut in shortcuts} + + if shortcuts_to_publish: + logger.info(f"Publishing Lakehouse '{self.item_obj.name}' Shortcuts") + shortcut_paths_to_unpublish = [path for path in deployed_shortcuts if path not in 
shortcuts_to_publish] + self._unpublish_shortcuts(shortcut_paths_to_unpublish) + # Deploy and overwrite shortcuts + for shortcut_path, shortcut in shortcuts_to_publish.items(): + self.publish_one(shortcut_path, shortcut) diff --git a/src/fabric_cicd/_items/_mirroreddatabase.py b/src/fabric_cicd/_items/_mirroreddatabase.py index 51dbe6cb..1bd2444f 100644 --- a/src/fabric_cicd/_items/_mirroreddatabase.py +++ b/src/fabric_cicd/_items/_mirroreddatabase.py @@ -3,21 +3,11 @@ """Functions to process and deploy Mirrored Database item.""" -import logging +from fabric_cicd._items._base_publisher import ItemPublisher +from fabric_cicd.constants import ItemType -from fabric_cicd import FabricWorkspace -logger = logging.getLogger(__name__) +class MirroredDatabasePublisher(ItemPublisher): + """Publisher for Mirrored Database items.""" - -def publish_mirroreddatabase(fabric_workspace_obj: FabricWorkspace) -> None: - """ - Publishes all Mirrored Database items from the repository. - - Args: - fabric_workspace_obj: The FabricWorkspace object containing the items to be published. - """ - item_type = "MirroredDatabase" - - for item_name in fabric_workspace_obj.repository_items.get(item_type, {}): - fabric_workspace_obj._publish_item(item_name=item_name, item_type=item_type) + item_type = ItemType.MIRRORED_DATABASE.value diff --git a/src/fabric_cicd/_items/_mlexperiment.py b/src/fabric_cicd/_items/_mlexperiment.py index ad7b2e74..b0524cbf 100644 --- a/src/fabric_cicd/_items/_mlexperiment.py +++ b/src/fabric_cicd/_items/_mlexperiment.py @@ -3,22 +3,11 @@ """Functions to process and deploy ML Experiment item.""" -import logging +from fabric_cicd._items._base_publisher import ItemPublisher +from fabric_cicd.constants import ItemType -from fabric_cicd import FabricWorkspace -logger = logging.getLogger(__name__) +class MLExperimentPublisher(ItemPublisher): + """Publisher for ML Experiment items.""" - -def publish_mlexperiments(fabric_workspace_obj: FabricWorkspace) -> None: - """ - Publishes all experiment items from the repository. - Only Publishes the Shell item, as only type and name are supported for MLExperiment items through api. - - Args: - fabric_workspace_obj: The FabricWorkspace object containing the items to be published. - """ - item_type = "MLExperiment" - - for item_name in fabric_workspace_obj.repository_items.get(item_type, {}): - fabric_workspace_obj._publish_item(item_name=item_name, item_type=item_type) + item_type = ItemType.ML_EXPERIMENT.value diff --git a/src/fabric_cicd/_items/_mounteddatafactory.py b/src/fabric_cicd/_items/_mounteddatafactory.py index 1cc5f0cb..f299abc6 100644 --- a/src/fabric_cicd/_items/_mounteddatafactory.py +++ b/src/fabric_cicd/_items/_mounteddatafactory.py @@ -3,21 +3,11 @@ """Functions to process and deploy Mounted Data Factory item.""" -import logging +from fabric_cicd._items._base_publisher import ItemPublisher +from fabric_cicd.constants import ItemType -from fabric_cicd import FabricWorkspace -logger = logging.getLogger(__name__) +class MountedDataFactoryPublisher(ItemPublisher): + """Publisher for Mounted Data Factory items.""" - -def publish_mounteddatafactories(fabric_workspace_obj: FabricWorkspace) -> None: - """ - Publishes all mounted data factory items from the repository. - - Args: - fabric_workspace_obj: The FabricWorkspace object containing the items to be published. 
- """ - item_type = "MountedDataFactory" - - for item_name in fabric_workspace_obj.repository_items.get(item_type, {}): - fabric_workspace_obj._publish_item(item_name=item_name, item_type=item_type) + item_type = ItemType.MOUNTED_DATA_FACTORY.value diff --git a/src/fabric_cicd/_items/_notebook.py b/src/fabric_cicd/_items/_notebook.py index 88ae3eba..aee55170 100644 --- a/src/fabric_cicd/_items/_notebook.py +++ b/src/fabric_cicd/_items/_notebook.py @@ -3,21 +3,11 @@ """Functions to process and deploy Notebook item.""" -import logging +from fabric_cicd._items._base_publisher import ItemPublisher +from fabric_cicd.constants import ItemType -from fabric_cicd import FabricWorkspace -logger = logging.getLogger(__name__) +class NotebookPublisher(ItemPublisher): + """Publisher for Notebook items.""" - -def publish_notebooks(fabric_workspace_obj: FabricWorkspace) -> None: - """ - Publishes all notebook items from the repository. - - Args: - fabric_workspace_obj: The FabricWorkspace object containing the items to be published. - """ - item_type = "Notebook" - - for item_name in fabric_workspace_obj.repository_items.get(item_type, {}): - fabric_workspace_obj._publish_item(item_name=item_name, item_type=item_type) + item_type = ItemType.NOTEBOOK.value diff --git a/src/fabric_cicd/_items/_report.py b/src/fabric_cicd/_items/_report.py index 3065f323..db3dbeb5 100644 --- a/src/fabric_cicd/_items/_report.py +++ b/src/fabric_cicd/_items/_report.py @@ -10,29 +10,12 @@ from fabric_cicd._common._exceptions import ItemDependencyError from fabric_cicd._common._file import File from fabric_cicd._common._item import Item +from fabric_cicd._items._base_publisher import ItemPublisher +from fabric_cicd.constants import EXCLUDE_PATH_REGEX_MAPPING, ItemType logger = logging.getLogger(__name__) -def publish_reports(fabric_workspace_obj: FabricWorkspace) -> None: - """ - Publishes all report items from the repository. - - Args: - fabric_workspace_obj: The FabricWorkspace object containing the items to be published. - """ - item_type = "Report" - - for item_name in fabric_workspace_obj.repository_items.get(item_type, {}): - exclude_path = r".*\.pbi[/\\].*" - fabric_workspace_obj._publish_item( - item_name=item_name, - item_type=item_type, - exclude_path=exclude_path, - func_process_file=func_process_file, - ) - - def func_process_file(workspace_obj: FabricWorkspace, item_obj: Item, file_obj: File) -> str: """ Custom file processing for report items. @@ -51,7 +34,7 @@ def func_process_file(workspace_obj: FabricWorkspace, item_obj: Item, file_obj: ): model_rel_path = definition_body["datasetReference"]["byPath"]["path"] model_path = str((item_obj.path / model_rel_path).resolve()) - model_id = workspace_obj._convert_path_to_id("SemanticModel", model_path) + model_id = workspace_obj._convert_path_to_id(ItemType.SEMANTIC_MODEL.value, model_path) if not model_id: msg = "Semantic model not found in the repository. Cannot deploy a report with a relative path without deploying the model." 
@@ -74,3 +57,18 @@ def func_process_file(workspace_obj: FabricWorkspace, item_obj: Item, file_obj: return json.dumps(definition_body, indent=4) return file_obj.contents + + +class ReportPublisher(ItemPublisher): + """Publisher for Report items.""" + + item_type = ItemType.REPORT.value + + def publish_one(self, item_name: str, _item: Item) -> None: + """Publish a single Report item.""" + self.fabric_workspace_obj._publish_item( + item_name=item_name, + item_type=self.item_type, + exclude_path=EXCLUDE_PATH_REGEX_MAPPING.get(self.item_type), + func_process_file=func_process_file, + ) diff --git a/src/fabric_cicd/_items/_semanticmodel.py b/src/fabric_cicd/_items/_semanticmodel.py index 799b25d4..40b8db4f 100644 --- a/src/fabric_cicd/_items/_semanticmodel.py +++ b/src/fabric_cicd/_items/_semanticmodel.py @@ -6,49 +6,13 @@ import logging from fabric_cicd import FabricWorkspace, constants +from fabric_cicd._common._item import Item +from fabric_cicd._items._base_publisher import ItemPublisher +from fabric_cicd.constants import EXCLUDE_PATH_REGEX_MAPPING, ItemType logger = logging.getLogger(__name__) -def publish_semanticmodels(fabric_workspace_obj: FabricWorkspace) -> None: - """ - Publishes all semantic model items from the repository. - - Args: - fabric_workspace_obj: The FabricWorkspace object containing the items to be published. - """ - item_type = "SemanticModel" - - for item_name in fabric_workspace_obj.repository_items.get(item_type, {}): - exclude_path = r".*\.pbi[/\\].*" - fabric_workspace_obj._publish_item(item_name=item_name, item_type=item_type, exclude_path=exclude_path) - - model_with_binding_dict = fabric_workspace_obj.environment_parameter.get("semantic_model_binding", []) - - if not model_with_binding_dict: - return - - # Build connection mapping from semantic_model_binding parameter - binding_mapping = {} - - for model in model_with_binding_dict: - model_name = model.get("semantic_model_name", []) - connection_id = model.get("connection_id") - - if isinstance(model_name, str): - model_name = [model_name] - - for name in model_name: - binding_mapping[name] = connection_id - - connections = get_connections(fabric_workspace_obj) - - if binding_mapping: - bind_semanticmodel_to_connection( - fabric_workspace_obj=fabric_workspace_obj, connections=connections, connection_details=binding_mapping - ) - - def get_connections(fabric_workspace_obj: FabricWorkspace) -> dict: """ Get all connections from the workspace. @@ -92,7 +56,7 @@ def bind_semanticmodel_to_connection( connections: Dictionary of connection objects with connection ID as key. connection_details: Dictionary mapping dataset names to connection IDs from parameter.yml. 
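The binding logic removed above (and moved into `SemanticModelPublisher.post_publish_all` below) flattens the `semantic_model_binding` entries from `environment_parameter` into a name-to-connection map before calling `bind_semanticmodel_to_connection`. A small illustration of the expected shape; the keys come from the code, while the model names and connection GUIDs are placeholders.

```python
# Illustrative shape of environment_parameter["semantic_model_binding"];
# model names and connection GUIDs are placeholders.
semantic_model_binding = [
    {"semantic_model_name": "Sales Model", "connection_id": "11111111-1111-1111-1111-111111111111"},
    {"semantic_model_name": ["Model A", "Model B"], "connection_id": "22222222-2222-2222-2222-222222222222"},
]

binding_mapping = {}
for model in semantic_model_binding:
    names = model.get("semantic_model_name", [])
    if isinstance(names, str):
        names = [names]
    for name in names:
        binding_mapping[name] = model.get("connection_id")

# binding_mapping now maps each model name to its connection id, so a single
# entry may fan out to several semantic models sharing one connection.
print(binding_mapping)
```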
""" - item_type = "SemanticModel" + item_type = ItemType.SEMANTIC_MODEL.value # Loop through each semantic model in the semantic_model_binding section for dataset_name, connection_id in connection_details.items(): @@ -176,3 +140,44 @@ def build_request_body(body: dict) -> dict: }, } } + + +class SemanticModelPublisher(ItemPublisher): + """Publisher for Semantic Model items.""" + + item_type = ItemType.SEMANTIC_MODEL.value + + def publish_one(self, item_name: str, _item: Item) -> None: + """Publish a single Semantic Model item.""" + self.fabric_workspace_obj._publish_item( + item_name=item_name, item_type=self.item_type, exclude_path=EXCLUDE_PATH_REGEX_MAPPING.get(self.item_type) + ) + + def post_publish_all(self) -> None: + """Bind semantic models to connections after all models are published.""" + model_with_binding_dict = self.fabric_workspace_obj.environment_parameter.get("semantic_model_binding", []) + + if not model_with_binding_dict: + return + + # Build connection mapping from semantic_model_binding parameter + binding_mapping = {} + + for model in model_with_binding_dict: + model_name = model.get("semantic_model_name", []) + connection_id = model.get("connection_id") + + if isinstance(model_name, str): + model_name = [model_name] + + for name in model_name: + binding_mapping[name] = connection_id + + connections = get_connections(self.fabric_workspace_obj) + + if binding_mapping: + bind_semanticmodel_to_connection( + fabric_workspace_obj=self.fabric_workspace_obj, + connections=connections, + connection_details=binding_mapping, + ) diff --git a/src/fabric_cicd/_items/_sparkjobdefinition.py b/src/fabric_cicd/_items/_sparkjobdefinition.py index c915d457..bc418cd7 100644 --- a/src/fabric_cicd/_items/_sparkjobdefinition.py +++ b/src/fabric_cicd/_items/_sparkjobdefinition.py @@ -5,19 +5,20 @@ import logging -from fabric_cicd import FabricWorkspace +from fabric_cicd._common._item import Item +from fabric_cicd._items._base_publisher import ItemPublisher +from fabric_cicd.constants import API_FORMAT_MAPPING, ItemType logger = logging.getLogger(__name__) -def publish_sparkjobdefinitions(fabric_workspace_obj: FabricWorkspace) -> None: - """ - Publishes all sparkjobdefinition items from the repository. +class SparkJobDefinitionPublisher(ItemPublisher): + """Publisher for Spark Job Definition items.""" - Args: - fabric_workspace_obj: The FabricWorkspace object containing the items to be published. 
- """ - item_type = "SparkJobDefinition" + item_type = ItemType.SPARK_JOB_DEFINITION.value - for item_name in fabric_workspace_obj.repository_items.get(item_type, {}): - fabric_workspace_obj._publish_item(item_name=item_name, item_type=item_type, api_format="SparkJobDefinitionV2") + def publish_one(self, item_name: str, _item: Item) -> None: + """Publish a single Spark Job Definition item.""" + self.fabric_workspace_obj._publish_item( + item_name=item_name, item_type=self.item_type, api_format=API_FORMAT_MAPPING.get(self.item_type) + ) diff --git a/src/fabric_cicd/_items/_sqldatabase.py b/src/fabric_cicd/_items/_sqldatabase.py index 98ef94c6..58ab911d 100644 --- a/src/fabric_cicd/_items/_sqldatabase.py +++ b/src/fabric_cicd/_items/_sqldatabase.py @@ -5,29 +5,29 @@ import logging -from fabric_cicd import FabricWorkspace, constants +from fabric_cicd import constants +from fabric_cicd._common._item import Item +from fabric_cicd._items._base_publisher import ItemPublisher +from fabric_cicd.constants import ItemType logger = logging.getLogger(__name__) -def publish_sqldatabases(fabric_workspace_obj: FabricWorkspace) -> None: - """ - Publishes all SQL Database items from the repository. +class SQLDatabasePublisher(ItemPublisher): + """Publisher for SQL Database items.""" - Args: - fabric_workspace_obj: The FabricWorkspace object containing the items to be published - """ - item_type = "SQLDatabase" + item_type = ItemType.SQL_DATABASE.value - for item_name, item in fabric_workspace_obj.repository_items.get(item_type, {}).items(): - fabric_workspace_obj._publish_item( + def publish_one(self, item_name: str, item: Item) -> None: + """Publish a single SQL Database item.""" + self.fabric_workspace_obj._publish_item( item_name=item_name, - item_type=item_type, + item_type=self.item_type, skip_publish_logging=True, ) # Check if the item is published to avoid any post publish actions if item.skip_publish: - continue + return - logger.info(f"{constants.INDENT}Published") + logger.info(f"{constants.INDENT}Published SQLDatabase '{item_name}'") diff --git a/src/fabric_cicd/_items/_userdatafunction.py b/src/fabric_cicd/_items/_userdatafunction.py index 0d9ee364..207a1e0d 100644 --- a/src/fabric_cicd/_items/_userdatafunction.py +++ b/src/fabric_cicd/_items/_userdatafunction.py @@ -3,21 +3,11 @@ """Functions to process and deploy User Data Function item.""" -import logging +from fabric_cicd._items._base_publisher import ItemPublisher +from fabric_cicd.constants import ItemType -from fabric_cicd import FabricWorkspace -logger = logging.getLogger(__name__) +class UserDataFunctionPublisher(ItemPublisher): + """Publisher for User Data Function items.""" - -def publish_userdatafunctions(fabric_workspace_obj: FabricWorkspace) -> None: - """ - Publishes all user data function items from the repository. - - Args: - fabric_workspace_obj: The FabricWorkspace object containing the items to be published. 
- """ - item_type = "UserDataFunction" - - for item_name in fabric_workspace_obj.repository_items.get(item_type, {}): - fabric_workspace_obj._publish_item(item_name=item_name, item_type=item_type) + item_type = ItemType.USER_DATA_FUNCTION.value diff --git a/src/fabric_cicd/_items/_variablelibrary.py b/src/fabric_cicd/_items/_variablelibrary.py index c281be09..0c0bd943 100644 --- a/src/fabric_cicd/_items/_variablelibrary.py +++ b/src/fabric_cicd/_items/_variablelibrary.py @@ -8,28 +8,12 @@ from fabric_cicd import FabricWorkspace, constants from fabric_cicd._common._item import Item +from fabric_cicd._items._base_publisher import ItemPublisher +from fabric_cicd.constants import ItemType logger = logging.getLogger(__name__) -def publish_variablelibraries(fabric_workspace_obj: FabricWorkspace) -> None: - """ - Publishes all variable library items from the repository. - - Args: - fabric_workspace_obj: The FabricWorkspace object containing the items to be published. - """ - item_type = "VariableLibrary" - - var_libraries = fabric_workspace_obj.repository_items.get(item_type, {}) - - for item_name in var_libraries: - fabric_workspace_obj._publish_item(item_name=item_name, item_type=item_type) - if var_libraries[item_name].skip_publish: - continue - activate_value_set(fabric_workspace_obj, var_libraries[item_name]) - - def activate_value_set(fabric_workspace_obj: FabricWorkspace, item_obj: Item) -> None: """ Activates the value set for the given Variable Library item. @@ -60,3 +44,15 @@ def activate_value_set(fabric_workspace_obj: FabricWorkspace, item_obj: Item) -> else: logger.warning(f"settings.json file not found for item {item_obj.name}. Active value set not changed.") + + +class VariableLibraryPublisher(ItemPublisher): + """Publisher for Variable Library items.""" + + item_type = ItemType.VARIABLE_LIBRARY.value + + def publish_one(self, item_name: str, item: Item) -> None: + """Publish a single Variable Library item.""" + self.fabric_workspace_obj._publish_item(item_name=item_name, item_type=self.item_type) + if not item.skip_publish: + activate_value_set(self.fabric_workspace_obj, item) diff --git a/src/fabric_cicd/_items/_warehouse.py b/src/fabric_cicd/_items/_warehouse.py index 47d3e4d3..0cf80a08 100644 --- a/src/fabric_cicd/_items/_warehouse.py +++ b/src/fabric_cicd/_items/_warehouse.py @@ -6,21 +6,21 @@ import json import logging -from fabric_cicd import FabricWorkspace, constants +from fabric_cicd import constants +from fabric_cicd._common._item import Item +from fabric_cicd._items._base_publisher import ItemPublisher +from fabric_cicd.constants import ItemType logger = logging.getLogger(__name__) -def publish_warehouses(fabric_workspace_obj: FabricWorkspace) -> None: - """ - Publishes all warehouse items from the repository. 
+class WarehousePublisher(ItemPublisher): + """Publisher for Warehouse items.""" - Args: - fabric_workspace_obj: The FabricWorkspace object containing the items to be published - """ - item_type = "Warehouse" + item_type = ItemType.WAREHOUSE.value - for item_name, item in fabric_workspace_obj.repository_items.get(item_type, {}).items(): + def publish_one(self, item_name: str, item: Item) -> None: + """Publish a single Warehouse item.""" creation_payload = next( ( json.loads(file.contents)["metadata"]["creationPayload"] @@ -30,15 +30,15 @@ def publish_warehouses(fabric_workspace_obj: FabricWorkspace) -> None: None, ) - fabric_workspace_obj._publish_item( + self.fabric_workspace_obj._publish_item( item_name=item_name, - item_type=item_type, + item_type=self.item_type, creation_payload=creation_payload, skip_publish_logging=True, ) # Check if the item is published to avoid any post publish actions if item.skip_publish: - continue + return - logger.info(f"{constants.INDENT}Published") + logger.info(f"{constants.INDENT}Published Warehouse '{item_name}'") diff --git a/src/fabric_cicd/_parameter/_utils.py b/src/fabric_cicd/_parameter/_utils.py index 41547d7a..4db5a487 100644 --- a/src/fabric_cicd/_parameter/_utils.py +++ b/src/fabric_cicd/_parameter/_utils.py @@ -21,6 +21,7 @@ import fabric_cicd.constants as constants from fabric_cicd import FabricWorkspace from fabric_cicd._common._exceptions import InputError, ParsingError +from fabric_cicd.constants import ItemType logger = logging.getLogger(__name__) @@ -320,7 +321,7 @@ def _extract_item_attribute(workspace_obj: FabricWorkspace, variable: str, get_d if get_dataflow_name: if ( item_type in workspace_obj.repository_items - and item_type == "Dataflow" + and item_type == ItemType.DATAFLOW.value and item_name in workspace_obj.repository_items[item_type] and attribute == "id" ): diff --git a/src/fabric_cicd/constants.py b/src/fabric_cicd/constants.py index f5681208..9b4e15de 100644 --- a/src/fabric_cicd/constants.py +++ b/src/fabric_cicd/constants.py @@ -35,34 +35,124 @@ class EnvVar(str, Enum): """Override max duration for item name conflict retries. Defaults to 300 seconds.""" +class ItemType(str, Enum): + """Enumeration of supported Microsoft Fabric item types.""" + + APACHE_AIRFLOW_JOB = "ApacheAirflowJob" + COPY_JOB = "CopyJob" + DATA_AGENT = "DataAgent" + DATA_PIPELINE = "DataPipeline" + DATAFLOW = "Dataflow" + ENVIRONMENT = "Environment" + EVENTHOUSE = "Eventhouse" + EVENTSTREAM = "Eventstream" + GRAPHQL_API = "GraphQLApi" + KQL_DASHBOARD = "KQLDashboard" + KQL_DATABASE = "KQLDatabase" + KQL_QUERYSET = "KQLQueryset" + LAKEHOUSE = "Lakehouse" + MIRRORED_DATABASE = "MirroredDatabase" + ML_EXPERIMENT = "MLExperiment" + MOUNTED_DATA_FACTORY = "MountedDataFactory" + NOTEBOOK = "Notebook" + REFLEX = "Reflex" + REPORT = "Report" + SEMANTIC_MODEL = "SemanticModel" + SPARK_JOB_DEFINITION = "SparkJobDefinition" + SQL_DATABASE = "SQLDatabase" + USER_DATA_FUNCTION = "UserDataFunction" + VARIABLE_LIBRARY = "VariableLibrary" + WAREHOUSE = "Warehouse" + + +# Serial execution order for publishing items determines dependency order. +# Unpublish order is the reverse of this. 
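The ordered mapping defined immediately below drives both directions: publish walks the keys ascending, unpublish walks them descending. A hedged sketch of that consumption follows; the helper here is illustrative, not the library's `get_item_types_to_publish`, which additionally filters by `item_type_in_scope` and repository contents.

```python
from fabric_cicd.constants import SERIAL_ITEM_PUBLISH_ORDER, ItemType


def ordered_types(reverse: bool = False) -> list[ItemType]:
    """Item types in serial publish order, or reversed for unpublish."""
    return [SERIAL_ITEM_PUBLISH_ORDER[key] for key in sorted(SERIAL_ITEM_PUBLISH_ORDER, reverse=reverse)]


print([t.value for t in ordered_types()[:3]])              # VariableLibrary, Warehouse, MirroredDatabase
print([t.value for t in ordered_types(reverse=True)[:3]])  # MLExperiment, DataAgent, MountedDataFactory
```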
+SERIAL_ITEM_PUBLISH_ORDER: dict[int, ItemType] = { + 1: ItemType.VARIABLE_LIBRARY, + 2: ItemType.WAREHOUSE, + 3: ItemType.MIRRORED_DATABASE, + 4: ItemType.LAKEHOUSE, + 5: ItemType.SQL_DATABASE, + 6: ItemType.ENVIRONMENT, + 7: ItemType.USER_DATA_FUNCTION, + 8: ItemType.EVENTHOUSE, + 9: ItemType.SPARK_JOB_DEFINITION, + 10: ItemType.NOTEBOOK, + 11: ItemType.SEMANTIC_MODEL, + 12: ItemType.REPORT, + 13: ItemType.COPY_JOB, + 14: ItemType.KQL_DATABASE, + 15: ItemType.KQL_QUERYSET, + 16: ItemType.REFLEX, + 17: ItemType.EVENTSTREAM, + 18: ItemType.KQL_DASHBOARD, + 19: ItemType.DATAFLOW, + 20: ItemType.DATA_PIPELINE, + 21: ItemType.GRAPHQL_API, + 22: ItemType.APACHE_AIRFLOW_JOB, + 23: ItemType.MOUNTED_DATA_FACTORY, + 24: ItemType.DATA_AGENT, + 25: ItemType.ML_EXPERIMENT, +} + + +class FeatureFlag(str, Enum): + """Enumeration of supported feature flags for fabric-cicd.""" + + ENABLE_LAKEHOUSE_UNPUBLISH = "enable_lakehouse_unpublish" + """Set to enable the deletion of Lakehouses.""" + ENABLE_WAREHOUSE_UNPUBLISH = "enable_warehouse_unpublish" + """Set to enable the deletion of Warehouses.""" + ENABLE_SQLDATABASE_UNPUBLISH = "enable_sqldatabase_unpublish" + """Set to enable the deletion of SQL Databases.""" + ENABLE_EVENTHOUSE_UNPUBLISH = "enable_eventhouse_unpublish" + """Set to enable the deletion of Eventhouses.""" + ENABLE_KQLDATABASE_UNPUBLISH = "enable_kqldatabase_unpublish" + """Set to enable the deletion of KQL Databases (attached to Eventhouses).""" + ENABLE_SHORTCUT_PUBLISH = "enable_shortcut_publish" + """Set to enable deploying shortcuts with the lakehouse.""" + DISABLE_WORKSPACE_FOLDER_PUBLISH = "disable_workspace_folder_publish" + """Set to disable deploying workspace sub folders.""" + CONTINUE_ON_SHORTCUT_FAILURE = "continue_on_shortcut_failure" + """Set to allow deployment to continue even when shortcuts fail to publish.""" + ENABLE_ENVIRONMENT_VARIABLE_REPLACEMENT = "enable_environment_variable_replacement" + """Set to enable the use of pipeline variables.""" + ENABLE_EXPERIMENTAL_FEATURES = "enable_experimental_features" + """Set to enable experimental features, such as selective deployments.""" + ENABLE_ITEMS_TO_INCLUDE = "enable_items_to_include" + """Set to enable selective publishing/unpublishing of items.""" + ENABLE_EXCLUDE_FOLDER = "enable_exclude_folder" + """Set to enable folder-based exclusion during publish operations.""" + ENABLE_SHORTCUT_EXCLUDE = "enable_shortcut_exclude" + """Set to enable selective publishing of shortcuts in a Lakehouse.""" + ENABLE_CONFIG_DEPLOY = "enable_config_deploy" + """Set to enable config file-based deployment.""" + ENABLE_RESPONSE_COLLECTION = "enable_response_collection" + """Set to enable collection of API responses during publish operations.""" + DISABLE_PRINT_IDENTITY = "disable_print_identity" + """Set to disable printing the executing identity name.""" + + +class OperationType(str, Enum): + """Enumeration of operation types for publish/unpublish workflows.""" + + PUBLISH = "deployment" + """Publishing items to the workspace.""" + UNPUBLISH = "unpublish" + """Unpublishing/removing items from the workspace.""" + + +# The following resources can be unpublished only if their feature flags are set +UNPUBLISH_FLAG_MAPPING = { + ItemType.LAKEHOUSE.value: FeatureFlag.ENABLE_LAKEHOUSE_UNPUBLISH.value, + ItemType.SQL_DATABASE.value: FeatureFlag.ENABLE_SQLDATABASE_UNPUBLISH.value, + ItemType.WAREHOUSE.value: FeatureFlag.ENABLE_WAREHOUSE_UNPUBLISH.value, + ItemType.EVENTHOUSE.value: FeatureFlag.ENABLE_EVENTHOUSE_UNPUBLISH.value, + 
ItemType.KQL_DATABASE.value: FeatureFlag.ENABLE_KQLDATABASE_UNPUBLISH.value, +} + # Item Type -ACCEPTED_ITEM_TYPES = ( - "DataPipeline", - "Environment", - "Notebook", - "Report", - "SemanticModel", - "Lakehouse", - "MirroredDatabase", - "VariableLibrary", - "CopyJob", - "Eventhouse", - "KQLDatabase", - "KQLQueryset", - "Reflex", - "Eventstream", - "Warehouse", - "SQLDatabase", - "KQLDashboard", - "Dataflow", - "GraphQLApi", - "ApacheAirflowJob", - "MountedDataFactory", - "DataAgent", - "UserDataFunction", - "MLExperiment", - "SparkJobDefinition", -) +ACCEPTED_ITEM_TYPES = tuple(item_type.value for item_type in ItemType) # API URLs DEFAULT_API_ROOT_URL = os.environ.get(EnvVar.DEFAULT_API_ROOT_URL.value, "https://api.powerbi.com") @@ -77,10 +167,29 @@ class EnvVar(str, Enum): AUTHORIZATION_HEADER = "authorization" # Publish -SHELL_ONLY_PUBLISH = ["Lakehouse", "Warehouse", "SQLDatabase", "MLExperiment"] +SHELL_ONLY_PUBLISH = [ + ItemType.LAKEHOUSE.value, + ItemType.WAREHOUSE.value, + ItemType.SQL_DATABASE.value, + ItemType.ML_EXPERIMENT.value, +] # Items that do not require assigned capacity -NO_ASSIGNED_CAPACITY_REQUIRED = ["SemanticModel", "Report"] +NO_ASSIGNED_CAPACITY_REQUIRED = [ItemType.SEMANTIC_MODEL.value, ItemType.REPORT.value] + +# Exclude Path Regex Patterns for filtering files during publish +EXCLUDE_PATH_REGEX_MAPPING = { + ItemType.DATA_AGENT.value: r".*\.pbi[/\\].*", + ItemType.REPORT.value: r".*\.pbi[/\\].*", + ItemType.SEMANTIC_MODEL.value: r".*\.pbi[/\\].*", + ItemType.EVENTHOUSE.value: r".*\.children[/\\].*", + ItemType.ENVIRONMENT.value: r"\Setting", +} + +# API Format Mapping for item types that require specific API formats +API_FORMAT_MAPPING = { + ItemType.SPARK_JOB_DEFINITION.value: "SparkJobDefinitionV2", +} # REGEX Constants VALID_GUID_REGEX = r"^[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}$" @@ -91,22 +200,25 @@ class EnvVar(str, Enum): INVALID_FOLDER_CHAR_REGEX = r'[~"#.%&*:<>?/\\{|}]' KQL_DATABASE_FOLDER_PATH_REGEX = r"(?i)^(.*)/[^/]+\.Eventhouse/\.children(?:/.*)?$" +# Well known file names +DATA_PIPELINE_CONTENT_FILE_JSON = "pipeline-content.json" + # Item Type to File Mapping (to check for item dependencies) -ITEM_TYPE_TO_FILE = {"DataPipeline": "pipeline-content.json"} +ITEM_TYPE_TO_FILE = {ItemType.DATA_PIPELINE.value: DATA_PIPELINE_CONTENT_FILE_JSON} # Property path to get SQL Endpoint or Eventhouse URI PROPERTY_PATH_ATTR_MAPPING = { - "Lakehouse": { + ItemType.LAKEHOUSE.value: { "sqlendpoint": "body/properties/sqlEndpointProperties/connectionString", "sqlendpointid": "body/properties/sqlEndpointProperties/id", }, - "Warehouse": { + ItemType.WAREHOUSE.value: { "sqlendpoint": "body/properties/connectionString", }, - "SQLDatabase": { + ItemType.SQL_DATABASE.value: { "sqlendpoint": "body/properties/serverFqdn", }, - "Eventhouse": { + ItemType.EVENTHOUSE.value: { "queryserviceuri": "body/properties/queryServiceUri", }, } diff --git a/src/fabric_cicd/fabric_workspace.py b/src/fabric_cicd/fabric_workspace.py index 0ed7ef74..600d5031 100644 --- a/src/fabric_cicd/fabric_workspace.py +++ b/src/fabric_cicd/fabric_workspace.py @@ -20,6 +20,7 @@ from fabric_cicd._common._fabric_endpoint import FabricEndpoint, _generate_fabric_credential, _is_fabric_runtime from fabric_cicd._common._item import Item from fabric_cicd._common._logging import print_header +from fabric_cicd.constants import FeatureFlag, ItemType logger = logging.getLogger(__name__) @@ -314,14 +315,14 @@ def _refresh_repository_items(self) -> None: # .Eventhouse/.children/ 
directory structure, requires extracting the # parent folder path before the Eventhouse container, not just # the immediate parent directory - if item_type == "KQLDatabase": + if item_type == ItemType.KQL_DATABASE.value: pattern = re.compile(constants.KQL_DATABASE_FOLDER_PATH_REGEX) match = pattern.match(relative_path) relative_parent_path = match.group(1) if match else None else: relative_parent_path = "/".join(relative_path.split("/")[:-1]) - if "disable_workspace_folder_publish" not in constants.FEATURE_FLAG: + if FeatureFlag.DISABLE_WORKSPACE_FOLDER_PUBLISH.value not in constants.FEATURE_FLAG: item_folder_id = self.repository_folders.get(relative_parent_path, "") else: item_folder_id = "" @@ -381,14 +382,14 @@ def _refresh_deployed_items(self) -> None: self.workspace_items[item_type] = {} # Get additional properties - if item_type in ["Lakehouse", "Warehouse", "SQLDatabase"]: + if item_type in [ItemType.LAKEHOUSE.value, ItemType.WAREHOUSE.value, ItemType.SQL_DATABASE.value]: sql_endpoint = self._get_item_attribute( self.workspace_id, item_type, item_guid, item_name, "sqlendpoint" ) sql_endpoint_id = self._get_item_attribute( self.workspace_id, item_type, item_guid, item_name, "sqlendpointid" ) - if item_type in ["Eventhouse"]: + if item_type in [ItemType.EVENTHOUSE.value]: query_service_uri = self._get_item_attribute( self.workspace_id, item_type, item_guid, item_name, "queryserviceuri" ) @@ -684,7 +685,7 @@ def _publish_item( ) api_response = metadata_update_response - if "disable_workspace_folder_publish" not in constants.FEATURE_FLAG: + if FeatureFlag.DISABLE_WORKSPACE_FOLDER_PUBLISH.value not in constants.FEATURE_FLAG: deployed_item = self.deployed_items.get(item_type, {}).get(item_name) if is_deployed else None # Check if the folder has changed if deployed_item is not None and deployed_item.folder_id != item.folder_id: @@ -716,7 +717,7 @@ def _publish_item( # skip_publish_logging provided in kwargs to suppress logging if further processing is to be done if not kwargs.get("skip_publish_logging", False): - logger.info(f"{constants.INDENT}Published") + logger.info(f"{constants.INDENT}Published {item_type} '{item_name}'") return def _unpublish_item(self, item_name: str, item_type: str) -> None: @@ -735,9 +736,9 @@ def _unpublish_item(self, item_name: str, item_type: str) -> None: # https://learn.microsoft.com/en-us/rest/api/fabric/core/items/delete-item try: self.endpoint.invoke(method="DELETE", url=f"{self.base_api_url}/items/{item_guid}") - logger.info(f"{constants.INDENT}Unpublished") + logger.info(f"{constants.INDENT}Unpublished {item_type} '{item_name}'") except Exception as e: - logger.warning(f"Failed to unpublish {item_type} '{item_name}'. Raw exception: {e}") + logger.warning(f"Failed to unpublish {item_type} '{item_name}'. 
Raw exception: {e}") def _refresh_deployed_folders(self) -> None: """ diff --git a/src/fabric_cicd/publish.py b/src/fabric_cicd/publish.py index 25b8dff5..1884dd02 100644 --- a/src/fabric_cicd/publish.py +++ b/src/fabric_cicd/publish.py @@ -11,7 +11,6 @@ import fabric_cicd._items as items from fabric_cicd import constants -from fabric_cicd._common._check_utils import check_regex from fabric_cicd._common._config_utils import ( apply_config_overrides, extract_publish_settings, @@ -24,7 +23,11 @@ from fabric_cicd._common._validate_input import ( validate_environment, validate_fabric_workspace_obj, + validate_folder_path_exclude_regex, + validate_items_to_include, + validate_shortcut_exclude_regex, ) +from fabric_cicd.constants import FeatureFlag, ItemType from fabric_cicd.fabric_workspace import FabricWorkspace logger = logging.getLogger(__name__) @@ -139,10 +142,10 @@ def publish_all_items( fabric_workspace_obj = validate_fabric_workspace_obj(fabric_workspace_obj) # Initialize response collection if feature flag is enabled - if "enable_response_collection" in constants.FEATURE_FLAG: + if FeatureFlag.ENABLE_RESPONSE_COLLECTION.value in constants.FEATURE_FLAG: fabric_workspace_obj.responses = {} - # check if workspace has assigned capacity, if not, exit + # Check if workspace has assigned capacity, if not, exit has_assigned_capacity = None response_state = fabric_workspace_obj.endpoint.invoke( @@ -157,7 +160,7 @@ def publish_all_items( msg = f"Workspace {fabric_workspace_obj.workspace_id} does not have an assigned capacity. Please assign a capacity before publishing items." raise FailedPublishedItemStatusError(msg, logger) - if "disable_workspace_folder_publish" not in constants.FEATURE_FLAG: + if FeatureFlag.DISABLE_WORKSPACE_FOLDER_PUBLISH.value not in constants.FEATURE_FLAG: fabric_workspace_obj._refresh_deployed_folders() fabric_workspace_obj._refresh_repository_folders() fabric_workspace_obj._publish_folders() @@ -172,135 +175,36 @@ def publish_all_items( fabric_workspace_obj.publish_item_name_exclude_regex = item_name_exclude_regex if folder_path_exclude_regex: - if ( - "enable_experimental_features" not in constants.FEATURE_FLAG - or "enable_exclude_folder" not in constants.FEATURE_FLAG - ): - msg = "Feature flags 'enable_experimental_features' and 'enable_exclude_folder' must be set." - raise InputError(msg, logger) - logger.warning("Folder path exclusion is enabled.") - logger.warning( - "Using folder_path_exclude_regex is risky as it can prevent needed dependencies from being deployed. Use at your own risk." - ) + validate_folder_path_exclude_regex(folder_path_exclude_regex) fabric_workspace_obj.publish_folder_path_exclude_regex = folder_path_exclude_regex if items_to_include: - if ( - "enable_experimental_features" not in constants.FEATURE_FLAG - or "enable_items_to_include" not in constants.FEATURE_FLAG - ): - msg = "Feature flags 'enable_experimental_features' and 'enable_items_to_include' must be set." - raise InputError(msg, logger) - logger.warning("Selective deployment is enabled.") - logger.warning( - "Using items_to_include is risky as it can prevent needed dependencies from being deployed. Use at your own risk." 
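The inline flag checks removed here are consolidated into `validate_items_to_include` (and the sibling validators). For reference, a hedged usage sketch of the selective-deployment path it now guards; the workspace id, repository path, and item name are placeholders.

```python
from fabric_cicd import FabricWorkspace, append_feature_flag, publish_all_items

# Selective deployment still requires both experimental flags; the validator
# presumably raises InputError when either is missing, as the inline check did.
append_feature_flag("enable_experimental_features")
append_feature_flag("enable_items_to_include")

workspace = FabricWorkspace(
    workspace_id="00000000-0000-0000-0000-000000000000",  # placeholder GUID
    repository_directory="./workspace",  # placeholder path
    item_type_in_scope=["Notebook", "Environment"],
)

# Entries use the "<item name>.<item type>" form that the include filter matches.
publish_all_items(workspace, items_to_include=["Hello World.Notebook"])
```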
- ) + validate_items_to_include(items_to_include, operation=constants.OperationType.PUBLISH) fabric_workspace_obj.items_to_include = items_to_include if shortcut_exclude_regex: - if ( - "enable_experimental_features" not in constants.FEATURE_FLAG - or "enable_shortcut_exclude" not in constants.FEATURE_FLAG - ): - msg = "Feature flags 'enable_experimental_features' and 'enable_shortcut_exclude' must be set." - raise InputError(msg, logger) - logger.warning("Shortcut exclusion is enabled.") - logger.warning( - "Using shortcut_exclude_regex will selectively exclude shortcuts from being deployed to lakehouses. Use with caution." - ) + validate_shortcut_exclude_regex(shortcut_exclude_regex) fabric_workspace_obj.shortcut_exclude_regex = shortcut_exclude_regex - def _should_publish_item_type(item_type: str) -> bool: - """Check if an item type should be published based on scope and repository content.""" - return ( - item_type in fabric_workspace_obj.item_type_in_scope and item_type in fabric_workspace_obj.repository_items - ) - - if _should_publish_item_type("VariableLibrary"): - print_header("Publishing Variable Libraries") - items.publish_variablelibraries(fabric_workspace_obj) - if _should_publish_item_type("Warehouse"): - print_header("Publishing Warehouses") - items.publish_warehouses(fabric_workspace_obj) - if _should_publish_item_type("MirroredDatabase"): - print_header("Publishing Mirrored Databases") - items.publish_mirroreddatabase(fabric_workspace_obj) - if _should_publish_item_type("Lakehouse"): - print_header("Publishing Lakehouses") - items.publish_lakehouses(fabric_workspace_obj) - if _should_publish_item_type("SQLDatabase"): - print_header("Publishing SQL Databases") - items.publish_sqldatabases(fabric_workspace_obj) - if _should_publish_item_type("Environment"): - print_header("Publishing Environments") - items.publish_environments(fabric_workspace_obj) - if _should_publish_item_type("UserDataFunction"): - print_header("Publishing User Data Functions") - items.publish_userdatafunctions(fabric_workspace_obj) - if _should_publish_item_type("Eventhouse"): - print_header("Publishing Eventhouses") - items.publish_eventhouses(fabric_workspace_obj) - if _should_publish_item_type("SparkJobDefinition"): - print_header("Publishing Spark Job Definitions") - items.publish_sparkjobdefinitions(fabric_workspace_obj) - if _should_publish_item_type("Notebook"): - print_header("Publishing Notebooks") - items.publish_notebooks(fabric_workspace_obj) - if _should_publish_item_type("SemanticModel"): - print_header("Publishing Semantic Models") - items.publish_semanticmodels(fabric_workspace_obj) - if _should_publish_item_type("Report"): - print_header("Publishing Reports") - items.publish_reports(fabric_workspace_obj) - if _should_publish_item_type("CopyJob"): - print_header("Publishing Copy Jobs") - items.publish_copyjobs(fabric_workspace_obj) - if _should_publish_item_type("KQLDatabase"): - print_header("Publishing KQL Databases") - items.publish_kqldatabases(fabric_workspace_obj) - if _should_publish_item_type("KQLQueryset"): - print_header("Publishing KQL Querysets") - items.publish_kqlquerysets(fabric_workspace_obj) - if _should_publish_item_type("Reflex"): - print_header("Publishing Activators") - items.publish_activators(fabric_workspace_obj) - if _should_publish_item_type("Eventstream"): - print_header("Publishing Eventstreams") - items.publish_eventstreams(fabric_workspace_obj) - if _should_publish_item_type("KQLDashboard"): - print_header("Publishing KQL Dashboards") - 
items.publish_kqldashboard(fabric_workspace_obj) - if _should_publish_item_type("Dataflow"): - print_header("Publishing Dataflows") - items.publish_dataflows(fabric_workspace_obj) - if _should_publish_item_type("DataPipeline"): - print_header("Publishing Data Pipelines") - items.publish_datapipelines(fabric_workspace_obj) - if _should_publish_item_type("GraphQLApi"): - print_header("Publishing GraphQL APIs") - items.publish_graphqlapis(fabric_workspace_obj) - if _should_publish_item_type("ApacheAirflowJob"): - print_header("Publishing Apache Airflow Jobs") - items.publish_apacheairflowjobs(fabric_workspace_obj) - if _should_publish_item_type("MountedDataFactory"): - print_header("Publishing Mounted Data Factories") - items.publish_mounteddatafactories(fabric_workspace_obj) - if _should_publish_item_type("DataAgent"): - print_header("Publishing Data Agents") - items.publish_dataagents(fabric_workspace_obj) - if _should_publish_item_type("MLExperiment"): - print_header("Publishing ML Experiments") - items.publish_mlexperiments(fabric_workspace_obj) - - # Check Environment Publish - if _should_publish_item_type("Environment"): - print_header("Checking Environment Publish State") - items.check_environment_publish_state(fabric_workspace_obj) + # Publish items in the defined order synchronously + total_item_types = len(constants.SERIAL_ITEM_PUBLISH_ORDER) + publishers_with_async_check: list[items.ItemPublisher] = [] + for order_num, item_type in items.ItemPublisher.get_item_types_to_publish(fabric_workspace_obj): + print_header(f"Publishing Item {order_num}/{total_item_types}: {item_type.value}") + publisher = items.ItemPublisher.create(item_type, fabric_workspace_obj) + publisher.publish_all() + if publisher.has_async_publish_check: + publishers_with_async_check.append(publisher) + + # Check asynchronous publish status for relevant item types + for publisher in publishers_with_async_check: + print_header(f"Checking {publisher.item_type} Publish State") + publisher.post_publish_all_check() # Return response data if feature flag is enabled and responses were collected return ( fabric_workspace_obj.responses - if "enable_response_collection" in constants.FEATURE_FLAG and fabric_workspace_obj.responses + if FeatureFlag.ENABLE_RESPONSE_COLLECTION.value in constants.FEATURE_FLAG and fabric_workspace_obj.responses else None ) @@ -358,102 +262,34 @@ def unpublish_all_orphan_items( >>> unpublish_orphaned_items(workspace, items_to_include=items_to_include) """ fabric_workspace_obj = validate_fabric_workspace_obj(fabric_workspace_obj) - - is_items_to_include_list = False - regex_pattern = check_regex(item_name_exclude_regex) + validate_items_to_include(items_to_include, operation=constants.OperationType.UNPUBLISH) fabric_workspace_obj._refresh_deployed_items() fabric_workspace_obj._refresh_repository_items() print_header("Unpublishing Orphaned Items") - if items_to_include: - if ( - "enable_experimental_features" not in constants.FEATURE_FLAG - or "enable_items_to_include" not in constants.FEATURE_FLAG - ): - msg = "Feature flags 'enable_experimental_features' and 'enable_items_to_include' must be set." - raise InputError(msg, logger) - logger.warning("Selective unpublish is enabled.") - logger.warning( - "Using items_to_include is risky as it can prevent needed dependencies from being unpublished. Use at your own risk." 
+ # Build unpublish order based on reversed publish order, scope, and feature flags + for item_type in items.ItemPublisher.get_item_types_to_unpublish(fabric_workspace_obj): + to_delete_list = items.ItemPublisher.get_orphaned_items( + fabric_workspace_obj, + item_type, + item_name_exclude_regex=item_name_exclude_regex if not items_to_include else None, + items_to_include=items_to_include, ) - is_items_to_include_list = True - - # Lakehouses, SQL Databases, and Warehouses can only be unpublished if their feature flags are set - unpublish_flag_mapping = { - "Lakehouse": "enable_lakehouse_unpublish", - "SQLDatabase": "enable_sqldatabase_unpublish", - "Warehouse": "enable_warehouse_unpublish", - "Eventhouse": "enable_eventhouse_unpublish", - "KQLDatabase": "enable_kqldatabase_unpublish", - } - - # Define order to unpublish items - unpublish_order = [] - for item_type in [ - "MLExperiment", - "DataAgent", - "MountedDataFactory", - "ApacheAirflowJob", - "GraphQLApi", - "DataPipeline", - "Dataflow", - "KQLDashboard", - "Eventstream", - "Reflex", - "KQLQueryset", - "KQLDatabase", - "CopyJob", - "Report", - "SemanticModel", - "Notebook", - "SparkJobDefinition", - "Eventhouse", - "UserDataFunction", - "Environment", - "SQLDatabase", - "Lakehouse", - "MirroredDatabase", - "Warehouse", - "VariableLibrary", - ]: - if item_type in fabric_workspace_obj.item_type_in_scope and item_type in fabric_workspace_obj.deployed_items: - unpublish_flag = unpublish_flag_mapping.get(item_type) - # Append item_type if no feature flag is required or the corresponding flag is enabled - if not unpublish_flag or unpublish_flag in constants.FEATURE_FLAG: - unpublish_order.append(item_type) - elif unpublish_flag and unpublish_flag not in constants.FEATURE_FLAG: - # Log warning when unpublish is skipped due to missing feature flag - logger.warning( - f"Skipping unpublish for {item_type} items because the '{unpublish_flag}' feature flag is not enabled." 
- ) - - for item_type in unpublish_order: - deployed_names = set(fabric_workspace_obj.deployed_items.get(item_type, {}).keys()) - repository_names = set(fabric_workspace_obj.repository_items.get(item_type, {}).keys()) - - to_delete_set = deployed_names - repository_names - - if is_items_to_include_list: - to_delete_list = [name for name in to_delete_set if f"{name}.{item_type}" in items_to_include] - logger.debug(f"Items to include for unpublishing ({item_type}): {to_delete_list}") - else: - to_delete_list = [name for name in to_delete_set if not regex_pattern.match(name)] - if item_type == "DataPipeline": - find_referenced_items_func = items.find_referenced_datapipelines + if items_to_include and to_delete_list: + logger.debug(f"Items to include for unpublishing ({item_type}): {to_delete_list}") - # Determine order to delete w/o dependencies - to_delete_list = items.set_unpublish_order( - fabric_workspace_obj, item_type, to_delete_list, find_referenced_items_func - ) + publisher = items.ItemPublisher.create(ItemType(item_type), fabric_workspace_obj) + if to_delete_list and publisher.has_dependency_tracking: + to_delete_list = publisher.get_unpublish_order(to_delete_list) for item_name in to_delete_list: fabric_workspace_obj._unpublish_item(item_name=item_name, item_type=item_type) fabric_workspace_obj._refresh_deployed_items() fabric_workspace_obj._refresh_deployed_folders() - if "disable_workspace_folder_publish" not in constants.FEATURE_FLAG: + if FeatureFlag.DISABLE_WORKSPACE_FOLDER_PUBLISH.value not in constants.FEATURE_FLAG: fabric_workspace_obj._unpublish_folders() @@ -519,8 +355,8 @@ def deploy_with_config( """ # Experimental feature flags required to enable if ( - "enable_experimental_features" not in constants.FEATURE_FLAG - or "enable_config_deploy" not in constants.FEATURE_FLAG + FeatureFlag.ENABLE_EXPERIMENTAL_FEATURES.value not in constants.FEATURE_FLAG + or FeatureFlag.ENABLE_CONFIG_DEPLOY.value not in constants.FEATURE_FLAG ): msg = "Config file-based deployment is currently an experimental feature. Both 'enable_experimental_features' and 'enable_config_deploy' feature flags must be set." 
raise InputError(msg, logger) diff --git a/tests/test_deploy_with_config.py b/tests/test_deploy_with_config.py index f1ddfb68..974953b5 100644 --- a/tests/test_deploy_with_config.py +++ b/tests/test_deploy_with_config.py @@ -375,14 +375,17 @@ class TestConfigOverrides: @patch("fabric_cicd.constants.FEATURE_FLAG", set()) def test_apply_feature_flags(self): """Test applying feature flags from config.""" - config = {"features": ["enable_shortcut_publish", "enable_debug_mode"]} + foo = "enable_foo_feature" + bar = "enable_bar_feature" + + config = {"features": [foo, bar]} apply_config_overrides(config, "N/A") from fabric_cicd import constants - assert "enable_shortcut_publish" in constants.FEATURE_FLAG - assert "enable_debug_mode" in constants.FEATURE_FLAG + assert foo in constants.FEATURE_FLAG + assert bar in constants.FEATURE_FLAG def test_apply_constants_overrides(self): """Test applying constants overrides from config.""" diff --git a/tests/test_environment_publish.py b/tests/test_environment_publish.py index edf08fe8..b1d69149 100644 --- a/tests/test_environment_publish.py +++ b/tests/test_environment_publish.py @@ -113,7 +113,7 @@ def _publish_item(self, item_name, item_type, **kwargs): self.repository_items[item_type][item_name].skip_publish = True ws = FakeWorkspace() - env_module.publish_environments(ws) + env_module.EnvironmentPublisher(ws).publish_all() assert "shell_only_publish" in captured["called_with"] assert captured["called_with"]["shell_only_publish"] is True @@ -197,7 +197,7 @@ def _replace_parameters(self, file_obj, _item_obj): return file_obj.contents ws = FakeWorkspace() - env_module.publish_environments(ws) + env_module.EnvironmentPublisher(ws).publish_all() # Verify the sequence of important calls occurred in order urls = [c[1] for c in calls] @@ -271,7 +271,7 @@ def _replace_parameters(self, file_obj, _item_obj): return file_obj.contents ws = FakeWorkspace() - env_module.publish_environments(ws) + env_module.EnvironmentPublisher(ws).publish_all() urls = [c[1] for c in calls] assert any("/items" in u and u.endswith("/items") for u in urls), "Create item call missing" diff --git a/tests/test_publish.py b/tests/test_publish.py index 39d4aa26..3133b088 100644 --- a/tests/test_publish.py +++ b/tests/test_publish.py @@ -72,9 +72,11 @@ def test_publish_only_existing_item_types(mock_endpoint): patch.object( FabricWorkspace, "_refresh_deployed_folders", new=lambda self: setattr(self, "deployed_folders", {}) ), - patch("fabric_cicd._items.publish_notebooks") as mock_publish_notebooks, - patch("fabric_cicd._items.publish_environments") as mock_publish_environments, + patch("fabric_cicd._items._notebook.NotebookPublisher") as mock_notebook_cls, + patch("fabric_cicd._items._environment.EnvironmentPublisher") as mock_env_cls, ): + mock_notebook_instance = mock_notebook_cls.return_value + workspace = FabricWorkspace( workspace_id="12345678-1234-5678-abcd-1234567890ab", repository_directory=str(temp_path), @@ -88,9 +90,10 @@ def test_publish_only_existing_item_types(mock_endpoint): assert "Notebook" in workspace.repository_items assert "Environment" not in workspace.repository_items - # Verify that only publish_notebooks was called - mock_publish_notebooks.assert_called_once_with(workspace) - mock_publish_environments.assert_not_called() + # Verify that only NotebookPublisher was instantiated + mock_notebook_cls.assert_called_once_with(workspace) + mock_notebook_instance.publish_all.assert_called_once() + mock_env_cls.assert_not_called() def 
test_default_none_item_type_in_scope_includes_all_types(mock_endpoint): @@ -365,10 +368,10 @@ def test_mirrored_database_published_before_lakehouse(mock_endpoint): # Track the order of function calls call_order = [] - def mock_publish_lakehouses(_workspace): + def mock_publish_lakehouses(): call_order.append("Lakehouse") - def mock_publish_mirroreddatabase(_workspace): + def mock_publish_mirroreddatabase(): call_order.append("MirroredDatabase") with tempfile.TemporaryDirectory() as temp_dir: @@ -422,9 +425,14 @@ def mock_publish_mirroreddatabase(_workspace): patch.object( FabricWorkspace, "_refresh_deployed_folders", new=lambda self: setattr(self, "deployed_folders", {}) ), - patch("fabric_cicd._items.publish_lakehouses", side_effect=mock_publish_lakehouses), - patch("fabric_cicd._items.publish_mirroreddatabase", side_effect=mock_publish_mirroreddatabase), + patch("fabric_cicd._items._lakehouse.LakehousePublisher") as mock_lakehouse_cls, + patch("fabric_cicd._items._mirroreddatabase.MirroredDatabasePublisher") as mock_mirrored_cls, ): + mock_lakehouse_instance = mock_lakehouse_cls.return_value + mock_lakehouse_instance.publish_all.side_effect = mock_publish_lakehouses + mock_mirrored_instance = mock_mirrored_cls.return_value + mock_mirrored_instance.publish_all.side_effect = mock_publish_mirroreddatabase + workspace = FabricWorkspace( workspace_id="12345678-1234-5678-abcd-1234567890ab", repository_directory=str(temp_path), diff --git a/tests/test_shortcut_exclude.py b/tests/test_shortcut_exclude.py index 5ff19c8d..a21861c3 100644 --- a/tests/test_shortcut_exclude.py +++ b/tests/test_shortcut_exclude.py @@ -9,7 +9,7 @@ import pytest from fabric_cicd._common._item import Item -from fabric_cicd._items._lakehouse import process_shortcuts +from fabric_cicd._items._lakehouse import ShortcutPublisher from fabric_cicd.fabric_workspace import FabricWorkspace @@ -110,7 +110,7 @@ def test_process_shortcuts_with_exclude_regex_filters_shortcuts(mock_fabric_work mock_fabric_workspace.shortcut_exclude_regex = "^temp_.*" # Call process_shortcuts - process_shortcuts(mock_fabric_workspace, mock_item) + ShortcutPublisher(mock_fabric_workspace, mock_item).publish_all() # Verify that only the production_shortcut was published post_calls = [ @@ -168,7 +168,7 @@ def test_process_shortcuts_without_exclude_regex_publishes_all(mock_fabric_works mock_fabric_workspace.shortcut_exclude_regex = None # Call process_shortcuts - process_shortcuts(mock_fabric_workspace, mock_item) + ShortcutPublisher(mock_fabric_workspace, mock_item).publish_all() # Verify that both shortcuts were published post_calls = [ @@ -222,7 +222,7 @@ def test_process_shortcuts_exclude_regex_excludes_all_matching(mock_fabric_works mock_fabric_workspace.shortcut_exclude_regex = "^temp_.*" # Call process_shortcuts - process_shortcuts(mock_fabric_workspace, mock_item) + ShortcutPublisher(mock_fabric_workspace, mock_item).publish_all() # Verify that no shortcuts were published post_calls = [ @@ -289,7 +289,7 @@ def test_process_shortcuts_with_complex_regex_pattern(mock_fabric_workspace, moc mock_fabric_workspace.shortcut_exclude_regex = ".*_temp.*" # Call process_shortcuts - process_shortcuts(mock_fabric_workspace, mock_item) + ShortcutPublisher(mock_fabric_workspace, mock_item).publish_all() # Verify that only prod_shortcut was published post_calls = [