
Commit a7979ed

Authored by Milo Hyben (milo-hyben)
Create a cloud function that processes an event from Cloud Pub/Sub, transforms the JSON data using a parser, and upserts that data into metamist. (#532)
* Refactoring ETL folder, preparing first draft of etl_post and etl_load functions.
* Sample JSON Parser skeleton class.
* Prepared test file for SampleJsonParser, defined simple parse function.
* Fixing linting issues.
* Added pytest to requirements-dev.txt.
* Fixing small linting issues.
* Update metamist_infrastructure setup to contain new cloud function.
* Rename post to extract, implement Pub/Sub functionality in the ETL driver.
* ETL - load func implemented pub/sub type of payload, cleanup driver class.
* Implement simple unit tests for etl/load and etl/extract.
* Update setup and add docstring to test functions.
* Move ETL unit tests to own directory and update workflow script to include them.
* Update readme for metamist_infra, update requirements-dev and ignore to be able to test pulumi up command on local project.
* Remove cpg_infra dependency from requirements-dev.
* Implement slack notification, update readme.
* Integrated ETL load function with GenericMetadataParser, using temp metamist.tar.gz approach before new version of metamist is released.
* Updated unit tests for etl functions, added mock for GenericMetadataParser.
* Added metamist.tar.gz file to cloud function assets.
* Integrated with DEV metamist, using dry_run=True for the time being.
* Fixed linting issues.
* ETL load function - updating async run.
* Kept parser original formatting.
* Added comments and small changes to reflect the pull request feedback.
* Added etl log table, implemented matching parser to url path.
* Update ETL load to load available parsers dynamically, fully implemented detailed Slack messaging.
* Fixed linting issues.
* Implemented entry points for metamist_parser, added ability to include extra private python repos in gc functions.
* Linting.
* Added default ETL load config.
* Resolving conflicts with dev branch.
* Fixing linting issues after merging from dev.
* Actioned pull request feedback.
* Added more comments regarding serverless-robot-prod account.
* Bump mypy.ini to 3.11, small cleanup.
* Revert mypy back to 3.10.
* Bumping metamist_infra version, fixing import issues, using new cpg_infra archive_folder implementation.
* Integrate with latest cpg-infra changes, esp. constructing private repo url from ArtifactRegistry.
* Cleanup unused code.
* Making Enums strings.
* Changes as per review request.
* Bump version: 6.2.1 → 6.3.0
* Removing metamist zip.
* Regenerating package lock.

---

Co-authored-by: Milo Hyben <[email protected]>
1 parent f5aacdb · commit a7979ed

33 files changed: +1957 −133 lines

.bumpversion.cfg (+1 −1)

```diff
@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 6.2.1
+current_version = 6.3.0
 commit = True
 tag = False
 parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>[A-z0-9-]+)
```

.github/workflows/test.yaml (+1 −1)

```diff
@@ -66,7 +66,7 @@ jobs:
       - name: "Run unit tests"
        id: runtests
        run: |
-         coverage run -m unittest discover -s test/
+         coverage run -m unittest discover -p 'test*.py' -s '.'
          rc=$?
          coverage xml
```

.gitignore (+3)

```diff
@@ -56,3 +56,6 @@ web/src/__generated__
 
 # mypy
 .mypy_cache/
+
+# pulumi config files
+Pulumi*.yaml
```

api/server.py (+1 −1)

```diff
@@ -20,7 +20,7 @@
 from api.settings import PROFILE_REQUESTS, SKIP_DATABASE_CONNECTION
 
 # This tag is automatically updated by bump2version
-_VERSION = '6.2.1'
+_VERSION = '6.3.0'
 
 logger = get_logger()
```

deploy/python/version.txt (+1 −1)

```diff
@@ -1 +1 @@
-6.2.1
+6.3.0
```

etl/README.md (new file, +133 lines)

# ETL

Cloud Functions for ETL.

## ETL_EXTRACT

The `etl_extract` function (formerly `etl_post`) accepts a JSON payload. It inserts a new record into the BigQuery table defined by the env variable `BIGQUERY_TABLE` and pushes a new message to `PUBSUB_TOPIC`.

## ETL_LOAD

The `etl_load` function expects a `request_id` in the payload. It is set up as a push subscriber to `PUBSUB_TOPIC`.
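For illustration, the push wiring could look like this with gcloud (the subscription name and push endpoint below are placeholders, not values from this commit; substitute the URL of your deployed `etl_load` function):

```bash
gcloud pubsub subscriptions create etl-load-subscription \
    --topic=etl-topic \
    --project="$PROJECT_NAME" \
    --push-endpoint="https://<your-etl_load-function-url>"
```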
## How to test locally

Please use your personal dev project as `$PROJECT_NAME`.

### 1. Set up your environment

```bash
# set up gcloud authentication
gcloud auth application-default login

export PROJECT_NAME="gcp-project-name"
export BIGQUERY_TABLE="$PROJECT_NAME.metamist.etl-data"
export BIGQUERY_LOG_TABLE="$PROJECT_NAME.metamist.etl-logs"
export PUBSUB_TOPIC="projects/$PROJECT_NAME/topics/etl-topic"

# set up to run a local version of sample-metadata
export SM_ENVIRONMENT=local
```

### 2. Create the BQ table "$PROJECT_NAME.metamist.etl-data"
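For example, with the `bq` CLI. The inline schema below is an assumption based on the fields `etl_extract` writes (`request_id`, `timestamp`, `type`, `submitting_user`, `body`), not a schema file shipped in this commit:

```bash
# create the dataset, then the data table
bq mk --dataset "$PROJECT_NAME:metamist"
bq mk --table "$PROJECT_NAME:metamist.etl-data" \
    request_id:STRING,timestamp:TIMESTAMP,type:STRING,submitting_user:STRING,body:STRING
```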
### 3. Create the TOPIC "projects/$PROJECT_NAME/topics/etl-topic"
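The topic can be created with gcloud, for example:

```bash
gcloud pubsub topics create etl-topic --project="$PROJECT_NAME"
```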
### 4. Set up the python env

```bash
cd extract
python3 -m venv env
source env/bin/activate
pip install -r requirements.txt
```
### 5. Start the EXTRACT function locally

```bash
functions-framework-python --target etl_extract --debug
```

### 6. Call etl_extract

```bash
curl -X 'POST' \
  'http://localhost:8080/' \
  -H 'Content-Type: application/json' \
  -H "Authorization: Bearer $(gcloud auth print-identity-token)" \
  -d '{"identifier": "AB0002", "name": "j smith", "age": 50, "measurement": "98.7", "observation": "B++", "receipt_date": "1/02/2023"}'
```

It should return something like this:

```json
{
  "id": "76263e55-a869-4604-afe2-441d9c20221e",
  "success": true
}
```

### 7. Start the LOAD function locally

Repeat Step 4 inside the `load` folder, then:

```bash
functions-framework-python --target etl_load --debug
```

### 8. Call etl_load

Replace `request_id` with the id returned in Step 6:

```bash
curl -X 'POST' \
  'http://localhost:8080/' \
  -H "Authorization: Bearer $(gcloud auth print-identity-token)" \
  -d '{"request_id": "76263e55-a869-4604-afe2-441d9c20221e"}'
```

It should return something like this:

```json
{
  "id": "76263e55-a869-4604-afe2-441d9c20221e",
  "success": true
}
```
### 9. Deploy the functions for testing on the cloud

```bash
cd ../load

gcloud functions deploy etl_load \
    --gen2 \
    --runtime=python311 \
    --project=$PROJECT_NAME \
    --region=australia-southeast1 \
    --source=. \
    --entry-point=etl_load \
    --trigger-http \
    --no-allow-unauthenticated \
    --set-env-vars BIGQUERY_TABLE=$BIGQUERY_TABLE \
    --set-env-vars PUBSUB_TOPIC=$PUBSUB_TOPIC
```

```bash
cd ../extract

gcloud functions deploy etl_extract \
    --gen2 \
    --runtime=python311 \
    --project=$PROJECT_NAME \
    --region=australia-southeast1 \
    --source=. \
    --entry-point=etl_extract \
    --trigger-http \
    --no-allow-unauthenticated \
    --set-env-vars BIGQUERY_TABLE=$BIGQUERY_TABLE \
    --set-env-vars PUBSUB_TOPIC=$PUBSUB_TOPIC
```

etl/bq_log_schema.json (new file, +26 lines)

```json
[
    {
        "name": "request_id",
        "type": "STRING",
        "mode": "REQUIRED",
        "description": "Unique UUID for the row"
    },
    {
        "name": "timestamp",
        "type": "TIMESTAMP",
        "mode": "REQUIRED",
        "description": "Timestamp of processing the row"
    },
    {
        "name": "status",
        "type": "STRING",
        "mode": "REQUIRED",
        "description": "Status of the processing: FAILED/SUCCESS"
    },
    {
        "name": "details",
        "type": "JSON",
        "mode": "REQUIRED",
        "description": "Output of the processing"
    }
]
```
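The `etl-logs` table from the README can be created from this schema file with the `bq` CLI, for example (dataset and table names taken from the README above):

```bash
bq mk --table "$PROJECT_NAME:metamist.etl-logs" etl/bq_log_schema.json
```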
File renamed without changes.

etl/endpoint/main.py renamed to etl/extract/main.py (+25 −7)

```diff
@@ -8,17 +8,15 @@
 import functions_framework
 import google.cloud.bigquery as bq
 from cpg_utils.cloud import email_from_id_token
+
 from google.cloud import pubsub_v1  # type: ignore
 
 BIGQUERY_TABLE = os.getenv('BIGQUERY_TABLE')
 PUBSUB_TOPIC = os.getenv('PUBSUB_TOPIC')
 
-_BQ_CLIENT = bq.Client()
-_PUBSUB_CLIENT = pubsub_v1.PublisherClient()
-
 
 @functions_framework.http
-def etl_post(request: flask.Request):
+def etl_extract(request: flask.Request):
     """HTTP Cloud Function.
     Args:
         request (flask.Request): The request object.
@@ -31,6 +29,20 @@
     For more information on how Flask integrates with Cloud
     Functions, see the `Writing HTTP functions` page.
     <https://cloud.google.com/functions/docs/writing/http#http_frameworks>
+
+
+    Example of payload:
+
+    json_data = {
+        'sample_id': '123456',
+        'external_id': 'GRK100311',
+        'individual_id': '608',
+        'sequencing_type': 'exome',
+        'collection_centre': 'KCCG',
+        'collection_date': '2023-08-05T01:39:28.611476',
+        'collection_specimen': 'blood'
+    }
+
     """
 
     auth = request.authorization
@@ -40,6 +52,7 @@
     request_id = str(uuid.uuid4())
 
     jbody = request.json
+
     if callable(jbody):
         # request.json is it in reality, but the type checker is saying it's callable
         jbody = jbody()
@@ -59,11 +72,14 @@
         'timestamp': datetime.datetime.utcnow().isoformat(),
         'type': request.path,
         'submitting_user': email_from_id_token(auth.token),
-        'body': jbody_str,
     }
 
     # throw an exception if one occurs
-    errors = _BQ_CLIENT.insert_rows_json(BIGQUERY_TABLE, [bq_obj])
+    bq_client = bq.Client()
+    pubsub_client = pubsub_v1.PublisherClient()
+
+    errors = bq_client.insert_rows_json(BIGQUERY_TABLE, [bq_obj | {'body': jbody_str}])
+
     if errors:
         return {
             'success': False,
@@ -72,8 +88,10 @@
         }, 500
 
     # publish to pubsub
+    # message contains all the attributes except body which can be large
+    # and already stored in BQ table
     try:
-        _PUBSUB_CLIENT.publish(PUBSUB_TOPIC, json.dumps(bq_obj).encode())
+        pubsub_client.publish(PUBSUB_TOPIC, json.dumps(bq_obj).encode())
     except Exception as e:  # pylint: disable=broad-exception-caught
         logging.error(f'Failed to publish to pubsub: {e}')
 
```
File renamed without changes.
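To verify what `etl_extract` publishes while testing, a temporary pull subscription can be attached to the topic (the subscription name here is illustrative):

```bash
gcloud pubsub subscriptions create etl-debug-sub --topic=etl-topic --project="$PROJECT_NAME"
gcloud pubsub subscriptions pull etl-debug-sub --auto-ack --limit=5 --project="$PROJECT_NAME"
```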

etl/load/__init__.py (whitespace-only changes)
