diff --git a/etl/load/main.py b/etl/load/main.py
index 1e94099b1..037b59f42 100644
--- a/etl/load/main.py
+++ b/etl/load/main.py
@@ -4,7 +4,7 @@
 import json
 import logging
 import os
-from typing import Any, Dict
+from typing import Any, Dict, List
 
 import flask
 import functions_framework
@@ -21,12 +21,12 @@
 DEFAULT_LOAD_CONFIG = os.getenv('DEFAULT_LOAD_CONFIG', '{}')
 
 
-def call_parser(parser_obj, row_json):
+def call_parser(parser_obj, row_json) -> tuple[str, str]:
     """
     This function calls parser_obj.from_json and returns status and result
     """
-    tmp_res = []
-    tmp_status = []
+    tmp_res: List[str] = []
+    tmp_status: List[str] = []
 
     # GenericMetadataParser from_json is async
     # we call it from sync, so we need to wrap it in coroutine
@@ -222,7 +222,7 @@ def etl_load(request: flask.Request):
         }, 500
 
 
-def extract_request_id(jbody: Dict[str, Any]) -> str | None:
+def extract_request_id(jbody: Dict[str, Any]) -> tuple[str | None, str | None]:
     """Unwrapp request id from the payload
 
     Args:
@@ -263,7 +263,7 @@
 
 def get_parser_instance(
     parser_map: dict, sample_type: str | None, init_params: dict | None
-) -> object | None:
+) -> tuple[object | None, str | None]:
     """Extract parser name from sample_type
 
     Args:
diff --git a/metamist_infrastructure/driver.py b/metamist_infrastructure/driver.py
index b569db5b3..5ef26fc73 100644
--- a/metamist_infrastructure/driver.py
+++ b/metamist_infrastructure/driver.py
@@ -1,4 +1,3 @@
-# pylint: disable=missing-function-docstring,import-error
 """
 Make metamist architecture available to production pulumi stack
 so it can be centrally deployed. Do this through a plugin, and submodule.
@@ -343,7 +342,7 @@ def etl_bigquery_dataset(self):
             ),
         )
 
-    def _setup_bq_table(self, schema_file_name: str, table_name: str):
+    def _setup_bq_table(self, schema_file_name: Path, table_name: str):
         """Setup Bigquery table"""
         with open(schema_file_name) as f:
             schema = f.read()
@@ -498,7 +497,7 @@ def etl_load_function(self):
         """etl_load_function"""
         return self._etl_function('load', self.etl_load_service_account)
 
-    def _etl_function(self, f_name: str, sa: object):
+    def _etl_function(self, f_name: str, sa: gcp.serviceaccount.Account):
         """
         Driver function to setup the etl cloud function
         """
@@ -511,8 +510,8 @@ def _etl_function(self, f_name: str, sa: object):
             str(path_to_func_folder.absolute()),
             allowed_extensions=frozenset({'.gz', '.py', '.txt', '.json'}),
             # TODO replace with metamist config, once it's available
-            private_repo_url=self.extra_sample_metadata_config['private_repo_url'],
-            private_repos=self.extra_sample_metadata_config['private_repos'],
+            private_repo_url=str(self.extra_sample_metadata_config['private_repo_url']),
+            private_repos=str(self.extra_sample_metadata_config['private_repos']),
         )
 
         # Create the single Cloud Storage object,
diff --git a/metamist_infrastructure/slack_notification.py b/metamist_infrastructure/slack_notification.py
index cff5bfacb..85f7d0e6e 100644
--- a/metamist_infrastructure/slack_notification.py
+++ b/metamist_infrastructure/slack_notification.py
@@ -1,4 +1,4 @@
-# pylint: disable=missing-function-docstring,import-error
+# pylint: disable=missing-function-docstring,import-error,no-member
 """
 Make metamist architecture available to production pulumi stack
 so it can be centrally deployed. Do this through a plugin, and submodule.
@@ -56,8 +56,8 @@ def __init__(
         self,
         project_name: str,
         location: str,  # e.g. self.config.gcp.region
-        service_account: object,
-        source_bucket: object,
+        service_account: gcp.serviceaccount.Account,
+        source_bucket: gcp.storage.Bucket,
         slack_secret_project_id: str,
         slack_token_secret_name: str,
         slack_channel_name: str,
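
For context, a minimal sketch of how a caller would unpack the (value, error) tuple returns this diff introduces in etl/load/main.py. The wrapper handle_request, the payload keys 'type' and 'data', and the import path are hypothetical illustration, not part of the change:

# A minimal sketch (not part of this diff) of consuming the new tuple returns.
# handle_request and the payload keys 'type' and 'data' are hypothetical.
from typing import Any, Dict

# assuming these are importable from etl/load/main.py
from main import call_parser, extract_request_id, get_parser_instance


def handle_request(jbody: Dict[str, Any], parser_map: dict) -> tuple[dict, int]:
    # extract_request_id now returns (request_id, error) rather than a bare id
    request_id, error = extract_request_id(jbody)
    if error:
        return {'success': False, 'message': error}, 400

    # get_parser_instance likewise returns (parser, error)
    parser, error = get_parser_instance(parser_map, jbody.get('type'), None)
    if parser is None:
        return {'success': False, 'message': error}, 400

    # call_parser returns (status, result), per its new tuple[str, str] annotation
    status, result = call_parser(parser, jbody.get('data'))
    return {'id': request_id, 'status': status, 'result': result}, 200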