From c09014126bc9f4a72d4b251ba8bfa6f84938421c Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Thu, 2 Jan 2025 21:14:41 -0500 Subject: [PATCH] [dagster-sigma] Use Sigma translator instance in spec loader and state-backed defs --- .../dagster-sigma/dagster_sigma/resource.py | 27 +++++++++++---- .../pending_repo_with_translator.py | 2 +- .../pending_repo_with_translator_legacy.py | 33 +++++++++++++++++++ .../dagster_sigma_tests/test_asset_specs.py | 24 ++++++++++++++ 4 files changed, 78 insertions(+), 8 deletions(-) create mode 100644 python_modules/libraries/dagster-sigma/dagster_sigma_tests/pending_repo_with_translator_legacy.py diff --git a/python_modules/libraries/dagster-sigma/dagster_sigma/resource.py b/python_modules/libraries/dagster-sigma/dagster_sigma/resource.py index b6d5ae249c099..b07e0b4015c8a 100644 --- a/python_modules/libraries/dagster-sigma/dagster_sigma/resource.py +++ b/python_modules/libraries/dagster-sigma/dagster_sigma/resource.py @@ -26,6 +26,7 @@ from dagster._serdes.serdes import deserialize_value from dagster._utils.cached_method import cached_method from dagster._utils.log import get_dagster_logger +from dagster._utils.warnings import deprecation_warning from pydantic import Field, PrivateAttr from sqlglot import exp, parse_one @@ -709,7 +710,9 @@ def build_defs( @experimental def load_sigma_asset_specs( organization: SigmaOrganization, - dagster_sigma_translator: Type[DagsterSigmaTranslator] = DagsterSigmaTranslator, + dagster_sigma_translator: Optional[ + Union[DagsterSigmaTranslator, Type[DagsterSigmaTranslator]] + ] = None, sigma_filter: Optional[SigmaFilter] = None, fetch_column_data: bool = True, fetch_lineage_data: bool = True, @@ -719,8 +722,9 @@ def load_sigma_asset_specs( Args: organization (SigmaOrganization): The Sigma organization to fetch assets from. - dagster_sigma_translator (Type[DagsterSigmaTranslator]): The translator to use - to convert Sigma content into AssetSpecs. Defaults to DagsterSigmaTranslator. 
+ dagster_sigma_translator (Optional[Union[DagsterSigmaTranslator, Type[DagsterSigmaTranslator]]]): + The translator to use to convert Sigma content into :py:class:`dagster.AssetSpec`. + Defaults to :py:class:`DagsterSigmaTranslator`. sigma_filter (Optional[SigmaFilter]): Filters the set of Sigma objects to fetch. fetch_column_data (bool): Whether to fetch column data for datasets, which can be slow. fetch_lineage_data (bool): Whether to fetch any lineage data for workbooks and datasets. @@ -730,6 +734,16 @@ def load_sigma_asset_specs( Returns: List[AssetSpec]: The set of assets representing the Sigma content in the organization. """ + if isinstance(dagster_sigma_translator, type): + deprecation_warning( + subject="Support of `dagster_sigma_translator` as a Type[DagsterSigmaTranslator]", + breaking_version="1.10", + additional_warn_text=( + "Pass an instance of DagsterSigmaTranslator or subclass to `dagster_sigma_translator` instead." + ), + ) + dagster_sigma_translator = dagster_sigma_translator() + snapshot = None if snapshot_path and not os.getenv(SNAPSHOT_ENV_VAR_NAME): snapshot = deserialize_value(Path(snapshot_path).read_text(), RepositoryLoadData) @@ -738,7 +752,7 @@ def load_sigma_asset_specs( return check.is_list( SigmaOrganizationDefsLoader( organization=initialized_organization, - translator_cls=dagster_sigma_translator, + translator=dagster_sigma_translator or DagsterSigmaTranslator(), sigma_filter=sigma_filter, fetch_column_data=fetch_column_data, fetch_lineage_data=fetch_lineage_data, @@ -767,7 +781,7 @@ def _get_translator_spec_assert_keys_match( @dataclass class SigmaOrganizationDefsLoader(StateBackedDefinitionsLoader[SigmaOrganizationData]): organization: SigmaOrganization - translator_cls: Type[DagsterSigmaTranslator] + translator: DagsterSigmaTranslator snapshot: Optional[RepositoryLoadData] sigma_filter: Optional[SigmaFilter] = None fetch_column_data: bool = True @@ -790,7 +804,6 @@ def fetch_state(self) -> SigmaOrganizationData: ) def 
defs_from_state(self, state: SigmaOrganizationData) -> Definitions: - translator = self.translator_cls() translator_data_workbooks = [ SigmaWorkbookTranslatorData(workbook=workbook, organization_data=state) for workbook in state.workbooks @@ -800,7 +813,7 @@ def defs_from_state(self, state: SigmaOrganizationData) -> Definitions: for dataset in state.datasets ] asset_specs = [ - _get_translator_spec_assert_keys_match(translator, obj) + _get_translator_spec_assert_keys_match(self.translator, obj) for obj in [*translator_data_workbooks, *translator_data_datasets] ] return Definitions(assets=asset_specs) diff --git a/python_modules/libraries/dagster-sigma/dagster_sigma_tests/pending_repo_with_translator.py b/python_modules/libraries/dagster-sigma/dagster_sigma_tests/pending_repo_with_translator.py index d3c48e3169250..d0e9981d4f4d1 100644 --- a/python_modules/libraries/dagster-sigma/dagster_sigma_tests/pending_repo_with_translator.py +++ b/python_modules/libraries/dagster-sigma/dagster_sigma_tests/pending_repo_with_translator.py @@ -28,5 +28,5 @@ def get_asset_spec(self, data) -> AssetSpec: client_secret=EnvVar("SIGMA_CLIENT_SECRET"), ) - sigma_specs = load_sigma_asset_specs(resource, dagster_sigma_translator=MyCoolTranslator) + sigma_specs = load_sigma_asset_specs(resource, dagster_sigma_translator=MyCoolTranslator()) defs = Definitions(assets=[*sigma_specs], jobs=[define_asset_job("all_asset_job")]) diff --git a/python_modules/libraries/dagster-sigma/dagster_sigma_tests/pending_repo_with_translator_legacy.py b/python_modules/libraries/dagster-sigma/dagster_sigma_tests/pending_repo_with_translator_legacy.py new file mode 100644 index 0000000000000..a36174297f946 --- /dev/null +++ b/python_modules/libraries/dagster-sigma/dagster_sigma_tests/pending_repo_with_translator_legacy.py @@ -0,0 +1,33 @@ +from dagster import EnvVar, define_asset_job +from dagster._core.definitions.asset_spec import AssetSpec +from dagster._core.definitions.definitions_class import Definitions 
+from dagster._utils.env import environ +from dagster_sigma import ( + DagsterSigmaTranslator, + SigmaBaseUrl, + SigmaOrganization, + load_sigma_asset_specs, +) + +fake_client_id = "fake_client_id" +fake_client_secret = "fake_client_secret" + +with environ({"SIGMA_CLIENT_ID": fake_client_id, "SIGMA_CLIENT_SECRET": fake_client_secret}): + fake_token = "fake_token" + + class MyCoolTranslator(DagsterSigmaTranslator): + def get_asset_spec(self, data) -> AssetSpec: + spec = super().get_asset_spec(data) + return spec.replace_attributes( + key=spec.key.with_prefix("my_prefix"), + ) + + resource = SigmaOrganization( + base_url=SigmaBaseUrl.AWS_US, + client_id=EnvVar("SIGMA_CLIENT_ID"), + client_secret=EnvVar("SIGMA_CLIENT_SECRET"), + ) + + # Pass the translator type + sigma_specs = load_sigma_asset_specs(resource, dagster_sigma_translator=MyCoolTranslator) + defs = Definitions(assets=[*sigma_specs], jobs=[define_asset_job("all_asset_job")]) diff --git a/python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_asset_specs.py b/python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_asset_specs.py index c3cc5a383c885..4624133674c5b 100644 --- a/python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_asset_specs.py +++ b/python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_asset_specs.py @@ -1,6 +1,7 @@ from pathlib import Path from tempfile import TemporaryDirectory +import pytest import responses from click.testing import CliRunner from dagster._core.code_pointer import CodePointer @@ -117,3 +118,26 @@ def test_load_assets_organization_data_translator( assert all( key.path[0] == "my_prefix" for key in repository_def.assets_defs_by_key.keys() ), repository_def.assets_defs_by_key + + +@responses.activate +def test_load_assets_organization_data_translator_legacy( + sigma_auth_token: str, sigma_sample_data: None +) -> None: + with instance_for_test() as _instance: + with pytest.warns( + DeprecationWarning, + match=r"Support of 
`dagster_sigma_translator` as a Type\[DagsterSigmaTranslator\]", + ): + repository_def = initialize_repository_def_from_pointer( + CodePointer.from_python_file( + str(Path(__file__).parent / "pending_repo_with_translator_legacy.py"), + "defs", + None, + ), + ) + + assert len(repository_def.assets_defs_by_key) == 2 + assert all( + key.path[0] == "my_prefix" for key in repository_def.assets_defs_by_key.keys() + ), repository_def.assets_defs_by_key