Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@
"((.+\\D)[_$]?)?(\\d\\d\\d\\d(?:0[1-9]|1[0-2])(?:0[1-9]|[12][0-9]|3[01]))$"
)

# Compiled once at import time so the ingestion hot path (get_table_and_shard,
# called per table) does not pay a re-parse / re-module-cache lookup per call.
# re.IGNORECASE mirrors the flag the previous inline re.match() call passed.
_COMPILED_BIGQUERY_DEFAULT_SHARDED_TABLE_REGEX: Pattern = re.compile(
    _BIGQUERY_DEFAULT_SHARDED_TABLE_REGEX, re.IGNORECASE
)
_COMPILED_BIGQUERY_WILDCARD_REGEX: Pattern = re.compile("((_(\\d+)?)\\*$)|\\*$")


@dataclass(frozen=True, order=True)
class BigqueryTableIdentifier:
Expand Down Expand Up @@ -58,11 +64,7 @@ def get_table_and_shard(table_name: str) -> Tuple[Optional[str], Optional[str]]:
In case of sharded tables, returns (<table-prefix>, shard)
"""
new_table_name = table_name
match = re.match(
BigqueryTableIdentifier._BIGQUERY_DEFAULT_SHARDED_TABLE_REGEX,
table_name,
re.IGNORECASE,
)
match = _COMPILED_BIGQUERY_DEFAULT_SHARDED_TABLE_REGEX.match(table_name)
if match:
shard: str = match[3]
if shard:
Expand Down Expand Up @@ -96,7 +98,7 @@ def get_table_display_name(self) -> str:
- removes partition ids (table$20210101 -> table or table$__UNPARTITIONED__ -> table)
"""
# if table name ends in _* or * or _yyyy* or _yyyymm* then we strip it as that represents a query on a sharded table
shortened_table_name = re.sub(self._BIGQUERY_WILDCARD_REGEX, "", self.table)
shortened_table_name = _COMPILED_BIGQUERY_WILDCARD_REGEX.sub("", self.table)

matches = BigQueryTableRef.SNAPSHOT_TABLE_REGEX.match(shortened_table_name)
if matches:
Expand Down Expand Up @@ -133,11 +135,8 @@ def is_sharded_table(self) -> bool:
if shard:
return True

if re.match(
f".*({BigqueryTableIdentifier._BIGQUERY_WILDCARD_REGEX})",
self.raw_table_name(),
re.IGNORECASE,
):
# Check if table name contains wildcard pattern
if _COMPILED_BIGQUERY_WILDCARD_REGEX.search(self.raw_table_name()):
return True

return False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@
"databricks": r"(databricks|spark)",
}

# Compile every platform-detection regex a single time at import; the
# per-connection-string lookups in extract_platform() / normalize_platform_name()
# then reuse the compiled objects instead of re-invoking re.search on raw strings.
_compiled_platform_patterns = {
    name: re.compile(regex, re.IGNORECASE)
    for name, regex in platform_patterns.items()
}

powerbi_platform_names = {
"mysql": "MySQL",
"postgres": "PostgreSQL",
Expand Down Expand Up @@ -157,8 +163,8 @@ def extract_platform(connection_string: str) -> Tuple[Optional[str], Optional[st

driver_lower = driver_name.lower()

for platform, pattern in platform_patterns.items():
if re.search(pattern, driver_lower):
for platform, compiled_pattern in _compiled_platform_patterns.items():
if compiled_pattern.search(driver_lower):
return platform, powerbi_platform_names.get(platform)

return None, None
Expand All @@ -178,8 +184,8 @@ def normalize_platform_name(platform: str) -> Tuple[Optional[str], Optional[str]
"""
platform_lower = platform.lower()

for platform, pattern in platform_patterns.items():
if re.search(pattern, platform_lower):
return platform, powerbi_platform_names.get(platform)
for platform_name, compiled_pattern in _compiled_platform_patterns.items():
if compiled_pattern.search(platform_lower):
return platform_name, powerbi_platform_names.get(platform_name)

return None, None
17 changes: 11 additions & 6 deletions metadata-ingestion/src/datahub/ingestion/source/sql_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import re
from dataclasses import dataclass, field
from datetime import datetime
from functools import partial
from typing import Any, ClassVar, Iterable, List, Optional, Union, cast
from functools import cached_property, partial
from typing import Any, ClassVar, Iterable, List, Optional, Pattern, Union, cast

import smart_open
from pydantic import BaseModel, ConfigDict, Field, field_validator
Expand Down Expand Up @@ -92,6 +92,11 @@ class SqlQueriesSourceConfig(
default=[],
)

@cached_property
def compiled_temp_table_patterns(self) -> List[Pattern]:
    """Compile ``temp_table_patterns`` once (case-insensitively) and cache the list.

    Avoids re-compiling the user-supplied regexes for every table name checked
    by ``is_temp_table`` in the ingestion hot path; ``cached_property`` makes the
    compilation lazy, so an invalid pattern only raises on first use.
    """
    compiled: List[Pattern] = []
    for raw_pattern in self.temp_table_patterns:
        compiled.append(re.compile(raw_pattern, re.IGNORECASE))
    return compiled

enable_lazy_schema_loading: bool = Field(
default=True,
description="Enable lazy schema loading for better performance. When enabled, schemas are fetched on-demand "
Expand Down Expand Up @@ -422,15 +427,15 @@ def is_temp_table(self, name: str) -> bool:
return False

try:
for pattern in self.config.temp_table_patterns:
if re.match(pattern, name, flags=re.IGNORECASE):
for pattern in self.config.compiled_temp_table_patterns:
if pattern.match(name):
logger.debug(
f"Table '{name}' matched temp table pattern: {pattern}"
f"Table '{name}' matched temp table pattern: {pattern.pattern}"
)
self.report.num_temp_tables_detected += 1
return True
except re.error as e:
logger.warning(f"Invalid regex pattern '{pattern}': {e}")
logger.warning(f"Invalid regex pattern: {e}")

return False

Expand Down
Loading