Skip to content

Commit

Permalink
Added default ETL load config.
Browse files Browse the repository at this point in the history
  • Loading branch information
milo-hyben committed Sep 18, 2023
1 parent 7747f12 commit c93661a
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 28 deletions.
12 changes: 9 additions & 3 deletions etl/load/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
BIGQUERY_TABLE = os.getenv('BIGQUERY_TABLE')
BIGQUERY_LOG_TABLE = os.getenv('BIGQUERY_LOG_TABLE')
NOTIFICATION_PUBSUB_TOPIC = os.getenv('NOTIFICATION_PUBSUB_TOPIC')
DEFAULT_LOAD_CONFIG = os.getenv('DEFAULT_LOAD_CONFIG', '{}')


def call_parser(parser_obj, row_json):
Expand All @@ -38,7 +39,7 @@ async def run_parser_capture_result(parser_obj, row_data, res, status):
except Exception as e: # pylint: disable=broad-exception-caught
logging.error(f'Failed to parse: {e}')
# add to the output
res.append(e)
res.append(f'Failed to parse: {e}')
status.append('FAILED')

asyncio.run(run_parser_capture_result(parser_obj, row_json, tmp_res, tmp_status))
Expand All @@ -58,8 +59,13 @@ def process_rows(query_job_result, delivery_attempt, request_id, parser_map, bq_
# sample_type should be in the format /ParserName/Version e.g.: /bbv/v1

row_json = json.loads(row.body)
# get config from payload or use default
config_data = row_json.get('config')

# get config from payload and merge with the default
config_data = json.loads(DEFAULT_LOAD_CONFIG)
payload_config_data = row_json.get('config')
if payload_config_data:
config_data.update(payload_config_data)

# get data from payload or use payload as data
record_data = row_json.get('data', row_json)

Expand Down
31 changes: 18 additions & 13 deletions metamist_infrastructure/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"""
import contextlib
import os
import json
from functools import cached_property
from pathlib import Path

Expand All @@ -26,7 +27,7 @@
PATH_TO_ETL_BQ_LOG_SCHEMA = ETL_FOLDER / 'bq_log_schema.json'


# TODO: update implementation in cpg_infra project to enable binary files & private repos?
# TODO: update implementation in cpg_infra to enable binary files & private repos?
def append_private_repositories_to_requirements(
file_content: str, private_repo_url: str, private_repos: str
) -> str:
Expand Down Expand Up @@ -83,11 +84,21 @@ def main(self):
# todo, eventually configure metamist cloud run server
# to be deployed here, but for now it's manually deployed

# TODO: the following should be added to SampleMetadataConfig
self.extra_sample_metadata_config = {
# TODO: the following should be added to SampleMetadataConfig in cpg_infra
self.extra_sample_metadata_config = { # pylint: disable=attribute-defined-outside-init
'private_repo_url': 'https://australia-southeast1-python.pkg.dev/milo-dev-396001/python-repo/simple',
'private_repos': 'metamist_private',
'environment': 'DEVELOPMENT',
'etl_load_default_config': {
# Order of config overrides:
# 1. parser default config values
# 2. etl_load_default_config
# 3. config from payload
'project': 'milo-dev',
'default_sequencing_type': 'genome',
'default_sequencing_technology': 'long-read',
'default_sample_type': 'blood',
},
}
self._setup_etl()

Expand Down Expand Up @@ -282,10 +293,7 @@ def etl_pubsub_push_subscription(self):
opts=pulumi.ResourceOptions(
depends_on=[
self._svc_pubsub,
self.etl_pubsub_topic,
self.etl_load_function,
self.etl_pubsub_dead_letter_subscription,
self.etl_extract_service_account,
]
),
)
Expand All @@ -312,9 +320,6 @@ def etl_pubsub_dead_letter_subscription(self):
topic=self.etl_pubsub_dead_letters_topic.name,
project=self.config.sample_metadata.gcp.project,
ack_deadline_seconds=20,
opts=pulumi.ResourceOptions(
depends_on=[self.etl_pubsub_dead_letters_topic]
),
)

@cached_property
Expand Down Expand Up @@ -353,9 +358,6 @@ def _setup_bq_table(self, schema_file_name: str, table_name: str):
# docs say: Note: On newer versions of the provider,
# you must explicitly set
deletion_protection=False,
opts=pulumi.ResourceOptions(
depends_on=[self.etl_bigquery_dataset],
),
)
return etl_table

Expand Down Expand Up @@ -568,6 +570,10 @@ def _etl_function(self, f_name: str, sa: object):
else '',
# TODO replace with metamist config, once it's available
'SM_ENVIRONMENT': self.extra_sample_metadata_config['environment'],
# TODO replace with etl_load_default_config config, once it's available
'DEFAULT_LOAD_CONFIG': json.dumps(
self.extra_sample_metadata_config['etl_load_default_config']
),
},
ingress_settings='ALLOW_ALL',
all_traffic_on_latest_revision=True,
Expand All @@ -580,7 +586,6 @@ def _etl_function(self, f_name: str, sa: object):
self._svc_functions,
self._svc_build,
sa,
self.etl_slack_notification_topic,
]
),
)
Expand Down
2 changes: 1 addition & 1 deletion metamist_infrastructure/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
description='Metamist infrastructure plugin for cpg-infrastructure',
long_description=readme,
long_description_content_type='text/markdown',
url=f'https://github.com/populationgenomics/sample-metadata',
url='https://github.com/populationgenomics/sample-metadata',
license='MIT',
packages=[
'metamist_infrastructure',
Expand Down
22 changes: 11 additions & 11 deletions metamist_infrastructure/slack_notification.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ def notification_cloudfun(self):
name=f'{self.topic_name}-func',
build_config=gcp.cloudfunctionsv2.FunctionBuildConfigArgs(
runtime='python311',
entry_point=f'etl_notify',
entry_point='etl_notify',
environment_variables={},
source=gcp.cloudfunctionsv2.FunctionBuildConfigSourceArgs(
storage_source=gcp.cloudfunctionsv2.FunctionBuildConfigSourceStorageSourceArgs(
Expand Down Expand Up @@ -263,13 +263,13 @@ def notification_pubsub_push_subscription(self):
},
),
project=self.config.project_name,
opts=pulumi.ResourceOptions(
depends_on=[
self.notification_pubsub_topic,
self.notification_cloudfun,
self.notification_dead_letters_pubsub_topic,
]
),
# opts=pulumi.ResourceOptions(
# depends_on=[
# self.notification_pubsub_topic,
# self.notification_cloudfun,
# self.notification_dead_letters_pubsub_topic,
# ]
# ),
)

# give subscriber permission to service account
Expand Down Expand Up @@ -330,9 +330,9 @@ def notification_dead_letters_pubsub_subscription(self):
topic=self.notification_dead_letters_pubsub_topic.name,
project=self.config.project_name,
ack_deadline_seconds=20,
opts=pulumi.ResourceOptions(
depends_on=[self.notification_dead_letters_pubsub_topic]
),
# opts=pulumi.ResourceOptions(
# depends_on=[self.notification_dead_letters_pubsub_topic]
# ),
)

def setup_notification(self):
Expand Down

0 comments on commit c93661a

Please sign in to comment.