Introduced Profiler Skeleton #2021
Changes from all commits
@@ -0,0 +1,88 @@

import logging
from pathlib import Path

from databricks.labs.lakebridge.assessments.pipeline import PipelineClass
from databricks.labs.lakebridge.assessments.profiler_config import PipelineConfig
from databricks.labs.lakebridge.connections.database_manager import DatabaseManager
from databricks.labs.lakebridge.connections.credential_manager import (
    create_credential_manager,
)
from databricks.labs.lakebridge.connections.env_getter import EnvGetter
from databricks.labs.lakebridge.assessments import (
    PRODUCT_NAME,
    PRODUCT_PATH_PREFIX,
    PLATFORM_TO_SOURCE_TECHNOLOGY_CFG,
    CONNECTOR_REQUIRED,
)

logger = logging.getLogger(__name__)


class Profiler:

    def __init__(self, platform: str, pipeline_configs: PipelineConfig | None = None):
        self._platform = platform
        self._pipeline_config = pipeline_configs

    @classmethod
    def create(cls, platform: str) -> "Profiler":
        pipeline_config_path = PLATFORM_TO_SOURCE_TECHNOLOGY_CFG.get(platform, None)
        pipeline_config = None
        if pipeline_config_path:
            pipeline_config_absolute_path = Profiler._locate_config(pipeline_config_path)
            pipeline_config = Profiler.path_modifier(config_file=pipeline_config_absolute_path)
        return cls(platform, pipeline_config)

    @classmethod
    def supported_platforms(cls) -> list[str]:
        return list(PLATFORM_TO_SOURCE_TECHNOLOGY_CFG.keys())

    @staticmethod
    def path_modifier(*, config_file: str | Path, path_prefix: Path = PRODUCT_PATH_PREFIX) -> PipelineConfig:
        # TODO: Make this work install during developer mode
Contributor: what does this TODO mean
        config = PipelineClass.load_config_from_yaml(config_file)
        for step in config.steps:
            step.extract_source = f"{path_prefix}/{step.extract_source}"
        return config

    def profile(
        self,
        *,
        extractor: DatabaseManager | None = None,
        pipeline_config: PipelineConfig | None = None,
    ) -> None:
        platform = self._platform.lower()
        if not pipeline_config:
Contributor: This function seems to only validate that …
            if not self._pipeline_config:
                raise ValueError(f"Cannot Proceed without a valid pipeline configuration for {platform}")
            pipeline_config = self._pipeline_config
        self._execute(platform, pipeline_config, extractor)

    @staticmethod
    def _setup_extractor(platform: str) -> DatabaseManager | None:
        if not CONNECTOR_REQUIRED[platform]:
            return None
        cred_manager = create_credential_manager(PRODUCT_NAME, EnvGetter())
        connect_config = cred_manager.get_credentials(platform)
        return DatabaseManager(platform, connect_config)

    def _execute(self, platform: str, pipeline_config: PipelineConfig, extractor=None) -> None:
        try:
            if extractor is None:
                extractor = Profiler._setup_extractor(platform)

            result = PipelineClass(pipeline_config, extractor).execute()
            logger.info(f"Profile execution has completed successfully for {platform} for more info check: {result}.")
        except FileNotFoundError as e:
            logger.error(f"Configuration file not found for source {platform}: {e}")
            raise FileNotFoundError(f"Configuration file not found for source {platform}: {e}") from e
        except Exception as e:
            logger.error(f"Error executing pipeline for source {platform}: {e}")
            raise RuntimeError(f"Pipeline execution failed for source {platform} : {e}") from e

    @staticmethod
    def _locate_config(config_path: str | Path) -> Path:
        config_file = PRODUCT_PATH_PREFIX / config_path
        if not config_file.exists():
            raise FileNotFoundError(f"Configuration file not found: {config_file}")
        return config_file
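
A minimal usage sketch for the new Profiler class. This is not part of the diff: the import path of the module and the "synapse" platform key are assumptions, and running profile() would additionally require credentials for the platform to be configured (see _setup_extractor).

# Hypothetical usage sketch; module path and the "synapse" platform key are assumptions.
import logging

from databricks.labs.lakebridge.assessments.profiler import Profiler  # assumed location of the new module

logging.basicConfig(level=logging.INFO)

# Discover which platforms the skeleton currently knows about.
print(Profiler.supported_platforms())

# Build a profiler from the bundled pipeline config and run it.
# profile() falls back to the config resolved in create() when none is passed in.
profiler = Profiler.create("synapse")
profiler.profile()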
@@ -0,0 +1,23 @@
from databricks.labs.lakebridge.connections.database_manager import DatabaseManager


def get_sqlpool_reader(
    input_cred: dict,
    db_name: str,
    *,
    endpoint_key: str = 'dedicated_sql_endpoint',
    auth_type: str = 'sql_authentication',
) -> DatabaseManager:
    config = {
        "driver": input_cred['driver'],
        "server": input_cred[endpoint_key],
        "database": db_name,
        "user": input_cred['sql_user'],
        "password": input_cred['sql_password'],
        "port": input_cred.get('port', 1433),
        "auth_type": auth_type,
    }
    # synapse and mssql use the same connector
    source = "mssql"

    return DatabaseManager(source, config)
+1
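
A hypothetical call to get_sqlpool_reader, shown only to illustrate the expected shape of the credentials dict. The key names follow the function's defaults; the values, database name, and driver string are placeholders, not values from the PR.

# Hypothetical usage; assumes get_sqlpool_reader from the new module above is in scope.
creds = {
    "driver": "ODBC Driver 18 for SQL Server",        # placeholder driver name
    "dedicated_sql_endpoint": "myworkspace.sql.azuresynapse.net",  # placeholder endpoint
    "sql_user": "profiler_user",
    "sql_password": "***",
    # "port" is optional and defaults to 1433
}

reader = get_sqlpool_reader(creds, "my_sql_pool")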