From 7bb5d6b5279fd1d63d964700d851cda858dba97e Mon Sep 17 00:00:00 2001
From: Vishal Gupta
Date: Mon, 8 Sep 2025 13:38:54 +0000
Subject: [PATCH] Update import status in spanner

---
 import-automation/executor/app/configs.py          |  4 +++
 .../executor/app/executor/import_executor.py       | 30 ++++++++++++++++---
 import-automation/executor/requirements.txt        |  1 +
 3 files changed, 31 insertions(+), 4 deletions(-)

diff --git a/import-automation/executor/app/configs.py b/import-automation/executor/app/configs.py
index d302cff5bb..db43c6480f 100644
--- a/import-automation/executor/app/configs.py
+++ b/import-automation/executor/app/configs.py
@@ -42,6 +42,10 @@ class ExecutorConfig:
     # Name of the Cloud Storage bucket to store the generated data files
     # for importing to prod.
     storage_prod_bucket_name: str = 'datcom-prod-imports'
+    # Spanner instance details for import status.
+    spanner_project_id: str = 'datcom-store'
+    spanner_instance_id: str = 'dc-kg-test'
+    spanner_database_id: str = 'dc_graph_import'
     # Name of the Cloud Storage bucket that the Data Commons importer
     # outputs to.
     storage_importer_bucket_name: str = 'resolved_mcf'
diff --git a/import-automation/executor/app/executor/import_executor.py b/import-automation/executor/app/executor/import_executor.py
index 7ef7d67095..c65261f8a3 100644
--- a/import-automation/executor/app/executor/import_executor.py
+++ b/import-automation/executor/app/executor/import_executor.py
@@ -29,6 +29,7 @@
 import time
 import traceback
 from typing import Callable, Dict, Iterable, List, Optional, Tuple
+from google.cloud import spanner
 
 REPO_DIR = os.path.dirname(
     os.path.dirname(
@@ -509,7 +510,7 @@ def _invoke_import_validation(self, repo_dir: str, relative_import_dir: str,
         if self.config.invoke_differ_tool and latest_version and len(
                 file_util.file_get_matching(previous_data_path)) > 0:
             logging.info(
-                f'Invoking differ tool comparing {import_prefix} with {latest_version}...'
+                f'Invoking differ tool comparing {import_prefix} with {latest_version}'
             )
             timer = Timer()
             differ = ImportDiffer(current_data=current_data_path,
@@ -644,7 +645,25 @@ def _invoke_import_job(self, absolute_import_dir: str, import_spec: dict,
             })
         process.check_returncode()
 
-    @log_function_call
+    def _update_import_status_table(self, import_name: str,
+                                    gcs_path: str) -> None:
+        """Updates import job status table in spanner."""
+        logging.info(f'Updating {import_name} status in spanner.')
+        spanner_client = spanner.Client(
+            project=self.config.spanner_project_id,
+            client_options={'quota_project_id': self.config.spanner_project_id})
+        instance = spanner_client.instance(self.config.spanner_instance_id)
+        database = instance.database(self.config.spanner_database_id)
+        with database.batch() as batch:
+            batch.insert_or_update(table="ImportStatus",
+                                   columns=("ImportName", "LatestVersion",
+                                            "State", "JobId",
+                                            "UpdateTimestamp"),
+                                   values=[(import_name, gcs_path, "PENDING",
+                                            os.getenv('BATCH_JOB_UID'),
+                                            spanner.COMMIT_TIMESTAMP)])
+        logging.info(f'Updated {import_name} status in spanner.')
+
     def _update_latest_version(self, version, output_dir, import_spec):
         logging.info(f'Updating import latest version {version}')
         self.uploader.upload_string(
@@ -664,6 +683,9 @@ def _update_latest_version(self, version, output_dir, import_spec):
         self.uploader.upload_string('\n'.join(versions_history),
                                     history_filename)
         logging.info(f'Updated import latest version {version}')
+        gcs_path = os.path.join(self.config.storage_prod_bucket_name,
+                                output_dir, version, '*', 'validation')
+        self._update_import_status_table(import_spec['import_name'], gcs_path)
 
     @log_function_call
     def _import_one_helper(
@@ -756,8 +778,7 @@ def _import_one_helper(
                                                 import_spec)
             else:
                 logging.warning(
-                    "Skipping latest version update due to skip_gcs_upload."
-                )
+                    "Skipping latest version update as per import config.")
         else:
             logging.error(
                 "Skipping latest version update due to validation failure.")
@@ -786,6 +807,7 @@ def _import_one_helper(
             block=True,
             timeout=self.config.importer_import_timeout,
         )
+        logging.info(f'Import workflow completed successfully!')
 
     @log_function_call
     def _upload_import_inputs(self, import_dir: str, output_dir: str,
diff --git a/import-automation/executor/requirements.txt b/import-automation/executor/requirements.txt
index ce00fbd330..c45a694fa8 100644
--- a/import-automation/executor/requirements.txt
+++ b/import-automation/executor/requirements.txt
@@ -22,6 +22,7 @@ google-cloud-bigquery
 google-cloud-bigquery-storage
 google-cloud-datastore
 google-cloud-run
+google-cloud-spanner
 google-cloud-storage
 google-cloud-logging
 google-cloud-scheduler