Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
dd726b5
feat: integrate Switch transpiler with Lakebridge installer
hiroyukinakazato-db Sep 30, 2025
febb62d
Merge branch 'main' into feature/switch-installer-integration
hiroyukinakazato-db Sep 30, 2025
fa26b4c
fix: remove undefined URLError from exception handling
hiroyukinakazato-db Sep 30, 2025
6511e20
refactor: streamline SwitchInstaller deployment logic and update tests
hiroyukinakazato-db Oct 3, 2025
33ea7de
refactor: simplify SwitchInstaller test structure and improve assertions
hiroyukinakazato-db Oct 3, 2025
d0c63c3
Merge remote-tracking branch 'origin/main' into feature/switch-instal…
hiroyukinakazato-db Oct 3, 2025
7cb9ea9
feat: add Switch transpiler installer for Lakebridge integration
hiroyukinakazato-db Oct 7, 2025
467dea9
fix: support case-insensitive config lookup in SwitchInstaller
hiroyukinakazato-db Oct 8, 2025
57298b0
Merge branch 'main' into feature/switch-installer-integration
hiroyukinakazato-db Oct 8, 2025
09c0eb8
Merge branch 'main' into feature/switch-installer-integration
hiroyukinakazato-db Oct 9, 2025
8439314
refactor: separate Switch installation from workspace deployment
hiroyukinakazato-db Oct 9, 2025
5f66f3f
Merge branch 'main' into feature/switch-installer-integration
hiroyukinakazato-db Oct 9, 2025
9dc4b04
refactor: encapsulate Switch package path resolution in SwitchDeployment
hiroyukinakazato-db Oct 9, 2025
7637234
test: update Switch installation tests for refactored interface
hiroyukinakazato-db Oct 9, 2025
729cb0d
Merge branch 'main' into feature/switch-installer-integration
hiroyukinakazato-db Oct 9, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions labs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ commands:
- name: interactive
description: (Optional) Whether installing in interactive mode (`true|false|auto`); configuration settings are prompted for when interactive
default: auto
- name: include-llm-transpiler
description: (Optional) Whether to include the LLM-based transpiler in the installation (`true|false`)
default: false

- name: describe-transpile
description: Describe installed transpilers
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ bad-functions = ["map", "input"]
# ignored-parents =

# Maximum number of arguments for function / method.
max-args = 12
max-args = 13

# Maximum number of attributes for a class (see R0902).
max-attributes = 13
Expand Down
7 changes: 6 additions & 1 deletion src/databricks/labs/lakebridge/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,7 @@ def install_transpile(
w: WorkspaceClient,
artifact: str | None = None,
interactive: str | None = None,
include_llm_transpiler: bool = False,
transpiler_repository: TranspilerRepository = TranspilerRepository.user_home(),
) -> None:
"""Install or upgrade the Lakebridge transpilers."""
Expand All @@ -631,9 +632,13 @@ def install_transpile(
ctx.add_user_agent_extra("cmd", "install-transpile")
if artifact:
ctx.add_user_agent_extra("artifact-overload", Path(artifact).name)
if include_llm_transpiler:
ctx.add_user_agent_extra("include-llm-transpiler", "true")
user = w.current_user
logger.debug(f"User: {user}")
transpile_installer = installer(w, transpiler_repository, is_interactive=is_interactive)
transpile_installer = installer(
w, transpiler_repository, is_interactive=is_interactive, include_llm=include_llm_transpiler
)
transpile_installer.run(module="transpile", artifact=artifact)


Expand Down
9 changes: 9 additions & 0 deletions src/databricks/labs/lakebridge/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,13 @@ def prompt_for_value(self, prompts: Prompts) -> JsonValue:
raise ValueError(f"Unsupported prompt method: {self.method}")


@dataclass
class SwitchResourcesConfig:
    """Workspace resources (catalog, schema, volume) configured for the Switch transpiler.

    Captured at install time and persisted so the deployment can record, and the
    uninstall can report, which workspace resources are in use.
    """

    # Catalog name selected for Switch — presumably a Unity Catalog catalog; confirm with caller.
    catalog: str
    # Schema within the catalog.
    schema: str
    # Volume used by Switch — exact usage not visible here.
    volume: str


@dataclass
class TranspileConfig:
__file__ = "config.yml"
Expand All @@ -152,9 +159,11 @@ class TranspileConfig:
error_file_path: str | None = None
sdk_config: dict[str, str] | None = None
skip_validation: bool = False
include_llm: bool = False
catalog_name: str = "remorph"
schema_name: str = "transpiler"
transpiler_options: JsonValue = None
switch_resources: SwitchResourcesConfig | None = None

@property
def transpiler_path(self) -> Path | None:
Expand Down
14 changes: 14 additions & 0 deletions src/databricks/labs/lakebridge/contexts/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
from databricks.labs.lakebridge.deployment.dashboard import DashboardDeployment
from databricks.labs.lakebridge.deployment.installation import WorkspaceInstallation
from databricks.labs.lakebridge.deployment.recon import TableDeployment, JobDeployment, ReconDeployment
from databricks.labs.lakebridge.deployment.switch import SwitchDeployment
from databricks.labs.lakebridge.helpers.metastore import CatalogOperations
from databricks.labs.lakebridge.transpiler.repository import TranspilerRepository

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -119,13 +121,25 @@ def recon_deployment(self) -> ReconDeployment:
self.dashboard_deployment,
)

@cached_property
def switch_deployment(self) -> SwitchDeployment:
    """Lazily build the deployer that installs the Switch transpiler into the workspace."""
    # Transpiler configs are looked up in the user's home repository, which is
    # where `install-transpile` places them.
    repository = TranspilerRepository.user_home()
    return SwitchDeployment(
        ws=self.workspace_client,
        installation=self.installation,
        install_state=self.install_state,
        product_info=self.product_info,
        job_deployer=self.job_deployment,
        transpiler_repository=repository,
    )

@cached_property
def workspace_installation(self) -> WorkspaceInstallation:
    """Lazily build the top-level installer that orchestrates all workspace components."""
    return WorkspaceInstallation(
        ws=self.workspace_client,
        prompts=self.prompts,
        installation=self.installation,
        recon_deployment=self.recon_deployment,
        switch_deployment=self.switch_deployment,
        product_info=self.product_info,
        upgrades=self.upgrades,
    )
Expand Down
26 changes: 26 additions & 0 deletions src/databricks/labs/lakebridge/deployment/installation.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from databricks.labs.lakebridge.config import LakebridgeConfiguration
from databricks.labs.lakebridge.deployment.recon import ReconDeployment
from databricks.labs.lakebridge.deployment.switch import SwitchDeployment

logger = logging.getLogger("databricks.labs.lakebridge.install")

Expand All @@ -24,13 +25,15 @@ def __init__(
prompts: Prompts,
installation: Installation,
recon_deployment: ReconDeployment,
switch_deployment: SwitchDeployment,
product_info: ProductInfo,
upgrades: Upgrades,
):
self._ws = ws
self._prompts = prompts
self._installation = installation
self._recon_deployment = recon_deployment
self._switch_deployment = switch_deployment
self._product_info = product_info
self._upgrades = upgrades

Expand Down Expand Up @@ -96,6 +99,16 @@ def install(self, config: LakebridgeConfiguration):
if config.reconcile:
logger.info("Installing Lakebridge reconcile Metadata components.")
self._recon_deployment.install(config.reconcile, wheel_path)
if config.transpile and config.transpile.include_llm:
resources = config.transpile.switch_resources
if resources is None:
logger.error(
"Switch resources are missing. Run `lakebridge install-transpile --include-llm-transpiler true` "
"with interactive prompts to capture the Switch catalog, schema, and volume before retrying."
)
else:
logger.info("Installing Switch transpiler to workspace.")
self._switch_deployment.install(resources)

def uninstall(self, config: LakebridgeConfiguration):
# This will remove all the Lakebridge modules
Expand All @@ -116,9 +129,22 @@ def uninstall(self, config: LakebridgeConfiguration):
f"Won't remove transpile validation schema `{config.transpile.schema_name}` "
f"from catalog `{config.transpile.catalog_name}`. Please remove it manually."
)
self._uninstall_switch_job()

if config.reconcile:
self._recon_deployment.uninstall(config.reconcile)

self._installation.remove()
logger.info("Uninstallation completed successfully.")

def _uninstall_switch_job(self) -> None:
    """Remove Switch transpiler job if exists."""
    # Capture the configured resources before tearing the job down, so we can
    # still tell the user what is deliberately being left behind.
    configured = self._switch_deployment.get_configured_resources()
    self._switch_deployment.uninstall()

    if not configured:
        return
    logger.info(
        f"Won't remove Switch resources: catalog=`{configured['catalog']}`, "
        f"schema=`{configured['schema']}`, volume=`{configured['volume']}`. "
        "Please remove them manually if needed."
    )
229 changes: 229 additions & 0 deletions src/databricks/labs/lakebridge/deployment/switch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
import logging
import os
import sys
from pathlib import Path

from databricks.labs.blueprint.installation import Installation
from databricks.labs.blueprint.installer import InstallState
from databricks.labs.blueprint.wheels import ProductInfo
from databricks.labs.lakebridge.deployment.job import JobDeployment
from databricks.labs.lakebridge.config import SwitchResourcesConfig
from databricks.labs.lakebridge.transpiler.repository import TranspilerRepository
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import InvalidParameterValue, NotFound
from databricks.sdk.service.jobs import JobParameterDefinition, JobSettings, NotebookTask, Source, Task

logger = logging.getLogger(__name__)


class SwitchDeployment:
    """Deploys the Switch (LLM-based) transpiler into a Databricks workspace.

    Responsibilities:
      * upload the locally installed Switch package from its virtualenv
        site-packages into the workspace installation folder,
      * create or update the job that runs the Switch transpilation notebook,
      * persist the user-selected catalog/schema/volume in the install state so
        that uninstallation can report what is intentionally left behind.
    """

    # Key under which the Switch job id and resources are tracked in InstallState.
    _INSTALL_STATE_KEY = "Switch"
    # On-disk transpiler identifier (lowercase), used for paths and config lookup.
    _TRANSPILER_ID = "switch"

    def __init__(
        self,
        ws: WorkspaceClient,
        installation: Installation,
        install_state: InstallState,
        product_info: ProductInfo,
        job_deployer: JobDeployment,
        transpiler_repository: TranspilerRepository,
    ):
        self._ws = ws
        self._installation = installation
        self._install_state = install_state
        self._product_info = product_info
        self._job_deployer = job_deployer
        self._transpiler_repository = transpiler_repository

    def install(self, resources: SwitchResourcesConfig) -> None:
        """Deploy Switch to workspace and configure resources."""
        logger.info("Deploying Switch to workspace...")
        self._deploy_workspace(self._get_switch_package_path())
        self._setup_job()
        self._record_resources(resources)
        logger.info("Switch deployment completed")

    def uninstall(self) -> None:
        """Remove Switch job from workspace."""
        raw_job_id = self._install_state.jobs.get(self._INSTALL_STATE_KEY)
        if raw_job_id is None:
            logger.info("No Switch job found in InstallState")
            return

        # Drop the state entry first so a failed workspace delete does not
        # leave a stale record behind.
        del self._install_state.jobs[self._INSTALL_STATE_KEY]
        try:
            # FIX: parse before use; previously a non-numeric state entry raised an
            # uncaught ValueError (and the warning below could hit an unbound name).
            job_id = int(raw_job_id)
        except ValueError:
            logger.warning(f"Invalid Switch job id in InstallState: {raw_job_id}")
            self._install_state.save()
            return

        logger.info(f"Removing Switch job with job_id={job_id}")
        try:
            self._ws.jobs.delete(job_id)
        except (InvalidParameterValue, NotFound):
            logger.warning(f"Switch job {job_id} doesn't exist anymore")
        self._install_state.save()

    def get_configured_resources(self) -> dict[str, str] | None:
        """Get configured Switch resources (catalog, schema, volume).

        Returns None when no resources have been recorded yet.
        """
        resources = self._install_state.switch_resources
        if not resources:
            return None
        return {key: resources.get(key) for key in ("catalog", "schema", "volume")}

    def _deploy_workspace(self, switch_package_dir: Path) -> None:
        """Deploy Switch package to workspace from site-packages.

        NOTE(review): errors are logged and swallowed, so _setup_job still runs
        after a failed upload — confirm this best-effort behavior is intended.
        """
        try:
            logger.info("Deploying Switch package to workspace...")
            remote_path = f"{self._TRANSPILER_ID}/databricks"
            self._upload_directory(switch_package_dir, remote_path)
            logger.info("Switch workspace deployment completed")
        except (OSError, ValueError, AttributeError) as e:
            logger.error(f"Failed to deploy to workspace: {e}")

    def _upload_directory(self, local_path: Path, remote_prefix: str) -> None:
        """Recursively upload directory to workspace, excluding cache files."""
        for root, dirs, files in os.walk(local_path):
            # Prune in place so os.walk does not descend into caches/hidden dirs.
            dirs[:] = [d for d in dirs if d != "__pycache__" and not d.startswith(".")]

            for file in files:
                # Skip compiled Python files and hidden files.
                if file.endswith((".pyc", ".pyo")) or file.startswith("."):
                    continue

                local_file = Path(root) / file
                rel_path = local_file.relative_to(local_path)
                remote_path = f"{remote_prefix}/{rel_path}"

                with open(local_file, "rb") as f:
                    content = f.read()

                self._installation.upload(remote_path, content)

    def _setup_job(self) -> None:
        """Create or update Switch job, recording its id in the install state."""
        existing_job_id = self._get_existing_job_id()
        logger.info("Setting up Switch job in workspace...")
        try:
            job_id = self._create_or_update_switch_job(existing_job_id)
            self._install_state.jobs[self._INSTALL_STATE_KEY] = job_id
            self._install_state.save()
            job_url = f"{self._ws.config.host}/jobs/{job_id}"
            logger.info(f"Switch job created/updated: {job_url}")
        except (RuntimeError, ValueError, InvalidParameterValue) as e:
            logger.error(f"Failed to create/update Switch job: {e}")

    def _get_existing_job_id(self) -> str | None:
        """Return the recorded Switch job id if the job still exists, else None."""
        if self._INSTALL_STATE_KEY not in self._install_state.jobs:
            return None
        try:
            job_id = self._install_state.jobs[self._INSTALL_STATE_KEY]
            # Probe the workspace: a stale id raises and is treated as absent.
            self._ws.jobs.get(int(job_id))
            return job_id
        except (InvalidParameterValue, NotFound, ValueError):
            return None

    def _create_or_update_switch_job(self, job_id: str | None) -> str:
        """Create or update Switch job, returning job ID.

        Raises RuntimeError if job creation returns no id (caught by _setup_job).
        """
        job_settings = self._get_switch_job_settings()

        # Try to update the existing job first.
        if job_id:
            try:
                logger.info(f"Updating Switch job: {job_id}")
                self._ws.jobs.reset(int(job_id), JobSettings(**job_settings))
                return job_id
            except (ValueError, InvalidParameterValue):
                logger.warning("Previous Switch job not found, creating new one")

        # Create new job.
        logger.info("Creating new Switch job")
        new_job = self._ws.jobs.create(**job_settings)
        # FIX: the original asserted on str(new_job.job_id), which can never be
        # None — a missing id would have been silently stored as the string "None".
        if new_job.job_id is None:
            raise RuntimeError("Job creation did not return a job id")
        return str(new_job.job_id)

    def _get_switch_job_settings(self) -> dict:
        """Build job settings for Switch transpiler."""
        product = self._installation.product()
        job_name = f"{product.upper()}_Switch"
        # FIX: use the injected product info instead of re-deriving it via
        # ProductInfo.from_class(self.__class__); both resolve this package.
        version = self._product_info.version()
        user_name = self._installation.username()
        # NOTE(review): assumes a user-home installation layout — confirm for
        # non-user installation targets.
        notebook_path = (
            f"/Workspace/Users/{user_name}/.{product}/{self._TRANSPILER_ID}/"
            f"databricks/labs/switch/notebooks/00_main"
        )

        task = Task(
            task_key="run_transpilation",
            notebook_task=NotebookTask(
                notebook_path=notebook_path,
                source=Source.WORKSPACE,
            ),
            disable_auto_optimization=True,  # To disable retries on failure
        )

        return {
            "name": job_name,
            "tags": {"created_by": user_name, "switch_version": f"v{version}"},
            "tasks": [task],
            "parameters": self._get_switch_job_parameters(),
            "max_concurrent_runs": 100,  # Allow simultaneous transpilations
        }

    def _get_switch_job_parameters(self) -> list[JobParameterDefinition]:
        """Build job-level parameter definitions from installed config.yml."""
        configs = self._transpiler_repository.all_transpiler_configs()
        # Case-insensitive lookup: configs may be keyed "Switch" or "switch".
        config = configs.get(self._INSTALL_STATE_KEY) or configs.get(self._TRANSPILER_ID)

        if not config:
            raise ValueError(
                "Switch config.yml not found. This indicates an incomplete installation. "
                "Please reinstall Switch transpiler."
            )

        # Required runtime parameters not present in config, listed first.
        parameters = {
            "input_dir": "",
            "output_dir": "",
            "result_catalog": "",
            "result_schema": "",
            "builtin_prompt": "",
        }

        # Then add parameters from config.yml.
        for option in config.options.get("all", []):
            flag = option.flag
            # FIX: the original's `option.default or ""` discarded falsy numeric
            # defaults (0, 0.0, False) before they could be stringified.
            default = option.default
            if default is None or default == "<none>":
                default = ""
            elif isinstance(default, (int, float)):
                default = str(default)

            parameters[flag] = default

        return [JobParameterDefinition(name=key, default=value) for key, value in parameters.items()]

    def _record_resources(self, resources: SwitchResourcesConfig) -> None:
        """Persist configured Switch resources for later reuse."""
        self._install_state.switch_resources["catalog"] = resources.catalog
        self._install_state.switch_resources["schema"] = resources.schema
        self._install_state.switch_resources["volume"] = resources.volume
        self._install_state.save()
        logger.info(
            f"Switch resources stored: catalog=`{resources.catalog}`, "
            f"schema=`{resources.schema}`, volume=`{resources.volume}`"
        )

    def _get_switch_package_path(self) -> Path:
        """Get Switch package path (databricks directory) from site-packages.

        The Switch transpiler is installed into a private virtualenv under the
        transpiler repository; the layout of site-packages differs by platform.
        """
        product_path = self._transpiler_repository.transpilers_path() / self._TRANSPILER_ID
        venv_path = product_path / "lib" / ".venv"

        if sys.platform != "win32":
            major, minor = sys.version_info[:2]
            return venv_path / "lib" / f"python{major}.{minor}" / "site-packages" / "databricks"
        return venv_path / "Lib" / "site-packages" / "databricks"
Loading
Loading