Merge pull request #851 from hubmapconsortium/NIHDEV-512-Ingest-Pipeline-Add-support-for-populating-component-datasets

Nihdev 512 ingest pipeline add support for populating component datasets

jpuerto-psc authored Mar 6, 2024
2 parents 54395f1 + 0d0c217 commit 08aa041
Showing 3 changed files with 49 additions and 11 deletions.
40 changes: 34 additions & 6 deletions src/ingest-pipeline/airflow/dags/rebuild_multiple_metadata.py
@@ -41,29 +41,36 @@ def get_uuid_for_error(**kwargs):
     def build_dataset_lists(**kwargs):
         kwargs["dag_run"].conf["primary_datasets"] = []
         kwargs["dag_run"].conf["processed_datasets"] = []
+        kwargs["dag_run"].conf["component_datasets"] = []
 
         print("dag_run conf follows:")
         pprint(kwargs["dag_run"].conf)
         for uuid in kwargs["dag_run"].conf["uuids"]:
             soft_data = get_soft_data(uuid, **kwargs)
+            ds_rslt = pythonop_get_dataset_state(
+                dataset_uuid_callable=lambda **kwargs: uuid, **kwargs
+            )
 
             # If we got nothing back from soft_data, then let's try to determine using entity_api
             if soft_data:
                 if soft_data.get("primary"):
-                    kwargs["dag_run"].conf["primary_datasets"].append(uuid)
+                    if ds_rslt["creation_action"] == "Multi-Assay Split":
+                        kwargs["dag_run"].conf["component_datasets"].append(uuid)
+                    else:
+                        kwargs["dag_run"].conf["primary_datasets"].append(uuid)
                 else:
                     kwargs["dag_run"].conf["processed_datasets"].append(uuid)
             else:
                 print(f"No matching soft data returned for {uuid}")
-                ds_rslt = pythonop_get_dataset_state(
-                    dataset_uuid_callable=lambda **kwargs: uuid, **kwargs
-                )
                 if ds_rslt.get("dataset_info"):
                     # dataset_info should only be populated for processed_datasets
                     print(ds_rslt.get("dataset_info"))
                     kwargs["dag_run"].conf["processed_datasets"].append(uuid)
                 else:
-                    kwargs["dag_run"].conf["primary_datasets"].append(uuid)
+                    if ds_rslt["creation_action"] == "Multi-Assay Split":
+                        kwargs["dag_run"].conf["component_datasets"].append(uuid)
+                    else:
+                        kwargs["dag_run"].conf["primary_datasets"].append(uuid)
 
     t_build_dataset_lists = PythonOperator(
         task_id="build_dataset_lists",
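
Editor's note: the per-uuid classification introduced in this hunk can be summarized as a small standalone sketch. The helper below is hypothetical (it is not part of the DAG); it only assumes that get_soft_data() yields a mapping with a "primary" flag and that pythonop_get_dataset_state() yields a mapping with the "creation_action" and "dataset_info" keys used above.

# Hypothetical, simplified restatement of the classification logic in this hunk.
def classify_dataset(soft_data: dict, ds_rslt: dict) -> str:
    """Return the name of the conf list a uuid should be appended to."""
    if soft_data:
        if soft_data.get("primary"):
            # Primary datasets created by a "Multi-Assay Split" are component datasets.
            if ds_rslt.get("creation_action") == "Multi-Assay Split":
                return "component_datasets"
            return "primary_datasets"
        return "processed_datasets"
    # No soft-data match: fall back on the entity-api state.
    if ds_rslt.get("dataset_info"):
        # dataset_info should only be populated for processed datasets.
        return "processed_datasets"
    if ds_rslt.get("creation_action") == "Multi-Assay Split":
        return "component_datasets"
    return "primary_datasets"

# Example (illustrative values only):
assert classify_dataset({"primary": True}, {"creation_action": "Multi-Assay Split"}) == "component_datasets"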
@@ -97,6 +104,16 @@ def get_processed_dataset_uuids(**kwargs):
         provide_context=True,
     )
 
+    def get_component_dataset_uuids(**kwargs):
+        return [{"uuid": uuid} for uuid in kwargs["dag_run"].conf["component_datasets"]]
+
+    t_get_component_dataset_uuids = PythonOperator(
+        task_id="get_component_dataset_uuids",
+        python_callable=get_component_dataset_uuids,
+        queue=get_queue_resource("rebuild_metadata"),
+        provide_context=True,
+    )
+
     t_launch_rebuild_primary_dataset_metadata = TriggerDagRunOperator.partial(
         task_id="trigger_rebuild_primary_dataset_metadata",
         trigger_dag_id="rebuild_primary_dataset_metadata",
@@ -109,6 +126,17 @@ def get_processed_dataset_uuids(**kwargs):
         queue=get_queue_resource("rebuild_metadata"),
     ).expand(conf=t_get_processed_dataset_uuids.output)
 
-    t_build_dataset_lists >> [t_get_primary_dataset_uuids, t_get_processed_dataset_uuids]
+    t_launch_rebuild_component_dataset_metadata = TriggerDagRunOperator.partial(
+        task_id="trigger_rebuild_component_dataset_metadata",
+        trigger_dag_id="multiassay_component_metadata",
+        queue=get_queue_resource("rebuild_metadata"),
+    ).expand(conf=t_get_component_dataset_uuids.output)
+
+    t_build_dataset_lists >> [
+        t_get_primary_dataset_uuids,
+        t_get_processed_dataset_uuids,
+        t_get_component_dataset_uuids,
+    ]
     t_get_primary_dataset_uuids >> t_launch_rebuild_primary_dataset_metadata
     t_get_processed_dataset_uuids >> t_launch_rebuild_processed_dataset_metadata
+    t_get_component_dataset_uuids >> t_launch_rebuild_component_dataset_metadata
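
Editor's note: the wiring above relies on Airflow dynamic task mapping. Each {"uuid": ...} dict returned by a get_*_dataset_uuids task becomes the conf of one triggered downstream DAG run. The sketch below shows the same .partial()/.expand() pattern in isolation; the DAG id, task ids, and the downstream "example_target" DAG are hypothetical, and Airflow 2.3+ is assumed.

# Minimal sketch of the .partial()/.expand() fan-out pattern used above.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

with DAG("example_fanout", start_date=datetime(2024, 1, 1), schedule_interval=None) as dag:

    def list_confs(**kwargs):
        # One dict per downstream run; mirrors get_component_dataset_uuids above.
        return [{"uuid": u} for u in ["uuid-a", "uuid-b"]]

    t_list = PythonOperator(task_id="list_confs", python_callable=list_confs)

    # .partial() fixes the static arguments; .expand() maps over the XCom list,
    # creating one trigger task instance (and one triggered run) per conf dict.
    TriggerDagRunOperator.partial(
        task_id="trigger_target",
        trigger_dag_id="example_target",
    ).expand(conf=t_list.output)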
18 changes: 14 additions & 4 deletions src/ingest-pipeline/airflow/dags/utils.py
@@ -1017,6 +1017,7 @@ def pythonop_get_dataset_state(**kwargs) -> Dict:
         "local_directory_full_path": full_path,
         "metadata": metadata,
         "ingest_metadata": ds_rslt.get("ingest_metadata"),
+        "creation_action": ds_rslt.get("creation_action"),
     }
 
     if ds_rslt["entity_type"] == "Dataset":
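
Editor's note: with this addition, callers such as build_dataset_lists can read the dataset's creation action directly from the state dict. A rough sketch of the relevant part of that mapping is shown below; only the keys visible in the hunk above are assumed, and every value is an illustrative placeholder.

# Illustrative only: partial shape of the dict pythonop_get_dataset_state() returns.
example_ds_rslt = {
    "local_directory_full_path": "/path/to/dataset",   # placeholder path
    "metadata": {},                                     # dataset metadata block
    "ingest_metadata": None,                            # entity-api ingest_metadata
    "creation_action": "Multi-Assay Split",             # newly exposed field
}

if example_ds_rslt["creation_action"] == "Multi-Assay Split":
    print("treat as a component dataset")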
@@ -1338,10 +1339,15 @@ def send_status_msg(**kwargs) -> bool:
         status = None
         extra_fields = {}
 
-        ds_rslt = pythonop_get_dataset_state(dataset_uuid_callable=lambda **kwargs: dataset_uuid, **kwargs)
+        ds_rslt = pythonop_get_dataset_state(
+            dataset_uuid_callable=lambda **kwargs: dataset_uuid, **kwargs
+        )
         if success:
             md = {}
-            files_for_provenance = [dag_file, *cwl_workflows,]
+            files_for_provenance = [
+                dag_file,
+                *cwl_workflows,
+            ]
             if ivt_path_fun:
                 files_for_provenance.append(ivt_path_fun(**kwargs))
             if no_provenance:
@@ -1408,8 +1414,12 @@ def send_status_msg(**kwargs) -> bool:
         else:
             status = ds_rslt.get("status", "QA")
             if status in ["Processing", "New", "Invalid"]:
-                status = "Submitted" if kwargs["dag"].dag_id in ["multiassay_component_metadata",
-                                                                 "reorganize_upload"] else "QA"
+                status = (
+                    "Submitted"
+                    if kwargs["dag"].dag_id
+                    in ["multiassay_component_metadata", "reorganize_upload"]
+                    else "QA"
+                )
         if metadata_fun:
             if not contacts:
                 contacts = ds_rslt.get("contacts", [])
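
Editor's note: the conditional above is only reformatted, not changed. It decides the post-run status: datasets still in Processing, New, or Invalid are set to Submitted when the running DAG is multiassay_component_metadata or reorganize_upload, and to QA otherwise. A hypothetical standalone restatement (not part of utils.py):

# Hypothetical helper mirroring the status decision shown in the hunk above.
def resolve_status(current_status: str, dag_id: str) -> str:
    if current_status in ["Processing", "New", "Invalid"]:
        return (
            "Submitted"
            if dag_id in ["multiassay_component_metadata", "reorganize_upload"]
            else "QA"
        )
    return current_status

# Example (illustrative values only):
assert resolve_status("New", "reorganize_upload") == "Submitted"
assert resolve_status("QA", "reorganize_upload") == "QA"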
2 changes: 1 addition & 1 deletion src/ingest-pipeline/submodules/ingest-validation-tools
