diff --git a/src/ingest-pipeline/airflow/dags/rebuild_multiple_metadata.py b/src/ingest-pipeline/airflow/dags/rebuild_multiple_metadata.py
index 6134c3fa..9d02412b 100644
--- a/src/ingest-pipeline/airflow/dags/rebuild_multiple_metadata.py
+++ b/src/ingest-pipeline/airflow/dags/rebuild_multiple_metadata.py
@@ -41,29 +41,36 @@ def get_uuid_for_error(**kwargs):
 
 
     def build_dataset_lists(**kwargs):
         kwargs["dag_run"].conf["primary_datasets"] = []
         kwargs["dag_run"].conf["processed_datasets"] = []
+        kwargs["dag_run"].conf["component_datasets"] = []
         print("dag_run conf follows:")
         pprint(kwargs["dag_run"].conf)
         for uuid in kwargs["dag_run"].conf["uuids"]:
             soft_data = get_soft_data(uuid, **kwargs)
+            ds_rslt = pythonop_get_dataset_state(
+                dataset_uuid_callable=lambda **kwargs: uuid, **kwargs
+            )
             # If we got nothing back from soft_data, then let's try to determine using entity_api
             if soft_data:
                 if soft_data.get("primary"):
-                    kwargs["dag_run"].conf["primary_datasets"].append(uuid)
+                    if ds_rslt["creation_action"] == "Multi-Assay Split":
+                        kwargs["dag_run"].conf["component_datasets"].append(uuid)
+                    else:
+                        kwargs["dag_run"].conf["primary_datasets"].append(uuid)
                 else:
                     kwargs["dag_run"].conf["processed_datasets"].append(uuid)
             else:
                 print(f"No matching soft data returned for {uuid}")
-                ds_rslt = pythonop_get_dataset_state(
-                    dataset_uuid_callable=lambda **kwargs: uuid, **kwargs
-                )
                 if ds_rslt.get("dataset_info"):
                     # dataset_info should only be populated for processed_datasets
                     print(ds_rslt.get("dataset_info"))
                     kwargs["dag_run"].conf["processed_datasets"].append(uuid)
                 else:
-                    kwargs["dag_run"].conf["primary_datasets"].append(uuid)
+                    if ds_rslt["creation_action"] == "Multi-Assay Split":
+                        kwargs["dag_run"].conf["component_datasets"].append(uuid)
+                    else:
+                        kwargs["dag_run"].conf["primary_datasets"].append(uuid)
 
     t_build_dataset_lists = PythonOperator(
         task_id="build_dataset_lists",
@@ -97,6 +104,16 @@ def get_processed_dataset_uuids(**kwargs):
         provide_context=True,
     )
 
+    def get_component_dataset_uuids(**kwargs):
+        return [{"uuid": uuid} for uuid in kwargs["dag_run"].conf["component_datasets"]]
+
+    t_get_component_dataset_uuids = PythonOperator(
+        task_id="get_component_dataset_uuids",
+        python_callable=get_component_dataset_uuids,
+        queue=get_queue_resource("rebuild_metadata"),
+        provide_context=True,
+    )
+
     t_launch_rebuild_primary_dataset_metadata = TriggerDagRunOperator.partial(
         task_id="trigger_rebuild_primary_dataset_metadata",
         trigger_dag_id="rebuild_primary_dataset_metadata",
@@ -109,6 +126,17 @@
         queue=get_queue_resource("rebuild_metadata"),
     ).expand(conf=t_get_processed_dataset_uuids.output)
 
-    t_build_dataset_lists >> [t_get_primary_dataset_uuids, t_get_processed_dataset_uuids]
+    t_launch_rebuild_component_dataset_metadata = TriggerDagRunOperator.partial(
+        task_id="trigger_rebuild_component_dataset_metadata",
+        trigger_dag_id="multiassay_component_metadata",
+        queue=get_queue_resource("rebuild_metadata"),
+    ).expand(conf=t_get_component_dataset_uuids.output)
+
+    t_build_dataset_lists >> [
+        t_get_primary_dataset_uuids,
+        t_get_processed_dataset_uuids,
+        t_get_component_dataset_uuids,
+    ]
     t_get_primary_dataset_uuids >> t_launch_rebuild_primary_dataset_metadata
     t_get_processed_dataset_uuids >> t_launch_rebuild_processed_dataset_metadata
+    t_get_component_dataset_uuids >> t_launch_rebuild_component_dataset_metadata
diff --git a/src/ingest-pipeline/airflow/dags/utils.py b/src/ingest-pipeline/airflow/dags/utils.py
index 88034b5b..5dac8891 100644
--- a/src/ingest-pipeline/airflow/dags/utils.py
+++ b/src/ingest-pipeline/airflow/dags/utils.py
@@ -1017,6 +1017,7 @@ def pythonop_get_dataset_state(**kwargs) -> Dict:
             "local_directory_full_path": full_path,
             "metadata": metadata,
             "ingest_metadata": ds_rslt.get("ingest_metadata"),
+            "creation_action": ds_rslt.get("creation_action"),
         }
 
         if ds_rslt["entity_type"] == "Dataset":
@@ -1338,10 +1339,15 @@ def send_status_msg(**kwargs) -> bool:
     status = None
     extra_fields = {}
 
-    ds_rslt = pythonop_get_dataset_state(dataset_uuid_callable=lambda **kwargs: dataset_uuid, **kwargs)
+    ds_rslt = pythonop_get_dataset_state(
+        dataset_uuid_callable=lambda **kwargs: dataset_uuid, **kwargs
+    )
     if success:
         md = {}
-        files_for_provenance = [dag_file, *cwl_workflows,]
+        files_for_provenance = [
+            dag_file,
+            *cwl_workflows,
+        ]
         if ivt_path_fun:
             files_for_provenance.append(ivt_path_fun(**kwargs))
         if no_provenance:
@@ -1408,8 +1414,12 @@
     else:
         status = ds_rslt.get("status", "QA")
         if status in ["Processing", "New", "Invalid"]:
-            status = "Submitted" if kwargs["dag"].dag_id in ["multiassay_component_metadata",
-                                                             "reorganize_upload"] else "QA"
+            status = (
+                "Submitted"
+                if kwargs["dag"].dag_id
+                in ["multiassay_component_metadata", "reorganize_upload"]
+                else "QA"
+            )
     if metadata_fun:
         if not contacts:
             contacts = ds_rslt.get("contacts", [])
diff --git a/src/ingest-pipeline/submodules/ingest-validation-tools b/src/ingest-pipeline/submodules/ingest-validation-tools
index 6748dd7b..c7b2ae7b 160000
--- a/src/ingest-pipeline/submodules/ingest-validation-tools
+++ b/src/ingest-pipeline/submodules/ingest-validation-tools
@@ -1 +1 @@
-Subproject commit 6748dd7b76f8d522d3657488f1fe36be18d0ebbc
+Subproject commit c7b2ae7b93238a00c3918e950920f357d3329add
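
The net effect of the `rebuild_multiple_metadata` changes above is a three-way routing of each incoming UUID, with the newly exposed `creation_action` field from `pythonop_get_dataset_state` deciding whether a primary-looking dataset is actually a multi-assay component. A minimal sketch of that routing, assuming `soft_data` and `ds_rslt` are the per-UUID values fetched in `build_dataset_lists` (the `classify_dataset` helper is illustrative only, not part of the diff):

```python
def classify_dataset(soft_data: dict, ds_rslt: dict) -> str:
    """Illustrative mirror of build_dataset_lists: names the conf list a UUID lands in."""
    if soft_data:
        if soft_data.get("primary"):
            # Primary per soft data, unless a "Multi-Assay Split" creation_action
            # marks the dataset as a multi-assay component instead.
            if ds_rslt["creation_action"] == "Multi-Assay Split":
                return "component_datasets"
            return "primary_datasets"
        return "processed_datasets"
    # No soft data: fall back on entity-api state.
    if ds_rslt.get("dataset_info"):
        # dataset_info should only be populated for processed datasets.
        return "processed_datasets"
    if ds_rslt["creation_action"] == "Multi-Assay Split":
        return "component_datasets"
    return "primary_datasets"
```

Component UUIDs then flow through `t_get_component_dataset_uuids` into the mapped `TriggerDagRunOperator`, which launches one `multiassay_component_metadata` run per dataset, mirroring the existing primary and processed paths.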