From 4b424ee5789fa65dca45299199fab5dedb64be11 Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Fri, 2 Feb 2024 10:30:27 -0500 Subject: [PATCH 01/66] Utils: Update split_and_create to add support for shared uploads. Also refactor populate function slightly --- .../misc/tools/split_and_create.py | 207 +++++++++++++----- 1 file changed, 148 insertions(+), 59 deletions(-) diff --git a/src/ingest-pipeline/misc/tools/split_and_create.py b/src/ingest-pipeline/misc/tools/split_and_create.py index c51d8411..cdd1567e 100755 --- a/src/ingest-pipeline/misc/tools/split_and_create.py +++ b/src/ingest-pipeline/misc/tools/split_and_create.py @@ -3,6 +3,7 @@ import argparse import json import re +import os import time from pathlib import Path from pprint import pprint @@ -89,19 +90,6 @@ def create_fake_uuid_generator(): yield rslt -# def get_canonical_assay_type(row, entity_factory, default_type): -# """ -# Convert assay type to canonical form, with fallback -# """ -# try: -# rslt = entity_factory.type_client.getAssayType(row["assay_type"]).name -# except Exception: -# print(f"fallback {row['assay_type']} {default_type}") -# rslt = FALLBACK_ASSAY_TYPE_TRANSLATIONS.get(row["assay_type"], default_type) -# print(f"{row['assay_type']} -> {rslt}") -# return rslt - - def get_canonical_assay_type(row, dataset_type=None): # TODO: check if this needs to be rewrite to support old style metadata file_dataset_type = row["assay_type"] if hasattr(row, "assay_type") else row["dataset_type"] @@ -180,20 +168,9 @@ def populate(row, source_entity, entity_factory, dryrun=False, components=None): Build the contents of the newly created dataset using info from the parent """ uuid = row["new_uuid"] - old_data_path = row["data_path"] - row["data_path"] = "." - old_contrib_path = Path(row["contributors_path"]) - new_contrib_path = Path("extras") / old_contrib_path.name - row["contributors_path"] = str(new_contrib_path) - if "antibodies_path" in row: - old_antibodies_path = Path(row["antibodies_path"]) - new_antibodies_path = Path("extras") / old_antibodies_path.name - row["antibodies_path"] = str(new_antibodies_path) - else: - old_antibodies_path = None - # row['assay_type'] = row['canonical_assay_type'] row_df = pd.DataFrame([row]) row_df = row_df.drop(columns=["canonical_assay_type", "new_uuid"]) + if dryrun: kid_path = Path(SCRATCH_PATH) / uuid kid_path.mkdir(0o770, parents=True, exist_ok=True) @@ -202,29 +179,98 @@ def populate(row, source_entity, entity_factory, dryrun=False, components=None): else: kid_path = Path(entity_factory.get_full_path(uuid)) + # Have to cover two cases + # 1. Case when non_global is set but there is no global/non_global directories + # 2. Case when non_global is not set but there are global/non_global directories + is_shared_upload = {"global", "non_global"} == { + x.name() + for x in source_entity.full_path.glob("*global") + if x.is_dir() and x.name() in ["global", "non_global"] + } + non_global_files = row.get("non_global_files") + + if non_global_files: + # Catch case 1 + assert ( + is_shared_upload + ), f"{uuid} has non_global_files specified but missing global or non_global directories" + + # Generate a list of file paths for where non_global_files live in the upload + # Structure is {source_path: path_passed_in_metadata (destination path relative to root of dataset)} + non_global_files = { + source_entity.full_path / "non_global" / Path(x.strip()): Path(x.strip()) + for x in non_global_files.split(";") + } + + # Iterate over source_paths and make sure they exist. 
+ for non_global_file in non_global_files.keys(): + assert ( + non_global_file.exists() + ), f"Non global file {non_global_file.as_posix()} does not exist in {source_entity.full_path}" + else: + # Catch case 2 + assert ( + not is_shared_upload + ), f"{uuid} has empty non_global_files but has global & non_global directories" + + old_data_path = row["data_path"] + row["data_path"] = "." + + # Contributors and antibodies should point to the path directly. + old_paths = [] + for path_index in ["contributors_path", "antibodies_path"]: + if old_path := row.get(path_index): + old_paths.append(Path(old_path)) + row[path_index] = str(Path("extras") / old_paths[old_path].name) + + # old_contrib_path = Path(row["contributors_path"]) + # new_contrib_path = Path("extras") / old_contrib_path.name + # row["contributors_path"] = str(new_contrib_path) + # if "antibodies_path" in row: + # old_antibodies_path = Path(row["antibodies_path"]) + # new_antibodies_path = Path("extras") / old_antibodies_path.name + # row["antibodies_path"] = str(new_antibodies_path) + # else: + # old_antibodies_path = None + row_df.to_csv(kid_path / f"{uuid}-metadata.tsv", header=True, sep="\t", index=False) - extras_path = kid_path / "extras" + dest_extras_path = kid_path / "extras" if components is not None: for component in components: component_df = pd.read_csv(component.get("metadata-file"), sep="\t") component_df_cp = component_df.query(f'data_path=="{old_data_path}"').copy() for _, row_component in component_df_cp.iterrows(): - old_component_data_path = row_component["data_path"] + # This loop updates the data_path for the component row_component["data_path"] = "." - old_component_contrib_path = Path(row_component["contributors_path"]) - new_component_contrib_path = Path("extras") / old_component_contrib_path.name - row_component["contributors_path"] = str(new_component_contrib_path) - if "antibodies_path" in row_component: - old_component_antibodies_path = Path(row["antibodies_path"]) - new_component_antibodies_path = ( - Path("extras") / old_component_antibodies_path.name - ) - row_component["antibodies_path"] = str(new_component_antibodies_path) - if dryrun: - print(f"copy {old_component_antibodies_path} to {extras_path}") - else: - copy2(source_entity.full_path / old_component_antibodies_path, extras_path) + old_component_paths = [] + for path_index in ["contributors_path", "antibodies_path"]: + if old_component_path := row_component.get(path_index): + old_component_paths.append(Path(old_component_path)) + row_component[path_index] = str(Path("extras") / old_component_path.name) + + copy_contrib_antibodies( + dest_extras_path, source_entity, old_component_paths, dryrun + ) + + # old_component_contrib_path = Path(row_component["contributors_path"]) + # new_component_contrib_path = Path("extras") / old_component_contrib_path.name + # # Update the contributors path + # row_component["contributors_path"] = str(new_component_contrib_path) + # # Update the antibodies path (if it exists) + # + # # TODO: Doesn't seem like we copy the component's contributors file + # if "antibodies_path" in row_component: + # old_component_antibodies_path = Path(row["antibodies_path"]) + # new_component_antibodies_path = ( + # Path("extras") / old_component_antibodies_path.name + # ) + # row_component["antibodies_path"] = str(new_component_antibodies_path) + # if dryrun: + # print(f"copy {old_component_antibodies_path} to {extras_path}") + # else: + # copy2(source_entity.full_path / old_component_antibodies_path, extras_path) + row_component = 
pd.DataFrame([row_component]) row_component.to_csv( kid_path / f"{component.get('assaytype')}-metadata.tsv", @@ -232,20 +278,49 @@ def populate(row, source_entity, entity_factory, dryrun=False, components=None): sep="\t", index=False, ) - if extras_path.exists(): - assert extras_path.is_dir(), f"{extras_path} is not a directory" + + if is_shared_upload: + copy_shared_data(kid_path, source_entity, non_global_files, dryrun) else: - source_extras_path = source_entity.full_path / "extras" - if source_extras_path.exists(): - if dryrun: - print(f"copy {source_extras_path} to {extras_path}") - else: - copytree(source_extras_path, extras_path) + # This moves everything in the source_data_path over to the dataset path + # So part of non-shared uploads + copy_data_path(kid_path, source_entity.full_path / old_data_path, dryrun) + + # START REGION - INDEPENDENT OF SHARED/NON-SHARED STATUS + # This moves extras over to the dataset extras directory + copy_extras(dest_extras_path, source_entity, dryrun) + # This copies contrib and antibodies over to dataset + copy_contrib_antibodies( + dest_extras_path, + source_entity, + old_paths, + dryrun, + ) + # END REGION - INDEPENDENT OF SHARED/NON-SHARED STATUS + + print(f"{old_data_path} -> {uuid} -> full path: {kid_path}") + + +def copy_shared_data(kid_path, source_entity, non_global_files, dryrun): + # Copy global files over to dataset directory + if dryrun: + print(f"Copy files from {source_entity.full_path / 'global'} to {kid_path}") + else: + copytree(source_entity.full_path / "global", kid_path) + # Copy over non-global files to dataset directory + for source_non_global_file, dest_relative_non_global_file in non_global_files.items(): + dest_non_global_file = kid_path / dest_relative_non_global_file + if dryrun: + print( + f"Copy file from {source_non_global_file} to {kid_path / dest_relative_non_global_file}" + ) else: - if dryrun: - print(f"creating {extras_path}") - extras_path.mkdir(0o770) - source_data_path = source_entity.full_path / old_data_path + if not dest_non_global_file.exists(): + os.makedirs(dest_non_global_file, exist_ok=True) + copy2(source_non_global_file, kid_path / dest_relative_non_global_file) + + +def copy_data_path(kid_path, source_data_path, dryrun): for elt in source_data_path.glob("*"): dst_file = kid_path / elt.name if dryrun: @@ -260,16 +335,30 @@ def populate(row, source_entity, entity_factory, dryrun=False, components=None): sub_elt.rename(kid_path / elt.name / sub_elt.name) continue elt.rename(dst_file) - if dryrun: - print(f"copy {old_contrib_path} to {extras_path}") + + +def copy_extras(dest_extras_path, source_entity, dryrun): + if dest_extras_path.exists(): + assert dest_extras_path.is_dir(), f"{dest_extras_path} is not a directory" else: - copy2(source_entity.full_path / old_contrib_path, extras_path) - if old_antibodies_path is not None: + source_extras_path = source_entity.full_path / "extras" + if source_extras_path.exists(): + if dryrun: + print(f"copy {source_extras_path} to {dest_extras_path}") + else: + copytree(source_extras_path, dest_extras_path) + else: + if dryrun: + print(f"creating {dest_extras_path}") + dest_extras_path.mkdir(0o770) + + +def copy_contrib_antibodies(dest_extras_path, source_entity, old_paths, dryrun): + for old_path in old_paths: if dryrun: - print(f"copy {old_antibodies_path} to {extras_path}") + print(f"copy {old_path} to {dest_extras_path}") else: - copy2(source_entity.full_path / old_antibodies_path, extras_path) - print(f"{old_data_path} -> {uuid} -> full path: {kid_path}") + 
copy2(source_entity.full_path / old_path, dest_extras_path) def apply_special_case_transformations( From fe9598dfa9980295475441bf687eb99e117e9bab Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Fri, 2 Feb 2024 12:01:20 -0500 Subject: [PATCH 02/66] Utils: x.name != x.name() --- src/ingest-pipeline/misc/tools/split_and_create.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ingest-pipeline/misc/tools/split_and_create.py b/src/ingest-pipeline/misc/tools/split_and_create.py index cdd1567e..5036414a 100755 --- a/src/ingest-pipeline/misc/tools/split_and_create.py +++ b/src/ingest-pipeline/misc/tools/split_and_create.py @@ -183,9 +183,9 @@ def populate(row, source_entity, entity_factory, dryrun=False, components=None): # 1. Case when non_global is set but there is no global/non_global directories # 2. Case when non_global is not set but there are global/non_global directories is_shared_upload = {"global", "non_global"} == { - x.name() + x.name for x in source_entity.full_path.glob("*global") - if x.is_dir() and x.name() in ["global", "non_global"] + if x.is_dir() and x.name in ["global", "non_global"] } non_global_files = row.get("non_global_files") From a951c708e19af89961bc8b01706ba91564cd66f0 Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Fri, 2 Feb 2024 13:57:53 -0500 Subject: [PATCH 03/66] Utils: BUg fix --- src/ingest-pipeline/misc/tools/split_and_create.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/misc/tools/split_and_create.py b/src/ingest-pipeline/misc/tools/split_and_create.py index 5036414a..501877a2 100755 --- a/src/ingest-pipeline/misc/tools/split_and_create.py +++ b/src/ingest-pipeline/misc/tools/split_and_create.py @@ -221,7 +221,7 @@ def populate(row, source_entity, entity_factory, dryrun=False, components=None): for path_index in ["contributors_path", "antibodies_path"]: if old_path := row.get(path_index): old_paths.append(Path(old_path)) - row[path_index] = str(Path("extras") / old_paths[old_path].name) + row[path_index] = str(Path("extras") / old_path.name) # old_contrib_path = Path(row["contributors_path"]) # new_contrib_path = Path("extras") / old_contrib_path.name From 9df0cc1eb3aa83c05a0546bbaed5c433798f6d12 Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Fri, 2 Feb 2024 14:02:56 -0500 Subject: [PATCH 04/66] Utils: Bug fix for path issues --- src/ingest-pipeline/misc/tools/split_and_create.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/ingest-pipeline/misc/tools/split_and_create.py b/src/ingest-pipeline/misc/tools/split_and_create.py index 501877a2..813ca3b7 100755 --- a/src/ingest-pipeline/misc/tools/split_and_create.py +++ b/src/ingest-pipeline/misc/tools/split_and_create.py @@ -219,8 +219,8 @@ def populate(row, source_entity, entity_factory, dryrun=False, components=None): # Contributors and antibodies should point to the path directly. old_paths = [] for path_index in ["contributors_path", "antibodies_path"]: - if old_path := row.get(path_index): - old_paths.append(Path(old_path)) + if old_path := Path(row.get(path_index)): + old_paths.append(old_path) row[path_index] = str(Path("extras") / old_path.name) # old_contrib_path = Path(row["contributors_path"]) @@ -245,8 +245,8 @@ def populate(row, source_entity, entity_factory, dryrun=False, components=None): row_component["data_path"] = "." 
old_component_paths = [] for path_index in ["contributors_path", "antibodies_path"]: - if old_component_path := row_component.get(path_index): - old_component_paths.append(Path(old_component_path)) + if old_component_path := Path(row_component.get(path_index)): + old_component_paths.append(old_component_path) row_component[path_index] = str(Path("extras") / old_component_path.name) copy_contrib_antibodies( From 317d0913aa346ab4c4c66489212ca0d93008c65e Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Fri, 2 Feb 2024 14:30:08 -0500 Subject: [PATCH 05/66] Utils: Bug fix for path issues, add some prints for tracking --- .../misc/tools/split_and_create.py | 47 +++++-------------- 1 file changed, 13 insertions(+), 34 deletions(-) diff --git a/src/ingest-pipeline/misc/tools/split_and_create.py b/src/ingest-pipeline/misc/tools/split_and_create.py index 813ca3b7..0a9a4e0f 100755 --- a/src/ingest-pipeline/misc/tools/split_and_create.py +++ b/src/ingest-pipeline/misc/tools/split_and_create.py @@ -187,9 +187,13 @@ def populate(row, source_entity, entity_factory, dryrun=False, components=None): for x in source_entity.full_path.glob("*global") if x.is_dir() and x.name in ["global", "non_global"] } + non_global_files = row.get("non_global_files") + print(f"Is {uuid} part of a shared upload? {is_shared_upload}") + if non_global_files: + print(f"Non global files: {non_global_files}") # Catch case 1 assert ( is_shared_upload @@ -223,15 +227,7 @@ def populate(row, source_entity, entity_factory, dryrun=False, components=None): old_paths.append(old_path) row[path_index] = str(Path("extras") / old_path.name) - # old_contrib_path = Path(row["contributors_path"]) - # new_contrib_path = Path("extras") / old_contrib_path.name - # row["contributors_path"] = str(new_contrib_path) - # if "antibodies_path" in row: - # old_antibodies_path = Path(row["antibodies_path"]) - # new_antibodies_path = Path("extras") / old_antibodies_path.name - # row["antibodies_path"] = str(new_antibodies_path) - # else: - # old_antibodies_path = None + print(f"Old paths to copy over {old_paths}") row_df.to_csv(kid_path / f"{uuid}-metadata.tsv", header=True, sep="\t", index=False) dest_extras_path = kid_path / "extras" @@ -249,28 +245,12 @@ def populate(row, source_entity, entity_factory, dryrun=False, components=None): old_component_paths.append(old_component_path) row_component[path_index] = str(Path("extras") / old_component_path.name) + print(f"Old component paths to copy over {old_component_paths}") + copy_contrib_antibodies( dest_extras_path, source_entity, old_component_paths, dryrun ) - # old_component_contrib_path = Path(row_component["contributors_path"]) - # new_component_contrib_path = Path("extras") / old_component_contrib_path.name - # # Update the contributors path - # row_component["contributors_path"] = str(new_component_contrib_path) - # # Update the antibodies path (if it exists) - # - # # TODO: Doesn't seem like we copy the component's contributors file - # if "antibodies_path" in row_component: - # old_component_antibodies_path = Path(row["antibodies_path"]) - # new_component_antibodies_path = ( - # Path("extras") / old_component_antibodies_path.name - # ) - # row_component["antibodies_path"] = str(new_component_antibodies_path) - # if dryrun: - # print(f"copy {old_component_antibodies_path} to {extras_path}") - # else: - # copy2(source_entity.full_path / old_component_antibodies_path, extras_path) - row_component = pd.DataFrame([row_component]) row_component.to_csv( kid_path / 
f"{component.get('assaytype')}-metadata.tsv", @@ -306,18 +286,17 @@ def copy_shared_data(kid_path, source_entity, non_global_files, dryrun): if dryrun: print(f"Copy files from {source_entity.full_path / 'global'} to {kid_path}") else: - copytree(source_entity.full_path / "global", kid_path) + print(f"Copy files from {source_entity.full_path / 'global'} to {kid_path}") + copytree(source_entity.full_path / "global", kid_path, dirs_exist_ok=True) # Copy over non-global files to dataset directory for source_non_global_file, dest_relative_non_global_file in non_global_files.items(): dest_non_global_file = kid_path / dest_relative_non_global_file if dryrun: - print( - f"Copy file from {source_non_global_file} to {kid_path / dest_relative_non_global_file}" - ) + print(f"Copy file from {source_non_global_file} to {dest_non_global_file}") else: - if not dest_non_global_file.exists(): - os.makedirs(dest_non_global_file, exist_ok=True) - copy2(source_non_global_file, kid_path / dest_relative_non_global_file) + print(f"Copy file from {source_non_global_file} to {dest_non_global_file}") + dest_non_global_file.mkdir(parents=True, exist_ok=True) + copy2(source_non_global_file, dest_non_global_file) def copy_data_path(kid_path, source_data_path, dryrun): From c1a10aea148237cf680e8d98e6ec8f3868ac9792 Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Fri, 2 Feb 2024 14:36:05 -0500 Subject: [PATCH 06/66] Utils: Create the parent directory, not the file as a directory --- src/ingest-pipeline/misc/tools/split_and_create.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/misc/tools/split_and_create.py b/src/ingest-pipeline/misc/tools/split_and_create.py index 0a9a4e0f..2e7bf5be 100755 --- a/src/ingest-pipeline/misc/tools/split_and_create.py +++ b/src/ingest-pipeline/misc/tools/split_and_create.py @@ -295,7 +295,7 @@ def copy_shared_data(kid_path, source_entity, non_global_files, dryrun): print(f"Copy file from {source_non_global_file} to {dest_non_global_file}") else: print(f"Copy file from {source_non_global_file} to {dest_non_global_file}") - dest_non_global_file.mkdir(parents=True, exist_ok=True) + dest_non_global_file.parent.mkdir(parents=True, exist_ok=True) copy2(source_non_global_file, dest_non_global_file) From 11605bb57cd0c2834137a4d17c00c3b730af04b3 Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Mon, 5 Feb 2024 08:58:28 -0500 Subject: [PATCH 07/66] S&C: Only add to non_global_files if X is not empty. --- src/ingest-pipeline/misc/tools/split_and_create.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/ingest-pipeline/misc/tools/split_and_create.py b/src/ingest-pipeline/misc/tools/split_and_create.py index 2e7bf5be..e0c03e33 100755 --- a/src/ingest-pipeline/misc/tools/split_and_create.py +++ b/src/ingest-pipeline/misc/tools/split_and_create.py @@ -204,6 +204,7 @@ def populate(row, source_entity, entity_factory, dryrun=False, components=None): non_global_files = { source_entity.full_path / "non_global" / Path(x.strip()): Path(x.strip()) for x in non_global_files.split(";") + if x } # Iterate over source_paths and make sure they exist. 
From 3c11bd9a4e4f5f1bb6108243bd798a2d9f86811f Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Mon, 5 Feb 2024 10:55:06 -0500 Subject: [PATCH 08/66] S&C: Only add non_global_files if after a strip, they are still valid values --- src/ingest-pipeline/misc/tools/split_and_create.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/misc/tools/split_and_create.py b/src/ingest-pipeline/misc/tools/split_and_create.py index e0c03e33..7962b138 100755 --- a/src/ingest-pipeline/misc/tools/split_and_create.py +++ b/src/ingest-pipeline/misc/tools/split_and_create.py @@ -204,7 +204,7 @@ def populate(row, source_entity, entity_factory, dryrun=False, components=None): non_global_files = { source_entity.full_path / "non_global" / Path(x.strip()): Path(x.strip()) for x in non_global_files.split(";") - if x + if x.strip() } # Iterate over source_paths and make sure they exist. From f7bc78178ea005e9afb21257537ac5c84a6a8f72 Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Mon, 5 Feb 2024 11:10:27 -0500 Subject: [PATCH 09/66] S&C: Fix iteration over path_indices. Path(None) throws an error --- src/ingest-pipeline/misc/tools/split_and_create.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/ingest-pipeline/misc/tools/split_and_create.py b/src/ingest-pipeline/misc/tools/split_and_create.py index 7962b138..d79dfa2a 100755 --- a/src/ingest-pipeline/misc/tools/split_and_create.py +++ b/src/ingest-pipeline/misc/tools/split_and_create.py @@ -224,7 +224,8 @@ def populate(row, source_entity, entity_factory, dryrun=False, components=None): # Contributors and antibodies should point to the path directly. old_paths = [] for path_index in ["contributors_path", "antibodies_path"]: - if old_path := Path(row.get(path_index)): + if old_path := row.get(path_index): + old_path = Path(old_path) old_paths.append(old_path) row[path_index] = str(Path("extras") / old_path.name) @@ -242,7 +243,8 @@ def populate(row, source_entity, entity_factory, dryrun=False, components=None): row_component["data_path"] = "." 
old_component_paths = [] for path_index in ["contributors_path", "antibodies_path"]: - if old_component_path := Path(row_component.get(path_index)): + if old_component_path := row_component.get(path_index): + old_component_path = Path(old_component_path) old_component_paths.append(old_component_path) row_component[path_index] = str(Path("extras") / old_component_path.name) From 5256c3eb28db7a446d3c7188dda5dfb5a44fe319 Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Mon, 5 Feb 2024 11:16:47 -0500 Subject: [PATCH 10/66] S&C: Add dirs_exist_ok to copytree call --- src/ingest-pipeline/misc/tools/split_and_create.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/misc/tools/split_and_create.py b/src/ingest-pipeline/misc/tools/split_and_create.py index d79dfa2a..c4f522d4 100755 --- a/src/ingest-pipeline/misc/tools/split_and_create.py +++ b/src/ingest-pipeline/misc/tools/split_and_create.py @@ -328,7 +328,7 @@ def copy_extras(dest_extras_path, source_entity, dryrun): if dryrun: print(f"copy {source_extras_path} to {dest_extras_path}") else: - copytree(source_extras_path, dest_extras_path) + copytree(source_extras_path, dest_extras_path, dirs_exist_ok=True) else: if dryrun: print(f"creating {dest_extras_path}") From c6544218d41b30e0a2e758d6a4f59fe92a596ae7 Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Mon, 5 Feb 2024 11:29:39 -0500 Subject: [PATCH 11/66] S&C: Copy contrib/antibodies to their file not to the extras file --- src/ingest-pipeline/misc/tools/split_and_create.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/misc/tools/split_and_create.py b/src/ingest-pipeline/misc/tools/split_and_create.py index c4f522d4..278911e5 100755 --- a/src/ingest-pipeline/misc/tools/split_and_create.py +++ b/src/ingest-pipeline/misc/tools/split_and_create.py @@ -340,7 +340,7 @@ def copy_contrib_antibodies(dest_extras_path, source_entity, old_paths, dryrun): if dryrun: print(f"copy {old_path} to {dest_extras_path}") else: - copy2(source_entity.full_path / old_path, dest_extras_path) + copy2(source_entity.full_path / old_path, dest_extras_path / old_path.name) def apply_special_case_transformations( From 38fac4d76f0c84b84fe1399c6f54ad8620207c7e Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Mon, 5 Feb 2024 11:31:44 -0500 Subject: [PATCH 12/66] S&C: More printing --- src/ingest-pipeline/misc/tools/split_and_create.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/ingest-pipeline/misc/tools/split_and_create.py b/src/ingest-pipeline/misc/tools/split_and_create.py index 278911e5..92de0a0b 100755 --- a/src/ingest-pipeline/misc/tools/split_and_create.py +++ b/src/ingest-pipeline/misc/tools/split_and_create.py @@ -328,6 +328,7 @@ def copy_extras(dest_extras_path, source_entity, dryrun): if dryrun: print(f"copy {source_extras_path} to {dest_extras_path}") else: + print(f"copy {source_extras_path} to {dest_extras_path}") copytree(source_extras_path, dest_extras_path, dirs_exist_ok=True) else: if dryrun: @@ -340,6 +341,7 @@ def copy_contrib_antibodies(dest_extras_path, source_entity, old_paths, dryrun): if dryrun: print(f"copy {old_path} to {dest_extras_path}") else: + print(f"copy {old_path} to {dest_extras_path}") copy2(source_entity.full_path / old_path, dest_extras_path / old_path.name) From cc6d947a30a0b05d80e6fbe5f66aa8e19085a82b Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Mon, 5 Feb 2024 11:34:22 -0500 Subject: [PATCH 13/66] S&C: Create dest extras directory if it does not exist yet. 
--- src/ingest-pipeline/misc/tools/split_and_create.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/ingest-pipeline/misc/tools/split_and_create.py b/src/ingest-pipeline/misc/tools/split_and_create.py index 92de0a0b..900b1022 100755 --- a/src/ingest-pipeline/misc/tools/split_and_create.py +++ b/src/ingest-pipeline/misc/tools/split_and_create.py @@ -342,6 +342,7 @@ def copy_contrib_antibodies(dest_extras_path, source_entity, old_paths, dryrun): print(f"copy {old_path} to {dest_extras_path}") else: print(f"copy {old_path} to {dest_extras_path}") + dest_extras_path.parent.mkdir(parents=True, exist_ok=True) copy2(source_entity.full_path / old_path, dest_extras_path / old_path.name) From e53661b45c3e8d94b4e1b8f77e47a9e1cdf9e582 Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Mon, 5 Feb 2024 11:36:53 -0500 Subject: [PATCH 14/66] S&C: Create dest extras directory, not the extras' parent directory --- src/ingest-pipeline/misc/tools/split_and_create.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/misc/tools/split_and_create.py b/src/ingest-pipeline/misc/tools/split_and_create.py index 900b1022..887686c5 100755 --- a/src/ingest-pipeline/misc/tools/split_and_create.py +++ b/src/ingest-pipeline/misc/tools/split_and_create.py @@ -342,7 +342,7 @@ def copy_contrib_antibodies(dest_extras_path, source_entity, old_paths, dryrun): print(f"copy {old_path} to {dest_extras_path}") else: print(f"copy {old_path} to {dest_extras_path}") - dest_extras_path.parent.mkdir(parents=True, exist_ok=True) + dest_extras_path.mkdir(parents=True, exist_ok=True) copy2(source_entity.full_path / old_path, dest_extras_path / old_path.name) From b656c11078132b648d7825472a4366425ecccfe5 Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Mon, 5 Feb 2024 12:23:55 -0500 Subject: [PATCH 15/66] S&C: Reorganize things so that the metadata write is writing the updated row --- .../misc/tools/split_and_create.py | 42 +++++++++---------- 1 file changed, 19 insertions(+), 23 deletions(-) diff --git a/src/ingest-pipeline/misc/tools/split_and_create.py b/src/ingest-pipeline/misc/tools/split_and_create.py index 887686c5..b9931d01 100755 --- a/src/ingest-pipeline/misc/tools/split_and_create.py +++ b/src/ingest-pipeline/misc/tools/split_and_create.py @@ -168,16 +168,17 @@ def populate(row, source_entity, entity_factory, dryrun=False, components=None): Build the contents of the newly created dataset using info from the parent """ uuid = row["new_uuid"] - row_df = pd.DataFrame([row]) - row_df = row_df.drop(columns=["canonical_assay_type", "new_uuid"]) + old_data_path = row["data_path"] + row["data_path"] = "." - if dryrun: - kid_path = Path(SCRATCH_PATH) / uuid - kid_path.mkdir(0o770, parents=True, exist_ok=True) - print(f"writing this metadata to {kid_path}:") - print(row_df) - else: - kid_path = Path(entity_factory.get_full_path(uuid)) + # Contributors and antibodies should point to the path directly. + old_paths = [] + for path_index in ["contributors_path", "antibodies_path"]: + if old_path := row.get(path_index): + old_path = Path(old_path) + old_paths.append(old_path) + row[path_index] = str(Path("extras") / old_path.name) + print(f"Old paths to copy over {old_paths}") # Have to cover two cases # 1. 
Case when non_global is set but there is no global/non_global directories @@ -187,11 +188,8 @@ def populate(row, source_entity, entity_factory, dryrun=False, components=None): for x in source_entity.full_path.glob("*global") if x.is_dir() and x.name in ["global", "non_global"] } - non_global_files = row.get("non_global_files") - print(f"Is {uuid} part of a shared upload? {is_shared_upload}") - if non_global_files: print(f"Non global files: {non_global_files}") # Catch case 1 @@ -218,18 +216,16 @@ def populate(row, source_entity, entity_factory, dryrun=False, components=None): not is_shared_upload ), f"{uuid} has empty non_global_files but has global & non_global directories" - old_data_path = row["data_path"] - row["data_path"] = "." - - # Contributors and antibodies should point to the path directly. - old_paths = [] - for path_index in ["contributors_path", "antibodies_path"]: - if old_path := row.get(path_index): - old_path = Path(old_path) - old_paths.append(old_path) - row[path_index] = str(Path("extras") / old_path.name) + row_df = pd.DataFrame([row]) + row_df = row_df.drop(columns=["canonical_assay_type", "new_uuid"]) - print(f"Old paths to copy over {old_paths}") + if dryrun: + kid_path = Path(SCRATCH_PATH) / uuid + kid_path.mkdir(0o770, parents=True, exist_ok=True) + print(f"writing this metadata to {kid_path}:") + print(row_df) + else: + kid_path = Path(entity_factory.get_full_path(uuid)) row_df.to_csv(kid_path / f"{uuid}-metadata.tsv", header=True, sep="\t", index=False) dest_extras_path = kid_path / "extras" From 3b0cc4670840dae145124832001e7742fd31bbe2 Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Fri, 9 Feb 2024 09:25:28 -0500 Subject: [PATCH 16/66] General: Bugfix --- src/ingest-pipeline/airflow/dags/validate_upload.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/ingest-pipeline/airflow/dags/validate_upload.py b/src/ingest-pipeline/airflow/dags/validate_upload.py index 0dd55b56..5ac908f0 100644 --- a/src/ingest-pipeline/airflow/dags/validate_upload.py +++ b/src/ingest-pipeline/airflow/dags/validate_upload.py @@ -72,7 +72,7 @@ def my_callable(**kwargs): print("ds_rslt:") pprint(ds_rslt) - for key in ["entity_type", "status", "uuid", "data_types", "local_directory_full_path"]: + for key in ["entity_type", "status", "uuid", "local_directory_full_path"]: assert key in ds_rslt, f"Dataset status for {uuid} has no {key}" if ds_rslt["entity_type"] != "Upload": @@ -111,7 +111,9 @@ def run_validation(**kwargs): plugin_directory=plugin_path, # offline=True, # noqa E265 add_notes=False, - extra_parameters={"coreuse": get_threads_resource("validate_upload", "run_validation")}, + extra_parameters={ + "coreuse": get_threads_resource("validate_upload", "run_validation") + }, globus_token=get_auth_tok(**kwargs), ) # Scan reports an error result From 21338e988b26d99d7098dd938204402f39b5f8a2 Mon Sep 17 00:00:00 2001 From: David Betancur Date: Mon, 11 Mar 2024 12:13:48 -0400 Subject: [PATCH 17/66] ReAdding extra_parameters on scan_and_begin_processing- Creating new endpoint for bulk data ingestion. 
--- .../airflow/dags/scan_and_begin_processing.py | 4 + .../airflow/plugins/hubmap_api/endpoint.py | 400 +++++++++++------- 2 files changed, 252 insertions(+), 152 deletions(-) diff --git a/src/ingest-pipeline/airflow/dags/scan_and_begin_processing.py b/src/ingest-pipeline/airflow/dags/scan_and_begin_processing.py index 7684a8a4..37720c9c 100644 --- a/src/ingest-pipeline/airflow/dags/scan_and_begin_processing.py +++ b/src/ingest-pipeline/airflow/dags/scan_and_begin_processing.py @@ -16,6 +16,7 @@ HMDAG, get_auth_tok, get_preserve_scratch_resource, + get_threads_resource, get_queue_resource, get_soft_data_assaytype, make_send_status_msg_function, @@ -129,6 +130,9 @@ def run_validation(**kwargs): plugin_directory=plugin_path, # offline=True, # noqa E265 add_notes=False, + extra_parameters={ + "coreuse": get_threads_resource("scan_and_begin_processing", "run_validation") + }, ignore_deprecation=True, globus_token=get_auth_tok(**kwargs), app_context=app_context, diff --git a/src/ingest-pipeline/airflow/plugins/hubmap_api/endpoint.py b/src/ingest-pipeline/airflow/plugins/hubmap_api/endpoint.py index b9099334..7b9ce6c0 100644 --- a/src/ingest-pipeline/airflow/plugins/hubmap_api/endpoint.py +++ b/src/ingest-pipeline/airflow/plugins/hubmap_api/endpoint.py @@ -28,34 +28,34 @@ LOGGER = logging.getLogger(__name__) -airflow_conf.read(os.path.join(os.environ['AIRFLOW_HOME'], 'instance', 'app.cfg')) +airflow_conf.read(os.path.join(os.environ["AIRFLOW_HOME"], "instance", "app.cfg")) # Tables of configuration elements needed at start-up. # Config elements must be lowercase NEEDED_ENV_VARS = [ - 'AIRFLOW_CONN_INGEST_API_CONNECTION', - 'AIRFLOW_CONN_UUID_API_CONNECTION', - 'AIRFLOW_CONN_FILES_API_CONNECTION', - 'AIRFLOW_CONN_SPATIAL_API_CONNECTION', - 'AIRFLOW_CONN_SEARCH_API_CONNECTION', - 'AIRFLOW_CONN_ENTITY_API_CONNECTION' - ] + "AIRFLOW_CONN_INGEST_API_CONNECTION", + "AIRFLOW_CONN_UUID_API_CONNECTION", + "AIRFLOW_CONN_FILES_API_CONNECTION", + "AIRFLOW_CONN_SPATIAL_API_CONNECTION", + "AIRFLOW_CONN_SEARCH_API_CONNECTION", + "AIRFLOW_CONN_ENTITY_API_CONNECTION", +] NEEDED_CONFIG_SECTIONS = [ - 'ingest_map', - ] + "ingest_map", +] NEEDED_CONFIGS = [ - ('ingest_map', 'scan.and.begin.processing'), - ('ingest_map', 'validate.upload'), - ('hubmap_api_plugin', 'build_number'), - ('connections', 'app_client_id'), - ('connections', 'app_client_secret'), - ('connections', 'docker_mount_path'), - ('connections', 'src_path'), - ('connections', 'output_group_name'), - ('connections', 'workflow_scratch'), - ('core', 'timezone'), - ('core', 'fernet_key') - ] + ("ingest_map", "scan.and.begin.processing"), + ("ingest_map", "validate.upload"), + ("hubmap_api_plugin", "build_number"), + ("connections", "app_client_id"), + ("connections", "app_client_secret"), + ("connections", "docker_mount_path"), + ("connections", "src_path"), + ("connections", "output_group_name"), + ("connections", "workflow_scratch"), + ("core", "timezone"), + ("core", "fernet_key"), +] def check_config(): @@ -64,18 +64,18 @@ def check_config(): failed = 0 for elt in NEEDED_ENV_VARS: if elt not in os.environ: - LOGGER.error('The environment variable {} is not set'.format(elt)) + LOGGER.error("The environment variable {} is not set".format(elt)) failed += 1 for key in NEEDED_CONFIG_SECTIONS + [a for a, b in NEEDED_CONFIGS]: if not (key in dct or key.upper() in dct): - LOGGER.error('The configuration section {} does not exist'.format(key)) + LOGGER.error("The configuration section {} does not exist".format(key)) failed += 1 for key1, key2 in 
NEEDED_CONFIGS: sub_dct = dct[key1] if key1 in dct else dct[key1.upper()] if not (key2 in sub_dct or key2.upper() in sub_dct): - LOGGER.error('The configuration parameter [{}] {} does not exist'.format(key1, key2)) + LOGGER.error("The configuration parameter [{}] {} does not exist".format(key1, key2)) failed += 1 - assert failed == 0, 'ingest-pipeline plugin found {} configuration errors'.format(failed) + assert failed == 0, "ingest-pipeline plugin found {} configuration errors".format(failed) check_config() @@ -91,20 +91,22 @@ def config(section, key): elif key.upper() in dct[section]: rslt = dct[section][key.upper()] else: - raise AirflowConfigException('No config entry for [{}] {}'.format(section, key)) + raise AirflowConfigException("No config entry for [{}] {}".format(section, key)) # airflow config reader leaves quotes, which we want to strip for qc in ['"', "'"]: if rslt.startswith(qc) and rslt.endswith(qc): rslt = rslt.strip(qc) return rslt else: - raise AirflowConfigException('No config section [{}]'.format(section)) + raise AirflowConfigException("No config section [{}]".format(section)) AUTH_HELPER = None if not AuthHelper.isInitialized(): - AUTH_HELPER = AuthHelper.create(clientId=config('connections', 'app_client_id'), - clientSecret=config('connections', 'app_client_secret')) + AUTH_HELPER = AuthHelper.create( + clientId=config("connections", "app_client_id"), + clientSecret=config("connections", "app_client_secret"), + ) else: AUTH_HELPER = AuthHelper.instance() @@ -115,85 +117,81 @@ class HubmapApiInputException(Exception): class HubmapApiConfigException(Exception): pass - - + + class HubmapApiResponse: - def __init__(self): pass - + STATUS_OK = 200 STATUS_BAD_REQUEST = 400 STATUS_UNAUTHORIZED = 401 STATUS_NOT_FOUND = 404 STATUS_SERVER_ERROR = 500 - + @staticmethod def standard_response(status, payload): - json_data = json.dumps({ - 'response': payload - }) - resp = Response(json_data, status=status, mimetype='application/json') + json_data = json.dumps({"response": payload}) + resp = Response(json_data, status=status, mimetype="application/json") return resp - + @staticmethod def success(payload): return HubmapApiResponse.standard_response(HubmapApiResponse.STATUS_OK, payload) - + @staticmethod def error(status, error): - return HubmapApiResponse.standard_response(status, { - 'error': error - }) - + return HubmapApiResponse.standard_response(status, {"error": error}) + @staticmethod def bad_request(error): return HubmapApiResponse.error(HubmapApiResponse.STATUS_BAD_REQUEST, error) - + @staticmethod - def not_found(error='Resource not found'): + def not_found(error="Resource not found"): return HubmapApiResponse.error(HubmapApiResponse.STATUS_NOT_FOUND, error) - + @staticmethod - def unauthorized(error='Not authorized to access this resource'): + def unauthorized(error="Not authorized to access this resource"): return HubmapApiResponse.error(HubmapApiResponse.STATUS_UNAUTHORIZED, error) - + @staticmethod - def server_error(error='An unexpected problem occurred'): + def server_error(error="An unexpected problem occurred"): return HubmapApiResponse.error(HubmapApiResponse.STATUS_SERVER_ERROR, error) -@api_bp.route('/test') +@api_bp.route("/test") @secured(groups="HuBMAP-read") def api_test(): token = None - client_id = config('connections', 'app_client_id') + client_id = config("connections", "app_client_id") print("Client id: " + client_id) - client_secret = config('connections', 'app_client_secret') + client_secret = config("connections", "app_client_secret") print("Client 
secret: " + client_secret) - if 'MAUTHORIZATION' in request.headers: + if "MAUTHORIZATION" in request.headers: token = str(request.headers["MAUTHORIZATION"])[8:] - elif 'AUTHORIZATION' in request.headers: + elif "AUTHORIZATION" in request.headers: token = str(request.headers["AUTHORIZATION"])[7:] print("Token: " + token) - return HubmapApiResponse.success({'api_is_alive': True}) - + return HubmapApiResponse.success({"api_is_alive": True}) -@api_bp.route('/version') + +@api_bp.route("/version") def api_version(): - return HubmapApiResponse.success({'api': API_VERSION, - 'build': config('hubmap_api_plugin', 'build_number')}) + return HubmapApiResponse.success( + {"api": API_VERSION, "build": config("hubmap_api_plugin", "build_number")} + ) + - def format_dag_run(dag_run): return { - 'run_id': dag_run.run_id, - 'dag_id': dag_run.dag_id, - 'state': dag_run.get_state(), - 'start_date': (None if not dag_run.start_date else str(dag_run.start_date)), - 'end_date': (None if not dag_run.end_date else str(dag_run.end_date)), - 'external_trigger': dag_run.external_trigger, - 'execution_date': str(dag_run.execution_date) + "run_id": dag_run.run_id, + "dag_id": dag_run.dag_id, + "state": dag_run.get_state(), + "start_date": (None if not dag_run.start_date else str(dag_run.start_date)), + "end_date": (None if not dag_run.end_date else str(dag_run.end_date)), + "external_trigger": dag_run.external_trigger, + "execution_date": str(dag_run.execution_date), } @@ -221,27 +219,29 @@ def check_ingest_parms(provider, submission_id, process, full_path): On error, HubmapApiInputException is raised. Return value is None. """ - if process.startswith('mock.'): + if process.startswith("mock."): # test request; there should be pre-recorded response data here_dir = os.path.dirname(os.path.abspath(__file__)) - yml_path = os.path.join(here_dir, '../../dags/mock_data/', - process + '.yml') + yml_path = os.path.join(here_dir, "../../dags/mock_data/", process + ".yml") try: - with open(yml_path, 'r') as f: + with open(yml_path, "r") as f: mock_data = yaml.safe_load(f) except yaml.YAMLError as e: - LOGGER.error('mock data contains invalid YAML: {}'.format(e)) - raise HubmapApiInputException('Mock data is invalid YAML for process %s', process) + LOGGER.error("mock data contains invalid YAML: {}".format(e)) + raise HubmapApiInputException("Mock data is invalid YAML for process %s", process) except IOError as e: - LOGGER.error('mock data load failed: {}'.format(e)) - raise HubmapApiInputException('No mock data found for process %s', process) + LOGGER.error("mock data load failed: {}".format(e)) + raise HubmapApiInputException("No mock data found for process %s", process) else: - dct = {'provider': provider, 'submission_id': submission_id, 'process': process} - base_path = config('connections', 'docker_mount_path') + # dct = {"provider": provider, "submission_id": submission_id, "process": process} + base_path = config("connections", "docker_mount_path") if os.path.commonprefix([full_path, base_path]) != base_path: - LOGGER.error("Ingest directory {} is not a subdirectory of DOCKER_MOUNT_PATH" - .format(full_path)) - raise HubmapApiInputException("Ingest directory is not a subdirectory of DOCKER_MOUNT_PATH") + LOGGER.error( + "Ingest directory {} is not a subdirectory of DOCKER_MOUNT_PATH".format(full_path) + ) + raise HubmapApiInputException( + "Ingest directory is not a subdirectory of DOCKER_MOUNT_PATH" + ) if os.path.exists(full_path) and os.path.isdir(full_path): try: num_files = len(os.listdir(full_path)) @@ -252,20 +252,24 
@@ def check_ingest_parms(provider, submission_id, process, full_path): LOGGER.error("Ingest directory {} contains no files".format(full_path)) raise HubmapApiInputException("Ingest directory contains no files") else: - LOGGER.error("cannot find the ingest data for '%s' '%s' '%s' (expected %s)" - % (provider, submission_id, process, full_path)) - raise HubmapApiInputException("Cannot find the expected ingest directory for '%s' '%s' '%s'" - % (provider, submission_id, process)) - + LOGGER.error( + "cannot find the ingest data for '%s' '%s' '%s' (expected %s)" + % (provider, submission_id, process, full_path) + ) + raise HubmapApiInputException( + "Cannot find the expected ingest directory for '%s' '%s' '%s'" + % (provider, submission_id, process) + ) + def _auth_tok_from_request(): """ Parse out and return the authentication token from the current request - """ - authorization = request.headers.get('authorization') - LOGGER.info('top of request_ingest.') - assert authorization[:len('BEARER')].lower() == 'bearer', 'authorization is not BEARER' - substr = authorization[len('BEARER'):].strip() + """ + authorization = request.headers.get("authorization") + LOGGER.info("top of request_ingest.") + assert authorization[: len("BEARER")].lower() == "bearer", "authorization is not BEARER" + substr = authorization[len("BEARER") :].strip() auth_tok = substr # LOGGER.info('auth_tok: %s', auth_tok) # reduce visibility of auth_tok return auth_tok @@ -275,7 +279,7 @@ def _auth_tok_from_environment(): """ Get the 'permanent authorization token' """ - tok = airflow_conf.as_dict()['connections']['APP_CLIENT_SECRET'] + tok = airflow_conf.as_dict()["connections"]["APP_CLIENT_SECRET"] return tok @@ -296,68 +300,72 @@ def _auth_tok_from_environment(): @csrf.exempt -@api_bp.route('/request_ingest', methods=['POST']) +@api_bp.route("/request_ingest", methods=["POST"]) # @secured(groups="HuBMAP-read") def request_ingest(): auth_tok = _auth_tok_from_environment() # decode input data = request.get_json(force=True) - - LOGGER.debug('request_ingest data: {}'.format(str(data))) + + LOGGER.debug("request_ingest data: {}".format(str(data))) # Test and extract required parameters try: - provider = _get_required_string(data, 'provider') - submission_id = _get_required_string(data, 'submission_id') - process = _get_required_string(data, 'process') - full_path = _get_required_string(data, 'full_path') + provider = _get_required_string(data, "provider") + submission_id = _get_required_string(data, "submission_id") + process = _get_required_string(data, "process") + full_path = _get_required_string(data, "full_path") except HubmapApiInputException as e: - return HubmapApiResponse.bad_request('Must specify {} to request data be ingested'.format(str(e))) + return HubmapApiResponse.bad_request( + "Must specify {} to request data be ingested".format(str(e)) + ) - process = process.lower() # necessary because config parser has made the corresponding string lower case + process = ( + process.lower() + ) # necessary because config parser has made the corresponding string lower case try: - dag_id = config('ingest_map', process) + dag_id = config("ingest_map", process) except AirflowConfigException: - return HubmapApiResponse.bad_request('{} is not a known ingestion process'.format(process)) - + return HubmapApiResponse.bad_request("{} is not a known ingestion process".format(process)) + try: check_ingest_parms(provider, submission_id, process, full_path) - + session = settings.Session() # Produce one and only one run - tz = 
pytz.timezone(config('core', 'timezone')) + tz = pytz.timezone(config("core", "timezone")) execution_date = datetime.now(tz) - LOGGER.info('starting {} with execution_date: {}'.format(dag_id, - execution_date)) + LOGGER.info("starting {} with execution_date: {}".format(dag_id, execution_date)) - run_id = '{}_{}_{}'.format(submission_id, process, execution_date.isoformat()) + run_id = "{}_{}_{}".format(submission_id, process, execution_date.isoformat()) ingest_id = run_id - fernet = Fernet(config('core', 'fernet_key').encode()) + fernet = Fernet(config("core", "fernet_key").encode()) crypt_auth_tok = fernet.encrypt(auth_tok.encode()).decode() - conf = {'provider': provider, - 'submission_id': submission_id, - 'process': process, - 'dag_id': dag_id, - 'run_id': run_id, - 'ingest_id': ingest_id, - 'crypt_auth_tok': crypt_auth_tok, - 'src_path': config('connections', 'src_path'), - 'lz_path': full_path - } + conf = { + "provider": provider, + "submission_id": submission_id, + "process": process, + "dag_id": dag_id, + "run_id": run_id, + "ingest_id": ingest_id, + "crypt_auth_tok": crypt_auth_tok, + "src_path": config("connections", "src_path"), + "lz_path": full_path, + } if find_dag_runs(session, dag_id, run_id, execution_date): # The run already happened?? - raise AirflowException('The request happened twice?') + raise AirflowException("The request happened twice?") try: dr = trigger_dag(dag_id, run_id, conf, execution_date=execution_date) except AirflowException as err: LOGGER.error(err) raise AirflowException("Attempt to trigger run produced an error: {}".format(err)) - LOGGER.info('dagrun follows: {}'.format(dr)) + LOGGER.info("dagrun follows: {}".format(dr)) session.close() except HubmapApiInputException as e: return HubmapApiResponse.bad_request(str(e)) @@ -368,8 +376,96 @@ def request_ingest(): except Exception as e: return HubmapApiResponse.server_error(str(e)) - return HubmapApiResponse.success({'ingest_id': ingest_id, - 'run_id': run_id}) + return HubmapApiResponse.success({"ingest_id": ingest_id, "run_id": run_id}) + + +@csrf.exempt +@api_bp.route("/request_bulk_ingest", methods=["POST"]) +# @secured(groups="HuBMAP-read") +def request_ingest(): + auth_tok = _auth_tok_from_environment() + error_msgs = [] + success_msgs = [] + + # decode input + data = request.get_json(force=True) + + LOGGER.debug("request_ingest data: {}".format(str(data))) + + for item in data: + # Test and extract required parameters + try: + provider = _get_required_string(item, "provider") + submission_id = _get_required_string(item, "submission_id") + process = _get_required_string(item, "process") + full_path = _get_required_string(item, "full_path") + except HubmapApiInputException as e: + error_msgs.append("Must specify {} to request data be ingested".format(str(e))) + continue + + process = ( + process.lower() + ) # necessary because config parser has made the corresponding string lower case + + try: + dag_id = config("ingest_map", process) + except AirflowConfigException: + error_msgs.append("{} is not a known ingestion process".format(process)) + continue + + try: + check_ingest_parms(provider, submission_id, process, full_path) + + session = settings.Session() + + # Produce one and only one run + tz = pytz.timezone(config("core", "timezone")) + execution_date = datetime.now(tz) + LOGGER.info("starting {} with execution_date: {}".format(dag_id, execution_date)) + + run_id = "{}_{}_{}".format(submission_id, process, execution_date.isoformat()) + ingest_id = run_id + fernet = Fernet(config("core", 
"fernet_key").encode()) + crypt_auth_tok = fernet.encrypt(auth_tok.encode()).decode() + + conf = { + "provider": provider, + "submission_id": submission_id, + "process": process, + "dag_id": dag_id, + "run_id": run_id, + "ingest_id": ingest_id, + "crypt_auth_tok": crypt_auth_tok, + "src_path": config("connections", "src_path"), + "lz_path": full_path, + } + + if find_dag_runs(session, dag_id, run_id, execution_date): + # The run already happened?? + raise AirflowException("The request happened twice?") + + try: + dr = trigger_dag(dag_id, run_id, conf, execution_date=execution_date) + except AirflowException as err: + LOGGER.error(err) + raise AirflowException("Attempt to trigger run produced an error: {}".format(err)) + success_msgs.append({"ingest_id": ingest_id, "run_id": run_id, "submission_id": submission_id}) + LOGGER.info("dagrun follows: {}".format(dr)) + session.close() + except HubmapApiInputException as e: + error_msgs.append(str(e)) + except ValueError as e: + error_msgs.append(str(e)) + except AirflowException as e: + error_msgs.append(str(e)) + except Exception as e: + error_msgs.append(str(e)) + + if error_msgs: + return HubmapApiResponse.bad_request(error_msgs) + if success_msgs: + return HubmapApiResponse.success(success_msgs) + return HubmapApiResponse.bad_request("Nothing to ingest") def generic_invoke_dag_on_uuid(uuid, process_name): @@ -377,71 +473,71 @@ def generic_invoke_dag_on_uuid(uuid, process_name): process = process_name run_id = "empty" try: - dag_id = config('ingest_map', process) + dag_id = config("ingest_map", process) session = settings.Session() # Produce one and only one run - tz = pytz.timezone(config('core', 'timezone')) + tz = pytz.timezone(config("core", "timezone")) execution_date = datetime.now(tz) - LOGGER.info('starting {} with execution_date: {}'.format(dag_id, - execution_date)) + LOGGER.info("starting {} with execution_date: {}".format(dag_id, execution_date)) - run_id = '{}_{}_{}'.format(uuid, process, execution_date.isoformat()) - fernet = Fernet(config('core', 'fernet_key').encode()) + run_id = "{}_{}_{}".format(uuid, process, execution_date.isoformat()) + fernet = Fernet(config("core", "fernet_key").encode()) crypt_auth_tok = fernet.encrypt(auth_tok.encode()).decode() - conf = {'process': process, - 'dag_id': dag_id, - 'run_id': run_id, - 'crypt_auth_tok': crypt_auth_tok, - 'src_path': config('connections', 'src_path'), - 'uuid': uuid - } + conf = { + "process": process, + "dag_id": dag_id, + "run_id": run_id, + "crypt_auth_tok": crypt_auth_tok, + "src_path": config("connections", "src_path"), + "uuid": uuid, + } if find_dag_runs(session, dag_id, run_id, execution_date): # The run already happened?? 
- raise AirflowException('The request happened twice?') + raise AirflowException("The request happened twice?") try: dr = trigger_dag(dag_id, run_id, conf, execution_date=execution_date) except AirflowException as err: LOGGER.error(err) raise AirflowException("Attempt to trigger run produced an error: {}".format(err)) - LOGGER.info('dagrun follows: {}'.format(dr)) + LOGGER.info("dagrun follows: {}".format(dr)) session.close() except HubmapApiConfigException: - return HubmapApiResponse.bad_request(f'{process} does not map to a known DAG') + return HubmapApiResponse.bad_request(f"{process} does not map to a known DAG") except HubmapApiInputException as e: return HubmapApiResponse.bad_request(str(e)) except ValueError as e: return HubmapApiResponse.server_error(str(e)) except KeyError as e: - HubmapApiResponse.not_found(f'{e}') + HubmapApiResponse.not_found(f"{e}") except AirflowException as e: return HubmapApiResponse.server_error(str(e)) except Exception as e: return HubmapApiResponse.server_error(str(e)) - return HubmapApiResponse.success({'run_id': run_id}) + return HubmapApiResponse.success({"run_id": run_id}) @csrf.exempt -@api_bp.route('/uploads//validate', methods=['PUT']) +@api_bp.route("/uploads//validate", methods=["PUT"]) # @secured(groups="HuBMAP-read") def validate_upload_uuid(uuid): - return generic_invoke_dag_on_uuid(uuid, 'validate.upload') + return generic_invoke_dag_on_uuid(uuid, "validate.upload") # auth_tok = _auth_tok_from_request() # process = 'validate.upload' @csrf.exempt -@api_bp.route('/uploads//reorganize', methods=['PUT']) +@api_bp.route("/uploads//reorganize", methods=["PUT"]) # @secured(groups="HuBMAP-read") def reorganize_upload_uuid(uuid): - return generic_invoke_dag_on_uuid(uuid, 'reorganize.upload') - - + return generic_invoke_dag_on_uuid(uuid, "reorganize.upload") + + """ Parameters for this request: None @@ -451,8 +547,8 @@ def reorganize_upload_uuid(uuid): """ -@api_bp.route('get_process_strings') +@api_bp.route("get_process_strings") def get_process_strings(): dct = airflow_conf.as_dict() - psl = [s.upper() for s in dct['ingest_map']] if 'ingest_map' in dct else [] - return HubmapApiResponse.success({'process_strings': psl}) + psl = [s.upper() for s in dct["ingest_map"]] if "ingest_map" in dct else [] + return HubmapApiResponse.success({"process_strings": psl}) From dc81db7d57b32351a1a340e0697b2c07ffd6b6d4 Mon Sep 17 00:00:00 2001 From: David Betancur Date: Mon, 11 Mar 2024 12:23:12 -0400 Subject: [PATCH 18/66] Bugfix function naming --- src/ingest-pipeline/airflow/plugins/hubmap_api/endpoint.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/airflow/plugins/hubmap_api/endpoint.py b/src/ingest-pipeline/airflow/plugins/hubmap_api/endpoint.py index 7b9ce6c0..d8a9528b 100644 --- a/src/ingest-pipeline/airflow/plugins/hubmap_api/endpoint.py +++ b/src/ingest-pipeline/airflow/plugins/hubmap_api/endpoint.py @@ -382,7 +382,7 @@ def request_ingest(): @csrf.exempt @api_bp.route("/request_bulk_ingest", methods=["POST"]) # @secured(groups="HuBMAP-read") -def request_ingest(): +def request_bulk_ingest(): auth_tok = _auth_tok_from_environment() error_msgs = [] success_msgs = [] From c3b074d93df1f26e942ec8bc6dede1a002ec2f6f Mon Sep 17 00:00:00 2001 From: David Betancur Date: Mon, 11 Mar 2024 12:50:46 -0400 Subject: [PATCH 19/66] Adding submission_id to directory mapping error --- src/ingest-pipeline/airflow/plugins/hubmap_api/endpoint.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/src/ingest-pipeline/airflow/plugins/hubmap_api/endpoint.py b/src/ingest-pipeline/airflow/plugins/hubmap_api/endpoint.py index d8a9528b..3a9ee902 100644 --- a/src/ingest-pipeline/airflow/plugins/hubmap_api/endpoint.py +++ b/src/ingest-pipeline/airflow/plugins/hubmap_api/endpoint.py @@ -453,7 +453,7 @@ def request_bulk_ingest(): LOGGER.info("dagrun follows: {}".format(dr)) session.close() except HubmapApiInputException as e: - error_msgs.append(str(e)) + error_msgs.append({"message": str(e), "submission_id": submission_id}) except ValueError as e: error_msgs.append(str(e)) except AirflowException as e: From 29636f4fca94fd209acca9c27c33e7529976233a Mon Sep 17 00:00:00 2001 From: David Betancur Date: Mon, 11 Mar 2024 12:57:36 -0400 Subject: [PATCH 20/66] Adding time delay to avoid duplicated error triggering multiple DAGs --- src/ingest-pipeline/airflow/plugins/hubmap_api/endpoint.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/ingest-pipeline/airflow/plugins/hubmap_api/endpoint.py b/src/ingest-pipeline/airflow/plugins/hubmap_api/endpoint.py index 3a9ee902..90c25e5a 100644 --- a/src/ingest-pipeline/airflow/plugins/hubmap_api/endpoint.py +++ b/src/ingest-pipeline/airflow/plugins/hubmap_api/endpoint.py @@ -9,6 +9,7 @@ import pytz import yaml from cryptography.fernet import Fernet +from time import sleep from flask import request, Response @@ -421,6 +422,7 @@ def request_bulk_ingest(): # Produce one and only one run tz = pytz.timezone(config("core", "timezone")) execution_date = datetime.now(tz) + sleep(0.05) LOGGER.info("starting {} with execution_date: {}".format(dag_id, execution_date)) run_id = "{}_{}_{}".format(submission_id, process, execution_date.isoformat()) From 3edc620a9d95f8105ec8cc341cc375ecb0ab7502 Mon Sep 17 00:00:00 2001 From: David Betancur Date: Mon, 11 Mar 2024 13:18:42 -0400 Subject: [PATCH 21/66] Adding parameter on trigger_dag to not replace microseconds, so bulk datasets can be queued --- src/ingest-pipeline/airflow/plugins/hubmap_api/endpoint.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/airflow/plugins/hubmap_api/endpoint.py b/src/ingest-pipeline/airflow/plugins/hubmap_api/endpoint.py index 90c25e5a..a8c83c99 100644 --- a/src/ingest-pipeline/airflow/plugins/hubmap_api/endpoint.py +++ b/src/ingest-pipeline/airflow/plugins/hubmap_api/endpoint.py @@ -447,7 +447,7 @@ def request_bulk_ingest(): raise AirflowException("The request happened twice?") try: - dr = trigger_dag(dag_id, run_id, conf, execution_date=execution_date) + dr = trigger_dag(dag_id, run_id, conf, execution_date=execution_date, replace_microseconds=False) except AirflowException as err: LOGGER.error(err) raise AirflowException("Attempt to trigger run produced an error: {}".format(err)) From 70cc289d167b6c172c7e7467d3adce2a1d285798 Mon Sep 17 00:00:00 2001 From: David Betancur Date: Tue, 19 Mar 2024 12:31:19 -0400 Subject: [PATCH 22/66] Updated version for tifffile requirement. 
--- src/ingest-pipeline/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/requirements.txt b/src/ingest-pipeline/requirements.txt index 64188a28..2f3a2c0a 100644 --- a/src/ingest-pipeline/requirements.txt +++ b/src/ingest-pipeline/requirements.txt @@ -4,7 +4,7 @@ hubmap-commons==2.0.14 prov==1.5.1 # pylibczi>=1.1.1 # tifffile==2020.12.8 -tifffile==2020.09.3 +tifffile==2021.11.2 xmltodict==0.13.0 pyimzml==1.5.2 apache-airflow[celery,crypto,postgres,redis,ssh,amazon]==2.5.3 From ee0935033dd8211997c2dcb3272bc337d920e2e6 Mon Sep 17 00:00:00 2001 From: David Betancur Date: Wed, 20 Mar 2024 10:29:34 -0400 Subject: [PATCH 23/66] Updating response to have success messages along errored ones, so the UI can update every dataset that was sent to the Airflow. --- .../airflow/plugins/hubmap_api/endpoint.py | 27 +++++++++++++------ 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/src/ingest-pipeline/airflow/plugins/hubmap_api/endpoint.py b/src/ingest-pipeline/airflow/plugins/hubmap_api/endpoint.py index a8c83c99..8a65ef1d 100644 --- a/src/ingest-pipeline/airflow/plugins/hubmap_api/endpoint.py +++ b/src/ingest-pipeline/airflow/plugins/hubmap_api/endpoint.py @@ -148,6 +148,11 @@ def error(status, error): def bad_request(error): return HubmapApiResponse.error(HubmapApiResponse.STATUS_BAD_REQUEST, error) + @staticmethod + def bad_request_bulk(error, success): + return HubmapApiResponse.standard_response(HubmapApiResponse.STATUS_BAD_REQUEST, {"error": error, + "success": success}) + @staticmethod def not_found(error="Resource not found"): return HubmapApiResponse.error(HubmapApiResponse.STATUS_NOT_FOUND, error) @@ -401,7 +406,8 @@ def request_bulk_ingest(): process = _get_required_string(item, "process") full_path = _get_required_string(item, "full_path") except HubmapApiInputException as e: - error_msgs.append("Must specify {} to request data be ingested".format(str(e))) + error_msgs.append({"message": "Must specify {} to request data be ingested".format(str(e)), + "submission_id": "not_found"}) continue process = ( @@ -411,7 +417,8 @@ def request_bulk_ingest(): try: dag_id = config("ingest_map", process) except AirflowConfigException: - error_msgs.append("{} is not a known ingestion process".format(process)) + error_msgs.append({"message": "{} is not a known ingestion process".format(process), + "submission_id": submission_id}) continue try: @@ -443,28 +450,32 @@ def request_bulk_ingest(): } if find_dag_runs(session, dag_id, run_id, execution_date): + error_msgs.append({"message": "run_id already exists {}".format(run_id), + "submission_id": submission_id}) # The run already happened?? 
- raise AirflowException("The request happened twice?") + continue try: dr = trigger_dag(dag_id, run_id, conf, execution_date=execution_date, replace_microseconds=False) except AirflowException as err: LOGGER.error(err) - raise AirflowException("Attempt to trigger run produced an error: {}".format(err)) + error_msgs.append({"message": "Attempt to trigger run produced an error: {}".format(err), + "submission_id": submission_id}) + continue success_msgs.append({"ingest_id": ingest_id, "run_id": run_id, "submission_id": submission_id}) LOGGER.info("dagrun follows: {}".format(dr)) session.close() except HubmapApiInputException as e: error_msgs.append({"message": str(e), "submission_id": submission_id}) except ValueError as e: - error_msgs.append(str(e)) + error_msgs.append({"message": str(e), "submission_id": submission_id}) except AirflowException as e: - error_msgs.append(str(e)) + error_msgs.append({"message": str(e), "submission_id": submission_id}) except Exception as e: - error_msgs.append(str(e)) + error_msgs.append({"message": str(e), "submission_id": submission_id}) if error_msgs: - return HubmapApiResponse.bad_request(error_msgs) + return HubmapApiResponse.bad_request_bulk(error_msgs, success_msgs) if success_msgs: return HubmapApiResponse.success(success_msgs) return HubmapApiResponse.bad_request("Nothing to ingest") From d82f14af1cd257f5771c0cdd0cc76dfd665597a5 Mon Sep 17 00:00:00 2001 From: David Betancur Date: Thu, 21 Mar 2024 13:43:27 -0400 Subject: [PATCH 24/66] Removing duplicated attribute from Upload validation. --- src/ingest-pipeline/airflow/dags/scan_and_begin_processing.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/ingest-pipeline/airflow/dags/scan_and_begin_processing.py b/src/ingest-pipeline/airflow/dags/scan_and_begin_processing.py index ba8a440c..74c6f6dd 100644 --- a/src/ingest-pipeline/airflow/dags/scan_and_begin_processing.py +++ b/src/ingest-pipeline/airflow/dags/scan_and_begin_processing.py @@ -135,7 +135,6 @@ def run_validation(**kwargs): "coreuse": get_threads_resource("scan_and_begin_processing", "run_validation") }, ignore_deprecation=True, - extra_parameters={'coreuse': get_threads_resource('validate_upload', 'run_validation')}, globus_token=get_auth_tok(**kwargs), app_context=app_context, ) From a6cc77588889571f22b7db2fe787354c3539f72c Mon Sep 17 00:00:00 2001 From: David Betancur Date: Thu, 21 Mar 2024 13:44:10 -0400 Subject: [PATCH 25/66] Removing duplicated attribute from Upload validation. 
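Like the previous commit (same subject), this trims a duplicated piece of the upload-validation plumbing in scan_and_begin_processing.py. For reference, after the bulk-ingest rework a couple of patches back, the body handed to bad_request_bulk pairs per-submission failures with the runs that did get triggered; a rough sketch of the shape (field names are the ones used in endpoint.py, the outer envelope is whatever standard_response wraps around it):

    {
        "error": [
            {"message": "run_id already exists ...", "submission_id": "..."},
        ],
        "success": [
            {"ingest_id": "...", "run_id": "...", "submission_id": "..."},
        ],
    }
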
--- src/ingest-pipeline/airflow/dags/scan_and_begin_processing.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/ingest-pipeline/airflow/dags/scan_and_begin_processing.py b/src/ingest-pipeline/airflow/dags/scan_and_begin_processing.py index 74c6f6dd..fcbf02fe 100644 --- a/src/ingest-pipeline/airflow/dags/scan_and_begin_processing.py +++ b/src/ingest-pipeline/airflow/dags/scan_and_begin_processing.py @@ -16,7 +16,6 @@ HMDAG, get_auth_tok, get_preserve_scratch_resource, - get_threads_resource, get_queue_resource, get_soft_data_assaytype, make_send_status_msg_function, From 3d3182e38b8e99b98df450a8e92b910530b3695e Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Thu, 21 Mar 2024 15:00:51 -0400 Subject: [PATCH 26/66] General: Make sure we use the same parameter name --- src/ingest-pipeline/airflow/dags/pas_ftu_segmentation.py | 2 +- src/ingest-pipeline/airflow/dags/utils.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ingest-pipeline/airflow/dags/pas_ftu_segmentation.py b/src/ingest-pipeline/airflow/dags/pas_ftu_segmentation.py index 6e3b509c..4bc9bb50 100644 --- a/src/ingest-pipeline/airflow/dags/pas_ftu_segmentation.py +++ b/src/ingest-pipeline/airflow/dags/pas_ftu_segmentation.py @@ -235,7 +235,7 @@ def build_cwltool_cmd_ome_tiff_offsets(**kwargs): "previous_revision_uuid_callable": get_previous_revision_uuid, "http_conn_id": "ingest_api_connection", "dataset_name_callable": build_dataset_name, - "dataset_types_callable": get_dataset_type_organ_based, + "dataset_type_callable": get_dataset_type_organ_based, }, ) diff --git a/src/ingest-pipeline/airflow/dags/utils.py b/src/ingest-pipeline/airflow/dags/utils.py index 9d322c20..145204ac 100644 --- a/src/ingest-pipeline/airflow/dags/utils.py +++ b/src/ingest-pipeline/airflow/dags/utils.py @@ -753,7 +753,7 @@ def pythonop_send_create_dataset(**kwargs) -> str: for arg in ["parent_dataset_uuid_callable", "http_conn_id"]: assert arg in kwargs, "missing required argument {}".format(arg) - for arg_options in [["pipeline_shorthand", "dataset_types_callable"]]: + for arg_options in [["pipeline_shorthand", "dataset_type_callable"]]: assert any([arg in kwargs for arg in arg_options]) http_conn_id = kwargs["http_conn_id"] From 94577e856260cd68feec9fd86eec73872deef95a Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Mon, 25 Mar 2024 10:03:59 -0400 Subject: [PATCH 27/66] General: Update IVT branch --- src/ingest-pipeline/misc/tools/split_and_create.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/ingest-pipeline/misc/tools/split_and_create.py b/src/ingest-pipeline/misc/tools/split_and_create.py index 6537b133..3b9a3670 100755 --- a/src/ingest-pipeline/misc/tools/split_and_create.py +++ b/src/ingest-pipeline/misc/tools/split_and_create.py @@ -345,8 +345,10 @@ def copy_contrib_antibodies(dest_extras_path, source_entity, old_paths, dryrun): print(f"copy {old_path} to {dest_extras_path}") else: moved_path = dest_extras_path / old_path.name - print(f"""Probably already copied/moved {src_path} - to {moved_path} {"it exists" if moved_path.exists() else "missing file"}""") + print( + f"""Probably already copied/moved {src_path} + to {moved_path} {"it exists" if moved_path.exists() else "missing file"}""" + ) def apply_special_case_transformations( From 1fd7c45fbeef327dbe5c1c262fdf5b3c4f801629 Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Mon, 25 Mar 2024 10:05:05 -0400 Subject: [PATCH 28/66] General: Update IVT branch --- src/ingest-pipeline/submodules/ingest-validation-tools | 2 +- 1 
file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/submodules/ingest-validation-tools b/src/ingest-pipeline/submodules/ingest-validation-tools index c7b2ae7b..0ed60d63 160000 --- a/src/ingest-pipeline/submodules/ingest-validation-tools +++ b/src/ingest-pipeline/submodules/ingest-validation-tools @@ -1 +1 @@ -Subproject commit c7b2ae7b93238a00c3918e950920f357d3329add +Subproject commit 0ed60d63617a8ae25d29ebf39b0f8485e4c7ab5a From 651c61551460c45872aca5f9d94d6ced6a775d88 Mon Sep 17 00:00:00 2001 From: David Betancur Date: Mon, 25 Mar 2024 14:08:52 -0400 Subject: [PATCH 29/66] Adding new DAG for bulk processing (single parent reprocessing / multi_analysis). Commenting out duplicated code before removing completely. --- .../airflow/dags/bulk_process.py | 199 ++++++++++++++++++ .../airflow/dags/launch_multi_analysis.py | 6 +- 2 files changed, 202 insertions(+), 3 deletions(-) create mode 100644 src/ingest-pipeline/airflow/dags/bulk_process.py diff --git a/src/ingest-pipeline/airflow/dags/bulk_process.py b/src/ingest-pipeline/airflow/dags/bulk_process.py new file mode 100644 index 00000000..0474f3e2 --- /dev/null +++ b/src/ingest-pipeline/airflow/dags/bulk_process.py @@ -0,0 +1,199 @@ +import ast +from pprint import pprint +from datetime import datetime, timedelta + +from airflow.operators.python import PythonOperator +from airflow.exceptions import AirflowException +from airflow.configuration import conf as airflow_conf +from hubmap_operators.flex_multi_dag_run import FlexMultiDagRunOperator + +import utils + +from utils import ( + localized_assert_json_matches_schema as assert_json_matches_schema, + HMDAG, + get_queue_resource, + get_preserve_scratch_resource, + get_soft_data_assaytype, + get_auth_tok, +) + +from extra_utils import check_link_published_drvs + + +def get_uuid_for_error(**kwargs) -> str: + """ + Return the uuid for the derived dataset if it exists, and of the parent dataset otherwise. + """ + return "" + + +default_args = { + "owner": "hubmap", + "depends_on_past": False, + "start_date": datetime(2019, 1, 1), + "email": ["joel.welling@gmail.com"], + "email_on_failure": False, + "email_on_retry": False, + "retries": 1, + "retry_delay": timedelta(minutes=1), + "xcom_push": True, + "queue": get_queue_resource("launch_multi_analysis"), + "on_failure_callback": utils.create_dataset_state_error_callback(get_uuid_for_error), +} + + +with HMDAG( + "launch_multi_analysis", + schedule_interval=None, + is_paused_upon_creation=False, + default_args=default_args, + user_defined_macros={ + "tmp_dir_path": utils.get_tmp_dir_path, + "preserve_scratch": get_preserve_scratch_resource("launch_multi_analysis"), + }, +) as dag: + + def check_one_uuid( + uuid: str, previous_version_uuid: str, avoid_previous_version: bool, **kwargs + ): + """ + Look up information on the given uuid or HuBMAP identifier. 
+ Returns: + - the uuid, translated from an identifier if necessary + - data type(s) of the dataset + - local directory full path of the dataset + """ + print(f"Starting uuid {uuid}") + my_callable = lambda **kwargs: uuid + ds_rslt = utils.pythonop_get_dataset_state(dataset_uuid_callable=my_callable, **kwargs) + if not ds_rslt: + raise AirflowException(f"Invalid uuid/doi for group: {uuid}") + print("ds_rslt:") + pprint(ds_rslt) + + for key in ["status", "uuid", "local_directory_full_path", "metadata"]: + assert key in ds_rslt, f"Dataset status for {uuid} has no {key}" + + if not ds_rslt["status"] in ["New", "Error", "QA", "Published"]: + raise AirflowException(f"Dataset {uuid} is not QA or better") + + dt = ds_rslt["dataset_type"] + if isinstance(dt, str) and dt.startswith("[") and dt.endswith("]"): + dt = ast.literal_eval(dt) + print(f"parsed dt: {dt}") + + if not previous_version_uuid and not avoid_previous_version: + previous_status, previous_uuid = check_link_published_drvs( + uuid, get_auth_tok(**kwargs) + ) + if previous_status: + previous_version_uuid = previous_uuid + + return ( + ds_rslt["uuid"], + dt, + ds_rslt["local_directory_full_path"], + ds_rslt["metadata"], + previous_version_uuid, + ) + + def check_uuids(**kwargs): + print("dag_run conf follows:") + pprint(kwargs["dag_run"].conf) + + try: + assert_json_matches_schema(kwargs["dag_run"].conf, "launch_multi_metadata_schema.yml") + except AssertionError as e: + print("invalid metadata follows:") + pprint(kwargs["dag_run"].conf) + raise + + uuid_l = kwargs["dag_run"].conf["uuid_list"] + collection_type = kwargs["dag_run"].conf["collection_type"] + prev_version_uuid = kwargs["dag_run"].conf.get("previous_version_uuid", None) + avoid_previous_version = kwargs["dag_run"].conf.get("avoid_previous_version_find", False) + filtered_uuid_l = [] + for uuid in uuid_l: + uuid, dt, lz_path, metadata, prev_version_uuid = check_one_uuid( + uuid, prev_version_uuid, avoid_previous_version, **kwargs + ) + soft_data_assaytype = get_soft_data_assaytype(uuid, **kwargs) + print(f"Got {soft_data_assaytype} as the soft_data_assaytype for UUID {uuid}") + filtered_uuid_l.append( + { + "uuid": uuid, + "dataset_type": soft_data_assaytype, + "path": lz_path, + "metadata": metadata, + "prev_version_uuid": prev_version_uuid, + } + ) + # if prev_version_uuid is not None: + # prev_version_uuid = check_one_uuid(prev_version_uuid, "", False, **kwargs)[0] + # print(f'Finished uuid {uuid}') + print(f"filtered data types: {soft_data_assaytype}") + print(f"filtered paths: {lz_path}") + print(f"filtered uuids: {uuid}") + print(f"filtered previous_version_uuid: {prev_version_uuid}") + kwargs["ti"].xcom_push(key="collectiontype", value=collection_type) + kwargs["ti"].xcom_push(key="uuids", value=filtered_uuid_l) + + check_uuids_t = PythonOperator( + task_id="check_uuids", + python_callable=check_uuids, + provide_context=True, + op_kwargs={ + "crypt_auth_tok": utils.encrypt_tok( + airflow_conf.as_dict()["connections"]["APP_CLIENT_SECRET"] + ).decode(), + }, + ) + + def flex_maybe_spawn(**kwargs): + """ + This is a generator which returns appropriate DagRunOrders + """ + print("kwargs:") + pprint(kwargs) + print("dag_run conf:") + ctx = kwargs["dag_run"].conf + pprint(ctx) + collectiontype = kwargs["ti"].xcom_pull(key="collectiontype", task_ids="check_uuids") + for uuid in kwargs["ti"].xcom_pull(key="uuids", task_ids="check_uuids"): + lz_path = uuid.get("path") + parent_submission = uuid.get("uuid") + prev_version_uuid = uuid.get("prev_version_uuid") + metadata = 
uuid.get("metadata") + assay_type = uuid.get("dataset_type") + print("collectiontype: <{}>, assay_type: <{}>".format(collectiontype, assay_type)) + print(f"uuid: {uuid}") + print("lz_paths:") + pprint(lz_path) + print(f"previous version uuid: {prev_version_uuid}") + payload = { + "ingest_id": kwargs["run_id"], + "crypt_auth_tok": kwargs["crypt_auth_tok"], + "parent_lz_path": lz_path, + "parent_submission_id": parent_submission, + "previous_version_uuid": prev_version_uuid, + "metadata": metadata, + "dag_provenance_list": utils.get_git_provenance_list(__file__), + } + for next_dag in utils.downstream_workflow_iter(collectiontype, assay_type): + yield next_dag, payload + + + t_maybe_spawn = FlexMultiDagRunOperator( + task_id="flex_maybe_spawn", + dag=dag, + trigger_dag_id="launch_multi_analysis", + python_callable=flex_maybe_spawn, + op_kwargs={ + "crypt_auth_tok": utils.encrypt_tok( + airflow_conf.as_dict()["connections"]["APP_CLIENT_SECRET"] + ).decode(), + }, + ) + + check_uuids_t >> t_maybe_spawn diff --git a/src/ingest-pipeline/airflow/dags/launch_multi_analysis.py b/src/ingest-pipeline/airflow/dags/launch_multi_analysis.py index 1018fa14..ae5ca83c 100644 --- a/src/ingest-pipeline/airflow/dags/launch_multi_analysis.py +++ b/src/ingest-pipeline/airflow/dags/launch_multi_analysis.py @@ -127,9 +127,9 @@ def check_uuids(**kwargs): filtered_path_l.append(lz_path) filtered_uuid_l.append(uuid) filtered_md_l.append(metadata) - if prev_version_uuid is not None: - prev_version_uuid = check_one_uuid(prev_version_uuid, "", False, - **kwargs)[0] + # if prev_version_uuid is not None: + # prev_version_uuid = check_one_uuid(prev_version_uuid, "", False, + # **kwargs)[0] # print(f'Finished uuid {uuid}') print(f"filtered data types: {filtered_data_types}") print(f"filtered paths: {filtered_path_l}") From 3b9ed7a63ea13f7b49a7519ad69321e07864a275 Mon Sep 17 00:00:00 2001 From: David Betancur Date: Mon, 25 Mar 2024 14:26:34 -0400 Subject: [PATCH 30/66] Refactor testing to use standard PythonOperator instead of FlexMultiDagRun since it is not actually used as is. --- src/ingest-pipeline/airflow/dags/reorganize_upload.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/ingest-pipeline/airflow/dags/reorganize_upload.py b/src/ingest-pipeline/airflow/dags/reorganize_upload.py index 7568a112..6b4263ee 100644 --- a/src/ingest-pipeline/airflow/dags/reorganize_upload.py +++ b/src/ingest-pipeline/airflow/dags/reorganize_upload.py @@ -376,11 +376,16 @@ def flex_maybe_multiassay_spawn(**kwargs): trigger_dag(dag_id, run_id, conf, execution_date=execution_date) return [] - t_maybe_multiassay_spawn = FlexMultiDagRunOperator( + # t_maybe_multiassay_spawn = FlexMultiDagRunOperator( + # task_id="flex_maybe_spawn", + # dag=dag, + # trigger_dag_id="scan_and_begin_processing", + # python_callable=flex_maybe_multiassay_spawn, + # ) + t_maybe_multiassay_spawn = PythonOperator( task_id="flex_maybe_spawn", - dag=dag, - trigger_dag_id="scan_and_begin_processing", python_callable=flex_maybe_multiassay_spawn, + provide_context=True, ) def _get_upload_uuid(**kwargs): From da269e845898bc5edde553f36fd97934e430bcca Mon Sep 17 00:00:00 2001 From: David Betancur Date: Mon, 25 Mar 2024 14:29:02 -0400 Subject: [PATCH 31/66] Missing DAG name. 
--- src/ingest-pipeline/airflow/dags/bulk_process.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/airflow/dags/bulk_process.py b/src/ingest-pipeline/airflow/dags/bulk_process.py index 0474f3e2..9b002b29 100644 --- a/src/ingest-pipeline/airflow/dags/bulk_process.py +++ b/src/ingest-pipeline/airflow/dags/bulk_process.py @@ -44,7 +44,7 @@ def get_uuid_for_error(**kwargs) -> str: with HMDAG( - "launch_multi_analysis", + "bulk_process", schedule_interval=None, is_paused_upon_creation=False, default_args=default_args, From 7edafd767fc328d983b6881dad63aee19b223e81 Mon Sep 17 00:00:00 2001 From: David Betancur Date: Mon, 25 Mar 2024 15:37:52 -0400 Subject: [PATCH 32/66] Standard return for Operator. --- src/ingest-pipeline/airflow/dags/reorganize_upload.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/airflow/dags/reorganize_upload.py b/src/ingest-pipeline/airflow/dags/reorganize_upload.py index 6b4263ee..a5a807c7 100644 --- a/src/ingest-pipeline/airflow/dags/reorganize_upload.py +++ b/src/ingest-pipeline/airflow/dags/reorganize_upload.py @@ -374,7 +374,7 @@ def flex_maybe_multiassay_spawn(**kwargs): time.sleep(1) print(f"Triggering reorganization for UUID {uuid}") trigger_dag(dag_id, run_id, conf, execution_date=execution_date) - return [] + return 0 # t_maybe_multiassay_spawn = FlexMultiDagRunOperator( # task_id="flex_maybe_spawn", From a2f62264bf982586d0213bb76fe469de8d1b8b67 Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Wed, 27 Mar 2024 11:52:17 -0400 Subject: [PATCH 33/66] DAGs: Change SnapATAC to ArchR for the pipeline shorthand. --- .../airflow/dags/sc_atac_seq.py | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/src/ingest-pipeline/airflow/dags/sc_atac_seq.py b/src/ingest-pipeline/airflow/dags/sc_atac_seq.py index 2962bad7..f7a88e74 100644 --- a/src/ingest-pipeline/airflow/dags/sc_atac_seq.py +++ b/src/ingest-pipeline/airflow/dags/sc_atac_seq.py @@ -51,22 +51,23 @@ def generate_atac_seq_dag(params: SequencingDagParameters) -> DAG: "on_failure_callback": utils.create_dataset_state_error_callback(get_uuid_for_error), } - with HMDAG(params.dag_id, - schedule_interval=None, - is_paused_upon_creation=False, - default_args=default_args, - user_defined_macros={ - "tmp_dir_path": get_tmp_dir_path, - "preserve_scratch": get_preserve_scratch_resource(params.dag_id), - }) as dag: + with HMDAG( + params.dag_id, + schedule_interval=None, + is_paused_upon_creation=False, + default_args=default_args, + user_defined_macros={ + "tmp_dir_path": get_tmp_dir_path, + "preserve_scratch": get_preserve_scratch_resource(params.dag_id), + }, + ) as dag: cwl_workflows = get_absolute_workflows( Path("sc-atac-seq-pipeline", "sc_atac_seq_prep_process_analyze.cwl"), Path("portal-containers", "scatac-csv-to-arrow.cwl"), ) def build_dataset_name(**kwargs): - return inner_build_dataset_name(dag.dag_id, params.pipeline_name, - **kwargs) + return inner_build_dataset_name(dag.dag_id, params.pipeline_name, **kwargs) prepare_cwl1 = DummyOperator(task_id="prepare_cwl1") @@ -174,7 +175,7 @@ def build_cwltool_cmd2(**kwargs): "previous_revision_uuid_callable": get_previous_revision_uuid, "http_conn_id": "ingest_api_connection", "dataset_name_callable": build_dataset_name, - "pipeline_shorthand": "SnapATAC" + "pipeline_shorthand": "ArchR", }, ) @@ -233,6 +234,7 @@ def build_cwltool_cmd2(**kwargs): return dag + atacseq_dag_data: List[SequencingDagParameters] = [ SequencingDagParameters( 
dag_id="sc_atac_seq_sci", From 232c4fb25d16a530cd99305d6eee9e95f8607d96 Mon Sep 17 00:00:00 2001 From: Sean Donahue Date: Thu, 28 Mar 2024 13:10:43 -0400 Subject: [PATCH 34/66] DAG for multiome processing --- src/ingest-pipeline/airflow/dags/multiome.py | 336 +++++++++++++++++++ 1 file changed, 336 insertions(+) create mode 100644 src/ingest-pipeline/airflow/dags/multiome.py diff --git a/src/ingest-pipeline/airflow/dags/multiome.py b/src/ingest-pipeline/airflow/dags/multiome.py new file mode 100644 index 00000000..e83d08ff --- /dev/null +++ b/src/ingest-pipeline/airflow/dags/multiome.py @@ -0,0 +1,336 @@ +from datetime import datetime, timedelta +from pathlib import Path +from typing import List + +from airflow import DAG +from airflow.operators.bash import BashOperator +from airflow.operators.dummy import DummyOperator +from airflow.operators.python import BranchPythonOperator, PythonOperator +from hubmap_operators.common_operators import ( + CleanupTmpDirOperator, + CreateTmpDirOperator, + JoinOperator, + LogInfoOperator, + MoveDataOperator, + SetDatasetProcessingOperator, +) + +import utils +from utils import ( + SequencingDagParameters, + get_absolute_workflows, + get_cwltool_base_cmd, + get_dataset_uuid, + get_parent_dataset_uuids_list, + get_parent_data_dirs_list, + build_dataset_name as inner_build_dataset_name, + get_previous_revision_uuid, + get_uuid_for_error, + join_quote_command_str, + make_send_status_msg_function, + get_tmp_dir_path, + pythonop_get_dataset_state, + HMDAG, + get_queue_resource, + get_threads_resource, + get_preserve_scratch_resource, +) + + +def generate_multiome_dag(params: SequencingDagParameters) -> DAG: + default_args = { + "owner": "hubmap", + "depends_on_past": False, + "start_date": datetime(2019, 1, 1), + "email": ["joel.welling@gmail.com"], + "email_on_failure": False, + "email_on_retry": False, + "retries": 1, + "retry_delay": timedelta(minutes=1), + "xcom_push": True, + "queue": get_queue_resource(params.dag_id), + "on_failure_callback": utils.create_dataset_state_error_callback(get_uuid_for_error), + } + + with HMDAG(params.dag_id, + schedule_interval=None, + is_paused_upon_creation=False, + default_args=default_args, + user_defined_macros={ + "tmp_dir_path": get_tmp_dir_path, + "preserve_scratch": get_preserve_scratch_resource(params.dag_id), + }) as dag: + + cwl_workflows = get_absolute_workflows( + Path("salmon-rnaseq", "pipeline.cwl"), + Path("azimuth-annotate", "pipeline.cwl"), + Path("portal-containers", "mudata-to-ui.cwl"), + ) + + def build_dataset_name(**kwargs): + return inner_build_dataset_name(dag.dag_id, params.pipeline_name, **kwargs) + + prepare_cwl1 = DummyOperator(task_id="prepare_cwl1") + + prepare_cwl2 = DummyOperator(task_id="prepare_cwl2") + + prepare_cwl3 = DummyOperator(task_id="prepare_cwl3") + + def build_cwltool_cmd1(**kwargs): + run_id = kwargs["run_id"] + tmpdir = get_tmp_dir_path(run_id) + print("tmpdir: ", tmpdir) + + data_dirs = get_parent_data_dirs_list(**kwargs) + print("data_dirs: ", data_dirs) + + assert len(data_dirs) == 1 + + data_dir = data_dirs[0] + + assay_type = dag.dag_id.split('_')[0] + assay_flags = {'rna':{'10x':'10x_v3_sn', 'snareseq':'snareseq'}, 'atac':{'snareseq':'snareseq'}} + + command = [ + *get_cwltool_base_cmd(tmpdir), + "--relax-path-checks", + "--outdir", + tmpdir / "cwl_out", + "--parallel", + cwl_workflows[0], + "--assay", + params.assay, + "--threads", + get_threads_resource(dag.dag_id), + ] + + for component in ["RNA", "ATAC"]: + command.append(f"--fastq_dir_{component.lower()}") + 
command.append(data_dir / Path(f"raw/fastq/{component}")) + command.append(f"--assay_{component.lower()}") + command.append(assay_flags[component.lower()][assay_type]) + + return join_quote_command_str(command) + + def build_cwltool_cmd2(**kwargs): + run_id = kwargs["run_id"] + tmpdir = get_tmp_dir_path(run_id) + print("tmpdir: ", tmpdir) + + # get organ type + ds_rslt = pythonop_get_dataset_state( + dataset_uuid_callable=get_dataset_uuid, + **kwargs + ) + + organ_list = list(set(ds_rslt['organs'])) + organ_code = organ_list[0] if len(organ_list) == 1 else 'multi' + + command = [ + *get_cwltool_base_cmd(tmpdir), + cwl_workflows[1], + "--reference", + organ_code, + "--matrix", + "expr.h5ad", + "--secondary-analysis-matrix", + "secondary_analysis.h5ad", + "--assay", + params.assay + ] + + return join_quote_command_str(command) + + def build_cwltool_cmd3(**kwargs): + run_id = kwargs["run_id"] + tmpdir = get_tmp_dir_path(run_id) + print("tmpdir: ", tmpdir) + + command = [ + *get_cwltool_base_cmd(tmpdir), + cwl_workflows[2], + "--input_dir", + # This pipeline invocation runs in a 'hubmap_ui' subdirectory, + # so use the parent directory as input + "..", + ] + + return join_quote_command_str(command) + + t_build_cmd1 = PythonOperator( + task_id="build_cmd1", + python_callable=build_cwltool_cmd1, + provide_context=True, + ) + + t_build_cmd2 = PythonOperator( + task_id="build_cmd2", + python_callable=build_cwltool_cmd2, + provide_context=True, + ) + + t_build_cmd3 = PythonOperator( + task_id="build_cmd3", + python_callable=build_cwltool_cmd3, + provide_context=True, + ) + + t_pipeline_exec = BashOperator( + task_id="pipeline_exec", + bash_command=""" \ + tmp_dir={{tmp_dir_path(run_id)}} ; \ + {{ti.xcom_pull(task_ids='build_cmd1')}} > $tmp_dir/session.log 2>&1 ; \ + echo $? + """, + ) + + t_pipeline_exec_azimuth_annotate = BashOperator( + task_id="pipeline_exec_azimuth_annotate", + bash_command=""" \ + tmp_dir={{tmp_dir_path(run_id)}} ; \ + cd "$tmp_dir"/cwl_out ; \ + {{ti.xcom_pull(task_ids='build_cmd2')}} >> $tmp_dir/session.log 2>&1 ; \ + echo $? + """, + ) + + t_convert_for_ui = BashOperator( + task_id="convert_for_ui", + bash_command=""" \ + tmp_dir={{tmp_dir_path(run_id)}} ; \ + ds_dir="{{ti.xcom_pull(task_ids="send_create_dataset")}}" ; \ + cd "$tmp_dir"/cwl_out ; \ + mkdir -p hubmap_ui ; \ + cd hubmap_ui ; \ + {{ti.xcom_pull(task_ids='build_cmd3')}} >> $tmp_dir/session.log 2>&1 ; \ + echo $? 
+ """, + ) + + t_maybe_keep_cwl1 = BranchPythonOperator( + task_id="maybe_keep_cwl1", + python_callable=utils.pythonop_maybe_keep, + provide_context=True, + op_kwargs={ + "next_op": "prepare_cwl2", + "bail_op": "set_dataset_error", + "test_op": "pipeline_exec", + }, + ) + + t_maybe_keep_cwl2 = BranchPythonOperator( + task_id="maybe_keep_cwl2", + python_callable=utils.pythonop_maybe_keep, + provide_context=True, + op_kwargs={ + "next_op": "prepare_cwl3", + "bail_op": "set_dataset_error", + "test_op": "pipeline_exec_azimuth_annotate", + }, + ) + + t_maybe_keep_cwl3 = BranchPythonOperator( + task_id="maybe_keep_cwl3", + python_callable=utils.pythonop_maybe_keep, + provide_context=True, + op_kwargs={ + "next_op": "prepare_cwl4", + "bail_op": "set_dataset_error", + "test_op": "convert_for_ui", + }, + ) + + t_send_create_dataset = PythonOperator( + task_id="send_create_dataset", + python_callable=utils.pythonop_send_create_dataset, + provide_context=True, + op_kwargs={ + "parent_dataset_uuid_callable": get_parent_dataset_uuids_list, + "previous_revision_uuid_callable": get_previous_revision_uuid, + "http_conn_id": "ingest_api_connection", + "dataset_name_callable": build_dataset_name, + "pipeline_shorthand": "Multiome" + }, + ) + + t_set_dataset_error = PythonOperator( + task_id="set_dataset_error", + python_callable=utils.pythonop_set_dataset_state, + provide_context=True, + trigger_rule="all_done", + op_kwargs={ + "dataset_uuid_callable": get_dataset_uuid, + "ds_state": "Error", + "message": f"An error occurred in {params.pipeline_name}", + }, + ) + + send_status_msg = make_send_status_msg_function( + dag_file=__file__, + retcode_ops=["pipeline_exec", + "pipeline_exec_azimuth_annotate", + "move_data", + "convert_for_ui",], + cwl_workflows=cwl_workflows, + ) + + t_send_status = PythonOperator( + task_id="send_status_msg", + python_callable=send_status_msg, + provide_context=True, + ) + + t_log_info = LogInfoOperator(task_id="log_info") + t_join = JoinOperator(task_id="join") + t_create_tmpdir = CreateTmpDirOperator(task_id="create_tmpdir") + t_cleanup_tmpdir = CleanupTmpDirOperator(task_id="cleanup_tmpdir") + t_set_dataset_processing = SetDatasetProcessingOperator(task_id="set_dataset_processing") + t_move_data = MoveDataOperator(task_id="move_data") + + ( + t_log_info + >> t_create_tmpdir + >> t_send_create_dataset + >> t_set_dataset_processing + >> prepare_cwl1 + >> t_build_cmd1 + >> t_pipeline_exec + >> t_maybe_keep_cwl1 + >> prepare_cwl2 + >> t_build_cmd2 + >> t_pipeline_exec_azimuth_annotate + >> t_maybe_keep_cwl2 + >> prepare_cwl3 + >> t_build_cmd3 + >> t_convert_for_ui + >> t_maybe_keep_cwl3 + >> t_move_data + >> t_send_status + >> t_join + ) + t_maybe_keep_cwl1 >> t_set_dataset_error + t_maybe_keep_cwl2 >> t_set_dataset_error + t_maybe_keep_cwl3 >> t_set_dataset_error + t_set_dataset_error >> t_join + t_join >> t_cleanup_tmpdir + + return dag + + +def get_multiome_dag_params(assay: str) -> SequencingDagParameters: + # TODO: restructure assay names, pipeline names, etc.; this repetition + # is for backward compatibility + return SequencingDagParameters( + dag_id=f"multiome_{assay}", + pipeline_name=f"multiome-{assay}", + assay=assay, + ) + + +multiome_dag_params: List[SequencingDagParameters] = [ +# get_multiome_dag_params("10x"), + get_multiome_dag_params("snareseq"), +] + +for params in multiome_dag_params: + globals()[params.dag_id] = generate_multiome_dag(params) From f313a42c017ddb892caa82807aea7aeea0b81a1a Mon Sep 17 00:00:00 2001 From: Sean Donahue Date: Thu, 28 Mar 2024 13:16:49 
-0400 Subject: [PATCH 35/66] Adding multiome-rna-atac-pipeline at v1.0 --- .gitmodules | 3 +++ .../airflow/dags/cwl/multiome-rna-atac-pipeline | 1 + 2 files changed, 4 insertions(+) create mode 160000 src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline diff --git a/.gitmodules b/.gitmodules index 31598e89..154bd101 100644 --- a/.gitmodules +++ b/.gitmodules @@ -58,3 +58,6 @@ [submodule "src/ingest-pipeline/airflow/dags/cwl/mibi-pipeline"] path = src/ingest-pipeline/airflow/dags/cwl/mibi-pipeline url = https://github.com/hubmapconsortium/mibi-pipeline +[submodule "src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline"] + path = src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline + url = https://github.com/hubmapconsortium/multiome-rna-atac-pipeline diff --git a/src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline b/src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline new file mode 160000 index 00000000..9a512105 --- /dev/null +++ b/src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline @@ -0,0 +1 @@ +Subproject commit 9a5121057c63d976b3bb25ffa07f4cfa20bff33c From c83b5d2a6538bc7c3b5ffc35f49c645413fe34c7 Mon Sep 17 00:00:00 2001 From: Matt Ruffalo Date: Thu, 28 Mar 2024 14:29:26 -0400 Subject: [PATCH 36/66] Use MultiomeSequencingDagParameters --- src/ingest-pipeline/airflow/dags/multiome.py | 44 ++++++++++++-------- 1 file changed, 27 insertions(+), 17 deletions(-) diff --git a/src/ingest-pipeline/airflow/dags/multiome.py b/src/ingest-pipeline/airflow/dags/multiome.py index e83d08ff..c84c3007 100644 --- a/src/ingest-pipeline/airflow/dags/multiome.py +++ b/src/ingest-pipeline/airflow/dags/multiome.py @@ -1,3 +1,4 @@ +from collections import namedtuple from datetime import datetime, timedelta from pathlib import Path from typing import List @@ -36,8 +37,17 @@ get_preserve_scratch_resource, ) +MultiomeSequencingDagParameters = namedtuple( + "MultiomeSequencingDagParameters", + [ + "dag_id", + "pipeline_name", + "assay_rna", + "assay_atac", + ] +) -def generate_multiome_dag(params: SequencingDagParameters) -> DAG: +def generate_multiome_dag(params: MultiomeSequencingDagParameters) -> DAG: default_args = { "owner": "hubmap", "depends_on_past": False, @@ -88,9 +98,6 @@ def build_cwltool_cmd1(**kwargs): data_dir = data_dirs[0] - assay_type = dag.dag_id.split('_')[0] - assay_flags = {'rna':{'10x':'10x_v3_sn', 'snareseq':'snareseq'}, 'atac':{'snareseq':'snareseq'}} - command = [ *get_cwltool_base_cmd(tmpdir), "--relax-path-checks", @@ -98,9 +105,9 @@ def build_cwltool_cmd1(**kwargs): tmpdir / "cwl_out", "--parallel", cwl_workflows[0], - "--assay", - params.assay, - "--threads", + "--threads_rna", + get_threads_resource(dag.dag_id), + "--threads_atac", get_threads_resource(dag.dag_id), ] @@ -108,7 +115,7 @@ def build_cwltool_cmd1(**kwargs): command.append(f"--fastq_dir_{component.lower()}") command.append(data_dir / Path(f"raw/fastq/{component}")) command.append(f"--assay_{component.lower()}") - command.append(assay_flags[component.lower()][assay_type]) + command.append(getattr(params, f"assay_{component.lower()}")) return join_quote_command_str(command) @@ -317,19 +324,22 @@ def build_cwltool_cmd3(**kwargs): return dag -def get_multiome_dag_params(assay: str) -> SequencingDagParameters: - # TODO: restructure assay names, pipeline names, etc.; this repetition - # is for backward compatibility - return SequencingDagParameters( +def get_simple_multiome_dag_params(assay: str) -> MultiomeSequencingDagParameters: + return 
MultiomeSequencingDagParameters( dag_id=f"multiome_{assay}", pipeline_name=f"multiome-{assay}", - assay=assay, + assay_rna=assay, + assay_atac=assay, ) - -multiome_dag_params: List[SequencingDagParameters] = [ -# get_multiome_dag_params("10x"), - get_multiome_dag_params("snareseq"), +multiome_dag_params: List[MultiomeSequencingDagParameters] = [ + # MultiomeSequencingDagParameters( + # dag_id="multiome_10x", + # pipeline_name="multiome-10x", + # assay_rna="10x_v3_sn", + # assay_atac="multiome_10x", + # ) + get_simple_multiome_dag_params("snareseq"), ] for params in multiome_dag_params: From 2793615b2efd9ac0a3ec9c251d634af4b50483d9 Mon Sep 17 00:00:00 2001 From: Matt Ruffalo Date: Thu, 28 Mar 2024 14:29:59 -0400 Subject: [PATCH 37/66] black --- src/ingest-pipeline/airflow/dags/multiome.py | 46 ++++++++++---------- 1 file changed, 24 insertions(+), 22 deletions(-) diff --git a/src/ingest-pipeline/airflow/dags/multiome.py b/src/ingest-pipeline/airflow/dags/multiome.py index c84c3007..ebedc900 100644 --- a/src/ingest-pipeline/airflow/dags/multiome.py +++ b/src/ingest-pipeline/airflow/dags/multiome.py @@ -44,9 +44,10 @@ "pipeline_name", "assay_rna", "assay_atac", - ] + ], ) + def generate_multiome_dag(params: MultiomeSequencingDagParameters) -> DAG: default_args = { "owner": "hubmap", @@ -62,15 +63,16 @@ def generate_multiome_dag(params: MultiomeSequencingDagParameters) -> DAG: "on_failure_callback": utils.create_dataset_state_error_callback(get_uuid_for_error), } - with HMDAG(params.dag_id, - schedule_interval=None, - is_paused_upon_creation=False, - default_args=default_args, - user_defined_macros={ - "tmp_dir_path": get_tmp_dir_path, - "preserve_scratch": get_preserve_scratch_resource(params.dag_id), - }) as dag: - + with HMDAG( + params.dag_id, + schedule_interval=None, + is_paused_upon_creation=False, + default_args=default_args, + user_defined_macros={ + "tmp_dir_path": get_tmp_dir_path, + "preserve_scratch": get_preserve_scratch_resource(params.dag_id), + }, + ) as dag: cwl_workflows = get_absolute_workflows( Path("salmon-rnaseq", "pipeline.cwl"), Path("azimuth-annotate", "pipeline.cwl"), @@ -125,13 +127,10 @@ def build_cwltool_cmd2(**kwargs): print("tmpdir: ", tmpdir) # get organ type - ds_rslt = pythonop_get_dataset_state( - dataset_uuid_callable=get_dataset_uuid, - **kwargs - ) + ds_rslt = pythonop_get_dataset_state(dataset_uuid_callable=get_dataset_uuid, **kwargs) - organ_list = list(set(ds_rslt['organs'])) - organ_code = organ_list[0] if len(organ_list) == 1 else 'multi' + organ_list = list(set(ds_rslt["organs"])) + organ_code = organ_list[0] if len(organ_list) == 1 else "multi" command = [ *get_cwltool_base_cmd(tmpdir), @@ -143,7 +142,7 @@ def build_cwltool_cmd2(**kwargs): "--secondary-analysis-matrix", "secondary_analysis.h5ad", "--assay", - params.assay + params.assay, ] return join_quote_command_str(command) @@ -256,7 +255,7 @@ def build_cwltool_cmd3(**kwargs): "previous_revision_uuid_callable": get_previous_revision_uuid, "http_conn_id": "ingest_api_connection", "dataset_name_callable": build_dataset_name, - "pipeline_shorthand": "Multiome" + "pipeline_shorthand": "Multiome", }, ) @@ -274,10 +273,12 @@ def build_cwltool_cmd3(**kwargs): send_status_msg = make_send_status_msg_function( dag_file=__file__, - retcode_ops=["pipeline_exec", - "pipeline_exec_azimuth_annotate", - "move_data", - "convert_for_ui",], + retcode_ops=[ + "pipeline_exec", + "pipeline_exec_azimuth_annotate", + "move_data", + "convert_for_ui", + ], cwl_workflows=cwl_workflows, ) @@ -332,6 +333,7 @@ def 
get_simple_multiome_dag_params(assay: str) -> MultiomeSequencingDagParameter assay_atac=assay, ) + multiome_dag_params: List[MultiomeSequencingDagParameters] = [ # MultiomeSequencingDagParameters( # dag_id="multiome_10x", From 697bb5901bead84382b73972ea5fd650869cce56 Mon Sep 17 00:00:00 2001 From: Sean Donahue Date: Thu, 28 Mar 2024 14:30:49 -0400 Subject: [PATCH 38/66] Fixing pipeline directory --- src/ingest-pipeline/airflow/dags/multiome.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/airflow/dags/multiome.py b/src/ingest-pipeline/airflow/dags/multiome.py index ebedc900..41477620 100644 --- a/src/ingest-pipeline/airflow/dags/multiome.py +++ b/src/ingest-pipeline/airflow/dags/multiome.py @@ -74,7 +74,7 @@ def generate_multiome_dag(params: MultiomeSequencingDagParameters) -> DAG: }, ) as dag: cwl_workflows = get_absolute_workflows( - Path("salmon-rnaseq", "pipeline.cwl"), + Path("multiome-rna-atac-pipeline", "pipeline.cwl"), Path("azimuth-annotate", "pipeline.cwl"), Path("portal-containers", "mudata-to-ui.cwl"), ) From 104b449d5d1a641a422ae14a8c6ed90540370df2 Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Thu, 28 Mar 2024 15:40:10 -0400 Subject: [PATCH 39/66] General: Add mapping entry for new multiome snareseq --- src/ingest-pipeline/airflow/dags/workflow_map.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/ingest-pipeline/airflow/dags/workflow_map.yml b/src/ingest-pipeline/airflow/dags/workflow_map.yml index 43a1b558..83838f20 100644 --- a/src/ingest-pipeline/airflow/dags/workflow_map.yml +++ b/src/ingest-pipeline/airflow/dags/workflow_map.yml @@ -119,4 +119,7 @@ workflow_map: - 'collection_type': '.*' 'assay_type': 'visium-no-probes' 'workflow': 'visium_no_probes' + - 'collection_type': '.*' + 'assay_type': 'multiome-snare-seq2' + 'workflow': 'multiome_snareseq' From 2fb08232dc0759d18a4f9529b3361c408d6932a6 Mon Sep 17 00:00:00 2001 From: Nikolay Akhmetov Date: Thu, 28 Mar 2024 15:59:24 -0400 Subject: [PATCH 40/66] Update `portal-containers` --- src/ingest-pipeline/airflow/dags/cwl/portal-containers | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/airflow/dags/cwl/portal-containers b/src/ingest-pipeline/airflow/dags/cwl/portal-containers index 5faabafd..e9d33f75 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/portal-containers +++ b/src/ingest-pipeline/airflow/dags/cwl/portal-containers @@ -1 +1 @@ -Subproject commit 5faabafdcc37bc5e55d8c2bf0228f641aa83d765 +Subproject commit e9d33f7549da31e0cf47f4b524da0fa96285d6ab From 2c523858e6fd0e280fa33e6bbc44bd8b33f4f5f3 Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Thu, 28 Mar 2024 16:23:33 -0400 Subject: [PATCH 41/66] DAG: Update pipeline_shorthand --- src/ingest-pipeline/airflow/dags/multiome.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/airflow/dags/multiome.py b/src/ingest-pipeline/airflow/dags/multiome.py index 41477620..f3c3c766 100644 --- a/src/ingest-pipeline/airflow/dags/multiome.py +++ b/src/ingest-pipeline/airflow/dags/multiome.py @@ -255,7 +255,7 @@ def build_cwltool_cmd3(**kwargs): "previous_revision_uuid_callable": get_previous_revision_uuid, "http_conn_id": "ingest_api_connection", "dataset_name_callable": build_dataset_name, - "pipeline_shorthand": "Multiome", + "pipeline_shorthand": "Salmon + ArchR + Muon", }, ) From 2143257353fc9cef0c6bf4ad916b9fecdcb11357 Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Mon, 1 Apr 2024 09:50:24 -0400 Subject: [PATCH 42/66] General: Update 
azimuth submodule --- src/ingest-pipeline/airflow/dags/cwl/azimuth-annotate | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/airflow/dags/cwl/azimuth-annotate b/src/ingest-pipeline/airflow/dags/cwl/azimuth-annotate index a5b2aa92..6694e4ba 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/azimuth-annotate +++ b/src/ingest-pipeline/airflow/dags/cwl/azimuth-annotate @@ -1 +1 @@ -Subproject commit a5b2aa9271c476d5c2c2b41dbf9e0a32dcc003e7 +Subproject commit 6694e4ba1c14688664b79b3bf36ad86c8069970c From 2e625e7898e651a86361d3aee57464a13be789a9 Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Mon, 1 Apr 2024 11:53:05 -0400 Subject: [PATCH 43/66] General: Modify workflow_map to represent snareseq2 correctly --- src/ingest-pipeline/airflow/dags/workflow_map.yml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/ingest-pipeline/airflow/dags/workflow_map.yml b/src/ingest-pipeline/airflow/dags/workflow_map.yml index 83838f20..a3a4d3ee 100644 --- a/src/ingest-pipeline/airflow/dags/workflow_map.yml +++ b/src/ingest-pipeline/airflow/dags/workflow_map.yml @@ -119,7 +119,10 @@ workflow_map: - 'collection_type': '.*' 'assay_type': 'visium-no-probes' 'workflow': 'visium_no_probes' - - 'collection_type': '.*' +# - 'collection_type': '.*' # disconnected because we expect these to be multis +# 'assay_type': 'multiome-snare-seq2' +# 'workflow': 'multiome_snareseq' + - 'collection_type': 'multiome_snare-seq2_collection' 'assay_type': 'multiome-snare-seq2' 'workflow': 'multiome_snareseq' From e2ce1640172afad1ab075a9476083f6840b75f47 Mon Sep 17 00:00:00 2001 From: Sean Donahue Date: Tue, 2 Apr 2024 10:08:40 -0400 Subject: [PATCH 44/66] Handling multiple input data directories --- src/ingest-pipeline/airflow/dags/multiome.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/ingest-pipeline/airflow/dags/multiome.py b/src/ingest-pipeline/airflow/dags/multiome.py index f3c3c766..d2a09e37 100644 --- a/src/ingest-pipeline/airflow/dags/multiome.py +++ b/src/ingest-pipeline/airflow/dags/multiome.py @@ -96,10 +96,6 @@ def build_cwltool_cmd1(**kwargs): data_dirs = get_parent_data_dirs_list(**kwargs) print("data_dirs: ", data_dirs) - assert len(data_dirs) == 1 - - data_dir = data_dirs[0] - command = [ *get_cwltool_base_cmd(tmpdir), "--relax-path-checks", @@ -114,10 +110,11 @@ def build_cwltool_cmd1(**kwargs): ] for component in ["RNA", "ATAC"]: - command.append(f"--fastq_dir_{component.lower()}") - command.append(data_dir / Path(f"raw/fastq/{component}")) command.append(f"--assay_{component.lower()}") command.append(getattr(params, f"assay_{component.lower()}")) + for data_dir in data_dirs: + command.append(f"--fastq_dir_{component.lower()}") + command.append(data_dir / Path(f"raw/fastq/{component}")) return join_quote_command_str(command) From 81c1ba12e82bc389834756c8f93fb869f22c5995 Mon Sep 17 00:00:00 2001 From: Sean Donahue Date: Wed, 3 Apr 2024 09:31:05 -0400 Subject: [PATCH 45/66] Updating secondary analysis file name and params field --- src/ingest-pipeline/airflow/dags/multiome.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ingest-pipeline/airflow/dags/multiome.py b/src/ingest-pipeline/airflow/dags/multiome.py index d2a09e37..3a4b1d61 100644 --- a/src/ingest-pipeline/airflow/dags/multiome.py +++ b/src/ingest-pipeline/airflow/dags/multiome.py @@ -137,9 +137,9 @@ def build_cwltool_cmd2(**kwargs): "--matrix", "expr.h5ad", "--secondary-analysis-matrix", - "secondary_analysis.h5ad", + 
"secondary_analysis.h5mu", "--assay", - params.assay, + params.assay_rna, ] return join_quote_command_str(command) From 3418f6b3c8d2103cb442d27d8589d0e798d8116a Mon Sep 17 00:00:00 2001 From: Sean Donahue Date: Wed, 3 Apr 2024 13:05:25 -0400 Subject: [PATCH 46/66] Updating raw matrix filename --- src/ingest-pipeline/airflow/dags/multiome.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/airflow/dags/multiome.py b/src/ingest-pipeline/airflow/dags/multiome.py index 3a4b1d61..548b8ea1 100644 --- a/src/ingest-pipeline/airflow/dags/multiome.py +++ b/src/ingest-pipeline/airflow/dags/multiome.py @@ -135,7 +135,7 @@ def build_cwltool_cmd2(**kwargs): "--reference", organ_code, "--matrix", - "expr.h5ad", + "mudata_raw.h5mu", "--secondary-analysis-matrix", "secondary_analysis.h5mu", "--assay", From 83a4cd06382edded326cebdf985f14a70688a2fc Mon Sep 17 00:00:00 2001 From: Sean Donahue Date: Wed, 3 Apr 2024 13:27:23 -0400 Subject: [PATCH 47/66] Fixing next_op for dag structure --- src/ingest-pipeline/airflow/dags/multiome.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/airflow/dags/multiome.py b/src/ingest-pipeline/airflow/dags/multiome.py index 548b8ea1..f768192d 100644 --- a/src/ingest-pipeline/airflow/dags/multiome.py +++ b/src/ingest-pipeline/airflow/dags/multiome.py @@ -237,7 +237,7 @@ def build_cwltool_cmd3(**kwargs): python_callable=utils.pythonop_maybe_keep, provide_context=True, op_kwargs={ - "next_op": "prepare_cwl4", + "next_op": "move_data", "bail_op": "set_dataset_error", "test_op": "convert_for_ui", }, From 0829460f148115c68ab2aca0b30c970716e6600b Mon Sep 17 00:00:00 2001 From: Sean Donahue Date: Wed, 3 Apr 2024 16:00:57 -0400 Subject: [PATCH 48/66] Bump multiome-rna-atac-pipeline to v1.0.1 --- src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline b/src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline index 9a512105..e7e82a0a 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline +++ b/src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline @@ -1 +1 @@ -Subproject commit 9a5121057c63d976b3bb25ffa07f4cfa20bff33c +Subproject commit e7e82a0a9aee5cb530ffd499e79dbac556b1f926 From e9b2a4d9d1cfd42eb3f1c3a31d361fb8196987cc Mon Sep 17 00:00:00 2001 From: Sean Donahue Date: Thu, 4 Apr 2024 11:31:15 -0400 Subject: [PATCH 49/66] Bump multiome-rna-atac-pipeline to v1.0.2 --- src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline b/src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline index e7e82a0a..3896878c 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline +++ b/src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline @@ -1 +1 @@ -Subproject commit e7e82a0a9aee5cb530ffd499e79dbac556b1f926 +Subproject commit 3896878c5a86265c9d080267d1c77c49e1ce50df From 1babc8fb222045b2e6d9b3b7e407389a891090f5 Mon Sep 17 00:00:00 2001 From: Sean Donahue Date: Thu, 4 Apr 2024 14:11:02 -0400 Subject: [PATCH 50/66] Bump multiome-rna-atac-pipeline to v1.1. 
--- src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline b/src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline index 3896878c..7ede3732 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline +++ b/src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline @@ -1 +1 @@ -Subproject commit 3896878c5a86265c9d080267d1c77c49e1ce50df +Subproject commit 7ede37324db0b8f8ecaab66fa15b968964358e39 From e1ac98fa31a42804add252ec796c5bc19fa43229 Mon Sep 17 00:00:00 2001 From: Sean Donahue Date: Thu, 4 Apr 2024 14:13:45 -0400 Subject: [PATCH 51/66] Adding support for 10x_multiome to dag --- src/ingest-pipeline/airflow/dags/multiome.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/ingest-pipeline/airflow/dags/multiome.py b/src/ingest-pipeline/airflow/dags/multiome.py index f768192d..9e4a0925 100644 --- a/src/ingest-pipeline/airflow/dags/multiome.py +++ b/src/ingest-pipeline/airflow/dags/multiome.py @@ -332,12 +332,12 @@ def get_simple_multiome_dag_params(assay: str) -> MultiomeSequencingDagParameter multiome_dag_params: List[MultiomeSequencingDagParameters] = [ - # MultiomeSequencingDagParameters( - # dag_id="multiome_10x", - # pipeline_name="multiome-10x", - # assay_rna="10x_v3_sn", - # assay_atac="multiome_10x", - # ) + MultiomeSequencingDagParameters( + dag_id="multiome_10x", + pipeline_name="multiome-10x", + assay_rna="10x_v3_sn", + assay_atac="multiome_10x", + ), get_simple_multiome_dag_params("snareseq"), ] From 1c3d6ef4d80325cc7e60244cead44ef8d63b6ac3 Mon Sep 17 00:00:00 2001 From: David Betancur Date: Thu, 4 Apr 2024 14:23:15 -0400 Subject: [PATCH 52/66] Bump portal-containers with SNARE multiome support --- src/ingest-pipeline/airflow/dags/cwl/portal-containers | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/airflow/dags/cwl/portal-containers b/src/ingest-pipeline/airflow/dags/cwl/portal-containers index e9d33f75..31c838c0 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/portal-containers +++ b/src/ingest-pipeline/airflow/dags/cwl/portal-containers @@ -1 +1 @@ -Subproject commit e9d33f7549da31e0cf47f4b524da0fa96285d6ab +Subproject commit 31c838c0481c1c433d698ab12208b7fa5cef275b From b9bc9a8b6d7e291e3f7f9ff6969b9260c7178125 Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Thu, 4 Apr 2024 15:53:15 -0400 Subject: [PATCH 53/66] General: Allow metadata rebuilding for datasets in Submitted status. 
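Both rebuild DAGs gate on dataset status before doing any work; this change adds Submitted to the accepted states. A minimal sketch of the guard as it reads after the patch (the set literal and helper name are illustrative, not repo code):

    from airflow.exceptions import AirflowException

    ALLOWED_STATUSES = {"New", "Error", "QA", "Published", "Submitted"}

    def assert_rebuildable(ds_rslt: dict, uuid: str) -> None:
        # Mirrors the membership test in multiassay_component_metadata.py
        # and rebuild_primary_dataset_metadata.py.
        if ds_rslt["status"] not in ALLOWED_STATUSES:
            raise AirflowException(f"Dataset {uuid} is not QA or better")
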
--- .../dags/multiassay_component_metadata.py | 11 +- .../dags/rebuild_primary_dataset_metadata.py | 168 ++++++++++-------- 2 files changed, 99 insertions(+), 80 deletions(-) diff --git a/src/ingest-pipeline/airflow/dags/multiassay_component_metadata.py b/src/ingest-pipeline/airflow/dags/multiassay_component_metadata.py index 567a009f..7d6d2b70 100644 --- a/src/ingest-pipeline/airflow/dags/multiassay_component_metadata.py +++ b/src/ingest-pipeline/airflow/dags/multiassay_component_metadata.py @@ -90,7 +90,7 @@ def check_one_uuid(uuid, **kwargs): for key in ["status", "uuid", "local_directory_full_path", "metadata", "dataset_type"]: assert key in ds_rslt, f"Dataset status for {uuid} has no {key}" - if not ds_rslt["status"] in ["New", "Error", "QA", "Published"]: + if not ds_rslt["status"] in ["New", "Error", "QA", "Published", "Submitted"]: raise AirflowException(f"Dataset {uuid} is not QA or better") return ( @@ -230,4 +230,11 @@ def wrapped_send_status_msg(**kwargs): t_cleanup_tmpdir = CleanupTmpDirOperator(task_id="cleanup_temp_dir") - t_check_uuids >> t_create_tmpdir >> t_run_md_extract >> t_md_consistency_tests >> t_send_status >> t_cleanup_tmpdir + ( + t_check_uuids + >> t_create_tmpdir + >> t_run_md_extract + >> t_md_consistency_tests + >> t_send_status + >> t_cleanup_tmpdir + ) diff --git a/src/ingest-pipeline/airflow/dags/rebuild_primary_dataset_metadata.py b/src/ingest-pipeline/airflow/dags/rebuild_primary_dataset_metadata.py index 17333764..cdb63ac3 100644 --- a/src/ingest-pipeline/airflow/dags/rebuild_primary_dataset_metadata.py +++ b/src/ingest-pipeline/airflow/dags/rebuild_primary_dataset_metadata.py @@ -36,38 +36,40 @@ def get_uuid_for_error(**kwargs): def get_dataset_uuid(**kwargs): - return kwargs['dag_run'].conf['uuid'] + return kwargs["dag_run"].conf["uuid"] def get_dataset_lz_path(**kwargs): - ctx = kwargs['dag_run'].conf - return ctx['lz_path'] + ctx = kwargs["dag_run"].conf + return ctx["lz_path"] default_args = { - 'owner': 'hubmap', - 'depends_on_past': False, - 'start_date': datetime(2019, 1, 1), - 'email': ['joel.welling@gmail.com'], - 'email_on_failure': False, - 'email_on_retry': False, - 'retries': 1, - 'retry_delay': timedelta(minutes=1), - 'xcom_push': True, - 'queue': get_queue_resource('rebuild_metadata'), - 'on_failure_callback': create_dataset_state_error_callback(get_uuid_for_error) + "owner": "hubmap", + "depends_on_past": False, + "start_date": datetime(2019, 1, 1), + "email": ["joel.welling@gmail.com"], + "email_on_failure": False, + "email_on_retry": False, + "retries": 1, + "retry_delay": timedelta(minutes=1), + "xcom_push": True, + "queue": get_queue_resource("rebuild_metadata"), + "on_failure_callback": create_dataset_state_error_callback(get_uuid_for_error), } -with HMDAG('rebuild_primary_dataset_metadata', - schedule_interval=None, - is_paused_upon_creation=False, - default_args=default_args, - user_defined_macros={ - 'tmp_dir_path': get_tmp_dir_path, - 'preserve_scratch': get_preserve_scratch_resource('rebuild_metadata') - }) as dag: +with HMDAG( + "rebuild_primary_dataset_metadata", + schedule_interval=None, + is_paused_upon_creation=False, + default_args=default_args, + user_defined_macros={ + "tmp_dir_path": get_tmp_dir_path, + "preserve_scratch": get_preserve_scratch_resource("rebuild_metadata"), + }, +) as dag: - t_create_tmpdir = CreateTmpDirOperator(task_id='create_temp_dir') + t_create_tmpdir = CreateTmpDirOperator(task_id="create_temp_dir") def check_one_uuid(uuid, **kwargs): """ @@ -77,54 +79,56 @@ def check_one_uuid(uuid, **kwargs): - 
data type(s) of the dataset - local directory full path of the dataset """ - print(f'Starting uuid {uuid}') + print(f"Starting uuid {uuid}") my_callable = lambda **kwargs: uuid ds_rslt = pythonop_get_dataset_state(dataset_uuid_callable=my_callable, **kwargs) if not ds_rslt: - raise AirflowException(f'Invalid uuid/doi for group: {uuid}') - print('ds_rslt:') + raise AirflowException(f"Invalid uuid/doi for group: {uuid}") + print("ds_rslt:") pprint(ds_rslt) - for key in ['status', 'uuid', 'local_directory_full_path', 'metadata']: + for key in ["status", "uuid", "local_directory_full_path", "metadata"]: assert key in ds_rslt, f"Dataset status for {uuid} has no {key}" - if not ds_rslt['status'] in ['New', 'Error', 'QA', 'Published']: - raise AirflowException(f'Dataset {uuid} is not QA or better') + if not ds_rslt["status"] in ["New", "Error", "QA", "Published", "Submitted"]: + raise AirflowException(f"Dataset {uuid} is not QA or better") - return (ds_rslt['uuid'], ds_rslt['local_directory_full_path'], - ds_rslt['metadata']) + return (ds_rslt["uuid"], ds_rslt["local_directory_full_path"], ds_rslt["metadata"]) def check_uuids(**kwargs): - print('dag_run conf follows:') - pprint(kwargs['dag_run'].conf) + print("dag_run conf follows:") + pprint(kwargs["dag_run"].conf) try: - assert_json_matches_schema(kwargs['dag_run'].conf, - 'launch_checksums_metadata_schema.yml') + assert_json_matches_schema( + kwargs["dag_run"].conf, "launch_checksums_metadata_schema.yml" + ) except AssertionError as e: - print('invalid metadata follows:') - pprint(kwargs['dag_run'].conf) + print("invalid metadata follows:") + pprint(kwargs["dag_run"].conf) raise - uuid, lz_path, metadata = check_one_uuid(kwargs['dag_run'].conf['uuid'], **kwargs) - print(f'filtered metadata: {metadata}') - print(f'filtered paths: {lz_path}') - kwargs['dag_run'].conf['lz_path'] = lz_path - kwargs['dag_run'].conf['src_path'] = airflow_conf.as_dict()['connections']['src_path'].strip("'") - + uuid, lz_path, metadata = check_one_uuid(kwargs["dag_run"].conf["uuid"], **kwargs) + print(f"filtered metadata: {metadata}") + print(f"filtered paths: {lz_path}") + kwargs["dag_run"].conf["lz_path"] = lz_path + kwargs["dag_run"].conf["src_path"] = airflow_conf.as_dict()["connections"][ + "src_path" + ].strip("'") t_check_uuids = PythonOperator( - task_id='check_uuids', + task_id="check_uuids", python_callable=check_uuids, provide_context=True, op_kwargs={ - 'crypt_auth_tok': encrypt_tok(airflow_conf.as_dict() - ['connections']['APP_CLIENT_SECRET']).decode(), - } + "crypt_auth_tok": encrypt_tok( + airflow_conf.as_dict()["connections"]["APP_CLIENT_SECRET"] + ).decode(), + }, ) t_run_md_extract = BashOperator( - task_id='run_md_extract', + task_id="run_md_extract", bash_command=""" \ lz_dir="{{dag_run.conf.lz_path}}" ; \ src_dir="{{dag_run.conf.src_path}}/md" ; \ @@ -141,71 +145,79 @@ def check_uuids(**kwargs): fi """, env={ - 'AUTH_TOK': ( + "AUTH_TOK": ( utils.get_auth_tok( **{ - 'crypt_auth_tok': utils.encrypt_tok( - airflow_conf.as_dict()['connections']['APP_CLIENT_SECRET']).decode() + "crypt_auth_tok": utils.encrypt_tok( + airflow_conf.as_dict()["connections"]["APP_CLIENT_SECRET"] + ).decode() } ) ), - 'PYTHON_EXE': os.environ["CONDA_PREFIX"] + "/bin/python", - 'INGEST_API_URL': os.environ["AIRFLOW_CONN_INGEST_API_CONNECTION"] - } + "PYTHON_EXE": os.environ["CONDA_PREFIX"] + "/bin/python", + "INGEST_API_URL": os.environ["AIRFLOW_CONN_INGEST_API_CONNECTION"], + }, ) t_md_consistency_tests = PythonOperator( - task_id='md_consistency_tests', + 
task_id="md_consistency_tests", python_callable=pythonop_md_consistency_tests, provide_context=True, - op_kwargs={'metadata_fname': 'rslt.yml'} + op_kwargs={"metadata_fname": "rslt.yml"}, ) def read_metadata_file(**kwargs): - md_fname = os.path.join(get_tmp_dir_path(kwargs['run_id']), - 'rslt.yml') - with open(md_fname, 'r') as f: + md_fname = os.path.join(get_tmp_dir_path(kwargs["run_id"]), "rslt.yml") + with open(md_fname, "r") as f: scanned_md = yaml.safe_load(f) return scanned_md send_status_msg = make_send_status_msg_function( dag_file=__file__, - retcode_ops=['run_md_extract', 'md_consistency_tests'], + retcode_ops=["run_md_extract", "md_consistency_tests"], cwl_workflows=[], dataset_uuid_fun=get_dataset_uuid, dataset_lz_path_fun=get_dataset_lz_path, metadata_fun=read_metadata_file, - include_file_metadata=False + include_file_metadata=False, ) def wrapped_send_status_msg(**kwargs): if send_status_msg(**kwargs): scanned_md = read_metadata_file(**kwargs) # Yes, it's getting re-read - kwargs['ti'].xcom_push(key='collectiontype', - value=(scanned_md['collectiontype'] - if 'collectiontype' in scanned_md - else None)) - if 'assay_type' in scanned_md: - assay_type = scanned_md['assay_type'] - elif 'metadata' in scanned_md and 'assay_type' in scanned_md['metadata']: - assay_type = scanned_md['metadata']['assay_type'] + kwargs["ti"].xcom_push( + key="collectiontype", + value=(scanned_md["collectiontype"] if "collectiontype" in scanned_md else None), + ) + if "assay_type" in scanned_md: + assay_type = scanned_md["assay_type"] + elif "metadata" in scanned_md and "assay_type" in scanned_md["metadata"]: + assay_type = scanned_md["metadata"]["assay_type"] else: assay_type = None - kwargs['ti'].xcom_push(key='assay_type', value=assay_type) + kwargs["ti"].xcom_push(key="assay_type", value=assay_type) else: - kwargs['ti'].xcom_push(key='collectiontype', value=None) + kwargs["ti"].xcom_push(key="collectiontype", value=None) t_send_status = PythonOperator( - task_id='send_status_msg', + task_id="send_status_msg", python_callable=wrapped_send_status_msg, provide_context=True, - trigger_rule='all_done', + trigger_rule="all_done", op_kwargs={ - 'crypt_auth_tok': encrypt_tok(airflow_conf.as_dict() - ['connections']['APP_CLIENT_SECRET']).decode(), - } + "crypt_auth_tok": encrypt_tok( + airflow_conf.as_dict()["connections"]["APP_CLIENT_SECRET"] + ).decode(), + }, ) - t_cleanup_tmpdir = CleanupTmpDirOperator(task_id='cleanup_temp_dir') + t_cleanup_tmpdir = CleanupTmpDirOperator(task_id="cleanup_temp_dir") - t_check_uuids >> t_create_tmpdir >> t_run_md_extract >> t_md_consistency_tests >> t_send_status >> t_cleanup_tmpdir \ No newline at end of file + ( + t_check_uuids + >> t_create_tmpdir + >> t_run_md_extract + >> t_md_consistency_tests + >> t_send_status + >> t_cleanup_tmpdir + ) From 78be4bd6fcc9ccaf32205df838b93b7988e80b9a Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Thu, 4 Apr 2024 16:28:36 -0400 Subject: [PATCH 54/66] General: Update workflow_map for 10x multiome --- src/ingest-pipeline/airflow/dags/workflow_map.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/ingest-pipeline/airflow/dags/workflow_map.yml b/src/ingest-pipeline/airflow/dags/workflow_map.yml index a3a4d3ee..020786e4 100644 --- a/src/ingest-pipeline/airflow/dags/workflow_map.yml +++ b/src/ingest-pipeline/airflow/dags/workflow_map.yml @@ -125,4 +125,7 @@ workflow_map: - 'collection_type': 'multiome_snare-seq2_collection' 'assay_type': 'multiome-snare-seq2' 'workflow': 'multiome_snareseq' + - 'collection_type': '.*' + 
'assay_type': 'multiome-10x' + 'workflow': 'multiome_10x' From 246933a9f3f02232e8a25ceb27c9404da19ac55d Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Thu, 4 Apr 2024 16:31:37 -0400 Subject: [PATCH 55/66] General: Fix assay_type --- src/ingest-pipeline/airflow/dags/workflow_map.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/airflow/dags/workflow_map.yml b/src/ingest-pipeline/airflow/dags/workflow_map.yml index 020786e4..f2c09660 100644 --- a/src/ingest-pipeline/airflow/dags/workflow_map.yml +++ b/src/ingest-pipeline/airflow/dags/workflow_map.yml @@ -126,6 +126,6 @@ workflow_map: 'assay_type': 'multiome-snare-seq2' 'workflow': 'multiome_snareseq' - 'collection_type': '.*' - 'assay_type': 'multiome-10x' + 'assay_type': '10x-multiome' 'workflow': 'multiome_10x' From 4fc4f424707a2577b053ff5fd340481f9c3389c8 Mon Sep 17 00:00:00 2001 From: David Betancur Date: Thu, 4 Apr 2024 16:44:47 -0400 Subject: [PATCH 56/66] Update to support multiple metadata DAGs to be controlled by the resource_map --- src/ingest-pipeline/airflow/dags/resource_map.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/airflow/dags/resource_map.yml b/src/ingest-pipeline/airflow/dags/resource_map.yml index f42deb49..3422b463 100644 --- a/src/ingest-pipeline/airflow/dags/resource_map.yml +++ b/src/ingest-pipeline/airflow/dags/resource_map.yml @@ -157,7 +157,7 @@ resource_map: - 'task_re': '.*' 'queue': 'general' 'threads': 6 - - 'dag_re': 'rebuild_metadata' + - 'dag_re': '.*metadata' 'preserve_scratch': true 'lanes': 2 'tasks': From e08ce50e726bca6c9c37dfe5595e2cec09e1e362 Mon Sep 17 00:00:00 2001 From: David Betancur Date: Wed, 10 Apr 2024 10:08:13 -0400 Subject: [PATCH 57/66] Bump portal-containers with multiome support bugfixes --- src/ingest-pipeline/airflow/dags/cwl/portal-containers | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/airflow/dags/cwl/portal-containers b/src/ingest-pipeline/airflow/dags/cwl/portal-containers index 31c838c0..a965d1ff 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/portal-containers +++ b/src/ingest-pipeline/airflow/dags/cwl/portal-containers @@ -1 +1 @@ -Subproject commit 31c838c0481c1c433d698ab12208b7fa5cef275b +Subproject commit a965d1ff9e176917190e4b0a3ca9fb569aec3213 From 0c6a1a63b1a8d423e6868ece6deb07c159affd5d Mon Sep 17 00:00:00 2001 From: David Betancur Date: Wed, 10 Apr 2024 11:28:22 -0400 Subject: [PATCH 58/66] Bump portal-containers --- src/ingest-pipeline/airflow/dags/cwl/portal-containers | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/airflow/dags/cwl/portal-containers b/src/ingest-pipeline/airflow/dags/cwl/portal-containers index a965d1ff..d3b6f552 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/portal-containers +++ b/src/ingest-pipeline/airflow/dags/cwl/portal-containers @@ -1 +1 @@ -Subproject commit a965d1ff9e176917190e4b0a3ca9fb569aec3213 +Subproject commit d3b6f55230a274ff5a86b22e48a19283fbdac3bb From 6bf019e8ec335c9e954dd0d79f932bf8c2a22f4d Mon Sep 17 00:00:00 2001 From: David Betancur Date: Wed, 10 Apr 2024 13:21:46 -0400 Subject: [PATCH 59/66] Bump portal-containers --- src/ingest-pipeline/airflow/dags/cwl/portal-containers | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/airflow/dags/cwl/portal-containers b/src/ingest-pipeline/airflow/dags/cwl/portal-containers index d3b6f552..47af73be 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/portal-containers +++ 
b/src/ingest-pipeline/airflow/dags/cwl/portal-containers @@ -1 +1 @@ -Subproject commit d3b6f55230a274ff5a86b22e48a19283fbdac3bb +Subproject commit 47af73be00a269e3b17e5616ff84b2844c2f4984 From 227b91c4376c51ad9f7470d1aa7945616a6a48ef Mon Sep 17 00:00:00 2001 From: David Betancur Date: Thu, 11 Apr 2024 11:28:03 -0400 Subject: [PATCH 60/66] Bump portal-containers --- src/ingest-pipeline/airflow/dags/cwl/portal-containers | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/airflow/dags/cwl/portal-containers b/src/ingest-pipeline/airflow/dags/cwl/portal-containers index 47af73be..fa7b688c 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/portal-containers +++ b/src/ingest-pipeline/airflow/dags/cwl/portal-containers @@ -1 +1 @@ -Subproject commit 47af73be00a269e3b17e5616ff84b2844c2f4984 +Subproject commit fa7b688c62d7936b63b9e63cb18744d9fc3fb239 From b735daa5c00536ade34811e2555ee352eb2ffb16 Mon Sep 17 00:00:00 2001 From: Matt Ruffalo Date: Mon, 15 Apr 2024 11:24:45 -0400 Subject: [PATCH 61/66] Bump multiome-rna-atac-pipeline submodule to v1.1.1 --- src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline b/src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline index 7ede3732..0404545e 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline +++ b/src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline @@ -1 +1 @@ -Subproject commit 7ede37324db0b8f8ecaab66fa15b968964358e39 +Subproject commit 0404545e60c8819ff92934a33be9a2b55c86d063 From f183e6769f8ed5223e9def51be32a9441176fc0a Mon Sep 17 00:00:00 2001 From: Matt Ruffalo Date: Mon, 15 Apr 2024 11:34:09 -0400 Subject: [PATCH 62/66] Multiome DAG: find ATAC-seq metadata file, pass to pipeline --- src/ingest-pipeline/airflow/dags/multiome.py | 23 +++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/src/ingest-pipeline/airflow/dags/multiome.py b/src/ingest-pipeline/airflow/dags/multiome.py index 9e4a0925..b09b0d1d 100644 --- a/src/ingest-pipeline/airflow/dags/multiome.py +++ b/src/ingest-pipeline/airflow/dags/multiome.py @@ -18,7 +18,6 @@ import utils from utils import ( - SequencingDagParameters, get_absolute_workflows, get_cwltool_base_cmd, get_dataset_uuid, @@ -48,6 +47,14 @@ ) +def find_atac_metadata_file(data_dir: Path) -> Path: + for path in data_dir.glob("*.tsv"): + name_lower = path.name.lower() + if path.is_file() and "atac" in name_lower and "metadata" in name_lower: + return path + raise ValueError("Couldn't find ATAC-seq metadata file") + + def generate_multiome_dag(params: MultiomeSequencingDagParameters) -> DAG: default_args = { "owner": "hubmap", @@ -116,6 +123,10 @@ def build_cwltool_cmd1(**kwargs): command.append(f"--fastq_dir_{component.lower()}") command.append(data_dir / Path(f"raw/fastq/{component}")) + for data_dir in data_dirs: + command.append("--atac_metadata_file") + command.append(find_atac_metadata_file(data_dir)) + return join_quote_command_str(command) def build_cwltool_cmd2(**kwargs): @@ -333,11 +344,11 @@ def get_simple_multiome_dag_params(assay: str) -> MultiomeSequencingDagParameter multiome_dag_params: List[MultiomeSequencingDagParameters] = [ MultiomeSequencingDagParameters( - dag_id="multiome_10x", - pipeline_name="multiome-10x", - assay_rna="10x_v3_sn", - assay_atac="multiome_10x", - ), + dag_id="multiome_10x", + pipeline_name="multiome-10x", + assay_rna="10x_v3_sn", + 
assay_atac="multiome_10x", + ), get_simple_multiome_dag_params("snareseq"), ] From f51d680783c40e080ea0ed43459e95959b236639 Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Tue, 16 Apr 2024 14:58:33 -0400 Subject: [PATCH 63/66] General: Fix assay_type --- src/ingest-pipeline/airflow/dags/resource_map.yml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/ingest-pipeline/airflow/dags/resource_map.yml b/src/ingest-pipeline/airflow/dags/resource_map.yml index 3422b463..42588614 100644 --- a/src/ingest-pipeline/airflow/dags/resource_map.yml +++ b/src/ingest-pipeline/airflow/dags/resource_map.yml @@ -164,6 +164,13 @@ resource_map: - 'task_re': '.*' 'queue': 'general' 'threads': 6 + - 'dag_re': '.*multiome' + 'preserve_scratch': true + 'lanes': 2 + 'tasks': + - 'task_re': '.*' + 'queue': 'general' + 'threads': 50 - 'dag_re': '.*' 'preserve_scratch': true 'lanes': 2 From 1c31a04c953b0079ef5ef1a5ceaa447c19e19ecd Mon Sep 17 00:00:00 2001 From: Matt Ruffalo Date: Tue, 16 Apr 2024 16:20:52 -0400 Subject: [PATCH 64/66] Bump multiome-rna-atac-pipeline to v1.1.2 --- src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline b/src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline index 0404545e..68e0cc1b 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline +++ b/src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline @@ -1 +1 @@ -Subproject commit 0404545e60c8819ff92934a33be9a2b55c86d063 +Subproject commit 68e0cc1be35751f5ef5958050742ddfffd564d3c From 9cbff3c94493dd515c4dfbd56f3ec97e5bc15090 Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Wed, 17 Apr 2024 10:38:13 -0400 Subject: [PATCH 65/66] Update portal-containers --- src/ingest-pipeline/airflow/dags/cwl/portal-containers | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/airflow/dags/cwl/portal-containers b/src/ingest-pipeline/airflow/dags/cwl/portal-containers index fa7b688c..00fd6df7 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/portal-containers +++ b/src/ingest-pipeline/airflow/dags/cwl/portal-containers @@ -1 +1 @@ -Subproject commit fa7b688c62d7936b63b9e63cb18744d9fc3fb239 +Subproject commit 00fd6df75a4311da4e50cc0173b6d6e50ddaf76d From 0c1a7edc494ef74bf070301649a93f13fc53d9fb Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Wed, 17 Apr 2024 11:47:11 -0400 Subject: [PATCH 66/66] General: Fix assay_type --- src/ingest-pipeline/airflow/dags/resource_map.yml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/ingest-pipeline/airflow/dags/resource_map.yml b/src/ingest-pipeline/airflow/dags/resource_map.yml index 42588614..5ebe7dac 100644 --- a/src/ingest-pipeline/airflow/dags/resource_map.yml +++ b/src/ingest-pipeline/airflow/dags/resource_map.yml @@ -171,6 +171,13 @@ resource_map: - 'task_re': '.*' 'queue': 'general' 'threads': 50 + - 'dag_re': '.*visium' + 'preserve_scratch': true + 'lanes': 3 + 'tasks': + - 'task_re': '.*' + 'queue': 'general' + 'threads': 15 - 'dag_re': '.*' 'preserve_scratch': true 'lanes': 2