Commit 41f497b

Author: Milo Hyben
Refactoring ETL folder, preparing first draft of etl_post and etl_load functions.
1 parent 38ac796 · commit 41f497b

File tree

7 files changed: +212 −1 lines changed


etl/README.md

+125

@@ -0,0 +1,125 @@

# ETL

Cloud Functions for ETL.

## ETL_POST

The etl_post function accepts a JSON payload. It inserts a new record into the BigQuery table defined by the environment variable BIGQUERY_TABLE and pushes a new message to PUBSUB_TOPIC.

## ETL_LOAD

The etl_load function expects a "request_id" in the payload. It is set up as a push subscriber to PUBSUB_TOPIC.

## How to test locally

### 1. Create BQ table "<YOUR-PROJECT-NAME>.metamist.etl-data"

### 2. Create TOPIC "projects/<YOUR-PROJECT-NAME>/topics/etl-topic"

### 3. Set up your environment

```bash
# set up authentication
gcloud auth application-default login
export GOOGLE_APPLICATION_CREDENTIALS='/Users/<USERNAME>/.config/gcloud/application_default_credentials.json'

export BIGQUERY_TABLE='<YOUR-PROJECT-NAME>.metamist.etl-data'
export PUBSUB_TOPIC='projects/<YOUR-PROJECT-NAME>/topics/etl-topic'
```

### 4. Set up the Python environment

```bash
cd post
python3 -m venv env
source env/bin/activate
pip install -r requirements.txt
```

### 5. Start the POST function locally

```bash
functions-framework-python --target etl_post --debug
```

### 6. Call etl_post

```bash
curl -X 'POST' \
  'http://localhost:8080/' \
  -H 'Content-Type: application/json' \
  -H "Authorization: Bearer $(gcloud auth print-identity-token)" \
  -d '{"identifier": "AB0002", "name": "j smith", "age": 50, "measurement": "98.7", "observation": "B++", "receipt_date": "1/02/2023"}'
```

Should return something like this:

```json
{
  "id": "76263e55-a869-4604-afe2-441d9c20221e",
  "success": true
}
```

### 7. Start the LOAD function locally

Repeat Step 4 inside the load folder, then:

```bash
functions-framework-python --target etl_load --debug
```

### 8. Call etl_load

Replace request_id with the id returned in Step 6:

```bash
curl -X 'POST' \
  'http://localhost:8080/' \
  -H "Authorization: Bearer $(gcloud auth print-identity-token)" \
  -d '{"request_id": "76263e55-a869-4604-afe2-441d9c20221e"}'
```

Should return something like this:

```json
{
  "id": "76263e55-a869-4604-afe2-441d9c20221e",
  "success": true
}
```

### 9. Deploy the functions for testing on the cloud

```bash
cd ../load

gcloud functions deploy etl_load \
    --gen2 \
    --runtime=python311 \
    --project=<YOUR-PROJECT-NAME> \
    --region=australia-southeast1 \
    --source=. \
    --entry-point=etl_load \
    --trigger-http \
    --set-env-vars BIGQUERY_TABLE='<YOUR-PROJECT-NAME>.metamist.etl-data'
```

```bash
cd ../post

gcloud functions deploy etl_post \
    --gen2 \
    --runtime=python311 \
    --project=<YOUR-PROJECT-NAME> \
    --region=australia-southeast1 \
    --source=. \
    --entry-point=etl_post \
    --trigger-http \
    --set-env-vars BIGQUERY_TABLE='<YOUR-PROJECT-NAME>.metamist.etl-data' \
    --set-env-vars PUBSUB_TOPIC='projects/<YOUR-PROJECT-NAME>/topics/etl-topic'
```
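For reference, the JSON payload from Step 6 can be built and round-tripped in Python (a minimal sketch; the field names are taken from the curl example above):

```python
import json

# payload matching the curl example in Step 6
payload = {
    'identifier': 'AB0002',
    'name': 'j smith',
    'age': 50,
    'measurement': '98.7',
    'observation': 'B++',
    'receipt_date': '1/02/2023',
}

# etl_post reads this back on the server side with request.get_json()
parsed = json.loads(json.dumps(payload))
print(parsed['identifier'])  # AB0002
```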
File renamed without changes.

etl/load/main.py

+79

@@ -0,0 +1,79 @@

```python
import datetime
import json
import logging
import os
import uuid

import functions_framework
import flask
import google.cloud.bigquery as bq

BIGQUERY_TABLE = os.getenv('BIGQUERY_TABLE')

_BQ_CLIENT = bq.Client()


@functions_framework.http
def etl_load(request: flask.Request):
    """HTTP Cloud Function.
    Args:
        request (flask.Request): The request object.
        <https://flask.palletsprojects.com/en/1.1.x/api/#incoming-request-data>
    Returns:
        The response text, or any set of values that can be turned into a
        Response object using `make_response`
        <https://flask.palletsprojects.com/en/1.1.x/api/#flask.make_response>.
    Note:
        For more information on how Flask integrates with Cloud
        Functions, see the `Writing HTTP functions` page.
        <https://cloud.google.com/functions/docs/writing/http#http_frameworks>

    This function accepts a Pub/Sub push message and expects request_id
    in the payload:

    {
        "request_id": "70eb6292-6311-44cf-9c9b-2b38bb076699"
    }
    """

    auth = request.authorization
    if not auth or not auth.token:
        return {'success': False, 'message': 'No auth token provided'}, 401

    # mimetype might not be set, esp. when Pub/Sub pushes from another topic,
    # so force the conversion; on failure this returns None
    jbody = request.get_json(force=True, silent=True)

    if callable(jbody):
        # request.json is it in reality, but the type checker says it's callable
        jbody = jbody()

    # guard against jbody being None before looking up request_id
    request_id = jbody.get('request_id', None) if jbody else None
    if not request_id:
        jbody_str = json.dumps(jbody)
        return {
            'success': False,
            'message': f'Missing or empty request_id: {jbody_str}',
        }, 400

    # locate the request_id in BigQuery
    query = f"""
        SELECT * FROM `{BIGQUERY_TABLE}` WHERE request_id = @request_id
    """
    query_params = [
        bq.ScalarQueryParameter('request_id', 'STRING', request_id),
    ]

    job_config = bq.QueryJobConfig()
    job_config.query_parameters = query_params
    query_job = _BQ_CLIENT.query(query, job_config=job_config).result()

    # should be only one record; look into loading multiple objects in one call?
    for row in query_job:
        # TODO
        # Parse row.body -> Model and upload to the metamist database
        row_body = json.loads(row.body)
        logging.info(f'row_body {row_body}')

    return {'id': request_id, 'success': True}
```
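The payload check at the top of etl_load can be exercised on its own; a minimal sketch with a hypothetical helper (not part of the module) that mirrors the validation logic above:

```python
import json

def check_request_id(jbody):
    """Mirror etl_load's payload validation: return (response, status)."""
    request_id = jbody.get('request_id') if jbody else None
    if not request_id:
        return {
            'success': False,
            'message': f'Missing or empty request_id: {json.dumps(jbody)}',
        }, 400
    return {'id': request_id, 'success': True}, 200

ok, ok_status = check_request_id(
    {'request_id': '70eb6292-6311-44cf-9c9b-2b38bb076699'}
)
bad, bad_status = check_request_id({})
print(ok_status, bad_status)  # 200 400
```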

etl/load/requirements.txt

+4

@@ -0,0 +1,4 @@

```
flask
functions_framework
google-cloud-bigquery
google-cloud-logging
```

etl/post/__init__.py

Whitespace-only changes.

etl/endpoint/main.py renamed to etl/post/main.py

+4 −1

```diff
@@ -10,6 +10,7 @@

 from cpg_utils.cloud import email_from_id_token

+
 BIGQUERY_TABLE = os.getenv('BIGQUERY_TABLE')
 PUBSUB_TOPIC = os.getenv('PUBSUB_TOPIC')

@@ -72,8 +73,10 @@ def etl_post(request: flask.Request):
         }, 500

     # publish to pubsub
+    # message contains all the attributes except body, which can be large and is already stored in the BQ table
+    pb_obj = {x: bq_obj[x] for x in bq_obj if x not in ['body']}
     try:
-        _PUBSUB_CLIENT.publish(PUBSUB_TOPIC, json.dumps(bq_obj).encode())
+        _PUBSUB_CLIENT.publish(PUBSUB_TOPIC, json.dumps(pb_obj).encode(), content_type='application/json')
     except Exception as e:  # pylint: disable=broad-exception-caught
         logging.error(f'Failed to publish to pubsub: {e}')
```
File renamed without changes.
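The pb_obj comprehension introduced in the diff above simply drops the body key before publishing, since the full body is already stored in BigQuery. As a standalone sketch (field names other than body and request_id are hypothetical):

```python
bq_obj = {
    'request_id': '70eb6292-6311-44cf-9c9b-2b38bb076699',
    'timestamp': '2023-05-01T00:00:00',  # hypothetical field
    'body': '{"identifier": "AB0002"}',  # large payload, already in the BQ table
}

# same comprehension as in the diff: keep everything except 'body'
pb_obj = {x: bq_obj[x] for x in bq_obj if x not in ['body']}
print(sorted(pb_obj))  # ['request_id', 'timestamp']
```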
