Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions src/databricks/labs/lakebridge/deployment/installation.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,15 @@ def _apply_upgrades(self):
def _upload_wheel(self):
wheels = self._product_info.wheels(self._ws)
with wheels:
wheel_paths = [wheels.upload_to_wsfs()]
wheel_paths = [f"/Workspace{wheel}" for wheel in wheel_paths]
return wheel_paths
wheel_path = wheels.upload_to_wsfs()
return f"/Workspace{wheel_path}"

def install(self, config: LakebridgeConfiguration):
self._apply_upgrades()
wheel_paths: list[str] = self._upload_wheel()
wheel_path = self._upload_wheel()
if config.reconcile:
logger.info("Installing Lakebridge reconcile Metadata components.")
self._recon_deployment.install(config.reconcile, wheel_paths)
self._recon_deployment.install(config.reconcile, wheel_path)

def uninstall(self, config: LakebridgeConfiguration):
# This will remove all the Lakebridge modules
Expand Down
54 changes: 35 additions & 19 deletions src/databricks/labs/lakebridge/deployment/job.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import dataclasses
import logging
from datetime import datetime, timezone, timedelta
from typing import Any
Expand Down Expand Up @@ -32,18 +31,18 @@ def __init__(
self._install_state = install_state
self._product_info = product_info

def deploy_recon_job(self, name, recon_config: ReconcileConfig, remorph_wheel_path: str):
def deploy_recon_job(self, name, recon_config: ReconcileConfig, lakebridge_wheel_path: str):
logger.info("Deploying reconciliation job.")
job_id = self._update_or_create_recon_job(name, recon_config, remorph_wheel_path)
job_id = self._update_or_create_recon_job(name, recon_config, lakebridge_wheel_path)
logger.info(f"Reconciliation job deployed with job_id={job_id}")
logger.info(f"Job URL: {self._ws.config.host}#job/{job_id}")
self._install_state.save()

def _update_or_create_recon_job(self, name, recon_config: ReconcileConfig, remorph_wheel_path: str) -> str:
def _update_or_create_recon_job(self, name, recon_config: ReconcileConfig, lakebridge_wheel_path: str) -> str:
description = "Run the reconciliation process"
task_key = "run_reconciliation"

job_settings = self._recon_job_settings(name, task_key, description, recon_config, remorph_wheel_path)
job_settings = self._recon_job_settings(name, task_key, description, recon_config, lakebridge_wheel_path)
if name in self._install_state.jobs:
try:
job_id = int(self._install_state.jobs[name])
Expand All @@ -53,7 +52,7 @@ def _update_or_create_recon_job(self, name, recon_config: ReconcileConfig, remor
except InvalidParameterValue:
del self._install_state.jobs[name]
logger.warning(f"Job `{name}` does not exist anymore for some reason")
return self._update_or_create_recon_job(name, recon_config, remorph_wheel_path)
return self._update_or_create_recon_job(name, recon_config, lakebridge_wheel_path)

logger.info(f"Creating new job configuration for job `{name}`")
new_job = self._ws.jobs.create(**job_settings)
Expand All @@ -67,7 +66,7 @@ def _recon_job_settings(
task_key: str,
description: str,
recon_config: ReconcileConfig,
remorph_wheel_path: str,
lakebridge_wheel_path: str,
) -> dict[str, Any]:
latest_lts_spark = self._ws.clusters.select_spark_version(latest=True, long_term_support=True)
version = self._product_info.version()
Expand Down Expand Up @@ -95,22 +94,21 @@ def _recon_job_settings(
],
"tasks": [
self._job_recon_task(
Task(
task_key=task_key,
description=description,
job_cluster_key="Remorph_Reconciliation_Cluster",
),
task_key,
description,
recon_config,
remorph_wheel_path,
lakebridge_wheel_path,
),
],
"max_concurrent_runs": 2,
"parameters": [JobParameterDefinition(name="operation_name", default="reconcile")],
}

def _job_recon_task(self, jobs_task: Task, recon_config: ReconcileConfig, remorph_wheel_path: str) -> Task:
def _job_recon_task(
self, task_key: str, description: str, recon_config: ReconcileConfig, lakebridge_wheel_path: str
) -> Task:
libraries = [
compute.Library(whl=remorph_wheel_path),
compute.Library(whl=lakebridge_wheel_path),
]

if recon_config.data_source == ReconSourceType.ORACLE.value:
Expand All @@ -122,18 +120,21 @@ def _job_recon_task(self, jobs_task: Task, recon_config: ReconcileConfig, remorp
),
)

return dataclasses.replace(
jobs_task,
return Task(
task_key=task_key,
description=description,
job_cluster_key="Remorph_Reconciliation_Cluster",
libraries=libraries,
python_wheel_task=PythonWheelTask(
package_name="databricks_labs_remorph",
package_name=self.parse_package_name(lakebridge_wheel_path),
entry_point="reconcile",
parameters=["{{job.parameters.[operation_name]}}"],
),
)

# TODO: DRY: delete as it is already implemented in install.py
def _is_testing(self):
return self._product_info.product_name() != "remorph"
return self._product_info.product_name() != "lakebridge"

@staticmethod
def _get_test_purge_time() -> str:
Expand All @@ -145,3 +146,18 @@ def _get_default_node_type_id(self) -> str:
def _name_with_prefix(self, name: str) -> str:
prefix = self._installation.product()
return f"{prefix.upper()}_{name}".replace(" ", "_")

def parse_package_name(self, wheel_path: str) -> str:
default_name = "databricks_labs_lakebridge"

try:
name = wheel_path.split("/")[-1].split("-")[0]
except IndexError:
logger.warning(f"Cannot parse package name from wheel path {wheel_path}, using default.")
name = default_name

if self._product_info.product_name() not in name:
logger.warning(f"Parsed package name {name} does not match product name, using default.")
name = default_name

return name
10 changes: 4 additions & 6 deletions src/databricks/labs/lakebridge/deployment/recon.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,14 @@ def __init__(
self._job_deployer = job_deployer
self._dashboard_deployer = dashboard_deployer

def install(self, recon_config: ReconcileConfig | None, wheel_paths: list[str]):
def install(self, recon_config: ReconcileConfig | None, wheel_path: str):
if not recon_config:
logger.warning("Recon Config is empty.")
return
logger.info("Installing reconcile components.")
self._deploy_tables(recon_config)
self._deploy_dashboards(recon_config)
# TODO INVESTIGATE: Why is this needed?
remorph_wheel_path = [whl for whl in wheel_paths if "lakebridge" in whl][0]
self._deploy_jobs(recon_config, remorph_wheel_path)
self._deploy_jobs(recon_config, wheel_path)
self._install_state.save()
logger.info("Installation of reconcile components completed successfully.")

Expand Down Expand Up @@ -108,9 +106,9 @@ def _remove_dashboards(self):
logger.warning(f"Dashboard with id={dashboard_id} doesn't exist anymore for some reason.")
continue

def _deploy_jobs(self, recon_config: ReconcileConfig, remorph_wheel_path: str):
def _deploy_jobs(self, recon_config: ReconcileConfig, lakebridge_wheel_path: str):
logger.info("Deploying reconciliation jobs.")
self._job_deployer.deploy_recon_job(RECON_JOB_NAME, recon_config, remorph_wheel_path)
self._job_deployer.deploy_recon_job(RECON_JOB_NAME, recon_config, lakebridge_wheel_path)
for job_name, job_id in self._get_deprecated_jobs():
try:
logger.info(f"Removing job_id={job_id}, as it is no longer needed.")
Expand Down
6 changes: 3 additions & 3 deletions src/databricks/labs/lakebridge/reconcile/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

logger = logging.getLogger(__name__)

_RECON_README_URL = "https://github.com/databrickslabs/lakebridge/blob/main/docs/recon_configurations/README.md"
_RECON_DOCS_URL = "https://databrickslabs.github.io/lakebridge/docs/reconcile/"


class ReconcileRunner:
Expand Down Expand Up @@ -76,15 +76,15 @@ def _verify_recon_table_config(self, recon_config):
"Cannot find recon table configuration in existing `reconcile` installation. "
f"Please provide the configuration file {filename} in the workspace."
)
logger.error(f"{err_msg}. For more details, please refer to {_RECON_README_URL}")
logger.error(f"{err_msg}. For more details, please refer to the docs {_RECON_DOCS_URL}")
raise SystemExit(err_msg) from e
except (PermissionDenied, SerdeError, ValueError, AttributeError) as e:
install_dir = self._installation.install_folder()
err_msg = (
f"Cannot load corrupted recon table configuration from {install_dir}/{filename}. "
f"Please validate the file."
)
logger.error(f"{err_msg}. For more details, please refer to {_RECON_README_URL}")
logger.error(f"{err_msg}. For more details, please refer to the docs {_RECON_DOCS_URL}")
raise SystemExit(err_msg) from e

def _get_recon_job_id(self, reconcile_config: ReconcileConfig) -> int:
Expand Down
27 changes: 24 additions & 3 deletions tests/unit/deployment/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def test_deploy_new_job(oracle_recon_config):
product_info = ProductInfo.from_class(LakebridgeConfiguration)
name = "Recon Job"
job_deployer = JobDeployment(workspace_client, installation, install_state, product_info)
job_deployer.deploy_recon_job(name, oracle_recon_config, "remorph-x.y.z-py3-none-any.whl")
job_deployer.deploy_recon_job(name, oracle_recon_config, "lakebridge-x.y.z-py3-none-any.whl")
workspace_client.jobs.create.assert_called_once()
assert install_state.jobs[name] == str(job.job_id)

Expand All @@ -80,7 +80,7 @@ def test_deploy_existing_job(snowflake_recon_config):
install_state = InstallState.from_installation(installation)
product_info = ProductInfo.for_testing(LakebridgeConfiguration)
job_deployer = JobDeployment(workspace_client, installation, install_state, product_info)
job_deployer.deploy_recon_job(name, snowflake_recon_config, "remorph-x.y.z-py3-none-any.whl")
job_deployer.deploy_recon_job(name, snowflake_recon_config, "lakebridge-x.y.z-py3-none-any.whl")
workspace_client.jobs.reset.assert_called_once()
assert install_state.jobs[name] == str(job.job_id)

Expand All @@ -96,6 +96,27 @@ def test_deploy_missing_job(snowflake_recon_config):
install_state = InstallState.from_installation(installation)
product_info = ProductInfo.for_testing(LakebridgeConfiguration)
job_deployer = JobDeployment(workspace_client, installation, install_state, product_info)
job_deployer.deploy_recon_job(name, snowflake_recon_config, "remorph-x.y.z-py3-none-any.whl")
job_deployer.deploy_recon_job(name, snowflake_recon_config, "lakebridge-x.y.z-py3-none-any.whl")
workspace_client.jobs.create.assert_called_once()
assert install_state.jobs[name] == str(job.job_id)


def test_parse_package_name():
workspace_client = create_autospec(WorkspaceClient)
installation = MockInstallation(is_global=False)
install_state = InstallState.from_installation(installation)
product_info = ProductInfo.from_class(LakebridgeConfiguration)
job_deployer = JobDeployment(workspace_client, installation, install_state, product_info)

assert job_deployer.parse_package_name("lakebridge-1.2.3-py3-none-any.whl") == "lakebridge"
assert job_deployer.parse_package_name("remorph-1.2.3-py3-none-any.whl") == "databricks_labs_lakebridge"
assert (
job_deployer.parse_package_name("databricks_labs_lakebridge-1.2.3-py3-none-any.whl")
== "databricks_labs_lakebridge"
)
assert (
job_deployer.parse_package_name(
"/Workspace/Users/[email protected]/.lakebridge/wheels/databricks_labs_lakebridge-0.10.7-py3-none-any.whl"
)
== "databricks_labs_lakebridge"
)
2 changes: 1 addition & 1 deletion tests/unit/deployment/test_recon.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def test_install_missing_config(ws):
dashboard_deployer,
)
remorph_config = None
recon_deployer.install(remorph_config, ["remorph-x.y.z-py3-none-any.whl"])
recon_deployer.install(remorph_config, ["lakebridge-x.y.z-py3-none-any.whl"])
table_deployer.deploy_table_from_ddl_file.assert_not_called()
job_deployer.deploy_recon_job.assert_not_called()
dashboard_deployer.deploy.assert_not_called()
Expand Down
Loading