Skip to content

Commit

Permalink
Fixing linting issues.
Browse files Browse the repository at this point in the history
  • Loading branch information
Milo Hyben committed Aug 16, 2023
1 parent 485f603 commit fca79d9
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 40 deletions.
29 changes: 15 additions & 14 deletions etl/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ etl_load function expects "request_id" in the payload. It is setup as push subsc

## How to test locally

### 1. Create BQ table "<YOUR-PROJECT-NAME>.metamist.etl-data"
### 2. Create TOPIC "projects/<YOUR-PROJECT-NAME>/topics/etl-topic"
### 1. Create BQ table "$PROJECT_NAME.metamist.etl-data"

### 2. Create TOPIC "projects/$PROJECT_NAME/topics/etl-topic"

### 3. Setup your environment

Expand All @@ -25,8 +25,10 @@ etl_load function expects "request_id" in the payload. It is setup as push subsc
gcloud auth application-default login
export GOOGLE_APPLICATION_CREDENTIALS='/Users/<USERNAME>/.config/gcloud/application_default_credentials.json'

export BIGQUERY_TABLE=<YOUR-PROJECT-NAME>.metamist.etl-data
export PUBSUB_TOPIC='projects/<YOUR-PROJECT-NAME>/topics/etl-topic'
export PROJECT_NAME='some_name'
export BIGQUERY_TABLE="$PROJECT_NAME.metamist.etl-data"
export PUBSUB_TOPIC="projects/$PROJECT_NAME/topics/etl-topic"

```

### 4. Setup python env
Expand Down Expand Up @@ -64,13 +66,14 @@ Should return something like this:
```

### 7. Start LOAD Function locally

Repeat Step 4 inside folder load

```bash
functions-framework-python --target etl_load --debug
```

### 7. Call etl_load
### 8. Call etl_load

Replace request_id with the id returned in Step 6

Expand All @@ -91,20 +94,20 @@ Should return something like this:
```


### 8. Deploy functions for testing on the cloud
### 9. Deploy functions for testing on the cloud

```bash
cd ../load

gcloud functions deploy etl_load \
--gen2 \
--runtime=python311 \
--project=<YOUR-PROJECT-NAME> \
--project="$PROJECT_NAME" \
--region=australia-southeast1 \
--source=. \
--entry-point=etl_load \
--trigger-http \
--set-env-vars BIGQUERY_TABLE='<YOUR-PROJECT-NAME>.metamist.etl-data'
--set-env-vars BIGQUERY_TABLE="$PROJECT_NAME.metamist.etl-data"
```

```bash
Expand All @@ -113,13 +116,11 @@ cd ../post
gcloud functions deploy etl_post \
--gen2 \
--runtime=python311 \
--project=<YOUR-PROJECT-NAME> \
--project="$PROJECT_NAME" \
--region=australia-southeast1 \
--source=. \
--entry-point=etl_post \
--trigger-http \
--set-env-vars BIGQUERY_TABLE='<YOUR-PROJECT-NAME>.metamist.etl-data' \
--set-env-vars PUBSUB_TOPIC='projects/<YOUR-PROJECT-NAME>/topics/my-topic'
--set-env-vars BIGQUERY_TABLE="$PROJECT_NAME.metamist.etl-data" \
--set-env-vars PUBSUB_TOPIC="projects/$PROJECT_NAME/topics/my-topic"
```
29 changes: 13 additions & 16 deletions etl/load/main.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import datetime
import json
import logging
import os
import uuid
import functions_framework
import flask
import google.cloud.bigquery as bq
Expand All @@ -26,23 +24,23 @@ def etl_load(request: flask.Request):
For more information on how Flask integrates with Cloud
Functions, see the `Writing HTTP functions` page.
<https://cloud.google.com/functions/docs/writing/http#http_frameworks>
This function accepts Pub/Sub message, expected request_id in the payload:
{
"request_id": "70eb6292-6311-44cf-9c9b-2b38bb076699"
}
"""

auth = request.authorization
if not auth or not auth.token:
return {'success': False, 'message': 'No auth token provided'}, 401
# if mimetype might not be set esp. when PubSub pushing from another topic,

# the mimetype might not be set, esp. when PubSub is pushing from another topic,
# try to force conversion and if fails just return None
jbody = request.get_json(force=True, silent=True)

if callable(jbody):
# in reality this is request.json, but the type checker reports it as callable
jbody = jbody()
Expand All @@ -55,25 +53,24 @@ def etl_load(request: flask.Request):
'success': False,
'message': f'Missing or empty request_id: {jbody_str}',
}, 400

# locate the request_id in bq
query = f"""
SELECT * FROM `{BIGQUERY_TABLE}` WHERE request_id = @request_id
SELECT * FROM `{BIGQUERY_TABLE}` WHERE request_id = @request_id
"""
query_params = [
bq.ScalarQueryParameter("request_id", "STRING", request_id),
bq.ScalarQueryParameter('request_id', 'STRING', request_id),
]

job_config = bq.QueryJobConfig()
job_config.query_parameters = query_params
query_job = _BQ_CLIENT.query(query, job_config=job_config).result()

# should be only one record, look into loading multiple objects in one call?
for row in query_job:
# TODO
# Parse row.body -> Model and upload to metamist database
row_body = json.loads(row.body)
logging.info(f'row_body {row_body}')



return {'id': request_id, 'success': True}
8 changes: 3 additions & 5 deletions metamist/parser/sample_json_parser.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
#!/usr/bin/env python3
# pylint: disable=too-many-instance-attributes,too-many-locals,unused-argument,wrong-import-order,unused-argument
from typing import List
import logging

from metamist.parser.generic_metadata_parser import (
GenericMetadataParser
)
from metamist.parser.generic_parser import SingleRow

logger = logging.getLogger(__file__)
logger.addHandler(logging.StreamHandler())
Expand All @@ -31,12 +29,12 @@ class SampleJsonColumns:
MEASUREMENT = 'measurement'
OBSERVATION = 'observation'
RECEIPT_DATE = 'receipt_date'

@staticmethod
def participant_meta_map():
"""Participant meta map"""
return {}

@staticmethod
def sequence_meta_map():
"""Columns that will be put into sequence.meta"""
Expand Down Expand Up @@ -68,4 +66,4 @@ def __init__(
async def parse(
self, record: str, confirm=False, dry_run=False
):
raise NotImplementedError('TO BE IMPLEMENTED')
raise NotImplementedError('TO BE IMPLEMENTED')
9 changes: 4 additions & 5 deletions test/test_parse_sample_json.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import pytest
from test.testbase import DbIsolatedTest, run_as_sync
import pytest

from metamist.parser.sample_json_parser import SampleJsonParser

Expand All @@ -12,7 +12,7 @@ async def test_empty_json(self):
"""
Test empty json
"""

empty_record = {}

parser = SampleJsonParser(
Expand All @@ -22,9 +22,8 @@ async def test_empty_json(self):
# TODO
# check the output of parse fun
# for time being check for Exception

with pytest.raises(NotImplementedError):
result = await parser.parse(
await parser.parse(
empty_record, dry_run=True
)

0 comments on commit fca79d9

Please sign in to comment.