Skip to content

Commit

Permalink
Create expectation with autogenerated flag set to true (#583)
Browse files Browse the repository at this point in the history
Co-authored-by: Great Expectations <[email protected]>
  • Loading branch information
anthonyburdi and tiny-tim-bot authored Dec 9, 2024
1 parent 4498ebb commit 4e5aae0
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 67 deletions.
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from __future__ import annotations

import logging
from http import HTTPStatus
from typing import TYPE_CHECKING, Final
from urllib.parse import urljoin
from uuid import UUID

import great_expectations.expectations as gx_expectations
from great_expectations.exceptions import DataContextError
from great_expectations.core.http import create_session
from great_expectations.experimental.metric_repository.batch_inspector import (
BatchInspector,
)
Expand Down Expand Up @@ -74,15 +76,15 @@ def run(self, event: GenerateSchemaChangeExpectationsEvent, id: str) -> ActionRe
try:
data_asset = self._retrieve_asset_from_asset_name(event, asset_name)
metric_run, metric_run_id = self._get_metrics(data_asset)
expectation_suite_name = event.data_asset_to_expectation_suite_name[asset_name]
expectation = self._add_schema_change_expectation(
metric_run, expectation_suite_name

expectation_id = self._add_schema_change_expectation(
metric_run=metric_run, asset_id=data_asset.id
)
created_resources.append(
CreatedResource(resource_id=str(metric_run_id), type="MetricRun")
)
created_resources.append(
CreatedResource(resource_id=expectation.id, type="Expectation")
CreatedResource(resource_id=str(expectation_id), type="Expectation")
)
except Exception:
assets_with_errors.append(asset_name)
Expand Down Expand Up @@ -127,26 +129,37 @@ def _get_metrics(self, data_asset: DataAsset) -> tuple[MetricRun, UUID]:

return metric_run, metric_run_id

def _add_schema_change_expectation(
self, metric_run: MetricRun, expectation_suite_name: str
) -> gx_expectations.Expectation:
try:
expectation_suite = self._context.suites.get(name=expectation_suite_name)
except DataContextError as e:
raise RuntimeError( # noqa: TRY003 # want to keep this informative for now
f"Expectation Suite with name {expectation_suite_name} was not found."
) from e
def _add_schema_change_expectation(self, metric_run: MetricRun, asset_id: UUID) -> UUID:
    """Create a schema-change expectation for the asset and return its id.

    Builds an ``ExpectTableColumnsToMatchSet`` from the first metric's value
    (presumably the observed column set of the asset — TODO confirm against
    the metric ordering guaranteed by ``_get_metrics``) and registers it on
    the asset via ``_add_autogenerated_expectation_to_asset``.

    Args:
        metric_run: Metric results for the asset; ``metrics[0].value`` is
            used as the expected column set.
        asset_id: Cloud id of the data asset the expectation is attached to.

    Returns:
        The UUID of the newly created expectation.
    """
    expectation = gx_expectations.ExpectTableColumnsToMatchSet(
        column_set=metric_run.metrics[0].value
    )
    expectation_id = self._add_autogenerated_expectation_to_asset(
        expectation=expectation, asset_id=asset_id
    )
    return expectation_id

def _add_autogenerated_expectation_to_asset(
    self, expectation: gx_expectations.Expectation, asset_id: UUID
) -> UUID:
    """POST *expectation* to the Cloud backend as an autogenerated expectation.

    The expectation is attached directly to the asset identified by
    ``asset_id`` (no expectation suite involved), with the payload flagged
    ``autogenerated: true`` so the backend can distinguish machine-created
    expectations from user-created ones.

    Args:
        expectation: The expectation to persist.
        asset_id: Cloud id of the data asset to attach the expectation to.

    Returns:
        The UUID of the created expectation, parsed from the response body.

    Raises:
        GXAgentError: If the backend does not respond with 201 CREATED.
    """
    # NOTE(review): this span contained pre-commit diff residue (the old
    # suite-based implementation) spliced into the method body; the
    # reconstruction below is the post-commit method only.
    url = urljoin(
        base=self._base_url,
        url=f"/api/v1/organizations/{self._organization_id}/expectations/{asset_id}",
    )

    expectation_payload = expectation.configuration.to_json_dict()
    # Mark as machine-generated for the backend.
    expectation_payload["autogenerated"] = True

    # Backend expects `expectation_type` instead of `type`:
    expectation_type = expectation_payload.pop("type")
    expectation_payload["expectation_type"] = expectation_type

    with create_session(access_token=self._auth_key) as session:
        response = session.post(url=url, json=expectation_payload)

    if response.status_code != HTTPStatus.CREATED:
        message = f"Failed to add autogenerated expectation: {expectation_type}"
        raise GXAgentError(message)
    return UUID(response.json()["data"]["id"])

def _raise_on_any_metric_exception(self, metric_run: MetricRun) -> None:
if any(metric.exception for metric in metric_run.metrics):
Expand Down
1 change: 1 addition & 0 deletions great_expectations_cloud/agent/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ class GenerateSchemaChangeExpectationsEvent(EventBase):
)
datasource_name: str
data_assets: Sequence[str]
# TODO: Remove data_asset_to_expectation_suite_name from the event
data_asset_to_expectation_suite_name: Dict[str, str]
create_expectations: bool = False

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "great_expectations_cloud"
version = "20241209.0.dev0"
version = "20241209.0.dev1"
description = "Great Expectations Cloud"
authors = ["The Great Expectations Team <[email protected]>"]
repository = "https://github.com/great-expectations/cloud"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,40 +55,43 @@ def mock_metrics_list() -> list[TableMetric]:
]


TABLE_ASSET_ID = uuid.uuid4()


# https://docs.pytest.org/en/7.1.x/how-to/monkeypatch.html
@pytest.fixture
def mock_response_success(monkeypatch, mock_metrics_list: list[TableMetric]):
def mock_data_asset(self, event: GenerateSchemaChangeExpectationsEvent, asset_name: str):
return TableAsset(
id=TABLE_ASSET_ID,
name="test-data-asset",
table_name="test_table",
schema_name="test_schema",
)

def mock_metrics(self, data_asset: DataAsset):
return MetricRun(metrics=mock_metrics_list)

def mock_schema_change_expectation(self, metric_run: MetricRun, expectation_suite_name: str):
return gx_expectations.ExpectTableColumnsToMatchSet(
column_set=["col1", "col2"], id=str(uuid.uuid4())
)
return MetricRun(metrics=mock_metrics_list), uuid.uuid4()

monkeypatch.setattr(
GenerateSchemaChangeExpectationsAction, "_retrieve_asset_from_asset_name", mock_data_asset
)
monkeypatch.setattr(GenerateSchemaChangeExpectationsAction, "_get_metrics", mock_metrics)
monkeypatch.setattr(
GenerateSchemaChangeExpectationsAction,
"_add_schema_change_expectation",
mock_schema_change_expectation,
)


@pytest.fixture
def mock_multi_asset_success_and_failure(monkeypatch, mock_metrics_list: list[TableMetric]):
failing_asset_id = uuid.UUID("00000000-0000-0000-0000-000000000001")

def mock_data_asset(self, event: GenerateSchemaChangeExpectationsEvent, asset_name: str):
if "retrieve-fail" in asset_name:
raise RuntimeError(f"Failed to retrieve asset: {asset_name}") # noqa: TRY003 # following pattern in code
elif "schema-fail" in asset_name:
return TableAsset(
id=failing_asset_id,
name=asset_name,
table_name="test_table",
schema_name="test_schema",
)
else:
return TableAsset(
name=asset_name,
Expand All @@ -102,15 +105,14 @@ def mock_metrics(self, data_asset: DataAsset):
else:
return MetricRun(metrics=mock_metrics_list)

def mock_schema_change_expectation(self, metric_run: MetricRun, expectation_suite_name: str):
def mock_schema_change_expectation(self, metric_run: MetricRun, asset_id: uuid.UUID):
# Simulate a failure to add an expectation for the asset whose id matches
# failing_asset_id (i.e. assets whose name contains "schema-fail" above)
if "schema-fail" in expectation_suite_name:
raise RuntimeError("Failed to add expectation to suite: test-suite") # noqa: TRY003 # following pattern in code
if asset_id == failing_asset_id:
error_msg = "Failed to add autogenerated expectation: expect_table_columns_to_match_set"
raise RuntimeError(error_msg)
else:
return gx_expectations.ExpectTableColumnsToMatchSet(
column_set=["col1", "col2"], id=str(uuid.uuid4())
)
return uuid.uuid4()

monkeypatch.setattr(
GenerateSchemaChangeExpectationsAction, "_retrieve_asset_from_asset_name", mock_data_asset
Expand Down Expand Up @@ -142,7 +144,7 @@ def test_generate_schema_change_expectations_action_success(
mock_context: CloudDataContext,
mocker: MockerFixture,
data_asset_names,
data_assets_to_expectation_suite_names,
data_assets_to_expectation_suite_names, # TODO: Remove data_asset_to_expectation_suite_name from the event
expected_created_resources,
):
# setup
Expand All @@ -159,12 +161,20 @@ def test_generate_schema_change_expectations_action_success(
)

# run the action
mocker.patch(
f"{GenerateSchemaChangeExpectationsAction.__module__}.{GenerateSchemaChangeExpectationsAction.__name__}._add_autogenerated_expectation_to_asset",
return_value=uuid.uuid4(),
)
mock_add_autogenerated_expectation_to_asset = mocker.spy(
GenerateSchemaChangeExpectationsAction, "_add_autogenerated_expectation_to_asset"
)
return_value = action.run(
event=GenerateSchemaChangeExpectationsEvent(
type="generate_schema_change_expectations_request.received",
organization_id=uuid.uuid4(),
datasource_name="test-datasource",
data_assets=data_asset_names,
# TODO: Remove data_asset_to_expectation_suite_name from the event
data_asset_to_expectation_suite_name=data_assets_to_expectation_suite_names,
create_expectations=True,
),
Expand All @@ -174,6 +184,13 @@ def test_generate_schema_change_expectations_action_success(
# assert
assert len(return_value.created_resources) == expected_created_resources
assert return_value.type == "generate_schema_change_expectations_request.received"
mock_add_autogenerated_expectation_to_asset.assert_called()
mock_add_autogenerated_expectation_to_asset.assert_called_with(
expectation=gx_expectations.ExpectTableColumnsToMatchSet(
column_set=["col1", "col2"],
),
asset_id=TABLE_ASSET_ID,
)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -263,6 +280,7 @@ def test_succeeding_and_failing_assets_together(
organization_id=uuid.uuid4(),
datasource_name="test-datasource",
data_assets=succeeding_data_asset_names + failing_data_asset_names,
# TODO: Remove data_asset_to_expectation_suite_name from the event
data_asset_to_expectation_suite_name=suceeding_data_assets_to_expectation_suite_names,
create_expectations=True,
),
Expand All @@ -275,3 +293,7 @@ def test_succeeding_and_failing_assets_together(
assert error_message_footer in str(e.value)
for asset_name in failing_data_asset_names:
assert asset_name in str(e.value)


if __name__ == "__main__":
    # Debug scaffolding: prints the event class's module path when this test
    # file is run directly (it does not run the tests). Presumably left over
    # from development — consider removing. NOTE(review): confirm with author.
    print(GenerateSchemaChangeExpectationsEvent.__module__)
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
from typing import TYPE_CHECKING

import pytest
from great_expectations import ExpectationSuite
import sqlalchemy as sa
from great_expectations import ExpectationSuite, ValidationDefinition

from great_expectations_cloud.agent.actions.generate_schema_change_expectations_action import (
GenerateSchemaChangeExpectationsAction,
Expand All @@ -16,8 +17,6 @@

if TYPE_CHECKING:
from great_expectations.data_context import CloudDataContext
from great_expectations.datasource.fluent import PostgresDatasource
from great_expectations.datasource.fluent.sql_datasource import TableAsset

pytestmark = pytest.mark.integration

Expand All @@ -43,12 +42,51 @@ def seed_and_cleanup_test_data(context: CloudDataContext):

suite = context.suites.add(ExpectationSuite(name="local-mercury-db-checkpoints-table Suite"))

# Create validation
batch_definition = table_data_asset.add_batch_definition_whole_table(
name="local-mercury-db-checkpoints-table-batch-definition"
)
validation = context.validation_definitions.add(
ValidationDefinition(
name="local-mercury-db-checkpoints-table Validation",
suite=suite,
data=batch_definition,
)
)

# Mark the validation as gx_managed
engine = sa.create_engine("postgresql://postgres:postgres@localhost:5432/mercury")
with engine.begin() as conn:
query = f"UPDATE validations SET gx_managed=true WHERE id='{validation.id}'"
conn.execute(sa.text(query))
conn.commit()

# Mark the suite as gx_managed
with engine.begin() as conn:
query = f"UPDATE expectation_suites SET gx_managed=true WHERE id='{suite.id}'"
conn.execute(sa.text(query))
conn.commit()

# Yield
yield table_data_asset, suite

# clean up
data_source.delete_asset(name="local-mercury-db-checkpoints-table")

# Mark the validation as not gx_managed
with engine.begin() as conn:
query = f"UPDATE validations SET gx_managed=false WHERE id='{validation.id}'"
conn.execute(sa.text(query))
conn.commit()

# Mark the suite as not gx_managed
with engine.begin() as conn:
query = f"UPDATE expectation_suites SET gx_managed=false WHERE id='{suite.id}'"
conn.execute(sa.text(query))
conn.commit()

context.validation_definitions.delete(name="local-mercury-db-checkpoints-table Validation")
context.suites.delete(name="local-mercury-db-checkpoints-table Suite")
data_source.delete_asset(name="local-mercury-db-checkpoints-table")


@pytest.fixture
Expand All @@ -66,29 +104,12 @@ def token_env_var_local():
return os.environ.get("GX_CLOUD_ACCESS_TOKEN")


@pytest.fixture
def local_mercury_db_datasource(
context: CloudDataContext,
) -> PostgresDatasource:
datasource_name = "local_mercury_db"
datasource = context.data_sources.get(datasource_name)
yield datasource


@pytest.fixture
def local_mercury_db_organizations_table_asset(
local_mercury_db_datasource: PostgresDatasource,
) -> TableAsset:
data_asset_name = "local-mercury-db-organizations-table"
data_asset = local_mercury_db_datasource.get_asset(name=data_asset_name)
yield data_asset


@pytest.mark.skip(
"This test fails due to a connection error when attempting to call the backend. It is a problem with the test setup, not the code which was demoed, and this test passes locally."
)
def test_running_schema_change_expectation_action(
context: CloudDataContext,
user_api_token_headers_org_admin_sc_org,
local_mercury_db_datasource: PostgresDatasource,
local_mercury_db_organizations_table_asset: TableAsset,
org_id_env_var_local: str,
cloud_base_url_local: str,
token_env_var_local: str,
Expand Down

0 comments on commit 4e5aae0

Please sign in to comment.