diff --git a/src/ingest-pipeline/airflow/dags/cwl/ome-tiff-pyramid b/src/ingest-pipeline/airflow/dags/cwl/ome-tiff-pyramid
index b1b40c45..a063a340 160000
--- a/src/ingest-pipeline/airflow/dags/cwl/ome-tiff-pyramid
+++ b/src/ingest-pipeline/airflow/dags/cwl/ome-tiff-pyramid
@@ -1 +1 @@
-Subproject commit b1b40c45741a60e643312ab5513086dbb8105e9a
+Subproject commit a063a3404a5f4345292508cb82313bd41ea6c615
diff --git a/src/ingest-pipeline/airflow/dags/cwl/salmon-rnaseq b/src/ingest-pipeline/airflow/dags/cwl/salmon-rnaseq
index 16dd8ca3..fa4f9c13 160000
--- a/src/ingest-pipeline/airflow/dags/cwl/salmon-rnaseq
+++ b/src/ingest-pipeline/airflow/dags/cwl/salmon-rnaseq
@@ -1 +1 @@
-Subproject commit 16dd8ca3ce99b7a2660e9549d74344a5e542d567
+Subproject commit fa4f9c13b96ef0c8168afd14bdd414d2d524a00c
diff --git a/src/ingest-pipeline/airflow/dags/launch_multi_analysis.py b/src/ingest-pipeline/airflow/dags/launch_multi_analysis.py
index b9c5bdd1..1018fa14 100644
--- a/src/ingest-pipeline/airflow/dags/launch_multi_analysis.py
+++ b/src/ingest-pipeline/airflow/dags/launch_multi_analysis.py
@@ -72,13 +72,13 @@ def check_one_uuid(
         print("ds_rslt:")
         pprint(ds_rslt)
 
-        for key in ["status", "uuid", "data_types", "local_directory_full_path", "metadata"]:
+        for key in ["status", "uuid", "local_directory_full_path", "metadata"]:
             assert key in ds_rslt, f"Dataset status for {uuid} has no {key}"
 
         if not ds_rslt["status"] in ["New", "Error", "QA", "Published"]:
             raise AirflowException(f"Dataset {uuid} is not QA or better")
 
-        dt = ds_rslt["data_types"]
+        dt = ds_rslt["dataset_type"]
         if isinstance(dt, str) and dt.startswith("[") and dt.endswith("]"):
             dt = ast.literal_eval(dt)
         print(f"parsed dt: {dt}")
diff --git a/src/ingest-pipeline/airflow/dags/scan_and_begin_processing.py b/src/ingest-pipeline/airflow/dags/scan_and_begin_processing.py
index 907ad0b2..83c0eaad 100644
--- a/src/ingest-pipeline/airflow/dags/scan_and_begin_processing.py
+++ b/src/ingest-pipeline/airflow/dags/scan_and_begin_processing.py
@@ -1,5 +1,6 @@
 import os
 import sys
+import inspect
 from datetime import datetime, timedelta
 from pathlib import Path
 from pprint import pprint
@@ -16,16 +17,17 @@
     get_auth_tok,
     get_preserve_scratch_resource,
     get_queue_resource,
+    get_soft_data_assaytype,
     make_send_status_msg_function,
     pythonop_get_dataset_state,
     pythonop_maybe_keep,
-    get_soft_data_assaytype,
 )
 
 from airflow.configuration import conf as airflow_conf
 from airflow.exceptions import AirflowException
 from airflow.operators.bash import BashOperator
 from airflow.operators.python import BranchPythonOperator, PythonOperator
+from airflow.providers.http.hooks.http import HttpHook
 
 sys.path.append(airflow_conf.as_dict()["connections"]["SRC_PATH"].strip("'").strip('"'))
 from submodules import ingest_validation_tools_upload  # noqa E402
@@ -108,6 +110,11 @@ def run_validation(**kwargs):
         plugin_path = [path for path in ingest_validation_tests.__path__][0]
 
         ignore_globs = [uuid, "extras", "*metadata.tsv", "validation_report.txt"]
+        app_context = {
+            "entities_url": HttpHook.get_connection("entity_api_connection").host + "/entities/",
+            "ingest_url": os.environ["AIRFLOW_CONN_INGEST_API_CONNECTION"],
+            "request_header": {"X-Hubmap-Application": "ingest-pipeline"},
+        }
         #
         # Uncomment offline=True below to avoid validating orcid_id URLs &etc
         #
@@ -120,6 +127,7 @@ def run_validation(**kwargs):
             add_notes=False,
             ignore_deprecation=True,
             globus_token=get_auth_tok(**kwargs),
+            app_context=app_context,
         )
         # Scan reports an error result
         errors = upload.get_errors(plugin_kwargs=kwargs)
@@ -134,6 +142,7 @@ def run_validation(**kwargs):
                 f.write(report.as_text())
             return 1
         else:
+            kwargs["ti"].xcom_push(key="ivt_path", value=inspect.getfile(upload.__class__))
             return 0
 
     t_run_validation = PythonOperator(
@@ -258,7 +267,10 @@ def flex_maybe_spawn(**kwargs):
                 "parent_lz_path": lz_path,
                 "parent_submission_id": uuid,
                 "metadata": md,
-                "dag_provenance_list": utils.get_git_provenance_list(__file__),
+                "dag_provenance_list": utils.get_git_provenance_list(
+                    [__file__,
+                     kwargs["ti"].xcom_pull(task_ids="run_validation", key="ivt_path")]
+                ),
             }
             for next_dag in utils.downstream_workflow_iter(collectiontype, assay_type):
                 yield next_dag, payload
diff --git a/src/ingest-pipeline/airflow/dags/status_change/status_manager.py b/src/ingest-pipeline/airflow/dags/status_change/status_manager.py
index 6df89934..bba22500 100644
--- a/src/ingest-pipeline/airflow/dags/status_change/status_manager.py
+++ b/src/ingest-pipeline/airflow/dags/status_change/status_manager.py
@@ -218,6 +218,7 @@ def set_entity_api_status(self) -> Dict:
             response = http_hook.run(
                 endpoint, json.dumps(data), headers, self.extras["extra_options"]
             )
+            logging.info(f"""Response: {response.json()}""")
             return response.json()
         except Exception as e:
             raise StatusChangerException(
diff --git a/src/ingest-pipeline/airflow/dags/utils.py b/src/ingest-pipeline/airflow/dags/utils.py
index ab573e70..9f471e71 100644
--- a/src/ingest-pipeline/airflow/dags/utils.py
+++ b/src/ingest-pipeline/airflow/dags/utils.py
@@ -318,7 +318,9 @@ def my_callable(**kwargs):
     ds_rslt = pythonop_get_dataset_state(dataset_uuid_callable=my_callable, **kwargs)
     organ_list = list(set(ds_rslt["organs"]))
     organ_code = organ_list[0] if len(organ_list) == 1 else "multi"
-    pipeline_shorthand = "Kaggle-1 Glomerulus Segmentation" if organ_code in ["LK", "RK"] else "Image Pyramid"
+    pipeline_shorthand = (
+        "Kaggle-1 Glomerulus Segmentation" if organ_code in ["LK", "RK"] else "Image Pyramid"
+    )
 
     return f"{ds_rslt['dataset_type']} [{pipeline_shorthand}]"
 
@@ -804,7 +806,7 @@ def pythonop_send_create_dataset(**kwargs) -> str:
         "dataset_type": dataset_type,
         "group_uuid": parent_group_uuid,
         "contains_human_genetic_sequences": False,
-        "creation_action": "Central Process"
+        "creation_action": "Central Process",
     }
     if "previous_revision_uuid_callable" in kwargs:
         previous_revision_uuid = kwargs["previous_revision_uuid_callable"](**kwargs)
@@ -942,6 +944,7 @@ def pythonop_get_dataset_state(**kwargs) -> Dict:
     headers = {
         "authorization": f"Bearer {auth_tok}",
         "content-type": "application/json",
+        "Cache-Control": "no-cache",
         "X-Hubmap-Application": "ingest-pipeline",
     }
     http_hook = HttpHook(method, http_conn_id="entity_api_connection")
@@ -1378,8 +1381,12 @@ def my_callable(**kwargs):
             status = "QA"
         else:
             status = ds_rslt.get("status", "QA")
-            if status in ["Processing", "New"]:
-                status = "QA"
+            if status in ["Processing", "New", "Invalid"]:
+                status = (
+                    "QA"
+                    if kwargs["dag_run"].conf["dag_id"] == "scan_and_begin_processing"
+                    else "Submitted"
+                )
         if metadata_fun:
             if not contacts:
                 contacts = ds_rslt.get("contacts", [])
diff --git a/src/ingest-pipeline/airflow/dags/validate_upload.py b/src/ingest-pipeline/airflow/dags/validate_upload.py
index 0dd55b56..4ebed639 100644
--- a/src/ingest-pipeline/airflow/dags/validate_upload.py
+++ b/src/ingest-pipeline/airflow/dags/validate_upload.py
@@ -1,6 +1,7 @@
 from __future__ import annotations
 
 import logging
+import os
 import sys
 from datetime import datetime, timedelta
 from pathlib import Path
@@ -25,6 +26,7 @@
 from airflow.configuration import conf as airflow_conf
 from airflow.exceptions import AirflowException
 from airflow.operators.python import PythonOperator
+from airflow.providers.http.hooks.http import HttpHook
 
 sys.path.append(airflow_conf.as_dict()["connections"]["SRC_PATH"].strip("'").strip('"'))
 
@@ -72,7 +74,7 @@ def my_callable(**kwargs):
         print("ds_rslt:")
         pprint(ds_rslt)
 
-        for key in ["entity_type", "status", "uuid", "data_types", "local_directory_full_path"]:
+        for key in ["entity_type", "status", "uuid", "local_directory_full_path"]:
             assert key in ds_rslt, f"Dataset status for {uuid} has no {key}"
 
         if ds_rslt["entity_type"] != "Upload":
@@ -101,6 +103,11 @@ def run_validation(**kwargs):
         plugin_path = [path for path in ingest_validation_tests.__path__][0]
 
         ignore_globs = [uuid, "extras", "*metadata.tsv", "validation_report.txt"]
+        app_context = {
+            "entities_url": HttpHook.get_connection("entity_api_connection").host + "/entities/",
+            "ingest_url": os.environ["AIRFLOW_CONN_INGEST_API_CONNECTION"],
+            "request_header": {"X-Hubmap-Application": "ingest-pipeline"},
+        }
         #
         # Uncomment offline=True below to avoid validating orcid_id URLs &etc
         #
@@ -111,8 +118,11 @@ def run_validation(**kwargs):
             plugin_directory=plugin_path,
             # offline=True,  # noqa E265
             add_notes=False,
-            extra_parameters={"coreuse": get_threads_resource("validate_upload", "run_validation")},
+            extra_parameters={
+                "coreuse": get_threads_resource("validate_upload", "run_validation")
+            },
             globus_token=get_auth_tok(**kwargs),
+            app_context=app_context,
         )
         # Scan reports an error result
         report = ingest_validation_tools_error_report.ErrorReport(
diff --git a/src/ingest-pipeline/airflow/dags/visium.py b/src/ingest-pipeline/airflow/dags/visium.py
new file mode 100644
index 00000000..6a9e43d9
--- /dev/null
+++ b/src/ingest-pipeline/airflow/dags/visium.py
@@ -0,0 +1,406 @@
+from datetime import datetime, timedelta
+from pathlib import Path
+
+from airflow.operators.bash import BashOperator
+from airflow.operators.dummy import DummyOperator
+from airflow.operators.python import BranchPythonOperator, PythonOperator
+from hubmap_operators.common_operators import (
+    CleanupTmpDirOperator,
+    CreateTmpDirOperator,
+    JoinOperator,
+    LogInfoOperator,
+    MoveDataOperator,
+    SetDatasetProcessingOperator,
+)
+
+import utils
+from utils import (
+    get_absolute_workflows,
+    get_cwltool_base_cmd,
+    get_dataset_uuid,
+    get_parent_dataset_uuids_list,
+    get_parent_data_dir,
+    build_dataset_name as inner_build_dataset_name,
+    get_previous_revision_uuid,
+    get_uuid_for_error,
+    join_quote_command_str,
+    make_send_status_msg_function,
+    get_tmp_dir_path,
+    HMDAG,
+    get_queue_resource,
+    get_threads_resource,
+    get_preserve_scratch_resource,
+)
+
+
+default_args = {
+    "owner": "hubmap",
+    "depends_on_past": False,
+    "start_date": datetime(2019, 1, 1),
+    "email": ["joel.welling@gmail.com"],
+    "email_on_failure": False,
+    "email_on_retry": False,
+    "retries": 1,
+    "retry_delay": timedelta(minutes=1),
+    "xcom_push": True,
+    "queue": get_queue_resource("visium_no_probes"),
+    "on_failure_callback": utils.create_dataset_state_error_callback(get_uuid_for_error),
+}
+
+with HMDAG(
+    "visium_no_probes",
+    schedule_interval=None,
+    is_paused_upon_creation=False,
+    default_args=default_args,
+    user_defined_macros={
+        "tmp_dir_path": get_tmp_dir_path,
+        "preserve_scratch": get_preserve_scratch_resource("visium_no_probes"),
+    },
+) as dag:
+    cwl_workflows = get_absolute_workflows(
+        Path("salmon-rnaseq", "pipeline.cwl"),
+        Path("portal-containers", "h5ad-to-arrow.cwl"),
+        Path("portal-containers", "anndata-to-ui.cwl"),
+        Path("ome-tiff-pyramid", "pipeline.cwl"),
+        Path("portal-containers", "ome-tiff-offsets.cwl"),
+    )
+
+    def build_dataset_name(**kwargs):
+        return inner_build_dataset_name(dag.dag_id, "salmon-rnaseq", **kwargs)
+
+    prepare_cwl1 = DummyOperator(task_id="prepare_cwl1")
+
+    prepare_cwl2 = DummyOperator(task_id="prepare_cwl2")
+
+    prepare_cwl3 = DummyOperator(task_id="prepare_cwl3")
+
+    prepare_cwl4 = DummyOperator(task_id="prepare_cwl4")
+
+    prepare_cwl5 = DummyOperator(task_id="prepare_cwl5")
+
+    def build_cwltool_cmd1(**kwargs):
+        run_id = kwargs["run_id"]
+        tmpdir = get_tmp_dir_path(run_id)
+        print("tmpdir: ", tmpdir)
+
+        data_dir = get_parent_data_dir(**kwargs)
+        print("data_dirs: ", data_dir)
+
+        command = [
+            *get_cwltool_base_cmd(tmpdir),
+            "--relax-path-checks",
+            "--outdir",
+            tmpdir / "cwl_out",
+            "--parallel",
+            cwl_workflows[0],
+            "--assay",
+            "visium-ff",
+            "--threads",
+            get_threads_resource(dag.dag_id),
+        ]
+
+        command.append("--fastq_dir")
+        command.append(data_dir / "raw/fastq/")
+
+        command.append("--img_dir")
+        command.append(data_dir / "lab_processed/images/")
+
+        command.append("--metadata_dir")
+        command.append(data_dir / "raw/")
+
+        return join_quote_command_str(command)
+
+    def build_cwltool_cmd2(**kwargs):
+        run_id = kwargs["run_id"]
+        tmpdir = get_tmp_dir_path(run_id)
+        print("tmpdir: ", tmpdir)
+
+        command = [
+            *get_cwltool_base_cmd(tmpdir),
+            cwl_workflows[1],
+            "--input_dir",
+            # This pipeline invocation runs in a 'hubmap_ui' subdirectory,
+            # so use the parent directory as input
+            "..",
+        ]
+
+        return join_quote_command_str(command)
+
+    def build_cwltool_cmd3(**kwargs):
+        run_id = kwargs["run_id"]
+        tmpdir = get_tmp_dir_path(run_id)
+        print("tmpdir: ", tmpdir)
+
+        command = [
+            *get_cwltool_base_cmd(tmpdir),
+            cwl_workflows[2],
+            "--input_dir",
+            # This pipeline invocation runs in a 'hubmap_ui' subdirectory,
+            # so use the parent directory as input
+            "..",
+        ]
+
+        return join_quote_command_str(command)
+
+    def build_cwltool_cmd4(**kwargs):
+        run_id = kwargs["run_id"]
+
+        # tmpdir is temp directory in /hubmap-tmp
+        tmpdir = get_tmp_dir_path(run_id)
+        print("tmpdir: ", tmpdir)
+
+        # data directory is the stitched images, which are found in tmpdir
+        data_dir = get_parent_data_dir(**kwargs)
+        print("data_dir: ", data_dir)
+
+        # this is the call to the CWL
+        command = [
+            *get_cwltool_base_cmd(tmpdir),
+            "--relax-path-checks",
+            cwl_workflows[3],
+            "--ometiff_directory",
+            data_dir / "lab_processed/images/",
+            "--output_filename",
+            "visium_histology_hires_pyramid.ome.tif",
+        ]
+        return join_quote_command_str(command)
+
+    def build_cwltool_cmd5(**kwargs):
+        run_id = kwargs["run_id"]
+        tmpdir = get_tmp_dir_path(run_id)
+        print("tmpdir: ", tmpdir)
+        parent_data_dir = get_parent_data_dir(**kwargs)
+        print("parent_data_dir: ", parent_data_dir)
+        data_dir = tmpdir / "cwl_out"
+        print("data_dir: ", data_dir)
+
+        command = [
+            *get_cwltool_base_cmd(tmpdir),
+            cwl_workflows[4],
+            "--input_dir",
+            data_dir / "ometiff-pyramids",
+        ]
+
+        return join_quote_command_str(command)
+
+    t_build_cmd1 = PythonOperator(
+        task_id="build_cmd1",
+        python_callable=build_cwltool_cmd1,
+        provide_context=True,
+    )
+
+    t_build_cmd2 = PythonOperator(
+        task_id="build_cmd2",
+        python_callable=build_cwltool_cmd2,
+        provide_context=True,
+    )
+
+    t_build_cmd3 = PythonOperator(
+        task_id="build_cmd3",
+        python_callable=build_cwltool_cmd3,
+        provide_context=True,
+    )
+
+    t_build_cmd4 = PythonOperator(
+        task_id="build_cmd4",
+        python_callable=build_cwltool_cmd4,
+        provide_context=True,
+    )
+
+    t_build_cmd5 = PythonOperator(
+        task_id="build_cmd5",
+        python_callable=build_cwltool_cmd5,
+        provide_context=True,
+    )
+
+    t_pipeline_exec = BashOperator(
+        task_id="pipeline_exec",
+        bash_command=""" \
+        tmp_dir={{tmp_dir_path(run_id)}} ; \
+        {{ti.xcom_pull(task_ids='build_cmd1')}} > $tmp_dir/session.log 2>&1 ; \
+        echo $?
+        """,
+    )
+
+    t_convert_for_ui = BashOperator(
+        task_id="convert_for_ui",
+        bash_command=""" \
+        tmp_dir={{tmp_dir_path(run_id)}} ; \
+        ds_dir="{{ti.xcom_pull(task_ids="send_create_dataset")}}" ; \
+        cd "$tmp_dir"/cwl_out ; \
+        mkdir -p hubmap_ui ; \
+        cd hubmap_ui ; \
+        {{ti.xcom_pull(task_ids='build_cmd2')}} >> $tmp_dir/session.log 2>&1 ; \
+        echo $?
+        """,
+    )
+
+    t_convert_for_ui_2 = BashOperator(
+        task_id="convert_for_ui_2",
+        bash_command=""" \
+        tmp_dir={{tmp_dir_path(run_id)}} ; \
+        ds_dir="{{ti.xcom_pull(task_ids="send_create_dataset")}}" ; \
+        cd "$tmp_dir"/cwl_out ; \
+        mkdir -p hubmap_ui ; \
+        cd hubmap_ui ; \
+        {{ti.xcom_pull(task_ids='build_cmd3')}} >> $tmp_dir/session.log 2>&1 ; \
+        echo $?
+        """,
+    )
+
+    t_pipeline_exec_cwl_ome_tiff_pyramid = BashOperator(
+        task_id="pipeline_exec_cwl_ome_tiff_pyramid",
+        bash_command=""" \
+        tmp_dir={{tmp_dir_path(run_id)}} ; \
+        mkdir -p ${tmp_dir}/cwl_out ; \
+        cd ${tmp_dir}/cwl_out ; \
+        {{ti.xcom_pull(task_ids='build_cmd4')}} >> $tmp_dir/session.log 2>&1 ; \
+        echo $?
+        """,
+    )
+
+    t_pipeline_exec_cwl_ome_tiff_offsets = BashOperator(
+        task_id="pipeline_exec_cwl_ome_tiff_offsets",
+        bash_command=""" \
+        tmp_dir={{tmp_dir_path(run_id)}} ; \
+        cd ${tmp_dir}/cwl_out ; \
+        {{ti.xcom_pull(task_ids='build_cmd5')}} >> ${tmp_dir}/session.log 2>&1 ; \
+        echo $?
+        """,
+    )
+
+    t_maybe_keep_cwl1 = BranchPythonOperator(
+        task_id="maybe_keep_cwl1",
+        python_callable=utils.pythonop_maybe_keep,
+        provide_context=True,
+        op_kwargs={
+            "next_op": "prepare_cwl2",
+            "bail_op": "set_dataset_error",
+            "test_op": "pipeline_exec",
+        },
+    )
+
+    t_maybe_keep_cwl2 = BranchPythonOperator(
+        task_id="maybe_keep_cwl2",
+        python_callable=utils.pythonop_maybe_keep,
+        provide_context=True,
+        op_kwargs={
+            "next_op": "prepare_cwl3",
+            "bail_op": "set_dataset_error",
+            "test_op": "convert_for_ui",
+        },
+    )
+
+    t_maybe_keep_cwl3 = BranchPythonOperator(
+        task_id="maybe_keep_cwl3",
+        python_callable=utils.pythonop_maybe_keep,
+        provide_context=True,
+        op_kwargs={
+            "next_op": "prepare_cwl4",
+            "bail_op": "set_dataset_error",
+            "test_op": "convert_for_ui_2",
+        },
+    )
+
+    t_maybe_keep_cwl4 = BranchPythonOperator(
+        task_id="maybe_keep_cwl4",
+        python_callable=utils.pythonop_maybe_keep,
+        provide_context=True,
+        op_kwargs={
+            "next_op": "prepare_cwl5",
+            "bail_op": "set_dataset_error",
+            "test_op": "pipeline_exec_cwl_ome_tiff_pyramid",
+        },
+    )
+
+    t_maybe_keep_cwl5 = BranchPythonOperator(
+        task_id="maybe_keep_cwl5",
+        python_callable=utils.pythonop_maybe_keep,
+        provide_context=True,
+        op_kwargs={
+            "next_op": "move_data",
+            "bail_op": "set_dataset_error",
+            "test_op": "pipeline_exec_cwl_ome_tiff_offsets",
+        },
+    )
+
+    t_send_create_dataset = PythonOperator(
+        task_id="send_create_dataset",
+        python_callable=utils.pythonop_send_create_dataset,
+        provide_context=True,
+        op_kwargs={
+            "parent_dataset_uuid_callable": get_parent_dataset_uuids_list,
+            "previous_revision_uuid_callable": get_previous_revision_uuid,
+            "http_conn_id": "ingest_api_connection",
+            "dataset_name_callable": build_dataset_name,
+            "pipeline_shorthand": "Salmon + Scanpy",
+        },
+    )
+
+    t_set_dataset_error = PythonOperator(
+        task_id="set_dataset_error",
+        python_callable=utils.pythonop_set_dataset_state,
+        provide_context=True,
+        trigger_rule="all_done",
+        op_kwargs={
+            "dataset_uuid_callable": get_dataset_uuid,
+            "ds_state": "Error",
+            "message": f"An error occurred in salmon-rnaseq",
+        },
+    )
+
+    send_status_msg = make_send_status_msg_function(
+        dag_file=__file__,
+        retcode_ops=["pipeline_exec", "move_data", "convert_for_ui", "convert_for_ui_2"],
+        cwl_workflows=cwl_workflows,
+    )
+
+    t_send_status = PythonOperator(
+        task_id="send_status_msg",
+        python_callable=send_status_msg,
+        provide_context=True,
+    )
+
+    t_log_info = LogInfoOperator(task_id="log_info")
+    t_join = JoinOperator(task_id="join")
+    t_create_tmpdir = CreateTmpDirOperator(task_id="create_tmpdir")
+    t_cleanup_tmpdir = CleanupTmpDirOperator(task_id="cleanup_tmpdir")
+    t_set_dataset_processing = SetDatasetProcessingOperator(task_id="set_dataset_processing")
+    t_move_data = MoveDataOperator(task_id="move_data")
+
+    (
+        t_log_info
+        >> t_create_tmpdir
+        >> t_send_create_dataset
+        >> t_set_dataset_processing
+        >> prepare_cwl1
+        >> t_build_cmd1
+        >> t_pipeline_exec
+        >> t_maybe_keep_cwl1
+        >> prepare_cwl2
+        >> t_build_cmd2
+        >> t_convert_for_ui
+        >> t_maybe_keep_cwl2
+        >> prepare_cwl3
+        >> t_build_cmd3
+        >> t_convert_for_ui_2
+        >> t_maybe_keep_cwl3
+        >> prepare_cwl4
+        >> t_build_cmd4
+        >> t_pipeline_exec_cwl_ome_tiff_pyramid
+        >> t_maybe_keep_cwl4
+        >> prepare_cwl5
+        >> t_build_cmd5
+        >> t_pipeline_exec_cwl_ome_tiff_offsets
+        >> t_maybe_keep_cwl5
+        >> t_move_data
+        >> t_send_status
+        >> t_join
+    )
+    t_maybe_keep_cwl1 >> t_set_dataset_error
+    t_maybe_keep_cwl2 >> t_set_dataset_error
+    t_maybe_keep_cwl3 >> t_set_dataset_error
+    t_maybe_keep_cwl4 >> t_set_dataset_error
+    t_maybe_keep_cwl5 >> t_set_dataset_error
+    t_set_dataset_error >> t_join
+    t_join >> t_cleanup_tmpdir
diff --git a/src/ingest-pipeline/airflow/dags/workflow_map.yml b/src/ingest-pipeline/airflow/dags/workflow_map.yml
index c39b6cda..5e087e5e 100644
--- a/src/ingest-pipeline/airflow/dags/workflow_map.yml
+++ b/src/ingest-pipeline/airflow/dags/workflow_map.yml
@@ -113,4 +113,7 @@ workflow_map:
   - 'collection_type': 'reversioning_annotations'
     'assay_type': '.*'
    'workflow': 'azimuth_annotations'
+  - 'collection_type': '.*'
+    'assay_type': 'visium-no-probes'
+    'workflow': 'visium_no_probes'
 
diff --git a/src/ingest-pipeline/submodules/ingest-validation-tools b/src/ingest-pipeline/submodules/ingest-validation-tools
index aabc1fb2..b726cb3f 160000
--- a/src/ingest-pipeline/submodules/ingest-validation-tools
+++ b/src/ingest-pipeline/submodules/ingest-validation-tools
@@ -1 +1 @@
-Subproject commit aabc1fb2d6156c58b4659c017dfef9cfe03414ef
+Subproject commit b726cb3fdf529cdf69039ec5aebc815780de77d9