Skip to content

Commit 445391b

Browse files
committed
Address reviewer comments
1 parent babc837 commit 445391b

5 files changed

Lines changed: 34 additions & 14 deletions

File tree

import-automation/executor/app/configs.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -142,17 +142,18 @@ class ExecutorConfig:
142142
# Invoke import tool genmcf.
143143
invoke_import_tool: bool = True
144144
# Invoke differ tool.
145-
invoke_differ_tool: bool = True
145+
invoke_differ_tool: bool = True
146146
# Invoke validations before upload.
147-
invoke_import_validation: bool = True
147+
invoke_import_validation: bool = True
148148
# Ignore validation status during import.
149149
ignore_validation_status: bool = False
150150
# Import validation config file path (relative to data repo).
151151
validation_config_file: str = 'tools/import_validation/validation_config.json'
152152
# Latest import version (overwrite)
153153
import_version_override: str = ''
154154
# Relative path to version folder for graph files.
155-
graph_data_paths: List[str] = dataclasses.field(default_factory=lambda: ['/*/*/*.mcf'])
155+
graph_data_paths: List[str] = dataclasses.field(
156+
default_factory=lambda: ['/*/*/*.mcf'])
156157
# Maximum time venv creation can take in seconds.
157158
venv_create_timeout: float = 3600
158159
# Maximum time downloading a file can take in seconds.

import-automation/workflow/import-helper/main.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,17 @@
1+
# Copyright 2026 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
115
import base64
216
import functions_framework
317
import json

import-automation/workflow/ingestion-helper/main.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
if not FLAGS.is_parsed():
4343
FLAGS(['ingestion_helper'])
4444

45+
4546
def _validate_params(request_json, required_params):
4647
for param in required_params:
4748
if param not in request_json:
@@ -135,7 +136,7 @@ def ingestion_helper(request):
135136
return (validation_error, 400)
136137
import_name = request_json['importName']
137138
status = request_json['status']
138-
logging.info(f'Updating {import_name} {status}')
139+
logging.info(f'Updating import {import_name} to status {status}')
139140
params = import_utils.get_import_params(request_json)
140141
spanner.update_import_status(params)
141142
return (f"Updated import {import_name} to status {params['status']}",
@@ -153,18 +154,19 @@ def ingestion_helper(request):
153154
import_name = request_json['importName']
154155
version = request_json['version']
155156
comment = request_json['comment']
157+
short_import_name = import_name.split(':')[-1]
156158
caller = import_utils.get_caller_identity(request)
157159
logging.info(
158-
f"[ImportVersionAuditLog] Import {import_name} version {version} caller: {caller} comment: {comment}"
160+
f"Import {short_import_name} version {version} caller: {caller} comment: {comment}"
159161
)
160162
if version == 'staging':
161163
version = storage.get_staging_version(import_name)
162164
summary = storage.get_import_summary(import_name, version)
163165
params = import_utils.create_import_params(summary)
164166
params['status'] = 'READY'
165167
storage.update_version_file(import_name, version)
166-
spanner.update_version_history(import_name, version, comment)
168+
spanner.update_version_history(import_name, version, caller, comment)
167169
spanner.update_import_status(params)
168-
return (f'Updated import {import_name} to version {version}', 200)
170+
return (f'Updated import {short_import_name} to version {version}', 200)
169171
else:
170172
return (f'Unknown actionType: {actionType}', 400)

import-automation/workflow/ingestion-helper/spanner_client.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ class SpannerClient:
2727
and getting/updating import statuses.
2828
"""
2929
_LOCK_ID = "global_ingestion_lock"
30-
_DEFAULT_GRAPH_PATH = "/*/*/*.mcf"
3130

3231
def __init__(self, project_id: str, instance_id: str, database_id: str):
3332
"""Initializes a Spanner client and connects to a specific database."""
@@ -202,9 +201,9 @@ def update_import_status(self, params: dict):
202201
exec_time = params['exec_time']
203202
data_volume = params['data_volume']
204203
status = params['status']
205-
version = params['version'].removesuffix(self._DEFAULT_GRAPH_PATH)
204+
version = params['version']
206205
next_refresh = params['next_refresh']
207-
graph_paths = params.get('graph_paths', [self._DEFAULT_GRAPH_PATH])
206+
graph_paths = params['graph_paths']
208207
logging.info(f"Updating import status for {import_name} to {status}")
209208

210209
def _record(transaction: Transaction):
@@ -237,7 +236,7 @@ def _record(transaction: Transaction):
237236
raise
238237

239238
def update_version_history(self, import_name: str, version: str,
240-
comment: str):
239+
caller: str, comment: str):
241240
"""Updates the version history table.
242241
243242
Args:
@@ -248,8 +247,12 @@ def update_version_history(self, import_name: str, version: str,
248247
logging.info(f"Updating version history for {import_name} to {version}")
249248

250249
def _record(transaction: Transaction):
251-
columns = ["ImportName", "Version", "UpdateTimestamp", "Comment"]
252-
values = [[import_name, version, spanner.COMMIT_TIMESTAMP, comment]]
250+
columns = [
251+
"ImportName", "Version", "UpdateTimestamp", "Caller", "Comment"
252+
]
253+
values = [[
254+
import_name, version, spanner.COMMIT_TIMESTAMP, caller, comment
255+
]]
253256
transaction.insert(table="ImportVersionHistory",
254257
columns=columns,
255258
values=values)

import-automation/workflow/ingestion-helper/storage_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,12 +88,12 @@ def update_version_file(self, import_name: str, version: str):
8888
output_dir = import_name.replace(':', '/')
8989
version_file = self.bucket.blob(
9090
os.path.join(output_dir, 'latest_version.txt'))
91-
version_file.upload_from_string(version)
9291
self.bucket.copy_blob(
9392
self.bucket.blob(
9493
os.path.join(output_dir, version,
9594
'import_metadata_mcf.mcf')), self.bucket,
9695
os.path.join(output_dir, 'import_metadata_mcf.mcf'))
96+
version_file.upload_from_string(version)
9797
except exceptions.NotFound as e:
9898
logging.error(f'Error updating version file for {import_name}: {e}')
9999
raise

0 commit comments

Comments
 (0)