diff --git a/import-automation/executor/app/configs.py b/import-automation/executor/app/configs.py
index fd767a7b45..154b559ac8 100644
--- a/import-automation/executor/app/configs.py
+++ b/import-automation/executor/app/configs.py
@@ -115,6 +115,10 @@ class ExecutorConfig:
     user_script_args: List[str] = ()
     # Environment variables for the user script
     user_script_env: dict = None
+    # Invoke validations before upload.
+    invoke_import_validation: bool = False
+    # Import validation config file.
+    validation_config_file: str = 'tools/import_validation/validation_config.json'
     # Maximum time venv creation can take in seconds.
     venv_create_timeout: float = 3600
     # Maximum time downloading a file can take in seconds.
@@ -125,8 +129,10 @@ class ExecutorConfig:
     email_account: str = ''
     # The corresponding password, app password, or access token.
     email_token: str = ''
-    # Disbale email alert notifications.
+    # Disable email alert notifications.
     disable_email_notifications: bool = False
+    # Skip uploading the data to GCS (for local testing).
+    skip_gcs_upload: bool = False
     # Maximum time a blocking call to the importer to
     # perform an import can take in seconds.
     importer_import_timeout: float = 20 * 60
@@ -134,8 +140,8 @@ class ExecutorConfig:
     # delete an import can take in seconds.
     importer_delete_timeout: float = 10 * 60
     # Executor type depends on where the executor runs
-    # Suppports one of: "GKE", "GAE"
-    executor_type: str = 'GAE'
+    # Supports one of: "GKE", "GAE", "CLOUD_RUN"
+    executor_type: str = 'CLOUD_RUN'
 
     def get_data_refresh_config(self):
         """Returns the config used for Cloud Scheduler data refresh jobs."""
diff --git a/import-automation/executor/app/executor/import_executor.py b/import-automation/executor/app/executor/import_executor.py
index 683d70af8a..85bb97afaa 100644
--- a/import-automation/executor/app/executor/import_executor.py
+++ b/import-automation/executor/app/executor/import_executor.py
@@ -17,15 +17,26 @@
 """
 import dataclasses
+import glob
 import json
 import logging
 import os
 import subprocess
+import sys
 import tempfile
 import time
 import traceback
 from typing import Callable, Dict, Iterable, List, Optional, Tuple
 
+REPO_DIR = os.path.dirname(
+    os.path.dirname(
+        os.path.dirname(
+            os.path.dirname(os.path.dirname(os.path.abspath(__file__))))))
+sys.path.append(os.path.join(REPO_DIR, 'tools', 'import_differ'))
+sys.path.append(os.path.join(REPO_DIR, 'tools', 'import_validation'))
+
+from import_differ import ImportDiffer
+from import_validation import ImportValidation
 from app import configs
 from app import utils
 from app.executor import cloud_run_simple_import
@@ -34,6 +45,7 @@
 from app.service import file_uploader
 from app.service import github_api
 from app.service import import_service
+from google.cloud import storage
 
 # Email address for status messages.
 _DEBUG_EMAIL_ADDR = 'datacommons-debug+imports@google.com'
@@ -317,6 +329,99 @@ def _import_one(
         )
         raise exc
 
+    def _invoke_import_validation(self, repo_dir: str, relative_import_dir: str,
+                                  absolute_import_dir: str,
+                                  import_spec: dict) -> None:
+        """
+        Performs validations on import data.
+        """
+        import_inputs = import_spec.get('import_inputs', [])
+        for import_input in import_inputs:
+            mcf_path = import_input['node_mcf']
+            if not mcf_path:
+                # TODO: Generate node mcf using dc-import tool
+                logging.error(
+                    'Empty node_mcf in manifest, skipping validation.')
+                continue
+            current_data_path = os.path.join(absolute_import_dir, mcf_path)
+            previous_data_path = os.path.join(absolute_import_dir,
+                                              'previous_data.mcf')
+            summary_stats = os.path.join(absolute_import_dir,
+                                         'summary_report.csv')
+            validation_output_path = os.path.join(absolute_import_dir,
+                                                  'validation')
+            config_file = import_spec.get('validation_config_file', '')
+            if not config_file:
+                config_file = self.config.validation_config_file
+            config_file_path = os.path.join(REPO_DIR, config_file)
+            logging.info(f'Validation config file: {config_file_path}')
+
+            # Download previous import data.
+            bucket = storage.Client(self.config.gcs_project_id).bucket(
+                self.config.storage_prod_bucket_name)
+            folder = relative_import_dir + '/' + import_spec['import_name'] + '/'
+            blob = bucket.get_blob(folder + 'latest_version.txt')
+            if not blob:
+                logging.error(
+                    f'Unable to download latest_version.txt from {folder}, skipping validation.'
+                )
+                return
+            latest_version = blob.download_as_text().strip()
+            blob = bucket.get_blob(folder + latest_version + '/' + mcf_path)
+            if not blob:
+                logging.error(
+                    f'Unable to download previous import from {latest_version}, skipping validation.'
+                )
+                return
+            blob.download_to_filename(previous_data_path)
+
+            # Invoke differ script.
+            differ = ImportDiffer(current_data_path, previous_data_path,
+                                  validation_output_path)
+            differ.run_differ()
+
+            # Invoke validation script.
+            validation_output = os.path.join(validation_output_path,
+                                             'validation_output.csv')
+            differ_output = os.path.join(validation_output_path,
+                                         'point_analysis_summary.csv')
+            validation = ImportValidation(config_file_path, differ_output,
+                                          summary_stats, validation_output)
+            validation.run_validations()
+
+    def _invoke_import_job(self, absolute_import_dir: str, import_spec: dict,
+                           version: str, interpreter_path: str,
+                           process: subprocess.CompletedProcess) -> None:
+        """Runs the import scripts, either as Cloud Run jobs or locally."""
+        script_paths = import_spec.get('scripts')
+        for path in script_paths:
+            script_path = os.path.join(absolute_import_dir, path)
+            simple_job = cloud_run_simple_import.get_simple_import_job_id(
+                import_spec, script_path)
+            if simple_job:
+                # Running simple import as cloud run job.
+                cloud_run_simple_import.cloud_run_simple_import_job(
+                    import_spec=import_spec,
+                    config_file=script_path,
+                    env=self.config.user_script_env,
+                    version=version,
+                    image=import_spec.get('image'),
+                )
+            else:
+                # Run import script locally.
+                script_interpreter = _get_script_interpreter(
+                    script_path, interpreter_path)
+                process = _run_user_script(
+                    interpreter_path=script_interpreter,
+                    script_path=script_path,
+                    timeout=self.config.user_script_timeout,
+                    args=self.config.user_script_args,
+                    cwd=absolute_import_dir,
+                    env=self.config.user_script_env,
+                )
+                _log_process(process=process)
+                process.check_returncode()
+
     def _import_one_helper(
         self,
         repo_dir: str,
@@ -350,35 +455,23 @@ def _import_one_helper(
         _log_process(process=process)
         process.check_returncode()
 
-        script_paths = import_spec.get('scripts')
-        for path in script_paths:
-            script_path = os.path.join(absolute_import_dir, path)
-            simple_job = cloud_run_simple_import.get_simple_import_job_id(
-                import_spec, script_path)
-            if simple_job:
-                # Running simple import as cloud run job.
-                cloud_run_simple_import.cloud_run_simple_import_job(
-                    import_spec=import_spec,
-                    config_file=script_path,
-                    env=self.config.user_script_env,
-                    version=version,
-                    image=import_spec.get('image'),
-                )
-            else:
-                # Run import script locally.
-                script_interpreter = _get_script_interpreter(
-                    script_path, interpreter_path)
-                process = _run_user_script(
-                    interpreter_path=script_interpreter,
-                    script_path=script_path,
-                    timeout=self.config.user_script_timeout,
-                    args=self.config.user_script_args,
-                    cwd=absolute_import_dir,
-                    env=self.config.user_script_env,
-                    name=import_name,
-                )
-                _log_process(process=process)
-                process.check_returncode()
+        self._invoke_import_job(absolute_import_dir=absolute_import_dir,
+                                import_spec=import_spec,
+                                version=version,
+                                interpreter_path=interpreter_path,
+                                process=process)
+
+        if self.config.invoke_import_validation:
+            logging.info('Invoking import validations')
+            self._invoke_import_validation(
+                repo_dir=repo_dir,
+                relative_import_dir=relative_import_dir,
+                absolute_import_dir=absolute_import_dir,
+                import_spec=import_spec)
+
+        if self.config.skip_gcs_upload:
+            logging.info('Skipping GCS upload')
+            return
 
         inputs = self._upload_import_inputs(
             import_dir=absolute_import_dir,
@@ -387,6 +480,15 @@ def _import_one_helper(
             import_spec=import_spec,
         )
 
+        # Upload validation results, if present, alongside the import data.
+        validation_output_path = os.path.join(absolute_import_dir, 'validation')
+        for filepath in glob.iglob(f'{validation_output_path}/*.csv'):
+            dest = f'{relative_import_dir}/{import_name}/{version}/validation/{os.path.basename(filepath)}'
+            self.uploader.upload_file(
+                src=filepath,
+                dest=dest,
+            )
+
         if self.importer:
             self.importer.delete_previous_output(relative_import_dir,
                                                  import_spec)
diff --git a/import-automation/executor/main.py b/import-automation/executor/main.py
index 5801748823..8f22945ccb 100644
--- a/import-automation/executor/main.py
+++ b/import-automation/executor/main.py
@@ -1,3 +1,19 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+Import executor entrypoint.
+"""
 import logging
 import json
 
@@ -16,10 +32,12 @@
 def scheduled_updates(absolute_import_name: str, import_config: str):
+    """
+    Invokes the import update workflow.
+    """
     logging.info(absolute_import_name)
     cfg = json.loads(import_config)
     config = configs.ExecutorConfig(**cfg)
-    logging.info(config)
     executor = import_executor.ImportExecutor(
         uploader=file_uploader.GCSFileUploader(
             project_id=config.gcs_project_id,
diff --git a/import-automation/executor/requirements.txt b/import-automation/executor/requirements.txt
index 956b49e547..5555b7cdf3 100644
--- a/import-automation/executor/requirements.txt
+++ b/import-automation/executor/requirements.txt
@@ -12,3 +12,4 @@ gunicorn
 pytz
 absl-py
 croniter
+pandas
diff --git a/tools/import_differ/import_differ.py b/tools/import_differ/import_differ.py
index 4afab7e40a..21e659ba74 100644
--- a/tools/import_differ/import_differ.py
+++ b/tools/import_differ/import_differ.py
@@ -23,6 +23,10 @@
 
 import differ_utils
 
+SAMPLE_COUNT = 3
+GROUPBY_COLUMNS = 'variableMeasured,observationAbout,observationDate,measurementMethod,unit,observationPeriod'
+VALUE_COLUMNS = 'value,scalingFactor'
+
 FLAGS = flags.FLAGS
 flags.DEFINE_string(
     'current_data', '', 'Path to the current MCF data \
@@ -34,15 +38,12 @@
                     'Path to the output data folder.')
 
 flags.DEFINE_string(
-    'groupby_columns',
-    'variableMeasured,observationAbout,observationDate,measurementMethod,unit',
+    'groupby_columns', GROUPBY_COLUMNS,
     'Columns to group data for diff analysis in the order (var,place,time etc.).'
 )
-flags.DEFINE_string('value_columns', 'value,scalingFactor',
+flags.DEFINE_string('value_columns', VALUE_COLUMNS,
                     'Columns with statvar value for diff analysis.')
 
-SAMPLE_COUNT = 3
-
 
 class ImportDiffer:
     """
@@ -69,8 +70,12 @@ class ImportDiffer:
 
     """
 
-    def __init__(self, current_data, previous_data, output_location,
-                 groupby_columns, value_columns):
+    def __init__(self,
+                 current_data,
+                 previous_data,
+                 output_location,
+                 groupby_columns=GROUPBY_COLUMNS,
+                 value_columns=VALUE_COLUMNS):
         self.current_data = current_data
         self.previous_data = previous_data
         self.output_location = output_location
@@ -89,8 +94,8 @@ def _cleanup_data(self, df: pd.DataFrame):
 
     def _get_samples(self, row):
         years = sorted(row[self.time_column])
         if len(years) > SAMPLE_COUNT:
-            return years[0] + random.sample(years[1:-1],
-                                            SAMPLE_COUNT - 2) + years[-1]
+            return [years[0]] + random.sample(years[1:-1],
+                                              SAMPLE_COUNT - 2) + [years[-1]]
         else:
             return years
@@ -213,8 +218,8 @@ def series_analysis(self,
         return summary, result
 
     def run_differ(self):
-        if not os.path.exists(FLAGS.output_location):
-            os.makedirs(FLAGS.output_location)
+        if not os.path.exists(self.output_location):
+            os.makedirs(self.output_location)
         logging.info('Loading data...')
         current_df = differ_utils.load_data(self.current_data,
                                             self.output_location)
diff --git a/tools/import_validation/import_validation.py b/tools/import_validation/import_validation.py
index 95f0286ccb..8dee351aeb 100644
--- a/tools/import_validation/import_validation.py
+++ b/tools/import_validation/import_validation.py
@@ -27,7 +27,9 @@
 flags.DEFINE_string('differ_output_location', '.',
                     'Path to the differ output data folder.')
 flags.DEFINE_string('stats_summary_location', '.',
-                    'Path to the stats summary report.')
+                    'Path to the stats summary report folder.')
+flags.DEFINE_string('validation_output_location', '.',
+                    'Path to the validation output folder.')
 
 POINT_ANALAYSIS_FILE = 'point_analysis_summary.csv'
 STATS_SUMMARY_FILE = 'summary_report.csv'
@@ -67,8 +69,8 @@ class ImportValidation:
 
     Sample config and output files can be found in test folder.
""" - def __init__(self, config_file: str, differ_output: str, - stats_summary: str): + def __init__(self, config_file: str, differ_output: str, stats_summary: str, + validation_output: str): logging.info('Reading config from %s', config_file) self.differ_results = pd.read_csv(differ_output) self.validation_map = { @@ -77,6 +79,7 @@ def __init__(self, config_file: str, differ_output: str, Validation.DELETED_COUNT: self._deleted_count_validation, Validation.UNMODIFIED_COUNT: self._unmodified_count_validation } + self.validation_output = validation_output self.validation_result = [] with open(config_file, encoding='utf-8') as fd: self.validation_config = json.load(fd) @@ -114,7 +117,7 @@ def _run_validation(self, config) -> ValidationResult: return ValidationResult('FAILED', config['validation'], repr(exc)) def run_validations(self): - output_file = open(VALIDATION_OUTPUT_FILE, mode='w', encoding='utf-8') + output_file = open(self.validation_output, mode='w', encoding='utf-8') output_file.write('test,status,message\n') for config in self.validation_config: result = self._run_validation(config) @@ -128,7 +131,8 @@ def main(_): validation = ImportValidation( FLAGS.config_file, os.path.join(FLAGS.differ_output_location, POINT_ANALAYSIS_FILE), - os.path.join(FLAGS.stats_summary_location, STATS_SUMMARY_FILE)) + os.path.join(FLAGS.stats_summary_location, STATS_SUMMARY_FILE), + os.paht.join(FLAGS.validation_output_location, VALIDATION_OUTPUT_FILE)) validation.run_validations() diff --git a/tools/import_validation/validation_config.json b/tools/import_validation/validation_config.json new file mode 100644 index 0000000000..21daba858b --- /dev/null +++ b/tools/import_validation/validation_config.json @@ -0,0 +1,15 @@ +[ + { + "validation": "DELETED_COUNT", + "threshold": 1 + }, + { + "validation": "MODIFIED_COUNT" + }, + { + "validation": "ADDED_COUNT" + }, + { + "validation": "UNMODIFIED_COUNT" + } +] diff --git a/tools/import_validation/validation_output.csv b/tools/import_validation/validation_output.csv deleted file mode 100644 index 45763a8679..0000000000 --- a/tools/import_validation/validation_output.csv +++ /dev/null @@ -1,5 +0,0 @@ -test,status,message -DELETED_COUNT,PASSED, -MODIFIED_COUNT,PASSED, -ADDED_COUNT,PASSED, -UNMODIFIED_COUNT,FAILED,AssertionError('Validation failed: UNMODIFIED_COUNT')