Skip to content

Commit 239151b

Browse files
committed
Address reviewer comments
1 parent 73915d6 commit 239151b

9 files changed

Lines changed: 100 additions & 67 deletions

File tree

import-automation/executor/app/configs.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,8 @@ class ExecutorConfig:
149149
ignore_validation_status: bool = False
150150
# Import validation config file path (relative to data repo).
151151
validation_config_file: str = 'tools/import_validation/validation_config.json'
152+
# Latest import version override (used instead of the generated timestamp version when set).
153+
import_version_override: str = ''
152154
# Maximum time venv creation can take in seconds.
153155
venv_create_timeout: float = 3600
154156
# Maximum time downloading a file can take in seconds.

import-automation/executor/app/executor/import_executor.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@
7575
AUTO_IMPORT_JOB_STAGE = "auto-import-job-stage"
7676
AUTO_IMPORT_JOB_STATUS = "auto-import-job-status"
7777
IMPORT_SUMMARY_FILE = "import_summary.json"
78-
STAGING_PATH = "staging"
78+
STAGING_VERSION_FILE = "staging_version.txt"
7979

8080

8181
class ImportStatus(Enum):
@@ -783,9 +783,7 @@ def _update_latest_version(self, version, output_dir, import_spec,
783783
return
784784
logging.info(f'Updating import latest version {version}')
785785
self.uploader.upload_string(
786-
version,
787-
os.path.join(output_dir, STAGING_PATH,
788-
self.config.storage_version_filename))
786+
version, os.path.join(output_dir, STAGING_VERSION_FILE))
789787
self.uploader.upload_string(
790788
self._import_metadata_mcf_helper(import_spec),
791789
os.path.join(output_dir, version,
@@ -822,10 +820,12 @@ def _import_one_helper(
822820
self.config.file_download_timeout)
823821

824822
output_dir = f'{relative_import_dir}/{import_name}'
823+
version = self.config.import_version_override if self.config.import_version_override else _clean_time(
824+
utils.pacific_time())
825+
# Used for imports using CDA feed transfers with a date placeholder in the GCS path,
826+
# thus, we can determine the path using the current date (instead of a variable timestamp).
825827
if version == 'DATE_VERSION_PLACEHOLDER':
826828
version = datetime.datetime.now(datetime.UTC).strftime("%Y-%m-%d")
827-
else:
828-
version = _clean_time(utils.pacific_time())
829829
import_summary.latest_version = 'gs://' + os.path.join(
830830
self.config.storage_prod_bucket_name, output_dir, version, '*', '*',
831831
'*.mcf')

import-automation/executor/update_import_version.sh

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,23 @@
22
#
33
# This script updates the latest version of an import.
44
#
5-
# Usage: ./update_import_version.sh <import_name> <version> <reason>
5+
# Usage: ./update_import_version.sh <import_name> <version> <comment>
66
# Example: ./update_import_version.sh scripts/us_fed/treasury_constant_maturity_rates:USFed_ConstantMaturityRates_Test 2025_12_17T02_30_27_233484_08_00 'Manual validation'
77

88
set -e
99

1010
if [ "$#" -ne 3 ]; then
11-
echo "Usage: $0 <import_name> <version> <reason>"
11+
echo "Usage: $0 <import_name> <version> <comment>"
1212
exit 1
1313
fi
1414

1515
# Deployed using import-automation/workflow/cloudbuild.yaml
1616
FUNCTION_URL="https://us-central1-datcom-import-automation-prod.cloudfunctions.net/spanner-ingestion-helper"
1717
IMPORT_NAME=$1
1818
VERSION=$2
19-
REASON=$3
19+
COMMENT=$3
2020

2121
curl -X POST "${FUNCTION_URL}" \
2222
-H "Authorization: bearer $(gcloud auth print-identity-token)" \
2323
-H "Content-Type: application/json" \
24-
-d "{\"actionType\": \"update_import_version\", \"importName\": \"${IMPORT_NAME}\", \"version\": \"${VERSION}\", \"reason\": \"${REASON}\"}"
24+
-d "{\"actionType\": \"update_import_version\", \"importName\": \"${IMPORT_NAME}\", \"version\": \"${VERSION}\", \"comment\": \"${COMMENT}\"}"

import-automation/workflow/cloudbuild.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# gcloud builds submit --config=cloudbuild.yaml . --project=datcom-import-automation-prod
22

33
substitutions:
4-
_PROJECT_ID: 'datcom-import-automation-prod'
4+
_PROJECT_ID: 'datcom-import-automation-prod'
55
_SPANNER_PROJECT_ID: 'datcom-store'
66
_SPANNER_INSTANCE_ID: 'dc-kg-test'
77
_SPANNER_DATABASE_ID: 'dc_graph_import'
@@ -12,11 +12,11 @@ substitutions:
1212
steps:
1313
- id: 'import-automation-workflow'
1414
name: 'gcr.io/cloud-builders/gcloud'
15-
args: ['workflows', 'deploy', 'import-automation-workflow', '--source', 'import-automation-workflow.yaml', '--set-env-vars', 'LOCATION=${_LOCATION},GCS_BUCKET_ID=${_GCS_BUCKET_ID},GCS_MOUNT_BUCKET=${_GCS_MOUNT_BUCKET}']
15+
args: ['workflows', 'deploy', 'vishg-test-workflow', '--source', 'import-automation-workflow.yaml', '--set-env-vars', 'LOCATION=${_LOCATION},GCS_BUCKET_ID=${_GCS_BUCKET_ID},GCS_MOUNT_BUCKET=${_GCS_MOUNT_BUCKET}']
1616

1717
- id: 'spanner-ingestion-workflow'
1818
name: 'gcr.io/cloud-builders/gcloud'
19-
args: ['workflows', 'deploy', 'spanner-ingestion-workflow', '--source', 'spanner-ingestion-workflow.yaml', '--set-env-vars', 'PROJECT_ID=${_PROJECT_ID},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID}']
19+
args: ['workflows', 'deploy', 'spanner-ingestion-workflow', '--source', 'spanner-ingestion-workflow.yaml', '--set-env-vars', 'LOCATION=${_LOCATION},PROJECT_ID=${_PROJECT_ID},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID}']
2020

2121
- id: 'spanner-ingestion-helper'
2222
name: 'gcr.io/cloud-builders/gcloud'

import-automation/workflow/import-automation-workflow.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ main:
7575
jobId: ${jobId}
7676
importName: ${text.split(args.importName, ":")[1]}
7777
status: 'FAILED'
78-
duration: ${int(sys.now() - startTime)}
78+
execTime: ${int(sys.now() - startTime)}
7979
version: ${"gs://" + gcsImportBucket + "/" + text.replace_all(args.importName, ":", "/")}
8080
schedule: ${default(map.get(args, "schedule"), "")}
8181
result: functionResponse
@@ -91,7 +91,7 @@ main:
9191
actionType: 'update_import_version'
9292
importName: ${args.importName}
9393
version: 'staging'
94-
reason: 'import-automation'
94+
comment: 'import-automation'
9595
result: functionResponse
9696
- returnResult:
9797
return:

import-automation/workflow/ingestion-helper/import_utils.py

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,7 @@
1616
import logging
1717
import croniter
1818
from datetime import datetime, timezone
19-
import base64
2019
from googleapiclient.discovery import build
21-
import json
2220
from googleapiclient.errors import HttpError
2321
from google.oauth2 import id_token
2422
from google.auth.transport import requests
@@ -60,13 +58,13 @@ def get_import_params(request_json) -> dict:
6058
request_json: A dictionary containing request parameters.
6159
6260
Returns:
63-
A dictionary with keys: job_id, duration, data, version, next_refresh, status.
61+
A dictionary with import params.
6462
"""
6563
import_name = request_json.get('importName', '')
6664
status = request_json.get('status', '')
6765
job_id = request_json.get('jobId', '')
68-
duration = request_json.get('duration', 0)
69-
data = request_json.get('data', 0)
66+
exec_time = request_json.get('execTime', 0)
67+
data_volume = request_json.get('dataVolume', 0)
7068
version = request_json.get('version', '')
7169
schedule = request_json.get('schedule', '')
7270
next_refresh = datetime.now(timezone.utc)
@@ -80,8 +78,8 @@ def get_import_params(request_json) -> dict:
8078
'import_name': import_name,
8179
'status': status,
8280
'job_id': job_id,
83-
'duration': duration,
84-
'data': data,
81+
'exec_time': exec_time,
82+
'data_volume': data_volume,
8583
'version': version,
8684
'next_refresh': next_refresh
8785
}
@@ -94,13 +92,13 @@ def create_import_params(summary) -> dict:
9492
summary: A dictionary containing import summary details.
9593
9694
Returns:
97-
A dictionary with keys: job_id, duration, data, version, next_refresh, status.
95+
A dictionary with import params.
9896
"""
9997
import_name = summary.get('import_name', '')
10098
status = summary.get('status', '').removeprefix('ImportStatus.')
10199
job_id = summary.get('job_id', '')
102-
duration = summary.get('execution_time', 0)
103-
data = summary.get('data_volume', 0)
100+
exec_time = summary.get('execution_time', 0)
101+
data_volume = summary.get('data_volume', 0)
104102
version = summary.get('latest_version', '')
105103
next_refresh_str = summary.get('next_refresh', '')
106104
next_refresh = None
@@ -114,8 +112,8 @@ def create_import_params(summary) -> dict:
114112
'import_name': import_name,
115113
'status': status,
116114
'job_id': job_id,
117-
'duration': duration,
118-
'data': data,
115+
'exec_time': exec_time,
116+
'data_volume': data_volume,
119117
'version': version,
120118
'next_refresh': next_refresh,
121119
}

import-automation/workflow/ingestion-helper/main.py

Lines changed: 37 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,30 +19,25 @@
1919
import logging
2020
import import_utils
2121
import os
22-
import sys
2322
from absl import flags
2423

2524
logging.getLogger().setLevel(logging.INFO)
2625

2726
FLAGS = flags.FLAGS
2827

29-
flags.DEFINE_string(
30-
'project_id', os.environ.get('PROJECT_ID', 'datcom-import-automation-prod'),
31-
'GCP Project ID')
32-
flags.DEFINE_string('spanner_project_id',
33-
os.environ.get('SPANNER_PROJECT_ID', 'datcom-store'),
28+
flags.DEFINE_string('project_id', os.environ.get('PROJECT_ID'),
29+
'GCP Project ID')
30+
flags.DEFINE_string('spanner_project_id', os.environ.get('SPANNER_PROJECT_ID'),
3431
'Spanner Project ID')
3532
flags.DEFINE_string('spanner_instance_id',
36-
os.environ.get('SPANNER_INSTANCE_ID', 'dc-kg-test'),
33+
os.environ.get('SPANNER_INSTANCE_ID'),
3734
'Spanner Instance ID')
3835
flags.DEFINE_string('spanner_database_id',
39-
os.environ.get('SPANNER_DATABASE_ID', 'dc_graph_import'),
36+
os.environ.get('SPANNER_DATABASE_ID'),
4037
'Spanner Database ID')
41-
flags.DEFINE_string('gcs_bucket_id',
42-
os.environ.get('GCS_BUCKET_ID', 'datcom-prod-imports'),
38+
flags.DEFINE_string('gcs_bucket_id', os.environ.get('GCS_BUCKET_ID'),
4339
'GCS Bucket ID')
44-
flags.DEFINE_string('location', os.environ.get('LOCATION', 'us-central1'),
45-
'GCP Location')
40+
flags.DEFINE_string('location', os.environ.get('LOCATION'), 'Location')
4641

4742

4843
def _validate_params(request_json, required_params):
@@ -72,11 +67,16 @@ def ingestion_helper(request):
7267

7368
if actionType == 'get_import_list':
7469
# Gets the list of imports that are ready for ingestion.
70+
# Input:
71+
# importList: list of import names to filter by (optional)
7572
import_list = request_json.get('importList', [])
7673
imports = spanner.get_import_list(import_list)
7774
return jsonify(imports)
7875
elif actionType == 'acquire_ingestion_lock':
7976
# Attempts to acquire the global lock for ingestion.
77+
# Input:
78+
# workflowId: ID of the workflow acquiring the lock
79+
# timeout: lock duration in seconds
8080
validation_error = _validate_params(request_json,
8181
['workflowId', 'timeout'])
8282
if validation_error:
@@ -89,6 +89,8 @@ def ingestion_helper(request):
8989
return ('Lock acquired', 200)
9090
elif actionType == 'release_ingestion_lock':
9191
# Releases the global ingestion lock.
92+
# Input:
93+
# workflowId: ID of the workflow releasing the lock
9294
validation_error = _validate_params(request_json, ['workflowId'])
9395
if validation_error:
9496
return (validation_error, 400)
@@ -99,8 +101,12 @@ def ingestion_helper(request):
99101
return ('Lock released', 200)
100102
elif actionType == 'update_ingestion_status':
101103
# Updates the status of imports after ingestion.
102-
validation_error = _validate_params(request_json,
103-
['importList', 'workflowId'])
104+
# Input:
105+
# importList: list of import names
106+
# workflowId: ID of the workflow
107+
# jobId: Dataflow job ID
108+
validation_error = _validate_params(
109+
request_json, ['importList', 'workflowId', 'jobId'])
104110
if validation_error:
105111
return (validation_error, 400)
106112
import_list = request_json['importList']
@@ -113,6 +119,14 @@ def ingestion_helper(request):
113119
return ('Updated ingestion status', 200)
114120
elif actionType == 'update_import_status':
115121
# Updates the status of a specific import job.
122+
# Input:
123+
# importName: name of the import
124+
# status: new status
125+
# jobId: Dataflow job ID (optional)
126+
# execTime: execution time in seconds (optional)
127+
# dataVolume: data volume in bytes (optional)
128+
# version: version string (optional)
129+
# schedule: cron schedule string (optional)
116130
validation_error = _validate_params(request_json,
117131
['importName', 'status'])
118132
if validation_error:
@@ -126,23 +140,28 @@ def ingestion_helper(request):
126140
200)
127141
elif actionType == 'update_import_version':
128142
# Updates the version of an import and marks it as READY.
129-
validation_error = _validate_params(request_json,
130-
['importName', 'version', 'reason'])
143+
# Input:
144+
# importName: name of the import
145+
# version: version string
146+
# comment: audit log comment
147+
validation_error = _validate_params(
148+
request_json, ['importName', 'version', 'comment'])
131149
if validation_error:
132150
return (validation_error, 400)
133151
import_name = request_json['importName']
134152
version = request_json['version']
135-
reason = request_json['reason']
153+
comment = request_json['comment']
136154
caller = import_utils.get_caller_identity(request)
137155
logging.info(
138-
f"[ImportVersionAuditLog] Import {import_name} version {version} caller: {caller} reason: {reason}"
156+
f"[ImportVersionAuditLog] Import {import_name} version {version} caller: {caller} comment: {comment}"
139157
)
140158
if version == 'staging':
141159
version = storage.get_staging_version(import_name)
142160
summary = storage.get_import_summary(import_name, version)
143161
params = import_utils.create_import_params(summary)
144162
params['status'] = 'READY'
145163
storage.update_version_file(import_name, version)
164+
spanner.update_version_history(import_name, version, comment)
146165
spanner.update_import_status(params)
147166
return (f'Updated import {import_name} to version {version}', 200)
148167
else:

import-automation/workflow/ingestion-helper/spanner_client.py

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -197,8 +197,8 @@ def update_import_status(self, params: dict):
197197
"""Updates the status for the specified import job."""
198198
import_name = params['import_name']
199199
job_id = params['job_id']
200-
duration = params['duration']
201-
data = params['data']
200+
exec_time = params['exec_time']
201+
data_volume = params['data_volume']
202202
status = params['status']
203203
version = params['version']
204204
next_refresh = params['next_refresh']
@@ -211,8 +211,8 @@ def _record(transaction: Transaction):
211211
]
212212

213213
row_values = [
214-
import_name, status, job_id, duration, data, next_refresh,
215-
version, spanner.COMMIT_TIMESTAMP
214+
import_name, status, job_id, exec_time, data_volume,
215+
next_refresh, version, spanner.COMMIT_TIMESTAMP
216216
]
217217

218218
if status == 'READY':
@@ -231,3 +231,31 @@ def _record(transaction: Transaction):
231231
logging.error(
232232
f'Error updating import status for {import_name}: {e}')
233233
raise
234+
235+
def update_version_history(self, import_name: str, version: str,
236+
comment: str):
237+
"""Updates the version history table.
238+
239+
Args:
240+
import_name: The name of the import.
241+
version: The version string.
242+
comment: The comment for the update.
243+
"""
244+
logging.info(f"Updating version history for {import_name} to {version}")
245+
246+
def _record(transaction: Transaction):
247+
columns = ["ImportName", "Version", "UpdateTimestamp", "Comment"]
248+
values = [[
249+
import_name, version, spanner.COMMIT_TIMESTAMP, comment
250+
]]
251+
transaction.insert(table="ImportVersionHistory",
252+
columns=columns,
253+
values=values)
254+
logging.info(f"Added version history entry for {import_name}")
255+
256+
try:
257+
self.database.run_in_transaction(_record)
258+
except Exception as e:
259+
logging.error(
260+
f'Error updating version history for {import_name}: {e}')
261+
raise

0 commit comments

Comments
 (0)