Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ele 3634 add snapshot to the report #1703

Merged
merged 10 commits into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions elementary/monitor/api/groups/groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
NormalizedExposureSchema,
NormalizedModelSchema,
NormalizedSeedSchema,
NormalizedSnapshotSchema,
NormalizedSourceSchema,
)
from elementary.monitor.fetchers.tests.schema import NormalizedTestSchema
Expand All @@ -31,6 +32,7 @@
NormalizedExposureSchema,
NormalizedTestSchema,
NormalizedSeedSchema,
NormalizedSnapshotSchema,
]


Expand Down Expand Up @@ -83,7 +85,14 @@ def get_dwh_view(self, artifacts: List[GROUPABLE_ARTIFACT]) -> TreeGroupSchema:
filtered_artifacts: List[GROUPABLE_ARTIFACT] = [
artifact
for artifact in artifacts
if isinstance(artifact, (NormalizedSourceSchema, NormalizedModelSchema))
if isinstance(
artifact,
(
NormalizedSourceSchema,
NormalizedModelSchema,
NormalizedSnapshotSchema,
),
)
]
return self.get_fqn_view(filtered_artifacts)

Expand All @@ -107,7 +116,14 @@ def get_fqn_view(self, artifacts: List[GROUPABLE_ARTIFACT]) -> TreeGroupSchema:
for artifact in artifacts
if artifact.unique_id is not None
and artifact.fqn is not None
and isinstance(artifact, (NormalizedSourceSchema, NormalizedModelSchema))
and isinstance(
artifact,
(
NormalizedSourceSchema,
NormalizedModelSchema,
NormalizedSnapshotSchema,
),
)
)
tree_builder = TreeBuilder[GroupItemSchema](separator=".")
for artifact in filtered_artifacts:
Expand Down
2 changes: 1 addition & 1 deletion elementary/monitor/api/lineage/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from elementary.utils.pydantic_shim import BaseModel, validator

NodeUniqueIdType = str
NodeType = Literal["seed", "model", "source", "exposure"]
NodeType = Literal["snapshot", "seed", "model", "source", "exposure"]
NodeSubType = Literal["table", "view"]


Expand Down
30 changes: 28 additions & 2 deletions elementary/monitor/api/models/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
NormalizedExposureSchema,
NormalizedModelSchema,
NormalizedSeedSchema,
NormalizedSnapshotSchema,
NormalizedSourceSchema,
TotalsModelRunsSchema,
)
Expand All @@ -26,6 +27,7 @@
from elementary.monitor.fetchers.models.schema import (
ModelSchema,
SeedSchema,
SnapshotSchema,
SourceSchema,
)
from elementary.utils.log import get_logger
Expand All @@ -36,6 +38,7 @@
class ModelsAPI(APIClient):
_ARTIFACT_TYPE_DIR_MAP = {
SeedSchema: "seeds",
SnapshotSchema: "snapshots",
SourceSchema: "sources",
ModelSchema: "models",
ExposureSchema: "exposures",
Expand Down Expand Up @@ -133,6 +136,16 @@ def get_seeds(self) -> Dict[str, NormalizedSeedSchema]:
seeds[seed_unique_id] = normalized_seed
return seeds

def get_snapshots(self) -> Dict[str, NormalizedSnapshotSchema]:
snapshot_results = self.models_fetcher.get_snapshots()
snapshots = dict()
if snapshot_results:
for snapshot_result in snapshot_results:
normalized_snapshot = self._normalize_dbt_artifact_dict(snapshot_result)
snapshot_unique_id = cast(str, normalized_snapshot.unique_id)
snapshots[snapshot_unique_id] = normalized_snapshot
return snapshots

def get_models(
self, exclude_elementary_models: bool = False
) -> Dict[str, NormalizedModelSchema]:
Expand Down Expand Up @@ -239,6 +252,12 @@ def _normalize_dbt_artifact_dict(
) -> NormalizedSeedSchema:
...

@overload
def _normalize_dbt_artifact_dict(
self, artifact: SnapshotSchema
) -> NormalizedSnapshotSchema:
...

@overload
def _normalize_dbt_artifact_dict(
self, artifact: ModelSchema
Expand All @@ -258,15 +277,20 @@ def _normalize_dbt_artifact_dict(
...

def _normalize_dbt_artifact_dict(
self, artifact: Union[SeedSchema, ModelSchema, ExposureSchema, SourceSchema]
self,
artifact: Union[
SeedSchema, SnapshotSchema, ModelSchema, ExposureSchema, SourceSchema
],
) -> Union[
NormalizedSeedSchema,
NormalizedSnapshotSchema,
NormalizedModelSchema,
NormalizedExposureSchema,
NormalizedSourceSchema,
]:
schema_to_normalized_schema_map = {
SeedSchema: NormalizedSeedSchema,
SnapshotSchema: NormalizedSnapshotSchema,
ExposureSchema: NormalizedExposureSchema,
ModelSchema: NormalizedModelSchema,
SourceSchema: NormalizedSourceSchema,
Expand Down Expand Up @@ -308,7 +332,9 @@ def _normalize_artifact_path(cls, artifact: ArtifactSchemaType, fqn: str) -> str
@classmethod
def _fqn(
cls,
artifact: Union[ModelSchema, ExposureSchema, SourceSchema, SeedSchema],
artifact: Union[
ModelSchema, ExposureSchema, SourceSchema, SeedSchema, SnapshotSchema
],
) -> str:
if isinstance(artifact, ExposureSchema):
path = (artifact.meta or {}).get("path")
Expand Down
6 changes: 6 additions & 0 deletions elementary/monitor/api/models/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
ExposureSchema,
ModelSchema,
SeedSchema,
SnapshotSchema,
SourceSchema,
)
from elementary.utils.pydantic_shim import BaseModel, Field, validator
Expand Down Expand Up @@ -41,6 +42,11 @@ class NormalizedSeedSchema(NormalizedArtifactSchema, SeedSchema):
artifact_type: str = Field("seed", const=True) # type: ignore # noqa


# NormalizedArtifactSchema must be first in the inheritance order
class NormalizedSnapshotSchema(NormalizedArtifactSchema, SnapshotSchema):
artifact_type: str = Field("snapshot", const=True) # type: ignore # noqa


# NormalizedArtifactSchema must be first in the inheritance order
class NormalizedModelSchema(NormalizedArtifactSchema, ModelSchema):
artifact_type: str = Field("model", const=True) # type: ignore # noqa
Expand Down
19 changes: 16 additions & 3 deletions elementary/monitor/api/report/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
NormalizedExposureSchema,
NormalizedModelSchema,
NormalizedSeedSchema,
NormalizedSnapshotSchema,
NormalizedSourceSchema,
)
from elementary.monitor.api.report.schema import ReportDataEnvSchema, ReportDataSchema
Expand Down Expand Up @@ -47,11 +48,19 @@ def _get_groups(
sources: Iterable[NormalizedSourceSchema],
exposures: Iterable[NormalizedExposureSchema],
seeds: Iterable[NormalizedSeedSchema],
snapshots: Iterable[NormalizedSnapshotSchema],
singular_tests: Iterable[NormalizedTestSchema],
) -> GroupsSchema:
groups_api = GroupsAPI(self.dbt_runner)
return groups_api.get_groups(
artifacts=[*models, *sources, *exposures, *seeds, *singular_tests]
artifacts=[
*models,
*sources,
*exposures,
*seeds,
*snapshots,
*singular_tests,
]
)

def get_report_data(
Expand Down Expand Up @@ -86,6 +95,8 @@ def get_report_data(
lineage_node_ids: List[str] = []
seeds = models_api.get_seeds()
lineage_node_ids.extend(seeds.keys())
snapshots = models_api.get_snapshots()
lineage_node_ids.extend(snapshots.keys())
models = models_api.get_models(exclude_elementary_models)
lineage_node_ids.extend(models.keys())
sources = models_api.get_sources()
Expand All @@ -99,6 +110,7 @@ def get_report_data(
sources.values(),
exposures.values(),
seeds.values(),
snapshots.values(),
singular_tests,
)

Expand Down Expand Up @@ -147,7 +159,7 @@ def get_report_data(

serializable_groups = groups.dict()
serializable_models = self._serialize_models(
models, sources, exposures, seeds
models, sources, exposures, seeds, snapshots
)
serializable_model_runs = self._serialize_models_runs(models_runs.runs)
serializable_model_runs_totals = models_runs.dict(include={"totals"})[
Expand Down Expand Up @@ -209,8 +221,9 @@ def _serialize_models(
sources: Dict[str, NormalizedSourceSchema],
exposures: Dict[str, NormalizedExposureSchema],
seeds: Dict[str, NormalizedSeedSchema],
snapshots: Dict[str, NormalizedSnapshotSchema],
) -> Dict[str, dict]:
nodes = dict(**models, **sources, **exposures, **seeds)
nodes = dict(**models, **sources, **exposures, **seeds, **snapshots)
serializable_nodes = dict()
for key in nodes.keys():
serializable_nodes[key] = dict(nodes[key])
Expand Down
2 changes: 1 addition & 1 deletion elementary/monitor/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def decorator(func):
default=None,
help="Filter the report by last_invocation / invocation_id:<INVOCATION_ID> / invocation_time:<INVOCATION_TIME>."
if cmd in (Command.REPORT, Command.SEND_REPORT)
else "DEPRECATED! Please use --filters instead! - Filter the alerts by tag:<TAG> / owner:<OWNER> / model:<MODEL> / "
else "DEPRECATED! Please use --filters instead! - Filter the alerts by tags:<TAG> / owners:<OWNER> / models:<MODEL> / "
NoyaArie marked this conversation as resolved.
Show resolved Hide resolved
"statuses:<warn/fail/error/skipped> / resource_types:<model/test>.",
)(func)
return func
Expand Down
26 changes: 26 additions & 0 deletions elementary/monitor/dbt_project/macros/create_temp_table.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
{% macro create_temp_table(database_name, schema_name, table_name, sql_query) %}
{% do return(adapter.dispatch('create_temp_table','elementary_cli')(database_name, schema_name, table_name, sql_query)) %}
{%- endmacro %}

{% macro default__create_temp_table(database_name, schema_name, table_name, sql_query) %}
{% do return(elementary.create_temp_table(database_name, schema_name, table_name, sql_query)) %}
{% endmacro %}

{% macro snowflake__create_temp_table(database_name, schema_name, table_name, sql_query) %}
{% set temp_table_exists, temp_table_relation = dbt.get_or_create_relation(database=database_name,
schema=schema_name,
identifier=table_name,
type='table') -%}
{% set temp_table_relation = elementary.edr_make_temp_relation(temp_table_relation) %}
{% set create_query %}
create or replace temporary table {{ temp_table_relation }}
as (
{{ sql_query }}
);

{% endset %}

{% do elementary.run_query(create_query) %}

{{ return(temp_table_relation) }}
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@
'seed' as type
from {{ ref('elementary', 'dbt_seeds') }}
union all
select
unique_id,
depends_on_nodes,
materialization,
'snapshot' as type
from {{ ref('elementary', 'dbt_snapshots') }}
union all
select
unique_id,
depends_on_nodes,
Expand Down
33 changes: 33 additions & 0 deletions elementary/monitor/dbt_project/macros/get_snapshots.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
{% macro get_snapshots() %}
{% set dbt_snapshots_relation = ref('elementary', 'dbt_snapshots') %}
{%- if elementary.relation_exists(dbt_snapshots_relation) -%}
{% set get_snapshots_query %}
with dbt_artifacts_snapshots as (
select
name,
unique_id,
owner as owners,
tags,
package_name,
description,
meta,
materialization,
database_name,
schema_name,
depends_on_macros,
depends_on_nodes,
original_path as full_path,
path,
patch_path,
generated_at,
unique_key,
incremental_strategy
from {{ dbt_snapshots_relation }}
)
select * from dbt_artifacts_snapshots
{% endset %}

{% set snapshots_agate = run_query(get_snapshots_query) %}
{% do return(elementary.agate_to_dicts(snapshots_agate)) %}
{%- endif -%}
{% endmacro %}
2 changes: 1 addition & 1 deletion elementary/monitor/dbt_project/macros/get_test_results.sql
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
{% set test_results = [] %}

{% set elementary_database, elementary_schema = elementary.get_package_database_and_schema() %}
{% set ordered_test_results_relation = elementary.create_temp_table(elementary_database, elementary_schema, 'ordered_test_results', select_test_results) %}
{% set ordered_test_results_relation = elementary_cli.create_temp_table(elementary_database, elementary_schema, 'ordered_test_results', select_test_results) %}

{% set test_results_agate_sql %}
select * from {{ ordered_test_results_relation }}
Expand Down
11 changes: 11 additions & 0 deletions elementary/monitor/fetchers/models/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
ModelSchema,
ModelTestCoverage,
SeedSchema,
SnapshotSchema,
SourceSchema,
)
from elementary.utils.log import get_logger
Expand Down Expand Up @@ -43,6 +44,16 @@ def get_seeds(self) -> List[SeedSchema]:
seeds = [SeedSchema(**seed) for seed in seeds]
return seeds

def get_snapshots(self) -> List[SnapshotSchema]:
run_operation_response = self.dbt_runner.run_operation(
macro_name="elementary_cli.get_snapshots"
)
snapshots = (
json.loads(run_operation_response[0]) if run_operation_response else []
)
snapshots = [SnapshotSchema(**snapshot) for snapshot in snapshots]
return snapshots

def get_models(self, exclude_elementary_models: bool = False) -> List[ModelSchema]:
run_operation_response = self.dbt_runner.run_operation(
macro_name="elementary_cli.get_models",
Expand Down
18 changes: 18 additions & 0 deletions elementary/monitor/fetchers/models/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,24 @@ def load_meta(cls, meta):
ArtifactSchemaType = TypeVar("ArtifactSchemaType", bound=ArtifactSchema)


class SnapshotSchema(ArtifactSchema):
database_name: str
schema_name: str
depends_on_macros: str
depends_on_nodes: str
path: str
patch_path: Optional[str]
generated_at: str
unique_key: str
incremental_strategy: Optional[str]

table_name: Optional[str] = None

@validator("table_name", always=True)
def set_table_name(cls, table_name, values):
return values.get("name")


class SeedSchema(ArtifactSchema):
database_name: Optional[str] = None
schema_name: str
Expand Down
Loading