From 91a0cc3359fa884c4c53bcb214c60f5094993418 Mon Sep 17 00:00:00 2001
From: Juan Puerto <=>
Date: Tue, 5 Mar 2024 16:13:00 -0500
Subject: [PATCH 1/6] General: Bump IVT to latest main

---
 src/ingest-pipeline/submodules/ingest-validation-tools | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/ingest-pipeline/submodules/ingest-validation-tools b/src/ingest-pipeline/submodules/ingest-validation-tools
index 6748dd7b..c7b2ae7b 160000
--- a/src/ingest-pipeline/submodules/ingest-validation-tools
+++ b/src/ingest-pipeline/submodules/ingest-validation-tools
@@ -1 +1 @@
-Subproject commit 6748dd7b76f8d522d3657488f1fe36be18d0ebbc
+Subproject commit c7b2ae7b93238a00c3918e950920f357d3329add

From f09de1ec775f2e264eaf07a4b49267fe5f42ba6a Mon Sep 17 00:00:00 2001
From: Juan Puerto <=>
Date: Tue, 5 Mar 2024 16:14:03 -0500
Subject: [PATCH 2/6] DAGs: Update rebuild_multiple_metadata DAG to account
 for component datasets separately from regular primary datasets.

---
 .../airflow/dags/rebuild_multiple_metadata.py | 34 ++++++++++++++++---
 1 file changed, 29 insertions(+), 5 deletions(-)

diff --git a/src/ingest-pipeline/airflow/dags/rebuild_multiple_metadata.py b/src/ingest-pipeline/airflow/dags/rebuild_multiple_metadata.py
index 6134c3fa..047fe2e2 100644
--- a/src/ingest-pipeline/airflow/dags/rebuild_multiple_metadata.py
+++ b/src/ingest-pipeline/airflow/dags/rebuild_multiple_metadata.py
@@ -46,18 +46,21 @@ def build_dataset_lists(**kwargs):
         pprint(kwargs["dag_run"].conf)

         for uuid in kwargs["dag_run"].conf["uuids"]:
             soft_data = get_soft_data(uuid, **kwargs)
+            ds_rslt = pythonop_get_dataset_state(
+                dataset_uuid_callable=lambda **kwargs: uuid, **kwargs
+            )
             # If we got nothing back from soft_data, then let's try to determine using entity_api
             if soft_data:
                 if soft_data.get("primary"):
-                    kwargs["dag_run"].conf["primary_datasets"].append(uuid)
+                    if ds_rslt["creation_action"] == "Multi-Assay Split":
+                        kwargs["dag_run"].conf["component_datasets"].append(uuid)
+                    else:
+                        kwargs["dag_run"].conf["primary_datasets"].append(uuid)
                 else:
                     kwargs["dag_run"].conf["processed_datasets"].append(uuid)
             else:
                 print(f"No matching soft data returned for {uuid}")
-                ds_rslt = pythonop_get_dataset_state(
-                    dataset_uuid_callable=lambda **kwargs: uuid, **kwargs
-                )
                 if ds_rslt.get("dataset_info"):
                     # dataset_info should only be populated for processed_datasets
                     print(ds_rslt.get("dataset_info"))
@@ -97,6 +100,16 @@ def get_processed_dataset_uuids(**kwargs):
         provide_context=True,
     )

+    def get_component_dataset_uuids(**kwargs):
+        return [{"uuid": uuid} for uuid in kwargs["dag_run"].conf["component_datasets"]]
+
+    t_get_component_dataset_uuids = PythonOperator(
+        task_id="get_component_dataset_uuids",
+        python_callable=get_component_dataset_uuids(),
+        queue=get_queue_resource("rebuild_metadata"),
+        provide_context=True,
+    )
+
     t_launch_rebuild_primary_dataset_metadata = TriggerDagRunOperator.partial(
         task_id="trigger_rebuild_primary_dataset_metadata",
         trigger_dag_id="rebuild_primary_dataset_metadata",
@@ -109,6 +122,17 @@
         queue=get_queue_resource("rebuild_metadata"),
     ).expand(conf=t_get_processed_dataset_uuids.output)

-    t_build_dataset_lists >> [t_get_primary_dataset_uuids, t_get_processed_dataset_uuids]
+    t_launch_rebuild_component_dataset_metadata = TriggerDagRunOperator.partial(
+        task_id="trigger_rebuild_component_dataset_metadata",
+        trigger_dag_id="multiassay_component_metadata",
+        queue=get_queue_resource("rebuild_metadata"),
+    ).expand(conf=t_get_component_dataset_uuids.output)
+
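Note on patch 2: the new trigger task relies on Airflow dynamic task mapping
(available since Airflow 2.3). TriggerDagRunOperator.partial() pins the
arguments shared by every mapped task instance, and .expand(conf=...) maps the
operator over the XCom list returned by the upstream PythonOperator, so each
component dataset uuid gets its own multiassay_component_metadata run. Below
is a minimal self-contained sketch of the pattern, assuming a recent Airflow;
the dag_id, start_date, and schedule_interval are illustrative only, and the
queue/soft-data plumbing of the real DAG is omitted:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

with DAG(
    dag_id="fan_out_sketch",  # illustrative, not the real DAG id
    start_date=datetime(2024, 3, 1),
    schedule_interval=None,
) as dag:

    def get_component_dataset_uuids(**kwargs):
        # One conf dict per mapped trigger task instance below.
        return [{"uuid": u} for u in kwargs["dag_run"].conf["component_datasets"]]

    t_get = PythonOperator(
        task_id="get_component_dataset_uuids",
        python_callable=get_component_dataset_uuids,
    )

    # .partial() holds the constant arguments; .expand() fans out one
    # triggered DAG run per dict in the upstream task's XCom list.
    t_launch = TriggerDagRunOperator.partial(
        task_id="trigger_rebuild_component_dataset_metadata",
        trigger_dag_id="multiassay_component_metadata",
    ).expand(conf=t_get.output)

    t_get >> t_launch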
+    t_build_dataset_lists >> [
+        t_get_primary_dataset_uuids,
+        t_get_processed_dataset_uuids,
+        t_get_component_dataset_uuids,
+    ]
     t_get_primary_dataset_uuids >> t_launch_rebuild_primary_dataset_metadata
     t_get_processed_dataset_uuids >> t_launch_rebuild_processed_dataset_metadata
+    t_get_component_dataset_uuids >> t_launch_rebuild_component_dataset_metadata

From f24ed4abcfd2e45dfa5df559c07a0d967babc84f Mon Sep 17 00:00:00 2001
From: Juan Puerto <=>
Date: Tue, 5 Mar 2024 16:14:34 -0500
Subject: [PATCH 3/6] Utils: Include creation_action in data returned from
 pythonop_get_dataset_state

---
 src/ingest-pipeline/airflow/dags/utils.py | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)

diff --git a/src/ingest-pipeline/airflow/dags/utils.py b/src/ingest-pipeline/airflow/dags/utils.py
index 88034b5b..5dac8891 100644
--- a/src/ingest-pipeline/airflow/dags/utils.py
+++ b/src/ingest-pipeline/airflow/dags/utils.py
@@ -1017,6 +1017,7 @@ def pythonop_get_dataset_state(**kwargs) -> Dict:
         "local_directory_full_path": full_path,
         "metadata": metadata,
         "ingest_metadata": ds_rslt.get("ingest_metadata"),
+        "creation_action": ds_rslt.get("creation_action"),
     }

     if ds_rslt["entity_type"] == "Dataset":
@@ -1338,10 +1339,15 @@ def send_status_msg(**kwargs) -> bool:
         status = None
         extra_fields = {}

-        ds_rslt = pythonop_get_dataset_state(dataset_uuid_callable=lambda **kwargs: dataset_uuid, **kwargs)
+        ds_rslt = pythonop_get_dataset_state(
+            dataset_uuid_callable=lambda **kwargs: dataset_uuid, **kwargs
+        )
         if success:
             md = {}
-            files_for_provenance = [dag_file, *cwl_workflows,]
+            files_for_provenance = [
+                dag_file,
+                *cwl_workflows,
+            ]
             if ivt_path_fun:
                 files_for_provenance.append(ivt_path_fun(**kwargs))
             if no_provenance:
@@ -1408,8 +1414,12 @@ def send_status_msg(**kwargs) -> bool:
         else:
             status = ds_rslt.get("status", "QA")
             if status in ["Processing", "New", "Invalid"]:
-                status = "Submitted" if kwargs["dag"].dag_id in ["multiassay_component_metadata",
-                                                                 "reorganize_upload"] else "QA"
+                status = (
+                    "Submitted"
+                    if kwargs["dag"].dag_id
+                    in ["multiassay_component_metadata", "reorganize_upload"]
+                    else "QA"
+                )
         if metadata_fun:
             if not contacts:
                 contacts = ds_rslt.get("contacts", [])

From e04eacf3b7d36cbb9732bacde128201bd1b23c00 Mon Sep 17 00:00:00 2001
From: Juan Puerto <=>
Date: Tue, 5 Mar 2024 16:16:21 -0500
Subject: [PATCH 4/6] DAGs: Do not include ()

---
 src/ingest-pipeline/airflow/dags/rebuild_multiple_metadata.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/ingest-pipeline/airflow/dags/rebuild_multiple_metadata.py b/src/ingest-pipeline/airflow/dags/rebuild_multiple_metadata.py
index 047fe2e2..49426b5b 100644
--- a/src/ingest-pipeline/airflow/dags/rebuild_multiple_metadata.py
+++ b/src/ingest-pipeline/airflow/dags/rebuild_multiple_metadata.py
@@ -105,7 +105,7 @@ def get_component_dataset_uuids(**kwargs):

     t_get_component_dataset_uuids = PythonOperator(
         task_id="get_component_dataset_uuids",
-        python_callable=get_component_dataset_uuids(),
+        python_callable=get_component_dataset_uuids,
         queue=get_queue_resource("rebuild_metadata"),
         provide_context=True,
     )
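Note on patch 4: python_callable must be handed the function object itself.
With the trailing (), get_component_dataset_uuids() executes while the
scheduler parses the DAG file, before any Airflow context exists, so it fails
with KeyError("dag_run") at parse time; even if it returned, PythonOperator
would receive a list rather than a callable. A minimal illustration (the task
id and surrounding DAG boilerplate are elided):

from airflow.operators.python import PythonOperator

def get_component_dataset_uuids(**kwargs):
    return [{"uuid": u} for u in kwargs["dag_run"].conf["component_datasets"]]

# Broken: the call runs at DAG-parse time with empty kwargs and raises
# KeyError('dag_run') before the task is ever scheduled.
# bad = PythonOperator(task_id="t", python_callable=get_component_dataset_uuids())

# Fixed: pass the callable; Airflow invokes it with the run context at task time.
good = PythonOperator(task_id="t", python_callable=get_component_dataset_uuids)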
From f64ec0dc86d174e931e90c4edc84a46bc3bd6442 Mon Sep 17 00:00:00 2001
From: Juan Puerto <=>
Date: Wed, 6 Mar 2024 09:58:07 -0500
Subject: [PATCH 5/6] DAGs: Initialize component_datasets like we do for
 processed/primary

---
 .../airflow/dags/rebuild_multiple_metadata.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/src/ingest-pipeline/airflow/dags/rebuild_multiple_metadata.py b/src/ingest-pipeline/airflow/dags/rebuild_multiple_metadata.py
index 49426b5b..fc908259 100644
--- a/src/ingest-pipeline/airflow/dags/rebuild_multiple_metadata.py
+++ b/src/ingest-pipeline/airflow/dags/rebuild_multiple_metadata.py
@@ -41,6 +41,7 @@ def get_uuid_for_error(**kwargs):
     def build_dataset_lists(**kwargs):
         kwargs["dag_run"].conf["primary_datasets"] = []
         kwargs["dag_run"].conf["processed_datasets"] = []
+        kwargs["dag_run"].conf["component_datasets"] = []

         print("dag_run conf follows:")
         pprint(kwargs["dag_run"].conf)
@@ -64,7 +65,10 @@ def build_dataset_lists(**kwargs):
                 if ds_rslt.get("dataset_info"):
                     # dataset_info should only be populated for processed_datasets
                     print(ds_rslt.get("dataset_info"))
-                    kwargs["dag_run"].conf["processed_datasets"].append(uuid)
+                    if ds_rslt["creation_action"] == "Multi-Assay Split":
+                        kwargs["dag_run"].conf["component_datasets"].append(uuid)
+                    else:
+                        kwargs["dag_run"].conf["processed_datasets"].append(uuid)
                 else:
                     kwargs["dag_run"].conf["primary_datasets"].append(uuid)

From 0d0c21725a925a9e95b57455930b38aa9645f31e Mon Sep 17 00:00:00 2001
From: Juan Puerto <=>
Date: Wed, 6 Mar 2024 14:03:02 -0500
Subject: [PATCH 6/6] DAGs: Bad logic

---
 .../airflow/dags/rebuild_multiple_metadata.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/ingest-pipeline/airflow/dags/rebuild_multiple_metadata.py b/src/ingest-pipeline/airflow/dags/rebuild_multiple_metadata.py
index fc908259..9d02412b 100644
--- a/src/ingest-pipeline/airflow/dags/rebuild_multiple_metadata.py
+++ b/src/ingest-pipeline/airflow/dags/rebuild_multiple_metadata.py
@@ -65,12 +65,12 @@ def build_dataset_lists(**kwargs):
                 if ds_rslt.get("dataset_info"):
                     # dataset_info should only be populated for processed_datasets
                     print(ds_rslt.get("dataset_info"))
+                    kwargs["dag_run"].conf["processed_datasets"].append(uuid)
+                else:
                     if ds_rslt["creation_action"] == "Multi-Assay Split":
                         kwargs["dag_run"].conf["component_datasets"].append(uuid)
                     else:
-                        kwargs["dag_run"].conf["processed_datasets"].append(uuid)
-                else:
-                    kwargs["dag_run"].conf["primary_datasets"].append(uuid)
+                        kwargs["dag_run"].conf["primary_datasets"].append(uuid)

     t_build_dataset_lists = PythonOperator(
         task_id="build_dataset_lists",
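Note on patches 5 and 6: taken together, the fixed build_dataset_lists routes
each uuid down one of two branches. When soft data is available, datasets
split on soft_data["primary"] and then on creation_action; in the entity-api
fallback, a populated dataset_info marks a processed dataset, and
creation_action again separates components from plain primaries. Below is a
standalone sketch of the final decision tree; classify_dataset and its
plain-dict arguments are conveniences for illustration, and in the DAG the
values come from get_soft_data and pythonop_get_dataset_state:

from typing import Optional

def classify_dataset(soft_data: Optional[dict], ds_rslt: dict) -> str:
    """Pick which dag_run.conf list a dataset uuid belongs in."""
    if soft_data:
        if soft_data.get("primary"):
            # Component datasets are primaries produced by splitting a
            # multi-assay parent, flagged by their creation_action.
            if ds_rslt["creation_action"] == "Multi-Assay Split":
                return "component_datasets"
            return "primary_datasets"
        return "processed_datasets"
    # No soft data: fall back on entity-api state.
    if ds_rslt.get("dataset_info"):
        # dataset_info should only be populated for processed datasets.
        return "processed_datasets"
    if ds_rslt["creation_action"] == "Multi-Assay Split":
        return "component_datasets"
    return "primary_datasets"

# The per-uuid loop body then reduces to:
# kwargs["dag_run"].conf[classify_dataset(soft_data, ds_rslt)].append(uuid)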