[BUG] Data docs get deleted sporadically #10765

Open · ekeras opened this issue Dec 12, 2024 · 0 comments

ekeras commented Dec 12, 2024
Describe the bug
Data docs get deleted from Google Cloud Storage sporadically.

To Reproduce
I haven't found a way to reproduce this reliably. It can happen after 2 runs, 4 runs, 6 runs, or 10 runs; I can never tell.

The validations themselves are computed correctly; to my understanding, the failure happens when UpdateDataDocsAction tries to update the data docs.
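For context, the docs update can also be triggered in isolation (a minimal sketch; context here stands for the file-mode context created by _create_context below, and build_data_docs() performs the same site rebuild that UpdateDataDocsAction is responsible for):

# Rebuild all configured data docs sites directly, outside any checkpoint;
# useful for checking whether the rebuild itself ever removes files.
context.build_data_docs()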

I am attaching the code of the workflow. Basically, we have a framework that extracts data, transforms it, and validates it before writing it to the bucket.

import logging
from datetime import datetime

import great_expectations as gx
from great_expectations.checkpoint import SlackNotificationAction, UpdateDataDocsAction
from great_expectations.core.batch_definition import BatchDefinition
from great_expectations.core.run_identifier import RunIdentifier
from great_expectations.data_context.types.base import DataContextConfig, GCSStoreBackendDefaults
from great_expectations.exceptions import DataContextError
from great_expectations.expectations.expectation import Expectation
from pyspark.sql import DataFrame, SparkSession

_logger = logging.getLogger(__name__)

# BaseValidator and ValidationException come from our internal framework.

class GreatExpectationsValidator(BaseValidator):

    def __init__(self, session: SparkSession, params: dict[str, str]):
        super().__init__(session, params)

        self._data_source = self._params['data_source']
        self._data_name = self._params['data_name']
        self._project = self._params['project']
        self._bucket = self._params['bucket']
        self._prefix = self._params['reports_prefix']

        self._context = self._create_context()
        target = f"{self._data_source}-{self._data_name}"
        self._batch_name = f"batch_definition_{target}"
        self._suite_name = f"expectation_suite_{target}"
        self._validation_name = f"validation_definition_{target}"
        self._checkpoint_name = f"checkpoint_{target}"

    def create_expectations(self) -> list[Expectation]:
        raise NotImplementedError("Method create_expectations must be implemented in child class")

    def _create_context(self):

        # All GX stores and the data docs site live in one GCS bucket, separated by prefix.
        context_config = DataContextConfig(
            store_backend_defaults=GCSStoreBackendDefaults(
                default_project_name=self._project,
                default_bucket_name=self._bucket,
                expectations_store_prefix=f"{self._prefix}/expectations",
                data_docs_prefix=f"{self._prefix}/data_docs",
                checkpoint_store_prefix=f"{self._prefix}/checkpoints",
                validation_results_store_prefix=f"{self._prefix}/validations",
                validation_definition_store_prefix=f"{self._prefix}/validation_definitions",
            )
        )

        context = gx.get_context(project_config=context_config, mode="file")
        return context

    def _create_batch(self):

        data_source = self._context.data_sources.add_or_update_spark(name=self._data_source)
        data_asset = data_source.add_dataframe_asset(name=self._data_name)

        batch_definition = data_asset.add_batch_definition_whole_dataframe(
            self._batch_name
        )
        return batch_definition

    def _create_expectations_suite(self):

        # Drop any stale suite so the expectations are rebuilt from scratch on every run.
        try:
            self._context.suites.delete(self._suite_name)
        except DataContextError as e:
            _logger.info(f"Expectation suite with name {self._suite_name} does not exist yet: {e}")

        suite = gx.ExpectationSuite(name=self._suite_name, id=self._suite_name)
        for expectation in self.create_expectations():
            suite.add_expectation(expectation)
        self._context.suites.add(suite)

    def _create_action_list(self):

        action_list = []

        if 'slack_channel' in self._params:
            action_list.append(
                SlackNotificationAction(
                    name="send_slack_notification_on_failed_expectations",
                    slack_token=self._params['slack_token'],
                    slack_channel=self._params['slack_channel'],
                    notify_on="failure",
                    show_failed_expectations=True,
                ),
            )

        action_list.append(
            UpdateDataDocsAction(
                name="update_all_data_docs",
            ),
        )
        return action_list

    def _create_validation(self, batch_definition: BatchDefinition):

        try:
            validation_definition = self._context.validation_definitions.get(self._validation_name)
        except DataContextError as e:
            _logger.info(
                "Could not find an existing validation definition with name"
                f"{self._validation_name}: {e}.\n"
                "Creating new validation definition."
            )
            validation_definition = gx.ValidationDefinition(
                data=batch_definition,
                suite=self._context.suites.get(self._suite_name),
                name=self._validation_name,
            )
            self._context.validation_definitions.add(validation_definition)

    def _create_checkpoint(self):

        action_list = self._create_action_list()
        # Drop any stale checkpoint so it is recreated with the current action list.
        try:
            self._context.checkpoints.delete(self._checkpoint_name)
        except DataContextError as e:
            _logger.info(f"Checkpoint with name {self._checkpoint_name} does not exist yet: {e}")

        checkpoint = gx.Checkpoint(
            id=self._checkpoint_name,
            name=self._checkpoint_name,
            validation_definitions=[
                self._context.validation_definitions.get(self._validation_name)
            ],
            actions=action_list,
            result_format={"result_format": "COMPLETE"},
        )
        self._context.checkpoints.add(checkpoint)

    def validate(self, df: DataFrame):
        batch_definition = self._create_batch()
        self._create_expectations_suite()
        self._create_validation(batch_definition)
        self._create_checkpoint()

        run_results = self._context.checkpoints.get(self._checkpoint_name).run(
            batch_parameters={"dataframe": df},
            run_id=RunIdentifier(
                run_name=self._job_id,
                run_time=datetime.now(),
            ),
        ).run_results

        # Only one validation definition ran, so read the single result's statistics.
        success_percent = list(run_results.values())[0]['statistics']['success_percent']
        if success_percent < self._pass_success_percent:
            raise ValidationException(f"Validation failed. Success percent: {success_percent}")
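
One way to observe the deletion is to list the objects under the data docs prefix before and after each checkpoint run (a minimal sketch using the google-cloud-storage client; the project, bucket, and prefix arguments mirror the validator parameters above):

from google.cloud import storage

def list_data_docs(project: str, bucket: str, prefix: str) -> list[str]:
    # Enumerate every object currently stored under the data docs prefix,
    # i.e. "<reports_prefix>/data_docs" in the configuration above.
    client = storage.Client(project=project)
    return [blob.name for blob in client.list_blobs(bucket, prefix=f"{prefix}/data_docs")]

# Comparing the listing before and after a run shows whether that run wiped
# the site: on a run that hits the bug, the list shrinks or comes back empty
# even though the validations themselves succeeded.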

Expected behavior
Data docs don't get deleted.

Environment (please complete the following information):

  • Operating System: macOS, Linux
  • Great Expectations Version: 1.2.5
  • Data Source: Spark Dataframe
  • Cloud environment: Google Cloud

Additional context
https://greatexpectationstalk.slack.com/archives/CUTCNHN82/p1733301818679899

ekeras changed the title from "Data docs get deleted sporadically" to "[BUG] Data docs get deleted sporadically" on Dec 12, 2024