Skip to content

Commit

Permalink
Schema Change - Update Event Name (v2) (#589)
Browse files Browse the repository at this point in the history
Co-authored-by: Great Expectations <[email protected]>
  • Loading branch information
Shinnnyshinshin and tiny-tim-bot authored Dec 12, 2024
1 parent 2967188 commit c1a19f3
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 45 deletions.
4 changes: 2 additions & 2 deletions great_expectations_cloud/agent/actions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
from great_expectations_cloud.agent.actions.draft_datasource_config_action import (
DraftDatasourceConfigAction,
)
from great_expectations_cloud.agent.actions.generate_schema_change_expectations_action import (
GenerateSchemaChangeExpectationsAction,
from great_expectations_cloud.agent.actions.generate_data_quality_check_expectations_action import (
GenerateDataQualityCheckExpectationsAction,
)
from great_expectations_cloud.agent.actions.list_table_names import ListTableNamesAction
from great_expectations_cloud.agent.actions.run_checkpoint import RunCheckpointAction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from great_expectations_cloud.agent.exceptions import GXAgentError
from great_expectations_cloud.agent.models import (
CreatedResource,
GenerateSchemaChangeExpectationsEvent,
GenerateDataQualityCheckExpectationsEvent,
)

if TYPE_CHECKING:
Expand All @@ -39,7 +39,7 @@
LOGGER.setLevel(logging.DEBUG)


class PartialSchemaChangeExpectationError(GXAgentError):
class PartialGenerateDataQualityCheckExpectationError(GXAgentError):
def __init__(self, assets_with_errors: list[str], assets_attempted: int):
message_header = f"Unable to fetch schemas for {len(assets_with_errors)} of {assets_attempted} Data Assets."
errors = ", ".join(assets_with_errors)
Expand All @@ -48,7 +48,9 @@ def __init__(self, assets_with_errors: list[str], assets_attempted: int):
super().__init__(message)


class GenerateSchemaChangeExpectationsAction(AgentAction[GenerateSchemaChangeExpectationsEvent]):
class GenerateDataQualityCheckExpectationsAction(
AgentAction[GenerateDataQualityCheckExpectationsEvent]
):
def __init__( # noqa: PLR0913 # Refactor opportunity
self,
context: CloudDataContext,
Expand All @@ -69,15 +71,15 @@ def __init__( # noqa: PLR0913 # Refactor opportunity
)

@override
def run(self, event: GenerateSchemaChangeExpectationsEvent, id: str) -> ActionResult:
def run(self, event: GenerateDataQualityCheckExpectationsEvent, id: str) -> ActionResult:
created_resources: list[CreatedResource] = []
assets_with_errors: list[str] = []
for asset_name in event.data_assets:
try:
data_asset = self._retrieve_asset_from_asset_name(event, asset_name)
metric_run, metric_run_id = self._get_metrics(data_asset)

expectation_id = self._add_schema_change_expectation(
expectation_id = self._add_data_quality_check_expectation(
metric_run=metric_run, asset_id=data_asset.id
)
created_resources.append(
Expand All @@ -90,7 +92,7 @@ def run(self, event: GenerateSchemaChangeExpectationsEvent, id: str) -> ActionRe
assets_with_errors.append(asset_name)

if assets_with_errors:
raise PartialSchemaChangeExpectationError(
raise PartialGenerateDataQualityCheckExpectationError(
assets_with_errors=assets_with_errors,
assets_attempted=len(event.data_assets),
)
Expand All @@ -102,7 +104,7 @@ def run(self, event: GenerateSchemaChangeExpectationsEvent, id: str) -> ActionRe
)

def _retrieve_asset_from_asset_name(
self, event: GenerateSchemaChangeExpectationsEvent, asset_name: str
self, event: GenerateDataQualityCheckExpectationsEvent, asset_name: str
) -> DataAsset:
try:
datasource = self._context.data_sources.get(event.datasource_name)
Expand All @@ -129,7 +131,7 @@ def _get_metrics(self, data_asset: DataAsset) -> tuple[MetricRun, UUID]:

return metric_run, metric_run_id

def _add_schema_change_expectation(self, metric_run: MetricRun, asset_id: UUID) -> UUID:
def _add_data_quality_check_expectation(self, metric_run: MetricRun, asset_id: UUID) -> UUID:
expectation = gx_expectations.ExpectTableColumnsToMatchSet(
column_set=metric_run.metrics[0].value
)
Expand Down Expand Up @@ -169,5 +171,5 @@ def _raise_on_any_metric_exception(self, metric_run: MetricRun) -> None:


register_event_action(
"1", GenerateSchemaChangeExpectationsEvent, GenerateSchemaChangeExpectationsAction
"1", GenerateDataQualityCheckExpectationsEvent, GenerateDataQualityCheckExpectationsAction
)
8 changes: 4 additions & 4 deletions great_expectations_cloud/agent/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ class DraftDatasourceConfigEvent(EventBase):
config_id: UUID


class GenerateSchemaChangeExpectationsEvent(EventBase):
type: Literal["generate_schema_change_expectations_request.received"] = (
"generate_schema_change_expectations_request.received"
class GenerateDataQualityCheckExpectationsEvent(EventBase):
type: Literal["generate_data_quality_check_expectations_request.received"] = (
"generate_data_quality_check_expectations_request.received"
)
datasource_name: str
data_assets: Sequence[str]
Expand Down Expand Up @@ -135,7 +135,7 @@ class UnknownEvent(AgentBaseExtraForbid):
RunMetricsListEvent,
DraftDatasourceConfigEvent,
ListTableNamesEvent,
GenerateSchemaChangeExpectationsEvent,
GenerateDataQualityCheckExpectationsEvent,
RunRdAgentEvent,
UnknownEvent,
],
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "great_expectations_cloud"
version = "20241212.1.dev0"
version = "20241212.1.dev1"
description = "Great Expectations Cloud"
authors = ["The Great Expectations Team <[email protected]>"]
repository = "https://github.com/great-expectations/cloud"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
TableMetric,
)

from great_expectations_cloud.agent.actions.generate_schema_change_expectations_action import (
GenerateSchemaChangeExpectationsAction,
PartialSchemaChangeExpectationError,
from great_expectations_cloud.agent.actions.generate_data_quality_check_expectations_action import (
GenerateDataQualityCheckExpectationsAction,
PartialGenerateDataQualityCheckExpectationError,
)
from great_expectations_cloud.agent.models import GenerateSchemaChangeExpectationsEvent
from great_expectations_cloud.agent.models import GenerateDataQualityCheckExpectationsEvent

if TYPE_CHECKING:
from great_expectations.data_context.data_context.cloud_data_context import CloudDataContext
Expand Down Expand Up @@ -61,7 +61,7 @@ def mock_metrics_list() -> list[TableMetric]:
# https://docs.pytest.org/en/7.1.x/how-to/monkeypatch.html
@pytest.fixture
def mock_response_success(monkeypatch, mock_metrics_list: list[TableMetric]):
def mock_data_asset(self, event: GenerateSchemaChangeExpectationsEvent, asset_name: str):
def mock_data_asset(self, event: GenerateDataQualityCheckExpectationsEvent, asset_name: str):
return TableAsset(
id=TABLE_ASSET_ID,
name="test-data-asset",
Expand All @@ -73,16 +73,18 @@ def mock_metrics(self, data_asset: DataAsset):
return MetricRun(metrics=mock_metrics_list), uuid.uuid4()

monkeypatch.setattr(
GenerateSchemaChangeExpectationsAction, "_retrieve_asset_from_asset_name", mock_data_asset
GenerateDataQualityCheckExpectationsAction,
"_retrieve_asset_from_asset_name",
mock_data_asset,
)
monkeypatch.setattr(GenerateSchemaChangeExpectationsAction, "_get_metrics", mock_metrics)
monkeypatch.setattr(GenerateDataQualityCheckExpectationsAction, "_get_metrics", mock_metrics)


@pytest.fixture
def mock_multi_asset_success_and_failure(monkeypatch, mock_metrics_list: list[TableMetric]):
failing_asset_id = uuid.UUID("00000000-0000-0000-0000-000000000001")

def mock_data_asset(self, event: GenerateSchemaChangeExpectationsEvent, asset_name: str):
def mock_data_asset(self, event: GenerateDataQualityCheckExpectationsEvent, asset_name: str):
if "retrieve-fail" in asset_name:
raise RuntimeError(f"Failed to retrieve asset: {asset_name}") # noqa: TRY003 # following pattern in code
elif "schema-fail" in asset_name:
Expand Down Expand Up @@ -115,12 +117,14 @@ def mock_schema_change_expectation(self, metric_run: MetricRun, asset_id: uuid.U
return uuid.uuid4()

monkeypatch.setattr(
GenerateSchemaChangeExpectationsAction, "_retrieve_asset_from_asset_name", mock_data_asset
GenerateDataQualityCheckExpectationsAction,
"_retrieve_asset_from_asset_name",
mock_data_asset,
)
monkeypatch.setattr(GenerateSchemaChangeExpectationsAction, "_get_metrics", mock_metrics)
monkeypatch.setattr(GenerateDataQualityCheckExpectationsAction, "_get_metrics", mock_metrics)
monkeypatch.setattr(
GenerateSchemaChangeExpectationsAction,
"_add_schema_change_expectation",
GenerateDataQualityCheckExpectationsAction,
"_add_data_quality_check_expectation",
mock_schema_change_expectation,
)

Expand All @@ -146,7 +150,7 @@ def test_generate_schema_change_expectations_action_success(
mock_metric_repository = mocker.Mock(spec=MetricRepository)
mock_batch_inspector = mocker.Mock(spec=BatchInspector)

action = GenerateSchemaChangeExpectationsAction(
action = GenerateDataQualityCheckExpectationsAction(
context=mock_context,
metric_repository=mock_metric_repository,
batch_inspector=mock_batch_inspector,
Expand All @@ -157,15 +161,15 @@ def test_generate_schema_change_expectations_action_success(

# run the action
mocker.patch(
f"{GenerateSchemaChangeExpectationsAction.__module__}.{GenerateSchemaChangeExpectationsAction.__name__}._add_autogenerated_expectation_to_asset",
f"{GenerateDataQualityCheckExpectationsAction.__module__}.{GenerateDataQualityCheckExpectationsAction.__name__}._add_autogenerated_expectation_to_asset",
return_value=uuid.uuid4(),
)
mock_add_autogenerated_expectation_to_asset = mocker.spy(
GenerateSchemaChangeExpectationsAction, "_add_autogenerated_expectation_to_asset"
GenerateDataQualityCheckExpectationsAction, "_add_autogenerated_expectation_to_asset"
)
return_value = action.run(
event=GenerateSchemaChangeExpectationsEvent(
type="generate_schema_change_expectations_request.received",
event=GenerateDataQualityCheckExpectationsEvent(
type="generate_data_quality_check_expectations_request.received",
organization_id=uuid.uuid4(),
datasource_name="test-datasource",
data_assets=data_asset_names,
Expand All @@ -176,7 +180,7 @@ def test_generate_schema_change_expectations_action_success(

# assert
assert len(return_value.created_resources) == expected_created_resources
assert return_value.type == "generate_schema_change_expectations_request.received"
assert return_value.type == "generate_data_quality_check_expectations_request.received"
mock_add_autogenerated_expectation_to_asset.assert_called()
mock_add_autogenerated_expectation_to_asset.assert_called_with(
expectation=gx_expectations.ExpectTableColumnsToMatchSet(
Expand Down Expand Up @@ -242,7 +246,7 @@ def test_succeeding_and_failing_assets_together(
mock_metric_repository = mocker.Mock(spec=MetricRepository)
mock_batch_inspector = mocker.Mock(spec=BatchInspector)

action = GenerateSchemaChangeExpectationsAction(
action = GenerateDataQualityCheckExpectationsAction(
context=mock_context,
metric_repository=mock_metric_repository,
batch_inspector=mock_batch_inspector,
Expand All @@ -252,10 +256,10 @@ def test_succeeding_and_failing_assets_together(
)

# run the action
with pytest.raises(PartialSchemaChangeExpectationError) as e:
with pytest.raises(PartialGenerateDataQualityCheckExpectationError) as e:
action.run(
event=GenerateSchemaChangeExpectationsEvent(
type="generate_schema_change_expectations_request.received",
event=GenerateDataQualityCheckExpectationsEvent(
type="generate_data_quality_check_expectations_request.received",
organization_id=uuid.uuid4(),
datasource_name="test-datasource",
data_assets=succeeding_data_asset_names + failing_data_asset_names,
Expand All @@ -273,4 +277,4 @@ def test_succeeding_and_failing_assets_together(


if __name__ == "__main__":
print(GenerateSchemaChangeExpectationsEvent.__module__)
print(GenerateDataQualityCheckExpectationsEvent.__module__)
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
import sqlalchemy as sa
from great_expectations import ExpectationSuite, ValidationDefinition

from great_expectations_cloud.agent.actions.generate_schema_change_expectations_action import (
GenerateSchemaChangeExpectationsAction,
from great_expectations_cloud.agent.actions.generate_data_quality_check_expectations_action import (
GenerateDataQualityCheckExpectationsAction,
)
from great_expectations_cloud.agent.models import (
GenerateSchemaChangeExpectationsEvent,
GenerateDataQualityCheckExpectationsEvent,
)

if TYPE_CHECKING:
Expand Down Expand Up @@ -107,14 +107,14 @@ def test_running_schema_change_expectation_action(
token_env_var_local: str,
seed_and_cleanup_test_data,
):
generate_schema_change_expectations_event = GenerateSchemaChangeExpectationsEvent(
type="generate_schema_change_expectations_request.received",
generate_schema_change_expectations_event = GenerateDataQualityCheckExpectationsEvent(
type="generate_data_quality_check_expectations_request.received",
datasource_name="local_mercury_db",
data_assets=["local-mercury-db-checkpoints-table"],
organization_id=uuid.UUID(org_id_env_var_local),
)

action = GenerateSchemaChangeExpectationsAction(
action = GenerateDataQualityCheckExpectationsAction(
context=context,
base_url=cloud_base_url,
organization_id=uuid.UUID(org_id_env_var_local),
Expand Down

0 comments on commit c1a19f3

Please sign in to comment.