diff --git a/etl/load/main.py b/etl/load/main.py
index 750a37d55..9cc27b170 100644
--- a/etl/load/main.py
+++ b/etl/load/main.py
@@ -18,6 +18,7 @@
 BIGQUERY_TABLE = os.getenv('BIGQUERY_TABLE')
 BIGQUERY_LOG_TABLE = os.getenv('BIGQUERY_LOG_TABLE')
 NOTIFICATION_PUBSUB_TOPIC = os.getenv('NOTIFICATION_PUBSUB_TOPIC')
+DEFAULT_LOAD_CONFIG = os.getenv('DEFAULT_LOAD_CONFIG', '{}')


 def call_parser(parser_obj, row_json):
@@ -38,7 +39,7 @@ async def run_parser_capture_result(parser_obj, row_data, res, status):
         except Exception as e:  # pylint: disable=broad-exception-caught
             logging.error(f'Failed to parse: {e}')
             # add to the output
-            res.append(e)
+            res.append(f'Failed to parse: {e}')
             status.append('FAILED')

     asyncio.run(run_parser_capture_result(parser_obj, row_json, tmp_res, tmp_status))
@@ -58,8 +59,13 @@ def process_rows(query_job_result, delivery_attempt, request_id, parser_map, bq_
         # sample_type should be in the format /ParserName/Version e.g.: /bbv/v1

         row_json = json.loads(row.body)
-        # get config from payload or use default
-        config_data = row_json.get('config')
+
+        # get config from payload and merge with the default
+        config_data = json.loads(DEFAULT_LOAD_CONFIG)
+        payload_config_data = row_json.get('config')
+        if payload_config_data:
+            config_data.update(payload_config_data)
+
         # get data from payload or use payload as data
         record_data = row_json.get('data', row_json)

diff --git a/metamist_infrastructure/driver.py b/metamist_infrastructure/driver.py
index 252b84fd6..b569db5b3 100644
--- a/metamist_infrastructure/driver.py
+++ b/metamist_infrastructure/driver.py
@@ -5,6 +5,7 @@
 """
 import contextlib
 import os
+import json
 from functools import cached_property
 from pathlib import Path

@@ -26,7 +27,7 @@
 PATH_TO_ETL_BQ_LOG_SCHEMA = ETL_FOLDER / 'bq_log_schema.json'


-# TODO: update implementation in cpg_infra project to enable binary files & private repos?
+# TODO: update implementation in cpg_infra to enable binary files & private repos?
 def append_private_repositories_to_requirements(
     file_content: str, private_repo_url: str, private_repos: str
 ) -> str:
@@ -83,11 +84,21 @@ def main(self):
         # todo, eventually configure metamist cloud run server
         # to be deployed here, but for now it's manually deployed

-        # TODO: the following should be added to SampleMetadataConfig
-        self.extra_sample_metadata_config = {
+        # TODO: the following should be added to SampleMetadataConfig in cpg_infra
+        self.extra_sample_metadata_config = {  # pylint: disable=attribute-defined-outside-init
             'private_repo_url': 'https://australia-southeast1-python.pkg.dev/milo-dev-396001/python-repo/simple',
             'private_repos': 'metamist_private',
             'environment': 'DEVELOPMENT',
+            'etl_load_default_config': {
+                # Order of config overrides:
+                # 1. parser default config values
+                # 2. etl_load_default_config
+                # 3. config from payload
+                'project': 'milo-dev',
+                'default_sequencing_type': 'genome',
+                'default_sequencing_technology': 'long-read',
+                'default_sample_type': 'blood',
+            },
         }

         self._setup_etl()
@@ -282,10 +293,7 @@ def etl_pubsub_push_subscription(self):
             opts=pulumi.ResourceOptions(
                 depends_on=[
                     self._svc_pubsub,
-                    self.etl_pubsub_topic,
-                    self.etl_load_function,
                     self.etl_pubsub_dead_letter_subscription,
-                    self.etl_extract_service_account,
                 ]
             ),
         )
@@ -312,9 +320,6 @@ def etl_pubsub_dead_letter_subscription(self):
             topic=self.etl_pubsub_dead_letters_topic.name,
             project=self.config.sample_metadata.gcp.project,
             ack_deadline_seconds=20,
-            opts=pulumi.ResourceOptions(
-                depends_on=[self.etl_pubsub_dead_letters_topic]
-            ),
         )

     @cached_property
@@ -353,9 +358,6 @@ def _setup_bq_table(self, schema_file_name: str, table_name: str):
             # docs say: Note: On newer versions of the provider,
             # you must explicitly set deletion_protection=False,
-            opts=pulumi.ResourceOptions(
-                depends_on=[self.etl_bigquery_dataset],
-            ),
         )

         return etl_table

@@ -568,6 +570,10 @@ def _etl_function(self, f_name: str, sa: object):
                     else '',
                     # TODO replace with metamist config, once it's available
                     'SM_ENVIRONMENT': self.extra_sample_metadata_config['environment'],
+                    # TODO replace with etl_load_default_config, once it's available
+                    'DEFAULT_LOAD_CONFIG': json.dumps(
+                        self.extra_sample_metadata_config['etl_load_default_config']
+                    ),
                 },
                 ingress_settings='ALLOW_ALL',
                 all_traffic_on_latest_revision=True,
@@ -580,7 +586,6 @@ def _etl_function(self, f_name: str, sa: object):
                     self._svc_functions,
                     self._svc_build,
                     sa,
-                    self.etl_slack_notification_topic,
                 ]
             ),
         )
diff --git a/metamist_infrastructure/setup.py b/metamist_infrastructure/setup.py
index 3a231821c..ecb1bb892 100644
--- a/metamist_infrastructure/setup.py
+++ b/metamist_infrastructure/setup.py
@@ -19,7 +19,7 @@
     description='Metamist infrastructure plugin for cpg-infrastructure',
     long_description=readme,
    long_description_content_type='text/markdown',
-    url=f'https://github.com/populationgenomics/sample-metadata',
+    url='https://github.com/populationgenomics/sample-metadata',
     license='MIT',
     packages=[
         'metamist_infrastructure',
diff --git a/metamist_infrastructure/slack_notification.py b/metamist_infrastructure/slack_notification.py
index 56cd646cb..cff5bfacb 100644
--- a/metamist_infrastructure/slack_notification.py
+++ b/metamist_infrastructure/slack_notification.py
@@ -174,7 +174,7 @@ def notification_cloudfun(self):
             name=f'{self.topic_name}-func',
             build_config=gcp.cloudfunctionsv2.FunctionBuildConfigArgs(
                 runtime='python311',
-                entry_point=f'etl_notify',
+                entry_point='etl_notify',
                 environment_variables={},
                 source=gcp.cloudfunctionsv2.FunctionBuildConfigSourceArgs(
                     storage_source=gcp.cloudfunctionsv2.FunctionBuildConfigSourceStorageSourceArgs(
@@ -263,13 +263,13 @@ def notification_pubsub_push_subscription(self):
                 },
             ),
             project=self.config.project_name,
-            opts=pulumi.ResourceOptions(
-                depends_on=[
-                    self.notification_pubsub_topic,
-                    self.notification_cloudfun,
-                    self.notification_dead_letters_pubsub_topic,
-                ]
-            ),
+            # opts=pulumi.ResourceOptions(
+            #     depends_on=[
+            #         self.notification_pubsub_topic,
+            #         self.notification_cloudfun,
+            #         self.notification_dead_letters_pubsub_topic,
+            #     ]
+            # ),
         )

         # give subscriber permission to service account
@@ -330,9 +330,9 @@ def notification_dead_letters_pubsub_subscription(self):
             topic=self.notification_dead_letters_pubsub_topic.name,
             project=self.config.project_name,
             ack_deadline_seconds=20,
-            opts=pulumi.ResourceOptions(
-                depends_on=[self.notification_dead_letters_pubsub_topic]
-            ),
+            # opts=pulumi.ResourceOptions(
+            #     depends_on=[self.notification_dead_letters_pubsub_topic]
+            # ),
         )

     def setup_notification(self):
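
Note (not part of the patch): a minimal sketch of the config precedence the etl/load/main.py change implements, assuming DEFAULT_LOAD_CONFIG carries the JSON string that _etl_function builds with json.dumps() from extra_sample_metadata_config['etl_load_default_config']. The os.environ assignment and the row_json payload below are illustrative only; keys absent from both sources still fall back to the parser's own defaults (step 1 of the override order).

import json
import os

# Illustrative stand-in for the value the Cloud Function receives from Pulumi.
os.environ['DEFAULT_LOAD_CONFIG'] = json.dumps(
    {'project': 'milo-dev', 'default_sequencing_type': 'genome'}
)
DEFAULT_LOAD_CONFIG = os.getenv('DEFAULT_LOAD_CONFIG', '{}')

# Hypothetical payload: its config should win over the deployed defaults.
row_json = {'config': {'default_sequencing_type': 'exome'}, 'data': {'sample': 'A1'}}

config_data = json.loads(DEFAULT_LOAD_CONFIG)   # 2. etl_load_default_config
payload_config_data = row_json.get('config')
if payload_config_data:
    config_data.update(payload_config_data)     # 3. config from payload wins

assert config_data == {'project': 'milo-dev', 'default_sequencing_type': 'exome'}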