From 7bdcf785d49d2938601cff22ff484e4caaa647e6 Mon Sep 17 00:00:00 2001 From: M Abulazm Date: Tue, 23 Sep 2025 10:40:11 +0200 Subject: [PATCH 1/5] fix link --- src/databricks/labs/lakebridge/reconcile/runner.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/databricks/labs/lakebridge/reconcile/runner.py b/src/databricks/labs/lakebridge/reconcile/runner.py index 93dfe1566d..20919ec526 100644 --- a/src/databricks/labs/lakebridge/reconcile/runner.py +++ b/src/databricks/labs/lakebridge/reconcile/runner.py @@ -14,7 +14,7 @@ logger = logging.getLogger(__name__) -_RECON_README_URL = "https://github.com/databrickslabs/lakebridge/blob/main/docs/recon_configurations/README.md" +_RECON_DOCS_URL = "https://databrickslabs.github.io/lakebridge/docs/reconcile/" class ReconcileRunner: @@ -76,7 +76,7 @@ def _verify_recon_table_config(self, recon_config): "Cannot find recon table configuration in existing `reconcile` installation. " f"Please provide the configuration file {filename} in the workspace." ) - logger.error(f"{err_msg}. For more details, please refer to {_RECON_README_URL}") + logger.error(f"{err_msg}. For more details, please refer to the docs {_RECON_DOCS_URL}") raise SystemExit(err_msg) from e except (PermissionDenied, SerdeError, ValueError, AttributeError) as e: install_dir = self._installation.install_folder() @@ -84,7 +84,7 @@ def _verify_recon_table_config(self, recon_config): f"Cannot load corrupted recon table configuration from {install_dir}/{filename}. " f"Please validate the file." ) - logger.error(f"{err_msg}. For more details, please refer to {_RECON_README_URL}") + logger.error(f"{err_msg}. For more details, please refer to the docs {_RECON_DOCS_URL}") raise SystemExit(err_msg) from e def _get_recon_job_id(self, reconcile_config: ReconcileConfig) -> int: From c70e43a7266178f241572ff4ce25445d93b46838 Mon Sep 17 00:00:00 2001 From: M Abulazm Date: Tue, 23 Sep 2025 12:10:23 +0200 Subject: [PATCH 2/5] fix package name in recon job deployments --- .../lakebridge/deployment/installation.py | 9 ++-- .../labs/lakebridge/deployment/job.py | 54 ++++++++++++------- .../labs/lakebridge/deployment/recon.py | 10 ++-- tests/unit/deployment/test_job.py | 27 ++++++++-- tests/unit/deployment/test_recon.py | 2 +- 5 files changed, 68 insertions(+), 34 deletions(-) diff --git a/src/databricks/labs/lakebridge/deployment/installation.py b/src/databricks/labs/lakebridge/deployment/installation.py index 96ecf7da4b..88ba248896 100644 --- a/src/databricks/labs/lakebridge/deployment/installation.py +++ b/src/databricks/labs/lakebridge/deployment/installation.py @@ -87,16 +87,15 @@ def _apply_upgrades(self): def _upload_wheel(self): wheels = self._product_info.wheels(self._ws) with wheels: - wheel_paths = [wheels.upload_to_wsfs()] - wheel_paths = [f"/Workspace{wheel}" for wheel in wheel_paths] - return wheel_paths + wheel_path = wheels.upload_to_wsfs() + return f"/Workspace{wheel_path}" def install(self, config: LakebridgeConfiguration): self._apply_upgrades() - wheel_paths: list[str] = self._upload_wheel() + wheel_path = self._upload_wheel() if config.reconcile: logger.info("Installing Lakebridge reconcile Metadata components.") - self._recon_deployment.install(config.reconcile, wheel_paths) + self._recon_deployment.install(config.reconcile, wheel_path) def uninstall(self, config: LakebridgeConfiguration): # This will remove all the Lakebridge modules diff --git a/src/databricks/labs/lakebridge/deployment/job.py b/src/databricks/labs/lakebridge/deployment/job.py index bd86599062..480ad6443c 100644 --- a/src/databricks/labs/lakebridge/deployment/job.py +++ b/src/databricks/labs/lakebridge/deployment/job.py @@ -1,4 +1,3 @@ -import dataclasses import logging from datetime import datetime, timezone, timedelta from typing import Any @@ -32,18 +31,18 @@ def __init__( self._install_state = install_state self._product_info = product_info - def deploy_recon_job(self, name, recon_config: ReconcileConfig, remorph_wheel_path: str): + def deploy_recon_job(self, name, recon_config: ReconcileConfig, lakebridge_wheel_path: str): logger.info("Deploying reconciliation job.") - job_id = self._update_or_create_recon_job(name, recon_config, remorph_wheel_path) + job_id = self._update_or_create_recon_job(name, recon_config, lakebridge_wheel_path) logger.info(f"Reconciliation job deployed with job_id={job_id}") logger.info(f"Job URL: {self._ws.config.host}#job/{job_id}") self._install_state.save() - def _update_or_create_recon_job(self, name, recon_config: ReconcileConfig, remorph_wheel_path: str) -> str: + def _update_or_create_recon_job(self, name, recon_config: ReconcileConfig, lakebridge_wheel_path: str) -> str: description = "Run the reconciliation process" task_key = "run_reconciliation" - job_settings = self._recon_job_settings(name, task_key, description, recon_config, remorph_wheel_path) + job_settings = self._recon_job_settings(name, task_key, description, recon_config, lakebridge_wheel_path) if name in self._install_state.jobs: try: job_id = int(self._install_state.jobs[name]) @@ -53,7 +52,7 @@ def _update_or_create_recon_job(self, name, recon_config: ReconcileConfig, remor except InvalidParameterValue: del self._install_state.jobs[name] logger.warning(f"Job `{name}` does not exist anymore for some reason") - return self._update_or_create_recon_job(name, recon_config, remorph_wheel_path) + return self._update_or_create_recon_job(name, recon_config, lakebridge_wheel_path) logger.info(f"Creating new job configuration for job `{name}`") new_job = self._ws.jobs.create(**job_settings) @@ -67,7 +66,7 @@ def _recon_job_settings( task_key: str, description: str, recon_config: ReconcileConfig, - remorph_wheel_path: str, + lakebridge_wheel_path: str, ) -> dict[str, Any]: latest_lts_spark = self._ws.clusters.select_spark_version(latest=True, long_term_support=True) version = self._product_info.version() @@ -95,22 +94,21 @@ def _recon_job_settings( ], "tasks": [ self._job_recon_task( - Task( - task_key=task_key, - description=description, - job_cluster_key="Remorph_Reconciliation_Cluster", - ), + task_key, + description, recon_config, - remorph_wheel_path, + lakebridge_wheel_path, ), ], "max_concurrent_runs": 2, "parameters": [JobParameterDefinition(name="operation_name", default="reconcile")], } - def _job_recon_task(self, jobs_task: Task, recon_config: ReconcileConfig, remorph_wheel_path: str) -> Task: + def _job_recon_task( + self, task_key: str, description: str, recon_config: ReconcileConfig, lakebridge_wheel_path: str + ) -> Task: libraries = [ - compute.Library(whl=remorph_wheel_path), + compute.Library(whl=lakebridge_wheel_path), ] if recon_config.data_source == ReconSourceType.ORACLE.value: @@ -122,18 +120,21 @@ def _job_recon_task(self, jobs_task: Task, recon_config: ReconcileConfig, remorp ), ) - return dataclasses.replace( - jobs_task, + return Task( + task_key=task_key, + description=description, + job_cluster_key="Remorph_Reconciliation_Cluster", libraries=libraries, python_wheel_task=PythonWheelTask( - package_name="databricks_labs_remorph", + package_name=self.parse_package_name(lakebridge_wheel_path), entry_point="reconcile", parameters=["{{job.parameters.[operation_name]}}"], ), ) + # TODO: DRY: delete as it is already implemented in install.py def _is_testing(self): - return self._product_info.product_name() != "remorph" + return self._product_info.product_name() != "lakebridge" @staticmethod def _get_test_purge_time() -> str: @@ -145,3 +146,18 @@ def _get_default_node_type_id(self) -> str: def _name_with_prefix(self, name: str) -> str: prefix = self._installation.product() return f"{prefix.upper()}_{name}".replace(" ", "_") + + def parse_package_name(self, wheel_path: str) -> str: + default_name = "databricks_labs_lakebridge" + + try: + name = wheel_path.split("/")[-1].split("-")[0] + except IndexError: + logger.warning(f"Cannot parse package name from wheel path {wheel_path}, using default.") + name = default_name + + if self._product_info.product_name() not in name: + logger.warning(f"Parsed package name {name} does not match product name, using default.") + name = default_name + + return name diff --git a/src/databricks/labs/lakebridge/deployment/recon.py b/src/databricks/labs/lakebridge/deployment/recon.py index d2e4f9a995..7d466f9a41 100644 --- a/src/databricks/labs/lakebridge/deployment/recon.py +++ b/src/databricks/labs/lakebridge/deployment/recon.py @@ -39,16 +39,14 @@ def __init__( self._job_deployer = job_deployer self._dashboard_deployer = dashboard_deployer - def install(self, recon_config: ReconcileConfig | None, wheel_paths: list[str]): + def install(self, recon_config: ReconcileConfig | None, wheel_path: str): if not recon_config: logger.warning("Recon Config is empty.") return logger.info("Installing reconcile components.") self._deploy_tables(recon_config) self._deploy_dashboards(recon_config) - # TODO INVESTIGATE: Why is this needed? - remorph_wheel_path = [whl for whl in wheel_paths if "lakebridge" in whl][0] - self._deploy_jobs(recon_config, remorph_wheel_path) + self._deploy_jobs(recon_config, wheel_path) self._install_state.save() logger.info("Installation of reconcile components completed successfully.") @@ -108,9 +106,9 @@ def _remove_dashboards(self): logger.warning(f"Dashboard with id={dashboard_id} doesn't exist anymore for some reason.") continue - def _deploy_jobs(self, recon_config: ReconcileConfig, remorph_wheel_path: str): + def _deploy_jobs(self, recon_config: ReconcileConfig, lakebridge_wheel_path: str): logger.info("Deploying reconciliation jobs.") - self._job_deployer.deploy_recon_job(RECON_JOB_NAME, recon_config, remorph_wheel_path) + self._job_deployer.deploy_recon_job(RECON_JOB_NAME, recon_config, lakebridge_wheel_path) for job_name, job_id in self._get_deprecated_jobs(): try: logger.info(f"Removing job_id={job_id}, as it is no longer needed.") diff --git a/tests/unit/deployment/test_job.py b/tests/unit/deployment/test_job.py index dd96a4b528..fe885c1fe5 100644 --- a/tests/unit/deployment/test_job.py +++ b/tests/unit/deployment/test_job.py @@ -65,7 +65,7 @@ def test_deploy_new_job(oracle_recon_config): product_info = ProductInfo.from_class(LakebridgeConfiguration) name = "Recon Job" job_deployer = JobDeployment(workspace_client, installation, install_state, product_info) - job_deployer.deploy_recon_job(name, oracle_recon_config, "remorph-x.y.z-py3-none-any.whl") + job_deployer.deploy_recon_job(name, oracle_recon_config, "lakebridge-x.y.z-py3-none-any.whl") workspace_client.jobs.create.assert_called_once() assert install_state.jobs[name] == str(job.job_id) @@ -80,7 +80,7 @@ def test_deploy_existing_job(snowflake_recon_config): install_state = InstallState.from_installation(installation) product_info = ProductInfo.for_testing(LakebridgeConfiguration) job_deployer = JobDeployment(workspace_client, installation, install_state, product_info) - job_deployer.deploy_recon_job(name, snowflake_recon_config, "remorph-x.y.z-py3-none-any.whl") + job_deployer.deploy_recon_job(name, snowflake_recon_config, "lakebridge-x.y.z-py3-none-any.whl") workspace_client.jobs.reset.assert_called_once() assert install_state.jobs[name] == str(job.job_id) @@ -96,6 +96,27 @@ def test_deploy_missing_job(snowflake_recon_config): install_state = InstallState.from_installation(installation) product_info = ProductInfo.for_testing(LakebridgeConfiguration) job_deployer = JobDeployment(workspace_client, installation, install_state, product_info) - job_deployer.deploy_recon_job(name, snowflake_recon_config, "remorph-x.y.z-py3-none-any.whl") + job_deployer.deploy_recon_job(name, snowflake_recon_config, "lakebridge-x.y.z-py3-none-any.whl") workspace_client.jobs.create.assert_called_once() assert install_state.jobs[name] == str(job.job_id) + + +def test_parse_package_name(): + workspace_client = create_autospec(WorkspaceClient) + installation = MockInstallation(is_global=False) + install_state = InstallState.from_installation(installation) + product_info = ProductInfo.from_class(LakebridgeConfiguration) + job_deployer = JobDeployment(workspace_client, installation, install_state, product_info) + + assert job_deployer.parse_package_name("lakebridge-1.2.3-py3-none-any.whl") == "lakebridge" + assert job_deployer.parse_package_name("remorph-1.2.3-py3-none-any.whl") == "databricks_labs_lakebridge" + assert ( + job_deployer.parse_package_name("databricks_labs_lakebridge-1.2.3-py3-none-any.whl") + == "databricks_labs_lakebridge" + ) + assert ( + job_deployer.parse_package_name( + "/Workspace/Users/username@domain.com/.lakebridge/wheels/databricks_labs_lakebridge-0.10.7-py3-none-any.whl" + ) + == "databricks_labs_lakebridge" + ) diff --git a/tests/unit/deployment/test_recon.py b/tests/unit/deployment/test_recon.py index a33ad9626e..f55c62b757 100644 --- a/tests/unit/deployment/test_recon.py +++ b/tests/unit/deployment/test_recon.py @@ -46,7 +46,7 @@ def test_install_missing_config(ws): dashboard_deployer, ) remorph_config = None - recon_deployer.install(remorph_config, ["remorph-x.y.z-py3-none-any.whl"]) + recon_deployer.install(remorph_config, ["lakebridge-x.y.z-py3-none-any.whl"]) table_deployer.deploy_table_from_ddl_file.assert_not_called() job_deployer.deploy_recon_job.assert_not_called() dashboard_deployer.deploy.assert_not_called() From bca62f1d4e44541fc399d39ef7b4e70b06554772 Mon Sep 17 00:00:00 2001 From: M Abulazm Date: Thu, 25 Sep 2025 12:48:43 +0200 Subject: [PATCH 3/5] improve parsing of package name --- .../labs/lakebridge/deployment/installation.py | 2 +- src/databricks/labs/lakebridge/deployment/job.py | 14 +++++++++----- tests/unit/deployment/test_job.py | 2 +- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/src/databricks/labs/lakebridge/deployment/installation.py b/src/databricks/labs/lakebridge/deployment/installation.py index 88ba248896..7ff283f0e0 100644 --- a/src/databricks/labs/lakebridge/deployment/installation.py +++ b/src/databricks/labs/lakebridge/deployment/installation.py @@ -84,7 +84,7 @@ def _apply_upgrades(self): except (InvalidParameterValue, NotFound) as err: logger.warning(f"Unable to apply Upgrades due to: {err}") - def _upload_wheel(self): + def _upload_wheel(self) -> str: wheels = self._product_info.wheels(self._ws) with wheels: wheel_path = wheels.upload_to_wsfs() diff --git a/src/databricks/labs/lakebridge/deployment/job.py b/src/databricks/labs/lakebridge/deployment/job.py index 480ad6443c..a785744cbd 100644 --- a/src/databricks/labs/lakebridge/deployment/job.py +++ b/src/databricks/labs/lakebridge/deployment/job.py @@ -1,5 +1,6 @@ import logging from datetime import datetime, timezone, timedelta +from pathlib import Path from typing import Any from databricks.labs.blueprint.installation import Installation @@ -150,11 +151,14 @@ def _name_with_prefix(self, name: str) -> str: def parse_package_name(self, wheel_path: str) -> str: default_name = "databricks_labs_lakebridge" - try: - name = wheel_path.split("/")[-1].split("-")[0] - except IndexError: - logger.warning(f"Cannot parse package name from wheel path {wheel_path}, using default.") - name = default_name + if not isinstance(wheel_path, Path): + wheel_path = str(wheel_path) + + if not isinstance(wheel_path, str): + logger.warning("wheel path is not a string, using default.") + return default_name + + name = wheel_path.split("/")[-1].split("-")[0] if self._product_info.product_name() not in name: logger.warning(f"Parsed package name {name} does not match product name, using default.") diff --git a/tests/unit/deployment/test_job.py b/tests/unit/deployment/test_job.py index fe885c1fe5..10cead759d 100644 --- a/tests/unit/deployment/test_job.py +++ b/tests/unit/deployment/test_job.py @@ -101,7 +101,7 @@ def test_deploy_missing_job(snowflake_recon_config): assert install_state.jobs[name] == str(job.job_id) -def test_parse_package_name(): +def test_parse_package_name() -> None: workspace_client = create_autospec(WorkspaceClient) installation = MockInstallation(is_global=False) install_state = InstallState.from_installation(installation) From 5adbe667952933471d19ba7fdc397913a09a5d34 Mon Sep 17 00:00:00 2001 From: M Abulazm Date: Thu, 25 Sep 2025 15:21:23 +0200 Subject: [PATCH 4/5] remove validation --- src/databricks/labs/lakebridge/deployment/job.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/src/databricks/labs/lakebridge/deployment/job.py b/src/databricks/labs/lakebridge/deployment/job.py index a785744cbd..f1156e4ce6 100644 --- a/src/databricks/labs/lakebridge/deployment/job.py +++ b/src/databricks/labs/lakebridge/deployment/job.py @@ -151,13 +151,6 @@ def _name_with_prefix(self, name: str) -> str: def parse_package_name(self, wheel_path: str) -> str: default_name = "databricks_labs_lakebridge" - if not isinstance(wheel_path, Path): - wheel_path = str(wheel_path) - - if not isinstance(wheel_path, str): - logger.warning("wheel path is not a string, using default.") - return default_name - name = wheel_path.split("/")[-1].split("-")[0] if self._product_info.product_name() not in name: From f97b442f6604d9d601c8fd4f5b0ba32578977cad Mon Sep 17 00:00:00 2001 From: M Abulazm Date: Thu, 25 Sep 2025 15:42:30 +0200 Subject: [PATCH 5/5] fmt --- src/databricks/labs/lakebridge/deployment/job.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/databricks/labs/lakebridge/deployment/job.py b/src/databricks/labs/lakebridge/deployment/job.py index f1156e4ce6..5a7f4e40e0 100644 --- a/src/databricks/labs/lakebridge/deployment/job.py +++ b/src/databricks/labs/lakebridge/deployment/job.py @@ -1,6 +1,5 @@ import logging from datetime import datetime, timezone, timedelta -from pathlib import Path from typing import Any from databricks.labs.blueprint.installation import Installation