Skip to content

Commit 3b8f8e8

Browse files
[dagster-sigma] Move contextual data from DagsterSigmaTranslator to container classes
1 parent 867bceb commit 3b8f8e8

File tree

4 files changed

+125
-40
lines changed

4 files changed

+125
-40
lines changed

examples/docs_snippets/docs_snippets/integrations/sigma/customize-sigma-asset-defs.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
class MyCustomSigmaTranslator(DagsterSigmaTranslator):
2020
def get_asset_spec(self, data: SigmaWorkbook) -> dg.AssetSpec:
2121
# We create the default asset spec using super()
22-
default_spec = super().get_asset_spec(data)
22+
default_spec = super().get_asset_spec(data) # type: ignore
2323
# we customize the team owner tag for all Sigma assets
2424
return default_spec.replace_attributes(owners=["team:my_team"])
2525

python_modules/libraries/dagster-sigma/dagster_sigma/resource.py

+17-8
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,12 @@
4545
from dagster_sigma.translator import (
4646
DagsterSigmaTranslator,
4747
SigmaDataset,
48+
SigmaDatasetTranslatorData,
4849
SigmaOrganizationData,
4950
SigmaTable,
5051
SigmaWorkbook,
5152
SigmaWorkbookMetadataSet,
53+
SigmaWorkbookTranslatorData,
5254
_inode_from_url,
5355
)
5456

@@ -719,9 +721,7 @@ def build_defs(
719721
@experimental
720722
def load_sigma_asset_specs(
721723
organization: SigmaOrganization,
722-
dagster_sigma_translator: Callable[
723-
[SigmaOrganizationData], DagsterSigmaTranslator
724-
] = DagsterSigmaTranslator,
724+
dagster_sigma_translator: Type[DagsterSigmaTranslator] = DagsterSigmaTranslator,
725725
sigma_filter: Optional[SigmaFilter] = None,
726726
fetch_column_data: bool = True,
727727
fetch_lineage_data: bool = True,
@@ -731,7 +731,7 @@ def load_sigma_asset_specs(
731731
732732
Args:
733733
organization (SigmaOrganization): The Sigma organization to fetch assets from.
734-
dagster_sigma_translator (Callable[[SigmaOrganizationData], DagsterSigmaTranslator]): The translator to use
734+
dagster_sigma_translator (Type[DagsterSigmaTranslator]): The translator to use
735735
to convert Sigma content into AssetSpecs. Defaults to DagsterSigmaTranslator.
736736
sigma_filter (Optional[SigmaFilter]): Filters the set of Sigma objects to fetch.
737737
fetch_column_data (bool): Whether to fetch column data for datasets, which can be slow.
@@ -763,7 +763,8 @@ def load_sigma_asset_specs(
763763

764764

765765
def _get_translator_spec_assert_keys_match(
766-
translator: DagsterSigmaTranslator, data: Union[SigmaDataset, SigmaWorkbook]
766+
translator: DagsterSigmaTranslator,
767+
data: Union[SigmaDatasetTranslatorData, SigmaWorkbookTranslatorData],
767768
) -> AssetSpec:
768769
key = translator.get_asset_key(data)
769770
spec = translator.get_asset_spec(data)
@@ -778,7 +779,7 @@ def _get_translator_spec_assert_keys_match(
778779
@dataclass
779780
class SigmaOrganizationDefsLoader(StateBackedDefinitionsLoader[SigmaOrganizationData]):
780781
organization: SigmaOrganization
781-
translator_cls: Callable[[SigmaOrganizationData], DagsterSigmaTranslator]
782+
translator_cls: Type[DagsterSigmaTranslator]
782783
snapshot: Optional[RepositoryLoadData]
783784
sigma_filter: Optional[SigmaFilter] = None
784785
fetch_column_data: bool = True
@@ -801,9 +802,17 @@ def fetch_state(self) -> SigmaOrganizationData:
801802
)
802803

803804
def defs_from_state(self, state: SigmaOrganizationData) -> Definitions:
804-
translator = self.translator_cls(state)
805+
translator = self.translator_cls()
806+
translator_data_workbooks = [
807+
SigmaWorkbookTranslatorData(workbook=workbook, organization_data=state)
808+
for workbook in state.workbooks
809+
]
810+
translator_data_datasets = [
811+
SigmaDatasetTranslatorData(dataset=dataset, organization_data=state)
812+
for dataset in state.datasets
813+
]
805814
asset_specs = [
806815
_get_translator_spec_assert_keys_match(translator, obj)
807-
for obj in [*state.workbooks, *state.datasets]
816+
for obj in [*translator_data_workbooks, *translator_data_datasets]
808817
]
809818
return Definitions(assets=asset_specs)

python_modules/libraries/dagster-sigma/dagster_sigma/translator.py

+73-14
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,58 @@ def get_table_path(self) -> List[str]:
9090
return self.properties["path"].split("/")[1:] + [self.properties["name"]]
9191

9292

93+
@record
94+
class SigmaWorkbookTranslatorData:
95+
"""A record representing a Sigma workbook and the Sigma organization data."""
96+
97+
workbook: "SigmaWorkbook"
98+
organization_data: "SigmaOrganizationData"
99+
100+
@property
101+
def properties(self) -> Dict[str, Any]:
102+
return self.workbook.properties
103+
104+
@property
105+
def lineage(self) -> List[Dict[str, Any]]:
106+
return self.workbook.lineage
107+
108+
@property
109+
def datasets(self) -> AbstractSet[str]:
110+
return self.workbook.datasets
111+
112+
@property
113+
def direct_table_deps(self) -> AbstractSet[str]:
114+
return self.workbook.direct_table_deps
115+
116+
@property
117+
def owner_email(self) -> Optional[str]:
118+
return self.workbook.owner_email
119+
120+
@property
121+
def materialization_schedules(self) -> Optional[List[Dict[str, Any]]]:
122+
return self.workbook.materialization_schedules
123+
124+
125+
@record
126+
class SigmaDatasetTranslatorData:
127+
"""A record representing a Sigma dataset and the Sigma organization data."""
128+
129+
dataset: "SigmaDataset"
130+
organization_data: "SigmaOrganizationData"
131+
132+
@property
133+
def properties(self) -> Dict[str, Any]:
134+
return self.dataset.properties
135+
136+
@property
137+
def columns(self) -> AbstractSet[str]:
138+
return self.dataset.columns
139+
140+
@property
141+
def inputs(self) -> AbstractSet[str]:
142+
return self.dataset.inputs
143+
144+
93145
@whitelist_for_serdes
94146
@record
95147
class SigmaOrganizationData:
@@ -111,24 +163,21 @@ class DagsterSigmaTranslator:
111163
Subclass this class to provide custom translation logic.
112164
"""
113165

114-
def __init__(self, context: SigmaOrganizationData):
115-
self._context = context
116-
117-
@property
118-
def organization_data(self) -> SigmaOrganizationData:
119-
return self._context
120-
121166
@deprecated(
122167
breaking_version="1.10",
123168
additional_warn_text="Use `DagsterSigmaTranslator.get_asset_spec(...).key` instead",
124169
)
125-
def get_asset_key(self, data: Union[SigmaDataset, SigmaWorkbook]) -> AssetKey:
170+
def get_asset_key(
171+
self, data: Union[SigmaDatasetTranslatorData, SigmaWorkbookTranslatorData]
172+
) -> AssetKey:
126173
"""Get the AssetKey for a Sigma object, such as a workbook or dataset."""
127174
return self.get_asset_spec(data).key
128175

129-
def get_asset_spec(self, data: Union[SigmaDataset, SigmaWorkbook]) -> AssetSpec:
176+
def get_asset_spec(
177+
self, data: Union[SigmaDatasetTranslatorData, SigmaWorkbookTranslatorData]
178+
) -> AssetSpec:
130179
"""Get the AssetSpec for a Sigma object, such as a workbook or dataset."""
131-
if isinstance(data, SigmaWorkbook):
180+
if isinstance(data, SigmaWorkbookTranslatorData):
132181
metadata = {
133182
**SigmaWorkbookMetadataSet(
134183
web_url=MetadataValue.url(data.properties["url"]),
@@ -148,25 +197,35 @@ def get_asset_spec(self, data: Union[SigmaDataset, SigmaWorkbook]) -> AssetSpec:
148197
),
149198
),
150199
}
151-
datasets = [self._context.get_datasets_by_inode()[inode] for inode in data.datasets]
200+
datasets = [
201+
data.organization_data.get_datasets_by_inode()[inode] for inode in data.datasets
202+
]
152203
tables = [
153-
self._context.get_tables_by_inode()[inode] for inode in data.direct_table_deps
204+
data.organization_data.get_tables_by_inode()[inode]
205+
for inode in data.direct_table_deps
154206
]
155207

156208
return AssetSpec(
157209
key=AssetKey(_coerce_input_to_valid_name(data.properties["name"])),
158210
metadata=metadata,
159211
kinds={"sigma", "workbook"},
160212
deps={
161-
*[self.get_asset_key(dataset) for dataset in datasets],
213+
*[
214+
self.get_asset_key(
215+
SigmaDatasetTranslatorData(
216+
dataset=dataset, organization_data=data.organization_data
217+
)
218+
)
219+
for dataset in datasets
220+
],
162221
*[
163222
asset_key_from_table_name(".".join(table.get_table_path()).lower())
164223
for table in tables
165224
],
166225
},
167226
owners=[data.owner_email] if data.owner_email else None,
168227
)
169-
elif isinstance(data, SigmaDataset):
228+
elif isinstance(data, SigmaDatasetTranslatorData):
170229
metadata = {
171230
"dagster_sigma/web_url": MetadataValue.url(data.properties["url"]),
172231
"dagster_sigma/created_at": MetadataValue.timestamp(

python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_translator.py

+34-17
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@
77
from dagster_sigma.translator import (
88
DagsterSigmaTranslator,
99
SigmaDataset,
10+
SigmaDatasetTranslatorData,
1011
SigmaOrganizationData,
1112
SigmaTable,
1213
SigmaWorkbook,
14+
SigmaWorkbookTranslatorData,
1315
)
1416

1517
from dagster_sigma_tests.conftest import (
@@ -33,16 +35,19 @@ def test_workbook_translation() -> None:
3335

3436
sample_dataset = SigmaDataset(properties=SAMPLE_DATASET_DATA, columns=set(), inputs=set())
3537

36-
translator = DagsterSigmaTranslator(
37-
SigmaOrganizationData(
38-
workbooks=[sample_workbook],
39-
datasets=[sample_dataset],
40-
tables=[SigmaTable(properties=SAMPLE_TABLE_DATA)],
38+
translator = DagsterSigmaTranslator()
39+
40+
asset_spec = translator.get_asset_spec(
41+
SigmaWorkbookTranslatorData(
42+
workbook=sample_workbook,
43+
organization_data=SigmaOrganizationData(
44+
workbooks=[sample_workbook],
45+
datasets=[sample_dataset],
46+
tables=[SigmaTable(properties=SAMPLE_TABLE_DATA)],
47+
),
4148
)
4249
)
4350

44-
asset_spec = translator.get_asset_spec(sample_workbook)
45-
4651
assert asset_spec.key.path == ["Sample_Workbook"]
4752
assert asset_spec.metadata["dagster_sigma/web_url"].value == SAMPLE_WORKBOOK_DATA["url"]
4853
assert asset_spec.metadata["dagster_sigma/version"] == 5
@@ -63,11 +68,16 @@ def test_dataset_translation() -> None:
6368
inputs={"TESTDB.JAFFLE_SHOP.STG_ORDERS"},
6469
)
6570

66-
translator = DagsterSigmaTranslator(
67-
SigmaOrganizationData(workbooks=[], datasets=[sample_dataset], tables=[])
68-
)
71+
translator = DagsterSigmaTranslator()
6972

70-
asset_spec = translator.get_asset_spec(sample_dataset)
73+
asset_spec = translator.get_asset_spec(
74+
SigmaDatasetTranslatorData(
75+
dataset=sample_dataset,
76+
organization_data=SigmaOrganizationData(
77+
workbooks=[], datasets=[sample_dataset], tables=[]
78+
),
79+
)
80+
)
7181

7282
assert asset_spec.key.path == ["Orders_Dataset"]
7383
assert asset_spec.metadata["dagster_sigma/web_url"].value == SAMPLE_DATASET_DATA["url"]
@@ -91,9 +101,11 @@ def test_dataset_translation() -> None:
91101

92102
def test_dataset_translation_custom_translator() -> None:
93103
class MyCustomTranslator(DagsterSigmaTranslator):
94-
def get_asset_spec(self, data: Union[SigmaDataset, SigmaWorkbook]) -> AssetSpec:
104+
def get_asset_spec(
105+
self, data: Union[SigmaDatasetTranslatorData, SigmaWorkbookTranslatorData]
106+
) -> AssetSpec:
95107
spec = super().get_asset_spec(data)
96-
if isinstance(data, SigmaDataset):
108+
if isinstance(data, SigmaDatasetTranslatorData):
97109
spec = spec.replace_attributes(
98110
key=spec.key.with_prefix("sigma"), description="Custom description"
99111
)
@@ -105,11 +117,16 @@ def get_asset_spec(self, data: Union[SigmaDataset, SigmaWorkbook]) -> AssetSpec:
105117
inputs={"TESTDB.JAFFLE_SHOP.STG_ORDERS"},
106118
)
107119

108-
translator = MyCustomTranslator(
109-
SigmaOrganizationData(workbooks=[], datasets=[sample_dataset], tables=[])
110-
)
120+
translator = MyCustomTranslator()
111121

112-
asset_spec = translator.get_asset_spec(sample_dataset)
122+
asset_spec = translator.get_asset_spec(
123+
SigmaDatasetTranslatorData(
124+
dataset=sample_dataset,
125+
organization_data=SigmaOrganizationData(
126+
workbooks=[], datasets=[sample_dataset], tables=[]
127+
),
128+
)
129+
)
113130

114131
assert asset_spec.key.path == ["sigma", "Orders_Dataset"]
115132
assert asset_spec.description == "Custom description"

0 commit comments

Comments
 (0)