Merge branch 'dev' into etl-gui-changes
milo-hyben committed Oct 3, 2023
2 parents 256ef0b + 0df355e commit b7b6223
Showing 38 changed files with 2,413 additions and 409 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 6.2.1
+current_version = 6.3.0
 commit = True
 tag = False
 parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>[A-z0-9-]+)
2 changes: 1 addition & 1 deletion .github/workflows/test.yaml
@@ -66,7 +66,7 @@ jobs:
       - name: "Run unit tests"
         id: runtests
         run: |
-          coverage run -m unittest discover -s test/
+          coverage run -m unittest discover -p 'test*.py' -s '.'
           rc=$?
           coverage xml
3 changes: 3 additions & 0 deletions .gitignore
@@ -56,3 +56,6 @@ web/src/__generated__

 # mypy
 .mypy_cache/
+
+# pulumi config files
+Pulumi*.yaml
2 changes: 1 addition & 1 deletion api/server.py
@@ -20,7 +20,7 @@
 from api.settings import PROFILE_REQUESTS, SKIP_DATABASE_CONNECTION

 # This tag is automatically updated by bump2version
-_VERSION = '6.2.1'
+_VERSION = '6.3.0'

 logger = get_logger()
2 changes: 1 addition & 1 deletion deploy/python/version.txt
@@ -1 +1 @@
-6.2.1
+6.3.0
133 changes: 133 additions & 0 deletions etl/README.md
@@ -0,0 +1,133 @@
# ETL

Cloud Functions for ETL.

## ETL_POST

The `etl_post` function accepts a JSON payload. It inserts a new record into the BigQuery table defined by the env variable `BIGQUERY_TABLE` and pushes a new message to `PUBSUB_TOPIC`.
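
Based on the `etl_extract` code later in this diff, the stored row has roughly this shape (the values here are illustrative):

```json
{
  "request_id": "76263e55-a869-4604-afe2-441d9c20221e",
  "timestamp": "2023-10-03T01:39:28.611476",
  "type": "/",
  "submitting_user": "user@example.com",
  "body": "{\"identifier\": \"AB0002\", \"name\": \"j smith\"}"
}
```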


## ETL_LOAD

The `etl_load` function expects a `request_id` in the payload. It is set up as a push subscriber to `PUBSUB_TOPIC`.
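
The load function's implementation is not shown on this page. A minimal sketch of its expected shape, assuming it looks up the stored record in BigQuery by `request_id` (the query and names below are assumptions, not the actual implementation):

```python
import json
import os

import flask
import functions_framework
import google.cloud.bigquery as bq

BIGQUERY_TABLE = os.getenv('BIGQUERY_TABLE')


@functions_framework.http
def etl_load(request: flask.Request):
    """Illustrative sketch only: fetch the raw record for a request_id."""
    request_id = (request.get_json(silent=True) or {}).get('request_id')
    if not request_id:
        return {'success': False, 'message': 'Missing request_id'}, 400

    # Parameterised query so the ID is never interpolated into the SQL.
    query = f'SELECT body FROM `{BIGQUERY_TABLE}` WHERE request_id = @request_id'
    job_config = bq.QueryJobConfig(
        query_parameters=[bq.ScalarQueryParameter('request_id', 'STRING', request_id)]
    )
    rows = list(bq.Client().query(query, job_config=job_config).result())
    if not rows:
        return {'success': False, 'message': f'No record for {request_id}'}, 404

    record = json.loads(rows[0]['body'])  # the JSON payload stored by etl_extract
    # ...downstream parsing/ingest of `record` would happen here...
    return {'id': request_id, 'success': True}
```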


## How to test locally

Please use your personal dev project as `$PROJECT_NAME`.

### 1. Setup your environment

```bash
# setup gcloud authentication
gcloud auth application-default login

export PROJECT_NAME="gcp-project-name"
export BIGQUERY_TABLE="$PROJECT_NAME.metamist.etl-data"
export BIGQUERY_LOG_TABLE="$PROJECT_NAME.metamist.etl-logs"
export PUBSUB_TOPIC="projects/$PROJECT_NAME/topics/etl-topic"

# setup to run local version of sample-metadata
export SM_ENVIRONMENT=local
```

### 2. Create BQ table "$PROJECT_NAME.metamist.etl-data"
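
For example, with the `bq` CLI. Note the etl-data schema file is not part of this commit, so the schema path below is a placeholder:

```bash
# Create the dataset once, then the data table.
bq mk --dataset "$PROJECT_NAME:metamist"
# Placeholder schema file; this commit only ships the log-table schema.
bq mk --table "$PROJECT_NAME:metamist.etl-data" ./etl_data_schema.json
```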

### 3. Create TOPIC "projects/$PROJECT_NAME/topics/etl-topic"
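
For example, with the gcloud CLI:

```bash
gcloud pubsub topics create etl-topic --project="$PROJECT_NAME"
```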

### 4. Setup python env

```bash
cd extract
python3 -m venv env
source env/bin/activate
pip install -r requirements.txt
```

### 5. Start the EXTRACT function locally

```bash
functions-framework-python --target etl_extract --debug
```

### 6. Call etl_extract

```bash
curl -X 'POST' \
'http://localhost:8080/' \
-H 'Content-Type: application/json' \
-H "Authorization: Bearer $(gcloud auth print-identity-token)" \
-d '{"identifier": "AB0002", "name": "j smith", "age": 50, "measurement": "98.7", "observation": "B++", "receipt_date": "1/02/2023"}'
```

Should return something like this:

```json
{
  "id": "76263e55-a869-4604-afe2-441d9c20221e",
  "success": true
}
```

### 7. Start the LOAD function locally

Repeat Step 4 inside the `load` folder, then start the function:

```bash
functions-framework-python --target etl_load --debug
```

### 8. Call etl_load

Replace `request_id` with the `id` returned in Step 6:

```bash
curl -X 'POST' \
'http://localhost:8080/' \
-H "Authorization: Bearer $(gcloud auth print-identity-token)" \
-d '{"request_id": "76263e55-a869-4604-afe2-441d9c20221e"}'
```

Should return something like this:

```json
{
  "id": "76263e55-a869-4604-afe2-441d9c20221e",
  "success": true
}
```


### 9. Deploy functions for testing on the cloud

```bash
cd ../load

gcloud functions deploy etl_load \
--gen2 \
--runtime=python311 \
--project=$PROJECT_NAME \
--region=australia-southeast1 \
--source=. \
--entry-point=etl_load \
--trigger-http \
--no-allow-unauthenticated \
--set-env-vars BIGQUERY_TABLE=$BIGQUERY_TABLE \
--set-env-vars PUBSUB_TOPIC=$PUBSUB_TOPIC
```

```bash
cd ../extract

gcloud functions deploy etl_extract \
--gen2 \
--runtime=python311 \
--project=$PROJECT_NAME \
--region=australia-southeast1 \
--source=. \
--entry-point=etl_extract \
--trigger-http \
--no-allow-unauthenticated \
--set-env-vars BIGQUERY_TABLE=$BIGQUERY_TABLE \
--set-env-vars PUBSUB_TOPIC=$PUBSUB_TOPIC
```
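
Once deployed, the extract function can be smoke-tested the same way as in Step 6. This assumes the standard gen2 Cloud Functions URL format; your project's URL may differ (check the `gcloud functions deploy` output):

```bash
curl -X 'POST' \
  "https://australia-southeast1-$PROJECT_NAME.cloudfunctions.net/etl_extract" \
  -H 'Content-Type: application/json' \
  -H "Authorization: Bearer $(gcloud auth print-identity-token)" \
  -d '{"identifier": "AB0002", "name": "j smith", "age": 50}'
```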
26 changes: 26 additions & 0 deletions etl/bq_log_schema.json
@@ -0,0 +1,26 @@
[
  {
    "name": "request_id",
    "type": "STRING",
    "mode": "REQUIRED",
    "description": "Unique UUID for the row"
  },
  {
    "name": "timestamp",
    "type": "TIMESTAMP",
    "mode": "REQUIRED",
    "description": "Timestamp of processing the row"
  },
  {
    "name": "status",
    "type": "STRING",
    "mode": "REQUIRED",
    "description": "Status of the processing: FAILED/SUCCESS"
  },
  {
    "name": "details",
    "type": "JSON",
    "mode": "REQUIRED",
    "description": "Output of the processing"
  }
]
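
The matching `etl-logs` table can be created from this schema with the `bq` CLI, for example:

```bash
bq mk --table "$PROJECT_NAME:metamist.etl-logs" etl/bq_log_schema.json
```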
File renamed without changes.
32 changes: 25 additions & 7 deletions etl/endpoint/main.py → etl/extract/main.py
@@ -8,17 +8,15 @@
 import functions_framework
 import google.cloud.bigquery as bq
 from cpg_utils.cloud import email_from_id_token
-
 from google.cloud import pubsub_v1  # type: ignore

 BIGQUERY_TABLE = os.getenv('BIGQUERY_TABLE')
 PUBSUB_TOPIC = os.getenv('PUBSUB_TOPIC')

-_BQ_CLIENT = bq.Client()
-_PUBSUB_CLIENT = pubsub_v1.PublisherClient()


 @functions_framework.http
-def etl_post(request: flask.Request):
+def etl_extract(request: flask.Request):
     """HTTP Cloud Function.
     Args:
         request (flask.Request): The request object.
@@ -31,6 +29,20 @@ def etl_post(request: flask.Request):
     For more information on how Flask integrates with Cloud
     Functions, see the `Writing HTTP functions` page.
     <https://cloud.google.com/functions/docs/writing/http#http_frameworks>
+    Example of payload:
+    json_data = {
+        'sample_id': '123456',
+        'external_id': 'GRK100311',
+        'individual_id': '608',
+        'sequencing_type': 'exome',
+        'collection_centre': 'KCCG',
+        'collection_date': '2023-08-05T01:39:28.611476',
+        'collection_specimen': 'blood'
+    }
     """

     auth = request.authorization
@@ -40,6 +52,7 @@ def etl_post(request: flask.Request):
     request_id = str(uuid.uuid4())

     jbody = request.json
+
     if callable(jbody):
         # request.json is it in reality, but the type checker is saying it's callable
         jbody = jbody()
@@ -59,11 +72,14 @@ def etl_post(request: flask.Request):
         'timestamp': datetime.datetime.utcnow().isoformat(),
         'type': request.path,
         'submitting_user': email_from_id_token(auth.token),
-        'body': jbody_str,
     }

-    # throw an exception if one occurs
-    errors = _BQ_CLIENT.insert_rows_json(BIGQUERY_TABLE, [bq_obj])
+    bq_client = bq.Client()
+    pubsub_client = pubsub_v1.PublisherClient()
+
+    errors = bq_client.insert_rows_json(BIGQUERY_TABLE, [bq_obj | {'body': jbody_str}])
+
     if errors:
         return {
             'success': False,
@@ -72,8 +88,10 @@ def etl_post(request: flask.Request):
         }, 500

     # publish to pubsub
+    # message contains all the attributes except body which can be large
+    # and already stored in BQ table
     try:
-        _PUBSUB_CLIENT.publish(PUBSUB_TOPIC, json.dumps(bq_obj).encode())
+        pubsub_client.publish(PUBSUB_TOPIC, json.dumps(bq_obj).encode())
     except Exception as e:  # pylint: disable=broad-exception-caught
         logging.error(f'Failed to publish to pubsub: {e}')
File renamed without changes.
Empty file added etl/load/__init__.py