Skip to content

Commit 1a86bc1

Browse files
committed
Address reviewer comments
1 parent c375e66 commit 1a86bc1

9 files changed

Lines changed: 144 additions & 41 deletions

File tree

import-automation/executor/app/executor/import_executor.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -822,9 +822,10 @@ def _import_one_helper(
822822
self.config.file_download_timeout)
823823

824824
output_dir = f'{relative_import_dir}/{import_name}'
825-
version = _clean_time(utils.pacific_time())
826825
if version == 'DATE_VERSION_PLACEHOLDER':
827826
version = datetime.datetime.now(datetime.UTC).strftime("%Y-%m-%d")
827+
else:
828+
version = _clean_time(utils.pacific_time())
828829
import_summary.latest_version = 'gs://' + os.path.join(
829830
self.config.storage_prod_bucket_name, output_dir, version, '*', '*',
830831
'*.mcf')

import-automation/executor/update_import_version.sh

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ if [ "$#" -ne 3 ]; then
1212
exit 1
1313
fi
1414

15-
FUNCTION_URL="https://spanner-ingestion-helper-965988403328.us-central1.run.app"
15+
# Deployed using import-automation/workflow/cloudbuild.yaml
16+
FUNCTION_URL="https://us-central1-datcom-import-automation-prod.cloudfunctions.net/spanner-ingestion-helper"
1617
IMPORT_NAME=$1
1718
VERSION=$2
1819
REASON=$3

import-automation/workflow/cloudbuild.yaml

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,26 @@
11
# gcloud builds submit --config=cloudbuild.yaml . --project=datcom-import-automation-prod
22

3+
substitutions:
4+
_PROJECT_ID: 'datcom-import-automation-prod'
5+
_SPANNER_PROJECT_ID: 'datcom-store'
6+
_SPANNER_INSTANCE_ID: 'dc-kg-test'
7+
_SPANNER_DATABASE_ID: 'dc_graph_import'
8+
_GCS_BUCKET_ID: 'datcom-prod-imports'
9+
_LOCATION: 'us-central1'
10+
_GCS_MOUNT_BUCKET: 'datcom-volume-mount'
11+
312
steps:
413
- id: 'import-automation-workflow'
514
name: 'gcr.io/cloud-builders/gcloud'
6-
args: ['workflows', 'deploy', 'import-automation-workflow', '--source', 'import-automation-workflow.yaml']
15+
args: ['workflows', 'deploy', 'import-automation-workflow', '--source', 'import-automation-workflow.yaml', '--set-env-vars', 'LOCATION=${_LOCATION},GCS_BUCKET_ID=${_GCS_BUCKET_ID},GCS_MOUNT_BUCKET=${_GCS_MOUNT_BUCKET}']
716

817
- id: 'spanner-ingestion-workflow'
918
name: 'gcr.io/cloud-builders/gcloud'
10-
args: ['workflows', 'deploy', 'spanner-ingestion-workflow', '--source', 'spanner-ingestion-workflow.yaml']
19+
args: ['workflows', 'deploy', 'spanner-ingestion-workflow', '--source', 'spanner-ingestion-workflow.yaml', '--set-env-vars', 'PROJECT_ID=${_PROJECT_ID},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID}']
1120

1221
- id: 'spanner-ingestion-helper'
1322
name: 'gcr.io/cloud-builders/gcloud'
14-
args: ['functions', 'deploy', 'spanner-ingestion-helper', '--runtime', 'python312', '--source', 'ingestion-helper', '--no-allow-unauthenticated', '--trigger-http', '--entry-point', 'ingestion_helper']
23+
args: ['functions', 'deploy', 'spanner-ingestion-helper', '--runtime', 'python312', '--source', 'ingestion-helper', '--no-allow-unauthenticated', '--trigger-http', '--entry-point', 'ingestion_helper', '--set-env-vars', 'PROJECT_ID=${_PROJECT_ID},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID},GCS_BUCKET_ID=${_GCS_BUCKET_ID},LOCATION=${_LOCATION}']
1524

1625
- id: 'import-automation-helper'
1726
name: 'gcr.io/cloud-builders/gcloud'

import-automation/workflow/import-automation-workflow.yaml

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,18 @@ main:
44
- init:
55
assign:
66
- projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
7-
- region: "us-central1"
7+
- region: ${sys.get_env("LOCATION")}
88
- imageUri: "gcr.io/datcom-ci/dc-import-executor:stable"
99
- jobId: ${text.substring(args.jobName, 0, 50) + "-" + string(int(sys.now()))}
1010
- importName: ${args.importName}
1111
- importConfig: ${args.importConfig}
12-
- gcsMountBucket: "datcom-volume-mount"
13-
- gcsImportBucket: "datcom-prod-imports"
12+
- gcsMountBucket: ${sys.get_env("GCS_MOUNT_BUCKET")}
13+
- gcsImportBucket: ${sys.get_env("GCS_BUCKET_ID")}
1414
- gcsMountPath: "/tmp/gcs"
15-
- function_url: 'https://spanner-ingestion-helper-965988403328.us-central1.run.app'
15+
- ingestionHelper: "spanner-ingestion-helper"
16+
- functionUrl: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/" + ingestionHelper}
1617
- startTime: ${sys.now()}
17-
- createAndRunBatchJob:
18+
- runImportJob:
1819
try:
1920
call: googleapis.batch.v1.projects.locations.jobs.create
2021
args:
@@ -59,39 +60,39 @@ main:
5960
initial_delay: 60
6061
multiplier: 2
6162
max_delay: 600
62-
result: createAndRunBatchJobResponse
63+
result: importJobResponse
6364
except:
6465
as: e
6566
steps:
66-
- update_import_status:
67+
- updateImportStatus:
6768
call: http.post
6869
args:
69-
url: ${function_url}
70+
url: ${functionUrl}
7071
auth:
7172
type: OIDC
7273
body:
73-
actionType: update_import_status
74+
actionType: 'update_import_status'
7475
jobId: ${jobId}
7576
importName: ${text.split(args.importName, ":")[1]}
7677
status: 'FAILED'
7778
duration: ${int(sys.now() - startTime)}
7879
version: ${"gs://" + gcsImportBucket + "/" + text.replace_all(args.importName, ":", "/")}
7980
schedule: ${default(map.get(args, "schedule"), "")}
80-
result: function_response
81-
- fail_workflow:
81+
result: functionResponse
82+
- failWorkflow:
8283
raise: ${e}
83-
- update_import_version:
84+
- updateImportVersion:
8485
call: http.post
8586
args:
86-
url: ${function_url}
87+
url: ${functionUrl}
8788
auth:
8889
type: OIDC
8990
body:
90-
actionType: update_import_version
91+
actionType: 'update_import_version'
9192
importName: ${args.importName}
9293
version: 'staging'
9394
reason: 'import-automation'
94-
result: function_response
95+
result: functionResponse
9596
- returnResult:
9697
return:
9798
jobId: ${jobId}

import-automation/workflow/ingestion-helper/import_utils.py

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,17 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
115
"""Utility functions for the ingestion helper."""
216

317
import logging
@@ -22,15 +36,14 @@ def get_caller_identity(request):
2236
unverified_claims = {}
2337
try:
2438
unverified_claims = jwt.decode(token, verify=False)
25-
#logging.info(f"Token claims (unverified): iss={unverified_claims.get('iss')}, aud={unverified_claims.get('aud')}, email={unverified_claims.get('email')}")
26-
logging.warning(
27-
f"Could not decode unverified token for debugging: {debug_e}"
28-
)
2939
id_info = id_token.verify_oauth2_token(token,
3040
requests.Request())
3141
return id_info.get('email', 'unknown_email')
3242
except Exception as e:
3343
if unverified_claims:
44+
logging.warning(
45+
f"Could not decode unverified token for debugging: {e}"
46+
)
3447
email = unverified_claims.get('email', 'unknown_email')
3548
return f"{email} (unverified)"
3649
return 'decode_error'
@@ -111,23 +124,39 @@ def create_import_params(summary) -> dict:
111124

112125

113126
def get_ingestion_metrics(project_id, location, job_id):
114-
"""Fetches graph metrics (nodes, edges, observations) from a Dataflow job.
127+
"""Fetches graph metrics (nodes, edges, observations) and execution time from a Dataflow job.
115128
116129
Args:
117130
project_id: The GCP project ID.
118131
location: The location of the Dataflow job.
119132
job_id: The Dataflow job ID.
120133
121134
Returns:
122-
A dictionary containing 'obs_count', 'node_count', and 'edge_count'.
135+
A dictionary containing 'obs_count', 'node_count', 'edge_count', and 'execution_time'.
123136
"""
124137
dataflow = build('dataflow', 'v1b3', cache_discovery=False)
125138
# Fetch Dataflow metrics
126139
node_count = 0
127140
edge_count = 0
128141
obs_count = 0
142+
execution_time = 0
129143
if project_id and job_id:
130144
try:
145+
# Fetch Job details for execution time
146+
job = dataflow.projects().locations().jobs().get(
147+
projectId=project_id, location=location, jobId=job_id).execute()
148+
149+
start_time_str = job.get('startTime')
150+
current_state_time_str = job.get('currentStateTime')
151+
152+
if start_time_str and current_state_time_str:
153+
# Handle potential 'Z' suffix by replacing it with '+00:00' for compatibility
154+
start_time = datetime.fromisoformat(
155+
start_time_str.replace('Z', '+00:00'))
156+
end_time = datetime.fromisoformat(
157+
current_state_time_str.replace('Z', '+00:00'))
158+
execution_time = (end_time - start_time).total_seconds()
159+
131160
metrics = dataflow.projects().locations().jobs().getMetrics(
132161
projectId=project_id, location=location,
133162
jobId=job_id).execute()
@@ -145,5 +174,6 @@ def get_ingestion_metrics(project_id, location, job_id):
145174
return {
146175
'obs_count': obs_count,
147176
'node_count': node_count,
148-
'edge_count': edge_count
177+
'edge_count': edge_count,
178+
'execution_time': execution_time
149179
}

import-automation/workflow/ingestion-helper/main.py

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,48 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
115
import functions_framework
216
from spanner_client import SpannerClient
317
from storage_client import StorageClient
418
from flask import jsonify
519
import logging
620
import import_utils
21+
import os
22+
import sys
23+
from absl import flags
724

825
logging.getLogger().setLevel(logging.INFO)
926

10-
_PROJECT_ID = 'datcom-import-automation-prod'
11-
_SPANNER_PROJECT_ID = 'datcom-store'
12-
_SPANNER_INSTANCE_ID = 'dc-kg-test'
13-
_SPANNER_DATABASE_ID = 'dc_graph_import'
14-
_GCS_BUCKET_ID = 'datcom-prod-imports'
15-
_LOCATION = 'us-central1'
27+
# Runtime configuration for the ingestion helper. Each flag's default is read
# from the environment variable of the same (upper-cased) name, falling back
# to the hard-coded default below when that variable is unset.
FLAGS = flags.FLAGS

flags.DEFINE_string(
    'project_id', os.environ.get('PROJECT_ID', 'datcom-import-automation-prod'),
    'GCP Project ID')
flags.DEFINE_string('spanner_project_id',
                    os.environ.get('SPANNER_PROJECT_ID', 'datcom-store'),
                    'Spanner Project ID')
flags.DEFINE_string('spanner_instance_id',
                    os.environ.get('SPANNER_INSTANCE_ID', 'dc-kg-test'),
                    'Spanner Instance ID')
flags.DEFINE_string('spanner_database_id',
                    os.environ.get('SPANNER_DATABASE_ID', 'dc_graph_import'),
                    'Spanner Database ID')
flags.DEFINE_string('gcs_bucket_id',
                    os.environ.get('GCS_BUCKET_ID', 'datcom-prod-imports'),
                    'GCS Bucket ID')
flags.DEFINE_string('location', os.environ.get('LOCATION', 'us-central1'),
                    'GCP Location')

# absl raises UnparsedFlagAccessError when a flag value is read before the
# flags have been parsed. Nothing in this module goes through absl.app.run(),
# so the flags must be parsed explicitly before the request handler reads
# them. Only the program name is passed: every flag then resolves to its
# default above, and any command-line arguments belonging to the hosting
# server are not mistaken for (unknown) absl flags. The fallback name guards
# against an empty sys.argv, which absl rejects.
if not FLAGS.is_parsed():
    FLAGS(sys.argv[:1] or ['ingestion_helper'])
1646

1747

1848
def _validate_params(request_json, required_params):
@@ -36,9 +66,9 @@ def ingestion_helper(request):
3666
return (validation_error, 400)
3767

3868
actionType = request_json['actionType']
39-
spanner = SpannerClient(_SPANNER_PROJECT_ID, _SPANNER_INSTANCE_ID,
40-
_SPANNER_DATABASE_ID)
41-
storage = StorageClient(_GCS_BUCKET_ID)
69+
spanner = SpannerClient(FLAGS.spanner_project_id, FLAGS.spanner_instance_id,
70+
FLAGS.spanner_database_id)
71+
storage = StorageClient(FLAGS.gcs_bucket_id)
4272

4373
if actionType == 'get_import_list':
4474
# Gets the list of imports that are ready for ingestion.
@@ -76,8 +106,8 @@ def ingestion_helper(request):
76106
import_list = request_json['importList']
77107
workflow_id = request_json['workflowId']
78108
job_id = request_json['jobId']
79-
metrics = import_utils.get_ingestion_metrics(_PROJECT_ID, _LOCATION,
80-
job_id)
109+
metrics = import_utils.get_ingestion_metrics(FLAGS.project_id,
110+
FLAGS.location, job_id)
81111
spanner.update_ingestion_status(import_list, workflow_id, job_id,
82112
metrics)
83113
return ('Updated ingestion status', 200)

import-automation/workflow/ingestion-helper/requirements.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@ google-cloud-spanner
33
croniter
44
google-api-python-client
55
google-cloud-storage
6-
google-auth
6+
google-auth
7+
absl-py

import-automation/workflow/ingestion-helper/spanner_client.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,17 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
115
import logging
216
from google.cloud import spanner
317
from google.cloud.spanner_v1 import Transaction
@@ -158,11 +172,13 @@ def _record(transaction: Transaction):
158172
# 2. Insert into the IngestionHistory table
159173
columns = [
160174
"CompletionTimestamp", "WorkflowExecutionID", "DataflowJobId",
161-
"IngestedImports", "NodeCount", "EdgeCount", "ObservationCount"
175+
"IngestedImports", "ExecutionTime", "NodeCount", "EdgeCount",
176+
"ObservationCount"
162177
]
163178
values = [[
164179
spanner.COMMIT_TIMESTAMP, workflow_id, job_id,
165-
succeeded_imports, metrics['node_count'], metrics['edge_count'],
180+
succeeded_imports, metrics['execution_time'],
181+
metrics['node_count'], metrics['edge_count'],
166182
metrics['obs_count']
167183
]]
168184
transaction.insert_or_update(table="IngestionHistory",

import-automation/workflow/ingestion-helper/storage_client.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,17 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
115
"""Storage client for the ingestion helper."""
216

317
import logging

0 commit comments

Comments
 (0)