diff --git a/src/ingest-pipeline/airflow/dags/pas_ftu_segmentation.py b/src/ingest-pipeline/airflow/dags/pas_ftu_segmentation.py index c659432b..463ecdd1 100644 --- a/src/ingest-pipeline/airflow/dags/pas_ftu_segmentation.py +++ b/src/ingest-pipeline/airflow/dags/pas_ftu_segmentation.py @@ -22,6 +22,7 @@ get_queue_resource, get_preserve_scratch_resource, pythonop_get_dataset_state, + get_datatype_organ_based, ) from hubmap_operators.common_operators import ( CleanupTmpDirOperator, @@ -49,16 +50,15 @@ with HMDAG( - "pas_ftu_segmentation", - schedule_interval=None, - is_paused_upon_creation=False, - default_args=default_args, - user_defined_macros={ - "tmp_dir_path": get_tmp_dir_path, - 'preserve_scratch': get_preserve_scratch_resource('pas_ftu_segmentation'), - }, + "pas_ftu_segmentation", + schedule_interval=None, + is_paused_upon_creation=False, + default_args=default_args, + user_defined_macros={ + "tmp_dir_path": get_tmp_dir_path, + "preserve_scratch": get_preserve_scratch_resource("pas_ftu_segmentation"), + }, ) as dag: - pipeline_name = "pas-ftu-segmentation-pipeline" cwl_workflows = get_named_absolute_workflows( segmentation=Path(pipeline_name, "pipeline.cwl"), @@ -83,22 +83,19 @@ def build_cwltool_cwl_segmentation(**kwargs): meta_yml_path = workflow.parent / "meta.yaml" # get organ type - ds_rslt = pythonop_get_dataset_state( - dataset_uuid_callable=get_dataset_uuid, - **kwargs - ) + ds_rslt = pythonop_get_dataset_state(dataset_uuid_callable=get_dataset_uuid, **kwargs) - organ_list = list(set(ds_rslt['organs'])) - organ_code = organ_list[0] if len(organ_list) == 1 else 'multi' + organ_list = list(set(ds_rslt["organs"])) + organ_code = organ_list[0] if len(organ_list) == 1 else "multi" command = [ *get_cwltool_base_cmd(tmpdir), - #"--singularity", + # "--singularity", workflow, "--data_directory", data_dir, "--tissue_type", - organ_code + organ_code, ] return join_quote_command_str(command) @@ -131,7 +128,6 @@ def build_cwltool_cwl_segmentation(**kwargs): }, ) - prepare_cwl_ome_tiff_pyramid = DummyOperator(task_id="prepare_cwl_ome_tiff_pyramid") def build_cwltool_cwl_ome_tiff_pyramid(**kwargs): @@ -239,7 +235,7 @@ def build_cwltool_cmd_ome_tiff_offsets(**kwargs): "previous_revision_uuid_callable": get_previous_revision_uuid, "http_conn_id": "ingest_api_connection", "dataset_name_callable": build_dataset_name, - "dataset_types": ["celldive_deepcell"], + "dataset_types_callable": get_datatype_organ_based, }, ) @@ -277,6 +273,7 @@ def build_cwltool_cmd_ome_tiff_offsets(**kwargs): ], cwl_workflows=list(cwl_workflows.values()), ) + t_send_status = PythonOperator( task_id="send_status_msg", python_callable=send_status_msg, provide_context=True ) diff --git a/src/ingest-pipeline/airflow/dags/utils.py b/src/ingest-pipeline/airflow/dags/utils.py index a76b2064..47572725 100644 --- a/src/ingest-pipeline/airflow/dags/utils.py +++ b/src/ingest-pipeline/airflow/dags/utils.py @@ -307,6 +307,20 @@ def get_parent_dataset_uuid(**kwargs) -> str: return uuid_set.pop() +def get_datatype_organ_based(**kwargs) -> List[str]: + dataset_uuid = get_parent_dataset_uuid(**kwargs) + + def my_callable(**kwargs): + return dataset_uuid + + ds_rslt = pythonop_get_dataset_state(dataset_uuid_callable=my_callable, **kwargs) + organ_list = list(set(ds_rslt["organs"])) + organ_code = organ_list[0] if len(organ_list) == 1 else "multi" + if organ_code in ["LK", "RK"]: + return ["pas_ftu_segmentation"] + return ["image_pyramid"] + + def get_datatype_previous_version(**kwargs) -> List[str]: dataset_uuid = get_previous_revision_uuid(**kwargs) assert dataset_uuid is not None, "Missing previous_version_uuid" diff --git a/src/ingest-pipeline/airflow/dags/workflow_map.yml b/src/ingest-pipeline/airflow/dags/workflow_map.yml index 02a73cf6..c39b6cda 100644 --- a/src/ingest-pipeline/airflow/dags/workflow_map.yml +++ b/src/ingest-pipeline/airflow/dags/workflow_map.yml @@ -47,9 +47,9 @@ workflow_map: - 'collection_type': '.*' 'assay_type': 'PAS' 'workflow': 'pas_ftu_segmentation' - - 'collection_type': '.*' - 'assay_type': 'PAS' - 'workflow': 'ometiff_pyramid' +# - 'collection_type': '.*' +# 'assay_type': 'PAS' +# 'workflow': 'ometiff_pyramid' - 'collection_type': '.*' 'assay_type': 'AF' 'workflow': 'ometiff_pyramid' diff --git a/src/ingest-pipeline/misc/tools/airflow_environments/env_dev.sh b/src/ingest-pipeline/misc/tools/airflow_environments/env_dev.sh index adab293d..c0804167 100644 --- a/src/ingest-pipeline/misc/tools/airflow_environments/env_dev.sh +++ b/src/ingest-pipeline/misc/tools/airflow_environments/env_dev.sh @@ -7,5 +7,5 @@ HM_AF_CONN_UUID_API_CONNECTION=http://https%3a%2f%2fuuid-api.dev.hubmapconsortiu HM_AF_CONN_FILES_API_CONNECTION=http://https%3a%2f%2ffiles-api.dev.hubmapconsortium.org/ HM_AF_CONN_SPATIAL_API_CONNECTION=http://https%3a%2f%2fspatial-api.dev.hubmapconsortium.org/ HM_AF_CONN_CELLS_API_CONNECTION=http://https%3a%2f%2fcells-api.dev.hubmapconsortium.org/ -HM_AF_CONN_SEARCH_API_CONNECTION=http://https%3a%2f%2fsearch-api.dev.hubmapconsortium.org%2fv3/ +HM_AF_CONN_SEARCH_API_CONNECTION=http://https%3a%2f%2fontoly-api.dev.hubmapconsortium.org%2fv3/ HM_AF_CONN_ENTITY_API_CONNECTION=http://https%3a%2f%2fentity-api.dev.hubmapconsortium.org/ diff --git a/src/ingest-pipeline/misc/tools/airflow_environments/env_prod.sh b/src/ingest-pipeline/misc/tools/airflow_environments/env_prod.sh index e43a1516..556df776 100644 --- a/src/ingest-pipeline/misc/tools/airflow_environments/env_prod.sh +++ b/src/ingest-pipeline/misc/tools/airflow_environments/env_prod.sh @@ -7,5 +7,5 @@ HM_AF_CONN_UUID_API_CONNECTION=http://https%3a%2f%2fuuid.api.hubmapconsortium.or HM_AF_CONN_FILES_API_CONNECTION=http://https%3a%2f%2ffiles.api.hubmapconsortium.org/ HM_AF_CONN_SPATIAL_API_CONNECTION=http://https%3a%2f%2fspatial.api.hubmapconsortium.org/ HM_AF_CONN_CELLS_API_CONNECTION=http://https%3a%2f%2fcells.api.hubmapconsortium.org/ -HM_AF_CONN_SEARCH_API_CONNECTION=http://https%3a%2f%2fsearch.api.hubmapconsortium.org%2fv3/ +HM_AF_CONN_SEARCH_API_CONNECTION=http://https%3a%2f%2fontology.api.hubmapconsortium.org%2fv3/ HM_AF_CONN_ENTITY_API_CONNECTION=http://https%3a%2f%2fentity.api.hubmapconsortium.org/ diff --git a/src/ingest-pipeline/misc/tools/airflow_environments/env_stage.sh b/src/ingest-pipeline/misc/tools/airflow_environments/env_stage.sh index 5322d10a..e25dc99a 100644 --- a/src/ingest-pipeline/misc/tools/airflow_environments/env_stage.sh +++ b/src/ingest-pipeline/misc/tools/airflow_environments/env_stage.sh @@ -8,5 +8,5 @@ HM_AF_CONN_UUID_API_CONNECTION=http://https%3a%2f%2fuuid-api.stage.hubmapconsort HM_AF_CONN_FILES_API_CONNECTION=http://https%3a%2f%2ffiles-api.stage.hubmapconsortium.org/ HM_AF_CONN_SPATIAL_API_CONNECTION=http://https%3a%2f%2fspatial-api.stage.hubmapconsortium.org/ HM_AF_CONN_CELLS_API_CONNECTION=http://https%3a%2f%2fcells-api.stage.hubmapconsortium.org/ -HM_AF_CONN_SEARCH_API_CONNECTION=http://https%3a%2f%2fsearch-api.stage.hubmapconsortium.org%2fv3/ +HM_AF_CONN_SEARCH_API_CONNECTION=http://https%3a%2f%2fontology-api.stage.hubmapconsortium.org%2fv3/ HM_AF_CONN_ENTITY_API_CONNECTION=http://https%3a%2f%2fentity-api.stage.hubmapconsortium.org/ diff --git a/src/ingest-pipeline/misc/tools/airflow_environments/env_test.sh b/src/ingest-pipeline/misc/tools/airflow_environments/env_test.sh index 866fc24c..0236751e 100644 --- a/src/ingest-pipeline/misc/tools/airflow_environments/env_test.sh +++ b/src/ingest-pipeline/misc/tools/airflow_environments/env_test.sh @@ -7,5 +7,5 @@ HM_AF_CONN_UUID_API_CONNECTION=http://https%3a%2f%2fuuid-api.test.hubmapconsorti HM_AF_CONN_FILES_API_CONNECTION=http://https%3a%2f%2ffiles-api.test.hubmapconsortium.org/ HM_AF_CONN_SPATIAL_API_CONNECTION=http://https%3a%2f%2fspatial-api.test.hubmapconsortium.org/ HM_AF_CONN_CELLS_API_CONNECTION=http://https%3a%2f%2fcells-api.test.hubmapconsortium.org/ -HM_AF_CONN_SEARCH_API_CONNECTION=http://https%3a%2f%2fsearch-api.test.hubmapconsortium.org%2fv3/ +HM_AF_CONN_SEARCH_API_CONNECTION=http://https%3a%2f%2fontology-api.test.hubmapconsortium.org%2fv3/ HM_AF_CONN_ENTITY_API_CONNECTION=http://https%3a%2f%2fentity-api.test.hubmapconsortium.org/