Skip to content

Commit 982ba01

Browse files
committed
Address reviewer comments
1 parent e1045ef commit 982ba01

10 files changed

Lines changed: 114 additions & 23 deletions

File tree

import-automation/executor/app/executor/import_executor.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -822,9 +822,10 @@ def _import_one_helper(
822822
self.config.file_download_timeout)
823823

824824
output_dir = f'{relative_import_dir}/{import_name}'
825-
version = _clean_time(utils.pacific_time())
826825
if version == 'DATE_VERSION_PLACEHOLDER':
827826
version = datetime.datetime.now(datetime.UTC).strftime("%Y-%m-%d")
827+
else:
828+
version = _clean_time(utils.pacific_time())
828829
import_summary.latest_version = 'gs://' + os.path.join(
829830
self.config.storage_prod_bucket_name, output_dir, version, '*', '*',
830831
'*.mcf')

import-automation/executor/main.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ def run_import_job(absolute_import_name: str, import_config: str):
110110
import_config)
111111
logging.info(f'Import config: {config}')
112112
if config.dc_api_key:
113-
os.environ['DC_API_KEY'] = config.dc_api_key
113+
os.environ['AUTOPUSH_DC_API_KEY'] = config.dc_api_key
114114
executor = import_executor.ImportExecutor(
115115
uploader=file_uploader.GCSFileUploader(
116116
project_id=config.gcs_project_id,

import-automation/executor/update_import_version.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ if [ "$#" -ne 3 ]; then
1212
exit 1
1313
fi
1414

15+
# Deployed using import-automation/workflow/cloudbuild.yaml
1516
FUNCTION_URL="https://spanner-ingestion-helper-965988403328.us-central1.run.app"
1617
IMPORT_NAME=$1
1718
VERSION=$2

import-automation/workflow/cloudbuild.yaml

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,26 @@
11
# gcloud builds submit --config=cloudbuild.yaml . --project=datcom-import-automation-prod
22

3+
substitutions:
4+
_PROJECT_ID: 'datcom-import-automation-prod'
5+
_SPANNER_PROJECT_ID: 'datcom-store'
6+
_SPANNER_INSTANCE_ID: 'dc-kg-test'
7+
_SPANNER_DATABASE_ID: 'dc_graph_import'
8+
_GCS_BUCKET_ID: 'datcom-prod-imports'
9+
_LOCATION: 'us-central1'
10+
_GCS_MOUNT_BUCKET: 'datcom-volume-mount'
11+
312
steps:
413
- id: 'import-automation-workflow'
514
name: 'gcr.io/cloud-builders/gcloud'
6-
args: ['workflows', 'deploy', 'import-automation-workflow', '--source', 'import-automation-workflow.yaml']
15+
args: ['workflows', 'deploy', 'import-automation-workflow', '--source', 'import-automation-workflow.yaml', '--set-env-vars', 'LOCATION=${_LOCATION},GCS_BUCKET_ID=${_GCS_BUCKET_ID},GCS_MOUNT_BUCKET=${_GCS_MOUNT_BUCKET}']
716

817
- id: 'spanner-ingestion-workflow'
918
name: 'gcr.io/cloud-builders/gcloud'
10-
args: ['workflows', 'deploy', 'spanner-ingestion-workflow', '--source', 'spanner-ingestion-workflow.yaml']
19+
args: ['workflows', 'deploy', 'spanner-ingestion-workflow', '--source', 'spanner-ingestion-workflow.yaml', '--set-env-vars', 'PROJECT_ID=${_PROJECT_ID},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID}']
1120

1221
- id: 'spanner-ingestion-helper'
1322
name: 'gcr.io/cloud-builders/gcloud'
14-
args: ['functions', 'deploy', 'spanner-ingestion-helper', '--runtime', 'python312', '--source', 'ingestion-helper', '--no-allow-unauthenticated', '--trigger-http', '--entry-point', 'ingestion_helper']
23+
args: ['functions', 'deploy', 'spanner-ingestion-helper', '--runtime', 'python312', '--source', 'ingestion-helper', '--no-allow-unauthenticated', '--trigger-http', '--entry-point', 'ingestion_helper', '--set-env-vars', 'PROJECT_ID=${_PROJECT_ID},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID},GCS_BUCKET_ID=${_GCS_BUCKET_ID},LOCATION=${_LOCATION}']
1524

1625
- id: 'import-automation-helper'
1726
name: 'gcr.io/cloud-builders/gcloud'

import-automation/workflow/import-automation-workflow.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@ main:
44
- init:
55
assign:
66
- projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
7-
- region: "us-central1"
7+
- region: ${sys.get_env("LOCATION")}
88
- imageUri: "gcr.io/datcom-ci/dc-import-executor:stable"
99
- jobId: ${text.substring(args.jobName, 0, 50) + "-" + string(int(sys.now()))}
1010
- importName: ${args.importName}
1111
- importConfig: ${args.importConfig}
12-
- gcsMountBucket: "datcom-volume-mount"
13-
- gcsImportBucket: "datcom-prod-imports"
12+
- gcsMountBucket: ${sys.get_env("GCS_MOUNT_BUCKET")}
13+
- gcsImportBucket: ${sys.get_env("GCS_BUCKET_ID")}
1414
- gcsMountPath: "/tmp/gcs"
1515
- function_url: 'https://spanner-ingestion-helper-965988403328.us-central1.run.app'
1616
- startTime: ${sys.now()}

import-automation/workflow/ingestion-helper/import_utils.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,17 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
115
"""Utility functions for the ingestion helper."""
216

317
import logging
@@ -22,15 +36,14 @@ def get_caller_identity(request):
2236
unverified_claims = {}
2337
try:
2438
unverified_claims = jwt.decode(token, verify=False)
25-
#logging.info(f"Token claims (unverified): iss={unverified_claims.get('iss')}, aud={unverified_claims.get('aud')}, email={unverified_claims.get('email')}")
26-
logging.warning(
27-
f"Could not decode unverified token for debugging: {debug_e}"
28-
)
2939
id_info = id_token.verify_oauth2_token(token,
3040
requests.Request())
3141
return id_info.get('email', 'unknown_email')
3242
except Exception as e:
3343
if unverified_claims:
44+
logging.warning(
45+
f"Could not decode unverified token for debugging: {e}"
46+
)
3447
email = unverified_claims.get('email', 'unknown_email')
3548
return f"{email} (unverified)"
3649
return 'decode_error'

import-automation/workflow/ingestion-helper/main.py

Lines changed: 48 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,56 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
115
import functions_framework
216
from spanner_client import SpannerClient
317
from storage_client import StorageClient
418
from flask import jsonify
519
import logging
620
import import_utils
21+
import os
22+
import sys
23+
from absl import flags
724

825
logging.getLogger().setLevel(logging.INFO)
926

10-
_PROJECT_ID = 'datcom-import-automation-prod'
11-
_SPANNER_PROJECT_ID = 'datcom-store'
12-
_SPANNER_INSTANCE_ID = 'dc-kg-test'
13-
_SPANNER_DATABASE_ID = 'dc_graph_import'
14-
_GCS_BUCKET_ID = 'datcom-prod-imports'
15-
_LOCATION = 'us-central1'
27+
FLAGS = flags.FLAGS
28+
29+
if 'project_id' not in FLAGS:
30+
flags.DEFINE_string('project_id',
31+
os.environ.get('PROJECT_ID',
32+
'datcom-import-automation-prod'),
33+
'GCP Project ID')
34+
flags.DEFINE_string('spanner_project_id',
35+
os.environ.get('SPANNER_PROJECT_ID', 'datcom-store'),
36+
'Spanner Project ID')
37+
flags.DEFINE_string('spanner_instance_id',
38+
os.environ.get('SPANNER_INSTANCE_ID', 'dc-kg-test'),
39+
'Spanner Instance ID')
40+
flags.DEFINE_string('spanner_database_id',
41+
os.environ.get('SPANNER_DATABASE_ID', 'dc_graph_import'),
42+
'Spanner Database ID')
43+
flags.DEFINE_string('gcs_bucket_id',
44+
os.environ.get('GCS_BUCKET_ID', 'datcom-prod-imports'),
45+
'GCS Bucket ID')
46+
flags.DEFINE_string('location',
47+
os.environ.get('LOCATION', 'us-central1'),
48+
'GCP Location')
49+
50+
# Parse flags at import time: absl raises UnparsedFlagAccessError if a FLAGS.<name> value is read before parsing.
51+
# Use known_only=True so command-line arguments not defined here (e.g. those passed by the functions framework) are ignored instead of raising an error.
52+
if not FLAGS.is_parsed():
53+
FLAGS(sys.argv, known_only=True)
1654

1755

1856
def _validate_params(request_json, required_params):
@@ -36,9 +74,9 @@ def ingestion_helper(request):
3674
return (validation_error, 400)
3775

3876
actionType = request_json['actionType']
39-
spanner = SpannerClient(_SPANNER_PROJECT_ID, _SPANNER_INSTANCE_ID,
40-
_SPANNER_DATABASE_ID)
41-
storage = StorageClient(_GCS_BUCKET_ID)
77+
spanner = SpannerClient(FLAGS.spanner_project_id, FLAGS.spanner_instance_id,
78+
FLAGS.spanner_database_id)
79+
storage = StorageClient(FLAGS.gcs_bucket_id)
4280

4381
if actionType == 'get_import_list':
4482
# Gets the list of imports that are ready for ingestion.
@@ -76,7 +114,7 @@ def ingestion_helper(request):
76114
import_list = request_json['importList']
77115
workflow_id = request_json['workflowId']
78116
job_id = request_json['jobId']
79-
metrics = import_utils.get_ingestion_metrics(_PROJECT_ID, _LOCATION,
117+
metrics = import_utils.get_ingestion_metrics(FLAGS.project_id, FLAGS.location,
80118
job_id)
81119
spanner.update_ingestion_status(import_list, workflow_id, job_id,
82120
metrics)

import-automation/workflow/ingestion-helper/requirements.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@ google-cloud-spanner
33
croniter
44
google-api-python-client
55
google-cloud-storage
6-
google-auth
6+
google-auth
7+
absl-py

import-automation/workflow/ingestion-helper/spanner_client.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,17 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
115
import logging
216
from google.cloud import spanner
317
from google.cloud.spanner_v1 import Transaction

import-automation/workflow/ingestion-helper/storage_client.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,17 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
115
"""Storage client for the ingestion helper."""
216

317
import logging

0 commit comments

Comments
 (0)