Skip to content

Commit 926c8de

Browse files
[dagster-sigma] Use Sigma translator instance in spec loader and state-backed defs
1 parent 3b8f8e8 commit 926c8de

File tree

4 files changed

+79
-21
lines changed

4 files changed

+79
-21
lines changed

python_modules/libraries/dagster-sigma/dagster_sigma/resource.py

+21-20
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,7 @@
99
from dataclasses import dataclass
1010
from enum import Enum
1111
from pathlib import Path
12-
from typing import (
13-
AbstractSet,
14-
Any,
15-
Callable,
16-
Dict,
17-
Iterator,
18-
List,
19-
Mapping,
20-
Optional,
21-
Sequence,
22-
Type,
23-
Union,
24-
)
12+
from typing import AbstractSet, Any, Dict, Iterator, List, Mapping, Optional, Sequence, Type, Union
2513

2614
import aiohttp
2715
import dagster._check as check
@@ -38,6 +26,7 @@
3826
from dagster._serdes.serdes import deserialize_value
3927
from dagster._utils.cached_method import cached_method
4028
from dagster._utils.log import get_dagster_logger
29+
from dagster._utils.warnings import deprecation_warning
4130
from pydantic import Field, PrivateAttr
4231
from sqlglot import exp, parse_one
4332

@@ -721,7 +710,9 @@ def build_defs(
721710
@experimental
722711
def load_sigma_asset_specs(
723712
organization: SigmaOrganization,
724-
dagster_sigma_translator: Type[DagsterSigmaTranslator] = DagsterSigmaTranslator,
713+
dagster_sigma_translator: Optional[
714+
Union[DagsterSigmaTranslator, Type[DagsterSigmaTranslator]]
715+
] = None,
725716
sigma_filter: Optional[SigmaFilter] = None,
726717
fetch_column_data: bool = True,
727718
fetch_lineage_data: bool = True,
@@ -731,8 +722,9 @@ def load_sigma_asset_specs(
731722
732723
Args:
733724
organization (SigmaOrganization): The Sigma organization to fetch assets from.
734-
dagster_sigma_translator (Type[DagsterSigmaTranslator]): The translator to use
735-
to convert Sigma content into AssetSpecs. Defaults to DagsterSigmaTranslator.
725+
dagster_sigma_translator (Optional[Union[DagsterSigmaTranslator, Type[DagsterSigmaTranslatorr]]]):
726+
The translator to use to convert Sigma content into :py:class:`dagster.AssetSpec`.
727+
Defaults to :py:class:`DagsterSigmaTranslator`.
736728
sigma_filter (Optional[SigmaFilter]): Filters the set of Sigma objects to fetch.
737729
fetch_column_data (bool): Whether to fetch column data for datasets, which can be slow.
738730
fetch_lineage_data (bool): Whether to fetch any lineage data for workbooks and datasets.
@@ -742,6 +734,16 @@ def load_sigma_asset_specs(
742734
Returns:
743735
List[AssetSpec]: The set of assets representing the Sigma content in the organization.
744736
"""
737+
if isinstance(dagster_sigma_translator, type):
738+
deprecation_warning(
739+
subject="Support of `dagster_sigma_translator` as a Type[DagsterSigmaTranslator]",
740+
breaking_version="1.10",
741+
additional_warn_text=(
742+
"Pass an instance of DagsterSigmaTranslator or subclass to `dagster_sigma_translator` instead."
743+
),
744+
)
745+
dagster_sigma_translator = dagster_sigma_translator()
746+
745747
snapshot = None
746748
if snapshot_path and not os.getenv(SNAPSHOT_ENV_VAR_NAME):
747749
snapshot = deserialize_value(Path(snapshot_path).read_text(), RepositoryLoadData)
@@ -750,7 +752,7 @@ def load_sigma_asset_specs(
750752
return check.is_list(
751753
SigmaOrganizationDefsLoader(
752754
organization=initialized_organization,
753-
translator_cls=dagster_sigma_translator,
755+
translator=dagster_sigma_translator or DagsterSigmaTranslator(),
754756
sigma_filter=sigma_filter,
755757
fetch_column_data=fetch_column_data,
756758
fetch_lineage_data=fetch_lineage_data,
@@ -779,7 +781,7 @@ def _get_translator_spec_assert_keys_match(
779781
@dataclass
780782
class SigmaOrganizationDefsLoader(StateBackedDefinitionsLoader[SigmaOrganizationData]):
781783
organization: SigmaOrganization
782-
translator_cls: Type[DagsterSigmaTranslator]
784+
translator: DagsterSigmaTranslator
783785
snapshot: Optional[RepositoryLoadData]
784786
sigma_filter: Optional[SigmaFilter] = None
785787
fetch_column_data: bool = True
@@ -802,7 +804,6 @@ def fetch_state(self) -> SigmaOrganizationData:
802804
)
803805

804806
def defs_from_state(self, state: SigmaOrganizationData) -> Definitions:
805-
translator = self.translator_cls()
806807
translator_data_workbooks = [
807808
SigmaWorkbookTranslatorData(workbook=workbook, organization_data=state)
808809
for workbook in state.workbooks
@@ -812,7 +813,7 @@ def defs_from_state(self, state: SigmaOrganizationData) -> Definitions:
812813
for dataset in state.datasets
813814
]
814815
asset_specs = [
815-
_get_translator_spec_assert_keys_match(translator, obj)
816+
_get_translator_spec_assert_keys_match(self.translator, obj)
816817
for obj in [*translator_data_workbooks, *translator_data_datasets]
817818
]
818819
return Definitions(assets=asset_specs)

python_modules/libraries/dagster-sigma/dagster_sigma_tests/pending_repo_with_translator.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,5 +28,5 @@ def get_asset_spec(self, data) -> AssetSpec:
2828
client_secret=EnvVar("SIGMA_CLIENT_SECRET"),
2929
)
3030

31-
sigma_specs = load_sigma_asset_specs(resource, dagster_sigma_translator=MyCoolTranslator)
31+
sigma_specs = load_sigma_asset_specs(resource, dagster_sigma_translator=MyCoolTranslator())
3232
defs = Definitions(assets=[*sigma_specs], jobs=[define_asset_job("all_asset_job")])
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
from dagster import EnvVar, define_asset_job
2+
from dagster._core.definitions.asset_spec import AssetSpec
3+
from dagster._core.definitions.definitions_class import Definitions
4+
from dagster._utils.env import environ
5+
from dagster_sigma import (
6+
DagsterSigmaTranslator,
7+
SigmaBaseUrl,
8+
SigmaOrganization,
9+
load_sigma_asset_specs,
10+
)
11+
12+
fake_client_id = "fake_client_id"
13+
fake_client_secret = "fake_client_secret"
14+
15+
with environ({"SIGMA_CLIENT_ID": fake_client_id, "SIGMA_CLIENT_SECRET": fake_client_secret}):
16+
fake_token = "fake_token"
17+
18+
class MyCoolTranslator(DagsterSigmaTranslator):
19+
def get_asset_spec(self, data) -> AssetSpec:
20+
spec = super().get_asset_spec(data)
21+
return spec.replace_attributes(
22+
key=spec.key.with_prefix("my_prefix"),
23+
)
24+
25+
resource = SigmaOrganization(
26+
base_url=SigmaBaseUrl.AWS_US,
27+
client_id=EnvVar("SIGMA_CLIENT_ID"),
28+
client_secret=EnvVar("SIGMA_CLIENT_SECRET"),
29+
)
30+
31+
# Pass the translator type
32+
sigma_specs = load_sigma_asset_specs(resource, dagster_sigma_translator=MyCoolTranslator)
33+
defs = Definitions(assets=[*sigma_specs], jobs=[define_asset_job("all_asset_job")])

python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_asset_specs.py

+24
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from pathlib import Path
22
from tempfile import TemporaryDirectory
33

4+
import pytest
45
import responses
56
from click.testing import CliRunner
67
from dagster._core.code_pointer import CodePointer
@@ -117,3 +118,26 @@ def test_load_assets_organization_data_translator(
117118
assert all(
118119
key.path[0] == "my_prefix" for key in repository_def.assets_defs_by_key.keys()
119120
), repository_def.assets_defs_by_key
121+
122+
123+
@responses.activate
124+
def test_load_assets_organization_data_translator_legacy(
125+
sigma_auth_token: str, sigma_sample_data: None
126+
) -> None:
127+
with instance_for_test() as _instance:
128+
with pytest.warns(
129+
DeprecationWarning,
130+
match=r"Support of `dagster_sigma_translator` as a Type\[DagsterSigmaTranslator\]",
131+
):
132+
repository_def = initialize_repository_def_from_pointer(
133+
CodePointer.from_python_file(
134+
str(Path(__file__).parent / "pending_repo_with_translator_legacy.py"),
135+
"defs",
136+
None,
137+
),
138+
)
139+
140+
assert len(repository_def.assets_defs_by_key) == 2
141+
assert all(
142+
key.path[0] == "my_prefix" for key in repository_def.assets_defs_by_key.keys()
143+
), repository_def.assets_defs_by_key

0 commit comments

Comments
 (0)