diff --git a/.gitignore b/.gitignore index f15be313..a6a295f3 100644 --- a/.gitignore +++ b/.gitignore @@ -9,6 +9,7 @@ __pycache__ db.sqlite3 media/* media +.envrc docker/dev-local diff --git a/src/ingest-pipeline/airflow/dags/error_catching/failure_callback.py b/src/ingest-pipeline/airflow/dags/error_catching/failure_callback.py deleted file mode 100644 index e469dcb5..00000000 --- a/src/ingest-pipeline/airflow/dags/error_catching/failure_callback.py +++ /dev/null @@ -1,164 +0,0 @@ -import json -from pprint import pprint -from requests.exceptions import HTTPError -from requests import codes -import traceback - -from airflow.providers.http.hooks.http import HttpHook -from airflow.utils.email import send_email - -from utils import get_auth_tok - - -class FailureCallbackException(Exception): - pass - - -class FailureCallback: - """ - List should be overwritten by each subclass with appropriate values. - """ - external_exceptions = [] - # TODO: Switch to curator email(s) - internal_email_recipients = ["gesina@psc.edu"] - - def __init__(self, context, execute_methods=True): - self.context = context - self.uuid = self.context.get("task_instance").xcom_pull(key="uuid") - self.auth_tok = get_auth_tok(**context) - self.dag_run = self.context.get("dag_run") - self.task = self.context.get("task") - self.exception = self.context.get("exception") - self.exception_name = type(self.exception).__name__ - - if execute_methods: - self.set_status() - self.send_notifications() - - def send_notifications(self): - """ - Subclasses should override the send_notifications method - in order to add self.send_asana_notification() or other - custom notifications. - """ - self.send_failure_email() - - # This is simplified from pythonop_get_dataset_state in utils - def get_submission_context(self): - method = "GET" - headers = { - "authorization": f"Bearer {self.auth_tok}", - "content-type": "application/json", - "X-Hubmap-Application": "ingest-pipeline", - } - http_hook = HttpHook(method, http_conn_id="entity_api_connection") - - endpoint = f"entities/{self.uuid}" - - try: - response = http_hook.run( - endpoint, headers=headers, extra_options={"check_response": False} - ) - response.raise_for_status() - submission_data = response.json() - return submission_data - except HTTPError as e: - print(f"ERROR: {e}") - if e.response.status_code == codes.unauthorized: - raise RuntimeError("entity database authorization was rejected?") - else: - print("benign error") - return {} - - def get_status_and_message(self): - """ - Error message might need to be overwritten when - subclassed for various DAGs. 
- """ - return { - "status": "Invalid", - "validation_message": f"Process {self.dag_run.dag_id} started {self.dag_run.execution_date} failed at task {self.task.task_id} with error {self.exception_name} {self.exception}", - } - - def set_status(self): - """ - The failure callback needs to set the dataset status, - otherwise it will remain in the "Processing" state - """ - data = self.get_status_and_message() - endpoint = f"/entities/{self.uuid}" - headers = { - "authorization": "Bearer " + self.auth_tok, - "X-Hubmap-Application": "ingest-pipeline", - "content-type": "application/json", - } - extra_options = [] - http_conn_id = "entity_api_connection" - http_hook = HttpHook("PUT", http_conn_id=http_conn_id) - print("data: ") - pprint(data) - response = http_hook.run( - endpoint, - json.dumps(data), - headers, - extra_options, - ) - - def get_failure_email_template( - self, formatted_exception=None, external_template=False, submission_data=None, **kwargs - ): - """ - Generic template, can be overridden or super() called - in subclass. - """ - subject = f"DAG {self.dag_run.dag_id} failed at task {self.task.task_id}" - if formatted_exception: - msg = f""" - DAG run: {self.dag_run.id} {self.dag_run.dag_id}
- Task: {self.task.task_id}<br>
- Execution date: {self.dag_run.execution_date}<br>
- Run id: {self.dag_run.run_id}<br>
- Error: {self.exception_name}<br>
- Traceback: {formatted_exception} - """ - - else: - msg = f""" - DAG run: {self.dag_run.id} {self.dag_run.dag_id}<br>
- Task: {self.task.task_id}<br>
- Execution date: {self.dag_run.execution_date}<br>
- Run id: {self.dag_run.run_id}<br>
- Error: {self.exception_name}<br>
- """ - return subject, msg - - def send_failure_email(self, **kwargs): - # traceback logic borrowed from https://stackoverflow.com/questions/51822029/get-exception-details-on-airflow-on-failure-callback-context - try: - formatted_exception = "".join( - traceback.TracebackException.from_exception(self.exception).format() - ).replace("\n", "
") - except: - formatted_exception = None - self.submission_data = self.get_submission_context() - subject, msg = self.get_failure_email_template( - formatted_exception=formatted_exception, submission_data=self.submission_data, **kwargs - ) - send_email(to=self.internal_email_recipients, subject=subject, html_content=msg) - if self.exception_name in self.external_exceptions: - try: - created_by_user_email = self.submission_data.get("created_by_user_email") - subject, msg = self.get_failure_email_template( - formatted_exception=formatted_exception, - external_template=True, - submission_data=self.submission_data, - **kwargs, - ) - send_email(to=[created_by_user_email], subject=subject, html_content=msg) - except: - raise FailureCallbackException( - "Failure retrieving created_by_user_email or sending email." - ) - - def send_asana_notification(self, **kwargs): - pass diff --git a/src/ingest-pipeline/airflow/dags/error_catching/validate_upload_failure_callback.py b/src/ingest-pipeline/airflow/dags/error_catching/validate_upload_failure_callback.py deleted file mode 100644 index dc8cae5d..00000000 --- a/src/ingest-pipeline/airflow/dags/error_catching/validate_upload_failure_callback.py +++ /dev/null @@ -1,54 +0,0 @@ -from airflow.utils.email import send_email - -from .failure_callback import FailureCallback, FailureCallbackException - - -class ValidateUploadFailure(FailureCallback): - # Should probably be importing custom exceptions rather than comparing strings - external_exceptions = [ - "ValueError", - "PreflightError", - "ValidatorError", - "DirectoryValidationErrors", - "FileNotFoundError", - ] - - def get_failure_email_template( - self, - formatted_exception=None, - external_template=False, - submission_data=None, - report_txt=False, - ): - if external_template: - if report_txt and submission_data: - subject = f"Your {submission_data.get('entity_type')} has failed!" - msg = f""" - Error: {report_txt} - """ - return subject, msg - else: - if report_txt and submission_data: - subject = f"{submission_data.get('entity_type')} {self.uuid} has failed!" - msg = f""" - Error: {report_txt} - """ - return subject, msg - return super().get_failure_email_template(formatted_exception) - - def send_failure_email(self, **kwargs): - super().send_failure_email(**kwargs) - if "report_txt" in kwargs: - try: - created_by_user_email = self.submission_data.get("created_by_user_email") - subject, msg = self.get_failure_email_template( - formatted_exception=None, - external_template=True, - submission_data=self.submission_data, - **kwargs, - ) - send_email(to=[created_by_user_email], subject=subject, html_content=msg) - except: - raise FailureCallbackException( - "Failure retrieving created_by_user_email or sending email in ValidateUploadFailure." 
- ) diff --git a/src/ingest-pipeline/airflow/dags/export_and_backup/plugins/dataset_published.py b/src/ingest-pipeline/airflow/dags/export_and_backup/plugins/dataset_published.py index 14ee43aa..6d3595c2 100644 --- a/src/ingest-pipeline/airflow/dags/export_and_backup/plugins/dataset_published.py +++ b/src/ingest-pipeline/airflow/dags/export_and_backup/plugins/dataset_published.py @@ -1,13 +1,11 @@ -from requests.exceptions import HTTPError -from requests import codes import traceback -from airflow.configuration import conf as airflow_conf -from airflow.hooks.http_hook import HttpHook - from export_and_backup.export_and_backup_plugin import ExportAndBackupPlugin, add_path +from status_change.status_utils import get_hubmap_id_from_uuid from utils import get_auth_tok +from airflow.configuration import conf as airflow_conf + with add_path(airflow_conf.as_dict()["connections"]["SRC_PATH"].strip("'").strip('"')): from submodules import hubmapinventory @@ -18,7 +16,7 @@ class PublishedBackupPlugin(ExportAndBackupPlugin): def run_plugin(self): return "PublishedBackupPlugin ran successfully" - ## Future functionality + # Future functionality # Back-up published datasets to appropriate location (S3) # Also stage for inclusion in 6-month Glacier backup? # Not sure how datasets are updated post-publication; that is likely a separate process-- @@ -32,34 +30,8 @@ def __init__(self, **kwargs): self.kwargs = kwargs self.token = get_auth_tok(**self.kwargs) - # Should I add this to utils instead, so it can be reused? - def get_hubmap_id(self, uuid): - method = 'GET' - headers = { - 'authorization': f'Bearer {self.token}', - 'content-type': 'application/json', - 'X-Hubmap-Application': 'ingest-pipeline', - } - http_hook = HttpHook(method, http_conn_id='entity_api_connection') - - endpoint = f'entities/{uuid}' - - try: - response = http_hook.run(endpoint, - headers=headers, - extra_options={'check_response': False}) - response.raise_for_status() - return response.json() - except HTTPError as e: - print(f'ERROR: {e}') - if e.response.status_code == codes.unauthorized: - raise RuntimeError('entity database authorization was rejected?') - else: - print('benign error') - return {} - def run_plugin(self): - hubmap_id = self.get_hubmap_id(self.kwargs["uuid"])["hubmap_id"] + hubmap_id = get_hubmap_id_from_uuid(self.token, self.kwargs["uuid"])["hubmap_id"] dbgap_study_id = self.kwargs.get("dbgap_study_id", None) # instance will need to be changed try: diff --git a/src/ingest-pipeline/airflow/dags/obsolete/test_scan_and_begin_processing.py b/src/ingest-pipeline/airflow/dags/obsolete/test_scan_and_begin_processing.py index 375f2a23..a1e5c339 100644 --- a/src/ingest-pipeline/airflow/dags/obsolete/test_scan_and_begin_processing.py +++ b/src/ingest-pipeline/airflow/dags/obsolete/test_scan_and_begin_processing.py @@ -68,7 +68,7 @@ def test_params(*argv, **kwargs): pprint(kwargs) return 'maybe this will make it run only once' - + # TODO: this code looks potentially out of date? Did not update with StatusManager yet. 
def send_status_msg(**kwargs): md_extract_retcode = int(kwargs['ti'].xcom_pull(task_ids="run_md_extract")) print('md_extract_retcode: ', md_extract_retcode) diff --git a/src/ingest-pipeline/airflow/dags/status_change/failure_callback.py b/src/ingest-pipeline/airflow/dags/status_change/failure_callback.py new file mode 100644 index 00000000..d4fcdd6a --- /dev/null +++ b/src/ingest-pipeline/airflow/dags/status_change/failure_callback.py @@ -0,0 +1,66 @@ +import logging +from pprint import pprint + +from status_change.status_manager import StatusChanger +from status_change.status_utils import formatted_exception +from utils import get_auth_tok + + +class FailureCallbackException(Exception): + pass + + +class FailureCallback: + """ + List should be overridden by each subclass with appropriate values. + """ + + def __init__(self, context): + self.context = context + self.uuid = self.context.get("task_instance").xcom_pull(key="uuid") + self.context["uuid"] = self.uuid + self.auth_tok = get_auth_tok(**context) + self.dag_run = self.context.get("dag_run") + self.task = self.context.get("task") + exception = self.context.get("exception") + self.formatted_exception = formatted_exception(exception) + + self.pre_set_status() + + def get_extra_fields(self): + """ + Error message might need to be overwritten when + subclassed for various DAGs. + 'Error' is the default for FailureCallback, which indicates a pipeline has failed. + """ + return { + "validation_message": f""" + Process {self.dag_run.dag_id} started {self.dag_run.execution_date} + failed at task {self.task.task_id}. + {f'Error: {self.formatted_exception}' if self.formatted_exception else ""} + """, + } + + def pre_set_status(self): + # Allows for alterations to props, before calling StatusChanger + # This was added to support some email functions and is perhaps + # at the moment over-engineered. 
+ self.set_status() + + def set_status(self): + """ + The failure callback needs to set the dataset status, + otherwise it will remain in the "Processing" state + """ + data = self.get_extra_fields() + logging.info("data: ") + logging.info(pprint(data)) + StatusChanger( + self.uuid, + self.auth_tok, + "error", + { + "extra_fields": self.get_extra_fields(), + "extra_options": {}, + }, + ).on_status_change() diff --git a/src/ingest-pipeline/airflow/dags/status_change/status_manager.py b/src/ingest-pipeline/airflow/dags/status_change/status_manager.py new file mode 100644 index 00000000..805fa78d --- /dev/null +++ b/src/ingest-pipeline/airflow/dags/status_change/status_manager.py @@ -0,0 +1,257 @@ +from __future__ import annotations + +import json +import logging +from enum import Enum +from functools import cached_property +from typing import Any, Dict, TypedDict, Union + +from airflow.providers.http.hooks.http import HttpHook + +from .status_utils import get_submission_context + + +class Statuses(str, Enum): + # Dataset Hold and Deprecated are not currently in use but are valid for Entity API + DATASET_DEPRECATED = "deprecated" + DATASET_ERROR = "error" + DATASET_HOLD = "hold" + DATASET_INVALID = "invalid" + DATASET_NEW = "new" + DATASET_PROCESSING = "processing" + DATASET_PUBLISHED = "published" + DATASET_QA = "qa" + PUBLICATION_ERROR = "error" + PUBLICATION_HOLD = "hold" + PUBLICATION_INVALID = "invalid" + PUBLICATION_NEW = "new" + PUBLICATION_PROCESSING = "processing" + PUBLICATION_PUBLISHED = "published" + PUBLICATION_QA = "qa" + PUBLICATION_SUBMITTED = "submitted" + UPLOAD_ERROR = "error" + UPLOAD_INVALID = "invalid" + UPLOAD_NEW = "new" + UPLOAD_PROCESSING = "processing" + UPLOAD_REORGANIZED = "reorganized" + UPLOAD_SUBMITTED = "submitted" + UPLOAD_VALID = "valid" + + +# Needed some way to disambiguate statuses shared by datasets and uploads +ENTITY_STATUS_MAP = { + "dataset": { + "deprecated": Statuses.DATASET_DEPRECATED, + "error": Statuses.DATASET_ERROR, + "hold": Statuses.DATASET_HOLD, + "invalid": Statuses.DATASET_INVALID, + "new": Statuses.DATASET_NEW, + "processing": Statuses.DATASET_PROCESSING, + "published": Statuses.DATASET_PUBLISHED, + "qa": Statuses.DATASET_QA, + }, + "publication": { + "error": Statuses.PUBLICATION_ERROR, + "hold": Statuses.PUBLICATION_HOLD, + "invalid": Statuses.PUBLICATION_INVALID, + "new": Statuses.PUBLICATION_NEW, + "processing": Statuses.PUBLICATION_PROCESSING, + "published": Statuses.PUBLICATION_PUBLISHED, + "qa": Statuses.PUBLICATION_QA, + "submitted": Statuses.PUBLICATION_SUBMITTED, + }, + "upload": { + "error": Statuses.UPLOAD_ERROR, + "invalid": Statuses.UPLOAD_INVALID, + "new": Statuses.UPLOAD_NEW, + "processing": Statuses.UPLOAD_PROCESSING, + "reorganized": Statuses.UPLOAD_REORGANIZED, + "submitted": Statuses.UPLOAD_SUBMITTED, + "valid": Statuses.UPLOAD_VALID, + }, +} + + +class StatusChangerExtras(TypedDict): + extra_fields: dict[str, Any] + extra_options: dict[str, Any] + + +class StatusChangerException(Exception): + pass + + +""" +Example usage, simple path (e.g. 
status string, no validation message): + from status_manager import StatusChanger + StatusChanger( + "uuid_string", + "token_string", + "status", + ).on_status_change() + +Example usage, optional params path: + from status_manager import StatusChanger, Statuses + StatusChanger( + "uuid_string", + "token_string", + Statuses.STATUS_ENUM or "status", + # optional { + "extra_fields": {}, + "extra_options": {}, + }, + #optional entity_type="Dataset"|"Upload"|"Publication" + #optional http_conn_id="entity_api_connection" + ).on_status_change() +""" + + +class StatusChanger: + def __init__( + self, + uuid: str, + token: str, + # NOTE: status is currently required; should it be possible + # to add extra info without updating status? + status: Statuses | str, + extras: StatusChangerExtras | None = None, + entity_type: str | None = None, + http_conn_id: str = "entity_api_connection", + verbose: bool = True, + ): + self.uuid = uuid + self.token = token + self.http_conn_id = http_conn_id + self.verbose = verbose + self.status = ( + self.check_status(status) + if isinstance(status, Statuses) + else self.get_status(status, entity_type) + ) + self.extras = ( + extras + if extras + else { + "extra_fields": {}, + "extra_options": {}, + } + ) + + def get_status(self, status: str, entity_type: str | None) -> Union[Statuses, None]: + """ + If status is passed as a string, get the entity type and match + to correct entry in ENTITY_STATUS_MAP. Also check current status, + because ingest-pipeline will error if you try to set the same status + over the existing status. + Potential TODO: could stop any operation involving "Published" + statuses at this stage. + """ + if entity_type is None: + try: + entity_type = self.entity_data["entity_type"] + assert entity_type is not None + except Exception as e: + raise StatusChangerException( + f""" + Could not find entity type for {self.uuid}. + Error {e} + """ + ) + try: + entity_status = ENTITY_STATUS_MAP[entity_type.lower()][status.lower()] + except KeyError: + raise StatusChangerException( + f""" + Could not retrieve status for {self.uuid}. + Check that status is valid for entity type. + Status not changed. + """ + ) + return self.check_status(entity_status) + + @cached_property + def entity_data(self): + return get_submission_context(self.token, self.uuid) + + def check_status(self, status: Statuses) -> Union[Statuses, None]: + if status == self.entity_data["status"].lower(): + return None + return status + + def format_status_data(self) -> Dict[str, str | Dict]: + data = {} + if self.status: + data["status"] = self.status + # Double-check that you're not accidentally overwriting status + if (extra_status := self.extras.get("status")) is not None and isinstance( + extra_status, str + ): + assert ( + extra_status.lower() == self.status + ), f"Entity {self.uuid} passed multiple statuses ({self.status} and {extra_status})." 
+ data.update(self.extras["extra_fields"]) + logging.info(f"COMPILED DATA: {data}") + return data + + def set_entity_api_status(self) -> Dict: + endpoint = f"/entities/{self.uuid}" + headers = { + "authorization": "Bearer " + self.token, + "X-Hubmap-Application": "ingest-pipeline", + "content-type": "application/json", + } + http_hook = HttpHook("PUT", http_conn_id=self.http_conn_id) + data = self.format_status_data() + if self.extras["extra_options"].get("check_response") is None: + self.extras["extra_options"].update({"check_response": True}) + logging.info( + f""" + data: + {data} + """ + ) + try: + if self.verbose: + logging.info(f"Updating {self.uuid} with data {data}...") + response = http_hook.run( + endpoint, json.dumps(data), headers, self.extras["extra_options"] + ) + return response.json() + except Exception as e: + raise StatusChangerException( + f""" + Encountered error with request to change status/fields + for {self.uuid}, status not set. + Error: {e} + """ + ) + + def update_asana(self) -> None: + # Separating logic for updating Asana into a separate PR + # UpdateAsana(self.uuid, self.token, self.status).update_process_stage() + pass + + def send_email(self) -> None: + # This is underdeveloped and also requires a separate PR + pass + + status_map = {} + """ + Default behavior is to call both set_entity_api_status and update_asana. + Add any statuses to map that require a different process. + Example: + { + # "Statuses.UPLOAD_INVALID": [set_entity_api_status, update_asana, send_email], + # "Statuses.DATASET_INVALID": [set_entity_api_status, update_asana, send_email], + # "Statuses.DATASET_PROCESSING": [set_entity_api_status], + } + """ + + def on_status_change(self) -> None: + if self.status in self.status_map: + for func in self.status_map[self.status]: + func(self) + else: + self.set_entity_api_status() + self.send_email() + self.update_asana() diff --git a/src/ingest-pipeline/airflow/dags/status_change/status_utils.py b/src/ingest-pipeline/airflow/dags/status_change/status_utils.py new file mode 100644 index 00000000..f7ca6822 --- /dev/null +++ b/src/ingest-pipeline/airflow/dags/status_change/status_utils.py @@ -0,0 +1,79 @@ +from __future__ import annotations + +import traceback +from typing import Any, Dict + +from requests import codes +from requests.exceptions import HTTPError + +from airflow.hooks.http_hook import HttpHook + + +# This is simplified from pythonop_get_dataset_state in utils +def get_submission_context(token: str, uuid: str) -> Dict[str, Any]: + """ + uuid can also be a HuBMAP ID. 
+ """ + method = "GET" + headers = { + "authorization": f"Bearer {token}", + "content-type": "application/json", + "X-Hubmap-Application": "ingest-pipeline", + } + http_hook = HttpHook(method, http_conn_id="entity_api_connection") + + endpoint = f"entities/{uuid}" + + try: + response = http_hook.run( + endpoint, headers=headers, extra_options={"check_response": False} + ) + response.raise_for_status() + return response.json() + except HTTPError as e: + print(f"ERROR: {e}") + if e.response.status_code == codes.unauthorized: + raise RuntimeError("entity database authorization was rejected?") + else: + print("benign error") + return {} + + +def get_hubmap_id_from_uuid(token: str, uuid: str) -> str | None: + method = "GET" + headers = { + "authorization": f"Bearer {token}", + "content-type": "application/json", + "X-Hubmap-Application": "ingest-pipeline", + } + http_hook = HttpHook(method, http_conn_id="entity_api_connection") + + endpoint = f"entities/{uuid}" + + try: + response = http_hook.run( + endpoint, headers=headers, extra_options={"check_response": False} + ) + response.raise_for_status() + return response.json().get("hubmap_id") + except HTTPError as e: + print(f"ERROR: {e}") + if e.response.status_code == codes.unauthorized: + raise RuntimeError("entity database authorization was rejected?") + else: + print("benign error") + return None + + +def formatted_exception(exception): + """ + traceback logic from + https://stackoverflow.com/questions/51822029/get-exception-details-on-airflow-on-failure-callback-context + """ + if not ( + formatted_exception := "".join( + traceback.TracebackException.from_exception(exception).format() + ).replace("\n", "
") + ): + return None + return formatted_exception diff --git a/src/ingest-pipeline/airflow/dags/status_change/tests/test_status_changer.py b/src/ingest-pipeline/airflow/dags/status_change/tests/test_status_changer.py new file mode 100644 index 00000000..e938246c --- /dev/null +++ b/src/ingest-pipeline/airflow/dags/status_change/tests/test_status_changer.py @@ -0,0 +1,183 @@ +import unittest +from functools import cached_property +from unittest.mock import patch + +from status_manager import StatusChanger, StatusChangerException, Statuses +from utils import pythonop_set_dataset_state + + +class TestStatusChanger(unittest.TestCase): + @cached_property + @patch("status_manager.HttpHook.run") + def upload_valid(self, hhr_mock): + return StatusChanger( + "upload_valid_uuid", + "upload_valid_token", + "Valid", + { + "extra_fields": {}, + "extra_options": {}, + }, + entity_type="Upload", + ) + + @patch("status_manager.HttpHook.run") + def test_unrecognized_status(self, hhr_mock): + with self.assertRaises(StatusChangerException): + StatusChanger( + "invalid_status_uuid", + "invalid_status_token", + "invalid_status_string", + { + "extra_fields": {}, + "extra_options": {}, + }, + entity_type="Upload", + ) + + def test_recognized_status(self): + data = self.upload_valid.format_status_data() + self.assertEqual(data["status"], self.upload_valid.status) + + @patch("status_manager.HttpHook.run") + def test_extra_fields(self, hhr_mock): + with_extra_field = StatusChanger( + "extra_field_uuid", + "extra_field_token", + Statuses.UPLOAD_PROCESSING, + { + "extra_fields": {"test_extra_field": True}, + "extra_options": {}, + }, + ) + data = with_extra_field.format_status_data() + self.assertIn("test_extra_field", data) + self.assertEqual(data["test_extra_field"], True) + + @patch("status_manager.HttpHook.run") + def test_extra_options(self, hhr_mock): + with_extra_option = StatusChanger( + "extra_options_uuid", + "extra_options_token", + Statuses.UPLOAD_VALID, + {"extra_fields": {}, "extra_options": {"check_response": False}}, + verbose=False, + ) + with_extra_option.set_entity_api_status() + self.assertIn({"check_response": False}, hhr_mock.call_args.args) + without_extra_option = StatusChanger( + "extra_options_uuid", + "extra_options_token", + Statuses.UPLOAD_VALID, + {"extra_fields": {}, "extra_options": {}}, + verbose=False, + ) + without_extra_option.set_entity_api_status() + self.assertIn({"check_response": True}, hhr_mock.call_args.args) + + @patch("status_manager.HttpHook.run") + def test_extra_options_and_fields(self, hhr_mock): + with_extra_option_and_field = StatusChanger( + "extra_options_uuid", + "extra_options_token", + Statuses.UPLOAD_VALID, + { + "extra_fields": {"test_extra_field": True}, + "extra_options": {"check_response": False}, + }, + verbose=False, + ) + with_extra_option_and_field.set_entity_api_status() + self.assertIn({"check_response": False}, hhr_mock.call_args.args) + self.assertIn('{"status": "valid", "test_extra_field": true}', hhr_mock.call_args.args) + + @patch("status_manager.HttpHook.run") + def test_valid_status_in_request(self, hhr_mock): + self.upload_valid.set_entity_api_status() + self.assertIn('{"status": "valid"}', hhr_mock.call_args.args) + + @patch("status_manager.HttpHook.run") + def test_http_conn_id(self, hhr_mock): + with_http_conn_id = StatusChanger( + "http_conn_uuid", + "http_conn_token", + Statuses.DATASET_NEW, + { + "extra_fields": {}, + "extra_options": {}, + }, + http_conn_id="test_conn_id", + ) + assert with_http_conn_id.http_conn_id == "test_conn_id" + 
+ @patch("status_manager.HttpHook.run") + @patch("status_manager.StatusChanger.send_email") + def test_status_map(self, test_send_email, hhr_mock): + self.assertFalse(test_send_email.called) + self.upload_valid.status_map = {Statuses.UPLOAD_VALID: [self.upload_valid.send_email]} + self.upload_valid.on_status_change() + self.assertTrue(test_send_email.called) + + @staticmethod + def my_callable(**kwargs): + return kwargs["uuid"] + + @patch("utils.StatusChanger") + @patch("utils.get_auth_tok") + def test_pythonop_set_dataset_state_valid(self, gat_mock, sc_mock): + uuid = "test_uuid" + token = "test_token" + gat_mock.return_value = token + message = "Test message" + # Not passing a ds_state kwarg sets status to Processing + pythonop_set_dataset_state( + crypt_auth_tok=token, + dataset_uuid_callable=self.my_callable, + uuid=uuid, + message=message, + ) + sc_mock.assert_called_with( + uuid, + token, + "Processing", + { + "extra_fields": {"pipeline_message": message}, + "extra_options": {}, + }, + http_conn_id="entity_api_connection", + ) + # Pass a valid ds_state and assert it was passed properly + pythonop_set_dataset_state( + crypt_auth_tok=token, + dataset_uuid_callable=self.my_callable, + uuid=uuid, + message=message, + ds_state="QA", + ) + sc_mock.assert_called_with( + uuid, + token, + "QA", + { + "extra_fields": {"pipeline_message": message}, + "extra_options": {}, + }, + http_conn_id="entity_api_connection", + ) + + @patch("status_manager.HttpHook.run") + @patch("utils.get_auth_tok") + def test_pythonop_set_dataset_state_invalid(self, gat_mock, hhr_mock): + uuid = "test_uuid" + token = "test_token" + gat_mock.return_value = token + message = "Test message" + # Pass an invalid ds_state + with self.assertRaises(Exception): + pythonop_set_dataset_state( + crypt_auth_tok=token, + dataset_uuid_callable=self.my_callable, + uuid=uuid, + message=message, + ds_state="Unknown", + ) diff --git a/src/ingest-pipeline/airflow/dags/utils.py b/src/ingest-pipeline/airflow/dags/utils.py index 1f3bc83e..d7230b74 100644 --- a/src/ingest-pipeline/airflow/dags/utils.py +++ b/src/ingest-pipeline/airflow/dags/utils.py @@ -1,137 +1,122 @@ +import json import math import os +import re +import shlex +import sys import urllib.parse +import uuid from abc import ABC, abstractmethod from collections import namedtuple +from copy import deepcopy from functools import lru_cache -import json from os import environ, fspath, walk -from os.path import ( - basename, dirname, relpath, split, join, getsize, - realpath, exists -) +from os.path import basename, dirname, exists, getsize, join, realpath, relpath, split from pathlib import Path from pprint import pprint -import re -import shlex -import sys -import uuid -from subprocess import check_output, CalledProcessError +from subprocess import CalledProcessError, check_output from typing import ( - Any, Callable, Dict, Iterable, List, Mapping, Optional, - Pattern, Tuple, TypeVar, Union + Any, + Callable, + Dict, + Iterable, + List, + Mapping, + Optional, + Pattern, + Tuple, + TypeVar, + Union, ) -from requests.exceptions import HTTPError -from requests import codes -from copy import deepcopy +import cwltool # used to find its path import yaml -from airflow import DAG -from airflow.models.baseoperator import BaseOperator -from airflow.configuration import conf as airflow_conf -from airflow.hooks.http_hook import HttpHook from cryptography.fernet import Fernet - -from hubmap_commons.schema_tools import ( - assert_json_matches_schema, - set_schema_base_path -) +from 
hubmap_commons.schema_tools import assert_json_matches_schema, set_schema_base_path from hubmap_commons.type_client import TypeClient +from requests import codes +from requests.exceptions import HTTPError +from status_change.status_manager import StatusChanger -import cwltool # used to find its path - +from airflow import DAG +from airflow.configuration import conf as airflow_conf +from airflow.hooks.http_hook import HttpHook +from airflow.models.baseoperator import BaseOperator -airflow_conf.read(join(environ['AIRFLOW_HOME'], 'instance', 'app.cfg')) +airflow_conf.read(join(environ["AIRFLOW_HOME"], "instance", "app.cfg")) try: - sys.path.append(airflow_conf.as_dict()['connections']['SRC_PATH'] - .strip("'").strip('"')) + sys.path.append(airflow_conf.as_dict()["connections"]["SRC_PATH"].strip("'").strip('"')) from misc.tools.survey import ENDPOINTS + sys.path.pop() -except KeyError: +except Exception: ENDPOINTS = {} JSONType = Union[str, int, float, bool, None, Dict[str, Any], List[Any]] # Some functions accept a `str` or `List[str]` and return that same type -StrOrListStr = TypeVar('StrOrListStr', str, List[str]) +StrOrListStr = TypeVar("StrOrListStr", str, List[str]) PathStrOrList = Union[str, Path, Iterable[Union[str, Path]]] -SCHEMA_BASE_PATH = join(dirname(dirname(dirname(realpath(__file__)))), - 'schemata') -SCHEMA_BASE_URI = 'http://schemata.hubmapconsortium.org/' +SCHEMA_BASE_PATH = join(dirname(dirname(dirname(realpath(__file__)))), "schemata") +SCHEMA_BASE_URI = "http://schemata.hubmapconsortium.org/" # Some constants -PIPELINE_BASE_DIR = Path(__file__).resolve().parent / 'cwl' +PIPELINE_BASE_DIR = Path(__file__).resolve().parent / "cwl" -RE_ID_WITH_SLICES = re.compile(r'([a-zA-Z0-9\-]*)-(\d*)_(\d*)') +RE_ID_WITH_SLICES = re.compile(r"([a-zA-Z0-9\-]*)-(\d*)_(\d*)") -RE_GIT_URL_PATTERN = re.compile(r'(^git@github.com:)(.*)(\.git)') +RE_GIT_URL_PATTERN = re.compile(r"(^git@github.com:)(.*)(\.git)") # default maximum for number of files for which info should be returned in_line # rather than via an alternative scratch file MAX_IN_LINE_FILES = 500 -GIT = 'git' +GIT = "git" GIT_CLONE_COMMAND = [ GIT, - 'clone', - '{repository}', + "clone", + "{repository}", ] GIT_FETCH_COMMAND = [ GIT, - 'fetch', + "fetch", ] GIT_CHECKOUT_COMMAND = [ GIT, - 'checkout', - '{ref}', -] -GIT_LOG_COMMAND = [ - GIT, - 'log', - '-n1', - '--oneline' -] -GIT_ORIGIN_COMMAND = [ - GIT, - 'config', - '--get', - 'remote.origin.url' -] -GIT_ROOT_COMMAND = [ - GIT, - 'rev-parse', - '--show-toplevel' + "checkout", + "{ref}", ] -SHA1SUM_COMMAND = [ - 'sha1sum', - '{fname}' +GIT_LOG_COMMAND = [GIT, "log", "-n1", "--oneline"] +GIT_ORIGIN_COMMAND = [GIT, "config", "--get", "remote.origin.url"] +GIT_ROOT_COMMAND = [GIT, "rev-parse", "--show-toplevel"] +SHA1SUM_COMMAND = ["sha1sum", "{fname}"] +FILE_TYPE_MATCHERS = [ + (r"^.*\.csv$", "csv"), # format is (regex, type) + (r"^.*\.hdf5$", "hdf5"), + (r"^.*\.h5ad$", "h5ad"), + (r"^.*\.pdf$", "pdf"), + (r"^.*\.json$", "json"), + (r"^.*\.arrow$", "arrow"), + (r"(^.*\.fastq$)|(^.*\.fastq.gz$)", "fastq"), + (r"(^.*\.yml$)|(^.*\.yaml$)", "yaml"), ] -FILE_TYPE_MATCHERS = [(r'^.*\.csv$', 'csv'), # format is (regex, type) - (r'^.*\.hdf5$', 'hdf5'), - (r'^.*\.h5ad$', 'h5ad'), - (r'^.*\.pdf$', 'pdf'), - (r'^.*\.json$', 'json'), - (r'^.*\.arrow$', 'arrow'), - (r'(^.*\.fastq$)|(^.*\.fastq.gz$)', 'fastq'), - (r'(^.*\.yml$)|(^.*\.yaml$)', 'yaml') - ] COMPILED_TYPE_MATCHERS: Optional[List[Tuple[Pattern, str]]] = None """ Lazy construction: a list of tuples (collection_type_regex, 
assay_type_regex, workflow) """ -WORKFLOW_MAP_FILENAME = 'workflow_map.yml' # Expected to be found in this same dir -WORKFLOW_MAP_SCHEMA = 'workflow_map_schema.yml' +WORKFLOW_MAP_FILENAME = "workflow_map.yml" # Expected to be found in this same dir +WORKFLOW_MAP_SCHEMA = "workflow_map_schema.yml" COMPILED_WORKFLOW_MAP: Optional[List[Tuple[Pattern, Pattern, str]]] = None """ Lazy construction; a list of tuples (dag_id_reges, task_id_regex, {key:value}) """ -RESOURCE_MAP_FILENAME = 'resource_map.yml' # Expected to be found in this same dir -RESOURCE_MAP_SCHEMA = 'resource_map_schema.yml' +RESOURCE_MAP_FILENAME = "resource_map.yml" # Expected to be found in this same dir +RESOURCE_MAP_SCHEMA = "resource_map_schema.yml" COMPILED_RESOURCE_MAP: Optional[List[Tuple[Pattern, int, Dict[str, Any]]]] = None TYPE_CLIENT: Optional[TypeClient] = None @@ -139,12 +124,12 @@ # Parameters used to generate scRNA and scATAC analysis DAGs; these # are the only fields which differ between assays and DAGs SequencingDagParameters = namedtuple( - 'SequencingDagParameters', + "SequencingDagParameters", [ - 'dag_id', - 'pipeline_name', - 'assay', - 'dataset_type', + "dag_id", + "pipeline_name", + "assay", + "dataset_type", ], ) @@ -177,16 +162,20 @@ def __init__(self): self.matchers = [] @classmethod - def read_manifest(cls, pipeline_file_manifest: Path) -> Iterable[Tuple[Pattern, str, str, bool, bool]]: + def read_manifest( + cls, pipeline_file_manifest: Path + ) -> Iterable[Tuple[Pattern, str, str, bool, bool]]: with open(pipeline_file_manifest) as f: manifest = json.load(f) - localized_assert_json_matches_schema(manifest, 'pipeline_file_manifest.yml') + localized_assert_json_matches_schema(manifest, "pipeline_file_manifest.yml") for annotation in manifest: - pattern = re.compile(annotation['pattern']) - is_qa_qc = annotation.get('is_qa_qc', False) - is_data_product = annotation.get('is_data_product', False) - yield pattern, annotation['description'], annotation['edam_ontology_term'], is_qa_qc, is_data_product + pattern = re.compile(annotation["pattern"]) + is_qa_qc = annotation.get("is_qa_qc", False) + is_data_product = annotation.get("is_data_product", False) + yield pattern, annotation["description"], annotation[ + "edam_ontology_term" + ], is_qa_qc, is_data_product @classmethod def create_from_files(cls, pipeline_file_manifests: Iterable[Path]): @@ -203,7 +192,13 @@ def get_file_metadata(self, file_path: Path) -> ManifestMatch: the "first-match" behavior is deliberate. """ path_str = fspath(file_path) - for pattern, description_template, ontology_term, is_qa_qc, is_data_product in self.matchers: + for ( + pattern, + description_template, + ontology_term, + is_qa_qc, + is_data_product, + ) in self.matchers: # TODO: walrus operator m = pattern.search(path_str) if m: @@ -217,8 +212,9 @@ class DummyFileMatcher(FileMatcher): Drop-in replacement for PipelineFileMatcher which allows everything and always provides empty descriptions and ontology terms. """ + def get_file_metadata(self, file_path: Path) -> ManifestMatch: - return True, '', '', False + return True, "", "", False class HMDAG(DAG): @@ -227,13 +223,14 @@ class HMDAG(DAG): Defaults are applied to the DAG itself, and to any Tasks added to the DAG. """ + def __init__(self, dag_id: str, **kwargs): """ Provide "max_active_runs" from the lanes resource, if it is not already present. 
""" - if 'max_active_runs' not in kwargs: - kwargs['max_active_runs'] = get_lanes_resource(dag_id) + if "max_active_runs" not in kwargs: + kwargs["max_active_runs"] = get_lanes_resource(dag_id) super().__init__(dag_id, **kwargs) def add_task(self, task: BaseOperator): @@ -252,7 +249,7 @@ def add_task(self, task: BaseOperator): task.queue = res_queue super().add_task(task) - + def find_pipeline_manifests(cwl_files: Iterable[Path]) -> List[Path]: """ Constructs manifest paths from CWL files (strip '.cwl', append @@ -261,7 +258,7 @@ def find_pipeline_manifests(cwl_files: Iterable[Path]) -> List[Path]: """ manifests = [] for cwl_file in cwl_files: - manifest_file = cwl_file.with_name(f'{cwl_file.stem}-manifest.json') + manifest_file = cwl_file.with_name(f"{cwl_file.stem}-manifest.json") if manifest_file.is_file(): manifests.append(manifest_file) return manifests @@ -275,10 +272,7 @@ def get_absolute_workflows(*workflows: Path) -> List[Path]: already absolute, they are returned unchanged; if relative, they are anchored to `PIPELINE_BASE_DIR` """ - return [ - PIPELINE_BASE_DIR / workflow - for workflow in workflows - ] + return [PIPELINE_BASE_DIR / workflow for workflow in workflows] def get_named_absolute_workflows(**workflow_kwargs: Path) -> Dict[str, Path]: @@ -292,19 +286,16 @@ def get_named_absolute_workflows(**workflow_kwargs: Path) -> Dict[str, Path]: if the input paths were already absolute, they are returned unchanged; if relative, they are anchored to `PIPELINE_BASE_DIR` """ - return { - name: PIPELINE_BASE_DIR / workflow - for name, workflow in workflow_kwargs.items() - } + return {name: PIPELINE_BASE_DIR / workflow for name, workflow in workflow_kwargs.items()} def build_dataset_name(dag_id: str, pipeline_str: str, **kwargs) -> str: - parent_submission_str = '_'.join(get_parent_dataset_uuids_list(**kwargs)) - return f'{dag_id}__{parent_submission_str}__{pipeline_str}' + parent_submission_str = "_".join(get_parent_dataset_uuids_list(**kwargs)) + return f"{dag_id}__{parent_submission_str}__{pipeline_str}" def get_parent_dataset_uuids_list(**kwargs) -> List[str]: - uuid_list = kwargs['dag_run'].conf['parent_submission_id'] + uuid_list = kwargs["dag_run"].conf["parent_submission_id"] if not isinstance(uuid_list, list): uuid_list = [uuid_list] return uuid_list @@ -317,7 +308,7 @@ def get_parent_dataset_uuid(**kwargs) -> str: def get_parent_dataset_paths_list(**kwargs) -> List[Path]: - path_list = kwargs['dag_run'].conf['parent_lz_path'] + path_list = kwargs["dag_run"].conf["parent_lz_path"] if not isinstance(path_list, list): path_list = [path_list] return [Path(p) for p in path_list] @@ -339,9 +330,13 @@ def get_parent_data_dirs_list(**kwargs) -> List[Path]: ctx_md_list = ctx["metadata"] if not isinstance(ctx_md_list, list): ctx_md_list = [ctx_md_list] - assert len(data_dir_list) == len(ctx_md_list), "lengths of data directory and md lists do not match" - return [Path(data_dir) / ctx_md['metadata']['data_path'] - for data_dir, ctx_md in zip(data_dir_list, ctx_md_list)] + assert len(data_dir_list) == len( + ctx_md_list + ), "lengths of data directory and md lists do not match" + return [ + Path(data_dir) / ctx_md["metadata"]["data_path"] + for data_dir, ctx_md in zip(data_dir_list, ctx_md_list) + ] def get_parent_data_dir(**kwargs) -> Path: @@ -351,12 +346,11 @@ def get_parent_data_dir(**kwargs) -> Path: def get_previous_revision_uuid(**kwargs) -> Optional[str]: - return kwargs['dag_run'].conf.get('previous_version_uuid', None) + return kwargs["dag_run"].conf.get("previous_version_uuid", 
None) def get_dataset_uuid(**kwargs) -> str: - return kwargs['ti'].xcom_pull(key='derived_dataset_uuid', - task_ids="send_create_dataset") + return kwargs["ti"].xcom_pull(key="derived_dataset_uuid", task_ids="send_create_dataset") def get_uuid_for_error(**kwargs) -> Optional[str]: @@ -380,18 +374,17 @@ def get_git_commits(file_list: StrOrListStr) -> StrOrListStr: else: unroll = False for fname in file_list: - log_command = [piece.format(fname=fname) - for piece in GIT_LOG_COMMAND] + log_command = [piece.format(fname=fname) for piece in GIT_LOG_COMMAND] try: dirnm = dirname(fname) - if dirnm == '': - dirnm = '.' + if dirnm == "": + dirnm = "." line = check_output(log_command, cwd=dirnm) except CalledProcessError as e: # Git will fail if this is not running from a git repo - line = 'DeadBeef git call failed: {}'.format(e.output) - line = line.encode('utf-8') - hashval = line.split()[0].strip().decode('utf-8') + line = "DeadBeef git call failed: {}".format(e.output) + line = line.encode("utf-8") + hashval = line.split()[0].strip().decode("utf-8") rslt.append(hashval) if unroll: return rslt[0] @@ -406,7 +399,7 @@ def _convert_git_to_proper_url(raw_url: str) -> str: """ m = RE_GIT_URL_PATTERN.fullmatch(raw_url) if m: - return f'https://github.com/{m[2]}' + return f"https://github.com/{m[2]}" else: return raw_url @@ -422,18 +415,17 @@ def get_git_origins(file_list: StrOrListStr) -> StrOrListStr: else: unroll = False for fname in file_list: - command = [piece.format(fname=fname) - for piece in GIT_ORIGIN_COMMAND] + command = [piece.format(fname=fname) for piece in GIT_ORIGIN_COMMAND] try: dirnm = dirname(fname) - if dirnm == '': - dirnm = '.' + if dirnm == "": + dirnm = "." line = check_output(command, cwd=dirnm) except CalledProcessError as e: # Git will fail if this is not running from a git repo - line = 'https://unknown/unknown.git git call failed: {}'.format(e.output) - line = line.encode('utf-8') - url = line.split()[0].strip().decode('utf-8') + line = "https://unknown/unknown.git git call failed: {}".format(e.output) + line = line.encode("utf-8") + url = line.split()[0].strip().decode("utf-8") url = _convert_git_to_proper_url(url) rslt.append(url) if unroll: @@ -454,17 +446,16 @@ def get_git_root_paths(file_list: Iterable[str]) -> Union[str, List[str]]: else: unroll = False for fname in file_list: - command = [piece.format(fname=fname) - for piece in GIT_ROOT_COMMAND] + command = [piece.format(fname=fname) for piece in GIT_ROOT_COMMAND] try: dirnm = dirname(fname) - if dirnm == '': - dirnm = '.' + if dirnm == "": + dirnm = "." root_path = check_output(command, cwd=dirnm) except CalledProcessError as e: - print(f'Exception {e}') - root_path = dirname(fname).encode('utf-8') - rslt.append(root_path.strip().decode('utf-8')) + print(f"Exception {e}") + root_path = dirname(fname).encode("utf-8") + rslt.append(root_path.strip().decode("utf-8")) if unroll: return rslt[0] else: @@ -474,19 +465,18 @@ def get_git_root_paths(file_list: Iterable[str]) -> Union[str, List[str]]: def get_git_provenance_dict(file_list: PathStrOrList) -> Mapping[str, str]: """ Given a list of file paths, return a list of dicts of the form: - + [{:}, ...] 
""" if isinstance(file_list, (str, Path)): # sadly, a str is an Iterable[str] file_list = [file_list] - return {basename(fname): get_git_commits(realpath(fname)) - for fname in file_list} + return {basename(fname): get_git_commits(realpath(fname)) for fname in file_list} def get_git_provenance_list(file_list: Iterable[str]) -> List[Mapping[str, Any]]: """ Given a list of file paths, return a list of dicts of the form: - + [{'name':, 'hash':, 'origin':},...] """ if isinstance(file_list, str): # sadly, a str is an Iterable[str] @@ -497,14 +487,16 @@ def get_git_provenance_list(file_list: Iterable[str]) -> List[Mapping[str, Any]] root_l = get_git_root_paths(file_list) rel_name_l = [relpath(name, root) for name, root in zip(name_l, root_l)] # Make sure each repo appears only once - repo_d = {origin: {'name': name, 'hash': hashed} - for origin, name, hashed in zip(origin_l, rel_name_l, hash_l)} + repo_d = { + origin: {"name": name, "hash": hashed} + for origin, name, hashed in zip(origin_l, rel_name_l, hash_l) + } rslt = [] for origin in repo_d: dct = repo_d[origin].copy() - dct['origin'] = origin - if not dct['name'].endswith('cwl'): - del dct['name'] # include explicit names for workflows only + dct["origin"] = origin + if not dct["name"].endswith("cwl"): + del dct["name"] # include explicit names for workflows only rslt.append(dct) # pprint(rslt) return rslt @@ -524,13 +516,13 @@ def _get_file_type(path: Path) -> str: # print('testing ', regex, tpnm) if regex.match(fspath(path)): return tpnm - return 'unknown' + return "unknown" def get_file_metadata(root_dir: str, matcher: FileMatcher) -> List[Mapping[str, Any]]: """ Given a root directory, return a list of the form: - + [ { 'rel_path': , @@ -543,17 +535,23 @@ def get_file_metadata(root_dir: str, matcher: FileMatcher) -> List[Mapping[str, }, ... 
] - + containing an entry for every file below the given root directory: """ root_path = Path(root_dir) rslt = [] - for dirpth, dirnames, fnames in walk(root_dir): + for dirpth, _, fnames in walk(root_dir): dp = Path(dirpth) for fn in fnames: full_path = dp / fn relative_path = full_path.relative_to(root_path) - add_to_index, description, ontology_term, is_qa_qc, is_data_product = matcher.get_file_metadata(relative_path) + ( + add_to_index, + description, + ontology_term, + is_qa_qc, + is_data_product, + ) = matcher.get_file_metadata(relative_path) if add_to_index: # sha1sum disabled because of run time issues on large data collections # line = check_output([word.format(fname=full_path) @@ -561,13 +559,13 @@ def get_file_metadata(root_dir: str, matcher: FileMatcher) -> List[Mapping[str, # cs = line.split()[0].strip().decode('utf-8') rslt.append( { - 'rel_path': fspath(relative_path), - 'type': _get_file_type(full_path), - 'size': getsize(full_path), - 'description': description, - 'edam_term': ontology_term, - 'is_qa_qc': is_qa_qc, - 'is_data_product': is_data_product, + "rel_path": fspath(relative_path), + "type": _get_file_type(full_path), + "size": getsize(full_path), + "description": description, + "edam_term": ontology_term, + "is_qa_qc": is_qa_qc, + "is_data_product": is_data_product, # 'sha1sum': cs, } ) @@ -575,10 +573,10 @@ def get_file_metadata(root_dir: str, matcher: FileMatcher) -> List[Mapping[str, def get_file_metadata_dict( - root_dir: str, - alt_file_dir: str, - pipeline_file_manifests: List[Path], - max_in_line_files: int = MAX_IN_LINE_FILES, + root_dir: str, + alt_file_dir: str, + pipeline_file_manifests: List[Path], + max_in_line_files: int = MAX_IN_LINE_FILES, ) -> Mapping[str, Any]: """ This routine returns file metadata, either directly as JSON in the form @@ -592,13 +590,13 @@ def get_file_metadata_dict( matcher = PipelineFileMatcher.create_from_files(pipeline_file_manifests) file_info = get_file_metadata(root_dir, matcher) if len(file_info) > max_in_line_files: - localized_assert_json_matches_schema(file_info, 'file_info_schema.yml') - fpath = join(alt_file_dir, '{}.json'.format(uuid.uuid4())) - with open(fpath, 'w') as f: - json.dump({'files': file_info}, f) - return {'files_info_alt_path': relpath(fpath, _get_scratch_base_path())} + localized_assert_json_matches_schema(file_info, "file_info_schema.yml") + fpath = join(alt_file_dir, "{}.json".format(uuid.uuid4())) + with open(fpath, "w") as f: + json.dump({"files": file_info}, f) + return {"files_info_alt_path": relpath(fpath, _get_scratch_base_path())} else: - return {'files': file_info} + return {"files": file_info} def pythonop_trigger_target(**kwargs) -> None: @@ -606,12 +604,12 @@ def pythonop_trigger_target(**kwargs) -> None: When used as the python_callable of a PythonOperator,this just logs data provided to the running DAG. """ - ctx = kwargs['dag_run'].conf - run_id = kwargs['run_id'] - print('run_id: ', run_id) - print('dag_run.conf:') + ctx = kwargs["dag_run"].conf + run_id = kwargs["run_id"] + print("run_id: ", run_id) + print("dag_run.conf:") pprint(ctx) - print('kwargs:') + print("kwargs:") pprint(kwargs) @@ -623,13 +621,13 @@ def pythonop_maybe_keep(**kwargs) -> str: 'test_op': the operator providing the success code 'test_key': xcom key to test. 
Defaults to None for return code """ - bail_op = kwargs['bail_op'] if 'bail_op' in kwargs else 'no_keep' - test_op = kwargs['test_op'] - test_key = kwargs['test_key'] if 'test_key' in kwargs else None - retcode = int(kwargs['ti'].xcom_pull(task_ids=test_op, key=test_key)) - print('%s key %s: %s\n' % (test_op, test_key, retcode)) + bail_op = kwargs["bail_op"] if "bail_op" in kwargs else "no_keep" + test_op = kwargs["test_op"] + test_key = kwargs["test_key"] if "test_key" in kwargs else None + retcode = int(kwargs["ti"].xcom_pull(task_ids=test_op, key=test_key)) + print("%s key %s: %s\n" % (test_op, test_key, retcode)) if retcode == 0: - return kwargs['next_op'] + return kwargs["next_op"] else: return bail_op @@ -639,17 +637,21 @@ def get_auth_tok(**kwargs) -> str: Recover the authorization token from the environment, and decrpyt it. """ - crypt_auth_tok = (kwargs['crypt_auth_tok'] if 'crypt_auth_tok' in kwargs - else kwargs['dag_run'].conf['crypt_auth_tok']) - auth_tok = ''.join(e for e in decrypt_tok(crypt_auth_tok.encode()) - if e.isalnum()) # strip out non-alnum characters + crypt_auth_tok = ( + kwargs["crypt_auth_tok"] + if "crypt_auth_tok" in kwargs + else kwargs["dag_run"].conf["crypt_auth_tok"] + ) + auth_tok = "".join( + e for e in decrypt_tok(crypt_auth_tok.encode()) if e.isalnum() + ) # strip out non-alnum characters return auth_tok def pythonop_send_create_dataset(**kwargs) -> str: """ Requests creation of a new dataset. Returns dataset info via XCOM - + Accepts the following via the caller's op_kwargs: 'http_conn_id' : the http connection to be used 'parent_dataset_uuid_callable' : called with **kwargs; returns uuid @@ -665,30 +667,30 @@ def pythonop_send_create_dataset(**kwargs) -> str: or 'dataset_types_callable' : called with **kwargs; returns the types list of the new dataset - + Returns the following via XCOM: (no key) : data_directory_path for the new dataset 'derived_dataset_uuid' : uuid for the created dataset 'group_uuid' : group uuid for the created dataset """ - for arg in ['parent_dataset_uuid_callable', 'http_conn_id']: + for arg in ["parent_dataset_uuid_callable", "http_conn_id"]: assert arg in kwargs, "missing required argument {}".format(arg) - for arg_options in [['dataset_types', 'dataset_types_callable']]: + for arg_options in [["dataset_types", "dataset_types_callable"]]: assert any([arg in kwargs for arg in arg_options]) - http_conn_id = kwargs['http_conn_id'] + http_conn_id = kwargs["http_conn_id"] # ctx = kwargs['dag_run'].conf headers = { - 'authorization': 'Bearer ' + get_auth_tok(**kwargs), - 'content-type': 'application/json', - 'X-Hubmap-Application': 'ingest-pipeline' + "authorization": "Bearer " + get_auth_tok(**kwargs), + "content-type": "application/json", + "X-Hubmap-Application": "ingest-pipeline", } - if 'dataset_types' in kwargs: - dataset_types = kwargs['dataset_types'] + if "dataset_types" in kwargs: + dataset_types = kwargs["dataset_types"] else: - dataset_types = kwargs['dataset_types_callable'](**kwargs) + dataset_types = kwargs["dataset_types_callable"](**kwargs) if not isinstance(dataset_types, list): dataset_types = [dataset_types] canonical_types = set() # to avoid duplicates @@ -699,78 +701,75 @@ def pythonop_send_create_dataset(**kwargs) -> str: contains_seq |= type_info.contains_pii # canonical_types = list(canonical_types) - source_uuids = kwargs['parent_dataset_uuid_callable'](**kwargs) + source_uuids = kwargs["parent_dataset_uuid_callable"](**kwargs) if not isinstance(source_uuids, list): source_uuids = [source_uuids] - 
dataset_name = kwargs['dataset_name_callable'](**kwargs) - + dataset_name = kwargs["dataset_name_callable"](**kwargs) + try: - response = HttpHook('GET', http_conn_id=http_conn_id).run( - endpoint=f'entities/{source_uuids[0]}', + response = HttpHook("GET", http_conn_id=http_conn_id).run( + endpoint=f"entities/{source_uuids[0]}", headers=headers, - extra_options={'check_response': False} + extra_options={"check_response": False}, ) response.raise_for_status() response_json = response.json() - if 'group_uuid' not in response_json: - print(f'response from GET on entities{source_uuids[0]}:') + if "group_uuid" not in response_json: + print(f"response from GET on entities{source_uuids[0]}:") pprint(response_json) - raise ValueError('entities response did not contain group_uuid') - parent_group_uuid = response_json['group_uuid'] + raise ValueError("entities response did not contain group_uuid") + parent_group_uuid = response_json["group_uuid"] data = { "direct_ancestor_uuids": source_uuids, "dataset_info": dataset_name, "data_types": dataset_types, "group_uuid": parent_group_uuid, - "contains_human_genetic_sequences": contains_seq + "contains_human_genetic_sequences": contains_seq, } - if 'previous_revision_uuid_callable' in kwargs: - previous_revision_uuid = kwargs['previous_revision_uuid_callable'](**kwargs) + if "previous_revision_uuid_callable" in kwargs: + previous_revision_uuid = kwargs["previous_revision_uuid_callable"](**kwargs) if previous_revision_uuid is not None: - data['previous_revision_uuid'] = previous_revision_uuid - print('data for dataset creation:') + data["previous_revision_uuid"] = previous_revision_uuid + print("data for dataset creation:") pprint(data) - response = HttpHook('POST', http_conn_id=http_conn_id).run( - endpoint='datasets', - data=json.dumps(data), - headers=headers, - extra_options={} + response = HttpHook("POST", http_conn_id=http_conn_id).run( + endpoint="datasets", data=json.dumps(data), headers=headers, extra_options={} ) response.raise_for_status() response_json = response.json() - print('response to dataset creation:') + print("response to dataset creation:") pprint(response_json) - for elt in ['uuid', 'group_uuid']: + for elt in ["uuid", "group_uuid"]: if elt not in response_json: - raise ValueError(f'datasets response did not contain {elt}') - uuid = response_json['uuid'] - group_uuid = response_json['group_uuid'] - - response = HttpHook('GET', http_conn_id=http_conn_id).run( - endpoint=f'datasets/{uuid}/file-system-abs-path', + raise ValueError(f"datasets response did not contain {elt}") + uuid = response_json["uuid"] + group_uuid = response_json["group_uuid"] + + response = HttpHook("GET", http_conn_id=http_conn_id).run( + endpoint=f"datasets/{uuid}/file-system-abs-path", headers=headers, - extra_options={'check_response': False} + extra_options={"check_response": False}, ) response.raise_for_status() response_json = response.json() - if 'path' not in response_json: - print(f'response from datasets/{uuid}/file-system-abs-path:') + if "path" not in response_json: + print(f"response from datasets/{uuid}/file-system-abs-path:") pprint(response_json) - raise ValueError(f'datasets/{uuid}/file-system-abs-path' - ' did not return a path') - abs_path = response_json['path'] + raise ValueError(f"datasets/{uuid}/file-system-abs-path" " did not return a path") + abs_path = response_json["path"] except HTTPError as e: - print(f'ERROR: {e}') + print(f"ERROR: {e}") if e.response.status_code == codes.unauthorized: - raise RuntimeError(f'authorization for {endpoint} 
was rejected?') + # TODO: endpoint is undefined + raise RuntimeError(f"authorization for {endpoint} was rejected?") else: - raise RuntimeError(f'misc error {e} on {endpoint}') - - kwargs['ti'].xcom_push(key='group_uuid', value=group_uuid) - kwargs['ti'].xcom_push(key='derived_dataset_uuid', value=uuid) + raise RuntimeError(f"misc error {e} on {endpoint}") + + kwargs["ti"].xcom_push(key="group_uuid", value=group_uuid) + kwargs["ti"].xcom_push(key="derived_dataset_uuid", value=uuid) return abs_path @@ -779,7 +778,7 @@ def pythonop_set_dataset_state(**kwargs) -> None: Sets the status of a dataset, to 'Processing' if no specific state is specified. NOTE that this routine cannot change a dataset into or out of the Published state. - + Accepts the following via the caller's op_kwargs: 'dataset_uuid_callable' : called with **kwargs; returns the uuid of the dataset to be modified @@ -788,34 +787,22 @@ def pythonop_set_dataset_state(**kwargs) -> None: 'message' : update message, saved as dataset metadata element "pipeline_messsage". The default is not to save any message. """ - for arg in ['dataset_uuid_callable']: + for arg in ["dataset_uuid_callable"]: assert arg in kwargs, "missing required argument {}".format(arg) - dataset_uuid = kwargs['dataset_uuid_callable'](**kwargs) - http_conn_id = kwargs.get('http_conn_id', 'entity_api_connection') - endpoint = f'/entities/{dataset_uuid}' - ds_state = kwargs['ds_state'] if 'ds_state' in kwargs else 'Processing' - message = kwargs.get('message', None) - headers = { - 'authorization': 'Bearer ' + get_auth_tok(**kwargs), - 'content-type': 'application/json', - 'X-Hubmap-Application': 'ingest-pipeline'} - extra_options = {} - - http_hook = HttpHook('PUT', - http_conn_id=http_conn_id) - - data = {'status': ds_state} - if message is not None: - data['pipeline_message'] = message - print('data: ') - pprint(data) - - response = http_hook.run(endpoint, - json.dumps(data), - headers, - extra_options) - print('response: ') - pprint(response.json()) + dataset_uuid = kwargs["dataset_uuid_callable"](**kwargs) + http_conn_id = kwargs.get("http_conn_id", "entity_api_connection") + status = kwargs["ds_state"] if "ds_state" in kwargs else "Processing" + message = kwargs.get("message", None) + StatusChanger( + dataset_uuid, + get_auth_tok(**kwargs), + status, + { + "extra_fields": {"pipeline_message": message} if message else {}, + "extra_options": {}, + }, + http_conn_id=http_conn_id, + ).on_status_change() def restructure_entity_metadata(raw_metadata: JSONType) -> JSONType: @@ -830,15 +817,15 @@ def restructure_entity_metadata(raw_metadata: JSONType) -> JSONType: de-restructured version can be used by workflows in liu of the original. 
""" md = {} - if 'ingest_metadata' in raw_metadata: - if 'metadata' in raw_metadata['ingest_metadata']: - md['metadata'] = deepcopy(raw_metadata['ingest_metadata']['metadata']) - if 'extra_metadata' in raw_metadata['ingest_metadata']: - md.update(raw_metadata['ingest_metadata']['extra_metadata']) - if 'contributors' in raw_metadata: - md['contributors'] = deepcopy(raw_metadata['contributors']) - if 'antibodies' in raw_metadata: - md['antibodies'] = deepcopy(raw_metadata['antibodies']) + if "ingest_metadata" in raw_metadata: + if "metadata" in raw_metadata["ingest_metadata"]: + md["metadata"] = deepcopy(raw_metadata["ingest_metadata"]["metadata"]) + if "extra_metadata" in raw_metadata["ingest_metadata"]: + md.update(raw_metadata["ingest_metadata"]["extra_metadata"]) + if "contributors" in raw_metadata: + md["contributors"] = deepcopy(raw_metadata["contributors"]) + if "antibodies" in raw_metadata: + md["antibodies"] = deepcopy(raw_metadata["antibodies"]) # print('reconstructed metadata follows') # pprint(md) return md @@ -848,52 +835,54 @@ def pythonop_get_dataset_state(**kwargs) -> JSONType: """ Gets the status JSON structure for a dataset. Works for Uploads and Publications as well as Datasets. - + Accepts the following via the caller's op_kwargs: 'dataset_uuid_callable' : called with **kwargs; returns the uuid of the Dataset or Upload to be examined """ - for arg in ['dataset_uuid_callable']: + for arg in ["dataset_uuid_callable"]: assert arg in kwargs, "missing required argument {}".format(arg) - uuid = kwargs['dataset_uuid_callable'](**kwargs) - method = 'GET' + uuid = kwargs["dataset_uuid_callable"](**kwargs) + method = "GET" auth_tok = get_auth_tok(**kwargs) headers = { - 'authorization': f'Bearer {auth_tok}', - 'content-type': 'application/json', - 'X-Hubmap-Application': 'ingest-pipeline', - } - http_hook = HttpHook(method, http_conn_id='entity_api_connection') + "authorization": f"Bearer {auth_tok}", + "content-type": "application/json", + "X-Hubmap-Application": "ingest-pipeline", + } + http_hook = HttpHook(method, http_conn_id="entity_api_connection") - endpoint = f'entities/{uuid}' + endpoint = f"entities/{uuid}" try: - response = http_hook.run(endpoint, - headers=headers, - extra_options={'check_response': False}) + response = http_hook.run( + endpoint, headers=headers, extra_options={"check_response": False} + ) response.raise_for_status() ds_rslt = response.json() - print('ds rslt:') + print("ds rslt:") pprint(ds_rslt) except HTTPError as e: - print(f'ERROR: {e}') + print(f"ERROR: {e}") if e.response.status_code == codes.unauthorized: - raise RuntimeError('entity database authorization was rejected?') + raise RuntimeError("entity database authorization was rejected?") else: - print('benign error') + print("benign error") return {} - for key in ['status', 'uuid', 'entity_type']: + for key in ["status", "uuid", "entity_type"]: assert key in ds_rslt, f"Dataset status for {uuid} has no {key}" - if ds_rslt['entity_type'] in ['Dataset', 'Publication']: - assert 'data_types' in ds_rslt, f"Dataset status for {uuid} has no data_types" - data_types = ds_rslt['data_types'] - parent_dataset_uuid_list = [ancestor['uuid'] - for ancestor in ds_rslt['direct_ancestors'] - if ancestor['entity_type'] == 'Dataset'] + if ds_rslt["entity_type"] in ["Dataset", "Publication"]: + assert "data_types" in ds_rslt, f"Dataset status for {uuid} has no data_types" + data_types = ds_rslt["data_types"] + parent_dataset_uuid_list = [ + ancestor["uuid"] + for ancestor in ds_rslt["direct_ancestors"] + if 
ancestor["entity_type"] == "Dataset" + ] metadata = restructure_entity_metadata(ds_rslt) endpoint = f"datasets/{ds_rslt['uuid']}/file-system-abs-path" - elif ds_rslt['entity_type'] == 'Upload': + elif ds_rslt["entity_type"] == "Upload": data_types = [] metadata = {} endpoint = f"uploads/{ds_rslt['uuid']}/file-system-abs-path" @@ -901,76 +890,71 @@ def pythonop_get_dataset_state(**kwargs) -> JSONType: else: raise RuntimeError(f"Unknown entity_type {ds_rslt['entity_type']}") try: - http_hook = HttpHook(method, http_conn_id='ingest_api_connection') - response = http_hook.run(endpoint, - headers=headers, - extra_options={'check_response': False}) + http_hook = HttpHook(method, http_conn_id="ingest_api_connection") + response = http_hook.run( + endpoint, headers=headers, extra_options={"check_response": False} + ) response.raise_for_status() path_query_rslt = response.json() - print('path_query rslt:') + print("path_query rslt:") pprint(path_query_rslt) except HTTPError as e: - print(f'ERROR: {e}') + print(f"ERROR: {e}") if e.response.status_code == codes.unauthorized: - raise RuntimeError('entity database authorization was rejected?') + raise RuntimeError("entity database authorization was rejected?") else: - print('benign error') + print("benign error") return {} - assert 'path' in path_query_rslt, (f"Dataset path for {uuid} produced" - " no path") - full_path = path_query_rslt['path'] + assert "path" in path_query_rslt, f"Dataset path for {uuid} produced" " no path" + full_path = path_query_rslt["path"] rslt = { - 'entity_type': ds_rslt['entity_type'], - 'status': ds_rslt['status'], - 'uuid': ds_rslt['uuid'], - 'parent_dataset_uuid_list': parent_dataset_uuid_list, - 'data_types': data_types, - 'local_directory_full_path': full_path, - 'metadata': metadata, + "entity_type": ds_rslt["entity_type"], + "status": ds_rslt["status"], + "uuid": ds_rslt["uuid"], + "parent_dataset_uuid_list": parent_dataset_uuid_list, + "data_types": data_types, + "local_directory_full_path": full_path, + "metadata": metadata, } - if ds_rslt['entity_type'] == 'Dataset': - http_hook = HttpHook('GET', http_conn_id='entity_api_connection') + if ds_rslt["entity_type"] == "Dataset": + http_hook = HttpHook("GET", http_conn_id="entity_api_connection") endpoint = f"datasets/{ds_rslt['uuid']}/organs" try: - response = http_hook.run(endpoint, - headers=headers, - extra_options={'check_response': False}) + response = http_hook.run( + endpoint, headers=headers, extra_options={"check_response": False} + ) response.raise_for_status() organs_query_rslt = response.json() - print('organs_query_rslt:') + print("organs_query_rslt:") pprint(organs_query_rslt) - rslt['organs'] = [entry['organ'] for entry in organs_query_rslt] + rslt["organs"] = [entry["organ"] for entry in organs_query_rslt] except HTTPError as e: - print(f'ERROR: {e}') + print(f"ERROR: {e}") if e.response.status_code == codes.unauthorized: - raise RuntimeError('entity database authorization was rejected?') + raise RuntimeError("entity database authorization was rejected?") else: - print('benign error') + print("benign error") return {} - + return rslt def _uuid_lookup(uuid, **kwargs): - http_conn_id = 'uuid_api_connection' - endpoint = 'hmuuid/{}'.format(uuid) - method = 'GET' - headers = {'authorization': 'Bearer ' + get_auth_tok(**kwargs)} -# print('headers:') -# pprint(headers) + http_conn_id = "uuid_api_connection" + endpoint = "hmuuid/{}".format(uuid) + method = "GET" + headers = {"authorization": "Bearer " + get_auth_tok(**kwargs)} + # print('headers:') + # 
pprint(headers) extra_options = {} - http_hook = HttpHook(method, - http_conn_id=http_conn_id) + http_hook = HttpHook(method, http_conn_id=http_conn_id) - response = http_hook.run(endpoint, - None, - headers, - extra_options) -# print('response: ') -# pprint(response.json()) + response = http_hook.run(endpoint, None, headers, extra_options) + # print('response: ') + # pprint(response.json()) return response.json() @@ -980,8 +964,8 @@ def _generate_slices(id: str) -> Iterable[str]: base, lidx, hidx = mo.groups() lidx = int(lidx) hidx = int(hidx) - for idx in range(lidx, hidx+1): - yield f'{base}-{idx}' + for idx in range(lidx, hidx + 1): + yield f"{base}-{idx}" else: yield id @@ -991,12 +975,12 @@ def assert_id_known(id: str, **kwargs) -> None: Is the given id string known to the uuid database? Id strings with suffixes like myidstr-n1_n2 where n1 and n2 are integers are interpreted as representing multiple ids with suffix integers in the range n1 to n2 inclusive. - + Raises AssertionError if the ID is not known. """ for slice in _generate_slices(id): tissue_info = _uuid_lookup(slice, **kwargs) - assert tissue_info and len(tissue_info) >= 1, f'tissue_id {slice} not found on lookup' + assert tissue_info and len(tissue_info) >= 1, f"tissue_id {slice} not found on lookup" def pythonop_md_consistency_tests(**kwargs) -> int: @@ -1004,41 +988,39 @@ def pythonop_md_consistency_tests(**kwargs) -> int: Perform simple consistency checks of the metadata stored as YAML in kwargs['metadata_fname']. This includes accessing the UUID api via its Airflow connection ID to verify uuids. """ - md_path = join(get_tmp_dir_path(kwargs['run_id']), kwargs['metadata_fname']) + md_path = join(get_tmp_dir_path(kwargs["run_id"]), kwargs["metadata_fname"]) if exists(md_path): - with open(md_path, 'r') as f: + with open(md_path, "r") as f: md = yaml.safe_load(f) - # print('metadata from {} follows:'.format(md_path)) - # pprint(md) - if '_from_metadatatsv' in md and md['_from_metadatatsv']: + # print('metadata from {} follows:'.format(md_path)) + # pprint(md) + if "_from_metadatatsv" in md and md["_from_metadatatsv"]: try: - for elt in ['tissue_id', 'donor_id']: - assert elt in md, 'metadata is missing {}'.format(elt) - assert md['tissue_id'].startswith(md['donor_id']+'-'), 'tissue_id does not match' - assert_id_known(md['tissue_id'], **kwargs) + for elt in ["tissue_id", "donor_id"]: + assert elt in md, "metadata is missing {}".format(elt) + assert md["tissue_id"].startswith(md["donor_id"] + "-"), "tissue_id does not match" + assert_id_known(md["tissue_id"], **kwargs) return 0 except AssertionError as e: - kwargs['ti'].xcom_push(key='err_msg', - value='Assertion Failed: {}'.format(e)) + kwargs["ti"].xcom_push(key="err_msg", value="Assertion Failed: {}".format(e)) return 1 else: return 0 else: - kwargs['ti'].xcom_push(key='err_msg', - value='Expected metadata file is missing') + kwargs["ti"].xcom_push(key="err_msg", value="Expected metadata file is missing") return 1 - + def _get_scratch_base_path() -> Path: - dct = airflow_conf.as_dict(display_sensitive=True)['connections'] - if 'WORKFLOW_SCRATCH' in dct: - scratch_path = dct['WORKFLOW_SCRATCH'] - elif 'workflow_scratch' in dct: + dct = airflow_conf.as_dict(display_sensitive=True)["connections"] + if "WORKFLOW_SCRATCH" in dct: + scratch_path = dct["WORKFLOW_SCRATCH"] + elif "workflow_scratch" in dct: # support for lower case is necessary setting the scratch path via the # environment variable AIRFLOW__CONNECTIONS__WORKFLOW_SCRATCH - scratch_path = dct['workflow_scratch'] + 
scratch_path = dct["workflow_scratch"] else: - raise KeyError('WORKFLOW_SCRATCH') # preserve original code behavior + raise KeyError("WORKFLOW_SCRATCH") # preserve original code behavior scratch_path = scratch_path.strip("'").strip('"') # remove quotes that may be on the string return Path(scratch_path) @@ -1059,21 +1041,22 @@ def get_cwltool_bin_path() -> Path: while cwltool_dir: part1, part2 = split(cwltool_dir) cwltool_dir = part1 - if part2 == 'lib': + if part2 == "lib": break - assert cwltool_dir, 'Failed to find cwltool bin directory' - cwltool_dir = Path(cwltool_dir, 'bin') + assert cwltool_dir, "Failed to find cwltool bin directory" + cwltool_dir = Path(cwltool_dir, "bin") return cwltool_dir def get_cwltool_base_cmd(tmpdir: Path) -> List[str]: return [ - 'env', - 'TMPDIR={}'.format(tmpdir), - '_JAVA_OPTIONS={}'.format('-XX:ActiveProcessorCount=2'), - 'cwltool', - '--timestamps', - '--preserve-environment', '_JAVA_OPTIONS', + "env", + "TMPDIR={}".format(tmpdir), + "_JAVA_OPTIONS={}".format("-XX:ActiveProcessorCount=2"), + "cwltool", + "--timestamps", + "--preserve-environment", + "_JAVA_OPTIONS", # The trailing slashes in the next two lines are deliberate. # cwltool treats these path prefixes as *strings*, not as # directories in which new temporary dirs should be created, so @@ -1081,184 +1064,21 @@ def get_cwltool_base_cmd(tmpdir: Path) -> List[str]: # like '/tmp/cwl-tmpXXXXXXXX' with 'XXXXXXXX' as a random string. # Adding the trailing slash is ensures that temporary directories # are created as *subdirectories* of 'cwl-tmp' and 'cwl-out-tmp'. - '--tmpdir-prefix={}/'.format(tmpdir / 'cwl-tmp'), - '--tmp-outdir-prefix={}/'.format(tmpdir / 'cwl-out-tmp'), + "--tmpdir-prefix={}/".format(tmpdir / "cwl-tmp"), + "--tmp-outdir-prefix={}/".format(tmpdir / "cwl-out-tmp"), ] -def make_send_status_msg_function_old( - dag_file: str, - retcode_ops: List[str], - cwl_workflows: List[Path], - http_conn_id: str = 'ingest_api_connection', - uuid_src_task_id: str = 'send_create_dataset', - dataset_uuid_fun: Optional[Callable[..., str]] = None, - dataset_lz_path_fun: Optional[Callable[..., str]] = None, - metadata_fun: Optional[Callable[..., dict]] = None, - include_file_metadata: Optional[bool] = True -) -> Callable[..., None]: - """ - The function which is generated by this function will return a boolean, - True if the message which was ultimately sent was for a success and - False otherwise. This return value is not necessary in most circumstances - but is useful when the generated function is being wrapped. - - The user can specify dataset_uuid_fun and dataset_lz_path_fun, or leave - both to their empty default values and specify 'uuid_src_task_id'. - - `dag_file` should always be `__file__` wherever this function is used, - to include the DAG file in the provenance. This could be "automated" with - something like `sys._getframe(1).f_code.co_filename`, but that doesn't - seem worth it at the moment - - 'http_conn_id' is the Airflow connection id associated with the /datasets/status service. - - 'dataset_uuid_fun' is a function which returns the uuid of the dataset to be - updated, or None. If given, it will be called with **kwargs arguments. - - 'dataset_lz_path_fun' is a function which returns the full path of the dataset - data directory, or None. If given, it will be called with **kwargs arguments. - If the return value of this callable is None or the empty string, no file metadata - will be ultimately be included in the status message. 
- - 'uuid_src_task_id' is the Airflow task_id of a task providing the uuid via - the XCOM key 'derived_dataset_uuid' and the dataset data directory - via the None key. This is used only if dataset_uuid is None or dataset_lz_path - is None. - - 'metadata_fun' is a function which returns additional metadata in JSON form, - or None. If given, it will be called with **kwargs arguments. This function - will only be evaluated if retcode_ops have all returned 0. - - 'include_file_metadata is a boolean defaulting to True which indicates whether - file metadata should be included in the transmitted metadata structure. If False, - no file metadata will be included. Note that file metadata may also be excluded - based on the return value of 'dataset_lz_path_fun' above. - """ - def send_status_msg(**kwargs) -> bool: - retcodes = [ - kwargs['ti'].xcom_pull(task_ids=op) - for op in retcode_ops - ] - retcodes = [int(rc or '0') for rc in retcodes] - print('retcodes: ', {k: v for k, v in zip(retcode_ops, retcodes)}) - success = all(rc == 0 for rc in retcodes) - if dataset_uuid_fun is None: - dataset_uuid = kwargs['ti'].xcom_pull( - key='derived_dataset_uuid', - task_ids=uuid_src_task_id, - ) - else: - dataset_uuid = dataset_uuid_fun(**kwargs) - if dataset_lz_path_fun is None: - ds_dir = kwargs['ti'].xcom_pull(task_ids=uuid_src_task_id) - else: - ds_dir = dataset_lz_path_fun(**kwargs) - endpoint = '/datasets/status' - method = 'PUT' - headers = { - 'authorization': 'Bearer ' + get_auth_tok(**kwargs), - 'content-type': 'application/json', - } - extra_options = {} - return_status = True # mark false on failure - - http_hook = HttpHook(method, http_conn_id=http_conn_id) - - if success: - md = {} - files_for_provenance = [dag_file, *cwl_workflows] - - if 'dag_provenance' in kwargs['dag_run'].conf: - md['dag_provenance'] = kwargs['dag_run'].conf['dag_provenance'].copy() - new_prv_dct = get_git_provenance_dict(files_for_provenance) - md['dag_provenance'].update(new_prv_dct) - else: - dag_prv = (kwargs['dag_run'].conf['dag_provenance_list'] - if 'dag_provenance_list' in kwargs['dag_run'].conf - else []) - dag_prv.extend(get_git_provenance_list(files_for_provenance)) - md['dag_provenance_list'] = dag_prv - - if metadata_fun: - md['metadata'] = metadata_fun(**kwargs) - - if dataset_lz_path_fun: - dataset_dir_abs_path = dataset_lz_path_fun(**kwargs) - if dataset_dir_abs_path: - ######################################################################### - # Added by Zhou 6/16/2021 for registering thumbnail image - # This is the only place that uses this hardcoded extras/thumbnail.jpg - thumbnail_file_abs_path = join(dataset_dir_abs_path, - 'extras/thumbnail.jpg') - if exists(thumbnail_file_abs_path): - md['thumbnail_file_abs_path'] = thumbnail_file_abs_path - ######################################################################### - - manifest_files = find_pipeline_manifests(cwl_workflows) - if include_file_metadata and ds_dir is not None and not ds_dir == '': - md.update( - get_file_metadata_dict( - ds_dir, - get_tmp_dir_path(kwargs['run_id']), - manifest_files, - ), - ) - try: - assert_json_matches_schema(md, 'dataset_metadata_schema.yml') - data = { - 'dataset_id': dataset_uuid, - 'status': 'QA', - 'message': 'the process ran', - 'metadata': md, - } - except AssertionError as e: - print('invalid metadata follows:') - pprint(md) - data = { - 'dataset_id': dataset_uuid, - 'status': 'Error', - 'message': 'internal error; schema violation: {}'.format(e), - 'metadata': {}, - } - return_status = False - else: - log_fname 
= Path(get_tmp_dir_path(kwargs['run_id']), 'session.log') - with open(log_fname, 'r') as f: - err_txt = '\n'.join(f.readlines()) - data = { - 'dataset_id': dataset_uuid, - 'status': 'Invalid', - 'message': err_txt, - } - return_status = False - print('data: ') - pprint(data) - - response = http_hook.run( - endpoint, - json.dumps(data), - headers, - extra_options, - ) - print('response: ') - pprint(response.json()) - - return return_status - - return send_status_msg - - def make_send_status_msg_function( - dag_file: str, - retcode_ops: List[str], - cwl_workflows: List[Path], - uuid_src_task_id: str = 'send_create_dataset', - dataset_uuid_fun: Optional[Callable[..., str]] = None, - dataset_lz_path_fun: Optional[Callable[..., str]] = None, - metadata_fun: Optional[Callable[..., dict]] = None, - include_file_metadata: Optional[bool] = True -) -> Callable[..., None]: + dag_file: str, + retcode_ops: List[str], + cwl_workflows: List[Path], + uuid_src_task_id: str = "send_create_dataset", + dataset_uuid_fun: Optional[Callable[..., str]] = None, + dataset_lz_path_fun: Optional[Callable[..., str]] = None, + metadata_fun: Optional[Callable[..., dict]] = None, + include_file_metadata: Optional[bool] = True, +) -> Callable[..., bool]: """ The function which is generated by this function will return a boolean, True if the message which was ultimately sent was for a success and @@ -1300,10 +1120,11 @@ def make_send_status_msg_function( # Does the string represent a "true" value, or an int that is 1 def __is_true(val): - if val is None: return False + if val is None: + return False if isinstance(val, str): uval = val.upper().strip() - if uval in ['TRUE', 'T', '1', 'Y', 'YES']: + if uval in ["TRUE", "T", "1", "Y", "YES"]: return True else: return False @@ -1313,53 +1134,44 @@ def __is_true(val): return False def send_status_msg(**kwargs) -> bool: - retcodes = [ - kwargs['ti'].xcom_pull(task_ids=op) - for op in retcode_ops - ] - retcodes = [int(rc or '0') for rc in retcodes] - print('retcodes: ', {k: v for k, v in zip(retcode_ops, retcodes)}) + retcodes = [kwargs["ti"].xcom_pull(task_ids=op) for op in retcode_ops] + retcodes = [int(rc or "0") for rc in retcodes] + print("retcodes: ", {k: v for k, v in zip(retcode_ops, retcodes)}) success = all(rc == 0 for rc in retcodes) if dataset_uuid_fun is None: - dataset_uuid = kwargs['ti'].xcom_pull( - key='derived_dataset_uuid', + dataset_uuid = kwargs["ti"].xcom_pull( + key="derived_dataset_uuid", task_ids=uuid_src_task_id, ) else: dataset_uuid = dataset_uuid_fun(**kwargs) if dataset_lz_path_fun is None: - ds_dir = kwargs['ti'].xcom_pull(task_ids=uuid_src_task_id) + ds_dir = kwargs["ti"].xcom_pull(task_ids=uuid_src_task_id) else: ds_dir = dataset_lz_path_fun(**kwargs) - endpoint = '/entities/' + dataset_uuid - method = 'PUT' - headers = { - 'authorization': 'Bearer ' + get_auth_tok(**kwargs), - 'content-type': 'application/json', - 'X-Hubmap-Application': 'ingest-pipeline', - } - extra_options = {} return_status = True # mark false on failure - - http_hook = HttpHook(method, http_conn_id='entity_api_connection') + status = None + extra_fields = {} if success: md = {} files_for_provenance = [dag_file, *cwl_workflows] - if 'dag_provenance' in kwargs['dag_run'].conf: - md['dag_provenance'] = kwargs['dag_run'].conf['dag_provenance'].copy() + if "dag_provenance" in kwargs["dag_run"].conf: + md["dag_provenance"] = kwargs["dag_run"].conf["dag_provenance"].copy() new_prv_dct = get_git_provenance_dict(files_for_provenance) - md['dag_provenance'].update(new_prv_dct) + 
md["dag_provenance"].update(new_prv_dct) else: - dag_prv = (kwargs['dag_run'].conf['dag_provenance_list'] - if 'dag_provenance_list' in kwargs['dag_run'].conf - else []) + dag_prv = ( + kwargs["dag_run"].conf["dag_provenance_list"] + if "dag_provenance_list" in kwargs["dag_run"].conf + else [] + ) dag_prv.extend(get_git_provenance_list(files_for_provenance)) - md['dag_provenance_list'] = dag_prv + md["dag_provenance_list"] = dag_prv if metadata_fun: - md['metadata'] = metadata_fun(**kwargs) + md["metadata"] = metadata_fun(**kwargs) thumbnail_file_abs_path = [] if dataset_lz_path_fun: @@ -1368,8 +1180,7 @@ def send_status_msg(**kwargs) -> bool: ######################################################################### # Added by Zhou 6/16/2021 for registering thumbnail image # This is the only place that uses this hardcoded extras/thumbnail.jpg - thumbnail_file_abs_path = join(dataset_dir_abs_path, - 'extras/thumbnail.jpg') + thumbnail_file_abs_path = join(dataset_dir_abs_path, "extras/thumbnail.jpg") if exists(thumbnail_file_abs_path): thumbnail_file_abs_path = thumbnail_file_abs_path else: @@ -1377,88 +1188,93 @@ def send_status_msg(**kwargs) -> bool: ######################################################################### manifest_files = find_pipeline_manifests(cwl_workflows) - if include_file_metadata and ds_dir is not None and not ds_dir == '': + if include_file_metadata and ds_dir is not None and not ds_dir == "": md.update( get_file_metadata_dict( ds_dir, - get_tmp_dir_path(kwargs['run_id']), + get_tmp_dir_path(kwargs["run_id"]), manifest_files, ), ) # Refactoring metadata structure + contacts = [] if metadata_fun: - md['files'] = md['metadata'].pop('files_info_alt_path', []) - md['extra_metadata'] = {'collectiontype': md['metadata'].pop('collectiontype', None)} - md['thumbnail_file_abs_path'] = thumbnail_file_abs_path - antibodies = md['metadata'].pop('antibodies', []) - contributors = md['metadata'].pop('contributors', []) - md['metadata'] = md['metadata'].pop('metadata', []) - contacts = [] + md["files"] = md["metadata"].pop("files_info_alt_path", []) + md["extra_metadata"] = { + "collectiontype": md["metadata"].pop("collectiontype", None) + } + md["thumbnail_file_abs_path"] = thumbnail_file_abs_path + antibodies = md["metadata"].pop("antibodies", []) + contributors = md["metadata"].pop("contributors", []) + md["metadata"] = md["metadata"].pop("metadata", []) for contrib in contributors: - if 'is_contact' in contrib: - v = contrib['is_contact'] + if "is_contact" in contrib: + v = contrib["is_contact"] if __is_true(val=v): contacts.append(contrib) def my_callable(**kwargs): return dataset_uuid - ds_rslt = pythonop_get_dataset_state( - dataset_uuid_callable=my_callable, - **kwargs - ) + ds_rslt = pythonop_get_dataset_state(dataset_uuid_callable=my_callable, **kwargs) if not ds_rslt: - status = 'QA' + status = "QA" else: - status = ds_rslt.get('status', 'QA') - if status in ['Processing', 'New', 'Error']: - status = 'QA' + status = ds_rslt.get("status", "QA") + if status in ["Processing", "New"]: + status = "QA" if metadata_fun: if not contacts: - contacts = ds_rslt.get('contacts', []) + contacts = ds_rslt.get("contacts", []) try: - assert_json_matches_schema(md, 'dataset_metadata_schema.yml') - data = { - 'pipeline_message': 'the process ran', - 'ingest_metadata': md, + assert_json_matches_schema(md, "dataset_metadata_schema.yml") + extra_fields = { + "pipeline_message": "the process ran", + "ingest_metadata": md, } if metadata_fun: - data.update({'antibodies': antibodies, - 
'contributors': contributors, - 'contacts': contacts}) - if status not in ['Published']: - data.update({'status': status}) + extra_fields.update( + { + "antibodies": antibodies, + "contributors": contributors, + "contacts": contacts, + } + ) + if status in ["Published"]: + status = None except AssertionError as e: - print('invalid metadata follows:') + print("invalid metadata follows:") pprint(md) - data = { - 'status': 'Error', - 'pipeline_message': 'internal error; schema violation: {}'.format(e), - 'ingest_metadata': {}, + status = "Error" + extra_fields = { + "status": "Error", + "pipeline_message": "internal error; schema violation: {}".format(e), + "ingest_metadata": {}, } return_status = False else: - log_fname = Path(get_tmp_dir_path(kwargs['run_id']), 'session.log') - with open(log_fname, 'r') as f: - err_txt = '\n'.join(f.readlines()) - data = { - 'status': 'Invalid', - 'pipeline_message': err_txt, + log_fname = Path(get_tmp_dir_path(kwargs["run_id"]), "session.log") + with open(log_fname, "r") as f: + err_txt = "\n".join(f.readlines()) + status = "Invalid" + extra_fields = { + "status": "Invalid", + "pipeline_message": err_txt, } return_status = False - print('data: ') - pprint(data) - - response = http_hook.run( - endpoint, - json.dumps(data), - headers, - extra_options, - ) - print('response: ') - pprint(response.json()) + entity_type = ds_rslt.get("entity_type") + StatusChanger( + dataset_uuid, + get_auth_tok(**kwargs), + status, + { + "extra_fields": extra_fields, + "extra_options": {}, + }, + entity_type=entity_type if entity_type else None, + ).on_status_change() return return_status @@ -1471,8 +1287,8 @@ def map_queue_name(raw_queue_name: str) -> str: provided queue name. This allows job separation under Celery. """ conf_dict = airflow_conf.as_dict() - if 'QUEUE_NAME_TEMPLATE' in conf_dict.get('connections', {}): - template = conf_dict['connections']['QUEUE_NAME_TEMPLATE'] + if "QUEUE_NAME_TEMPLATE" in conf_dict.get("connections", {}): + template = conf_dict["connections"]["QUEUE_NAME_TEMPLATE"] template = template.strip("'").strip('"') # remove quotes that may be on the config string rslt = template.format(raw_queue_name) return rslt @@ -1480,22 +1296,28 @@ def map_queue_name(raw_queue_name: str) -> str: return raw_queue_name -def create_dataset_state_error_callback(dataset_uuid_callable: Callable[[Any], str]) -> Callable[[Mapping, Any], - None]: +def create_dataset_state_error_callback( + dataset_uuid_callable: Callable[[Any], str] +) -> Callable[[Mapping, Any], None]: def set_dataset_state_error(context_dict: Mapping, **kwargs) -> None: """ This routine is meant to be """ - msg = 'An internal error occurred in the {} workflow step {}'.format(context_dict['dag'].dag_id, - context_dict['task'].task_id) + msg = "An internal error occurred in the {} workflow step {}".format( + context_dict["dag"].dag_id, context_dict["task"].task_id + ) new_kwargs = kwargs.copy() new_kwargs.update(context_dict) - new_kwargs.update({'dataset_uuid_callable': dataset_uuid_callable, - 'http_conn_id': 'entity_api_connection', - 'ds_state': 'Error', - 'message': msg - }) + new_kwargs.update( + { + "dataset_uuid_callable": dataset_uuid_callable, + "http_conn_id": "entity_api_connection", + "ds_state": "Error", + "message": msg, + } + ) pythonop_set_dataset_state(**new_kwargs) + return set_dataset_state_error @@ -1509,7 +1331,7 @@ def localized_assert_json_matches_schema(jsn: JSONType, schemafile: str) -> None try: return assert_json_matches_schema(jsn, schemafile) # localized by 
set_schema_base_path except AssertionError as e: - print('ASSERTION FAILED: {}'.format(e)) + print("ASSERTION FAILED: {}".format(e)) raise @@ -1520,14 +1342,14 @@ def _get_workflow_map() -> List[Tuple[Pattern, Pattern, str]]: global COMPILED_WORKFLOW_MAP if COMPILED_WORKFLOW_MAP is None: map_path = join(dirname(__file__), WORKFLOW_MAP_FILENAME) - with open(map_path, 'r') as f: + with open(map_path, "r") as f: map = yaml.safe_load(f) localized_assert_json_matches_schema(map, WORKFLOW_MAP_SCHEMA) cmp_map = [] - for dct in map['workflow_map']: - ct_re = re.compile(dct['collection_type']) - at_re = re.compile(dct['assay_type']) - cmp_map.append((ct_re, at_re, dct['workflow'])) + for dct in map["workflow_map"]: + ct_re = re.compile(dct["collection_type"]) + at_re = re.compile(dct["assay_type"]) + cmp_map.append((ct_re, at_re, dct["workflow"])) COMPILED_WORKFLOW_MAP = cmp_map return COMPILED_WORKFLOW_MAP @@ -1539,29 +1361,25 @@ def _get_resource_map() -> List[Tuple[Pattern, Pattern, Dict[str, str]]]: global COMPILED_RESOURCE_MAP if COMPILED_RESOURCE_MAP is None: map_path = join(dirname(__file__), RESOURCE_MAP_FILENAME) - with open(map_path, 'r') as f: + with open(map_path, "r") as f: map = yaml.safe_load(f) localized_assert_json_matches_schema(map, RESOURCE_MAP_SCHEMA) cmp_map = [] - for dct in map['resource_map']: - dag_re = re.compile(dct['dag_re']) - dag_dct = {key: dct[key] for key in dct - if key not in ['dag_re', 'tasks']} + for dct in map["resource_map"]: + dag_re = re.compile(dct["dag_re"]) + dag_dct = {key: dct[key] for key in dct if key not in ["dag_re", "tasks"]} tasks = [] - for inner_dct in dct['tasks']: - assert 'task_re' in inner_dct, ('schema should guarantee' - ' "task_re" is present?') - task_re = re.compile(inner_dct['task_re']) - task_dct = {key: inner_dct[key] for key in inner_dct - if key not in ['task_re']} + for inner_dct in dct["tasks"]: + assert "task_re" in inner_dct, "schema should guarantee" ' "task_re" is present?' + task_re = re.compile(inner_dct["task_re"]) + task_dct = {key: inner_dct[key] for key in inner_dct if key not in ["task_re"]} tasks.append((task_re, task_dct)) cmp_map.append((dag_re, dag_dct, tasks)) COMPILED_RESOURCE_MAP = cmp_map return COMPILED_RESOURCE_MAP -def _lookup_resource_record(dag_id: str, - task_id: Optional[str] = None) -> Tuple[int, Dict]: +def _lookup_resource_record(dag_id: str, task_id: Optional[str] = None) -> Tuple[int, Dict]: """ Look up the resource map entry for the given dag_id and task_id. The first match is returned. If the task_id is None, the first record matching only @@ -1577,25 +1395,28 @@ def _lookup_resource_record(dag_id: str, rslt.update(task_dict) break else: - raise ValueError(f'Resource map entry for dag_id <{dag_id}>' - f' has no match for task_id <{task_id}>') + raise ValueError( + f"Resource map entry for dag_id <{dag_id}>" + f" has no match for task_id <{task_id}>" + ) return rslt else: - raise ValueError('No resource map entry found for' - f' dag_id <{dag_id}> task_id <{task_id}>') + raise ValueError( + "No resource map entry found for" f" dag_id <{dag_id}> task_id <{task_id}>" + ) + - def get_queue_resource(dag_id: str, task_id: Optional[str] = None) -> str: """ Look up the queue defined for this dag_id and task_id in the current - resource map. If the task_id is None, the lookup is done with - task_id='__default__', which presumably only matches the wildcard case. + resource map. If the task_id is None, the lookup is done with + task_id='__default__', which presumably only matches the wildcard case. 
""" if task_id is None: - task_id = '__default__' + task_id = "__default__" rec = _lookup_resource_record(dag_id, task_id) - assert 'queue' in rec, 'schema should guarantee "queue" is present?' - return map_queue_name(rec['queue']) + assert "queue" in rec, 'schema should guarantee "queue" is present?' + return map_queue_name(rec["queue"]) def get_lanes_resource(dag_id: str) -> int: @@ -1604,8 +1425,8 @@ def get_lanes_resource(dag_id: str) -> int: resource map. """ rec = _lookup_resource_record(dag_id) - assert 'lanes' in rec, 'schema should guarantee "lanes" is present?' - return int(rec['lanes']) + assert "lanes" in rec, 'schema should guarantee "lanes" is present?' + return int(rec["lanes"]) def get_preserve_scratch_resource(dag_id: str) -> bool: @@ -1614,9 +1435,8 @@ def get_preserve_scratch_resource(dag_id: str) -> bool: resource map. """ rec = _lookup_resource_record(dag_id) - assert 'preserve_scratch' in rec, ('schema should guarantee' - ' "preserve_scratch" is present?') - return bool(rec['preserve_scratch']) + assert "preserve_scratch" in rec, "schema should guarantee" ' "preserve_scratch" is present?' + return bool(rec["preserve_scratch"]) def get_threads_resource(dag_id: str, task_id: Optional[str] = None) -> int: @@ -1624,18 +1444,22 @@ def get_threads_resource(dag_id: str, task_id: Optional[str] = None) -> int: Look up the number of threads defined for this dag_id and task_id in the current resource map. If the task_id is None, the lookup is done with task_id='__default__', which presumably only matches the wildcard - case. + case. """ if task_id is None: - task_id = '__default__' + task_id = "__default__" rec = _lookup_resource_record(dag_id, task_id) - assert any(['threads' in rec, 'coreuse' in rec]), 'schema should guarantee "threads" or "coreuse" is present?' - if rec.get('coreuse'): - return math.ceil(os.cpu_count() * (int(rec.get('coreuse')) / 100)) \ - if int(rec.get('coreuse')) > 0 \ + assert any( + ["threads" in rec, "coreuse" in rec] + ), 'schema should guarantee "threads" or "coreuse" is present?' + if rec.get("coreuse"): + return ( + math.ceil(os.cpu_count() * (int(rec.get("coreuse")) / 100)) + if int(rec.get("coreuse")) > 0 else math.ceil(os.cpu_count() / 4) + ) else: - return int(rec.get('threads')) + return int(rec.get("threads")) def get_type_client() -> TypeClient: @@ -1651,14 +1475,14 @@ def _get_type_client() -> TypeClient: """ global TYPE_CLIENT if TYPE_CLIENT is None: - conn = HttpHook.get_connection('search_api_connection') - if conn.host.startswith('https'): - conn.host = urllib.parse.unquote(conn.host).split('https://')[1] - conn.conn_type = 'https' + conn = HttpHook.get_connection("search_api_connection") + if conn.host.startswith("https"): + conn.host = urllib.parse.unquote(conn.host).split("https://")[1] + conn.conn_type = "https" if conn.port is None: - url = f'{conn.conn_type}://{conn.host}' + url = f"{conn.conn_type}://{conn.host}" else: - url = f'{conn.conn_type}://{conn.host}:{conn.port}' + url = f"{conn.conn_type}://{conn.host}:{conn.port}" TYPE_CLIENT = TypeClient(url) return TYPE_CLIENT @@ -1685,9 +1509,9 @@ def downstream_workflow_iter(collectiontype: str, assay_type: StrOrListStr) -> I collectiontype and assay_type. Each workflow name is expected to correspond to a known workflow, e.g. an Airflow DAG implemented by workflow_name.py . 
""" - collectiontype = collectiontype or '' + collectiontype = collectiontype or "" assay_type = _canonicalize_assay_type_if_possible(assay_type) - assay_type = assay_type or '' + assay_type = assay_type or "" for ct_re, at_re, workflow in _get_workflow_map(): if isinstance(assay_type, str): at_match = at_re.match(assay_type) @@ -1698,42 +1522,43 @@ def downstream_workflow_iter(collectiontype: str, assay_type: StrOrListStr) -> I def encrypt_tok(cleartext_tok: str) -> bytes: - key = airflow_conf.as_dict(display_sensitive=True)['core']['fernet_key'] + key = airflow_conf.as_dict(display_sensitive=True)["core"]["fernet_key"] fernet = Fernet(key.encode()) return fernet.encrypt(cleartext_tok.encode()) def decrypt_tok(crypt_tok: bytes) -> str: - key = airflow_conf.as_dict(display_sensitive=True)['core']['fernet_key'] + key = airflow_conf.as_dict(display_sensitive=True)["core"]["fernet_key"] fernet = Fernet(key.encode()) return fernet.decrypt(crypt_tok).decode() def join_quote_command_str(pieces: List[Any]): - command_str = ' '.join(shlex.quote(str(piece)) for piece in pieces) - print('final command_str:', command_str) + command_str = " ".join(shlex.quote(str(piece)) for piece in pieces) + print("final command_str:", command_str) return command_str def _strip_url(url): - return url.split(':')[1].strip('/') + return url.split(":")[1].strip("/") def find_matching_endpoint(host_url: str) -> str: """ Find the identity of the 'instance' of Airflow infrastructure based on environment information. - + host_url: the URL of entity-api in the current context returns: an instance string, for example 'PROD' or 'DEV' """ assert ENDPOINTS, "Context information is unavailable" stripped_url = _strip_url(host_url) - print(f'stripped_url: {stripped_url}') - candidates = [ep for ep in ENDPOINTS - if stripped_url == _strip_url(ENDPOINTS[ep]['entity_url'])] - assert len(candidates) == 1, f'Found {candidates}, expected 1 match' + print(f"stripped_url: {stripped_url}") + candidates = [ + ep for ep in ENDPOINTS if stripped_url == _strip_url(ENDPOINTS[ep]["entity_url"]) + ] + assert len(candidates) == 1, f"Found {candidates}, expected 1 match" return candidates[0] @@ -1742,7 +1567,7 @@ def main(): This provides some unit tests. To run it, you will need to define the 'search_api_connection' connection ID and the Fernet key. The easiest way to do that is with something like: - + export AIRFLOW_CONN_SEARCH_API_CONNECTION='https://search.api.hubmapconsortium.org/v3/ fernet_key=`python -c 'from cryptography.fernet import Fernet ; print(Fernet.generate_key().decode())'` export AIRFLOW__CORE__FERNET_KEY=${fernet_key} @@ -1751,34 +1576,41 @@ def main(): print(get_git_commits([__file__])) print(get_git_provenance_dict(__file__)) dirnm = dirname(__file__) - if dirnm == '': - dirnm = '.' + if dirnm == "": + dirnm = "." 
for elt in get_file_metadata(dirnm, DummyFileMatcher()): print(elt) pprint(get_git_provenance_list(__file__)) - md = {'metadata': {'my_string': 'hello world'}, - 'files': get_file_metadata(dirnm, DummyFileMatcher()), - 'dag_provenance_list': get_git_provenance_list(__file__)} + md = { + "metadata": {"my_string": "hello world"}, + "files": get_file_metadata(dirnm, DummyFileMatcher()), + "dag_provenance_list": get_git_provenance_list(__file__), + } try: - localized_assert_json_matches_schema(md, 'dataset_metadata_schema.yml') - print('ASSERT passed') + localized_assert_json_matches_schema(md, "dataset_metadata_schema.yml") + print("ASSERT passed") except AssertionError as e: - print(f'ASSERT failed {e}') - - assay_pairs = [('devtest', 'devtest'), ('codex', 'CODEX'), - ('codex', 'SOMEOTHER'), ('someother', 'CODEX'), - ('someother', 'salmon_sn_rnaseq_10x'), ('someother', 'salmon_rnaseq_10x_sn')] + print(f"ASSERT failed {e}") + + assay_pairs = [ + ("devtest", "devtest"), + ("codex", "CODEX"), + ("codex", "SOMEOTHER"), + ("someother", "CODEX"), + ("someother", "salmon_sn_rnaseq_10x"), + ("someother", "salmon_rnaseq_10x_sn"), + ] for collectiontype, assay_type in assay_pairs: - print('collectiontype {}, assay_type {}:'.format(collectiontype, assay_type)) + print("collectiontype {}, assay_type {}:".format(collectiontype, assay_type)) for elt in downstream_workflow_iter(collectiontype, assay_type): - print(' -> {}'.format(elt)) + print(" -> {}".format(elt)) - print(f'cwltool bin path: {get_cwltool_bin_path()}') + print(f"cwltool bin path: {get_cwltool_bin_path()}") - s = 'hello world' + s = "hello world" crypt_s = encrypt_tok(s) s2 = decrypt_tok(crypt_s) - print('crypto test: {} -> {} -> {}'.format(s, crypt_s, s2)) + print("crypto test: {} -> {} -> {}".format(s, crypt_s, s2)) if __name__ == "__main__": diff --git a/src/ingest-pipeline/airflow/dags/validate_upload.py b/src/ingest-pipeline/airflow/dags/validate_upload.py index 96cf3cd7..8fbc8b96 100644 --- a/src/ingest-pipeline/airflow/dags/validate_upload.py +++ b/src/ingest-pipeline/airflow/dags/validate_upload.py @@ -1,14 +1,17 @@ -import json +from __future__ import annotations + +import logging import sys from datetime import datetime, timedelta from pathlib import Path from pprint import pprint -# from error_catching.validate_upload_failure_callback import ValidateUploadFailure from hubmap_operators.common_operators import ( CleanupTmpDirOperator, CreateTmpDirOperator, ) +from status_change.failure_callback import FailureCallback +from status_change.status_manager import StatusChanger, Statuses from utils import ( HMDAG, get_auth_tok, @@ -22,7 +25,6 @@ from airflow.configuration import conf as airflow_conf from airflow.exceptions import AirflowException from airflow.operators.python import PythonOperator -from airflow.providers.http.hooks.http import HttpHook sys.path.append(airflow_conf.as_dict()["connections"]["SRC_PATH"].strip("'").strip('"')) @@ -40,7 +42,7 @@ "email": ["gesina@psc.edu"], "email_on_failure": False, "email_on_retry": False, - # "on_failure_callback": ValidateUploadFailure, + "on_failure_callback": FailureCallback, "retries": 1, "retry_delay": timedelta(minutes=1), "xcom_push": True, @@ -132,42 +134,33 @@ def run_validation(**kwargs): def send_status_msg(**kwargs): validation_file_path = Path(kwargs["ti"].xcom_pull(key="validation_file_path")) - uuid = kwargs["ti"].xcom_pull(key="uuid") - endpoint = f"/entities/{uuid}" - headers = { - "authorization": "Bearer " + get_auth_tok(**kwargs), - "X-Hubmap-Application": 
"ingest-pipeline", - "content-type": "application/json", - } - extra_options = [] - http_conn_id = "entity_api_connection" - http_hook = HttpHook("PUT", http_conn_id=http_conn_id) with open(validation_file_path) as f: report_txt = f.read() if report_txt.startswith("No errors!"): - data = { - "status": "Valid", + status = Statuses.UPLOAD_VALID + extra_fields = { "validation_message": "", } else: - data = { - "status": "Invalid", + status = Statuses.UPLOAD_INVALID + extra_fields = { "validation_message": report_txt, } - # context = kwargs["ti"].get_template_context() - # ValidateUploadFailure(context, execute_methods=False).send_failure_email( - # report_txt=report_txt - # ) - print("data: ") - pprint(data) - response = http_hook.run( - endpoint, - json.dumps(data), - headers, - extra_options, + logging.info( + f""" + status: {status.value} + validation_message: {extra_fields['validation_message']} + """ ) - print("response: ") - pprint(response.json()) + StatusChanger( + kwargs["ti"].xcom_pull(key="uuid"), + get_auth_tok(**kwargs), + status, + { + "extra_fields": extra_fields, + "extra_options": {}, + }, + ).on_status_change() t_send_status = PythonOperator( task_id="send_status", diff --git a/src/ingest-pipeline/misc/tools/split_and_create.py b/src/ingest-pipeline/misc/tools/split_and_create.py index 27bfdcaf..521814ba 100755 --- a/src/ingest-pipeline/misc/tools/split_and_create.py +++ b/src/ingest-pipeline/misc/tools/split_and_create.py @@ -1,28 +1,30 @@ #! /usr/bin/env python import argparse +import json import re +import time from pathlib import Path -from shutil import copytree, copy2 -from typing import TypeVar, List from pprint import pprint -import time -import json -import requests +from shutil import copy2, copytree +from typing import List, TypeVar + import pandas as pd +import requests +from status_change.status_manager import StatusChanger, Statuses # There has got to be a better solution for this, but I can't find it try: - from survey import (Dataset, EntityFactory, Upload, ENDPOINTS) + from survey import ENDPOINTS, Dataset, EntityFactory, Upload except ImportError: - from .survey import (Dataset, EntityFactory, Upload, ENDPOINTS) + from .survey import ENDPOINTS, Dataset, EntityFactory, Upload -DEFAULT_FROZEN_DF_FNAME = 'frozen_source_df{}.tsv' # must work with frozen_name.format(suffix) +DEFAULT_FROZEN_DF_FNAME = "frozen_source_df{}.tsv" # must work with frozen_name.format(suffix) FAKE_UUID_GENERATOR = None -SCRATCH_PATH = '/tmp/split_and_create' +SCRATCH_PATH = "/tmp/split_and_create" -StrOrListStr = TypeVar('StrOrListStr', str, List[str]) +StrOrListStr = TypeVar("StrOrListStr", str, List[str]) # # The following are used to try to deal with bad assay type information in the original @@ -31,10 +33,10 @@ # FALLBACK_ASSAY_TYPE_TRANSLATIONS = { # 'SNARE-Seq2-AC': 'SNARE-ATACseq2', - 'SNARE-Seq2-AC': 'SNAREseq', + "SNARE-Seq2-AC": "SNAREseq", # 'SNARE2-RNAseq': 'SNARE-RNAseq2', - 'SNARE2-RNAseq': 'sciRNAseq', - 'scRNAseq-10xGenomics-v2': 'scRNA-Seq-10x', + "SNARE2-RNAseq": "sciRNAseq", + "scRNAseq-10xGenomics-v2": "scRNA-Seq-10x", } @@ -46,35 +48,34 @@ # def _remove_na(row: pd.Series, parent_assay_type: StrOrListStr) -> pd.Series: new_row = row.copy() - key = 'transposition_kit_number' - if key in row and row[key].lower() == 'na': - new_row[key] = '' + key = "transposition_kit_number" + if key in row and row[key].lower() == "na": + new_row[key] = "" return new_row -SEQ_RD_FMT_TEST_RX = re.compile(r'\d+\+\d+\+\d+\+\d+') +SEQ_RD_FMT_TEST_RX = re.compile(r"\d+\+\d+\+\d+\+\d+") def 
_reformat_seq_read(row: pd.Series, parent_assay_type: StrOrListStr) -> pd.Series: new_row = row.copy() - key = 'sequencing_read_format' + key = "sequencing_read_format" if key in row and SEQ_RD_FMT_TEST_RX.match(row[key]): - new_row[key] = row[key].replace('+', '/') + new_row[key] = row[key].replace("+", "/") return new_row def _fix_snare_atac_assay_type(row: pd.Series, parent_assay_type: StrOrListStr) -> pd.Series: new_row = row.copy() - key1 = 'assay_type' - key2 = 'canonical_assay_type' - if (key1 in row and key2 in row - and row[key1] == 'SNARE-seq2' and row[key2] == 'SNAREseq'): - new_row[key2] = 'SNARE-seq2' + key1 = "assay_type" + key2 = "canonical_assay_type" + if key1 in row and key2 in row and row[key1] == "SNARE-seq2" and row[key2] == "SNAREseq": + new_row[key2] = "SNARE-seq2" return new_row SPECIAL_CASE_TRANSFORMATIONS = [ - (re.compile('SNAREseq'), [_remove_na, _reformat_seq_read, _fix_snare_atac_assay_type]) + (re.compile("SNAREseq"), [_remove_na, _reformat_seq_read, _fix_snare_atac_assay_type]) ] @@ -82,7 +83,7 @@ def create_fake_uuid_generator(): """This is used to simulate unique uuids for dryrun executions""" count = 0 while True: - rslt = 'fakeuuid_%08x'%count + rslt = "fakeuuid_%08x" % count count += 1 yield rslt @@ -92,10 +93,10 @@ def get_canonical_assay_type(row, entity_factory, default_type): Convert assay type to canonical form, with fallback """ try: - rslt = entity_factory.type_client.getAssayType(row['assay_type']).name + rslt = entity_factory.type_client.getAssayType(row["assay_type"]).name except Exception: print(f"fallback {row['assay_type']} {default_type}") - rslt = FALLBACK_ASSAY_TYPE_TRANSLATIONS.get(row['assay_type'], default_type) + rslt = FALLBACK_ASSAY_TYPE_TRANSLATIONS.get(row["assay_type"], default_type) print(f"{row['assay_type']} -> {rslt}") return rslt @@ -105,46 +106,49 @@ def create_new_uuid(row, source_entity, entity_factory, dryrun=False): Use the entity_factory to create a new dataset, with safety checks """ global FAKE_UUID_GENERATOR - canonical_assay_type = row['canonical_assay_type'] - orig_assay_type = row['assay_type'] - rec_identifier = row['data_path'].strip('/') - assert rec_identifier and rec_identifier != '.', 'Bad data_path!' + canonical_assay_type = row["canonical_assay_type"] + orig_assay_type = row["assay_type"] + rec_identifier = row["data_path"].strip("/") + assert rec_identifier and rec_identifier != ".", "Bad data_path!" 
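# --- Hedged sketch of the special-case cleanup applied above: _reformat_seq_read
# --- rewrites sequencing_read_format values like "12+8+8+50" as "12/8/8/50" when
# --- they match the SEQ_RD_FMT_TEST_RX pattern. The sample row below is invented
# --- for illustration and is not taken from any real metadata.tsv.
import re

import pandas as pd

SEQ_RD_FMT_RX = re.compile(r"\d+\+\d+\+\d+\+\d+")

sample_row = pd.Series({"assay_type": "SNARE-seq2", "sequencing_read_format": "12+8+8+50"})
fixed_row = sample_row.copy()
key = "sequencing_read_format"
if key in sample_row and SEQ_RD_FMT_RX.match(sample_row[key]):
    fixed_row[key] = sample_row[key].replace("+", "/")
print(fixed_row[key])  # -> 12/8/8/50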
info_txt_root = None if isinstance(source_entity, Dataset): - assert 'lab_dataset_id' in source_entity.prop_dct, (f'Dataset {source_entity.uuid}' - ' has no lab_dataset_id') - info_txt_root = source_entity.prop_dct['lab_dataset_id'] + assert "lab_dataset_id" in source_entity.prop_dct, ( + f"Dataset {source_entity.uuid}" " has no lab_dataset_id" + ) + info_txt_root = source_entity.prop_dct["lab_dataset_id"] elif isinstance(source_entity, Upload): - if 'title' in source_entity.prop_dct: - info_txt_root = source_entity.prop_dct['title'] + if "title" in source_entity.prop_dct: + info_txt_root = source_entity.prop_dct["title"] else: - print(f'WARNING: Upload {source_entity.uuid} has no title') + print(f"WARNING: Upload {source_entity.uuid} has no title") info_txt_root = f"Upload {source_entity.prop_dct['hubmap_id']}" - assert info_txt_root is not None, 'Expected a Dataset or an Upload' - info_txt = info_txt_root + ' : ' + rec_identifier + assert info_txt_root is not None, "Expected a Dataset or an Upload" + info_txt = info_txt_root + " : " + rec_identifier try: type_info = entity_factory.type_client.getAssayType(canonical_assay_type) except Exception: - print(f'tried {orig_assay_type}, canoncal version {canonical_assay_type}') - print(f'options are {list(entity_factory.type_client.iterAssayNames())}') + print(f"tried {orig_assay_type}, canoncal version {canonical_assay_type}") + print(f"options are {list(entity_factory.type_client.iterAssayNames())}") type_info = entity_factory.type_client.getAssayType(orig_assay_type) contains_human_genetic_sequences = type_info.contains_pii # Check consistency in case this is a Dataset, which will have this info - if 'contains_human_genetic_sequences' in source_entity.prop_dct: - assert (contains_human_genetic_sequences - == source_entity.prop_dct['contains_human_genetic_sequences']) - group_uuid = source_entity.prop_dct['group_uuid'] - if 'description' in row: - description = str(row['description']) - elif 'description' in source_entity.prop_dct: - description = source_entity.prop_dct['description'] + ' : ' + rec_identifier - elif 'lab_dataset_id' in source_entity.prop_dct: - description = source_entity.prop_dct['lab_dataset_id'] + ' : ' + rec_identifier + if "contains_human_genetic_sequences" in source_entity.prop_dct: + assert ( + contains_human_genetic_sequences + == source_entity.prop_dct["contains_human_genetic_sequences"] + ) + group_uuid = source_entity.prop_dct["group_uuid"] + if "description" in row: + description = str(row["description"]) + elif "description" in source_entity.prop_dct: + description = source_entity.prop_dct["description"] + " : " + rec_identifier + elif "lab_dataset_id" in source_entity.prop_dct: + description = source_entity.prop_dct["lab_dataset_id"] + " : " + rec_identifier else: - description = ': ' + rec_identifier - sample_id_list = row['tissue_id'] + description = ": " + rec_identifier + sample_id_list = row["tissue_id"] direct_ancestor_uuids = [] - for sample_id in sample_id_list.split(','): + for sample_id in sample_id_list.split(","): sample_id = sample_id.strip() sample_uuid = entity_factory.id_to_uuid(sample_id) print(f"including tissue_id {sample_id} ({sample_uuid})") @@ -156,7 +160,7 @@ def create_new_uuid(row, source_entity, entity_factory, dryrun=False): if FAKE_UUID_GENERATOR is None: FAKE_UUID_GENERATOR = create_fake_uuid_generator() uuid = FAKE_UUID_GENERATOR.__next__() - print(f'Not creating uuid {uuid} with assay_type {canonical_assay_type}') + print(f"Not creating uuid {uuid} with assay_type 
{canonical_assay_type}") return uuid else: rslt = entity_factory.create_dataset( @@ -165,82 +169,81 @@ def create_new_uuid(row, source_entity, entity_factory, dryrun=False): assay_type=canonical_assay_type, direct_ancestor_uuids=direct_ancestor_uuids, group_uuid=group_uuid, - description=description + description=description, ) - return rslt['uuid'] + return rslt["uuid"] def populate(row, source_entity, entity_factory, dryrun=False): """ Build the contents of the newly created dataset using info from the parent """ - uuid = row['new_uuid'] - old_data_path = row['data_path'] - row['data_path'] = '.' - old_contrib_path = Path(row['contributors_path']) - new_contrib_path = Path('extras') / old_contrib_path.name - row['contributors_path'] = str(new_contrib_path) - if 'antibodies_path' in row: - old_antibodies_path = Path(row['antibodies_path']) - new_antibodies_path = Path('extras') / old_antibodies_path.name - row['antibodies_path'] = str(new_antibodies_path) + uuid = row["new_uuid"] + old_data_path = row["data_path"] + row["data_path"] = "." + old_contrib_path = Path(row["contributors_path"]) + new_contrib_path = Path("extras") / old_contrib_path.name + row["contributors_path"] = str(new_contrib_path) + if "antibodies_path" in row: + old_antibodies_path = Path(row["antibodies_path"]) + new_antibodies_path = Path("extras") / old_antibodies_path.name + row["antibodies_path"] = str(new_antibodies_path) else: old_antibodies_path = None # row['assay_type'] = row['canonical_assay_type'] row_df = pd.DataFrame([row]) - row_df = row_df.drop(columns=['canonical_assay_type', 'new_uuid']) + row_df = row_df.drop(columns=["canonical_assay_type", "new_uuid"]) if dryrun: kid_path = Path(SCRATCH_PATH) / uuid kid_path.mkdir(0o770, parents=True, exist_ok=True) - print(f'writing this metadata to {kid_path}:') + print(f"writing this metadata to {kid_path}:") print(row_df) else: kid_path = Path(entity_factory.get_full_path(uuid)) - row_df.to_csv(kid_path / f'{uuid}-metadata.tsv', header=True, sep='\t', index=False) - extras_path = kid_path / 'extras' + row_df.to_csv(kid_path / f"{uuid}-metadata.tsv", header=True, sep="\t", index=False) + extras_path = kid_path / "extras" if extras_path.exists(): - assert extras_path.is_dir(), f'{extras_path} is not a directory' + assert extras_path.is_dir(), f"{extras_path} is not a directory" else: - source_extras_path = source_entity.full_path / 'extras' + source_extras_path = source_entity.full_path / "extras" if source_extras_path.exists(): if dryrun: - print(f'copy {source_extras_path} to {extras_path}') + print(f"copy {source_extras_path} to {extras_path}") else: copytree(source_extras_path, extras_path) else: if dryrun: - print(f'creating {extras_path}') + print(f"creating {extras_path}") extras_path.mkdir(0o770) source_data_path = source_entity.full_path / old_data_path - for elt in source_data_path.glob('*'): + for elt in source_data_path.glob("*"): dst_file = kid_path / elt.name if dryrun: if dst_file.exists() and dst_file.is_dir(): - for sub_elt in elt.glob('*'): - print(f'rename {sub_elt} to {kid_path / elt.name / sub_elt.name}') + for sub_elt in elt.glob("*"): + print(f"rename {sub_elt} to {kid_path / elt.name / sub_elt.name}") continue - print(f'rename {elt} to {dst_file}') + print(f"rename {elt} to {dst_file}") else: if dst_file.exists() and dst_file.is_dir(): - for sub_elt in elt.glob('*'): + for sub_elt in elt.glob("*"): sub_elt.rename(kid_path / elt.name / sub_elt.name) continue elt.rename(dst_file) if dryrun: - print(f'copy {old_contrib_path} to {extras_path}') 
+ print(f"copy {old_contrib_path} to {extras_path}") else: copy2(source_entity.full_path / old_contrib_path, extras_path) if old_antibodies_path is not None: if dryrun: - print(f'copy {old_antibodies_path} to {extras_path}') + print(f"copy {old_antibodies_path} to {extras_path}") else: copy2(source_entity.full_path / old_antibodies_path, extras_path) print(f"{old_data_path} -> {uuid} -> full path: {kid_path}") def apply_special_case_transformations( - df: pd.DataFrame, - parent_assay_type: StrOrListStr + df: pd.DataFrame, parent_assay_type: StrOrListStr ) -> pd.DataFrame: """ Sometimes special case transformations must be applied, for example because the @@ -260,54 +263,52 @@ def update_upload_entity(child_uuid_list, source_entity, dryrun=False, verbose=F if isinstance(source_entity, Upload): if dryrun: print(f'set status of <{source_entity.uuid}> to "Reorganized"') - print(f'set <{source_entity.uuid}> dataset_uuids_to_link to {child_uuid_list}') + print(f"set <{source_entity.uuid}> dataset_uuids_to_link to {child_uuid_list}") else: # Set Upload status to "Reorganized" - # Set links from Upload to split Datasets" - entity_url = ENDPOINTS[source_entity.entity_factory.instance]['entity_url'] - data = { - "status": "Reorganized", - "dataset_uuids_to_link": child_uuid_list - } - endpoint = f'{entity_url}/entities/{source_entity.uuid}' - print(f'sending to {endpoint}:') - pprint(data) - r = requests.put(endpoint, - data=json.dumps(data), - headers={ - 'Authorization': f'Bearer {source_entity.entity_factory.auth_tok}', - 'Content-Type': 'application/json', - 'X-Hubmap-Application': 'ingest-pipeline' - }) - if r.status_code >= 300: - r.raise_for_status() - if verbose: - print('response:') - pprint(r.json()) - else: - print(f'{source_entity.uuid} status is Reorganized') - + # Set links from Upload to split Datasets + print(f"Setting status of {source_entity.uuid} to 'Reorganized'") + StatusChanger( + source_entity.uuid, + source_entity.entity_factory.auth_tok, + Statuses.UPLOAD_REORGANIZED, + { + "extra_fields": {"dataset_uuids_to_link": child_uuid_list}, + "extra_options": {}, + }, + verbose=verbose, + ).on_status_change() + if not verbose: + print(f"{source_entity.uuid} status is Reorganized") + + # TODO: click in with UpdateAsana data = {"status": "Submitted"} for uuid in child_uuid_list: - endpoint = f'{entity_url}/entities/{uuid}' - print(f'sending to {endpoint}: {data}') - r = requests.put(endpoint, - data=json.dumps(data), - headers={ - 'Authorization': f'Bearer {source_entity.entity_factory.auth_tok}', - 'Content-Type': 'application/json', - 'X-Hubmap-Application': 'ingest-pipeline' - }) + endpoint = f"{entity_url}/entities/{uuid}" + print(f"sending to {endpoint}: {data}") + r = requests.put( + endpoint, + data=json.dumps(data), + headers={ + "Authorization": f"Bearer {source_entity.entity_factory.auth_tok}", + "Content-Type": "application/json", + "X-Hubmap-Application": "ingest-pipeline", + }, + ) if r.status_code >= 300: r.raise_for_status() if verbose: - print('response:') + print("response:") pprint(r.json()) else: - print(f'Reorganized new: {uuid} from Upload: {source_entity.uuid} status is Submitted') + print( + f"Reorganized new: {uuid} from Upload: {source_entity.uuid} status is Submitted" + ) else: - print(f'source entity <{source_entity.uuid}> is not an upload,' - ' so its status was not updated') + print( + f"source entity <{source_entity.uuid}> is not an upload," + " so its status was not updated" + ) def submit_uuid(uuid, entity_factory, dryrun=False): @@ -315,13 +316,13 @@ 
def submit_uuid(uuid, entity_factory, dryrun=False): Submit the given dataset, causing it to be ingested. """ if dryrun: - print(f'Not submitting uuid {uuid}.') + print(f"Not submitting uuid {uuid}.") return uuid else: uuid_entity_to_submit = entity_factory.get(uuid) rslt = entity_factory.submit_dataset( uuid=uuid, - contains_human_genetic_sequences=uuid_entity_to_submit.contains_human_genetic_sequences + contains_human_genetic_sequences=uuid_entity_to_submit.contains_human_genetic_sequences, ) return rslt @@ -341,67 +342,72 @@ def reorganize(source_uuid, **kwargs) -> None: must be formattable as frozen_df_fname.format(index_string) kwargs['verbose']: if present and True, increase verbosity of output """ - auth_tok = kwargs['auth_tok'] - mode = kwargs['mode'] - ingest = kwargs['ingest'] - dryrun = kwargs['dryrun'] - instance = kwargs['instance'] - frozen_df_fname = kwargs['frozen_df_fname'] - verbose = kwargs.get('verbose', False) + auth_tok = kwargs["auth_tok"] + mode = kwargs["mode"] + ingest = kwargs["ingest"] + dryrun = kwargs["dryrun"] + instance = kwargs["instance"] + frozen_df_fname = kwargs["frozen_df_fname"] + verbose = kwargs.get("verbose", False) dag_config = {} entity_factory = EntityFactory(auth_tok, instance=instance) - print(f'Decomposing {source_uuid}') + print(f"Decomposing {source_uuid}") source_entity = entity_factory.get(source_uuid) - if mode in ['all', 'stop']: - if hasattr(source_entity, 'data_types'): + if mode in ["all", "stop"]: + if hasattr(source_entity, "data_types"): assert isinstance(source_entity.data_types, str) source_data_types = source_entity.data_types else: source_data_types = None - source_metadata_files = list(source_entity.full_path.glob('*metadata.tsv')) + source_metadata_files = list(source_entity.full_path.glob("*metadata.tsv")) for src_idx, smf in enumerate(source_metadata_files): - source_df = pd.read_csv(smf, sep='\t') - source_df['canonical_assay_type'] = source_df.apply(get_canonical_assay_type, - axis=1, - entity_factory=entity_factory, - default_type=source_data_types) - source_df['new_uuid'] = source_df.apply(create_new_uuid, axis=1, - source_entity=source_entity, - entity_factory=entity_factory, - dryrun=dryrun) + source_df = pd.read_csv(smf, sep="\t") + source_df["canonical_assay_type"] = source_df.apply( + get_canonical_assay_type, + axis=1, + entity_factory=entity_factory, + default_type=source_data_types, + ) + source_df["new_uuid"] = source_df.apply( + create_new_uuid, + axis=1, + source_entity=source_entity, + entity_factory=entity_factory, + dryrun=dryrun, + ) source_df = apply_special_case_transformations(source_df, source_data_types) - print(source_df[['data_path', 'canonical_assay_type', 'new_uuid']]) - this_frozen_df_fname = frozen_df_fname.format('_' + str(src_idx)) - source_df.to_csv(this_frozen_df_fname, sep='\t', header=True, index=False) - print(f'wrote {this_frozen_df_fname}') + print(source_df[["data_path", "canonical_assay_type", "new_uuid"]]) + this_frozen_df_fname = frozen_df_fname.format("_" + str(src_idx)) + source_df.to_csv(this_frozen_df_fname, sep="\t", header=True, index=False) + print(f"wrote {this_frozen_df_fname}") - if mode == 'stop': + if mode == "stop": return - if mode in ['all', 'unstop']: - dag_config = {'uuid_list': [], 'collection_type': ''} + if mode in ["all", "unstop"]: + dag_config = {"uuid_list": [], "collection_type": ""} child_uuid_list = [] - source_metadata_files = list(source_entity.full_path.glob('*metadata.tsv')) + source_metadata_files = 
list(source_entity.full_path.glob("*metadata.tsv")) for src_idx, _ in enumerate(source_metadata_files): - this_frozen_df_fname = frozen_df_fname.format('_' + str(src_idx)) - source_df = pd.read_csv(this_frozen_df_fname, sep='\t') - print(f'read {this_frozen_df_fname}') + this_frozen_df_fname = frozen_df_fname.format("_" + str(src_idx)) + source_df = pd.read_csv(this_frozen_df_fname, sep="\t") + print(f"read {this_frozen_df_fname}") for _, row in source_df.iterrows(): - dag_config['uuid_list'].append(row['new_uuid']) - child_uuid_list.append(row['new_uuid']) + dag_config["uuid_list"].append(row["new_uuid"]) + child_uuid_list.append(row["new_uuid"]) populate(row, source_entity, entity_factory, dryrun=dryrun) update_upload_entity(child_uuid_list, source_entity, dryrun=dryrun, verbose=verbose) if ingest: - print('Beginning ingestion') + print("Beginning ingestion") for uuid in child_uuid_list: submit_uuid(uuid, entity_factory, dryrun) if not dryrun: - while entity_factory.get(uuid).status not in ['QA', 'Invalid', 'Error']: + while entity_factory.get(uuid).status not in ["QA", "Invalid", "Error"]: time.sleep(30) print(json.dumps(dag_config)) @@ -412,29 +418,33 @@ def main(): main """ parser = argparse.ArgumentParser() - simplified_frozen_df_fname = DEFAULT_FROZEN_DF_FNAME.format('') # no suffix - parser.add_argument("uuid", - help=("input .txt file containing uuids or" - " .csv or .tsv file with uuid column")) - parser.add_argument("--stop", - help=("stop after creating child uuids and writing" - f" {simplified_frozen_df_fname}"), - action="store_true") - parser.add_argument("--unstop", - help=("do not create child uuids;" - f" read {simplified_frozen_df_fname} and continue"), - action="store_true") - parser.add_argument("--instance", - help=("instance to use." - f" One of {list(ENDPOINTS)} (default %(default)s)"), - default='PROD') - parser.add_argument("--dryrun", - help=("describe the steps that would be taken but" - " do not make changes"), - action="store_true") - parser.add_argument("--ingest", - help="automatically ingest the generated datasets", - action="store_true") + simplified_frozen_df_fname = DEFAULT_FROZEN_DF_FNAME.format("") # no suffix + parser.add_argument( + "uuid", help=("input .txt file containing uuids or" " .csv or .tsv file with uuid column") + ) + parser.add_argument( + "--stop", + help=("stop after creating child uuids and writing" f" {simplified_frozen_df_fname}"), + action="store_true", + ) + parser.add_argument( + "--unstop", + help=("do not create child uuids;" f" read {simplified_frozen_df_fname} and continue"), + action="store_true", + ) + parser.add_argument( + "--instance", + help=("instance to use." f" One of {list(ENDPOINTS)} (default %(default)s)"), + default="PROD", + ) + parser.add_argument( + "--dryrun", + help=("describe the steps that would be taken but" " do not make changes"), + action="store_true", + ) + parser.add_argument( + "--ingest", help="automatically ingest the generated datasets", action="store_true" + ) args = parser.parse_args() @@ -454,11 +464,11 @@ def main(): dryrun = args.dryrun ingest = args.ingest if args.stop: - mode = 'stop' + mode = "stop" elif args.unstop: - mode = 'unstop' + mode = "unstop" else: - mode = 'all' + mode = "all" print( """ @@ -466,16 +476,18 @@ def main(): files around on PROD. Be very sure you know what it does before you run it! 
""" ) - auth_tok = input('auth_tok: ') - - reorganize(source_uuid, - auth_tok=auth_tok, - mode=mode, - ingest=ingest, - dryrun=dryrun, - instance=instance, - frozen_df_fname=DEFAULT_FROZEN_DF_FNAME) + auth_tok = input("auth_tok: ") + + reorganize( + source_uuid, + auth_tok=auth_tok, + mode=mode, + ingest=ingest, + dryrun=dryrun, + instance=instance, + frozen_df_fname=DEFAULT_FROZEN_DF_FNAME, + ) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/src/ingest-pipeline/submodules/ingest-validation-tools b/src/ingest-pipeline/submodules/ingest-validation-tools index 08fc7c69..80f78d30 160000 --- a/src/ingest-pipeline/submodules/ingest-validation-tools +++ b/src/ingest-pipeline/submodules/ingest-validation-tools @@ -1 +1 @@ -Subproject commit 08fc7c6922c7f6a73b913e520b73055f0349bd73 +Subproject commit 80f78d30542d3b084bbd56f07be9c4eceff6e9d0