Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
e4e074b
Docs
mdrakiburrahman Jan 26, 2026
ed9dfba
Bring in changes from previous branch
mdrakiburrahman Jan 26, 2026
2da3f41
Factor out exclude
mdrakiburrahman Jan 26, 2026
05ade34
Make API mapping a constant
mdrakiburrahman Jan 26, 2026
c7bfdcb
Encapsulate environments and pipelines better
mdrakiburrahman Jan 26, 2026
2a21653
Unnecessary change
mdrakiburrahman Jan 26, 2026
b71024b
Push more logic into the base publisher
mdrakiburrahman Jan 26, 2026
3282403
Whitespace
mdrakiburrahman Jan 26, 2026
4e76a5f
Factor out dupe logic for validate_items_to_include
mdrakiburrahman Jan 26, 2026
0d4bf0c
English
mdrakiburrahman Jan 26, 2026
f19a476
Missed one
mdrakiburrahman Jan 26, 2026
1c35265
Merge origin/main into dev/mdrrahman/parallelize-everything
mdrakiburrahman Jan 28, 2026
4104c4f
Remove unused feature flag
mdrakiburrahman Jan 28, 2026
7ddd4d0
Make validate_experimental_param generic
mdrakiburrahman Jan 28, 2026
0cc63be
Prefix with item type
mdrakiburrahman Jan 28, 2026
8edc409
Lint fix
mdrakiburrahman Jan 28, 2026
feee24c
Remove location output for terminal
mdrakiburrahman Jan 28, 2026
11a6175
Update tracer to be thread safe with OS agnostic file locker
mdrakiburrahman Jan 28, 2026
d332147
Lint
mdrakiburrahman Jan 28, 2026
e3f3da8
Remove orgapp support as of https://github.com/microsoft/fabric-cicd/…
mdrakiburrahman Jan 28, 2026
33f1230
Reorder functions, public first
mdrakiburrahman Jan 28, 2026
6bf1b45
Docstrings
mdrakiburrahman Jan 28, 2026
d889839
Merge branch 'main' into dev/mdrrahman/parallelize-everything
mdrakiburrahman Jan 29, 2026
659c259
Restore formatting to how it used to be but add item and name into fu…
mdrakiburrahman Jan 29, 2026
fbc25e6
Merge branch 'main' into dev/mdrrahman/parallelize-everything
shirasassoon Feb 2, 2026
943210a
Swallow error into log
mdrakiburrahman Feb 2, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -164,4 +164,5 @@ cython_debug/

# http traces should only be committed at the fixture root
/http_trace.json
/http_trace.json.lock
/http_trace.json.gz
3 changes: 3 additions & 0 deletions src/fabric_cicd/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import fabric_cicd.constants as constants
from fabric_cicd._common._check_utils import check_version
from fabric_cicd._common._logging import configure_logger, exception_handler
from fabric_cicd.constants import FeatureFlag, ItemType
from fabric_cicd.fabric_workspace import FabricWorkspace
from fabric_cicd.publish import deploy_with_config, publish_all_items, unpublish_all_orphan_items

Expand Down Expand Up @@ -56,6 +57,8 @@ def change_log_level(level: str = "DEBUG") -> None:

__all__ = [
"FabricWorkspace",
"FeatureFlag",
"ItemType",
"append_feature_flag",
"change_log_level",
"deploy_with_config",
Expand Down
21 changes: 21 additions & 0 deletions src/fabric_cicd/_common/_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,24 @@ class FailedPublishedItemStatusError(BaseCustomError):

class MissingFileError(BaseCustomError):
    """Raised when a required file cannot be found — inferred from the name; confirm at raise sites."""

    pass


class PublishError(BaseCustomError):
    """Exception raised when one or more publish operations fail.

    Aggregates every per-item failure into a single exception so callers see
    the full set of failed items rather than only the first one.

    Attributes:
        errors: List of (item_name, exception) tuples for all failed items.
    """

    def __init__(self, errors: list[tuple[str, Exception]], logger: Logger) -> None:
        """Initialize with a list of (item_name, exception) tuples.

        Args:
            errors: One (item_name, exception) tuple per failed item.
            logger: Logger the base class uses to record the error.
        """
        self.errors = errors
        failed_names = [name for name, _ in errors]
        message = f"Failed to publish {len(errors)} item(s): {failed_names}"

        # One "--- item ---" section per failure; None when there were no
        # failures so the base class can omit the extra detail entirely.
        sections = [f"\n--- {item_name} ---\n{exc!s}" for item_name, exc in errors]
        additional_info = "\n".join(sections) if sections else None

        super().__init__(message, logger, additional_info)
64 changes: 64 additions & 0 deletions src/fabric_cicd/_common/_file_lock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

"""Cross-platform file locking."""

import sys
from pathlib import Path
from types import TracebackType
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


class FileLock:
    """Cross-platform exclusive file lock usable as a context manager.

    The lock is taken on a sibling ``<lock_file>.lock`` file rather than the
    target file itself, using msvcrt on Windows and fcntl elsewhere.
    """

    def __init__(self, lock_file: str) -> None:
        """
        Args:
            lock_file: Path of the file to guard; the actual OS lock is
                acquired on ``<lock_file>.lock``.
        """
        self.lock_path = Path(f"{lock_file}.lock")
        # Open handle to the lock file while the lock is held; None otherwise.
        self._lock_file: Optional[object] = None

    def __enter__(self) -> "FileLock":
        """Acquire the exclusive lock.

        NOTE(review): on Windows, msvcrt LK_LOCK retries for a bounded time
        and then raises OSError rather than blocking indefinitely.
        """
        self._lock_file = self.lock_path.open("w")
        try:
            if sys.platform == "win32":
                import msvcrt

                msvcrt.locking(self._lock_file.fileno(), msvcrt.LK_LOCK, 1)
            else:
                import fcntl

                fcntl.flock(self._lock_file.fileno(), fcntl.LOCK_EX)
        except BaseException:
            # Don't leak the open handle if acquiring the OS lock fails.
            self._lock_file.close()
            self._lock_file = None
            raise
        return self

    def __exit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> bool:
        """Release the lock and close the handle; never suppresses exceptions."""
        if self._lock_file:
            try:
                if sys.platform == "win32":
                    import msvcrt

                    msvcrt.locking(self._lock_file.fileno(), msvcrt.LK_UNLCK, 1)
                else:
                    import fcntl

                    fcntl.flock(self._lock_file.fileno(), fcntl.LOCK_UN)
            finally:
                # Always close and clear the handle, even if unlocking fails,
                # so a reused FileLock never operates on a closed file.
                self._lock_file.close()
                self._lock_file = None
        return False

    @staticmethod
    def run_with_lock(lock_file: str, func: Callable[[], T]) -> T:
        """
        Execute a function while holding an exclusive file lock.

        Args:
            lock_file: Path to the file to lock (a .lock suffix will be added)
            func: The function to execute while holding the lock

        Returns:
            The return value of the function
        """
        with FileLock(lock_file):
            return func()
66 changes: 35 additions & 31 deletions src/fabric_cicd/_common/_http_tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import requests

from fabric_cicd._common._file_lock import FileLock
from fabric_cicd.constants import AUTHORIZATION_HEADER, EnvVar

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -192,40 +193,43 @@ def save(self) -> None:
return

try:
output_path = Path(self.output_file)
existing_traces: list[dict] = []
if output_path.exists():
with output_path.open("r") as f:
existing_data = json.load(f)
existing_traces = existing_data.get("traces", [])

for capture in self.captures:
request_b64 = capture.get("request_b64", "")
response_b64 = capture.get("response_b64", "")

request_data = None
response_data = None

if request_b64:
request_data = json.loads(base64.b64decode(request_b64).decode())
if response_b64:
response_data = json.loads(base64.b64decode(response_b64).decode())

existing_traces.append({"request": request_data, "response": response_data})

existing_traces.sort(key=lambda x: x["request"].get("timestamp", "") if x.get("request") else "")
output_data = {
"description": "HTTP trace data from Fabric API interactions",
"total_traces": len(existing_traces),
"traces": existing_traces,
}

with output_path.open("w") as f:
json.dump(output_data, f, indent=2)

FileLock.run_with_lock(self.output_file, self._flush_traces_to_file)
except Exception as e:
logger.warning(f"Failed to save HTTP trace: {e}")

def _flush_traces_to_file(self) -> None:
"""Flush captured traces to the output file (called within lock)."""
output_path = Path(self.output_file)
existing_traces: list[dict] = []
if output_path.exists() and output_path.stat().st_size > 0:
with output_path.open("r") as f:
existing_data = json.load(f)
existing_traces = existing_data.get("traces", [])

for capture in self.captures:
request_b64 = capture.get("request_b64", "")
response_b64 = capture.get("response_b64", "")

request_data = None
response_data = None

if request_b64:
request_data = json.loads(base64.b64decode(request_b64).decode())
if response_b64:
response_data = json.loads(base64.b64decode(response_b64).decode())

existing_traces.append({"request": request_data, "response": response_data})

existing_traces.sort(key=lambda x: x["request"].get("timestamp", "") if x.get("request") else "")
output_data = {
"description": "HTTP trace data from Fabric API interactions",
"total_traces": len(existing_traces),
"traces": existing_traces,
}

with output_path.open("w") as f:
json.dump(output_data, f, indent=2)


class HTTPTracerFactory:
"""Factory class for creating HTTP tracer instances."""
Expand Down
96 changes: 96 additions & 0 deletions src/fabric_cicd/_common/_validate_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import fabric_cicd.constants as constants
from fabric_cicd._common._exceptions import InputError
from fabric_cicd.constants import FeatureFlag, OperationType
from fabric_cicd.fabric_workspace import FabricWorkspace

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -156,3 +157,98 @@ def validate_token_credential(input_value: TokenCredential) -> TokenCredential:
validate_data_type("TokenCredential", "credential", input_value)

return input_value


def validate_experimental_param(
    param_value: Optional[str],
    required_flag: "FeatureFlag",
    warning_message: str,
    risk_warning: str,
) -> None:
    """
    Generic validation for optional parameters requiring experimental feature flags.

    Args:
        param_value: The parameter value (None means skip validation).
        required_flag: The specific feature flag required (in addition to experimental).
        warning_message: Primary warning message when feature is enabled.
        risk_warning: Risk/caution warning message.

    Raises:
        InputError: If required feature flags are not enabled.
    """
    # FeatureFlag is already imported at module scope; the former
    # function-local re-import was redundant.
    if param_value is None:
        return

    # Both the umbrella experimental flag and the feature-specific flag must be set.
    if (
        FeatureFlag.ENABLE_EXPERIMENTAL_FEATURES.value not in constants.FEATURE_FLAG
        or required_flag.value not in constants.FEATURE_FLAG
    ):
        msg = f"Feature flags 'enable_experimental_features' and '{required_flag.value}' must be set."
        raise InputError(msg, logger)

    logger.warning(warning_message)
    logger.warning(risk_warning)


def validate_items_to_include(items_to_include: Optional[list[str]], operation: "OperationType") -> None:
    """
    Validate items_to_include parameter and check required feature flags.

    Args:
        items_to_include: List of items in "item_name.item_type" format, or None.
        operation: The type of operation being performed (publish or unpublish).

    Raises:
        InputError: If required feature flags are not enabled.
    """
    # FeatureFlag is imported at module scope; the former function-local
    # re-import was redundant. Delegates flag checks to the generic validator.
    validate_experimental_param(
        param_value=items_to_include,
        required_flag=FeatureFlag.ENABLE_ITEMS_TO_INCLUDE,
        warning_message=f"Selective {operation.value} is enabled.",
        risk_warning=f"Using items_to_include is risky as it can prevent needed dependencies from being {operation.value}. Use at your own risk.",
    )


def validate_folder_path_exclude_regex(folder_path_exclude_regex: Optional[str]) -> None:
    """
    Validate folder_path_exclude_regex parameter and check required feature flags.

    Args:
        folder_path_exclude_regex: Regex pattern to exclude items based on their folder path, or None.

    Raises:
        InputError: If required feature flags are not enabled.
    """
    # FeatureFlag is imported at module scope; the former function-local
    # re-import was redundant. Delegates flag checks to the generic validator.
    validate_experimental_param(
        param_value=folder_path_exclude_regex,
        required_flag=FeatureFlag.ENABLE_EXCLUDE_FOLDER,
        warning_message="Folder path exclusion is enabled.",
        risk_warning="Using folder_path_exclude_regex is risky as it can prevent needed dependencies from being deployed. Use at your own risk.",
    )


def validate_shortcut_exclude_regex(shortcut_exclude_regex: Optional[str]) -> None:
    """
    Validate shortcut_exclude_regex parameter and check required feature flags.

    Args:
        shortcut_exclude_regex: Regex pattern to exclude specific shortcuts from being published, or None.

    Raises:
        InputError: If required feature flags are not enabled.
    """
    # FeatureFlag is imported at module scope; the former function-local
    # re-import was redundant. Delegates flag checks to the generic validator.
    validate_experimental_param(
        param_value=shortcut_exclude_regex,
        required_flag=FeatureFlag.ENABLE_SHORTCUT_EXCLUDE,
        warning_message="Shortcut exclusion is enabled.",
        risk_warning="Using shortcut_exclude_regex will selectively exclude shortcuts from being deployed to lakehouses. Use with caution.",
    )
59 changes: 5 additions & 54 deletions src/fabric_cicd/_items/__init__.py
Original file line number Diff line number Diff line change
@@ -1,60 +1,11 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

from fabric_cicd._items._activator import publish_activators
from fabric_cicd._items._apacheairflowjob import publish_apacheairflowjobs
from fabric_cicd._items._copyjob import publish_copyjobs
from fabric_cicd._items._dataagent import publish_dataagents
from fabric_cicd._items._dataflowgen2 import publish_dataflows
from fabric_cicd._items._datapipeline import find_referenced_datapipelines, publish_datapipelines
from fabric_cicd._items._environment import check_environment_publish_state, publish_environments
from fabric_cicd._items._eventhouse import publish_eventhouses
from fabric_cicd._items._eventstream import publish_eventstreams
from fabric_cicd._items._graphqlapi import publish_graphqlapis
from fabric_cicd._items._kqldashboard import publish_kqldashboard
from fabric_cicd._items._kqldatabase import publish_kqldatabases
from fabric_cicd._items._kqlqueryset import publish_kqlquerysets
from fabric_cicd._items._lakehouse import publish_lakehouses
from fabric_cicd._items._manage_dependencies import set_unpublish_order
from fabric_cicd._items._mirroreddatabase import publish_mirroreddatabase
from fabric_cicd._items._mlexperiment import publish_mlexperiments
from fabric_cicd._items._mounteddatafactory import publish_mounteddatafactories
from fabric_cicd._items._notebook import publish_notebooks
from fabric_cicd._items._report import publish_reports
from fabric_cicd._items._semanticmodel import publish_semanticmodels
from fabric_cicd._items._sparkjobdefinition import publish_sparkjobdefinitions
from fabric_cicd._items._sqldatabase import publish_sqldatabases
from fabric_cicd._items._userdatafunction import publish_userdatafunctions
from fabric_cicd._items._variablelibrary import publish_variablelibraries
from fabric_cicd._items._warehouse import publish_warehouses
from fabric_cicd._common._exceptions import PublishError
from fabric_cicd._items._base_publisher import ItemPublisher, ParallelConfig

__all__ = [
"check_environment_publish_state",
"find_referenced_datapipelines",
"publish_activators",
"publish_apacheairflowjobs",
"publish_copyjobs",
"publish_dataagents",
"publish_dataflows",
"publish_datapipelines",
"publish_environments",
"publish_eventhouses",
"publish_eventstreams",
"publish_graphqlapis",
"publish_kqldashboard",
"publish_kqldatabases",
"publish_kqlquerysets",
"publish_lakehouses",
"publish_mirroreddatabase",
"publish_mlexperiments",
"publish_mounteddatafactories",
"publish_notebooks",
"publish_reports",
"publish_semanticmodels",
"publish_sparkjobdefinitions",
"publish_sqldatabases",
"publish_userdatafunctions",
"publish_variablelibraries",
"publish_warehouses",
"set_unpublish_order",
"ItemPublisher",
"ParallelConfig",
"PublishError",
]
20 changes: 5 additions & 15 deletions src/fabric_cicd/_items/_activator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,11 @@

"""Functions to process and deploy Reflex item."""

import logging
from fabric_cicd._items._base_publisher import ItemPublisher
from fabric_cicd.constants import ItemType

from fabric_cicd import FabricWorkspace

logger = logging.getLogger(__name__)
class ActivatorPublisher(ItemPublisher):
"""Publisher for Reflex AKA Activator items."""


def publish_activators(fabric_workspace_obj: FabricWorkspace) -> None:
    """
    Publish every Reflex (Activator) item found in the repository.

    Args:
        fabric_workspace_obj: The FabricWorkspace object containing the items to be published
    """
    item_type = "Reflex"

    activator_items = fabric_workspace_obj.repository_items.get(item_type, {})
    for activator_name in activator_items:
        fabric_workspace_obj._publish_item(item_name=activator_name, item_type=item_type)
item_type = ItemType.REFLEX.value
Loading