Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions import-automation/executor/app/configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ class ExecutorConfig:
# Name of the Cloud Storage bucket to store the generated data files
# for importing to prod.
storage_prod_bucket_name: str = 'datcom-prod-imports'
# Spanner instance details for import status.
spanner_project_id: str = 'datcom-store'
spanner_instance_id: str = 'dc-kg-test'
spanner_database_id: str = 'dc_graph_import'
# Name of the Cloud Storage bucket that the Data Commons importer
# outputs to.
storage_importer_bucket_name: str = 'resolved_mcf'
Expand Down
30 changes: 26 additions & 4 deletions import-automation/executor/app/executor/import_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import time
import traceback
from typing import Callable, Dict, Iterable, List, Optional, Tuple
from google.cloud import spanner

REPO_DIR = os.path.dirname(
os.path.dirname(
Expand Down Expand Up @@ -509,7 +510,7 @@ def _invoke_import_validation(self, repo_dir: str, relative_import_dir: str,
if self.config.invoke_differ_tool and latest_version and len(
file_util.file_get_matching(previous_data_path)) > 0:
logging.info(
f'Invoking differ tool comparing {import_prefix} with {latest_version}...'
f'Invoking differ tool comparing {import_prefix} with {latest_version}'
)
timer = Timer()
differ = ImportDiffer(current_data=current_data_path,
Expand Down Expand Up @@ -644,7 +645,25 @@ def _invoke_import_job(self, absolute_import_dir: str, import_spec: dict,
})
process.check_returncode()

@log_function_call
def _update_import_status_table(self, import_name: str,
                                gcs_path: str) -> None:
    """Writes the import's latest status row to the Spanner ImportStatus table.

    Inserts or updates a single row with State set to "PENDING", JobId taken
    from the BATCH_JOB_UID environment variable (None when unset), and
    UpdateTimestamp set to the Spanner commit timestamp.

    Args:
        import_name: Value stored in the ImportName column.
        gcs_path: Value stored in the LatestVersion column.
    """
    logging.info(f'Updating {import_name} status in spanner.')

    # The same project is used for the client and as the quota project.
    project_id = self.config.spanner_project_id
    client = spanner.Client(
        project=project_id,
        client_options={'quota_project_id': project_id},
    )
    status_db = client.instance(self.config.spanner_instance_id).database(
        self.config.spanner_database_id)

    status_columns = (
        "ImportName",
        "LatestVersion",
        "State",
        "JobId",
        "UpdateTimestamp",
    )
    # The batch is applied when the `with` block exits.
    with status_db.batch() as writer:
        status_row = (
            import_name,
            gcs_path,
            "PENDING",
            os.getenv('BATCH_JOB_UID'),
            spanner.COMMIT_TIMESTAMP,
        )
        writer.insert_or_update(
            table="ImportStatus",
            columns=status_columns,
            values=[status_row],
        )

    logging.info(f'Updated {import_name} status in spanner.')

def _update_latest_version(self, version, output_dir, import_spec):
logging.info(f'Updating import latest version {version}')
self.uploader.upload_string(
Expand All @@ -664,6 +683,9 @@ def _update_latest_version(self, version, output_dir, import_spec):
self.uploader.upload_string('\n'.join(versions_history),
history_filename)
logging.info(f'Updated import latest version {version}')
gcs_path = os.path.join(self.config.storage_prod_bucket_name,
output_dir, version, '*', 'validation')
self._update_import_status_table(import_spec['import_name'], gcs_path)

@log_function_call
def _import_one_helper(
Expand Down Expand Up @@ -756,8 +778,7 @@ def _import_one_helper(
import_spec)
else:
logging.warning(
"Skipping latest version update due to skip_gcs_upload."
)
"Skipping latest version update as per import config.")
else:
logging.error(
"Skipping latest version update due to validation failure.")
Expand Down Expand Up @@ -786,6 +807,7 @@ def _import_one_helper(
block=True,
timeout=self.config.importer_import_timeout,
)
logging.info(f'Import workflow completed successfully!')

@log_function_call
def _upload_import_inputs(self, import_dir: str, output_dir: str,
Expand Down
1 change: 1 addition & 0 deletions import-automation/executor/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ google-cloud-bigquery
google-cloud-bigquery-storage
google-cloud-datastore
google-cloud-run
google-cloud-spanner
google-cloud-storage
google-cloud-logging
google-cloud-scheduler
Expand Down