From e9514bc9e2bedc13bee09a9a00ff949093f15b32 Mon Sep 17 00:00:00 2001
From: David Betancur
Date: Thu, 22 Feb 2024 11:50:28 -0500
Subject: [PATCH 01/12] Adding IVT git hash to the dag_provenance_list field for primary datasets.

---
 .../airflow/dags/scan_and_begin_processing.py | 9 ++++++---
 src/ingest-pipeline/airflow/dags/utils.py     | 3 ++-
 2 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/src/ingest-pipeline/airflow/dags/scan_and_begin_processing.py b/src/ingest-pipeline/airflow/dags/scan_and_begin_processing.py
index 83c0eaad..1da37446 100644
--- a/src/ingest-pipeline/airflow/dags/scan_and_begin_processing.py
+++ b/src/ingest-pipeline/airflow/dags/scan_and_begin_processing.py
@@ -46,6 +46,10 @@ def get_dataset_lz_path(**kwargs):
     return ctx["lz_path"]
 
 
+def get_ivt_path(**kwargs):
+    return Path(kwargs["ti"].xcom_pull(task_ids="run_validation", key="ivt_path"))
+
+
 # Following are defaults which can be overridden later on
 default_args = {
     "owner": "hubmap",
@@ -155,7 +159,7 @@ def run_validation(**kwargs):
     send_status_msg = make_send_status_msg_function(
         dag_file=__file__,
         retcode_ops=["run_validation", "run_md_extract", "md_consistency_tests"],
-        cwl_workflows=[],
+        ivt_path_fun=get_ivt_path,
         dataset_uuid_fun=get_dataset_uuid,
         dataset_lz_path_fun=get_dataset_lz_path,
         metadata_fun=read_metadata_file,
@@ -268,8 +272,7 @@ def flex_maybe_spawn(**kwargs):
                 "parent_submission_id": uuid,
                 "metadata": md,
                 "dag_provenance_list": utils.get_git_provenance_list(
-                    [__file__,
-                     kwargs["ti"].xcom_pull(task_ids="run_validation", key="ivt_path")]
+                    [__file__, kwargs["ti"].xcom_pull(task_ids="run_validation", key="ivt_path")]
                 ),
             }
         for next_dag in utils.downstream_workflow_iter(collectiontype, assay_type):
diff --git a/src/ingest-pipeline/airflow/dags/utils.py b/src/ingest-pipeline/airflow/dags/utils.py
index 9f471e71..e07d652d 100644
--- a/src/ingest-pipeline/airflow/dags/utils.py
+++ b/src/ingest-pipeline/airflow/dags/utils.py
@@ -1235,6 +1235,7 @@ def make_send_status_msg_function(
     metadata_fun: Optional[Callable[..., dict]] = None,
     include_file_metadata: Optional[bool] = True,
     no_provenance: Optional[bool] = False,
+    ivt_path_fun: Optional[Callable[..., list]] = None
 ) -> Callable[..., bool]:
     """
     The function which is generated by this function will return a boolean,
@@ -1316,7 +1317,7 @@ def my_callable(**kwargs):
             ds_rslt = pythonop_get_dataset_state(dataset_uuid_callable=my_callable, **kwargs)
         if success:
             md = {}
-            files_for_provenance = [dag_file, *cwl_workflows]
+            files_for_provenance = [dag_file, *cwl_workflows, ivt_path_fun(**kwargs)]
 
             if no_provenance:
                 md["dag_provenance_list"] = kwargs["dag_run"].conf["dag_provenance_list"].copy()

From eb6700c6f518844c513c51e23f01f1affe3a747c Mon Sep 17 00:00:00 2001
From: David Betancur
Date: Thu, 22 Feb 2024 12:09:43 -0500
Subject: [PATCH 02/12] Adding IVT git hash to the dag_provenance_list field for primary datasets.
--- src/ingest-pipeline/airflow/dags/scan_and_begin_processing.py | 3 ++- src/ingest-pipeline/airflow/dags/utils.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/ingest-pipeline/airflow/dags/scan_and_begin_processing.py b/src/ingest-pipeline/airflow/dags/scan_and_begin_processing.py index 1da37446..7684a8a4 100644 --- a/src/ingest-pipeline/airflow/dags/scan_and_begin_processing.py +++ b/src/ingest-pipeline/airflow/dags/scan_and_begin_processing.py @@ -159,11 +159,12 @@ def run_validation(**kwargs): send_status_msg = make_send_status_msg_function( dag_file=__file__, retcode_ops=["run_validation", "run_md_extract", "md_consistency_tests"], - ivt_path_fun=get_ivt_path, + cwl_workflows=[], dataset_uuid_fun=get_dataset_uuid, dataset_lz_path_fun=get_dataset_lz_path, metadata_fun=read_metadata_file, include_file_metadata=False, + ivt_path_fun=get_ivt_path, ) def wrapped_send_status_msg(**kwargs): diff --git a/src/ingest-pipeline/airflow/dags/utils.py b/src/ingest-pipeline/airflow/dags/utils.py index e07d652d..7f3d2015 100644 --- a/src/ingest-pipeline/airflow/dags/utils.py +++ b/src/ingest-pipeline/airflow/dags/utils.py @@ -1235,7 +1235,7 @@ def make_send_status_msg_function( metadata_fun: Optional[Callable[..., dict]] = None, include_file_metadata: Optional[bool] = True, no_provenance: Optional[bool] = False, - ivt_path_fun: Optional[Callable[..., list]] = None + ivt_path_fun: Optional[Callable[..., Path]] = None ) -> Callable[..., bool]: """ The function which is generated by this function will return a boolean, From 843c27b9adb48ef7d5861be4994b61c5d646765b Mon Sep 17 00:00:00 2001 From: Sean Donahue Date: Mon, 26 Feb 2024 12:22:10 -0500 Subject: [PATCH 03/12] Bumping salmon-rnaseq to v2.1.19 --- src/ingest-pipeline/airflow/dags/cwl/salmon-rnaseq | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/airflow/dags/cwl/salmon-rnaseq b/src/ingest-pipeline/airflow/dags/cwl/salmon-rnaseq index fa4f9c13..6591870e 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/salmon-rnaseq +++ b/src/ingest-pipeline/airflow/dags/cwl/salmon-rnaseq @@ -1 +1 @@ -Subproject commit fa4f9c13b96ef0c8168afd14bdd414d2d524a00c +Subproject commit 6591870e5784be2536a1a88d661dc3dbd7459367 From b0a59cd9997d3bc913dc631d15aa3c04c5283b35 Mon Sep 17 00:00:00 2001 From: David Betancur Date: Tue, 27 Feb 2024 14:15:22 -0500 Subject: [PATCH 04/12] Adding get_component_uuids function to obtain multi-assay components from a primary uuid. Adding new expandable DAG for metadata building for multi-assay components. Linting rebuild_multiple_metadata.py Revamp reorganize_multiassay.py to build the metadata for multi-assay components. Refactoring multiassay_metadatatsv_data_collection.py to support components metadata building. 
--- .../airflow/dags/extra_utils.py | 23 +- .../dags/multiassay_component_metadata.py | 224 ++++++++++++++++++ .../airflow/dags/rebuild_multiple_metadata.py | 87 ++++--- .../airflow/dags/reorganize_multiassay.py | 174 +++++++------- src/ingest-pipeline/airflow/dags/utils.py | 33 ++- .../multiassay_metadatatsv_data_collection.py | 24 +- src/ingest-pipeline/md/metadata_extract.py | 6 +- .../misc/tools/split_and_create.py | 4 +- 8 files changed, 433 insertions(+), 142 deletions(-) create mode 100644 src/ingest-pipeline/airflow/dags/multiassay_component_metadata.py diff --git a/src/ingest-pipeline/airflow/dags/extra_utils.py b/src/ingest-pipeline/airflow/dags/extra_utils.py index f88f5e3a..4b29330d 100644 --- a/src/ingest-pipeline/airflow/dags/extra_utils.py +++ b/src/ingest-pipeline/airflow/dags/extra_utils.py @@ -34,6 +34,27 @@ def check_link_published_drvs(uuid: str, auth_tok: str) -> Tuple[bool, str]: return needs_previous_version, published_uuid +def get_component_uuids(uuid:str, auth_tok: str) -> List: + children = [] + endpoint = f"/children/{uuid}" + headers = { + "content-type": "application/json", + "X-Hubmap-Application": "ingest-pipeline", + "Authorization": f"Bearer {auth_tok}" + } + extra_options = {} + + http_hook = HttpHook("GET", http_conn_id="entity_api_connection") + + response = http_hook.run(endpoint, headers=headers, extra_options=extra_options) + print("response: ") + pprint(response.json()) + for data in response.json(): + if data.get("creation_action") == "Multi-Assay Split": + children.append(data.get("uuid")) + return children + + class SoftAssayClient: def __init__(self, metadata_files: List, auth_tok: str): self.assay_components = [] @@ -48,7 +69,7 @@ def __init__(self, metadata_files: List, auth_tok: str): assay_type = self.__get_assaytype_data(row=rows[0], auth_tok=auth_tok) data_component = { "assaytype": assay_type.get("assaytype"), - "dataset_type": assay_type.get("dataset-type"), + "dataset-type": assay_type.get("dataset-type"), "contains-pii": assay_type.get("contains-pii", True), "primary": assay_type.get("primary", False), "metadata-file": metadata_file, diff --git a/src/ingest-pipeline/airflow/dags/multiassay_component_metadata.py b/src/ingest-pipeline/airflow/dags/multiassay_component_metadata.py new file mode 100644 index 00000000..30fac047 --- /dev/null +++ b/src/ingest-pipeline/airflow/dags/multiassay_component_metadata.py @@ -0,0 +1,224 @@ +import os +import yaml +import utils +from pprint import pprint + +from airflow.operators.bash import BashOperator +from airflow.operators.python import PythonOperator +from airflow.exceptions import AirflowException +from airflow.configuration import conf as airflow_conf +from datetime import datetime, timedelta + +from utils import ( + HMDAG, + get_queue_resource, + get_preserve_scratch_resource, + create_dataset_state_error_callback, + pythonop_md_consistency_tests, + make_send_status_msg_function, + get_tmp_dir_path, + localized_assert_json_matches_schema as assert_json_matches_schema, + pythonop_get_dataset_state, + encrypt_tok, +) + + +def get_uuid_for_error(**kwargs): + """ + Return the uuid for the derived dataset if it exists, and of the parent dataset otherwise. 
+ """ + return None + + +def get_dataset_uuid(**kwargs): + return kwargs["dag_run"].conf["uuid"] + + +def get_dataset_lz_path(**kwargs): + ctx = kwargs["dag_run"].conf + return ctx["lz_path"] + + +default_args = { + "owner": "hubmap", + "depends_on_past": False, + "start_date": datetime(2019, 1, 1), + "email": ["joel.welling@gmail.com"], + "email_on_failure": False, + "email_on_retry": False, + "retries": 1, + "retry_delay": timedelta(minutes=1), + "xcom_push": True, + "queue": get_queue_resource("rebuild_metadata"), + "on_failure_callback": create_dataset_state_error_callback(get_uuid_for_error), +} + +with HMDAG( + "rebuild_primary_dataset_metadata", + schedule_interval=None, + is_paused_upon_creation=False, + default_args=default_args, + user_defined_macros={ + "tmp_dir_path": get_tmp_dir_path, + "preserve_scratch": get_preserve_scratch_resource("rebuild_metadata"), + }, +) as dag: + + def check_one_uuid(uuid, **kwargs): + """ + Look up information on the given uuid or HuBMAP identifier. + Returns: + - the uuid, translated from an identifier if necessary + - data type(s) of the dataset + - local directory full path of the dataset + """ + print(f"Starting uuid {uuid}") + my_callable = lambda **kwargs: uuid + ds_rslt = pythonop_get_dataset_state(dataset_uuid_callable=my_callable, **kwargs) + if not ds_rslt: + raise AirflowException(f"Invalid uuid/doi for group: {uuid}") + print("ds_rslt:") + pprint(ds_rslt) + + for key in ["status", "uuid", "local_directory_full_path", "metadata", "dataset_type"]: + assert key in ds_rslt, f"Dataset status for {uuid} has no {key}" + + if not ds_rslt["status"] in ["New", "Error", "QA", "Published"]: + raise AirflowException(f"Dataset {uuid} is not QA or better") + + return ( + ds_rslt["uuid"], + ds_rslt["local_directory_full_path"], + ds_rslt["metadata"], + ds_rslt["dataset_type"], + ) + + def check_uuids(**kwargs): + print("dag_run conf follows:") + pprint(kwargs["dag_run"].conf) + + try: + assert_json_matches_schema( + kwargs["dag_run"].conf, "launch_checksums_metadata_schema.yml" + ) + except AssertionError as e: + print("invalid metadata follows:") + pprint(kwargs["dag_run"].conf) + raise + + uuid, lz_path, metadata, dataset_type = check_one_uuid( + kwargs["dag_run"].conf["uuid"], **kwargs + ) + print(f"filtered metadata: {metadata}") + print(f"filtered paths: {lz_path}") + kwargs["dag_run"].conf["lz_path"] = lz_path + kwargs["dag_run"].conf["src_path"] = airflow_conf.as_dict()["connections"][ + "src_path" + ].strip("'") + kwargs["dag_run"].conf["dataset_type"] = dataset_type + kwargs["ti"].xcom_push(key="dataset_type", value=dataset_type) + + t_check_uuids = PythonOperator( + task_id="check_uuids", + python_callable=check_uuids, + provide_context=True, + op_kwargs={ + "crypt_auth_tok": encrypt_tok( + airflow_conf.as_dict()["connections"]["APP_CLIENT_SECRET"] + ).decode(), + }, + ) + + t_run_md_extract = BashOperator( + task_id="run_md_extract", + bash_command=""" \ + lz_dir="{{dag_run.conf.lz_path}}" ; \ + component_type="{{dag_run.conf.dataset_type}}" ; \ + src_dir="{{dag_run.conf.src_path}}/md" ; \ + top_dir="{{dag_run.conf.src_path}}" ; \ + work_dir="{{tmp_dir_path(run_id)}}" ; \ + cd $work_dir ; \ + env PYTHONPATH=${PYTHONPATH}:$top_dir \ + ${PYTHON_EXE} $src_dir/metadata_extract.py --out ./"$component_type"-rslt.yml --yaml "$lz_dir" \ + --component "$component_type" >> session.log 2> error.log ; \ + echo $? ; \ + if [ -s error.log ] ; \ + then echo 'ERROR!' 
`cat error.log` >> session.log ; \ + else rm error.log ; \ + fi + """, + env={ + "AUTH_TOK": ( + utils.get_auth_tok( + **{ + "crypt_auth_tok": utils.encrypt_tok( + airflow_conf.as_dict()["connections"]["APP_CLIENT_SECRET"] + ).decode() + } + ) + ), + "PYTHON_EXE": os.environ["CONDA_PREFIX"] + "/bin/python", + "INGEST_API_URL": os.environ["AIRFLOW_CONN_INGEST_API_CONNECTION"], + "COMPONENTS_ASSAY_TYPE": "1", + }, + ) + + def xcom_consistency_puller(**kwargs): + return kwargs["ti"].xcom_pull(task_ids="check_uuid", key="dataset_type") + + t_md_consistency_tests = PythonOperator( + task_id="md_consistency_tests", + python_callable=pythonop_md_consistency_tests, + provide_context=True, + op_kwargs={"metadata_fname": "rslt.yml", "component": xcom_consistency_puller}, + ) + + def read_metadata_file(**kwargs): + md_fname = os.path.join( + get_tmp_dir_path(kwargs["run_id"]), + kwargs["ti"].xcom_pull(task_ids="check_uuid", key="dataset_type") + "-rslt.yml", + ) + with open(md_fname, "r") as f: + scanned_md = yaml.safe_load(f) + return scanned_md + + send_status_msg = make_send_status_msg_function( + dag_file=__file__, + retcode_ops=["run_md_extract", "md_consistency_tests"], + cwl_workflows=[], + dataset_uuid_fun=get_dataset_uuid, + dataset_lz_path_fun=get_dataset_lz_path, + metadata_fun=read_metadata_file, + include_file_metadata=False, + ) + + def wrapped_send_status_msg(**kwargs): + if send_status_msg(**kwargs): + scanned_md = read_metadata_file(**kwargs) # Yes, it's getting re-read + kwargs["ti"].xcom_push( + key="collectiontype", + value=(scanned_md["collectiontype"] if "collectiontype" in scanned_md else None), + ) + if "assay_type" in scanned_md: + assay_type = scanned_md["assay_type"] + elif "metadata" in scanned_md and "assay_type" in scanned_md["metadata"]: + assay_type = scanned_md["metadata"]["assay_type"] + else: + assay_type = None + kwargs["ti"].xcom_push(key="assay_type", value=assay_type) + else: + kwargs["ti"].xcom_push(key="collectiontype", value=None) + + t_send_status = PythonOperator( + task_id="send_status_msg", + python_callable=wrapped_send_status_msg, + provide_context=True, + trigger_rule="all_done", + op_kwargs={ + "crypt_auth_tok": encrypt_tok( + airflow_conf.as_dict()["connections"]["APP_CLIENT_SECRET"] + ).decode(), + }, + ) + + t_check_uuids >> t_run_md_extract >> t_md_consistency_tests >> t_send_status diff --git a/src/ingest-pipeline/airflow/dags/rebuild_multiple_metadata.py b/src/ingest-pipeline/airflow/dags/rebuild_multiple_metadata.py index 9843b5a8..6134c3fa 100644 --- a/src/ingest-pipeline/airflow/dags/rebuild_multiple_metadata.py +++ b/src/ingest-pipeline/airflow/dags/rebuild_multiple_metadata.py @@ -24,91 +24,90 @@ def get_uuid_for_error(**kwargs): default_args = { - 'start_date': datetime(2019, 1, 1), + "start_date": datetime(2019, 1, 1), } -with DAG('rebuild_multiple_metadata', - schedule_interval=None, - is_paused_upon_creation=False, - default_args=default_args, - user_defined_macros={ - 'tmp_dir_path': get_tmp_dir_path, - 'preserve_scratch': get_preserve_scratch_resource('rebuild_metadata') - }) as dag: +with DAG( + "rebuild_multiple_metadata", + schedule_interval=None, + is_paused_upon_creation=False, + default_args=default_args, + user_defined_macros={ + "tmp_dir_path": get_tmp_dir_path, + "preserve_scratch": get_preserve_scratch_resource("rebuild_metadata"), + }, +) as dag: def build_dataset_lists(**kwargs): - kwargs['dag_run'].conf['primary_datasets'] = [] - kwargs['dag_run'].conf['processed_datasets'] = [] + 
kwargs["dag_run"].conf["primary_datasets"] = [] + kwargs["dag_run"].conf["processed_datasets"] = [] - print('dag_run conf follows:') - pprint(kwargs['dag_run'].conf) - for uuid in kwargs['dag_run'].conf['uuids']: + print("dag_run conf follows:") + pprint(kwargs["dag_run"].conf) + for uuid in kwargs["dag_run"].conf["uuids"]: soft_data = get_soft_data(uuid, **kwargs) # If we got nothing back from soft_data, then let's try to determine using entity_api if soft_data: - if soft_data.get('primary'): - kwargs['dag_run'].conf['primary_datasets'].append(uuid) + if soft_data.get("primary"): + kwargs["dag_run"].conf["primary_datasets"].append(uuid) else: - kwargs['dag_run'].conf['processed_datasets'].append(uuid) + kwargs["dag_run"].conf["processed_datasets"].append(uuid) else: - print(f'No matching soft data returned for {uuid}') - ds_rslt = pythonop_get_dataset_state(dataset_uuid_callable=lambda **kwargs: uuid, **kwargs) + print(f"No matching soft data returned for {uuid}") + ds_rslt = pythonop_get_dataset_state( + dataset_uuid_callable=lambda **kwargs: uuid, **kwargs + ) if ds_rslt.get("dataset_info"): # dataset_info should only be populated for processed_datasets print(ds_rslt.get("dataset_info")) - kwargs['dag_run'].conf['processed_datasets'].append(uuid) + kwargs["dag_run"].conf["processed_datasets"].append(uuid) else: - kwargs['dag_run'].conf['primary_datasets'].append(uuid) - - + kwargs["dag_run"].conf["primary_datasets"].append(uuid) t_build_dataset_lists = PythonOperator( - task_id='build_dataset_lists', + task_id="build_dataset_lists", python_callable=build_dataset_lists, provide_context=True, - queue= get_queue_resource('rebuild_metadata'), + queue=get_queue_resource("rebuild_metadata"), op_kwargs={ - 'crypt_auth_tok': encrypt_tok(airflow_conf.as_dict() - ['connections']['APP_CLIENT_SECRET']).decode(), - } + "crypt_auth_tok": encrypt_tok( + airflow_conf.as_dict()["connections"]["APP_CLIENT_SECRET"] + ).decode(), + }, ) def get_primary_dataset_uuids(**kwargs): - return [{'uuid': uuid} for uuid in kwargs['dag_run'].conf['primary_datasets']] + return [{"uuid": uuid} for uuid in kwargs["dag_run"].conf["primary_datasets"]] t_get_primary_dataset_uuids = PythonOperator( - task_id='get_primary_dataset_uuids', + task_id="get_primary_dataset_uuids", python_callable=get_primary_dataset_uuids, - queue=get_queue_resource('rebuild_metadata'), - provide_context=True + queue=get_queue_resource("rebuild_metadata"), + provide_context=True, ) def get_processed_dataset_uuids(**kwargs): - return [{'uuid': uuid} for uuid in kwargs['dag_run'].conf['processed_datasets']] + return [{"uuid": uuid} for uuid in kwargs["dag_run"].conf["processed_datasets"]] t_get_processed_dataset_uuids = PythonOperator( - task_id='get_processed_dataset_uuids', + task_id="get_processed_dataset_uuids", python_callable=get_processed_dataset_uuids, - queue=get_queue_resource('rebuild_metadata'), - provide_context=True + queue=get_queue_resource("rebuild_metadata"), + provide_context=True, ) t_launch_rebuild_primary_dataset_metadata = TriggerDagRunOperator.partial( task_id="trigger_rebuild_primary_dataset_metadata", trigger_dag_id="rebuild_primary_dataset_metadata", - queue=get_queue_resource('rebuild_metadata'), - ).expand( - conf=t_get_primary_dataset_uuids.output - ) + queue=get_queue_resource("rebuild_metadata"), + ).expand(conf=t_get_primary_dataset_uuids.output) t_launch_rebuild_processed_dataset_metadata = TriggerDagRunOperator.partial( task_id="trigger_rebuild_processed_dataset_metadata", 
trigger_dag_id="rebuild_processed_dataset_metadata", - queue=get_queue_resource('rebuild_metadata'), - ).expand( - conf=t_get_processed_dataset_uuids.output - ) + queue=get_queue_resource("rebuild_metadata"), + ).expand(conf=t_get_processed_dataset_uuids.output) t_build_dataset_lists >> [t_get_primary_dataset_uuids, t_get_processed_dataset_uuids] t_get_primary_dataset_uuids >> t_launch_rebuild_primary_dataset_metadata diff --git a/src/ingest-pipeline/airflow/dags/reorganize_multiassay.py b/src/ingest-pipeline/airflow/dags/reorganize_multiassay.py index 5ff3ee98..5392be27 100644 --- a/src/ingest-pipeline/airflow/dags/reorganize_multiassay.py +++ b/src/ingest-pipeline/airflow/dags/reorganize_multiassay.py @@ -5,9 +5,11 @@ from airflow.operators.python import PythonOperator from airflow.operators.python import BranchPythonOperator from airflow.operators.bash import BashOperator +from airflow.operators.trigger_dagrun import TriggerDagRunOperator from airflow.exceptions import AirflowException from airflow.providers.http.hooks.http import HttpHook + from hubmap_operators.common_operators import ( LogInfoOperator, JoinOperator, @@ -17,7 +19,8 @@ from utils import ( pythonop_maybe_keep, - get_tmp_dir_path, get_auth_tok, + get_tmp_dir_path, + get_auth_tok, pythonop_get_dataset_state, pythonop_set_dataset_state, find_matching_endpoint, @@ -26,21 +29,23 @@ get_preserve_scratch_resource, ) +from extra_utils import get_component_uuids + from misc.tools.split_and_create import reorganize_multiassay # Following are defaults which can be overridden later on default_args = { - 'owner': 'hubmap', - 'depends_on_past': False, - 'start_date': datetime(2019, 1, 1), - 'email': ['joel.welling@gmail.com'], - 'email_on_failure': False, - 'email_on_retry': False, - 'retries': 1, - 'retry_delay': timedelta(minutes=1), - 'xcom_push': True, - 'queue': get_queue_resource('reorganize_upload'), + "owner": "hubmap", + "depends_on_past": False, + "start_date": datetime(2019, 1, 1), + "email": ["joel.welling@gmail.com"], + "email_on_failure": False, + "email_on_retry": False, + "retries": 1, + "retry_delay": timedelta(minutes=1), + "xcom_push": True, + "queue": get_queue_resource("reorganize_upload"), } @@ -48,83 +53,75 @@ def _get_frozen_df_path(run_id): # This version of the path is passed to the internals of # split_and_create, and must contain formatting space for # a suffix. - return str(Path(get_tmp_dir_path(run_id)) / 'frozen_source_df{}.tsv') + return str(Path(get_tmp_dir_path(run_id)) / "frozen_source_df{}.tsv") def _get_frozen_df_wildcard(run_id): # This version of the path is used from a bash command line # and must match all frozen_df files regardless of suffix. 
- return str(Path(get_tmp_dir_path(run_id)) / 'frozen_source_df*.tsv') - - -with HMDAG('reorganize_multiassay', - schedule_interval=None, - is_paused_upon_creation=False, - default_args=default_args, - user_defined_macros={ - 'tmp_dir_path': get_tmp_dir_path, - 'frozen_df_path': _get_frozen_df_path, - 'frozen_df_wildcard': _get_frozen_df_wildcard, - 'preserve_scratch': get_preserve_scratch_resource('reorganize_multiassay'), - }) as dag: - def find_uuid(**kwargs): - uuid = kwargs['dag_run'].conf['uuid'] + return str(Path(get_tmp_dir_path(run_id)) / "frozen_source_df*.tsv") + + +with HMDAG( + "reorganize_multiassay", + schedule_interval=None, + is_paused_upon_creation=False, + default_args=default_args, + user_defined_macros={ + "tmp_dir_path": get_tmp_dir_path, + "frozen_df_path": _get_frozen_df_path, + "frozen_df_wildcard": _get_frozen_df_wildcard, + "preserve_scratch": get_preserve_scratch_resource("reorganize_multiassay"), + }, +) as dag: - def my_callable(**kwargs): - return uuid + def find_uuid(**kwargs): + uuid = kwargs["dag_run"].conf["uuid"] - ds_rslt = pythonop_get_dataset_state( - dataset_uuid_callable=my_callable, - **kwargs - ) + ds_rslt = pythonop_get_dataset_state(dataset_uuid_callable=lambda **kwargs: uuid, **kwargs) if not ds_rslt: - raise AirflowException(f'Invalid uuid/doi for group: {uuid}') - print('ds_rslt:') + raise AirflowException(f"Invalid uuid/doi for group: {uuid}") + print("ds_rslt:") pprint(ds_rslt) - for key in ['entity_type', 'status', 'uuid', 'dataset_type', - 'local_directory_full_path']: + for key in ["entity_type", "status", "uuid", "dataset_type", "local_directory_full_path"]: assert key in ds_rslt, f"Dataset status for {uuid} has no {key}" - if ds_rslt['entity_type'] != 'Dataset': - raise AirflowException(f'{uuid} is not an Dataset') - if ds_rslt['status'] not in ['New', 'Submitted', 'Error']: - raise AirflowException(f"status of Dataset {uuid} is not New, Error or Submitted, {ds_rslt['status']}") - - lz_path = ds_rslt['local_directory_full_path'] - uuid = ds_rslt['uuid'] # 'uuid' may actually be a DOI - print(f'Finished uuid {uuid}') - print(f'lz path: {lz_path}') - kwargs['ti'].xcom_push(key='lz_path', value=lz_path) - kwargs['ti'].xcom_push(key='uuid', value=uuid) + if ds_rslt["entity_type"] != "Dataset": + raise AirflowException(f"{uuid} is not an Dataset") + if ds_rslt["status"] not in ["New", "Submitted", "Error"]: + raise AirflowException( + f"status of Dataset {uuid} is not New, Error or Submitted, {ds_rslt['status']}" + ) + lz_path = ds_rslt["local_directory_full_path"] + uuid = ds_rslt["uuid"] # 'uuid' may actually be a DOI + print(f"Finished uuid {uuid}") + print(f"lz path: {lz_path}") + kwargs["ti"].xcom_push(key="lz_path", value=lz_path) + kwargs["ti"].xcom_push(key="uuid", value=uuid) t_find_uuid = PythonOperator( - task_id='find_uuid', - python_callable=find_uuid, - provide_context=True, - op_kwargs={ - } + task_id="find_uuid", python_callable=find_uuid, provide_context=True, op_kwargs={} ) - t_create_tmpdir = CreateTmpDirOperator(task_id='create_tmpdir') - t_cleanup_tmpdir = CleanupTmpDirOperator(task_id='cleanup_tmpdir') + t_create_tmpdir = CreateTmpDirOperator(task_id="create_tmpdir") + t_cleanup_tmpdir = CleanupTmpDirOperator(task_id="cleanup_tmpdir") t_preserve_info = BashOperator( - task_id='preserve_info', + task_id="preserve_info", bash_command=""" frozen_df_wildcard="{{frozen_df_wildcard(run_id)}}" ; \ upload_path="{{ti.xcom_pull(task_ids="find_uuid", key="lz_path")}}" ; \ if ls $frozen_df_wildcard > /dev/null 2>&1 ; then \ cp 
${frozen_df_wildcard} "${upload_path}" ; \ fi - """ + """, ) - def split(**kwargs): - uuid = kwargs['ti'].xcom_pull(task_ids='find_uuid', key='uuid') - entity_host = HttpHook.get_connection('entity_api_connection').host + uuid = kwargs["ti"].xcom_pull(task_ids="find_uuid", key="uuid") + entity_host = HttpHook.get_connection("entity_api_connection").host try: reorganize_multiassay( uuid, @@ -133,47 +130,53 @@ def split(**kwargs): instance=find_matching_endpoint(entity_host), auth_tok=get_auth_tok(**kwargs), ) - kwargs['ti'].xcom_push(key='split', value='0') # signal success + kwargs["ti"].xcom_push(key="split", value="0") # signal success except Exception as e: - print(f'Encountered {e}') - kwargs['ti'].xcom_push(key='split', value='1') # signal failure - + print(f"Encountered {e}") + kwargs["ti"].xcom_push(key="split", value="1") # signal failure t_split = PythonOperator( - task_id='split', - python_callable=split, - provide_context=True, - op_kwargs={ - } + task_id="split", python_callable=split, provide_context=True, op_kwargs={} ) t_maybe_keep = BranchPythonOperator( - task_id='maybe_keep', + task_id="maybe_keep", python_callable=pythonop_maybe_keep, provide_context=True, - op_kwargs={ - 'next_op': 'join', - 'bail_op': 'set_dataset_error', - 'test_op': 'split' - } + op_kwargs={"next_op": "join", "bail_op": "set_dataset_error", "test_op": "split"}, + ) + + def get_component_dataset_uuids(**kwargs): + return [{"uuid": uuid for uuid in get_component_uuids(kwargs["ti"].xcom_pull(task_ids="find_uuid", key="uuid"), + get_auth_tok(**kwargs))}] + + + t_get_components_uuids = PythonOperator( + task_id="get_primary_dataset_uuids", + python_callable=get_component_dataset_uuids, + queue=get_queue_resource("rebuild_metadata"), + provide_context=True, ) - t_log_info = LogInfoOperator(task_id='log_info') + t_launch_multiassay_component_metadata = TriggerDagRunOperator.partial( + task_id="trigger_multiassay_component_metadata", + trigger_dag_id="multiassay_component_metadata", + queue=get_queue_resource("rebuild_metadata"), + ).expand(conf=t_get_components_uuids.output) - t_join = JoinOperator(task_id='join') + t_log_info = LogInfoOperator(task_id="log_info") + + t_join = JoinOperator(task_id="join") def _get_upload_uuid(**kwargs): - return kwargs['ti'].xcom_pull(task_ids='find_uuid', key='uuid') + return kwargs["ti"].xcom_pull(task_ids="find_uuid", key="uuid") t_set_dataset_error = PythonOperator( - task_id='set_dataset_error', + task_id="set_dataset_error", python_callable=pythonop_set_dataset_state, provide_context=True, - trigger_rule='all_done', - op_kwargs={ - 'dataset_uuid_callable': _get_upload_uuid, - 'ds_state': 'Error' - } + trigger_rule="all_done", + op_kwargs={"dataset_uuid_callable": _get_upload_uuid, "ds_state": "Error"}, ) ( @@ -182,10 +185,11 @@ def _get_upload_uuid(**kwargs): >> t_create_tmpdir >> t_split >> t_maybe_keep + >> t_launch_multiassay_component_metadata >> t_join >> t_preserve_info >> t_cleanup_tmpdir - ) + ) t_maybe_keep >> t_set_dataset_error t_set_dataset_error >> t_join diff --git a/src/ingest-pipeline/airflow/dags/utils.py b/src/ingest-pipeline/airflow/dags/utils.py index 7f3d2015..dccf3297 100644 --- a/src/ingest-pipeline/airflow/dags/utils.py +++ b/src/ingest-pipeline/airflow/dags/utils.py @@ -1089,6 +1089,31 @@ def pythonop_md_consistency_tests(**kwargs) -> int: Perform simple consistency checks of the metadata stored as YAML in kwargs['metadata_fname']. This includes accessing the UUID api via its Airflow connection ID to verify uuids. 
""" + if "component" in kwargs: + md_path = join( + get_tmp_dir_path(kwargs["run_id"]), + kwargs["component"](**kwargs) + "-" + kwargs["metadata_fname"], + ) + if exists(md_path): + with open(md_path, "r") as f: + md = yaml.safe_load(f) + # print('metadata from {} follows:'.format(md_path)) + # pprint(md) + if "_from_metadatatsv" in md and md["_from_metadatatsv"]: + try: + for elt in ["tissue_id", "donor_id"]: + assert elt in md, "metadata is missing {}".format(elt) + assert md["tissue_id"].startswith( + md["donor_id"] + "-" + ), "tissue_id does not match" + assert_id_known(md["tissue_id"], **kwargs) + return 0 + except AssertionError as e: + kwargs["ti"].xcom_push(key="err_msg", value="Assertion Failed: {}".format(e)) + return 1 + else: + kwargs["ti"].xcom_push(key="err_msg", value="Expected metadata file is missing") + return 1 if "uuid_list" in kwargs: for uuid in kwargs["uuid_list"](**kwargs): md_path = join( @@ -1235,7 +1260,7 @@ def make_send_status_msg_function( metadata_fun: Optional[Callable[..., dict]] = None, include_file_metadata: Optional[bool] = True, no_provenance: Optional[bool] = False, - ivt_path_fun: Optional[Callable[..., Path]] = None + ivt_path_fun: Optional[Callable[..., Path]] = None, ) -> Callable[..., bool]: """ The function which is generated by this function will return a boolean, @@ -1383,11 +1408,7 @@ def my_callable(**kwargs): else: status = ds_rslt.get("status", "QA") if status in ["Processing", "New", "Invalid"]: - status = ( - "QA" - if kwargs["dag_run"].conf["dag_id"] == "scan_and_begin_processing" - else "Submitted" - ) + status = "QA" if metadata_fun: if not contacts: contacts = ds_rslt.get("contacts", []) diff --git a/src/ingest-pipeline/md/data_collection_types/multiassay_metadatatsv_data_collection.py b/src/ingest-pipeline/md/data_collection_types/multiassay_metadatatsv_data_collection.py index 7143557d..05893f3a 100755 --- a/src/ingest-pipeline/md/data_collection_types/multiassay_metadatatsv_data_collection.py +++ b/src/ingest-pipeline/md/data_collection_types/multiassay_metadatatsv_data_collection.py @@ -60,8 +60,9 @@ def __init__(self, path): self.offsetdir = self.find_top(self.topdir, self.top_target, self.dir_regex) assert self.offsetdir is not None, "Wrong dataset type?" 
- def collect_metadata(self): + def collect_metadata(self, component=None): ingest_api_url = os.getenv("INGEST_API_URL") + component_process = os.getenv("COMPONENTS_ASSAY_TYPE") md_type_tbl = self.get_md_type_tbl() rslt = {} cl = [] @@ -100,7 +101,7 @@ def collect_metadata(self): if "metadata" in fname and fname.endswith(".tsv"): assert isinstance(this_md, list), "metadata.tsv did not produce a list" - if "must-contain" in response: + if "must-contain" in response and component_process is None: print("MULTI ASSAY FOUND") for rec in this_md: this_dict = {"metadata": rec} @@ -120,6 +121,25 @@ def collect_metadata(self): this_dict[dict_key] = sub_md cl.append(this_dict) print(this_dict) + elif component_process is not None and component == response.get("dataset-type"): + for rec in this_md: + this_dict = {"metadata": rec} + for sub_key, dict_key in [ + ("contributors_path", "contributors"), + ("antibodies_path", "antibodies"), + ]: + if sub_key in rec: + assert rec[sub_key].endswith( + ".tsv" + ), 'TSV file expected, received "{}"'.format(rec[sub_key]) + sub_path = os.path.join( + os.path.dirname(fpath), rec[sub_key] + ) + sub_parser = md_type_tbl["TSV"](sub_path) + sub_md = sub_parser.collect_metadata() + this_dict[dict_key] = sub_md + cl.append(this_dict) + print(this_dict) else: print("NON MULTI ASSAY FOUND") print(this_md) diff --git a/src/ingest-pipeline/md/metadata_extract.py b/src/ingest-pipeline/md/metadata_extract.py index 959c7c3b..3dd6eb14 100755 --- a/src/ingest-pipeline/md/metadata_extract.py +++ b/src/ingest-pipeline/md/metadata_extract.py @@ -20,7 +20,7 @@ set_schema_base_path(SCHEMA_BASE_PATH, SCHEMA_BASE_URI) -def scan(target_dir, out_fname, schema_fname, yaml_flag=False): +def scan(target_dir, out_fname, schema_fname, yaml_flag=False, component=None): global _KNOWN_DATA_COLLECTION_TYPES if _KNOWN_DATA_COLLECTION_TYPES is None: @@ -68,15 +68,17 @@ def main(myargv=None): parser.add_argument('dir', default=None, nargs='?', help='directory to scan (defaults to CWD)') parser.add_argument('--yaml', default=False, action='store_true') + parser.add_argument('--component', default=None, action='store_true') ns = parser.parse_args(myargv[1:]) schema_fname = default_schema_path if ns.schema is None else ns.schema out_fname = ns.out target_dir = (os.getcwd() if ns.dir is None else ns.dir) yaml_flag = ns.yaml + component = ns.component try: scan(target_dir=target_dir, out_fname=out_fname, schema_fname=schema_fname, - yaml_flag=yaml_flag) + yaml_flag=yaml_flag, component=component) except (MetadataError, AssertionError) as e: sys.exit(f'{type(e).__name__}: {e}') diff --git a/src/ingest-pipeline/misc/tools/split_and_create.py b/src/ingest-pipeline/misc/tools/split_and_create.py index c51d8411..29215da7 100755 --- a/src/ingest-pipeline/misc/tools/split_and_create.py +++ b/src/ingest-pipeline/misc/tools/split_and_create.py @@ -391,7 +391,7 @@ def reorganize(source_uuid, **kwargs) -> Union[Tuple, None]: source_df["canonical_assay_type"] = source_df.apply( get_canonical_assay_type, axis=1, - dataset_type=full_entity.primary_assay.get("dataset_type"), + dataset_type=full_entity.primary_assay.get("dataset-type"), ) source_df["new_uuid"] = source_df.apply( create_new_uuid, @@ -475,7 +475,7 @@ def create_multiassay_component( { "dataset_link_abs_dir": parent_dir, "contains_human_genetic_sequences": component.get("contains-pii"), - "dataset_type": component.get("dataset_type"), + "dataset_type": component.get("dataset-type"), } for component in components ], From 
8127c0c09f77fd48c729a640c11d2f1720374e9e Mon Sep 17 00:00:00 2001 From: David Betancur Date: Tue, 27 Feb 2024 15:32:59 -0500 Subject: [PATCH 05/12] Bugfix moving to standard DAG. Updating correct task ordering and naming. --- .../airflow/dags/reorganize_multiassay.py | 28 +++++++------------ 1 file changed, 10 insertions(+), 18 deletions(-) diff --git a/src/ingest-pipeline/airflow/dags/reorganize_multiassay.py b/src/ingest-pipeline/airflow/dags/reorganize_multiassay.py index 5392be27..b852e5aa 100644 --- a/src/ingest-pipeline/airflow/dags/reorganize_multiassay.py +++ b/src/ingest-pipeline/airflow/dags/reorganize_multiassay.py @@ -8,6 +8,7 @@ from airflow.operators.trigger_dagrun import TriggerDagRunOperator from airflow.exceptions import AirflowException from airflow.providers.http.hooks.http import HttpHook +from airflow import DAG from hubmap_operators.common_operators import ( @@ -24,7 +25,6 @@ pythonop_get_dataset_state, pythonop_set_dataset_state, find_matching_endpoint, - HMDAG, get_queue_resource, get_preserve_scratch_resource, ) @@ -36,16 +36,7 @@ # Following are defaults which can be overridden later on default_args = { - "owner": "hubmap", - "depends_on_past": False, "start_date": datetime(2019, 1, 1), - "email": ["joel.welling@gmail.com"], - "email_on_failure": False, - "email_on_retry": False, - "retries": 1, - "retry_delay": timedelta(minutes=1), - "xcom_push": True, - "queue": get_queue_resource("reorganize_upload"), } @@ -62,7 +53,7 @@ def _get_frozen_df_wildcard(run_id): return str(Path(get_tmp_dir_path(run_id)) / "frozen_source_df*.tsv") -with HMDAG( +with DAG( "reorganize_multiassay", schedule_interval=None, is_paused_upon_creation=False, @@ -143,16 +134,16 @@ def split(**kwargs): task_id="maybe_keep", python_callable=pythonop_maybe_keep, provide_context=True, - op_kwargs={"next_op": "join", "bail_op": "set_dataset_error", "test_op": "split"}, + op_kwargs={"next_op": "get_component_uuids", "bail_op": "set_dataset_error", "test_op": "split"}, ) def get_component_dataset_uuids(**kwargs): - return [{"uuid": uuid for uuid in get_component_uuids(kwargs["ti"].xcom_pull(task_ids="find_uuid", key="uuid"), - get_auth_tok(**kwargs))}] + return [{"uuid": uuid} for uuid in get_component_uuids(kwargs["ti"].xcom_pull(task_ids="find_uuid", key="uuid"), + get_auth_tok(**kwargs))] - t_get_components_uuids = PythonOperator( - task_id="get_primary_dataset_uuids", + t_get_component_uuids = PythonOperator( + task_id="get_component_uuids", python_callable=get_component_dataset_uuids, queue=get_queue_resource("rebuild_metadata"), provide_context=True, @@ -162,7 +153,7 @@ def get_component_dataset_uuids(**kwargs): task_id="trigger_multiassay_component_metadata", trigger_dag_id="multiassay_component_metadata", queue=get_queue_resource("rebuild_metadata"), - ).expand(conf=t_get_components_uuids.output) + ).expand(conf=t_get_component_uuids.output) t_log_info = LogInfoOperator(task_id="log_info") @@ -185,11 +176,12 @@ def _get_upload_uuid(**kwargs): >> t_create_tmpdir >> t_split >> t_maybe_keep - >> t_launch_multiassay_component_metadata + >> t_get_component_uuids >> t_launch_multiassay_component_metadata >> t_join >> t_preserve_info >> t_cleanup_tmpdir ) t_maybe_keep >> t_set_dataset_error + t_launch_multiassay_component_metadata >> t_set_dataset_error t_set_dataset_error >> t_join From 7240de17a1e8ceb59623008b0f640823eb5dad3c Mon Sep 17 00:00:00 2001 From: David Betancur Date: Tue, 27 Feb 2024 15:51:20 -0500 Subject: [PATCH 06/12] Rollback moving to standard DAG. 
Updating correct dag_id name for new DAG. --- .../airflow/dags/multiassay_component_metadata.py | 2 +- .../airflow/dags/reorganize_multiassay.py | 13 +++++++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/src/ingest-pipeline/airflow/dags/multiassay_component_metadata.py b/src/ingest-pipeline/airflow/dags/multiassay_component_metadata.py index 30fac047..c7f22d1e 100644 --- a/src/ingest-pipeline/airflow/dags/multiassay_component_metadata.py +++ b/src/ingest-pipeline/airflow/dags/multiassay_component_metadata.py @@ -54,7 +54,7 @@ def get_dataset_lz_path(**kwargs): } with HMDAG( - "rebuild_primary_dataset_metadata", + "multiassay-component-metadata", schedule_interval=None, is_paused_upon_creation=False, default_args=default_args, diff --git a/src/ingest-pipeline/airflow/dags/reorganize_multiassay.py b/src/ingest-pipeline/airflow/dags/reorganize_multiassay.py index b852e5aa..e9b81eea 100644 --- a/src/ingest-pipeline/airflow/dags/reorganize_multiassay.py +++ b/src/ingest-pipeline/airflow/dags/reorganize_multiassay.py @@ -8,7 +8,6 @@ from airflow.operators.trigger_dagrun import TriggerDagRunOperator from airflow.exceptions import AirflowException from airflow.providers.http.hooks.http import HttpHook -from airflow import DAG from hubmap_operators.common_operators import ( @@ -25,6 +24,7 @@ pythonop_get_dataset_state, pythonop_set_dataset_state, find_matching_endpoint, + HMDAG, get_queue_resource, get_preserve_scratch_resource, ) @@ -36,7 +36,16 @@ # Following are defaults which can be overridden later on default_args = { + "owner": "hubmap", + "depends_on_past": False, "start_date": datetime(2019, 1, 1), + "email": ["joel.welling@gmail.com"], + "email_on_failure": False, + "email_on_retry": False, + "retries": 1, + "retry_delay": timedelta(minutes=1), + "xcom_push": True, + "queue": get_queue_resource("reorganize_upload"), } @@ -53,7 +62,7 @@ def _get_frozen_df_wildcard(run_id): return str(Path(get_tmp_dir_path(run_id)) / "frozen_source_df*.tsv") -with DAG( +with HMDAG( "reorganize_multiassay", schedule_interval=None, is_paused_upon_creation=False, From e9e5d0108ae1e9536ea6e3818c32c3bf78d7c85a Mon Sep 17 00:00:00 2001 From: David Betancur Date: Tue, 27 Feb 2024 16:12:41 -0500 Subject: [PATCH 07/12] Bugfix missing tmp_directories. TyPo dag_id. TyPo task_ids to get component name. --- .../dags/multiassay_component_metadata.py | 17 +++++++++++++---- .../airflow/dags/reorganize_multiassay.py | 1 - 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/src/ingest-pipeline/airflow/dags/multiassay_component_metadata.py b/src/ingest-pipeline/airflow/dags/multiassay_component_metadata.py index c7f22d1e..616f2378 100644 --- a/src/ingest-pipeline/airflow/dags/multiassay_component_metadata.py +++ b/src/ingest-pipeline/airflow/dags/multiassay_component_metadata.py @@ -22,6 +22,11 @@ encrypt_tok, ) +from hubmap_operators.common_operators import ( + CreateTmpDirOperator, + CleanupTmpDirOperator, +) + def get_uuid_for_error(**kwargs): """ @@ -54,7 +59,7 @@ def get_dataset_lz_path(**kwargs): } with HMDAG( - "multiassay-component-metadata", + "multiassay_component_metadata", schedule_interval=None, is_paused_upon_creation=False, default_args=default_args, @@ -64,6 +69,8 @@ def get_dataset_lz_path(**kwargs): }, ) as dag: + t_create_tmpdir = CreateTmpDirOperator(task_id="create_temp_dir") + def check_one_uuid(uuid, **kwargs): """ Look up information on the given uuid or HuBMAP identifier. 
@@ -164,7 +171,7 @@ def check_uuids(**kwargs): ) def xcom_consistency_puller(**kwargs): - return kwargs["ti"].xcom_pull(task_ids="check_uuid", key="dataset_type") + return kwargs["ti"].xcom_pull(task_ids="check_uuids", key="dataset_type") t_md_consistency_tests = PythonOperator( task_id="md_consistency_tests", @@ -176,7 +183,7 @@ def xcom_consistency_puller(**kwargs): def read_metadata_file(**kwargs): md_fname = os.path.join( get_tmp_dir_path(kwargs["run_id"]), - kwargs["ti"].xcom_pull(task_ids="check_uuid", key="dataset_type") + "-rslt.yml", + kwargs["ti"].xcom_pull(task_ids="check_uuids", key="dataset_type") + "-rslt.yml", ) with open(md_fname, "r") as f: scanned_md = yaml.safe_load(f) @@ -221,4 +228,6 @@ def wrapped_send_status_msg(**kwargs): }, ) - t_check_uuids >> t_run_md_extract >> t_md_consistency_tests >> t_send_status + t_cleanup_tmpdir = CleanupTmpDirOperator(task_id="cleanup_temp_dir") + + t_check_uuids >> t_create_tmpdir >> t_run_md_extract >> t_md_consistency_tests >> t_send_status >> t_cleanup_tmpdir diff --git a/src/ingest-pipeline/airflow/dags/reorganize_multiassay.py b/src/ingest-pipeline/airflow/dags/reorganize_multiassay.py index e9b81eea..038a58a9 100644 --- a/src/ingest-pipeline/airflow/dags/reorganize_multiassay.py +++ b/src/ingest-pipeline/airflow/dags/reorganize_multiassay.py @@ -44,7 +44,6 @@ "email_on_retry": False, "retries": 1, "retry_delay": timedelta(minutes=1), - "xcom_push": True, "queue": get_queue_resource("reorganize_upload"), } From acfed33b505acf5dbedba8d59aa8d04a32070607 Mon Sep 17 00:00:00 2001 From: David Betancur Date: Tue, 27 Feb 2024 16:33:38 -0500 Subject: [PATCH 08/12] Moving env variable getting to outside main script. Updating base class to support new env var and component var to multi-assay component metadata building. Linting. --- src/ingest-pipeline/md/data_collection.py | 2 +- .../multiassay_metadatatsv_data_collection.py | 3 +- src/ingest-pipeline/md/metadata_extract.py | 65 ++++++++++++------- 3 files changed, 42 insertions(+), 28 deletions(-) diff --git a/src/ingest-pipeline/md/data_collection.py b/src/ingest-pipeline/md/data_collection.py index 318f60cf..a566820f 100755 --- a/src/ingest-pipeline/md/data_collection.py +++ b/src/ingest-pipeline/md/data_collection.py @@ -43,7 +43,7 @@ def __str__(self): def __repr__(self): return "<%s(%s)>" % (type(self).__name__, self.topdir) - def collect_metadata(self): + def collect_metadata(self, component=None, component_process=None): return {} def filter_metadata(self, metadata): diff --git a/src/ingest-pipeline/md/data_collection_types/multiassay_metadatatsv_data_collection.py b/src/ingest-pipeline/md/data_collection_types/multiassay_metadatatsv_data_collection.py index 05893f3a..c774a7ba 100755 --- a/src/ingest-pipeline/md/data_collection_types/multiassay_metadatatsv_data_collection.py +++ b/src/ingest-pipeline/md/data_collection_types/multiassay_metadatatsv_data_collection.py @@ -60,9 +60,8 @@ def __init__(self, path): self.offsetdir = self.find_top(self.topdir, self.top_target, self.dir_regex) assert self.offsetdir is not None, "Wrong dataset type?" 
- def collect_metadata(self, component=None): + def collect_metadata(self, component=None, component_process=None): ingest_api_url = os.getenv("INGEST_API_URL") - component_process = os.getenv("COMPONENTS_ASSAY_TYPE") md_type_tbl = self.get_md_type_tbl() rslt = {} cl = [] diff --git a/src/ingest-pipeline/md/metadata_extract.py b/src/ingest-pipeline/md/metadata_extract.py index 3dd6eb14..c6d91887 100755 --- a/src/ingest-pipeline/md/metadata_extract.py +++ b/src/ingest-pipeline/md/metadata_extract.py @@ -13,10 +13,11 @@ _KNOWN_DATA_COLLECTION_TYPES = None -DEFAULT_SCHEMA = 'datacollection_metadata_schema.yml' -SCHEMA_BASE_PATH = os.path.join(os.path.dirname(os.path.realpath(os.path.dirname(__file__))), - 'schemata') -SCHEMA_BASE_URI = 'http://schemata.hubmapconsortium.org/' +DEFAULT_SCHEMA = "datacollection_metadata_schema.yml" +SCHEMA_BASE_PATH = os.path.join( + os.path.dirname(os.path.realpath(os.path.dirname(__file__))), "schemata" +) +SCHEMA_BASE_URI = "http://schemata.hubmapconsortium.org/" set_schema_base_path(SCHEMA_BASE_PATH, SCHEMA_BASE_URI) @@ -35,23 +36,23 @@ def scan(target_dir, out_fname, schema_fname, yaml_flag=False, component=None): for collection_type in _KNOWN_DATA_COLLECTION_TYPES: if collection_type.test_match(target_dir): - print('collector match: ', collection_type.category_name) + print("collector match: ", collection_type.category_name) collector = collection_type(target_dir) - metadata = collector.filter_metadata(collector.collect_metadata()) + component_process = os.getenv("COMPONENTS_ASSAY_TYPE") + metadata = collector.filter_metadata(collector.collect_metadata(component, component_process)) # print('collector: ', repr(collector)) # print('metadata: %s' % metadata) break else: - raise MetadataError('%s does not match any known data collection type' - % target_dir) + raise MetadataError("%s does not match any known data collection type" % target_dir) assert_json_matches_schema(metadata, schema_fname) if yaml_flag: - with sys.stdout if out_fname is None else open(out_fname, 'w') as f: + with sys.stdout if out_fname is None else open(out_fname, "w") as f: yaml.dump(metadata, f) else: - with sys.stdout if out_fname is None else open(out_fname, 'w') as f: + with sys.stdout if out_fname is None else open(out_fname, "w") as f: json.dump(metadata, f) - + def main(myargv=None): if myargv is None: @@ -60,28 +61,42 @@ def main(myargv=None): # default_schema_path = os.path.join(os.path.dirname(__file__), '../schemata/', DEFAULT_SCHEMA) default_schema_path = DEFAULT_SCHEMA # trust the schema tools to know where to look - parser = argparse.ArgumentParser(description='Scan a directory tree of datafiles and extract metadata') - parser.add_argument('--out', default=None, - help='Full pathname of output JSON (defaults to stdout)') - parser.add_argument('--schema', default=None, nargs=1, - help=('Schema against which the output will be checked (default %s)' % default_schema_path)) - parser.add_argument('dir', default=None, nargs='?', - help='directory to scan (defaults to CWD)') - parser.add_argument('--yaml', default=False, action='store_true') - parser.add_argument('--component', default=None, action='store_true') + parser = argparse.ArgumentParser( + description="Scan a directory tree of datafiles and extract metadata" + ) + parser.add_argument( + "--out", default=None, help="Full pathname of output JSON (defaults to stdout)" + ) + parser.add_argument( + "--schema", default=None, nargs=1, + help=( + "Schema against which the output will be checked (default %s)" % 
default_schema_path + ), + ) + parser.add_argument("dir", default=None, nargs="?", + help="directory to scan (defaults to CWD)") + parser.add_argument("--yaml", default=False, action="store_true") + parser.add_argument("--component", default=None, + help="Component name to match when building Multi-Assay component metadata", + ) ns = parser.parse_args(myargv[1:]) schema_fname = default_schema_path if ns.schema is None else ns.schema out_fname = ns.out - target_dir = (os.getcwd() if ns.dir is None else ns.dir) + target_dir = os.getcwd() if ns.dir is None else ns.dir yaml_flag = ns.yaml component = ns.component try: - scan(target_dir=target_dir, out_fname=out_fname, schema_fname=schema_fname, - yaml_flag=yaml_flag, component=component) + scan( + target_dir=target_dir, + out_fname=out_fname, + schema_fname=schema_fname, + yaml_flag=yaml_flag, + component=component, + ) except (MetadataError, AssertionError) as e: - sys.exit(f'{type(e).__name__}: {e}') + sys.exit(f"{type(e).__name__}: {e}") -if __name__ == '__main__': +if __name__ == "__main__": main() From 33d8b01e56b6a74032274e2e3fd35f465dcc2906 Mon Sep 17 00:00:00 2001 From: David Betancur Date: Tue, 27 Feb 2024 16:51:13 -0500 Subject: [PATCH 09/12] Adding missing return value. Updated logic on recent change to add IVT git hash for when there is no use for that. --- src/ingest-pipeline/airflow/dags/utils.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/ingest-pipeline/airflow/dags/utils.py b/src/ingest-pipeline/airflow/dags/utils.py index dccf3297..084c3f86 100644 --- a/src/ingest-pipeline/airflow/dags/utils.py +++ b/src/ingest-pipeline/airflow/dags/utils.py @@ -1111,6 +1111,8 @@ def pythonop_md_consistency_tests(**kwargs) -> int: except AssertionError as e: kwargs["ti"].xcom_push(key="err_msg", value="Assertion Failed: {}".format(e)) return 1 + else: + return 0 else: kwargs["ti"].xcom_push(key="err_msg", value="Expected metadata file is missing") return 1 @@ -1336,14 +1338,12 @@ def send_status_msg(**kwargs) -> bool: status = None extra_fields = {} - def my_callable(**kwargs): - return dataset_uuid - - ds_rslt = pythonop_get_dataset_state(dataset_uuid_callable=my_callable, **kwargs) + ds_rslt = pythonop_get_dataset_state(dataset_uuid_callable=lambda **kwargs: dataset_uuid, **kwargs) if success: md = {} - files_for_provenance = [dag_file, *cwl_workflows, ivt_path_fun(**kwargs)] - + files_for_provenance = [dag_file, *cwl_workflows,] + if ivt_path_fun: + files_for_provenance.append(ivt_path_fun(**kwargs)) if no_provenance: md["dag_provenance_list"] = kwargs["dag_run"].conf["dag_provenance_list"].copy() elif "dag_provenance" in kwargs["dag_run"].conf: From 8db5df6d4fc03a6b27b4fd72b438e6bde5d648ca Mon Sep 17 00:00:00 2001 From: David Betancur Date: Wed, 28 Feb 2024 14:03:26 -0500 Subject: [PATCH 10/12] Bump IVT with bugfixes for Visium (with probes) Bugfix bad routing of steps in reorganize_multiassay --- src/ingest-pipeline/submodules/ingest-validation-tools | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/submodules/ingest-validation-tools b/src/ingest-pipeline/submodules/ingest-validation-tools index b726cb3f..6748dd7b 160000 --- a/src/ingest-pipeline/submodules/ingest-validation-tools +++ b/src/ingest-pipeline/submodules/ingest-validation-tools @@ -1 +1 @@ -Subproject commit b726cb3fdf529cdf69039ec5aebc815780de77d9 +Subproject commit 6748dd7b76f8d522d3657488f1fe36be18d0ebbc From f5211d8af09714e53b11fe092f552d0566d4cac5 Mon Sep 17 00:00:00 2001 From: David Betancur 
Date: Wed, 28 Feb 2024 14:04:41 -0500
Subject: [PATCH 11/12] Bugfix bad routing on error step.

---
 src/ingest-pipeline/airflow/dags/reorganize_multiassay.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/ingest-pipeline/airflow/dags/reorganize_multiassay.py b/src/ingest-pipeline/airflow/dags/reorganize_multiassay.py
index 038a58a9..757df0cd 100644
--- a/src/ingest-pipeline/airflow/dags/reorganize_multiassay.py
+++ b/src/ingest-pipeline/airflow/dags/reorganize_multiassay.py
@@ -191,5 +191,4 @@ def _get_upload_uuid(**kwargs):
     )
 
     t_maybe_keep >> t_set_dataset_error
-    t_launch_multiassay_component_metadata >> t_set_dataset_error
     t_set_dataset_error >> t_join

From d7080aad2b80fe6c92d9b4b51f2932dee031b96d Mon Sep 17 00:00:00 2001
From: David Betancur
Date: Wed, 28 Feb 2024 16:25:51 -0500
Subject: [PATCH 12/12] Bugfix correct status update DAG dependant.

---
 src/ingest-pipeline/airflow/dags/utils.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/ingest-pipeline/airflow/dags/utils.py b/src/ingest-pipeline/airflow/dags/utils.py
index 084c3f86..88034b5b 100644
--- a/src/ingest-pipeline/airflow/dags/utils.py
+++ b/src/ingest-pipeline/airflow/dags/utils.py
@@ -1408,7 +1408,8 @@ def send_status_msg(**kwargs) -> bool:
             else:
                 status = ds_rslt.get("status", "QA")
                 if status in ["Processing", "New", "Invalid"]:
-                    status = "QA"
+                    status = "Submitted" if kwargs["dag"].dag_id in ["multiassay_component_metadata",
+                                                                     "reorganize_upload"] else "QA"
             if metadata_fun:
                 if not contacts:
                     contacts = ds_rslt.get("contacts", [])
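
For reference, a minimal standalone sketch of the provenance-list assembly that PATCH 01, PATCH 02, and PATCH 09 converge on in utils.make_send_status_msg_function: the IVT checkout path is appended only when an ivt_path_fun callable is supplied. The build_files_for_provenance wrapper, fake_ivt_path helper, and hard-coded path below are illustrative assumptions, not code from the repository.

from pathlib import Path
from typing import Callable, List, Optional


def build_files_for_provenance(
    dag_file: str,
    cwl_workflows: List[str],
    ivt_path_fun: Optional[Callable[..., Path]] = None,
    **kwargs,
) -> List:
    # Mirror the post-PATCH-09 logic: DAGs that never run
    # ingest-validation-tools simply omit ivt_path_fun and are unaffected.
    files_for_provenance = [dag_file, *cwl_workflows]
    if ivt_path_fun:
        files_for_provenance.append(ivt_path_fun(**kwargs))
    return files_for_provenance


def fake_ivt_path(**kwargs) -> Path:
    # Stand-in for get_ivt_path(), which pulls the checkout path from the
    # run_validation task's XCom in scan_and_begin_processing.py.
    return Path("/hypothetical/submodules/ingest-validation-tools")


print(build_files_for_provenance("scan_and_begin_processing.py", [], fake_ivt_path))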
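
Similarly, a minimal sketch, assuming an already-decoded /children/<uuid> entity-api response, of the filtering performed by the new extra_utils.get_component_uuids helper and of the per-component conf payloads that reorganize_multiassay expands over the multiassay_component_metadata trigger; the sample children list is invented for illustration.

from typing import Dict, List


def filter_component_uuids(children: List[Dict]) -> List[str]:
    # Keep only datasets produced by the multi-assay split, mirroring the
    # creation_action check in get_component_uuids().
    return [
        child["uuid"]
        for child in children
        if child.get("creation_action") == "Multi-Assay Split"
    ]


# Invented example payload; the real one comes from the entity_api_connection HttpHook.
children = [
    {"uuid": "aaaa1111", "creation_action": "Multi-Assay Split"},
    {"uuid": "bbbb2222", "creation_action": "Central Process"},
]

# One conf dict per component dataset, matching get_component_dataset_uuids().
confs = [{"uuid": uuid} for uuid in filter_component_uuids(children)]
print(confs)  # [{'uuid': 'aaaa1111'}]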