From 5e4ff3d297949df0cea85f42c5d3816f1e012bc7 Mon Sep 17 00:00:00 2001 From: Will Girten Date: Mon, 17 Nov 2025 10:24:02 -0500 Subject: [PATCH 01/12] Add initial MSSQL profiler files --- .../labs/lakebridge/assessments/_constants.py | 3 +- .../resources/assessments/mssql/__init__.py | 0 .../assessments/mssql/activity_extract.py | 38 ++++ .../assessments/mssql/common/__init__.py | 0 .../assessments/mssql/common/functions.py | 9 + .../assessments/mssql/common/queries.py | 180 ++++++++++++++++++ .../assessments/mssql/pipeline_config.yml | 19 ++ 7 files changed, 248 insertions(+), 1 deletion(-) create mode 100644 src/databricks/labs/lakebridge/resources/assessments/mssql/__init__.py create mode 100644 src/databricks/labs/lakebridge/resources/assessments/mssql/activity_extract.py create mode 100644 src/databricks/labs/lakebridge/resources/assessments/mssql/common/__init__.py create mode 100644 src/databricks/labs/lakebridge/resources/assessments/mssql/common/functions.py create mode 100644 src/databricks/labs/lakebridge/resources/assessments/mssql/common/queries.py create mode 100644 src/databricks/labs/lakebridge/resources/assessments/mssql/pipeline_config.yml diff --git a/src/databricks/labs/lakebridge/assessments/_constants.py b/src/databricks/labs/lakebridge/assessments/_constants.py index 9a793b19d1..e1299ccbae 100644 --- a/src/databricks/labs/lakebridge/assessments/_constants.py +++ b/src/databricks/labs/lakebridge/assessments/_constants.py @@ -5,10 +5,11 @@ PLATFORM_TO_SOURCE_TECHNOLOGY_CFG = { "synapse": "src/databricks/labs/lakebridge/resources/assessments/synapse/pipeline_config.yml", + "mssql": "src/databricks/labs/lakebridge/resources/assessments/mssql/pipeline_config.yml", } # TODO modify this PLATFORM_TO_SOURCE_TECHNOLOGY.keys() once all platforms are supported -PROFILER_SOURCE_SYSTEM = ["synapse"] +PROFILER_SOURCE_SYSTEM = ["synapse", "mssql"] # This flag indicates whether a connector is required for the source system when pipeline is trigger diff --git a/src/databricks/labs/lakebridge/resources/assessments/mssql/__init__.py b/src/databricks/labs/lakebridge/resources/assessments/mssql/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/databricks/labs/lakebridge/resources/assessments/mssql/activity_extract.py b/src/databricks/labs/lakebridge/resources/assessments/mssql/activity_extract.py new file mode 100644 index 0000000000..b7e0f9c4e1 --- /dev/null +++ b/src/databricks/labs/lakebridge/resources/assessments/mssql/activity_extract.py @@ -0,0 +1,38 @@ +import json +import sys + +from databricks.labs.lakebridge.connections.credential_manager import create_credential_manager +from databricks.labs.lakebridge.assessments import PRODUCT_NAME +from databricks.labs.lakebridge.resources.assessments.mssql.common.functions import create_msql_sql_client +from databricks.labs.lakebridge.resources.assessments.synapse.common.functions import arguments_loader, set_logger + + +def execute(): + logger = set_logger(__file__) + + db_path, creds_file = arguments_loader(desc="MSSQL Server Activity Extract Script") + + cred_manager = create_credential_manager(PRODUCT_NAME, creds_file) + mssql_settings = cred_manager.get_credentials("mssql") + mssql_profiler_settings = mssql_settings["profiler"] + + mssql_client = create_msql_sql_client(mssql_profiler_settings) + + try: + # list all the SQL servers in the subscription + sql_servers = mssql_client.servers.list() + for sql_server in sql_servers: + + # Extract activity metrics + logger.info(f"Extracting activity metrics for: 
{sql_server}") + + print(json.dumps({"status": "success", "message": " All data loaded successfully loaded successfully"})) + + except Exception as e: + logger.error(f"Failed to extract activity info for Azure SQL server: {str(e)}") + print(json.dumps({"status": "error", "message": str(e)}), file=sys.stderr) + sys.exit(1) + + +if __name__ == '__main__': + execute() diff --git a/src/databricks/labs/lakebridge/resources/assessments/mssql/common/__init__.py b/src/databricks/labs/lakebridge/resources/assessments/mssql/common/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/databricks/labs/lakebridge/resources/assessments/mssql/common/functions.py b/src/databricks/labs/lakebridge/resources/assessments/mssql/common/functions.py new file mode 100644 index 0000000000..33e63e446e --- /dev/null +++ b/src/databricks/labs/lakebridge/resources/assessments/mssql/common/functions.py @@ -0,0 +1,9 @@ +from azure.identity import DefaultAzureCredential +from azure.mgmt.sql import SqlManagementClient + + +def create_msql_sql_client(config: dict) -> SqlManagementClient: + """ + Creates an Azure SQL management client for the provided subscription using the default Azure credential. + """ + return SqlManagementClient(credential=DefaultAzureCredential(), subscription_id=config["subscription_id"]) diff --git a/src/databricks/labs/lakebridge/resources/assessments/mssql/common/queries.py b/src/databricks/labs/lakebridge/resources/assessments/mssql/common/queries.py new file mode 100644 index 0000000000..f9f93ed5d3 --- /dev/null +++ b/src/databricks/labs/lakebridge/resources/assessments/mssql/common/queries.py @@ -0,0 +1,180 @@ +class MSSQLQueries: + + @staticmethod + def get_query_stats(last_execution_time: str | None) -> str: + """ + Retrieves and classifies recently executed SQL statements from `sys.dm_exec_query_stats`. + Includes execution metrics (count, duration, CPU time, rows) and categorizes each statement as QUERY, DML, DDL, + ROUTINE, TRANSACTION_CONTROL, or OTHER based on its command type. 
+ """ + predicate = ( + f"WHERE qs.last_execution_time > CAST('{last_execution_time}' AS DATETIME2(6))" + if last_execution_time + else "" + ) + + return f""" + with query_stats as ( + SELECT + CONVERT(VARCHAR(64), HASHBYTES('SHA2_256', qs.sql_handle), 1) as sql_handle, + st.dbid, + qs.creation_time, + qs.last_execution_time, + qs.execution_count, + qs.total_worker_time, + qs.total_elapsed_time, + qs.total_rows, + SUBSTRING(st.text, (qs.statement_start_offset/2) + 1, + ((CASE statement_end_offset + WHEN -1 THEN DATALENGTH(st.text) + ELSE qs.statement_end_offset END + - qs.statement_start_offset)/2) + 1) AS statement_text + FROM sys.dm_exec_query_stats AS qs + CROSS APPLY sys.dm_exec_sql_text(qs.sql_handle) AS st + {predicate} + ), + query_stats_ex as ( + select + dbid, + creation_time, + last_execution_time, + execution_count, + total_worker_time, + total_elapsed_time, + total_rows, + UPPER(SUBSTRING(LTRIM(RTRIM(statement_text)), 1, 40)) as command + from query_stats + ) + SELECT + *, + CASE + WHEN command like 'SELECT%' THEN 'QUERY' + WHEN command like 'WITH%' THEN 'QUERY' + WHEN command like 'INSERT%' THEN 'DML' + WHEN command like 'UPDATE%' THEN 'DML' + WHEN command like 'MERGE%' THEN 'DML' + WHEN command like 'DELETE%' THEN 'DML' + WHEN command like 'TRUNCATE%' THEN 'DML' + WHEN command like 'COPY%' THEN 'DML' + WHEN command like 'IF%' THEN 'DML' + WHEN command like 'BEGIN%' THEN 'DML' + WHEN command like 'DECLARE%' THEN 'DML' + WHEN command like 'BUILDREPLICATEDTABLECACHE%' THEN 'DML' + WHEN command like 'CREATE%' THEN 'DDL' + WHEN command like 'DROP%' THEN 'DDL' + WHEN command like 'ALTER%' THEN 'DDL' + WHEN command like 'EXEC%' THEN 'ROUTINE' + WHEN command like 'EXECUTE %' THEN 'ROUTINE' + WHEN command like 'BEGIN%TRAN%' THEN 'TRANSACTION_CONTROL' + WHEN command like 'END%TRAN%' THEN 'TRANSACTION_CONTROL' + WHEN command like 'COMMIT%' THEN 'TRANSACTION_CONTROL' + WHEN command like 'ROLLBACK%' THEN 'TRANSACTION_CONTROL' + ELSE 'OTHER' + END as command_type, + SYSDATETIME() as extract_ts + FROM query_stats_ex + ORDER BY last_execution_time + """ + + @staticmethod + def get_procedure_stats(last_execution_time: str | None): + """ + Retrieves execution statistics for stored procedures from `sys.dm_exec_procedure_stats`, + including execution counts, total CPU and elapsed time, last execution timestamp, + and maps object and database IDs to their names. Results are ordered by most recent execution. + """ + predicate = ( + f"WHERE last_execution_time > CAST('{last_execution_time}' AS DATETIME2(6))" if last_execution_time else "" + ) + return f""" + SELECT + database_id, + DB_NAME(database_id) AS db_name, + object_id, + OBJECT_NAME(object_id, database_id) AS object_name, + type, + last_execution_time, + execution_count, + total_worker_time, + total_elapsed_time, + SYSDATETIME() as extract_ts + FROM + sys.dm_exec_procedure_stats + {predicate} + ORDER BY + last_execution_time + """ + + @staticmethod + def get_sessions(last_execution_time: str | None): + """ + Retrieves active user session details from `sys.dm_exec_sessions`, including login info, + program and client names, CPU and memory usage, request timing, row counts, and database context. + Excludes system sessions and orders results by the end time of the last request. 
+ """ + predicate = ( + f"AND last_request_end_time > CAST('{last_execution_time}' AS DATETIME2(6))" if last_execution_time else "" + ) + return f""" + SELECT + session_id, + login_time, + program_name, + client_interface_name, + CONVERT(VARCHAR(64), HASHBYTES('SHA2_256', login_name), 1) as login_name, + status, + cpu_time, + memory_usage, + total_scheduled_time, + total_elapsed_time, + last_request_start_time, + last_request_end_time, + is_user_process, + row_count, + database_id, + DB_NAME(database_id) AS db_name, + SYSDATETIME() as extract_ts + FROM + sys.dm_exec_sessions + WHERE + is_user_process <> 0 {predicate} + ORDER BY + last_request_end_time + """ + + @staticmethod + def get_cpu_utilization(last_execution_time: str | None): + """ + Extracts SQL Server CPU and system utilization metrics from `sys.dm_os_ring_buffers`, + including system idle and SQL process utilization over time. + """ + predicate = f"WHERE EventTime > CAST('{last_execution_time}' AS DATETIME2(6))" if last_execution_time else "" + return f""" + WITH process_utilization_info + AS (SELECT record.value('(./Record/@id)[1]', 'int') + AS record_id, + [timestamp], + record.value('(./Record/SchedulerMonitorEvent/SystemHealth/SystemIdle)[1]', 'int') AS SystemIdle, + record.value('(./Record/SchedulerMonitorEvent/SystemHealth/ProcessUtilization)[1]', 'int') AS SQLProcessUtilization + FROM (SELECT [timestamp], + CONVERT(XML, record) AS record + FROM sys.dm_os_ring_buffers + WHERE ring_buffer_type = N'RING_BUFFER_SCHEDULER_MONITOR' + AND record LIKE '%%') X), + os_sysinfo + AS (SELECT TOP 1 ms_ticks + FROM sys.dm_os_sys_info), + cpu_utilization + AS (SELECT record_id, + Dateadd (ms, ( [timestamp] - ms_ticks ), Getdate()) AS EventTime + , + systemidle, + sqlprocessutilization + FROM process_utilization_info + CROSS JOIN os_sysinfo) + SELECT *, + Sysdatetime() AS extract_ts + FROM cpu_utilization + {predicate} + ORDER BY eventtime + """ diff --git a/src/databricks/labs/lakebridge/resources/assessments/mssql/pipeline_config.yml b/src/databricks/labs/lakebridge/resources/assessments/mssql/pipeline_config.yml new file mode 100644 index 0000000000..59f26cd879 --- /dev/null +++ b/src/databricks/labs/lakebridge/resources/assessments/mssql/pipeline_config.yml @@ -0,0 +1,19 @@ +name: mssql_assessment +version: "1.0" +extract_folder: "/tmp/data/mssql_assessment" +steps: + - name: activity_extract + type: python + extract_source: src/databricks/labs/lakebridge/resources/assessments/mssql/activity_extract.py + mode: overwrite + frequency: once + flag: active + dependencies: + - pandas + - duckdb + - databricks-sdk + - databricks-labs-blueprint[yaml]>=0.10.1 + - sqlalchemy + - azure-mgmt-sql>=0.5.0 + - azure-identity>=1.25.1 + - pyodbc From bd40c8ad1694d0044ca1ee09197482a4dfcdb771 Mon Sep 17 00:00:00 2001 From: Will Girten Date: Mon, 17 Nov 2025 11:17:13 -0500 Subject: [PATCH 02/12] Add activity extract tables. 
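
This step opens a reader against the master database of each discovered
server and persists four DMV result sets (query stats, procedure stats,
sessions, and CPU utilization) to DuckDB. The fetch-then-save sequence is
identical for every table; a helper along these lines (an illustrative
sketch only, not part of this patch) captures the repeated pattern:

    # Hypothetical helper showing the per-table extract pattern.
    # `connection` is the DatabaseManager returned by get_sqlserver_reader,
    # and save_resultset_to_db is the DuckDB writer reused from the
    # synapse helpers.
    def extract_table(connection, query: str, table_name: str,
                      db_path: str, mode: str) -> None:
        result = connection.fetch(query)  # run the DMV query
        save_resultset_to_db(result, table_name, db_path, mode=mode)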
--- .../assessments/mssql/activity_extract.py | 54 ++++++++++++++++--- .../assessments/mssql/common/connector.py | 22 ++++++++ 2 files changed, 70 insertions(+), 6 deletions(-) create mode 100644 src/databricks/labs/lakebridge/resources/assessments/mssql/common/connector.py diff --git a/src/databricks/labs/lakebridge/resources/assessments/mssql/activity_extract.py b/src/databricks/labs/lakebridge/resources/assessments/mssql/activity_extract.py index b7e0f9c4e1..ed3877c76c 100644 --- a/src/databricks/labs/lakebridge/resources/assessments/mssql/activity_extract.py +++ b/src/databricks/labs/lakebridge/resources/assessments/mssql/activity_extract.py @@ -3,7 +3,10 @@ from databricks.labs.lakebridge.connections.credential_manager import create_credential_manager from databricks.labs.lakebridge.assessments import PRODUCT_NAME +from databricks.labs.lakebridge.resources.assessments.mssql.common.connector import get_sqlserver_reader from databricks.labs.lakebridge.resources.assessments.mssql.common.functions import create_msql_sql_client +from databricks.labs.lakebridge.resources.assessments.mssql.common.queries import MSSQLQueries +from databricks.labs.lakebridge.resources.assessments.synapse.common.duckdb_helpers import save_resultset_to_db from databricks.labs.lakebridge.resources.assessments.synapse.common.functions import arguments_loader, set_logger @@ -11,25 +14,64 @@ def execute(): logger = set_logger(__file__) db_path, creds_file = arguments_loader(desc="MSSQL Server Activity Extract Script") - cred_manager = create_credential_manager(PRODUCT_NAME, creds_file) mssql_settings = cred_manager.get_credentials("mssql") + config = mssql_settings["workspace"] + auth_type = mssql_settings["jdbc"].get("auth_type", "sql_authentication") mssql_profiler_settings = mssql_settings["profiler"] - mssql_client = create_msql_sql_client(mssql_profiler_settings) try: # list all the SQL servers in the subscription sql_servers = mssql_client.servers.list() - for sql_server in sql_servers: + for idx, sql_server in enumerate(sql_servers): + + # TODO: get the last time the profiler was executed + # For now, we'll default to None, but this will eventually need + # input from a scheduler component. 
+ last_execution_time = None + mode = "overwrite" if idx == 0 else "append" # Extract activity metrics - logger.info(f"Extracting activity metrics for: {sql_server}") + server_name = sql_server.name + logger.info(f"Extracting activity metrics for: {server_name}") + fully_qualified_domain = sql_server.fully_qualified_domain_name + connection = get_sqlserver_reader( + config, db_name="master", fully_qualified_domain_name=fully_qualified_domain, auth_type=auth_type + ) + + # Query stats + table_name = "query_stats" + table_query = MSSQLQueries.get_query_stats(last_execution_time) + logger.info(f"Loading '{table_name}' for SQL server: %s", server_name) + result = connection.fetch(table_query) + save_resultset_to_db(result, table_name, db_path, mode=mode) + + # Stored procedure stats + table_name = "proc_stats" + table_query = MSSQLQueries.get_procedure_stats(last_execution_time) + logger.info(f"Loading '{table_name}' for SQL server: %s", server_name) + result = connection.fetch(table_query) + save_resultset_to_db(result, table_name, db_path, mode=mode) + + # Session info + table_name = "sessions" + table_query = MSSQLQueries.get_sessions(last_execution_time) + logger.info(f"Loading '{table_name}' for SQL server: %s", server_name) + result = connection.fetch(table_query) + save_resultset_to_db(result, table_name, db_path, mode=mode) + + # CPU Utilization + table_name = "cpu_utilization" + table_query = MSSQLQueries.get_cpu_utilization() + logger.info(f"Loading '{table_name}' for SQL server: %s", server_name) + result = connection.fetch(table_query) + save_resultset_to_db(result, table_name, db_path, mode=mode) - print(json.dumps({"status": "success", "message": " All data loaded successfully loaded successfully"})) + print(json.dumps({"status": "success", "message": "All data loaded successfully loaded successfully"})) except Exception as e: - logger.error(f"Failed to extract activity info for Azure SQL server: {str(e)}") + logger.error(f"Failed to extract activity info for SQL server: {str(e)}") print(json.dumps({"status": "error", "message": str(e)}), file=sys.stderr) sys.exit(1) diff --git a/src/databricks/labs/lakebridge/resources/assessments/mssql/common/connector.py b/src/databricks/labs/lakebridge/resources/assessments/mssql/common/connector.py new file mode 100644 index 0000000000..6f68c929bb --- /dev/null +++ b/src/databricks/labs/lakebridge/resources/assessments/mssql/common/connector.py @@ -0,0 +1,22 @@ +from databricks.labs.lakebridge.connections.database_manager import DatabaseManager + + +def get_sqlserver_reader( + input_cred: dict, + db_name: str, + *, + fully_qualified_domain_name: str, + auth_type: str = 'sql_authentication', +) -> DatabaseManager: + config = { + "driver": input_cred['driver'], + "server": fully_qualified_domain_name, + "database": db_name, + "user": input_cred['sql_user'], + "password": input_cred['sql_password'], + "port": input_cred.get('port', 1433), + "auth_type": auth_type, + } + source = "mssql" + + return DatabaseManager(source, config) From 46f5e75cada1364ec60d67e2a619b636baff62a6 Mon Sep 17 00:00:00 2001 From: Will Girten Date: Mon, 17 Nov 2025 16:57:54 -0500 Subject: [PATCH 03/12] Add info extract queries. 
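
The new queries cover instance metadata (sys.dm_os_sys_info), object
inventories (INFORMATION_SCHEMA tables/views/columns/routines, indexed
views via sys.views joined to sys.indexes), and size statistics
(sys.database_files and sys.dm_db_partition_stats). Each object-level
query interpolates the target database name into the SQL text, e.g.
(illustrative usage; the database name is a placeholder):

    from databricks.labs.lakebridge.resources.assessments.mssql.common.queries import MSSQLQueries

    # Builds the INFORMATION_SCHEMA.TABLES query for one database; the
    # name is substituted directly into the fully qualified object path.
    sql = MSSQLQueries.get_tables("AdventureWorks")
    print(sql)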
--- .../assessments/mssql/common/queries.py | 152 +++++++++++++++++- 1 file changed, 150 insertions(+), 2 deletions(-) diff --git a/src/databricks/labs/lakebridge/resources/assessments/mssql/common/queries.py b/src/databricks/labs/lakebridge/resources/assessments/mssql/common/queries.py index f9f93ed5d3..36faddc384 100644 --- a/src/databricks/labs/lakebridge/resources/assessments/mssql/common/queries.py +++ b/src/databricks/labs/lakebridge/resources/assessments/mssql/common/queries.py @@ -166,8 +166,7 @@ def get_cpu_utilization(last_execution_time: str | None): FROM sys.dm_os_sys_info), cpu_utilization AS (SELECT record_id, - Dateadd (ms, ( [timestamp] - ms_ticks ), Getdate()) AS EventTime - , + Dateadd (ms, ( [timestamp] - ms_ticks ), Getdate()) AS EventTime, systemidle, sqlprocessutilization FROM process_utilization_info @@ -178,3 +177,152 @@ def get_cpu_utilization(last_execution_time: str | None): {predicate} ORDER BY eventtime """ + + @staticmethod + def get_sys_info(): + """ + Retrieves system-level information from SQL Server using sys.dm_os_sys_info. + Returns details about memory, CPU, scheduler count, and other OS-related + metadata for the SQL Server instance, along with a timestamp indicating when + the data was extracted. + """ + return """ + SELECT *, + Sysdatetime() AS extract_ts + FROM sys.dm_os_sys_info + """ + + @staticmethod + def get_databases(): + """ + Retrieve metadata for all user databases, excluding system databases. + Returns each database's ID, name, collation, creation date, + and a timestamp indicating when the data was extracted. + """ + return """ + SELECT DB_ID(NAME) AS db_id, + NAME, + collation_name, + create_date, + SYSDATETIME() AS extract_ts + FROM sys.databases + WHERE NAME NOT IN ( 'master', 'tempdb', 'model', 'msdb' ); + """ + + @staticmethod + def get_tables(db_name: str): + """ + Retrieves metadata for all tables in the specified database by querying + INFORMATION_SCHEMA.TABLES. Returns table definitions along with a timestamp + indicating when the data was extracted. + """ + return f""" + SELECT + *, + SYSDATETIME() as extract_ts + FROM {db_name}.information_schema.tables + """ + + @staticmethod + def get_views(db_name: str): + """ + Retrieves metadata for all views in the specified database by querying + `INFORMATION_SCHEMA.VIEWS`. Returns view definitions along with a timestamp + indicating when the data was extracted. + """ + return f""" + SELECT + *, + SYSDATETIME() as extract_ts + FROM {db_name}.information_schema.views + """ + + @staticmethod + def get_columns(db_name: str): + """ + Retrieves column-level metadata for all tables and views in the specified + database by querying INFORMATION_SCHEMA.COLUMNS. Returns column attributes + along with a timestamp indicating when the data was extracted. + """ + return f""" + SELECT + *, + SYSDATETIME() as extract_ts + FROM {db_name}.information_schema.columns + """ + + @staticmethod + def get_indexed_views(db_name: str): + """ + Retrieves metadata for all indexed views in the specified database by joining + `sys.views` with `sys.indexes`. Returns view details for those with a clustered + index (index_id = 1) along with a timestamp indicating when the data was extracted. 
+ """ + return f""" + SELECT A.*, SYSDATETIME() as extract_ts + FROM {db_name}.sys.views A + JOIN {db_name}.sys.indexes B ON A.object_id = B.object_id + WHERE B.index_id = 1 + """ + + @staticmethod + def get_routines(db_name: str): + """ + Retrieves metadata for all routines (stored procedures and functions) in the + specified database by querying INFORMATION_SCHEMA.ROUTINES. Returns routine + details along with a timestamp indicating when the data was extracted. + """ + return f""" + select + *, + SYSDATETIME() as extract_ts + from {db_name}.information_schema.routines""" + + @staticmethod + def get_db_sizes(db_name: str): + """ + Retrieves metadata for all data files (type = 0) in the specified database + from sys.database_files. Returns file name, type, current size, free space, + maximum size, and a timestamp indicating when the data was extracted. + """ + return f""" + SELECT + '{db_name}' AS DbName, + name AS FileName, + type_desc, + size/128.0 AS CurrentSizeMB, + size/128.0 - CAST(FILEPROPERTY(name, 'SpaceUsed') AS int)/128.0 AS FreeSpaceInMB, + max_size as MaxSize, + SYSDATETIME() as extract_ts + FROM {db_name}.sys.database_files WHERE type=0 + """ + + @staticmethod + def get_table_sizes(db_name: str): + """ + Retrieves storage and row count statistics for all user tables in the specified + database by querying sys.dm_db_partition_stats and sys.objects. Returns table + name, total rows, reserved, used, and unused space (MB), breakdown of data vs. + index space, and a timestamp indicating when the data was extracted. + """ + return f""" + SELECT + o.name AS TableName, + SUM(ps.row_count) AS [RowCount], + SUM(ps.reserved_page_count) * 8 / 1024 AS ReservedMB, + SUM(ps.used_page_count) * 8 / 1024 AS UsedMB, + (SUM(ps.reserved_page_count) - SUM(ps.used_page_count)) * 8 / 1024 AS UnusedMB, + SUM(CASE + WHEN ps.index_id < 2 THEN ps.in_row_data_page_count + ps.lob_used_page_count + ps.row_overflow_used_page_count + ELSE 0 + END) * 8 / 1024 AS DataMB, + SUM(CASE + WHEN ps.index_id >= 2 THEN ps.in_row_data_page_count + ELSE 0 + END) * 8 / 1024 AS IndexMB, + SYSDATETIME() as extract_ts + FROM {db_name}.sys.dm_db_partition_stats AS ps + JOIN {db_name}sys.objects AS o ON ps.object_id = o.object_id + WHERE o.type = 'U' + GROUP BY schema_name(o.schema_id), o.name + """ From 866bfc7c883de67c29d19ef8cb15ca1ebeda39e9 Mon Sep 17 00:00:00 2001 From: Will Girten Date: Tue, 18 Nov 2025 08:29:38 -0500 Subject: [PATCH 04/12] Add info extraction step. 
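
The info extract script reuses the Azure SQL management client from the
activity extract to discover servers, then runs the metadata queries from
the previous commit against each one. A minimal sketch of the discovery
call (assumes DefaultAzureCredential can resolve Azure credentials from
the environment; the subscription id is a placeholder):

    from databricks.labs.lakebridge.resources.assessments.mssql.common.functions import create_msql_sql_client

    client = create_msql_sql_client({"subscription_id": "<subscription-id>"})
    # servers.list() yields one entry per SQL server in the subscription.
    for server in client.servers.list():
        print(server.name, server.fully_qualified_domain_name)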
--- .../assessments/mssql/activity_extract.py | 1 + .../assessments/mssql/info_extract.py | 116 ++++++++++++++++++ .../assessments/mssql/pipeline_config.yml | 15 +++ 3 files changed, 132 insertions(+) create mode 100644 src/databricks/labs/lakebridge/resources/assessments/mssql/info_extract.py diff --git a/src/databricks/labs/lakebridge/resources/assessments/mssql/activity_extract.py b/src/databricks/labs/lakebridge/resources/assessments/mssql/activity_extract.py index ed3877c76c..f2ff21d3b1 100644 --- a/src/databricks/labs/lakebridge/resources/assessments/mssql/activity_extract.py +++ b/src/databricks/labs/lakebridge/resources/assessments/mssql/activity_extract.py @@ -35,6 +35,7 @@ def execute(): # Extract activity metrics server_name = sql_server.name logger.info(f"Extracting activity metrics for: {server_name}") + print(f"Extracting activity metrics for: {server_name}") fully_qualified_domain = sql_server.fully_qualified_domain_name connection = get_sqlserver_reader( config, db_name="master", fully_qualified_domain_name=fully_qualified_domain, auth_type=auth_type diff --git a/src/databricks/labs/lakebridge/resources/assessments/mssql/info_extract.py b/src/databricks/labs/lakebridge/resources/assessments/mssql/info_extract.py new file mode 100644 index 0000000000..738bcf9d10 --- /dev/null +++ b/src/databricks/labs/lakebridge/resources/assessments/mssql/info_extract.py @@ -0,0 +1,116 @@ +import json +import sys + +from databricks.labs.lakebridge.connections.credential_manager import create_credential_manager +from databricks.labs.lakebridge.assessments import PRODUCT_NAME +from databricks.labs.lakebridge.resources.assessments.mssql.common.connector import get_sqlserver_reader +from databricks.labs.lakebridge.resources.assessments.mssql.common.functions import create_msql_sql_client +from databricks.labs.lakebridge.resources.assessments.mssql.common.queries import MSSQLQueries +from databricks.labs.lakebridge.resources.assessments.synapse.common.duckdb_helpers import save_resultset_to_db +from databricks.labs.lakebridge.resources.assessments.synapse.common.functions import arguments_loader, set_logger + + +def execute(): + logger = set_logger(__file__) + + db_path, creds_file = arguments_loader(desc="MSSQL Server Info Extract Script") + cred_manager = create_credential_manager(PRODUCT_NAME, creds_file) + mssql_settings = cred_manager.get_credentials("mssql") + config = mssql_settings["workspace"] + auth_type = mssql_settings["jdbc"].get("auth_type", "sql_authentication") + mssql_profiler_settings = mssql_settings["profiler"] + mssql_client = create_msql_sql_client(mssql_profiler_settings) + + try: + # list all the SQL servers in the subscription + sql_servers = mssql_client.servers.list() + for idx, sql_server in enumerate(sql_servers): + + mode = "overwrite" if idx == 0 else "append" + + # Extract info metrics + server_name = sql_server.name + logger.info(f"Extracting info metrics for: {server_name}") + print(f"Extracting info metrics for: {server_name}") + fully_qualified_domain = sql_server.fully_qualified_domain_name + connection = get_sqlserver_reader( + config, db_name="master", fully_qualified_domain_name=fully_qualified_domain, auth_type=auth_type + ) + + # System info + table_name = "sys_info" + table_query = MSSQLQueries.get_sys_info() + logger.info(f"Loading '{table_name}' for SQL server: %s", server_name) + print(f"Loading '{table_name}' for SQL server: %s", server_name) + result = connection.fetch(table_query) + save_resultset_to_db(result, table_name, db_path, mode=mode) + + 
# Databases + table_name = "databases" + table_query = MSSQLQueries.get_databases() + logger.info(f"Loading '{table_name}' for SQL server: %s", server_name) + # TODO: if list of `db_names` not provided in config + # then loop through all the databases to collect the following info + result = connection.fetch(table_query) + db_name = "main" + save_resultset_to_db(result, table_name, db_path, mode=mode) + + # Tables + table_name = "tables" + table_query = MSSQLQueries.get_tables(db_name) + logger.info(f"Loading '{table_name}' for SQL server: %s", server_name) + result = connection.fetch(table_query) + save_resultset_to_db(result, table_name, db_path, mode=mode) + + # Views + table_name = "views" + table_query = MSSQLQueries.get_views(db_name) + logger.info(f"Loading '{table_name}' for SQL server: %s", server_name) + result = connection.fetch(table_query) + save_resultset_to_db(result, table_name, db_path, mode=mode) + + # Columns + table_name = "columns" + table_query = MSSQLQueries.get_columns(db_name) + logger.info(f"Loading '{table_name}' for SQL server: %s", server_name) + result = connection.fetch(table_query) + save_resultset_to_db(result, table_name, db_path, mode=mode) + + # Indexed views + table_name = "indexed_views" + table_query = MSSQLQueries.get_indexed_views(db_name) + logger.info(f"Loading '{table_name}' for SQL server: %s", server_name) + result = connection.fetch(table_query) + save_resultset_to_db(result, table_name, db_path, mode=mode) + + # Routines + table_name = "routines" + table_query = MSSQLQueries.get_routines(db_name) + logger.info(f"Loading '{table_name}' for SQL server: %s", server_name) + result = connection.fetch(table_query) + save_resultset_to_db(result, table_name, db_path, mode=mode) + + # Database sizes + table_name = "db_sizes" + table_query = MSSQLQueries.get_db_sizes(db_name) + logger.info(f"Loading '{table_name}' for SQL server: %s", server_name) + result = connection.fetch(table_query) + save_resultset_to_db(result, table_name, db_path, mode=mode) + + # Table sizes + table_name = "table_sizes" + table_query = MSSQLQueries.get_table_sizes(db_name) + logger.info(f"Loading '{table_name}' for SQL server: %s", server_name) + result = connection.fetch(table_query) + save_resultset_to_db(result, table_name, db_path, mode=mode) + + print(json.dumps({"status": "success", "message": "All data loaded successfully loaded successfully"})) + + except Exception as e: + logger.error(f"Failed to execute info extract for SQL server: {str(e)}") + print(json.dumps({"status": "error", "message": str(e)}), file=sys.stderr) + sys.exit(1) + + +if __name__ == '__main__': + execute() diff --git a/src/databricks/labs/lakebridge/resources/assessments/mssql/pipeline_config.yml b/src/databricks/labs/lakebridge/resources/assessments/mssql/pipeline_config.yml index 59f26cd879..a7876fe8c3 100644 --- a/src/databricks/labs/lakebridge/resources/assessments/mssql/pipeline_config.yml +++ b/src/databricks/labs/lakebridge/resources/assessments/mssql/pipeline_config.yml @@ -17,3 +17,18 @@ steps: - azure-mgmt-sql>=0.5.0 - azure-identity>=1.25.1 - pyodbc + - name: info_extract + type: python + extract_source: src/databricks/labs/lakebridge/resources/assessments/mssql/info_extract.py + mode: overwrite + frequency: once + flag: active + dependencies: + - pandas + - duckdb + - databricks-sdk + - databricks-labs-blueprint[yaml]>=0.10.1 + - sqlalchemy + - azure-mgmt-sql>=0.5.0 + - azure-identity>=1.25.1 + - pyodbc From 6db273d7f1aad188f6a9f1b6060b186b79b67101 Mon Sep 17 00:00:00 2001 From: Will 
Girten Date: Wed, 19 Nov 2025 17:35:12 -0500 Subject: [PATCH 05/12] Update MSSQL configuration --- .../assessments/configure_assessment.py | 15 +++++++++++---- .../assessments/mssql/pipeline_config.yml | 8 ++++---- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/src/databricks/labs/lakebridge/assessments/configure_assessment.py b/src/databricks/labs/lakebridge/assessments/configure_assessment.py index 0da1c28efa..0f0db972dd 100644 --- a/src/databricks/labs/lakebridge/assessments/configure_assessment.py +++ b/src/databricks/labs/lakebridge/assessments/configure_assessment.py @@ -87,18 +87,25 @@ def _configure_credentials(self) -> str: secret_vault_type = str(self.prompts.choice("Enter secret vault type (local | env)", ["local", "env"])).lower() secret_vault_name = None - logger.info("Please refer to the documentation to understand the difference between local and env.") - + # Profiler settings + logger.info("Please configure profiler settings:") + mssql_profiler = { + "redact_sql_pools_sql_text": self.prompts.confirm("Redact SQL pools SQL text?"), + } credential = { "secret_vault_type": secret_vault_type, "secret_vault_name": secret_vault_name, source: { - "database": self.prompts.question("Enter the database name"), - "driver": self.prompts.question("Enter the driver details"), + "db_names": self.prompts.question("Enter the database names (comma-separated)"), + "driver": self.prompts.question( + "Enter the ODBC driver installed locally", default="ODBC Driver 18 for SQL Server" + ), "server": self.prompts.question("Enter the server or host details"), "port": int(self.prompts.question("Enter the port details", valid_number=True)), "user": self.prompts.question("Enter the user details"), "password": self.prompts.password("Enter the password details"), + "tz_info": self.prompts.question("Enter timezone (e.g. 
America/New_York)", default="UTC"), + "profiler": mssql_profiler, }, } diff --git a/src/databricks/labs/lakebridge/resources/assessments/mssql/pipeline_config.yml b/src/databricks/labs/lakebridge/resources/assessments/mssql/pipeline_config.yml index a7876fe8c3..0c15c246d7 100644 --- a/src/databricks/labs/lakebridge/resources/assessments/mssql/pipeline_config.yml +++ b/src/databricks/labs/lakebridge/resources/assessments/mssql/pipeline_config.yml @@ -14,8 +14,8 @@ steps: - databricks-sdk - databricks-labs-blueprint[yaml]>=0.10.1 - sqlalchemy - - azure-mgmt-sql>=0.5.0 - - azure-identity>=1.25.1 + - azure-mgmt-sql + - azure-identity - pyodbc - name: info_extract type: python @@ -29,6 +29,6 @@ steps: - databricks-sdk - databricks-labs-blueprint[yaml]>=0.10.1 - sqlalchemy - - azure-mgmt-sql>=0.5.0 - - azure-identity>=1.25.1 + - azure-mgmt-sql + - azure-identity - pyodbc From 60c38d8385acd62a7a0469db717ae7db8b08379e Mon Sep 17 00:00:00 2001 From: Will Girten Date: Tue, 25 Nov 2025 16:28:09 -0500 Subject: [PATCH 06/12] Update common DuckDB helpers with MSSQL schemas --- .../assessments/configure_assessment.py | 8 +- .../assessments/mssql/activity_extract.py | 81 +++++++++---------- .../assessments/mssql/common/connector.py | 4 +- .../assessments/mssql/info_extract.py | 44 +++++----- .../assessments/mssql/pipeline_config.yml | 2 + .../synapse/common/duckdb_helpers.py | 8 ++ 6 files changed, 73 insertions(+), 74 deletions(-) diff --git a/src/databricks/labs/lakebridge/assessments/configure_assessment.py b/src/databricks/labs/lakebridge/assessments/configure_assessment.py index 0f0db972dd..32661e4da3 100644 --- a/src/databricks/labs/lakebridge/assessments/configure_assessment.py +++ b/src/databricks/labs/lakebridge/assessments/configure_assessment.py @@ -96,14 +96,14 @@ def _configure_credentials(self) -> str: "secret_vault_type": secret_vault_type, "secret_vault_name": secret_vault_name, source: { - "db_names": self.prompts.question("Enter the database names (comma-separated)"), + "db_names": self.prompts.question("Enter the database names to profile (comma-separated)"), "driver": self.prompts.question( "Enter the ODBC driver installed locally", default="ODBC Driver 18 for SQL Server" ), - "server": self.prompts.question("Enter the server or host details"), + "server": self.prompts.question("Enter the fully-qualified server name"), "port": int(self.prompts.question("Enter the port details", valid_number=True)), - "user": self.prompts.question("Enter the user details"), - "password": self.prompts.password("Enter the password details"), + "user": self.prompts.question("Enter the SQL username"), + "password": self.prompts.password("Enter the SQL password"), "tz_info": self.prompts.question("Enter timezone (e.g. 
America/New_York)", default="UTC"), "profiler": mssql_profiler, }, diff --git a/src/databricks/labs/lakebridge/resources/assessments/mssql/activity_extract.py b/src/databricks/labs/lakebridge/resources/assessments/mssql/activity_extract.py index f2ff21d3b1..8bb25cf6be 100644 --- a/src/databricks/labs/lakebridge/resources/assessments/mssql/activity_extract.py +++ b/src/databricks/labs/lakebridge/resources/assessments/mssql/activity_extract.py @@ -4,7 +4,6 @@ from databricks.labs.lakebridge.connections.credential_manager import create_credential_manager from databricks.labs.lakebridge.assessments import PRODUCT_NAME from databricks.labs.lakebridge.resources.assessments.mssql.common.connector import get_sqlserver_reader -from databricks.labs.lakebridge.resources.assessments.mssql.common.functions import create_msql_sql_client from databricks.labs.lakebridge.resources.assessments.mssql.common.queries import MSSQLQueries from databricks.labs.lakebridge.resources.assessments.synapse.common.duckdb_helpers import save_resultset_to_db from databricks.labs.lakebridge.resources.assessments.synapse.common.functions import arguments_loader, set_logger @@ -16,58 +15,50 @@ def execute(): db_path, creds_file = arguments_loader(desc="MSSQL Server Activity Extract Script") cred_manager = create_credential_manager(PRODUCT_NAME, creds_file) mssql_settings = cred_manager.get_credentials("mssql") - config = mssql_settings["workspace"] auth_type = mssql_settings["jdbc"].get("auth_type", "sql_authentication") - mssql_profiler_settings = mssql_settings["profiler"] - mssql_client = create_msql_sql_client(mssql_profiler_settings) - + server_name = mssql_settings["jdbc"].get("server_name") try: - # list all the SQL servers in the subscription - sql_servers = mssql_client.servers.list() - for idx, sql_server in enumerate(sql_servers): - # TODO: get the last time the profiler was executed - # For now, we'll default to None, but this will eventually need - # input from a scheduler component. - last_execution_time = None - mode = "overwrite" if idx == 0 else "append" + # TODO: get the last time the profiler was executed + # For now, we'll default to None, but this will eventually need + # input from a scheduler component. 
+ last_execution_time = None + mode = "overwrite" - # Extract activity metrics - server_name = sql_server.name - logger.info(f"Extracting activity metrics for: {server_name}") - print(f"Extracting activity metrics for: {server_name}") - fully_qualified_domain = sql_server.fully_qualified_domain_name - connection = get_sqlserver_reader( - config, db_name="master", fully_qualified_domain_name=fully_qualified_domain, auth_type=auth_type - ) + # Extract activity metrics + logger.info(f"Extracting activity metrics for: {server_name}") + print(f"Extracting activity metrics for: {server_name}") + connection = get_sqlserver_reader( + mssql_settings, db_name="master", server_name=server_name, auth_type=auth_type + ) - # Query stats - table_name = "query_stats" - table_query = MSSQLQueries.get_query_stats(last_execution_time) - logger.info(f"Loading '{table_name}' for SQL server: %s", server_name) - result = connection.fetch(table_query) - save_resultset_to_db(result, table_name, db_path, mode=mode) + # Query stats + table_name = "query_stats" + table_query = MSSQLQueries.get_query_stats(last_execution_time) + logger.info(f"Loading '{table_name}' for SQL server: {server_name}") + result = connection.fetch(table_query) + save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode) - # Stored procedure stats - table_name = "proc_stats" - table_query = MSSQLQueries.get_procedure_stats(last_execution_time) - logger.info(f"Loading '{table_name}' for SQL server: %s", server_name) - result = connection.fetch(table_query) - save_resultset_to_db(result, table_name, db_path, mode=mode) + # Stored procedure stats + table_name = "proc_stats" + table_query = MSSQLQueries.get_procedure_stats(last_execution_time) + logger.info(f"Loading '{table_name}' for SQL server: {server_name}") + result = connection.fetch(table_query) + save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode) - # Session info - table_name = "sessions" - table_query = MSSQLQueries.get_sessions(last_execution_time) - logger.info(f"Loading '{table_name}' for SQL server: %s", server_name) - result = connection.fetch(table_query) - save_resultset_to_db(result, table_name, db_path, mode=mode) + # Session info + table_name = "sessions" + table_query = MSSQLQueries.get_sessions(last_execution_time) + logger.info(f"Loading '{table_name}' for SQL server: {server_name}") + result = connection.fetch(table_query) + save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode) - # CPU Utilization - table_name = "cpu_utilization" - table_query = MSSQLQueries.get_cpu_utilization() - logger.info(f"Loading '{table_name}' for SQL server: %s", server_name) - result = connection.fetch(table_query) - save_resultset_to_db(result, table_name, db_path, mode=mode) + # CPU Utilization + table_name = "cpu_utilization" + table_query = MSSQLQueries.get_cpu_utilization(last_execution_time) + logger.info(f"Loading '{table_name}' for SQL server: {server_name}") + result = connection.fetch(table_query) + save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode) print(json.dumps({"status": "success", "message": "All data loaded successfully loaded successfully"})) diff --git a/src/databricks/labs/lakebridge/resources/assessments/mssql/common/connector.py b/src/databricks/labs/lakebridge/resources/assessments/mssql/common/connector.py index 6f68c929bb..6b9f515b01 100644 --- a/src/databricks/labs/lakebridge/resources/assessments/mssql/common/connector.py +++ 
b/src/databricks/labs/lakebridge/resources/assessments/mssql/common/connector.py @@ -5,12 +5,12 @@ def get_sqlserver_reader( input_cred: dict, db_name: str, *, - fully_qualified_domain_name: str, + server_name: str, auth_type: str = 'sql_authentication', ) -> DatabaseManager: config = { "driver": input_cred['driver'], - "server": fully_qualified_domain_name, + "server": server_name, "database": db_name, "user": input_cred['sql_user'], "password": input_cred['sql_password'], diff --git a/src/databricks/labs/lakebridge/resources/assessments/mssql/info_extract.py b/src/databricks/labs/lakebridge/resources/assessments/mssql/info_extract.py index 738bcf9d10..99f3dbd68b 100644 --- a/src/databricks/labs/lakebridge/resources/assessments/mssql/info_extract.py +++ b/src/databricks/labs/lakebridge/resources/assessments/mssql/info_extract.py @@ -16,8 +16,8 @@ def execute(): db_path, creds_file = arguments_loader(desc="MSSQL Server Info Extract Script") cred_manager = create_credential_manager(PRODUCT_NAME, creds_file) mssql_settings = cred_manager.get_credentials("mssql") - config = mssql_settings["workspace"] auth_type = mssql_settings["jdbc"].get("auth_type", "sql_authentication") + server_name = mssql_settings["jdbc"].get("server_name") mssql_profiler_settings = mssql_settings["profiler"] mssql_client = create_msql_sql_client(mssql_profiler_settings) @@ -29,80 +29,78 @@ def execute(): mode = "overwrite" if idx == 0 else "append" # Extract info metrics - server_name = sql_server.name logger.info(f"Extracting info metrics for: {server_name}") print(f"Extracting info metrics for: {server_name}") - fully_qualified_domain = sql_server.fully_qualified_domain_name connection = get_sqlserver_reader( - config, db_name="master", fully_qualified_domain_name=fully_qualified_domain, auth_type=auth_type + mssql_settings, db_name="master", server_name=server_name, auth_type=auth_type ) # System info table_name = "sys_info" table_query = MSSQLQueries.get_sys_info() - logger.info(f"Loading '{table_name}' for SQL server: %s", server_name) - print(f"Loading '{table_name}' for SQL server: %s", server_name) + logger.info(f"Loading '{table_name}' for SQL server: {server_name}") + print(f"Loading '{table_name}' for SQL server: {server_name}") result = connection.fetch(table_query) - save_resultset_to_db(result, table_name, db_path, mode=mode) + save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode) # Databases table_name = "databases" table_query = MSSQLQueries.get_databases() - logger.info(f"Loading '{table_name}' for SQL server: %s", server_name) + logger.info(f"Loading '{table_name}' for SQL server: {server_name}") # TODO: if list of `db_names` not provided in config # then loop through all the databases to collect the following info result = connection.fetch(table_query) db_name = "main" - save_resultset_to_db(result, table_name, db_path, mode=mode) + save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode) # Tables table_name = "tables" table_query = MSSQLQueries.get_tables(db_name) - logger.info(f"Loading '{table_name}' for SQL server: %s", server_name) + logger.info(f"Loading '{table_name}' for SQL server: {server_name}") result = connection.fetch(table_query) - save_resultset_to_db(result, table_name, db_path, mode=mode) + save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode) # Views table_name = "views" table_query = MSSQLQueries.get_views(db_name) - logger.info(f"Loading '{table_name}' for SQL server: %s", server_name) + logger.info(f"Loading '{table_name}' for 
SQL server: {server_name}") result = connection.fetch(table_query) - save_resultset_to_db(result, table_name, db_path, mode=mode) + save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode) # Columns table_name = "columns" table_query = MSSQLQueries.get_columns(db_name) - logger.info(f"Loading '{table_name}' for SQL server: %s", server_name) + logger.info(f"Loading '{table_name}' for SQL server: {server_name}") result = connection.fetch(table_query) - save_resultset_to_db(result, table_name, db_path, mode=mode) + save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode) # Indexed views table_name = "indexed_views" table_query = MSSQLQueries.get_indexed_views(db_name) - logger.info(f"Loading '{table_name}' for SQL server: %s", server_name) + logger.info(f"Loading '{table_name}' for SQL server: {server_name}") result = connection.fetch(table_query) - save_resultset_to_db(result, table_name, db_path, mode=mode) + save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode) # Routines table_name = "routines" table_query = MSSQLQueries.get_routines(db_name) - logger.info(f"Loading '{table_name}' for SQL server: %s", server_name) + logger.info(f"Loading '{table_name}' for SQL server: {server_name}") result = connection.fetch(table_query) - save_resultset_to_db(result, table_name, db_path, mode=mode) + save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode) # Database sizes table_name = "db_sizes" table_query = MSSQLQueries.get_db_sizes(db_name) - logger.info(f"Loading '{table_name}' for SQL server: %s", server_name) + logger.info(f"Loading '{table_name}' for SQL server: {server_name}") result = connection.fetch(table_query) - save_resultset_to_db(result, table_name, db_path, mode=mode) + save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode) # Table sizes table_name = "table_sizes" table_query = MSSQLQueries.get_table_sizes(db_name) - logger.info(f"Loading '{table_name}' for SQL server: %s", server_name) + logger.info(f"Loading '{table_name}' for SQL server: {server_name}") result = connection.fetch(table_query) - save_resultset_to_db(result, table_name, db_path, mode=mode) + save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode) print(json.dumps({"status": "success", "message": "All data loaded successfully loaded successfully"})) diff --git a/src/databricks/labs/lakebridge/resources/assessments/mssql/pipeline_config.yml b/src/databricks/labs/lakebridge/resources/assessments/mssql/pipeline_config.yml index 0c15c246d7..8c89895657 100644 --- a/src/databricks/labs/lakebridge/resources/assessments/mssql/pipeline_config.yml +++ b/src/databricks/labs/lakebridge/resources/assessments/mssql/pipeline_config.yml @@ -16,6 +16,7 @@ steps: - sqlalchemy - azure-mgmt-sql - azure-identity + - azure-monitor-query==1.3.0b1 - pyodbc - name: info_extract type: python @@ -31,4 +32,5 @@ steps: - sqlalchemy - azure-mgmt-sql - azure-identity + - azure-monitor-query==1.3.0b1 - pyodbc diff --git a/src/databricks/labs/lakebridge/resources/assessments/synapse/common/duckdb_helpers.py b/src/databricks/labs/lakebridge/resources/assessments/synapse/common/duckdb_helpers.py index 52448c2b11..6f81f71052 100644 --- a/src/databricks/labs/lakebridge/resources/assessments/synapse/common/duckdb_helpers.py +++ b/src/databricks/labs/lakebridge/resources/assessments/synapse/common/duckdb_helpers.py @@ -66,6 +66,14 @@ def save_resultset_to_db( "serverless_requests_history": "STATUS STRING, TRANSACTION_ID BIGINT, DISTRIBUTED_STATEMENT_ID STRING, 
QUERY_HASH STRING, LOGIN_NAME STRING, START_TIME STRING, ERROR_CODE INTEGER, REJECTED_ROWS_PATH STRING, END_TIME STRING, COMMAND STRING, QUERY_TEXT STRING, TOTAL_ELAPSED_TIME_MS BIGINT, DATA_PROCESSED_MB BIGINT, ERROR STRING", # Data processed info "serverless_data_processed": "DATA_PROCESSED_MB BIGINT, TYPE STRING, POOL_NAME STRING, EXTRACT_TS STRING", + # MSSQL activity extract + "mssql_query_stats": "DBID VARCHAR, CREATION_TIME TIMESTAMP, LAST_EXECUTION_TIME TIMESTAMP, EXECUTION_COUNT BIGINT, TOTAL_WORKER_TIME BIGINT, TOTAL_ELAPSED_TIME BIGINT, TOTAL_ROWS BIGINT, COMMAND VARCHAR, COMMAND_TYPE VARCHAR, EXTRACT_TS TIMESTAMP", + "mssql_proc_stats": "DATABASE_ID BIGINT, DB_NAME VARCHAR, OBJECT_ID BIGINT, OBJECT_NAME VARCHAR, TYPE VARCHAR, LAST_EXECUTION_TIME TIMESTAMP, EXECUTION_COUNT BIGINT, TOTAL_WORKER_TIME BIGINT, TOTAL_ELAPSED_TIME BIGINT, EXTRACT_TS TIMESTAMP", + "mssql_sessions": "SESSION_ID BIGINT, LOGIN_TIME TIMESTAMP, PROGRAM_NAME VARCHAR, CLIENT_INTERFACE_NAME VARCHAR, LOGIN_NAME VARCHAR, STATUS VARCHAR, CPU_TIME BIGINT, MEMORY_USAGE BIGINT, TOTAL_SCHEDULED_TIME BIGINT, TOTAL_ELAPSED_TIME BIGINT, LAST_REQUEST_START_TIME TIMESTAMP, LAST_REQUEST_END_TIME TIMESTAMP, IS_USER_PROCESS BOOLEAN, ROW_COUNT BIGINT, DATABASE_ID BIGINT, DB_NAME VARCHAR, EXTRACT_TS TIMESTAMP", + "mssql_cpu_utilization": "RECORD_ID BIGINT, EVENTTIME TIMESTAMP, SYSTEMIDLE BIGINT, SQLPROCESSUTILIZATION BIGINT, EXTRACT_TS TIMESTAMP", + # MSSQL info extract + "mssql_sys_info": "CPU_TICKS BIGINT, MS_TICKS BIGINT, CPU_COUNT BIGINT, HYPERTHREAD_RATIO BIGINT, PHYSICAL_MEMORY_KB BIGINT, VIRTUAL_MEMORY_KB BIGINT, COMMITTED_KB BIGINT, COMMITTED_TARGET_KB BIGINT, VISIBLE_TARGET_KB BIGINT, STACK_SIZE_IN_BYTES BIGINT, OS_QUANTUM BIGINT, OS_ERROR_MODE BIGINT, OS_PRIORITY_CLASS BIGINT, MAX_WORKERS_COUNT BIGINT, SCHEDULER_COUNT BIGINT, SCHEDULER_TOTAL_COUNT BIGINT, DEADLOCK_MONITOR_SERIAL_NUMBER BIGINT, SQLSERVER_START_TIME_MS_TICKS BIGINT, SQLSERVER_START_TIME TIMESTAMP, AFFINITY_TYPE BIGINT, AFFINITY_TYPE_DESC VARCHAR, PROCESS_KERNEL_TIME_MS BIGINT, PROCESS_USER_TIME_MS BIGINT, TIME_SOURCE BIGINT, TIME_SOURCE_DESC VARCHAR, VIRTUAL_MACHINE_TYPE BIGINT, VIRTUAL_MACHINE_TYPE_DESC VARCHAR, SOFTNUMA_CONFIGURATION BIGINT, SOFTNUMA_CONFIGURATION_DESC VARCHAR, PROCESS_PHYSICAL_AFFINITY VARCHAR, SQL_MEMORY_MODEL BIGINT, SQL_MEMORY_MODEL_DESC VARCHAR, SOCKET_COUNT BIGINT, CORES_PER_SOCKET BIGINT, NUMA_NODE_COUNT BIGINT, CONTAINER_TYPE BIGINT, CONTAINER_TYPE_DESC VARCHAR, EXTRACT_TS TIMESTAMP", + "mssql_databases": "DB_ID VARCHAR, NAME VARCHAR, COLLATION_NAME VARCHAR, CREATE_DATE VARCHAR, EXTRACT_TS VARCHAR", } try: columns = list(result.columns) From adbaf7add39e20e2ba35454ac85c4697a5280da3 Mon Sep 17 00:00:00 2001 From: Will Girten Date: Tue, 25 Nov 2025 16:39:00 -0500 Subject: [PATCH 07/12] Add missing pipeline dependency --- .../lakebridge/resources/assessments/mssql/pipeline_config.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/databricks/labs/lakebridge/resources/assessments/mssql/pipeline_config.yml b/src/databricks/labs/lakebridge/resources/assessments/mssql/pipeline_config.yml index 8c89895657..4d69689729 100644 --- a/src/databricks/labs/lakebridge/resources/assessments/mssql/pipeline_config.yml +++ b/src/databricks/labs/lakebridge/resources/assessments/mssql/pipeline_config.yml @@ -14,6 +14,7 @@ steps: - databricks-sdk - databricks-labs-blueprint[yaml]>=0.10.1 - sqlalchemy + - azure-synapse-artifacts - azure-mgmt-sql - azure-identity - azure-monitor-query==1.3.0b1 @@ -30,6 +31,7 @@ steps: - databricks-sdk - 
databricks-labs-blueprint[yaml]>=0.10.1 - sqlalchemy + - azure-synapse-artifacts - azure-mgmt-sql - azure-identity - azure-monitor-query==1.3.0b1 From 6a1c150ea71b424fd671073d54fa2410d54e288b Mon Sep 17 00:00:00 2001 From: Will Girten Date: Wed, 26 Nov 2025 13:50:49 -0500 Subject: [PATCH 08/12] Update MSSQL prompt. --- .../lakebridge/resources/assessments/mssql/activity_extract.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/labs/lakebridge/resources/assessments/mssql/activity_extract.py b/src/databricks/labs/lakebridge/resources/assessments/mssql/activity_extract.py index 8bb25cf6be..fdc5f0d05e 100644 --- a/src/databricks/labs/lakebridge/resources/assessments/mssql/activity_extract.py +++ b/src/databricks/labs/lakebridge/resources/assessments/mssql/activity_extract.py @@ -16,7 +16,7 @@ def execute(): cred_manager = create_credential_manager(PRODUCT_NAME, creds_file) mssql_settings = cred_manager.get_credentials("mssql") auth_type = mssql_settings["jdbc"].get("auth_type", "sql_authentication") - server_name = mssql_settings["jdbc"].get("server_name") + server_name = mssql_settings["jdbc"].get("server_name", "") try: # TODO: get the last time the profiler was executed From 777e9d1caf4f6f5c45f8cca626347f64bd5b7cd8 Mon Sep 17 00:00:00 2001 From: Will Girten Date: Wed, 26 Nov 2025 14:05:39 -0500 Subject: [PATCH 09/12] Fix arg parsing. --- .../assessments/configure_assessment.py | 32 +++++++++++++------ .../assessments/mssql/activity_extract.py | 2 +- .../assessments/mssql/info_extract.py | 2 +- 3 files changed, 24 insertions(+), 12 deletions(-) diff --git a/src/databricks/labs/lakebridge/assessments/configure_assessment.py b/src/databricks/labs/lakebridge/assessments/configure_assessment.py index 32661e4da3..042c9f24b0 100644 --- a/src/databricks/labs/lakebridge/assessments/configure_assessment.py +++ b/src/databricks/labs/lakebridge/assessments/configure_assessment.py @@ -87,24 +87,36 @@ def _configure_credentials(self) -> str: secret_vault_type = str(self.prompts.choice("Enter secret vault type (local | env)", ["local", "env"])).lower() secret_vault_name = None + # JDBC Settings + logger.info("Please select JDBC authentication type:") + auth_type = self.prompts.choice( + "Select authentication type", ["sql_authentication", "ad_passwd_authentication", "spn_authentication"] + ) + mssql_jdbc = { + "auth_type": auth_type, + "fetch_size": self.prompts.question("Enter fetch size", default="1000"), + "login_timeout": self.prompts.question("Enter login timeout (seconds)", default="30"), + "server": self.prompts.question("Enter the fully-qualified server name"), + "port": int(self.prompts.question("Enter the port details", valid_number=True)), + "sql_user": self.prompts.question("Enter the SQL username"), + "sql_password": self.prompts.password("Enter the SQL password"), + "tz_info": self.prompts.question("Enter timezone (e.g. 
America/New_York)", default="UTC"), + "db_names": self.prompts.question("Enter the database names to profile (comma-separated)"), + "driver": self.prompts.question( + "Enter the ODBC driver installed locally", default="ODBC Driver 18 for SQL Server" + ), + } + # Profiler settings logger.info("Please configure profiler settings:") mssql_profiler = { - "redact_sql_pools_sql_text": self.prompts.confirm("Redact SQL pools SQL text?"), + "redact_sql_pools_sql_text": self.prompts.confirm("Redact SQL text?"), } credential = { "secret_vault_type": secret_vault_type, "secret_vault_name": secret_vault_name, source: { - "db_names": self.prompts.question("Enter the database names to profile (comma-separated)"), - "driver": self.prompts.question( - "Enter the ODBC driver installed locally", default="ODBC Driver 18 for SQL Server" - ), - "server": self.prompts.question("Enter the fully-qualified server name"), - "port": int(self.prompts.question("Enter the port details", valid_number=True)), - "user": self.prompts.question("Enter the SQL username"), - "password": self.prompts.password("Enter the SQL password"), - "tz_info": self.prompts.question("Enter timezone (e.g. America/New_York)", default="UTC"), + "jdbc": mssql_jdbc, "profiler": mssql_profiler, }, } diff --git a/src/databricks/labs/lakebridge/resources/assessments/mssql/activity_extract.py b/src/databricks/labs/lakebridge/resources/assessments/mssql/activity_extract.py index fdc5f0d05e..d29b4e57f7 100644 --- a/src/databricks/labs/lakebridge/resources/assessments/mssql/activity_extract.py +++ b/src/databricks/labs/lakebridge/resources/assessments/mssql/activity_extract.py @@ -16,7 +16,7 @@ def execute(): cred_manager = create_credential_manager(PRODUCT_NAME, creds_file) mssql_settings = cred_manager.get_credentials("mssql") auth_type = mssql_settings["jdbc"].get("auth_type", "sql_authentication") - server_name = mssql_settings["jdbc"].get("server_name", "") + server_name = mssql_settings["jdbc"].get("server", "") try: # TODO: get the last time the profiler was executed diff --git a/src/databricks/labs/lakebridge/resources/assessments/mssql/info_extract.py b/src/databricks/labs/lakebridge/resources/assessments/mssql/info_extract.py index 99f3dbd68b..8efba2c5bb 100644 --- a/src/databricks/labs/lakebridge/resources/assessments/mssql/info_extract.py +++ b/src/databricks/labs/lakebridge/resources/assessments/mssql/info_extract.py @@ -17,7 +17,7 @@ def execute(): cred_manager = create_credential_manager(PRODUCT_NAME, creds_file) mssql_settings = cred_manager.get_credentials("mssql") auth_type = mssql_settings["jdbc"].get("auth_type", "sql_authentication") - server_name = mssql_settings["jdbc"].get("server_name") + server_name = mssql_settings["jdbc"].get("server", "") mssql_profiler_settings = mssql_settings["profiler"] mssql_client = create_msql_sql_client(mssql_profiler_settings) From 1d3923172884519f493b70ad25d5dab2effb6471 Mon Sep 17 00:00:00 2001 From: Will Girten Date: Wed, 26 Nov 2025 14:19:27 -0500 Subject: [PATCH 10/12] Remove loop through all SQL servers in subscription. 
---
 .../assessments/mssql/common/connector.py |   4 +-
 .../assessments/mssql/info_extract.py     | 164 +++++++++----------
 2 files changed, 83 insertions(+), 85 deletions(-)

diff --git a/src/databricks/labs/lakebridge/resources/assessments/mssql/common/connector.py b/src/databricks/labs/lakebridge/resources/assessments/mssql/common/connector.py
index 6b9f515b01..4b5f2eb496 100644
--- a/src/databricks/labs/lakebridge/resources/assessments/mssql/common/connector.py
+++ b/src/databricks/labs/lakebridge/resources/assessments/mssql/common/connector.py
@@ -12,8 +12,8 @@ def get_sqlserver_reader(
         "driver": input_cred['driver'],
         "server": server_name,
         "database": db_name,
-        "user": input_cred['sql_user'],
-        "password": input_cred['sql_password'],
+        "user": input_cred['user'],
+        "password": input_cred['password'],
         "port": input_cred.get('port', 1433),
         "auth_type": auth_type,
     }

diff --git a/src/databricks/labs/lakebridge/resources/assessments/mssql/info_extract.py b/src/databricks/labs/lakebridge/resources/assessments/mssql/info_extract.py
index 8efba2c5bb..5f2189caa7 100644
--- a/src/databricks/labs/lakebridge/resources/assessments/mssql/info_extract.py
+++ b/src/databricks/labs/lakebridge/resources/assessments/mssql/info_extract.py
@@ -18,91 +18,89 @@ def execute():
     mssql_settings = cred_manager.get_credentials("mssql")
     auth_type = mssql_settings["jdbc"].get("auth_type", "sql_authentication")
     server_name = mssql_settings["jdbc"].get("server", "")
-    mssql_profiler_settings = mssql_settings["profiler"]
-
-    mssql_client = create_msql_sql_client(mssql_profiler_settings)
 
     try:
-        # list all the SQL servers in the subscription
-        sql_servers = mssql_client.servers.list()
-        for idx, sql_server in enumerate(sql_servers):
-
-            mode = "overwrite" if idx == 0 else "append"
-
-            # Extract info metrics
-            logger.info(f"Extracting info metrics for: {server_name}")
-            print(f"Extracting info metrics for: {server_name}")
-            connection = get_sqlserver_reader(
-                mssql_settings, db_name="master", server_name=server_name, auth_type=auth_type
-            )
-
-            # System info
-            table_name = "sys_info"
-            table_query = MSSQLQueries.get_sys_info()
-            logger.info(f"Loading '{table_name}' for SQL server: {server_name}")
-            print(f"Loading '{table_name}' for SQL server: {server_name}")
-            result = connection.fetch(table_query)
-            save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode)
-
-            # Databases
-            table_name = "databases"
-            table_query = MSSQLQueries.get_databases()
-            logger.info(f"Loading '{table_name}' for SQL server: {server_name}")
-            # TODO: if list of `db_names` not provided in config
-            # then loop through all the databases to collect the following info
-            result = connection.fetch(table_query)
-            db_name = "main"
-            save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode)
-
-            # Tables
-            table_name = "tables"
-            table_query = MSSQLQueries.get_tables(db_name)
-            logger.info(f"Loading '{table_name}' for SQL server: {server_name}")
-            result = connection.fetch(table_query)
-            save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode)
-
-            # Views
-            table_name = "views"
-            table_query = MSSQLQueries.get_views(db_name)
-            logger.info(f"Loading '{table_name}' for SQL server: {server_name}")
-            result = connection.fetch(table_query)
-            save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode)
-
-            # Columns
-            table_name = "columns"
-            table_query = MSSQLQueries.get_columns(db_name)
-            logger.info(f"Loading '{table_name}' for SQL server: {server_name}")
-            result = connection.fetch(table_query)
-            save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode)
-
-            # Indexed views
-            table_name = "indexed_views"
-            table_query = MSSQLQueries.get_indexed_views(db_name)
-            logger.info(f"Loading '{table_name}' for SQL server: {server_name}")
-            result = connection.fetch(table_query)
-            save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode)
-
-            # Routines
-            table_name = "routines"
-            table_query = MSSQLQueries.get_routines(db_name)
-            logger.info(f"Loading '{table_name}' for SQL server: {server_name}")
-            result = connection.fetch(table_query)
-            save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode)
-
-            # Database sizes
-            table_name = "db_sizes"
-            table_query = MSSQLQueries.get_db_sizes(db_name)
-            logger.info(f"Loading '{table_name}' for SQL server: {server_name}")
-            result = connection.fetch(table_query)
-            save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode)
-
-            # Table sizes
-            table_name = "table_sizes"
-            table_query = MSSQLQueries.get_table_sizes(db_name)
-            logger.info(f"Loading '{table_name}' for SQL server: {server_name}")
-            result = connection.fetch(table_query)
-            save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode)
-
-        print(json.dumps({"status": "success", "message": "All data loaded successfully loaded successfully"}))
+        # TODO: get the last time the profiler was executed
+        # For now, we'll default to None, but this will eventually need
+        # input from a scheduler component.
+        last_execution_time = None
+        mode = "overwrite"
+
+        # Extract info metrics
+        logger.info(f"Extracting info metrics for: {server_name}")
+        print(f"Extracting info metrics for: {server_name}")
+        connection = get_sqlserver_reader(
+            mssql_settings, db_name="master", server_name=server_name, auth_type=auth_type
+        )
+
+        # System info
+        table_name = "sys_info"
+        table_query = MSSQLQueries.get_sys_info()
+        logger.info(f"Loading '{table_name}' for SQL server: {server_name}")
+        print(f"Loading '{table_name}' for SQL server: {server_name}")
+        result = connection.fetch(table_query)
+        save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode)
+
+        # Databases
+        table_name = "databases"
+        table_query = MSSQLQueries.get_databases()
+        logger.info(f"Loading '{table_name}' for SQL server: {server_name}")
+        # TODO: if list of `db_names` not provided in config
+        # then loop through all the databases to collect the following info
+        result = connection.fetch(table_query)
+        db_name = "main"
+        save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode)
+
+        # Tables
+        table_name = "tables"
+        table_query = MSSQLQueries.get_tables(db_name)
+        logger.info(f"Loading '{table_name}' for SQL server: {server_name}")
+        result = connection.fetch(table_query)
+        save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode)
+
+        # Views
+        table_name = "views"
+        table_query = MSSQLQueries.get_views(db_name)
+        logger.info(f"Loading '{table_name}' for SQL server: {server_name}")
+        result = connection.fetch(table_query)
+        save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode)
+
+        # Columns
+        table_name = "columns"
+        table_query = MSSQLQueries.get_columns(db_name)
+        logger.info(f"Loading '{table_name}' for SQL server: {server_name}")
+        result = connection.fetch(table_query)
+        save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode)
+
+        # Indexed views
+        table_name = "indexed_views"
+        table_query = MSSQLQueries.get_indexed_views(db_name)
+        logger.info(f"Loading '{table_name}' for SQL server: {server_name}")
+        result = connection.fetch(table_query)
+        save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode)
+
+        # Routines
+        table_name = "routines"
+        table_query = MSSQLQueries.get_routines(db_name)
+        logger.info(f"Loading '{table_name}' for SQL server: {server_name}")
+        result = connection.fetch(table_query)
+        save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode)
+
+        # Database sizes
+        table_name = "db_sizes"
+        table_query = MSSQLQueries.get_db_sizes(db_name)
+        logger.info(f"Loading '{table_name}' for SQL server: {server_name}")
+        result = connection.fetch(table_query)
+        save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode)
+
+        # Table sizes
+        table_name = "table_sizes"
+        table_query = MSSQLQueries.get_table_sizes(db_name)
+        logger.info(f"Loading '{table_name}' for SQL server: {server_name}")
+        result = connection.fetch(table_query)
+        save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode)
+
+        print(json.dumps({"status": "success", "message": "All data loaded successfully"}))
 
     except Exception as e:
         logger.error(f"Failed to execute info extract for SQL server: {str(e)}")

From 60a1f0548f427167c421283162df04f79f666182 Mon Sep 17 00:00:00 2001
From: Will Girten
Date: Mon, 1 Dec 2025 12:08:50 -0500
Subject: [PATCH 11/12] Refactor info extract script.

---
 .../assessments/configure_assessment.py |   1 -
 .../assessments/mssql/common/queries.py  | 139 ++++++++++++------
 .../assessments/mssql/info_extract.py    |  20 +--
 .../synapse/common/duckdb_helpers.py     |   9 +-
 4 files changed, 114 insertions(+), 55 deletions(-)

diff --git a/src/databricks/labs/lakebridge/assessments/configure_assessment.py b/src/databricks/labs/lakebridge/assessments/configure_assessment.py
index 042c9f24b0..cc88eebdbd 100644
--- a/src/databricks/labs/lakebridge/assessments/configure_assessment.py
+++ b/src/databricks/labs/lakebridge/assessments/configure_assessment.py
@@ -101,7 +101,6 @@ def _configure_credentials(self) -> str:
             "sql_user": self.prompts.question("Enter the SQL username"),
             "sql_password": self.prompts.password("Enter the SQL password"),
             "tz_info": self.prompts.question("Enter timezone (e.g. America/New_York)", default="UTC"),
-            "db_names": self.prompts.question("Enter the database names to profile (comma-separated)"),
             "driver": self.prompts.question(
                 "Enter the ODBC driver installed locally", default="ODBC Driver 18 for SQL Server"
             ),

diff --git a/src/databricks/labs/lakebridge/resources/assessments/mssql/common/queries.py b/src/databricks/labs/lakebridge/resources/assessments/mssql/common/queries.py
index 36faddc384..7ba18dc7fe 100644
--- a/src/databricks/labs/lakebridge/resources/assessments/mssql/common/queries.py
+++ b/src/databricks/labs/lakebridge/resources/assessments/mssql/common/queries.py
@@ -210,102 +210,159 @@ def get_databases():
         """
 
     @staticmethod
-    def get_tables(db_name: str):
+    def get_tables():
        """
        Retrieves metadata for all tables in the specified database by querying INFORMATION_SCHEMA.TABLES.
        Returns table definitions along with a timestamp indicating when the data was extracted.
        """
-        return f"""
-            SELECT
-                *,
-                SYSDATETIME() as extract_ts
-            FROM {db_name}.information_schema.tables
-        """
+        return """
+            SELECT
+                TABLE_CATALOG,
+                TABLE_SCHEMA,
+                TABLE_NAME,
+                TABLE_TYPE
+            FROM INFORMATION_SCHEMA.TABLES ;
+        """
 
     @staticmethod
-    def get_views(db_name: str):
+    def get_views():
        """
        Retrieves metadata for all views in the specified database by querying `INFORMATION_SCHEMA.VIEWS`.
        Returns view definitions along with a timestamp indicating when the data was extracted.
        """
-        return f"""
-            SELECT
-                *,
-                SYSDATETIME() as extract_ts
-            FROM {db_name}.information_schema.views
-        """
+        return """
+            SELECT
+                TABLE_CATALOG,
+                TABLE_SCHEMA,
+                TABLE_NAME,
+                CHECK_OPTION,
+                IS_UPDATABLE,
+                '[REDACTED]' as VIEW_DEFINITION
+            FROM INFORMATION_SCHEMA.VIEWS
+        """
 
     @staticmethod
-    def get_columns(db_name: str):
+    def get_columns():
        """
        Retrieves column-level metadata for all tables and views in the specified database by querying
        INFORMATION_SCHEMA.COLUMNS. Returns column attributes along with a timestamp indicating when the data
        was extracted.
        """
-        return f"""
-            SELECT
-                *,
-                SYSDATETIME() as extract_ts
-            FROM {db_name}.information_schema.columns
-        """
+        return """
+            SELECT
+                TABLE_CATALOG,
+                TABLE_SCHEMA,
+                TABLE_NAME,
+                COLUMN_NAME,
+                ORDINAL_POSITION,
+                COLUMN_DEFAULT,
+                IS_NULLABLE,
+                DATA_TYPE,
+                CHARACTER_MAXIMUM_LENGTH,
+                CHARACTER_OCTET_LENGTH,
+                NUMERIC_PRECISION,
+                NUMERIC_PRECISION_RADIX,
+                NUMERIC_SCALE,
+                DATETIME_PRECISION,
+                CHARACTER_SET_CATALOG,
+                CHARACTER_SET_SCHEMA,
+                CHARACTER_SET_NAME,
+                COLLATION_CATALOG,
+                COLLATION_SCHEMA,
+                COLLATION_NAME,
+                DOMAIN_CATALOG,
+                DOMAIN_SCHEMA,
+                DOMAIN_NAME
+            FROM INFORMATION_SCHEMA.COLUMNS ;
+        """
 
     @staticmethod
-    def get_indexed_views(db_name: str):
+    def get_indexed_views():
        """
        Retrieves metadata for all indexed views in the specified database by joining `sys.views` with `sys.indexes`.
        Returns view details for those with a clustered index (index_id = 1) along with a timestamp indicating when
        the data was extracted.
        """
-        return f"""
-            SELECT A.*, SYSDATETIME() as extract_ts
-            FROM {db_name}.sys.views A
-            JOIN {db_name}.sys.indexes B ON A.object_id = B.object_id
-            WHERE B.index_id = 1
+        return """
+            SELECT
+                v.[name] AS indexed_view_name,
+                s.[name] AS schema_name,
+                i.[name] AS index_name,
+                i.[type_desc] AS index_type,
+                i.[index_id],
+                SYSDATETIME() as extract_ts
+            FROM sys.views AS v
+            JOIN sys.schemas AS s
+                ON v.[schema_id] = s.[schema_id]
+            JOIN sys.indexes AS i
+                ON v.[object_id] = i.[object_id]
+            WHERE i.[index_id] = 1;
        """
 
     @staticmethod
-    def get_routines(db_name: str):
+    def get_routines():
        """
        Retrieves metadata for all routines (stored procedures and functions) in the specified database by querying
        INFORMATION_SCHEMA.ROUTINES. Returns routine details along with a timestamp indicating when the data was
        extracted.
        """
-        return f"""
-            select
-                *,
-                SYSDATETIME() as extract_ts
-            from {db_name}.information_schema.routines"""
+        return """
+            SELECT
+                CREATED,
+                DATA_TYPE,
+                IS_DETERMINISTIC,
+                IS_IMPLICITLY_INVOCABLE,
+                IS_NULL_CALL,
+                IS_USER_DEFINED_CAST,
+                LAST_ALTERED,
+                MAX_DYNAMIC_RESULT_SETS,
+                NUMERIC_PRECISION,
+                NUMERIC_PRECISION_RADIX,
+                NUMERIC_SCALE,
+                ROUTINE_BODY,
+                ROUTINE_CATALOG,
+                '[REDACTED]' as ROUTINE_DEFINITION,
+                ROUTINE_NAME,
+                ROUTINE_SCHEMA,
+                ROUTINE_TYPE,
+                SCHEMA_LEVEL_ROUTINE,
+                SPECIFIC_CATALOG,
+                SPECIFIC_NAME,
+                SPECIFIC_SCHEMA,
+                SQL_DATA_ACCESS
+            FROM information_schema.routines
+        """
 
     @staticmethod
-    def get_db_sizes(db_name: str):
+    def get_db_sizes():
        """
        Retrieves metadata for all data files (type = 0) in the specified database from sys.database_files.
        Returns file name, type, current size, free space, maximum size, and a timestamp indicating when the
        data was extracted.
""" - return f""" + return """ SELECT - '{db_name}' AS DbName, + DB_NAME() AS database_name, name AS FileName, type_desc, size/128.0 AS CurrentSizeMB, size/128.0 - CAST(FILEPROPERTY(name, 'SpaceUsed') AS int)/128.0 AS FreeSpaceInMB, max_size as MaxSize, SYSDATETIME() as extract_ts - FROM {db_name}.sys.database_files WHERE type=0 + FROM sys.database_files WHERE type=0 """ @staticmethod - def get_table_sizes(db_name: str): + def get_table_sizes(): """ Retrieves storage and row count statistics for all user tables in the specified database by querying sys.dm_db_partition_stats and sys.objects. Returns table name, total rows, reserved, used, and unused space (MB), breakdown of data vs. index space, and a timestamp indicating when the data was extracted. """ - return f""" + return """ SELECT o.name AS TableName, SUM(ps.row_count) AS [RowCount], @@ -321,8 +378,8 @@ def get_table_sizes(db_name: str): ELSE 0 END) * 8 / 1024 AS IndexMB, SYSDATETIME() as extract_ts - FROM {db_name}.sys.dm_db_partition_stats AS ps - JOIN {db_name}sys.objects AS o ON ps.object_id = o.object_id + FROM sys.dm_db_partition_stats AS ps + JOIN sys.objects AS o ON ps.object_id = o.object_id WHERE o.type = 'U' GROUP BY schema_name(o.schema_id), o.name """ diff --git a/src/databricks/labs/lakebridge/resources/assessments/mssql/info_extract.py b/src/databricks/labs/lakebridge/resources/assessments/mssql/info_extract.py index 5f2189caa7..870d782b59 100644 --- a/src/databricks/labs/lakebridge/resources/assessments/mssql/info_extract.py +++ b/src/databricks/labs/lakebridge/resources/assessments/mssql/info_extract.py @@ -1,10 +1,10 @@ import json import sys + from databricks.labs.lakebridge.connections.credential_manager import create_credential_manager from databricks.labs.lakebridge.assessments import PRODUCT_NAME from databricks.labs.lakebridge.resources.assessments.mssql.common.connector import get_sqlserver_reader -from databricks.labs.lakebridge.resources.assessments.mssql.common.functions import create_msql_sql_client from databricks.labs.lakebridge.resources.assessments.mssql.common.queries import MSSQLQueries from databricks.labs.lakebridge.resources.assessments.synapse.common.duckdb_helpers import save_resultset_to_db from databricks.labs.lakebridge.resources.assessments.synapse.common.functions import arguments_loader, set_logger @@ -23,7 +23,6 @@ def execute(): # TODO: get the last time the profiler was executed # For now, we'll default to None, but this will eventually need # input from a scheduler component. 
-        last_execution_time = None
         mode = "overwrite"
 
         # Extract info metrics
@@ -45,57 +44,54 @@ def execute():
         table_name = "databases"
         table_query = MSSQLQueries.get_databases()
         logger.info(f"Loading '{table_name}' for SQL server: {server_name}")
-        # TODO: if list of `db_names` not provided in config
-        # then loop through all the databases to collect the following info
         result = connection.fetch(table_query)
-        db_name = "main"
         save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode)
 
         # Tables
         table_name = "tables"
-        table_query = MSSQLQueries.get_tables(db_name)
+        table_query = MSSQLQueries.get_tables()
         logger.info(f"Loading '{table_name}' for SQL server: {server_name}")
         result = connection.fetch(table_query)
         save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode)
 
         # Views
         table_name = "views"
-        table_query = MSSQLQueries.get_views(db_name)
+        table_query = MSSQLQueries.get_views()
         logger.info(f"Loading '{table_name}' for SQL server: {server_name}")
         result = connection.fetch(table_query)
         save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode)
 
         # Columns
         table_name = "columns"
-        table_query = MSSQLQueries.get_columns(db_name)
+        table_query = MSSQLQueries.get_columns()
         logger.info(f"Loading '{table_name}' for SQL server: {server_name}")
         result = connection.fetch(table_query)
         save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode)
 
         # Indexed views
         table_name = "indexed_views"
-        table_query = MSSQLQueries.get_indexed_views(db_name)
+        table_query = MSSQLQueries.get_indexed_views()
         logger.info(f"Loading '{table_name}' for SQL server: {server_name}")
         result = connection.fetch(table_query)
         save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode)
 
         # Routines
         table_name = "routines"
-        table_query = MSSQLQueries.get_routines(db_name)
+        table_query = MSSQLQueries.get_routines()
         logger.info(f"Loading '{table_name}' for SQL server: {server_name}")
         result = connection.fetch(table_query)
         save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode)
 
         # Database sizes
         table_name = "db_sizes"
-        table_query = MSSQLQueries.get_db_sizes(db_name)
+        table_query = MSSQLQueries.get_db_sizes()
         logger.info(f"Loading '{table_name}' for SQL server: {server_name}")
         result = connection.fetch(table_query)
         save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode)
 
         # Table sizes
         table_name = "table_sizes"
-        table_query = MSSQLQueries.get_table_sizes(db_name)
+        table_query = MSSQLQueries.get_table_sizes()
         logger.info(f"Loading '{table_name}' for SQL server: {server_name}")
         result = connection.fetch(table_query)
         save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode=mode)

diff --git a/src/databricks/labs/lakebridge/resources/assessments/synapse/common/duckdb_helpers.py b/src/databricks/labs/lakebridge/resources/assessments/synapse/common/duckdb_helpers.py
index 6f81f71052..372559ee35 100644
--- a/src/databricks/labs/lakebridge/resources/assessments/synapse/common/duckdb_helpers.py
+++ b/src/databricks/labs/lakebridge/resources/assessments/synapse/common/duckdb_helpers.py
@@ -73,7 +73,14 @@ def save_resultset_to_db(
         "mssql_cpu_utilization": "RECORD_ID BIGINT, EVENTTIME TIMESTAMP, SYSTEMIDLE BIGINT, SQLPROCESSUTILIZATION BIGINT, EXTRACT_TS TIMESTAMP",
         # MSSQL info extract
         "mssql_sys_info": "CPU_TICKS BIGINT, MS_TICKS BIGINT, CPU_COUNT BIGINT, HYPERTHREAD_RATIO BIGINT, PHYSICAL_MEMORY_KB BIGINT, VIRTUAL_MEMORY_KB BIGINT, COMMITTED_KB BIGINT, COMMITTED_TARGET_KB BIGINT, VISIBLE_TARGET_KB BIGINT, STACK_SIZE_IN_BYTES BIGINT, OS_QUANTUM BIGINT, OS_ERROR_MODE BIGINT, OS_PRIORITY_CLASS BIGINT, MAX_WORKERS_COUNT BIGINT, SCHEDULER_COUNT BIGINT, SCHEDULER_TOTAL_COUNT BIGINT, DEADLOCK_MONITOR_SERIAL_NUMBER BIGINT, SQLSERVER_START_TIME_MS_TICKS BIGINT, SQLSERVER_START_TIME TIMESTAMP, AFFINITY_TYPE BIGINT, AFFINITY_TYPE_DESC VARCHAR, PROCESS_KERNEL_TIME_MS BIGINT, PROCESS_USER_TIME_MS BIGINT, TIME_SOURCE BIGINT, TIME_SOURCE_DESC VARCHAR, VIRTUAL_MACHINE_TYPE BIGINT, VIRTUAL_MACHINE_TYPE_DESC VARCHAR, SOFTNUMA_CONFIGURATION BIGINT, SOFTNUMA_CONFIGURATION_DESC VARCHAR, PROCESS_PHYSICAL_AFFINITY VARCHAR, SQL_MEMORY_MODEL BIGINT, SQL_MEMORY_MODEL_DESC VARCHAR, SOCKET_COUNT BIGINT, CORES_PER_SOCKET BIGINT, NUMA_NODE_COUNT BIGINT, CONTAINER_TYPE BIGINT, CONTAINER_TYPE_DESC VARCHAR, EXTRACT_TS TIMESTAMP",
-        "mssql_databases": "DB_ID VARCHAR, NAME VARCHAR, COLLATION_NAME VARCHAR, CREATE_DATE VARCHAR, EXTRACT_TS VARCHAR",
+        "mssql_databases": "DB_ID VARCHAR, NAME VARCHAR, COLLATION_NAME VARCHAR, CREATE_DATE TIMESTAMP, EXTRACT_TS TIMESTAMP",
+        "mssql_tables": "TABLE_CATALOG VARCHAR, TABLE_SCHEMA VARCHAR, TABLE_NAME VARCHAR, TABLE_TYPE VARCHAR",
+        "mssql_views": "TABLE_CATALOG VARCHAR, TABLE_SCHEMA VARCHAR, TABLE_NAME VARCHAR, CHECK_OPTION VARCHAR, IS_UPDATABLE VARCHAR, VIEW_DEFINITION VARCHAR",
+        "mssql_columns": "TABLE_CATALOG VARCHAR, TABLE_SCHEMA VARCHAR, TABLE_NAME VARCHAR, COLUMN_NAME VARCHAR, ORDINAL_POSITION BIGINT, COLUMN_DEFAULT VARCHAR, IS_NULLABLE VARCHAR, DATA_TYPE VARCHAR, CHARACTER_MAXIMUM_LENGTH DOUBLE, CHARACTER_OCTET_LENGTH DOUBLE, NUMERIC_PRECISION DOUBLE, NUMERIC_PRECISION_RADIX DOUBLE, NUMERIC_SCALE DOUBLE, DATETIME_PRECISION DOUBLE, CHARACTER_SET_CATALOG VARCHAR, CHARACTER_SET_SCHEMA VARCHAR, CHARACTER_SET_NAME VARCHAR, COLLATION_CATALOG VARCHAR, COLLATION_SCHEMA VARCHAR, COLLATION_NAME VARCHAR, DOMAIN_CATALOG VARCHAR, DOMAIN_SCHEMA VARCHAR, DOMAIN_NAME VARCHAR",
+        "mssql_indexed_views": "INDEXED_VIEW_NAME VARCHAR, SCHEMA_NAME VARCHAR, INDEX_NAME VARCHAR, INDEX_TYPE VARCHAR, INDEX_ID VARCHAR, EXTRACT_TS VARCHAR",
+        "mssql_routines": "CREATED TIMESTAMP, DATA_TYPE VARCHAR, IS_DETERMINISTIC VARCHAR, IS_IMPLICITLY_INVOCABLE VARCHAR, IS_NULL_CALL VARCHAR, IS_USER_DEFINED_CAST VARCHAR, LAST_ALTERED TIMESTAMP, MAX_DYNAMIC_RESULT_SETS BIGINT, NUMERIC_PRECISION DOUBLE, NUMERIC_PRECISION_RADIX DOUBLE, NUMERIC_SCALE DOUBLE, ROUTINE_BODY VARCHAR, ROUTINE_CATALOG VARCHAR, ROUTINE_DEFINITION VARCHAR, ROUTINE_NAME VARCHAR, ROUTINE_SCHEMA VARCHAR, ROUTINE_TYPE VARCHAR, SCHEMA_LEVEL_ROUTINE VARCHAR, SPECIFIC_CATALOG VARCHAR, SPECIFIC_NAME VARCHAR, SPECIFIC_SCHEMA VARCHAR, SQL_DATA_ACCESS VARCHAR",
+        "mssql_db_sizes": "DATABASE_NAME VARCHAR, FILENAME VARCHAR, TYPE_DESC VARCHAR, CURRENTSIZEMB VARCHAR, FREESPACEINMB VARCHAR, MAXSIZE BIGINT, EXTRACT_TS TIMESTAMP",
+        "mssql_table_sizes": "TABLENAME VARCHAR, ROWCOUNT BIGINT, RESERVEDMB BIGINT, USEDMB BIGINT, UNUSEDMB BIGINT, DATAMB BIGINT, INDEXMB BIGINT, EXTRACT_TS TIMESTAMP",
     }
     try:
         columns = list(result.columns)

From 42d0028e2a723a77e244cc770e1b96d412a39a55 Mon Sep 17 00:00:00 2001
From: Will Girten
Date: Mon, 1 Dec 2025 12:34:26 -0500
Subject: [PATCH 12/12] Simplify config prompts.
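
Collapse the nested jdbc/profiler credential sections into a single flat
mapping and stop prompting for an authentication type; sql_authentication
is now assumed. A hypothetical example of the credentials structure this
produces, assuming the `source` key resolves to "mssql" (the values shown
are illustrative, not shipped defaults):

    credential = {
        "secret_vault_type": "local",
        "secret_vault_name": None,
        "mssql": {
            "auth_type": "sql_authentication",
            "fetch_size": "1000",
            "login_timeout": "30",
            "server": "myserver.example.net",  # illustrative server name
            "port": 1433,
            "user": "profiler_user",  # illustrative username
            "password": "...",
            "tz_info": "UTC",
            "driver": "ODBC Driver 18 for SQL Server",
        },
    }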
---
 .../assessments/configure_assessment.py   | 37 ++++++-------------
 .../assessments/mssql/activity_extract.py |  4 +--
 .../assessments/mssql/info_extract.py     |  4 +--
 3 files changed, 15 insertions(+), 30 deletions(-)

diff --git a/src/databricks/labs/lakebridge/assessments/configure_assessment.py b/src/databricks/labs/lakebridge/assessments/configure_assessment.py
index cc88eebdbd..5923d8d96b 100644
--- a/src/databricks/labs/lakebridge/assessments/configure_assessment.py
+++ b/src/databricks/labs/lakebridge/assessments/configure_assessment.py
@@ -87,36 +87,21 @@ def _configure_credentials(self) -> str:
         secret_vault_type = str(self.prompts.choice("Enter secret vault type (local | env)", ["local", "env"])).lower()
         secret_vault_name = None
 
-        # JDBC Settings
-        logger.info("Please select JDBC authentication type:")
-        auth_type = self.prompts.choice(
-            "Select authentication type", ["sql_authentication", "ad_passwd_authentication", "spn_authentication"]
-        )
-        mssql_jdbc = {
-            "auth_type": auth_type,
-            "fetch_size": self.prompts.question("Enter fetch size", default="1000"),
-            "login_timeout": self.prompts.question("Enter login timeout (seconds)", default="30"),
-            "server": self.prompts.question("Enter the fully-qualified server name"),
-            "port": int(self.prompts.question("Enter the port details", valid_number=True)),
-            "sql_user": self.prompts.question("Enter the SQL username"),
-            "sql_password": self.prompts.password("Enter the SQL password"),
-            "tz_info": self.prompts.question("Enter timezone (e.g. America/New_York)", default="UTC"),
-            "driver": self.prompts.question(
-                "Enter the ODBC driver installed locally", default="ODBC Driver 18 for SQL Server"
-            ),
-        }
-
-        # Profiler settings
-        logger.info("Please configure profiler settings:")
-        mssql_profiler = {
-            "redact_sql_pools_sql_text": self.prompts.confirm("Redact SQL text?"),
-        }
         credential = {
             "secret_vault_type": secret_vault_type,
             "secret_vault_name": secret_vault_name,
             source: {
-                "jdbc": mssql_jdbc,
-                "profiler": mssql_profiler,
+                "auth_type": "sql_authentication",
+                "fetch_size": self.prompts.question("Enter fetch size", default="1000"),
+                "login_timeout": self.prompts.question("Enter login timeout (seconds)", default="30"),
+                "server": self.prompts.question("Enter the fully-qualified server name"),
+                "port": int(self.prompts.question("Enter the port details", valid_number=True)),
+                "user": self.prompts.question("Enter the SQL username"),
+                "password": self.prompts.password("Enter the SQL password"),
+                "tz_info": self.prompts.question("Enter timezone (e.g. America/New_York)", default="UTC"),
+                "driver": self.prompts.question(
+                    "Enter the ODBC driver installed locally", default="ODBC Driver 18 for SQL Server"
+                ),
             },
         }

diff --git a/src/databricks/labs/lakebridge/resources/assessments/mssql/activity_extract.py b/src/databricks/labs/lakebridge/resources/assessments/mssql/activity_extract.py
index d29b4e57f7..e9f3b4f815 100644
--- a/src/databricks/labs/lakebridge/resources/assessments/mssql/activity_extract.py
+++ b/src/databricks/labs/lakebridge/resources/assessments/mssql/activity_extract.py
@@ -15,8 +15,8 @@ def execute():
     db_path, creds_file = arguments_loader(desc="MSSQL Server Activity Extract Script")
 
     cred_manager = create_credential_manager(PRODUCT_NAME, creds_file)
     mssql_settings = cred_manager.get_credentials("mssql")
-    auth_type = mssql_settings["jdbc"].get("auth_type", "sql_authentication")
-    server_name = mssql_settings["jdbc"].get("server", "")
+    auth_type = mssql_settings.get("auth_type", "sql_authentication")
+    server_name = mssql_settings.get("server", "")
 
     try:
         # TODO: get the last time the profiler was executed

diff --git a/src/databricks/labs/lakebridge/resources/assessments/mssql/info_extract.py b/src/databricks/labs/lakebridge/resources/assessments/mssql/info_extract.py
index 870d782b59..57d5841b1a 100644
--- a/src/databricks/labs/lakebridge/resources/assessments/mssql/info_extract.py
+++ b/src/databricks/labs/lakebridge/resources/assessments/mssql/info_extract.py
@@ -16,8 +16,8 @@ def execute():
     db_path, creds_file = arguments_loader(desc="MSSQL Server Info Extract Script")
 
     cred_manager = create_credential_manager(PRODUCT_NAME, creds_file)
     mssql_settings = cred_manager.get_credentials("mssql")
-    auth_type = mssql_settings["jdbc"].get("auth_type", "sql_authentication")
-    server_name = mssql_settings["jdbc"].get("server", "")
+    auth_type = mssql_settings.get("auth_type", "sql_authentication")
+    server_name = mssql_settings.get("server", "")
 
     try:
         # TODO: get the last time the profiler was executed
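
Note on the overall shape of the extract scripts: each one follows the same
fetch-and-persist pattern. A minimal sketch of one pass, assuming the reader
returned by get_sqlserver_reader exposes fetch() and that save_resultset_to_db
takes a result set, a target table name, a DuckDB path, and a write mode; the
driver loop itself is hypothetical, while the individual calls mirror the
diffs above:

    # Hypothetical loop over the parameterless metadata queries from PATCH 11.
    for table_name, table_query in [
        ("tables", MSSQLQueries.get_tables()),
        ("views", MSSQLQueries.get_views()),
        ("columns", MSSQLQueries.get_columns()),
    ]:
        result = connection.fetch(table_query)  # run the INFORMATION_SCHEMA query
        save_resultset_to_db(result, f"mssql_{table_name}", db_path, mode="overwrite")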