7 changes: 7 additions & 0 deletions labs.yml
@@ -55,6 +55,13 @@ commands:
description: Aggregates Reconcile is a utility to streamline the reconciliation process; a specific aggregate metric is compared between source and target data residing on Databricks.
- name: configure-database-profiler
description: "Configure Database Profiler"
- name: create-profiler-dashboard
description: "Upload the Profiler Results as a Databricks Dashboard."
flags:
- name: extract-file
description: (Optional) Path Location of the Profiler Extract File
- name: source-tech
description: (Optional) Name of the Source System Technology that was Profiled
- name: install-transpile
description: "Install & Configure Necessary Transpiler Dependencies"
flags:
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -27,7 +27,7 @@ classifiers = [
]

dependencies = [
"databricks-sdk~=0.51.0",
"databricks-sdk~=0.67.0",
"standard-distutils~=3.11.9; python_version>='3.11'",
"databricks-bb-analyzer~=0.1.9",
"sqlglot==26.1.3",
Empty file.
Empty file.
@@ -0,0 +1,85 @@
import os
import json

import logging
from pathlib import Path

from databricks.sdk.service.dashboards import Dashboard
from databricks.sdk.service.iam import User
from databricks.sdk import WorkspaceClient

from databricks.labs.blueprint.wheels import find_project_root

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


Collaborator:
Add a Comment to say why we are deviating from LSQL deployment, so other reviewers don't block this PR.

class DashboardTemplateLoader:
"""
Class for loading the JSON representation of a Databricks dashboard
according to the source system.
"""

def __init__(self, templates_dir: Path | None):
self.templates_dir = templates_dir

def load(self, source_system: str) -> dict:
"""
Loads a profiler summary dashboard.
:param source_system: the name of the source data warehouse
"""
if self.templates_dir is None:
raise ValueError("Dashboard template path cannot be empty.")

filename = f"{source_system.lower()}_dashboard.lvdash.json"
filepath = os.path.join(self.templates_dir, filename)
if not os.path.exists(filepath):
raise FileNotFoundError(f"Could not find dashboard template matching '{source_system}'.")
with open(filepath, "r", encoding="utf-8") as f:
return json.load(f)


class DashboardManager:
"""
Class for managing the lifecycle of a profiler dashboard summary, a.k.a. "local dashboards"
"""

DASHBOARD_NAME = "Lakebridge Profiler Assessment"

def __init__(self, ws: WorkspaceClient, current_user: User, is_debug: bool = False):
self._ws = ws
self._current_user = current_user
Collaborator:

I think you don't need this attribute; you can retrieve it from the workspace client.
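For illustration, a minimal sketch of that suggestion — deriving the location from the client instead of injecting the user (the helper name is hypothetical):

```python
from databricks.sdk import WorkspaceClient


def _default_dashboard_location(ws: WorkspaceClient) -> str:
    # current_user.me() returns the authenticated principal, including user_name
    user_name = ws.current_user.me().user_name
    return f"/Workspace/Users/{user_name}/Lakebridge/Dashboards"
```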

self._dashboard_location = f"/Workspace/Users/{self._current_user}/Lakebridge/Dashboards"
Collaborator:

We should rely on the blueprint to save the file to the workspace; it should be inside the .lakebridge folder.
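Roughly what that could look like, assuming blueprint's `Installation` API (an untested sketch; the `dashboards` subfolder is an assumption):

```python
from databricks.sdk import WorkspaceClient
from databricks.labs.blueprint.installation import Installation


def dashboard_parent_path(ws: WorkspaceClient) -> str:
    # assume_user_home resolves to /Users/<me>/.lakebridge in the workspace
    installation = Installation.assume_user_home(ws, "lakebridge")
    return f"{installation.install_folder()}/dashboards"
```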

self._is_debug = is_debug

def create_profiler_summary_dashboard(
self,
extract_file: str | None,
source_tech: str | None,
catalog_name: str = "lakebridge_profiler",
schema_name: str = "synapse_runs",
) -> None:
logger.info("Deploying profiler summary dashboard.")

# Load the AI/BI Dashboard template for the source system
template_folder = (
find_project_root(__file__)
/ f"src/databricks/labs/lakebridge/resources/assessments/dashboards/{source_tech}"
)
dashboard_loader = DashboardTemplateLoader(template_folder)
dashboard_json = dashboard_loader.load(source_system="synapse")
dashboard_str = json.dumps(dashboard_json)

dashboard_str = dashboard_str.replace("`PROFILER_CATALOG`", f"`{catalog_name}`")
dashboard_str = dashboard_str.replace("`PROFILER_SCHEMA`", f"`{schema_name}`")

# TODO: check if the dashboard exists and unpublish it if it does
# TODO: create a warehouse ID
Collaborator:

This should be taken care of during deployment; this part assumes that the necessary infra is already set up.
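A hedged sketch of resolving the warehouse at deploy time instead of passing `warehouse_id=None` (the selection strategy is illustrative only):

```python
from databricks.sdk import WorkspaceClient


def pick_warehouse_id(ws: WorkspaceClient) -> str | None:
    warehouses = list(ws.warehouses.list())
    if not warehouses:
        return None  # deployment is expected to have provisioned one already
    # Prefer a serverless warehouse when available, otherwise take the first one
    serverless = [w for w in warehouses if getattr(w, "enable_serverless_compute", False)]
    return (serverless or warehouses)[0].id
```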

dashboard_ws_location = f"/Workspace/Users/{self._current_user}/Lakebridge/Dashboards/"
dashboard = Dashboard(
display_name=self.DASHBOARD_NAME,
parent_path=dashboard_ws_location,
warehouse_id=None,
serialized_dashboard=dashboard_str,
)
self._ws.lakeview.create(dashboard=dashboard)
13 changes: 13 additions & 0 deletions src/databricks/labs/lakebridge/cli.py
@@ -608,6 +608,19 @@ def configure_database_profiler() -> None:
assessment.run()


@lakebridge.command(is_unauthenticated=False)
Collaborator:

Suggested change:
- @lakebridge.command(is_unauthenticated=False)
+ @lakebridge.command()

def create_profiler_dashboard(
*,
w: WorkspaceClient,
extract_file: str | None = None,
source_tech: str | None = None,
) -> None:
"""Uploads profiler output summary as a Databricks dashboard."""
with_user_agent_extra("cmd", "create-profiler-dashboard")
ctx = ApplicationContext(w)
ctx.dashboard_manager.create_profiler_summary_dashboard(extract_file, source_tech)


@lakebridge.command
def install_transpile(
*,
6 changes: 6 additions & 0 deletions src/databricks/labs/lakebridge/contexts/application.py
@@ -13,6 +13,7 @@
from databricks.sdk.service.iam import User

from databricks.labs.lakebridge.analyzer.lakebridge_analyzer import LakebridgeAnalyzer
from databricks.labs.lakebridge.assessments.dashboards.dashboard_manager import DashboardManager
from databricks.labs.lakebridge.config import TranspileConfig, ReconcileConfig, LakebridgeConfiguration
from databricks.labs.lakebridge.deployment.configurator import ResourceConfigurator
from databricks.labs.lakebridge.deployment.dashboard import DashboardDeployment
@@ -107,6 +108,11 @@ def job_deployment(self) -> JobDeployment:
def dashboard_deployment(self) -> DashboardDeployment:
return DashboardDeployment(self.workspace_client, self.installation, self.install_state)

@cached_property
def dashboard_manager(self) -> DashboardManager:
is_debug = logger.getEffectiveLevel() == logging.DEBUG
return DashboardManager(self.workspace_client, self.current_user, is_debug)

@cached_property
def recon_deployment(self) -> ReconDeployment:
return ReconDeployment(
88 changes: 87 additions & 1 deletion src/databricks/labs/lakebridge/deployment/job.py
@@ -9,7 +9,15 @@
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import InvalidParameterValue
from databricks.sdk.service import compute
from databricks.sdk.service.jobs import Task, PythonWheelTask, JobCluster, JobSettings, JobParameterDefinition
from databricks.sdk.service.jobs import (
Task,
PythonWheelTask,
JobCluster,
JobSettings,
JobParameterDefinition,
NotebookTask,
Source,
)

from databricks.labs.lakebridge.config import ReconcileConfig
from databricks.labs.lakebridge.reconcile.constants import ReconSourceType
@@ -145,3 +153,81 @@ def _get_default_node_type_id(self) -> str:
def _name_with_prefix(self, name: str) -> str:
prefix = self._installation.product()
return f"{prefix.upper()}_{name}".replace(" ", "_")

def deploy_profiler_ingestion_job(
self, name: str, source_tech: str, databricks_user: str, volume_upload_location: str, target_catalog: str
):
logger.info("Deploying profiler ingestion job.")
job_id = self._update_or_create_profiler_ingestion_job(
name, source_tech, databricks_user, volume_upload_location, target_catalog
)
logger.info(f"Profiler ingestion job deployed with job_id={job_id}")
logger.info(f"Job URL: {self._ws.config.host}#job/{job_id}")
self._install_state.save()

def _update_or_create_profiler_ingestion_job(
self, name: str, source_tech: str, databricks_user: str, volume_upload_location: str, target_catalog: str
) -> str:
job_settings = self._profiler_ingestion_job_settings(
name, source_tech, databricks_user, volume_upload_location, target_catalog
)
if name in self._install_state.jobs:
try:
job_id = int(self._install_state.jobs[name])
logger.info(f"Updating configuration for job `{name}`, job_id={job_id}")
self._ws.jobs.reset(job_id, JobSettings(**job_settings))
return str(job_id)
except InvalidParameterValue:
del self._install_state.jobs[name]
logger.warning(f"Job `{name}` does not exist anymore for some reason")
return self._update_or_create_profiler_ingestion_job(
name, source_tech, databricks_user, volume_upload_location, target_catalog
)

logger.info(f"Creating new job configuration for job `{name}`")
new_job = self._ws.jobs.create(**job_settings)
assert new_job.job_id is not None
self._install_state.jobs[name] = str(new_job.job_id)
return str(new_job.job_id)

def _profiler_ingestion_job_settings(
self, job_name: str, source_tech: str, databricks_user: str, volume_upload_location: str, target_catalog: str
) -> dict[str, Any]:
latest_lts_spark = self._ws.clusters.select_spark_version(latest=True, long_term_support=True)
version = self._product_info.version()
version = version if not self._ws.config.is_gcp else version.replace("+", "-")
tags = {"version": f"v{version}"}
if self._is_testing():
# Add RemoveAfter tag for test job cleanup
date_to_remove = self._get_test_purge_time()
tags.update({"RemoveAfter": date_to_remove})

return {
"name": self._name_with_prefix(job_name),
"tags": tags,
"job_clusters": [
JobCluster(
job_cluster_key="Lakebridge_Profiler_Ingestion_Cluster",
new_cluster=compute.ClusterSpec(
data_security_mode=compute.DataSecurityMode.USER_ISOLATION,
spark_conf={},
node_type_id=self._get_default_node_type_id(),
autoscale=compute.AutoScale(min_workers=2, max_workers=3),
spark_version=latest_lts_spark,
),
)
],
"tasks": [
NotebookTask(
notebook_path=f"/Workspace/{databricks_user}/Lakebridge/profiler/load_extracted_tables.py",
base_parameters={
Collaborator:

There are two ways we can implement this: have the ingestion job as a Python package and use a wheel task, or have the notebook uploaded and then run the job. I prefer option 1 (a rough sketch of it follows after this hunk).

"extract_location": volume_upload_location,
"profiler_type": source_tech,
"target_catalog": target_catalog,
},
source=Source("WORKSPACE"),
),
],
"max_concurrent_runs": 2,
"parameters": [JobParameterDefinition(name="operation_name", default="reconcile")],
}
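As a point of comparison for the review thread above, a rough sketch of option 1 (a wheel task in place of the notebook task); the package name, entry point, and parameter placeholders are hypothetical and not part of this PR:

```python
from databricks.sdk.service.jobs import Task, PythonWheelTask

ingestion_task = Task(
    task_key="profiler_ingestion",
    job_cluster_key="Lakebridge_Profiler_Ingestion_Cluster",
    python_wheel_task=PythonWheelTask(
        package_name="databricks_labs_lakebridge",  # hypothetical wheel name
        entry_point="load_extracted_tables",  # hypothetical console-script entry point
        named_parameters={
            "extract_location": "<volume_upload_location>",
            "profiler_type": "<source_tech>",
            "target_catalog": "<target_catalog>",
        },
    ),
)
```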
2 changes: 1 addition & 1 deletion src/databricks/labs/lakebridge/helpers/metastore.py
@@ -152,7 +152,7 @@ def has_privileges(

@functools.lru_cache(maxsize=1024)
def _get_user_privileges(self, user: str, securable_type: SecurableType, full_name: str) -> set[Privilege]:
permissions = self._ws.grants.get_effective(securable_type, full_name, principal=user)
permissions = self._ws.grants.get_effective(str(securable_type), full_name, principal=user)
if not permissions or not permissions.privilege_assignments:
return set()
return {