Skip to content

Commit e1045ef

Browse files
committed
Move version update to cloud workflow
1 parent c5133b8 commit e1045ef

9 files changed

Lines changed: 406 additions & 184 deletions

File tree

import-automation/executor/app/configs.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,6 @@ class ExecutorConfig:
4242
# Name of the Cloud Storage bucket to store the generated data files
4343
# for importing to prod.
4444
storage_prod_bucket_name: str = 'datcom-prod-imports'
45-
# Spanner instance details for import status.
46-
spanner_project_id: str = 'datcom-store'
47-
spanner_instance_id: str = 'dc-kg-test'
48-
spanner_database_id: str = 'dc_graph_import'
4945
# Name of the Cloud Storage bucket that the Data Commons importer
5046
# outputs to.
5147
storage_importer_bucket_name: str = 'resolved_mcf'
@@ -152,8 +148,6 @@ class ExecutorConfig:
152148
ignore_validation_status: bool = False
153149
# Import validation config file path (relative to data repo).
154150
validation_config_file: str = 'tools/import_validation/validation_config.json'
155-
# Latest import version (overwrite)
156-
import_version_override: str = ''
157151
# Maximum time venv creation can take in seconds.
158152
venv_create_timeout: float = 3600
159153
# Maximum time downloading a file can take in seconds.

import-automation/executor/app/executor/import_executor.py

Lines changed: 7 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import time
3030
import traceback
3131
from typing import Callable, Dict, Iterable, List, Optional, Tuple
32-
from google.cloud import spanner
3332
import datetime
3433
from enum import Enum
3534

@@ -776,42 +775,6 @@ def _invoke_import_job(
776775
import_summary.import_stats.get('source_data_size', 0))
777776
return inputs
778777

779-
def _update_import_status_table(
780-
self, import_summary: ImportStatusSummary) -> None:
781-
"""Updates import job status table in spanner."""
782-
logging.info(
783-
f'Updating {import_summary.import_name} status to {import_summary.status} in spanner.'
784-
)
785-
if not self.config.spanner_project_id or not self.config.spanner_instance_id or not self.config.spanner_database_id:
786-
return
787-
spanner_client = spanner.Client(
788-
project=self.config.spanner_project_id,
789-
client_options={'quota_project_id': self.config.spanner_project_id})
790-
instance = spanner_client.instance(self.config.spanner_instance_id)
791-
database = instance.database(self.config.spanner_database_id)
792-
793-
with database.batch() as batch:
794-
columns = [
795-
"ImportName", "State", "JobId", "ExecutionTime", "DataVolume",
796-
"StatusUpdateTimestamp", "NextRefreshTimestamp", "LatestVersion"
797-
]
798-
values = [
799-
import_summary.import_name, import_summary.status.name,
800-
import_summary.job_id, import_summary.execution_time,
801-
import_summary.data_volume, spanner.COMMIT_TIMESTAMP,
802-
import_summary.next_refresh, import_summary.latest_version
803-
]
804-
# Update import timestamp only if import completed successfully.
805-
if import_summary.status == ImportStatus.READY:
806-
columns.extend(["DataImportTimestamp"])
807-
values.extend([spanner.COMMIT_TIMESTAMP])
808-
809-
batch.insert_or_update(table="ImportStatus",
810-
columns=tuple(columns),
811-
values=[tuple(values)])
812-
813-
logging.info(f'Updated {import_summary.import_name} status in spanner.')
814-
815778
def _update_latest_version(self, version, output_dir, import_spec,
816779
import_summary):
817780
if self.config.skip_gcs_upload:
@@ -821,25 +784,16 @@ def _update_latest_version(self, version, output_dir, import_spec,
821784
logging.info(f'Updating import latest version {version}')
822785
self.uploader.upload_string(
823786
version,
824-
os.path.join(output_dir, self.config.storage_version_filename))
787+
os.path.join(output_dir, STAGING_PATH,
788+
self.config.storage_version_filename))
825789
self.uploader.upload_string(
826790
self._import_metadata_mcf_helper(import_spec),
827-
os.path.join(output_dir, self.config.import_metadata_mcf_filename))
791+
os.path.join(output_dir, version,
792+
self.config.import_metadata_mcf_filename))
828793
self.uploader.upload_string(
829794
json.dumps(dataclasses.asdict(import_summary), default=str),
830-
os.path.join(output_dir, IMPORT_SUMMARY_FILE))
831-
# Add current version to the history of versions if import was successful.
832-
if self.config.storage_version_history_filename:
833-
history_filename = os.path.join(
834-
output_dir, self.config.storage_version_history_filename)
835-
versions_history = [version]
836-
history = self._get_blob_content(history_filename)
837-
if history:
838-
versions_history.append(history)
839-
self.uploader.upload_string('\n'.join(versions_history),
840-
history_filename)
795+
os.path.join(output_dir, version, IMPORT_SUMMARY_FILE))
841796
logging.info(f'Updated import latest version {version}')
842-
self._update_import_status_table(import_summary)
843797

844798
@log_function_call
845799
def _import_one_helper(
@@ -868,21 +822,14 @@ def _import_one_helper(
868822
self.config.file_download_timeout)
869823

870824
output_dir = f'{relative_import_dir}/{import_name}'
871-
version = self.config.import_version_override if self.config.import_version_override else _clean_time(
872-
utils.pacific_time())
825+
version = _clean_time(utils.pacific_time())
873826
if version == 'DATE_VERSION_PLACEHOLDER':
874827
version = datetime.datetime.now(datetime.UTC).strftime("%Y-%m-%d")
875828
import_summary.latest_version = 'gs://' + os.path.join(
876829
self.config.storage_prod_bucket_name, output_dir, version, '*', '*',
877830
'*.mcf')
878831
import_summary.next_refresh = utils.next_utc_timestamp(
879832
import_spec.get('cron_schedule'))
880-
if self.config.import_version_override and self.config.import_version_override != 'DATE_VERSION_PLACEHOLDER':
881-
logging.info(f'Import version override {version}')
882-
import_summary.status = ImportStatus.READY
883-
self._update_latest_version(version, output_dir, import_spec,
884-
import_summary)
885-
return
886833

887834
with tempfile.TemporaryDirectory() as tmpdir:
888835
requirements_path = os.path.join(absolute_import_dir,
@@ -947,9 +894,6 @@ def _import_one_helper(
947894
import_summary.import_stats.get('mcf_data_size', 0) +
948895
import_summary.import_stats.get('validation_data_size', 0))
949896
logging.info(import_summary)
950-
self.uploader.upload_string(
951-
json.dumps(dataclasses.asdict(import_summary), default=str),
952-
os.path.join(output_dir, version, IMPORT_SUMMARY_FILE))
953897

954898
if self.config.ignore_validation_status or validation_status:
955899
import_summary.status = ImportStatus.READY
@@ -958,10 +902,7 @@ def _import_one_helper(
958902
"Staging latest version update due to validation failure.")
959903
import_summary.status = ImportStatus.VALIDATION
960904

961-
# Update version and metadata files in staging folder for failed imports
962-
version_dir = output_dir if import_summary.status == ImportStatus.READY else os.path.join(
963-
output_dir, STAGING_PATH)
964-
self._update_latest_version(version, version_dir, import_spec,
905+
self._update_latest_version(version, output_dir, import_spec,
965906
import_summary)
966907

967908
if self.importer:

import-automation/executor/update_import_version.sh

Lines changed: 11 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -2,53 +2,22 @@
22
#
33
# This script updates the latest version of an import.
44
#
5-
# It takes an existing cloud batch job name and a version as input parameters.
6-
# It fetches the configuration of the job including the import name, modifies the
7-
# import_version_override parameter, and submits a new job with the updated configuration.
8-
#
9-
# Requirements:
10-
# - gcloud
11-
# - jq
12-
#
13-
# Usage: ./update_import_version.sh <job_name> <version>
14-
# Example: ./update_import_version.sh usfed-constantmaturityrates-1755659705 2025_08_15T20_18_20_801877_07_00
5+
# Usage: ./update_import_version.sh <import_name> <version> <reason>
6+
# Example: ./update_import_version.sh scripts/us_fed/treasury_constant_maturity_rates:USFed_ConstantMaturityRates_Test 2025_12_17T02_30_27_233484_08_00 'Manual validation'
157

168
set -e
179

18-
if [ "$#" -ne 2 ]; then
19-
echo "Usage: $0 <job_name> <version>"
20-
echo "Example: $0 usfed-constantmaturityrates-1755659705 2025_08_15T20_18_20_801877_07_00"
10+
if [ "$#" -ne 3 ]; then
11+
echo "Usage: $0 <import_name> <version> <reason>"
2112
exit 1
2213
fi
2314

24-
JOB_NAME=$1
15+
FUNCTION_URL="https://spanner-ingestion-helper-965988403328.us-central1.run.app"
16+
IMPORT_NAME=$1
2517
VERSION=$2
26-
LOCATION="us-central1"
27-
PROJECT="datcom-import-automation-prod"
28-
USER_NAME=$(whoami)
29-
NEW_JOB_NAME="${JOB_NAME}-override-${USER_NAME}"
30-
TEMP_JSON_FILE=$(mktemp)
31-
trap 'rm -f -- "$TEMP_JSON_FILE"' EXIT
18+
REASON=$3
3219

33-
echo "Fetching configuration for job '${JOB_NAME}'..."
34-
gcloud batch jobs describe "${JOB_NAME}" --location="${LOCATION}" --project="${PROJECT}" --format=json > "${TEMP_JSON_FILE}"
35-
echo "Updating job configuration with import_version_override: '${VERSION}'"
36-
# TODO: add check to ensure the version exists on GCS.
37-
COMMAND_INDEX=$(jq -r '[.taskGroups[0].taskSpec.runnables[0].container.commands[] | startswith("--import_config=")] | index(true)' "${TEMP_JSON_FILE}")
38-
IMPORT_CONFIG_COMMAND=$(jq -r ".taskGroups[0].taskSpec.runnables[0].container.commands[${COMMAND_INDEX}]" "${TEMP_JSON_FILE}")
39-
IMPORT_CONFIG_JSON=$(echo "${IMPORT_CONFIG_COMMAND}" | sed 's/^--import_config=//')
40-
NEW_IMPORT_CONFIG_JSON=$(echo "${IMPORT_CONFIG_JSON}" | jq -c --arg version "${VERSION}" '. + {import_version_override: $version}')
41-
NEW_COMMAND="--import_config=${NEW_IMPORT_CONFIG_JSON}"
42-
43-
UPDATED_CONFIG_FILE=$(mktemp)
44-
trap 'rm -f -- "$TEMP_JSON_FILE" "$UPDATED_CONFIG_FILE"' EXIT
45-
46-
jq --arg new_command "${NEW_COMMAND}" '.taskGroups[0].taskSpec.runnables[0].container.commands[1] = $new_command' "${TEMP_JSON_FILE}" > "${UPDATED_CONFIG_FILE}"
47-
48-
echo "Submitting new job '${NEW_JOB_NAME}'..."
49-
gcloud batch jobs submit "${NEW_JOB_NAME}" \
50-
--location="${LOCATION}" \
51-
--project="${PROJECT}" \
52-
--config="${UPDATED_CONFIG_FILE}"
53-
54-
echo "Successfully submitted new job: ${NEW_JOB_NAME}"
20+
curl -X POST "${FUNCTION_URL}" \
21+
-H "Authorization: bearer $(gcloud auth print-identity-token)" \
22+
-H "Content-Type: application/json" \
23+
-d "{\"actionType\": \"update_import_version\", \"importName\": \"${IMPORT_NAME}\", \"version\": \"${VERSION}\", \"reason\": \"${REASON}\"}"

import-automation/workflow/import-automation-workflow.yaml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,18 @@ main:
8080
result: function_response
8181
- fail_workflow:
8282
raise: ${e}
83+
- update_import_version:
84+
call: http.post
85+
args:
86+
url: ${function_url}
87+
auth:
88+
type: OIDC
89+
body:
90+
actionType: update_import_version
91+
importName: ${args.importName}
92+
version: 'staging'
93+
reason: 'import-automation'
94+
result: function_response
8395
- returnResult:
8496
return:
8597
jobId: ${jobId}
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
"""Utility functions for the ingestion helper."""
2+
3+
import logging
4+
import croniter
5+
from datetime import datetime, timezone
6+
import base64
7+
from googleapiclient.discovery import build
8+
import json
9+
from googleapiclient.errors import HttpError
10+
from google.oauth2 import id_token
11+
from google.auth.transport import requests
12+
from google.auth import jwt
13+
14+
15+
def get_caller_identity(request):
16+
"""Extracts the caller's email from the Authorization header (JWT)."""
17+
auth_header = request.headers.get('Authorization')
18+
if auth_header:
19+
parts = auth_header.split()
20+
if len(parts) == 2 and parts[0].lower() == 'bearer':
21+
token = parts[1]
22+
unverified_claims = {}
23+
try:
24+
unverified_claims = jwt.decode(token, verify=False)
25+
#logging.info(f"Token claims (unverified): iss={unverified_claims.get('iss')}, aud={unverified_claims.get('aud')}, email={unverified_claims.get('email')}")
26+
logging.warning(
27+
f"Could not decode unverified token for debugging: {debug_e}"
28+
)
29+
id_info = id_token.verify_oauth2_token(token,
30+
requests.Request())
31+
return id_info.get('email', 'unknown_email')
32+
except Exception as e:
33+
if unverified_claims:
34+
email = unverified_claims.get('email', 'unknown_email')
35+
return f"{email} (unverified)"
36+
return 'decode_error'
37+
else:
38+
logging.warning(
39+
f"Invalid Authorization header format. Parts: {len(parts)}")
40+
else:
41+
logging.warning("No Authorization header received.")
42+
return 'no_auth_header'
43+
44+
45+
def get_import_params(request_json) -> dict:
46+
"""Extracts and calculates import parameters from the request JSON.
47+
48+
Args:
49+
request_json: A dictionary containing request parameters.
50+
51+
Returns:
52+
A dictionary with keys: job_id, duration, data, version, next_refresh, status.
53+
"""
54+
import_name = request_json.get('importName', '')
55+
status = request_json.get('status', '')
56+
job_id = request_json.get('jobId', '')
57+
duration = request_json.get('duration', 0)
58+
data = request_json.get('data', 0)
59+
version = request_json.get('version', '')
60+
schedule = request_json.get('schedule', '')
61+
next_refresh = datetime.now(timezone.utc)
62+
try:
63+
next_refresh = croniter.croniter(schedule, datetime.now(
64+
timezone.utc)).get_next(datetime)
65+
except (croniter.CroniterError) as e:
66+
logging.error(
67+
f"Error calculating next refresh from schedule '{schedule}': {e}")
68+
return {
69+
'import_name': import_name,
70+
'status': status,
71+
'job_id': job_id,
72+
'duration': duration,
73+
'data': data,
74+
'version': version,
75+
'next_refresh': next_refresh
76+
}
77+
78+
79+
def create_import_params(summary) -> dict:
80+
"""Creates import parameters from the import summary.
81+
82+
Args:
83+
summary: A dictionary containing import summary details.
84+
85+
Returns:
86+
A dictionary with keys: job_id, duration, data, version, next_refresh, status.
87+
"""
88+
import_name = summary.get('import_name', '')
89+
status = summary.get('status', '').removeprefix('ImportStatus.')
90+
job_id = summary.get('job_id', '')
91+
duration = summary.get('execution_time', 0)
92+
data = summary.get('data_volume', 0)
93+
version = summary.get('latest_version', '')
94+
next_refresh_str = summary.get('next_refresh', '')
95+
next_refresh = None
96+
if next_refresh_str:
97+
try:
98+
next_refresh = datetime.fromisoformat(next_refresh_str)
99+
except ValueError:
100+
logging.error(f"Error parsing next_refresh: {next_refresh_str}")
101+
102+
return {
103+
'import_name': import_name,
104+
'status': status,
105+
'job_id': job_id,
106+
'duration': duration,
107+
'data': data,
108+
'version': version,
109+
'next_refresh': next_refresh,
110+
}
111+
112+
113+
def get_ingestion_metrics(project_id, location, job_id):
114+
"""Fetches graph metrics (nodes, edges, observations) from a Dataflow job.
115+
116+
Args:
117+
project_id: The GCP project ID.
118+
location: The location of the Dataflow job.
119+
job_id: The Dataflow job ID.
120+
121+
Returns:
122+
A dictionary containing 'obs_count', 'node_count', and 'edge_count'.
123+
"""
124+
dataflow = build('dataflow', 'v1b3', cache_discovery=False)
125+
# Fetch Dataflow metrics
126+
node_count = 0
127+
edge_count = 0
128+
obs_count = 0
129+
if project_id and job_id:
130+
try:
131+
metrics = dataflow.projects().locations().jobs().getMetrics(
132+
projectId=project_id, location=location,
133+
jobId=job_id).execute()
134+
for metric in metrics.get('metrics', []):
135+
name = metric['name']['name']
136+
if name == 'graph_node_count':
137+
node_count = int(metric['scalar'])
138+
elif name == 'graph_edge_count':
139+
edge_count = int(metric['scalar'])
140+
elif name == 'graph_observation_count':
141+
obs_count = int(metric['scalar'])
142+
except HttpError as e:
143+
logging.error(
144+
f"Error fetching dataflow metrics for job {job_id}: {e}")
145+
return {
146+
'obs_count': obs_count,
147+
'node_count': node_count,
148+
'edge_count': edge_count
149+
}

0 commit comments

Comments
 (0)