diff --git a/.gitmodules b/.gitmodules index 31598e89..154bd101 100644 --- a/.gitmodules +++ b/.gitmodules @@ -58,3 +58,6 @@ [submodule "src/ingest-pipeline/airflow/dags/cwl/mibi-pipeline"] path = src/ingest-pipeline/airflow/dags/cwl/mibi-pipeline url = https://github.com/hubmapconsortium/mibi-pipeline +[submodule "src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline"] + path = src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline + url = https://github.com/hubmapconsortium/multiome-rna-atac-pipeline diff --git a/src/ingest-pipeline/airflow/dags/bulk_process.py b/src/ingest-pipeline/airflow/dags/bulk_process.py new file mode 100644 index 00000000..9b002b29 --- /dev/null +++ b/src/ingest-pipeline/airflow/dags/bulk_process.py @@ -0,0 +1,199 @@ +import ast +from pprint import pprint +from datetime import datetime, timedelta + +from airflow.operators.python import PythonOperator +from airflow.exceptions import AirflowException +from airflow.configuration import conf as airflow_conf +from hubmap_operators.flex_multi_dag_run import FlexMultiDagRunOperator + +import utils + +from utils import ( + localized_assert_json_matches_schema as assert_json_matches_schema, + HMDAG, + get_queue_resource, + get_preserve_scratch_resource, + get_soft_data_assaytype, + get_auth_tok, +) + +from extra_utils import check_link_published_drvs + + +def get_uuid_for_error(**kwargs) -> str: + """ + Return the uuid for the derived dataset if it exists, and of the parent dataset otherwise. + """ + return "" + + +default_args = { + "owner": "hubmap", + "depends_on_past": False, + "start_date": datetime(2019, 1, 1), + "email": ["joel.welling@gmail.com"], + "email_on_failure": False, + "email_on_retry": False, + "retries": 1, + "retry_delay": timedelta(minutes=1), + "xcom_push": True, + "queue": get_queue_resource("launch_multi_analysis"), + "on_failure_callback": utils.create_dataset_state_error_callback(get_uuid_for_error), +} + + +with HMDAG( + "bulk_process", + schedule_interval=None, + is_paused_upon_creation=False, + default_args=default_args, + user_defined_macros={ + "tmp_dir_path": utils.get_tmp_dir_path, + "preserve_scratch": get_preserve_scratch_resource("launch_multi_analysis"), + }, +) as dag: + + def check_one_uuid( + uuid: str, previous_version_uuid: str, avoid_previous_version: bool, **kwargs + ): + """ + Look up information on the given uuid or HuBMAP identifier. 
+ Returns: + - the uuid, translated from an identifier if necessary + - data type(s) of the dataset + - local directory full path of the dataset + """ + print(f"Starting uuid {uuid}") + my_callable = lambda **kwargs: uuid + ds_rslt = utils.pythonop_get_dataset_state(dataset_uuid_callable=my_callable, **kwargs) + if not ds_rslt: + raise AirflowException(f"Invalid uuid/doi for group: {uuid}") + print("ds_rslt:") + pprint(ds_rslt) + + for key in ["status", "uuid", "local_directory_full_path", "metadata"]: + assert key in ds_rslt, f"Dataset status for {uuid} has no {key}" + + if not ds_rslt["status"] in ["New", "Error", "QA", "Published"]: + raise AirflowException(f"Dataset {uuid} is not QA or better") + + dt = ds_rslt["dataset_type"] + if isinstance(dt, str) and dt.startswith("[") and dt.endswith("]"): + dt = ast.literal_eval(dt) + print(f"parsed dt: {dt}") + + if not previous_version_uuid and not avoid_previous_version: + previous_status, previous_uuid = check_link_published_drvs( + uuid, get_auth_tok(**kwargs) + ) + if previous_status: + previous_version_uuid = previous_uuid + + return ( + ds_rslt["uuid"], + dt, + ds_rslt["local_directory_full_path"], + ds_rslt["metadata"], + previous_version_uuid, + ) + + def check_uuids(**kwargs): + print("dag_run conf follows:") + pprint(kwargs["dag_run"].conf) + + try: + assert_json_matches_schema(kwargs["dag_run"].conf, "launch_multi_metadata_schema.yml") + except AssertionError as e: + print("invalid metadata follows:") + pprint(kwargs["dag_run"].conf) + raise + + uuid_l = kwargs["dag_run"].conf["uuid_list"] + collection_type = kwargs["dag_run"].conf["collection_type"] + prev_version_uuid = kwargs["dag_run"].conf.get("previous_version_uuid", None) + avoid_previous_version = kwargs["dag_run"].conf.get("avoid_previous_version_find", False) + filtered_uuid_l = [] + for uuid in uuid_l: + uuid, dt, lz_path, metadata, prev_version_uuid = check_one_uuid( + uuid, prev_version_uuid, avoid_previous_version, **kwargs + ) + soft_data_assaytype = get_soft_data_assaytype(uuid, **kwargs) + print(f"Got {soft_data_assaytype} as the soft_data_assaytype for UUID {uuid}") + filtered_uuid_l.append( + { + "uuid": uuid, + "dataset_type": soft_data_assaytype, + "path": lz_path, + "metadata": metadata, + "prev_version_uuid": prev_version_uuid, + } + ) + # if prev_version_uuid is not None: + # prev_version_uuid = check_one_uuid(prev_version_uuid, "", False, **kwargs)[0] + # print(f'Finished uuid {uuid}') + print(f"filtered data types: {soft_data_assaytype}") + print(f"filtered paths: {lz_path}") + print(f"filtered uuids: {uuid}") + print(f"filtered previous_version_uuid: {prev_version_uuid}") + kwargs["ti"].xcom_push(key="collectiontype", value=collection_type) + kwargs["ti"].xcom_push(key="uuids", value=filtered_uuid_l) + + check_uuids_t = PythonOperator( + task_id="check_uuids", + python_callable=check_uuids, + provide_context=True, + op_kwargs={ + "crypt_auth_tok": utils.encrypt_tok( + airflow_conf.as_dict()["connections"]["APP_CLIENT_SECRET"] + ).decode(), + }, + ) + + def flex_maybe_spawn(**kwargs): + """ + This is a generator which returns appropriate DagRunOrders + """ + print("kwargs:") + pprint(kwargs) + print("dag_run conf:") + ctx = kwargs["dag_run"].conf + pprint(ctx) + collectiontype = kwargs["ti"].xcom_pull(key="collectiontype", task_ids="check_uuids") + for uuid in kwargs["ti"].xcom_pull(key="uuids", task_ids="check_uuids"): + lz_path = uuid.get("path") + parent_submission = uuid.get("uuid") + prev_version_uuid = uuid.get("prev_version_uuid") + metadata = 
uuid.get("metadata") + assay_type = uuid.get("dataset_type") + print("collectiontype: <{}>, assay_type: <{}>".format(collectiontype, assay_type)) + print(f"uuid: {uuid}") + print("lz_paths:") + pprint(lz_path) + print(f"previous version uuid: {prev_version_uuid}") + payload = { + "ingest_id": kwargs["run_id"], + "crypt_auth_tok": kwargs["crypt_auth_tok"], + "parent_lz_path": lz_path, + "parent_submission_id": parent_submission, + "previous_version_uuid": prev_version_uuid, + "metadata": metadata, + "dag_provenance_list": utils.get_git_provenance_list(__file__), + } + for next_dag in utils.downstream_workflow_iter(collectiontype, assay_type): + yield next_dag, payload + + + t_maybe_spawn = FlexMultiDagRunOperator( + task_id="flex_maybe_spawn", + dag=dag, + trigger_dag_id="launch_multi_analysis", + python_callable=flex_maybe_spawn, + op_kwargs={ + "crypt_auth_tok": utils.encrypt_tok( + airflow_conf.as_dict()["connections"]["APP_CLIENT_SECRET"] + ).decode(), + }, + ) + + check_uuids_t >> t_maybe_spawn diff --git a/src/ingest-pipeline/airflow/dags/cwl/azimuth-annotate b/src/ingest-pipeline/airflow/dags/cwl/azimuth-annotate index a5b2aa92..6694e4ba 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/azimuth-annotate +++ b/src/ingest-pipeline/airflow/dags/cwl/azimuth-annotate @@ -1 +1 @@ -Subproject commit a5b2aa9271c476d5c2c2b41dbf9e0a32dcc003e7 +Subproject commit 6694e4ba1c14688664b79b3bf36ad86c8069970c diff --git a/src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline b/src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline new file mode 160000 index 00000000..68e0cc1b --- /dev/null +++ b/src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline @@ -0,0 +1 @@ +Subproject commit 68e0cc1be35751f5ef5958050742ddfffd564d3c diff --git a/src/ingest-pipeline/airflow/dags/cwl/portal-containers b/src/ingest-pipeline/airflow/dags/cwl/portal-containers index 5faabafd..00fd6df7 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/portal-containers +++ b/src/ingest-pipeline/airflow/dags/cwl/portal-containers @@ -1 +1 @@ -Subproject commit 5faabafdcc37bc5e55d8c2bf0228f641aa83d765 +Subproject commit 00fd6df75a4311da4e50cc0173b6d6e50ddaf76d diff --git a/src/ingest-pipeline/airflow/dags/launch_multi_analysis.py b/src/ingest-pipeline/airflow/dags/launch_multi_analysis.py index 1018fa14..ae5ca83c 100644 --- a/src/ingest-pipeline/airflow/dags/launch_multi_analysis.py +++ b/src/ingest-pipeline/airflow/dags/launch_multi_analysis.py @@ -127,9 +127,9 @@ def check_uuids(**kwargs): filtered_path_l.append(lz_path) filtered_uuid_l.append(uuid) filtered_md_l.append(metadata) - if prev_version_uuid is not None: - prev_version_uuid = check_one_uuid(prev_version_uuid, "", False, - **kwargs)[0] + # if prev_version_uuid is not None: + # prev_version_uuid = check_one_uuid(prev_version_uuid, "", False, + # **kwargs)[0] # print(f'Finished uuid {uuid}') print(f"filtered data types: {filtered_data_types}") print(f"filtered paths: {filtered_path_l}") diff --git a/src/ingest-pipeline/airflow/dags/multiassay_component_metadata.py b/src/ingest-pipeline/airflow/dags/multiassay_component_metadata.py index 567a009f..7d6d2b70 100644 --- a/src/ingest-pipeline/airflow/dags/multiassay_component_metadata.py +++ b/src/ingest-pipeline/airflow/dags/multiassay_component_metadata.py @@ -90,7 +90,7 @@ def check_one_uuid(uuid, **kwargs): for key in ["status", "uuid", "local_directory_full_path", "metadata", "dataset_type"]: assert key in ds_rslt, f"Dataset status for {uuid} has no {key}" - if not ds_rslt["status"] in ["New", 
"Error", "QA", "Published"]: + if not ds_rslt["status"] in ["New", "Error", "QA", "Published", "Submitted"]: raise AirflowException(f"Dataset {uuid} is not QA or better") return ( @@ -230,4 +230,11 @@ def wrapped_send_status_msg(**kwargs): t_cleanup_tmpdir = CleanupTmpDirOperator(task_id="cleanup_temp_dir") - t_check_uuids >> t_create_tmpdir >> t_run_md_extract >> t_md_consistency_tests >> t_send_status >> t_cleanup_tmpdir + ( + t_check_uuids + >> t_create_tmpdir + >> t_run_md_extract + >> t_md_consistency_tests + >> t_send_status + >> t_cleanup_tmpdir + ) diff --git a/src/ingest-pipeline/airflow/dags/multiome.py b/src/ingest-pipeline/airflow/dags/multiome.py new file mode 100644 index 00000000..b09b0d1d --- /dev/null +++ b/src/ingest-pipeline/airflow/dags/multiome.py @@ -0,0 +1,356 @@ +from collections import namedtuple +from datetime import datetime, timedelta +from pathlib import Path +from typing import List + +from airflow import DAG +from airflow.operators.bash import BashOperator +from airflow.operators.dummy import DummyOperator +from airflow.operators.python import BranchPythonOperator, PythonOperator +from hubmap_operators.common_operators import ( + CleanupTmpDirOperator, + CreateTmpDirOperator, + JoinOperator, + LogInfoOperator, + MoveDataOperator, + SetDatasetProcessingOperator, +) + +import utils +from utils import ( + get_absolute_workflows, + get_cwltool_base_cmd, + get_dataset_uuid, + get_parent_dataset_uuids_list, + get_parent_data_dirs_list, + build_dataset_name as inner_build_dataset_name, + get_previous_revision_uuid, + get_uuid_for_error, + join_quote_command_str, + make_send_status_msg_function, + get_tmp_dir_path, + pythonop_get_dataset_state, + HMDAG, + get_queue_resource, + get_threads_resource, + get_preserve_scratch_resource, +) + +MultiomeSequencingDagParameters = namedtuple( + "MultiomeSequencingDagParameters", + [ + "dag_id", + "pipeline_name", + "assay_rna", + "assay_atac", + ], +) + + +def find_atac_metadata_file(data_dir: Path) -> Path: + for path in data_dir.glob("*.tsv"): + name_lower = path.name.lower() + if path.is_file() and "atac" in name_lower and "metadata" in name_lower: + return path + raise ValueError("Couldn't find ATAC-seq metadata file") + + +def generate_multiome_dag(params: MultiomeSequencingDagParameters) -> DAG: + default_args = { + "owner": "hubmap", + "depends_on_past": False, + "start_date": datetime(2019, 1, 1), + "email": ["joel.welling@gmail.com"], + "email_on_failure": False, + "email_on_retry": False, + "retries": 1, + "retry_delay": timedelta(minutes=1), + "xcom_push": True, + "queue": get_queue_resource(params.dag_id), + "on_failure_callback": utils.create_dataset_state_error_callback(get_uuid_for_error), + } + + with HMDAG( + params.dag_id, + schedule_interval=None, + is_paused_upon_creation=False, + default_args=default_args, + user_defined_macros={ + "tmp_dir_path": get_tmp_dir_path, + "preserve_scratch": get_preserve_scratch_resource(params.dag_id), + }, + ) as dag: + cwl_workflows = get_absolute_workflows( + Path("multiome-rna-atac-pipeline", "pipeline.cwl"), + Path("azimuth-annotate", "pipeline.cwl"), + Path("portal-containers", "mudata-to-ui.cwl"), + ) + + def build_dataset_name(**kwargs): + return inner_build_dataset_name(dag.dag_id, params.pipeline_name, **kwargs) + + prepare_cwl1 = DummyOperator(task_id="prepare_cwl1") + + prepare_cwl2 = DummyOperator(task_id="prepare_cwl2") + + prepare_cwl3 = DummyOperator(task_id="prepare_cwl3") + + def build_cwltool_cmd1(**kwargs): + run_id = kwargs["run_id"] + tmpdir = 
get_tmp_dir_path(run_id) + print("tmpdir: ", tmpdir) + + data_dirs = get_parent_data_dirs_list(**kwargs) + print("data_dirs: ", data_dirs) + + command = [ + *get_cwltool_base_cmd(tmpdir), + "--relax-path-checks", + "--outdir", + tmpdir / "cwl_out", + "--parallel", + cwl_workflows[0], + "--threads_rna", + get_threads_resource(dag.dag_id), + "--threads_atac", + get_threads_resource(dag.dag_id), + ] + + for component in ["RNA", "ATAC"]: + command.append(f"--assay_{component.lower()}") + command.append(getattr(params, f"assay_{component.lower()}")) + for data_dir in data_dirs: + command.append(f"--fastq_dir_{component.lower()}") + command.append(data_dir / Path(f"raw/fastq/{component}")) + + for data_dir in data_dirs: + command.append("--atac_metadata_file") + command.append(find_atac_metadata_file(data_dir)) + + return join_quote_command_str(command) + + def build_cwltool_cmd2(**kwargs): + run_id = kwargs["run_id"] + tmpdir = get_tmp_dir_path(run_id) + print("tmpdir: ", tmpdir) + + # get organ type + ds_rslt = pythonop_get_dataset_state(dataset_uuid_callable=get_dataset_uuid, **kwargs) + + organ_list = list(set(ds_rslt["organs"])) + organ_code = organ_list[0] if len(organ_list) == 1 else "multi" + + command = [ + *get_cwltool_base_cmd(tmpdir), + cwl_workflows[1], + "--reference", + organ_code, + "--matrix", + "mudata_raw.h5mu", + "--secondary-analysis-matrix", + "secondary_analysis.h5mu", + "--assay", + params.assay_rna, + ] + + return join_quote_command_str(command) + + def build_cwltool_cmd3(**kwargs): + run_id = kwargs["run_id"] + tmpdir = get_tmp_dir_path(run_id) + print("tmpdir: ", tmpdir) + + command = [ + *get_cwltool_base_cmd(tmpdir), + cwl_workflows[2], + "--input_dir", + # This pipeline invocation runs in a 'hubmap_ui' subdirectory, + # so use the parent directory as input + "..", + ] + + return join_quote_command_str(command) + + t_build_cmd1 = PythonOperator( + task_id="build_cmd1", + python_callable=build_cwltool_cmd1, + provide_context=True, + ) + + t_build_cmd2 = PythonOperator( + task_id="build_cmd2", + python_callable=build_cwltool_cmd2, + provide_context=True, + ) + + t_build_cmd3 = PythonOperator( + task_id="build_cmd3", + python_callable=build_cwltool_cmd3, + provide_context=True, + ) + + t_pipeline_exec = BashOperator( + task_id="pipeline_exec", + bash_command=""" \ + tmp_dir={{tmp_dir_path(run_id)}} ; \ + {{ti.xcom_pull(task_ids='build_cmd1')}} > $tmp_dir/session.log 2>&1 ; \ + echo $? + """, + ) + + t_pipeline_exec_azimuth_annotate = BashOperator( + task_id="pipeline_exec_azimuth_annotate", + bash_command=""" \ + tmp_dir={{tmp_dir_path(run_id)}} ; \ + cd "$tmp_dir"/cwl_out ; \ + {{ti.xcom_pull(task_ids='build_cmd2')}} >> $tmp_dir/session.log 2>&1 ; \ + echo $? + """, + ) + + t_convert_for_ui = BashOperator( + task_id="convert_for_ui", + bash_command=""" \ + tmp_dir={{tmp_dir_path(run_id)}} ; \ + ds_dir="{{ti.xcom_pull(task_ids="send_create_dataset")}}" ; \ + cd "$tmp_dir"/cwl_out ; \ + mkdir -p hubmap_ui ; \ + cd hubmap_ui ; \ + {{ti.xcom_pull(task_ids='build_cmd3')}} >> $tmp_dir/session.log 2>&1 ; \ + echo $? 
+ """, + ) + + t_maybe_keep_cwl1 = BranchPythonOperator( + task_id="maybe_keep_cwl1", + python_callable=utils.pythonop_maybe_keep, + provide_context=True, + op_kwargs={ + "next_op": "prepare_cwl2", + "bail_op": "set_dataset_error", + "test_op": "pipeline_exec", + }, + ) + + t_maybe_keep_cwl2 = BranchPythonOperator( + task_id="maybe_keep_cwl2", + python_callable=utils.pythonop_maybe_keep, + provide_context=True, + op_kwargs={ + "next_op": "prepare_cwl3", + "bail_op": "set_dataset_error", + "test_op": "pipeline_exec_azimuth_annotate", + }, + ) + + t_maybe_keep_cwl3 = BranchPythonOperator( + task_id="maybe_keep_cwl3", + python_callable=utils.pythonop_maybe_keep, + provide_context=True, + op_kwargs={ + "next_op": "move_data", + "bail_op": "set_dataset_error", + "test_op": "convert_for_ui", + }, + ) + + t_send_create_dataset = PythonOperator( + task_id="send_create_dataset", + python_callable=utils.pythonop_send_create_dataset, + provide_context=True, + op_kwargs={ + "parent_dataset_uuid_callable": get_parent_dataset_uuids_list, + "previous_revision_uuid_callable": get_previous_revision_uuid, + "http_conn_id": "ingest_api_connection", + "dataset_name_callable": build_dataset_name, + "pipeline_shorthand": "Salmon + ArchR + Muon", + }, + ) + + t_set_dataset_error = PythonOperator( + task_id="set_dataset_error", + python_callable=utils.pythonop_set_dataset_state, + provide_context=True, + trigger_rule="all_done", + op_kwargs={ + "dataset_uuid_callable": get_dataset_uuid, + "ds_state": "Error", + "message": f"An error occurred in {params.pipeline_name}", + }, + ) + + send_status_msg = make_send_status_msg_function( + dag_file=__file__, + retcode_ops=[ + "pipeline_exec", + "pipeline_exec_azimuth_annotate", + "move_data", + "convert_for_ui", + ], + cwl_workflows=cwl_workflows, + ) + + t_send_status = PythonOperator( + task_id="send_status_msg", + python_callable=send_status_msg, + provide_context=True, + ) + + t_log_info = LogInfoOperator(task_id="log_info") + t_join = JoinOperator(task_id="join") + t_create_tmpdir = CreateTmpDirOperator(task_id="create_tmpdir") + t_cleanup_tmpdir = CleanupTmpDirOperator(task_id="cleanup_tmpdir") + t_set_dataset_processing = SetDatasetProcessingOperator(task_id="set_dataset_processing") + t_move_data = MoveDataOperator(task_id="move_data") + + ( + t_log_info + >> t_create_tmpdir + >> t_send_create_dataset + >> t_set_dataset_processing + >> prepare_cwl1 + >> t_build_cmd1 + >> t_pipeline_exec + >> t_maybe_keep_cwl1 + >> prepare_cwl2 + >> t_build_cmd2 + >> t_pipeline_exec_azimuth_annotate + >> t_maybe_keep_cwl2 + >> prepare_cwl3 + >> t_build_cmd3 + >> t_convert_for_ui + >> t_maybe_keep_cwl3 + >> t_move_data + >> t_send_status + >> t_join + ) + t_maybe_keep_cwl1 >> t_set_dataset_error + t_maybe_keep_cwl2 >> t_set_dataset_error + t_maybe_keep_cwl3 >> t_set_dataset_error + t_set_dataset_error >> t_join + t_join >> t_cleanup_tmpdir + + return dag + + +def get_simple_multiome_dag_params(assay: str) -> MultiomeSequencingDagParameters: + return MultiomeSequencingDagParameters( + dag_id=f"multiome_{assay}", + pipeline_name=f"multiome-{assay}", + assay_rna=assay, + assay_atac=assay, + ) + + +multiome_dag_params: List[MultiomeSequencingDagParameters] = [ + MultiomeSequencingDagParameters( + dag_id="multiome_10x", + pipeline_name="multiome-10x", + assay_rna="10x_v3_sn", + assay_atac="multiome_10x", + ), + get_simple_multiome_dag_params("snareseq"), +] + +for params in multiome_dag_params: + globals()[params.dag_id] = generate_multiome_dag(params) diff --git 
a/src/ingest-pipeline/airflow/dags/pas_ftu_segmentation.py b/src/ingest-pipeline/airflow/dags/pas_ftu_segmentation.py index 6e3b509c..4bc9bb50 100644 --- a/src/ingest-pipeline/airflow/dags/pas_ftu_segmentation.py +++ b/src/ingest-pipeline/airflow/dags/pas_ftu_segmentation.py @@ -235,7 +235,7 @@ def build_cwltool_cmd_ome_tiff_offsets(**kwargs): "previous_revision_uuid_callable": get_previous_revision_uuid, "http_conn_id": "ingest_api_connection", "dataset_name_callable": build_dataset_name, - "dataset_types_callable": get_dataset_type_organ_based, + "dataset_type_callable": get_dataset_type_organ_based, }, ) diff --git a/src/ingest-pipeline/airflow/dags/rebuild_primary_dataset_metadata.py b/src/ingest-pipeline/airflow/dags/rebuild_primary_dataset_metadata.py index 17333764..cdb63ac3 100644 --- a/src/ingest-pipeline/airflow/dags/rebuild_primary_dataset_metadata.py +++ b/src/ingest-pipeline/airflow/dags/rebuild_primary_dataset_metadata.py @@ -36,38 +36,40 @@ def get_uuid_for_error(**kwargs): def get_dataset_uuid(**kwargs): - return kwargs['dag_run'].conf['uuid'] + return kwargs["dag_run"].conf["uuid"] def get_dataset_lz_path(**kwargs): - ctx = kwargs['dag_run'].conf - return ctx['lz_path'] + ctx = kwargs["dag_run"].conf + return ctx["lz_path"] default_args = { - 'owner': 'hubmap', - 'depends_on_past': False, - 'start_date': datetime(2019, 1, 1), - 'email': ['joel.welling@gmail.com'], - 'email_on_failure': False, - 'email_on_retry': False, - 'retries': 1, - 'retry_delay': timedelta(minutes=1), - 'xcom_push': True, - 'queue': get_queue_resource('rebuild_metadata'), - 'on_failure_callback': create_dataset_state_error_callback(get_uuid_for_error) + "owner": "hubmap", + "depends_on_past": False, + "start_date": datetime(2019, 1, 1), + "email": ["joel.welling@gmail.com"], + "email_on_failure": False, + "email_on_retry": False, + "retries": 1, + "retry_delay": timedelta(minutes=1), + "xcom_push": True, + "queue": get_queue_resource("rebuild_metadata"), + "on_failure_callback": create_dataset_state_error_callback(get_uuid_for_error), } -with HMDAG('rebuild_primary_dataset_metadata', - schedule_interval=None, - is_paused_upon_creation=False, - default_args=default_args, - user_defined_macros={ - 'tmp_dir_path': get_tmp_dir_path, - 'preserve_scratch': get_preserve_scratch_resource('rebuild_metadata') - }) as dag: +with HMDAG( + "rebuild_primary_dataset_metadata", + schedule_interval=None, + is_paused_upon_creation=False, + default_args=default_args, + user_defined_macros={ + "tmp_dir_path": get_tmp_dir_path, + "preserve_scratch": get_preserve_scratch_resource("rebuild_metadata"), + }, +) as dag: - t_create_tmpdir = CreateTmpDirOperator(task_id='create_temp_dir') + t_create_tmpdir = CreateTmpDirOperator(task_id="create_temp_dir") def check_one_uuid(uuid, **kwargs): """ @@ -77,54 +79,56 @@ def check_one_uuid(uuid, **kwargs): - data type(s) of the dataset - local directory full path of the dataset """ - print(f'Starting uuid {uuid}') + print(f"Starting uuid {uuid}") my_callable = lambda **kwargs: uuid ds_rslt = pythonop_get_dataset_state(dataset_uuid_callable=my_callable, **kwargs) if not ds_rslt: - raise AirflowException(f'Invalid uuid/doi for group: {uuid}') - print('ds_rslt:') + raise AirflowException(f"Invalid uuid/doi for group: {uuid}") + print("ds_rslt:") pprint(ds_rslt) - for key in ['status', 'uuid', 'local_directory_full_path', 'metadata']: + for key in ["status", "uuid", "local_directory_full_path", "metadata"]: assert key in ds_rslt, f"Dataset status for {uuid} has no {key}" - if not 
ds_rslt['status'] in ['New', 'Error', 'QA', 'Published']: - raise AirflowException(f'Dataset {uuid} is not QA or better') + if not ds_rslt["status"] in ["New", "Error", "QA", "Published", "Submitted"]: + raise AirflowException(f"Dataset {uuid} is not QA or better") - return (ds_rslt['uuid'], ds_rslt['local_directory_full_path'], - ds_rslt['metadata']) + return (ds_rslt["uuid"], ds_rslt["local_directory_full_path"], ds_rslt["metadata"]) def check_uuids(**kwargs): - print('dag_run conf follows:') - pprint(kwargs['dag_run'].conf) + print("dag_run conf follows:") + pprint(kwargs["dag_run"].conf) try: - assert_json_matches_schema(kwargs['dag_run'].conf, - 'launch_checksums_metadata_schema.yml') + assert_json_matches_schema( + kwargs["dag_run"].conf, "launch_checksums_metadata_schema.yml" + ) except AssertionError as e: - print('invalid metadata follows:') - pprint(kwargs['dag_run'].conf) + print("invalid metadata follows:") + pprint(kwargs["dag_run"].conf) raise - uuid, lz_path, metadata = check_one_uuid(kwargs['dag_run'].conf['uuid'], **kwargs) - print(f'filtered metadata: {metadata}') - print(f'filtered paths: {lz_path}') - kwargs['dag_run'].conf['lz_path'] = lz_path - kwargs['dag_run'].conf['src_path'] = airflow_conf.as_dict()['connections']['src_path'].strip("'") - + uuid, lz_path, metadata = check_one_uuid(kwargs["dag_run"].conf["uuid"], **kwargs) + print(f"filtered metadata: {metadata}") + print(f"filtered paths: {lz_path}") + kwargs["dag_run"].conf["lz_path"] = lz_path + kwargs["dag_run"].conf["src_path"] = airflow_conf.as_dict()["connections"][ + "src_path" + ].strip("'") t_check_uuids = PythonOperator( - task_id='check_uuids', + task_id="check_uuids", python_callable=check_uuids, provide_context=True, op_kwargs={ - 'crypt_auth_tok': encrypt_tok(airflow_conf.as_dict() - ['connections']['APP_CLIENT_SECRET']).decode(), - } + "crypt_auth_tok": encrypt_tok( + airflow_conf.as_dict()["connections"]["APP_CLIENT_SECRET"] + ).decode(), + }, ) t_run_md_extract = BashOperator( - task_id='run_md_extract', + task_id="run_md_extract", bash_command=""" \ lz_dir="{{dag_run.conf.lz_path}}" ; \ src_dir="{{dag_run.conf.src_path}}/md" ; \ @@ -141,71 +145,79 @@ def check_uuids(**kwargs): fi """, env={ - 'AUTH_TOK': ( + "AUTH_TOK": ( utils.get_auth_tok( **{ - 'crypt_auth_tok': utils.encrypt_tok( - airflow_conf.as_dict()['connections']['APP_CLIENT_SECRET']).decode() + "crypt_auth_tok": utils.encrypt_tok( + airflow_conf.as_dict()["connections"]["APP_CLIENT_SECRET"] + ).decode() } ) ), - 'PYTHON_EXE': os.environ["CONDA_PREFIX"] + "/bin/python", - 'INGEST_API_URL': os.environ["AIRFLOW_CONN_INGEST_API_CONNECTION"] - } + "PYTHON_EXE": os.environ["CONDA_PREFIX"] + "/bin/python", + "INGEST_API_URL": os.environ["AIRFLOW_CONN_INGEST_API_CONNECTION"], + }, ) t_md_consistency_tests = PythonOperator( - task_id='md_consistency_tests', + task_id="md_consistency_tests", python_callable=pythonop_md_consistency_tests, provide_context=True, - op_kwargs={'metadata_fname': 'rslt.yml'} + op_kwargs={"metadata_fname": "rslt.yml"}, ) def read_metadata_file(**kwargs): - md_fname = os.path.join(get_tmp_dir_path(kwargs['run_id']), - 'rslt.yml') - with open(md_fname, 'r') as f: + md_fname = os.path.join(get_tmp_dir_path(kwargs["run_id"]), "rslt.yml") + with open(md_fname, "r") as f: scanned_md = yaml.safe_load(f) return scanned_md send_status_msg = make_send_status_msg_function( dag_file=__file__, - retcode_ops=['run_md_extract', 'md_consistency_tests'], + retcode_ops=["run_md_extract", "md_consistency_tests"], cwl_workflows=[], 
dataset_uuid_fun=get_dataset_uuid, dataset_lz_path_fun=get_dataset_lz_path, metadata_fun=read_metadata_file, - include_file_metadata=False + include_file_metadata=False, ) def wrapped_send_status_msg(**kwargs): if send_status_msg(**kwargs): scanned_md = read_metadata_file(**kwargs) # Yes, it's getting re-read - kwargs['ti'].xcom_push(key='collectiontype', - value=(scanned_md['collectiontype'] - if 'collectiontype' in scanned_md - else None)) - if 'assay_type' in scanned_md: - assay_type = scanned_md['assay_type'] - elif 'metadata' in scanned_md and 'assay_type' in scanned_md['metadata']: - assay_type = scanned_md['metadata']['assay_type'] + kwargs["ti"].xcom_push( + key="collectiontype", + value=(scanned_md["collectiontype"] if "collectiontype" in scanned_md else None), + ) + if "assay_type" in scanned_md: + assay_type = scanned_md["assay_type"] + elif "metadata" in scanned_md and "assay_type" in scanned_md["metadata"]: + assay_type = scanned_md["metadata"]["assay_type"] else: assay_type = None - kwargs['ti'].xcom_push(key='assay_type', value=assay_type) + kwargs["ti"].xcom_push(key="assay_type", value=assay_type) else: - kwargs['ti'].xcom_push(key='collectiontype', value=None) + kwargs["ti"].xcom_push(key="collectiontype", value=None) t_send_status = PythonOperator( - task_id='send_status_msg', + task_id="send_status_msg", python_callable=wrapped_send_status_msg, provide_context=True, - trigger_rule='all_done', + trigger_rule="all_done", op_kwargs={ - 'crypt_auth_tok': encrypt_tok(airflow_conf.as_dict() - ['connections']['APP_CLIENT_SECRET']).decode(), - } + "crypt_auth_tok": encrypt_tok( + airflow_conf.as_dict()["connections"]["APP_CLIENT_SECRET"] + ).decode(), + }, ) - t_cleanup_tmpdir = CleanupTmpDirOperator(task_id='cleanup_temp_dir') + t_cleanup_tmpdir = CleanupTmpDirOperator(task_id="cleanup_temp_dir") - t_check_uuids >> t_create_tmpdir >> t_run_md_extract >> t_md_consistency_tests >> t_send_status >> t_cleanup_tmpdir \ No newline at end of file + ( + t_check_uuids + >> t_create_tmpdir + >> t_run_md_extract + >> t_md_consistency_tests + >> t_send_status + >> t_cleanup_tmpdir + ) diff --git a/src/ingest-pipeline/airflow/dags/reorganize_upload.py b/src/ingest-pipeline/airflow/dags/reorganize_upload.py index 7568a112..a5a807c7 100644 --- a/src/ingest-pipeline/airflow/dags/reorganize_upload.py +++ b/src/ingest-pipeline/airflow/dags/reorganize_upload.py @@ -374,13 +374,18 @@ def flex_maybe_multiassay_spawn(**kwargs): time.sleep(1) print(f"Triggering reorganization for UUID {uuid}") trigger_dag(dag_id, run_id, conf, execution_date=execution_date) - return [] + return 0 - t_maybe_multiassay_spawn = FlexMultiDagRunOperator( + # t_maybe_multiassay_spawn = FlexMultiDagRunOperator( + # task_id="flex_maybe_spawn", + # dag=dag, + # trigger_dag_id="scan_and_begin_processing", + # python_callable=flex_maybe_multiassay_spawn, + # ) + t_maybe_multiassay_spawn = PythonOperator( task_id="flex_maybe_spawn", - dag=dag, - trigger_dag_id="scan_and_begin_processing", python_callable=flex_maybe_multiassay_spawn, + provide_context=True, ) def _get_upload_uuid(**kwargs): diff --git a/src/ingest-pipeline/airflow/dags/resource_map.yml b/src/ingest-pipeline/airflow/dags/resource_map.yml index f42deb49..5ebe7dac 100644 --- a/src/ingest-pipeline/airflow/dags/resource_map.yml +++ b/src/ingest-pipeline/airflow/dags/resource_map.yml @@ -157,13 +157,27 @@ resource_map: - 'task_re': '.*' 'queue': 'general' 'threads': 6 - - 'dag_re': 'rebuild_metadata' + - 'dag_re': '.*metadata' 'preserve_scratch': true 'lanes': 2 
'tasks': - 'task_re': '.*' 'queue': 'general' 'threads': 6 + - 'dag_re': '.*multiome' + 'preserve_scratch': true + 'lanes': 2 + 'tasks': + - 'task_re': '.*' + 'queue': 'general' + 'threads': 50 + - 'dag_re': '.*visium' + 'preserve_scratch': true + 'lanes': 3 + 'tasks': + - 'task_re': '.*' + 'queue': 'general' + 'threads': 15 - 'dag_re': '.*' 'preserve_scratch': true 'lanes': 2 diff --git a/src/ingest-pipeline/airflow/dags/sc_atac_seq.py b/src/ingest-pipeline/airflow/dags/sc_atac_seq.py index 2962bad7..f7a88e74 100644 --- a/src/ingest-pipeline/airflow/dags/sc_atac_seq.py +++ b/src/ingest-pipeline/airflow/dags/sc_atac_seq.py @@ -51,22 +51,23 @@ def generate_atac_seq_dag(params: SequencingDagParameters) -> DAG: "on_failure_callback": utils.create_dataset_state_error_callback(get_uuid_for_error), } - with HMDAG(params.dag_id, - schedule_interval=None, - is_paused_upon_creation=False, - default_args=default_args, - user_defined_macros={ - "tmp_dir_path": get_tmp_dir_path, - "preserve_scratch": get_preserve_scratch_resource(params.dag_id), - }) as dag: + with HMDAG( + params.dag_id, + schedule_interval=None, + is_paused_upon_creation=False, + default_args=default_args, + user_defined_macros={ + "tmp_dir_path": get_tmp_dir_path, + "preserve_scratch": get_preserve_scratch_resource(params.dag_id), + }, + ) as dag: cwl_workflows = get_absolute_workflows( Path("sc-atac-seq-pipeline", "sc_atac_seq_prep_process_analyze.cwl"), Path("portal-containers", "scatac-csv-to-arrow.cwl"), ) def build_dataset_name(**kwargs): - return inner_build_dataset_name(dag.dag_id, params.pipeline_name, - **kwargs) + return inner_build_dataset_name(dag.dag_id, params.pipeline_name, **kwargs) prepare_cwl1 = DummyOperator(task_id="prepare_cwl1") @@ -174,7 +175,7 @@ def build_cwltool_cmd2(**kwargs): "previous_revision_uuid_callable": get_previous_revision_uuid, "http_conn_id": "ingest_api_connection", "dataset_name_callable": build_dataset_name, - "pipeline_shorthand": "SnapATAC" + "pipeline_shorthand": "ArchR", }, ) @@ -233,6 +234,7 @@ def build_cwltool_cmd2(**kwargs): return dag + atacseq_dag_data: List[SequencingDagParameters] = [ SequencingDagParameters( dag_id="sc_atac_seq_sci", diff --git a/src/ingest-pipeline/airflow/dags/scan_and_begin_processing.py b/src/ingest-pipeline/airflow/dags/scan_and_begin_processing.py index 2d58c511..fcbf02fe 100644 --- a/src/ingest-pipeline/airflow/dags/scan_and_begin_processing.py +++ b/src/ingest-pipeline/airflow/dags/scan_and_begin_processing.py @@ -130,8 +130,10 @@ def run_validation(**kwargs): plugin_directory=plugin_path, # offline=True, # noqa E265 add_notes=False, + extra_parameters={ + "coreuse": get_threads_resource("scan_and_begin_processing", "run_validation") + }, ignore_deprecation=True, - extra_parameters={'coreuse': get_threads_resource('validate_upload', 'run_validation')}, globus_token=get_auth_tok(**kwargs), app_context=app_context, ) diff --git a/src/ingest-pipeline/airflow/dags/utils.py b/src/ingest-pipeline/airflow/dags/utils.py index 9d322c20..145204ac 100644 --- a/src/ingest-pipeline/airflow/dags/utils.py +++ b/src/ingest-pipeline/airflow/dags/utils.py @@ -753,7 +753,7 @@ def pythonop_send_create_dataset(**kwargs) -> str: for arg in ["parent_dataset_uuid_callable", "http_conn_id"]: assert arg in kwargs, "missing required argument {}".format(arg) - for arg_options in [["pipeline_shorthand", "dataset_types_callable"]]: + for arg_options in [["pipeline_shorthand", "dataset_type_callable"]]: assert any([arg in kwargs for arg in arg_options]) http_conn_id = 
kwargs["http_conn_id"] diff --git a/src/ingest-pipeline/airflow/dags/workflow_map.yml b/src/ingest-pipeline/airflow/dags/workflow_map.yml index 43a1b558..f2c09660 100644 --- a/src/ingest-pipeline/airflow/dags/workflow_map.yml +++ b/src/ingest-pipeline/airflow/dags/workflow_map.yml @@ -119,4 +119,13 @@ workflow_map: - 'collection_type': '.*' 'assay_type': 'visium-no-probes' 'workflow': 'visium_no_probes' +# - 'collection_type': '.*' # disconnected because we expect these to be multis +# 'assay_type': 'multiome-snare-seq2' +# 'workflow': 'multiome_snareseq' + - 'collection_type': 'multiome_snare-seq2_collection' + 'assay_type': 'multiome-snare-seq2' + 'workflow': 'multiome_snareseq' + - 'collection_type': '.*' + 'assay_type': '10x-multiome' + 'workflow': 'multiome_10x' diff --git a/src/ingest-pipeline/airflow/plugins/hubmap_api/endpoint.py b/src/ingest-pipeline/airflow/plugins/hubmap_api/endpoint.py index b9099334..8a65ef1d 100644 --- a/src/ingest-pipeline/airflow/plugins/hubmap_api/endpoint.py +++ b/src/ingest-pipeline/airflow/plugins/hubmap_api/endpoint.py @@ -9,6 +9,7 @@ import pytz import yaml from cryptography.fernet import Fernet +from time import sleep from flask import request, Response @@ -28,34 +29,34 @@ LOGGER = logging.getLogger(__name__) -airflow_conf.read(os.path.join(os.environ['AIRFLOW_HOME'], 'instance', 'app.cfg')) +airflow_conf.read(os.path.join(os.environ["AIRFLOW_HOME"], "instance", "app.cfg")) # Tables of configuration elements needed at start-up. # Config elements must be lowercase NEEDED_ENV_VARS = [ - 'AIRFLOW_CONN_INGEST_API_CONNECTION', - 'AIRFLOW_CONN_UUID_API_CONNECTION', - 'AIRFLOW_CONN_FILES_API_CONNECTION', - 'AIRFLOW_CONN_SPATIAL_API_CONNECTION', - 'AIRFLOW_CONN_SEARCH_API_CONNECTION', - 'AIRFLOW_CONN_ENTITY_API_CONNECTION' - ] + "AIRFLOW_CONN_INGEST_API_CONNECTION", + "AIRFLOW_CONN_UUID_API_CONNECTION", + "AIRFLOW_CONN_FILES_API_CONNECTION", + "AIRFLOW_CONN_SPATIAL_API_CONNECTION", + "AIRFLOW_CONN_SEARCH_API_CONNECTION", + "AIRFLOW_CONN_ENTITY_API_CONNECTION", +] NEEDED_CONFIG_SECTIONS = [ - 'ingest_map', - ] + "ingest_map", +] NEEDED_CONFIGS = [ - ('ingest_map', 'scan.and.begin.processing'), - ('ingest_map', 'validate.upload'), - ('hubmap_api_plugin', 'build_number'), - ('connections', 'app_client_id'), - ('connections', 'app_client_secret'), - ('connections', 'docker_mount_path'), - ('connections', 'src_path'), - ('connections', 'output_group_name'), - ('connections', 'workflow_scratch'), - ('core', 'timezone'), - ('core', 'fernet_key') - ] + ("ingest_map", "scan.and.begin.processing"), + ("ingest_map", "validate.upload"), + ("hubmap_api_plugin", "build_number"), + ("connections", "app_client_id"), + ("connections", "app_client_secret"), + ("connections", "docker_mount_path"), + ("connections", "src_path"), + ("connections", "output_group_name"), + ("connections", "workflow_scratch"), + ("core", "timezone"), + ("core", "fernet_key"), +] def check_config(): @@ -64,18 +65,18 @@ def check_config(): failed = 0 for elt in NEEDED_ENV_VARS: if elt not in os.environ: - LOGGER.error('The environment variable {} is not set'.format(elt)) + LOGGER.error("The environment variable {} is not set".format(elt)) failed += 1 for key in NEEDED_CONFIG_SECTIONS + [a for a, b in NEEDED_CONFIGS]: if not (key in dct or key.upper() in dct): - LOGGER.error('The configuration section {} does not exist'.format(key)) + LOGGER.error("The configuration section {} does not exist".format(key)) failed += 1 for key1, key2 in NEEDED_CONFIGS: sub_dct = dct[key1] if key1 in dct else 
dct[key1.upper()] if not (key2 in sub_dct or key2.upper() in sub_dct): - LOGGER.error('The configuration parameter [{}] {} does not exist'.format(key1, key2)) + LOGGER.error("The configuration parameter [{}] {} does not exist".format(key1, key2)) failed += 1 - assert failed == 0, 'ingest-pipeline plugin found {} configuration errors'.format(failed) + assert failed == 0, "ingest-pipeline plugin found {} configuration errors".format(failed) check_config() @@ -91,20 +92,22 @@ def config(section, key): elif key.upper() in dct[section]: rslt = dct[section][key.upper()] else: - raise AirflowConfigException('No config entry for [{}] {}'.format(section, key)) + raise AirflowConfigException("No config entry for [{}] {}".format(section, key)) # airflow config reader leaves quotes, which we want to strip for qc in ['"', "'"]: if rslt.startswith(qc) and rslt.endswith(qc): rslt = rslt.strip(qc) return rslt else: - raise AirflowConfigException('No config section [{}]'.format(section)) + raise AirflowConfigException("No config section [{}]".format(section)) AUTH_HELPER = None if not AuthHelper.isInitialized(): - AUTH_HELPER = AuthHelper.create(clientId=config('connections', 'app_client_id'), - clientSecret=config('connections', 'app_client_secret')) + AUTH_HELPER = AuthHelper.create( + clientId=config("connections", "app_client_id"), + clientSecret=config("connections", "app_client_secret"), + ) else: AUTH_HELPER = AuthHelper.instance() @@ -115,85 +118,86 @@ class HubmapApiInputException(Exception): class HubmapApiConfigException(Exception): pass - - + + class HubmapApiResponse: - def __init__(self): pass - + STATUS_OK = 200 STATUS_BAD_REQUEST = 400 STATUS_UNAUTHORIZED = 401 STATUS_NOT_FOUND = 404 STATUS_SERVER_ERROR = 500 - + @staticmethod def standard_response(status, payload): - json_data = json.dumps({ - 'response': payload - }) - resp = Response(json_data, status=status, mimetype='application/json') + json_data = json.dumps({"response": payload}) + resp = Response(json_data, status=status, mimetype="application/json") return resp - + @staticmethod def success(payload): return HubmapApiResponse.standard_response(HubmapApiResponse.STATUS_OK, payload) - + @staticmethod def error(status, error): - return HubmapApiResponse.standard_response(status, { - 'error': error - }) - + return HubmapApiResponse.standard_response(status, {"error": error}) + @staticmethod def bad_request(error): return HubmapApiResponse.error(HubmapApiResponse.STATUS_BAD_REQUEST, error) - + + @staticmethod + def bad_request_bulk(error, success): + return HubmapApiResponse.standard_response(HubmapApiResponse.STATUS_BAD_REQUEST, {"error": error, + "success": success}) + @staticmethod - def not_found(error='Resource not found'): + def not_found(error="Resource not found"): return HubmapApiResponse.error(HubmapApiResponse.STATUS_NOT_FOUND, error) - + @staticmethod - def unauthorized(error='Not authorized to access this resource'): + def unauthorized(error="Not authorized to access this resource"): return HubmapApiResponse.error(HubmapApiResponse.STATUS_UNAUTHORIZED, error) - + @staticmethod - def server_error(error='An unexpected problem occurred'): + def server_error(error="An unexpected problem occurred"): return HubmapApiResponse.error(HubmapApiResponse.STATUS_SERVER_ERROR, error) -@api_bp.route('/test') +@api_bp.route("/test") @secured(groups="HuBMAP-read") def api_test(): token = None - client_id = config('connections', 'app_client_id') + client_id = config("connections", "app_client_id") print("Client id: " + client_id) - 
client_secret = config('connections', 'app_client_secret') + client_secret = config("connections", "app_client_secret") print("Client secret: " + client_secret) - if 'MAUTHORIZATION' in request.headers: + if "MAUTHORIZATION" in request.headers: token = str(request.headers["MAUTHORIZATION"])[8:] - elif 'AUTHORIZATION' in request.headers: + elif "AUTHORIZATION" in request.headers: token = str(request.headers["AUTHORIZATION"])[7:] print("Token: " + token) - return HubmapApiResponse.success({'api_is_alive': True}) - + return HubmapApiResponse.success({"api_is_alive": True}) + -@api_bp.route('/version') +@api_bp.route("/version") def api_version(): - return HubmapApiResponse.success({'api': API_VERSION, - 'build': config('hubmap_api_plugin', 'build_number')}) + return HubmapApiResponse.success( + {"api": API_VERSION, "build": config("hubmap_api_plugin", "build_number")} + ) + - def format_dag_run(dag_run): return { - 'run_id': dag_run.run_id, - 'dag_id': dag_run.dag_id, - 'state': dag_run.get_state(), - 'start_date': (None if not dag_run.start_date else str(dag_run.start_date)), - 'end_date': (None if not dag_run.end_date else str(dag_run.end_date)), - 'external_trigger': dag_run.external_trigger, - 'execution_date': str(dag_run.execution_date) + "run_id": dag_run.run_id, + "dag_id": dag_run.dag_id, + "state": dag_run.get_state(), + "start_date": (None if not dag_run.start_date else str(dag_run.start_date)), + "end_date": (None if not dag_run.end_date else str(dag_run.end_date)), + "external_trigger": dag_run.external_trigger, + "execution_date": str(dag_run.execution_date), } @@ -221,27 +225,29 @@ def check_ingest_parms(provider, submission_id, process, full_path): On error, HubmapApiInputException is raised. Return value is None. """ - if process.startswith('mock.'): + if process.startswith("mock."): # test request; there should be pre-recorded response data here_dir = os.path.dirname(os.path.abspath(__file__)) - yml_path = os.path.join(here_dir, '../../dags/mock_data/', - process + '.yml') + yml_path = os.path.join(here_dir, "../../dags/mock_data/", process + ".yml") try: - with open(yml_path, 'r') as f: + with open(yml_path, "r") as f: mock_data = yaml.safe_load(f) except yaml.YAMLError as e: - LOGGER.error('mock data contains invalid YAML: {}'.format(e)) - raise HubmapApiInputException('Mock data is invalid YAML for process %s', process) + LOGGER.error("mock data contains invalid YAML: {}".format(e)) + raise HubmapApiInputException("Mock data is invalid YAML for process %s", process) except IOError as e: - LOGGER.error('mock data load failed: {}'.format(e)) - raise HubmapApiInputException('No mock data found for process %s', process) + LOGGER.error("mock data load failed: {}".format(e)) + raise HubmapApiInputException("No mock data found for process %s", process) else: - dct = {'provider': provider, 'submission_id': submission_id, 'process': process} - base_path = config('connections', 'docker_mount_path') + # dct = {"provider": provider, "submission_id": submission_id, "process": process} + base_path = config("connections", "docker_mount_path") if os.path.commonprefix([full_path, base_path]) != base_path: - LOGGER.error("Ingest directory {} is not a subdirectory of DOCKER_MOUNT_PATH" - .format(full_path)) - raise HubmapApiInputException("Ingest directory is not a subdirectory of DOCKER_MOUNT_PATH") + LOGGER.error( + "Ingest directory {} is not a subdirectory of DOCKER_MOUNT_PATH".format(full_path) + ) + raise HubmapApiInputException( + "Ingest directory is not a subdirectory of 
DOCKER_MOUNT_PATH" + ) if os.path.exists(full_path) and os.path.isdir(full_path): try: num_files = len(os.listdir(full_path)) @@ -252,20 +258,24 @@ def check_ingest_parms(provider, submission_id, process, full_path): LOGGER.error("Ingest directory {} contains no files".format(full_path)) raise HubmapApiInputException("Ingest directory contains no files") else: - LOGGER.error("cannot find the ingest data for '%s' '%s' '%s' (expected %s)" - % (provider, submission_id, process, full_path)) - raise HubmapApiInputException("Cannot find the expected ingest directory for '%s' '%s' '%s'" - % (provider, submission_id, process)) - + LOGGER.error( + "cannot find the ingest data for '%s' '%s' '%s' (expected %s)" + % (provider, submission_id, process, full_path) + ) + raise HubmapApiInputException( + "Cannot find the expected ingest directory for '%s' '%s' '%s'" + % (provider, submission_id, process) + ) + def _auth_tok_from_request(): """ Parse out and return the authentication token from the current request - """ - authorization = request.headers.get('authorization') - LOGGER.info('top of request_ingest.') - assert authorization[:len('BEARER')].lower() == 'bearer', 'authorization is not BEARER' - substr = authorization[len('BEARER'):].strip() + """ + authorization = request.headers.get("authorization") + LOGGER.info("top of request_ingest.") + assert authorization[: len("BEARER")].lower() == "bearer", "authorization is not BEARER" + substr = authorization[len("BEARER") :].strip() auth_tok = substr # LOGGER.info('auth_tok: %s', auth_tok) # reduce visibility of auth_tok return auth_tok @@ -275,7 +285,7 @@ def _auth_tok_from_environment(): """ Get the 'permanent authorization token' """ - tok = airflow_conf.as_dict()['connections']['APP_CLIENT_SECRET'] + tok = airflow_conf.as_dict()["connections"]["APP_CLIENT_SECRET"] return tok @@ -296,68 +306,72 @@ def _auth_tok_from_environment(): @csrf.exempt -@api_bp.route('/request_ingest', methods=['POST']) +@api_bp.route("/request_ingest", methods=["POST"]) # @secured(groups="HuBMAP-read") def request_ingest(): auth_tok = _auth_tok_from_environment() # decode input data = request.get_json(force=True) - - LOGGER.debug('request_ingest data: {}'.format(str(data))) + + LOGGER.debug("request_ingest data: {}".format(str(data))) # Test and extract required parameters try: - provider = _get_required_string(data, 'provider') - submission_id = _get_required_string(data, 'submission_id') - process = _get_required_string(data, 'process') - full_path = _get_required_string(data, 'full_path') + provider = _get_required_string(data, "provider") + submission_id = _get_required_string(data, "submission_id") + process = _get_required_string(data, "process") + full_path = _get_required_string(data, "full_path") except HubmapApiInputException as e: - return HubmapApiResponse.bad_request('Must specify {} to request data be ingested'.format(str(e))) + return HubmapApiResponse.bad_request( + "Must specify {} to request data be ingested".format(str(e)) + ) - process = process.lower() # necessary because config parser has made the corresponding string lower case + process = ( + process.lower() + ) # necessary because config parser has made the corresponding string lower case try: - dag_id = config('ingest_map', process) + dag_id = config("ingest_map", process) except AirflowConfigException: - return HubmapApiResponse.bad_request('{} is not a known ingestion process'.format(process)) - + return HubmapApiResponse.bad_request("{} is not a known ingestion process".format(process)) + try: 
check_ingest_parms(provider, submission_id, process, full_path) - + session = settings.Session() # Produce one and only one run - tz = pytz.timezone(config('core', 'timezone')) + tz = pytz.timezone(config("core", "timezone")) execution_date = datetime.now(tz) - LOGGER.info('starting {} with execution_date: {}'.format(dag_id, - execution_date)) + LOGGER.info("starting {} with execution_date: {}".format(dag_id, execution_date)) - run_id = '{}_{}_{}'.format(submission_id, process, execution_date.isoformat()) + run_id = "{}_{}_{}".format(submission_id, process, execution_date.isoformat()) ingest_id = run_id - fernet = Fernet(config('core', 'fernet_key').encode()) + fernet = Fernet(config("core", "fernet_key").encode()) crypt_auth_tok = fernet.encrypt(auth_tok.encode()).decode() - conf = {'provider': provider, - 'submission_id': submission_id, - 'process': process, - 'dag_id': dag_id, - 'run_id': run_id, - 'ingest_id': ingest_id, - 'crypt_auth_tok': crypt_auth_tok, - 'src_path': config('connections', 'src_path'), - 'lz_path': full_path - } + conf = { + "provider": provider, + "submission_id": submission_id, + "process": process, + "dag_id": dag_id, + "run_id": run_id, + "ingest_id": ingest_id, + "crypt_auth_tok": crypt_auth_tok, + "src_path": config("connections", "src_path"), + "lz_path": full_path, + } if find_dag_runs(session, dag_id, run_id, execution_date): # The run already happened?? - raise AirflowException('The request happened twice?') + raise AirflowException("The request happened twice?") try: dr = trigger_dag(dag_id, run_id, conf, execution_date=execution_date) except AirflowException as err: LOGGER.error(err) raise AirflowException("Attempt to trigger run produced an error: {}".format(err)) - LOGGER.info('dagrun follows: {}'.format(dr)) + LOGGER.info("dagrun follows: {}".format(dr)) session.close() except HubmapApiInputException as e: return HubmapApiResponse.bad_request(str(e)) @@ -368,8 +382,103 @@ def request_ingest(): except Exception as e: return HubmapApiResponse.server_error(str(e)) - return HubmapApiResponse.success({'ingest_id': ingest_id, - 'run_id': run_id}) + return HubmapApiResponse.success({"ingest_id": ingest_id, "run_id": run_id}) + + +@csrf.exempt +@api_bp.route("/request_bulk_ingest", methods=["POST"]) +# @secured(groups="HuBMAP-read") +def request_bulk_ingest(): + auth_tok = _auth_tok_from_environment() + error_msgs = [] + success_msgs = [] + + # decode input + data = request.get_json(force=True) + + LOGGER.debug("request_ingest data: {}".format(str(data))) + + for item in data: + # Test and extract required parameters + try: + provider = _get_required_string(item, "provider") + submission_id = _get_required_string(item, "submission_id") + process = _get_required_string(item, "process") + full_path = _get_required_string(item, "full_path") + except HubmapApiInputException as e: + error_msgs.append({"message": "Must specify {} to request data be ingested".format(str(e)), + "submission_id": "not_found"}) + continue + + process = ( + process.lower() + ) # necessary because config parser has made the corresponding string lower case + + try: + dag_id = config("ingest_map", process) + except AirflowConfigException: + error_msgs.append({"message": "{} is not a known ingestion process".format(process), + "submission_id": submission_id}) + continue + + try: + check_ingest_parms(provider, submission_id, process, full_path) + + session = settings.Session() + + # Produce one and only one run + tz = pytz.timezone(config("core", "timezone")) + execution_date = 
datetime.now(tz) + sleep(0.05) + LOGGER.info("starting {} with execution_date: {}".format(dag_id, execution_date)) + + run_id = "{}_{}_{}".format(submission_id, process, execution_date.isoformat()) + ingest_id = run_id + fernet = Fernet(config("core", "fernet_key").encode()) + crypt_auth_tok = fernet.encrypt(auth_tok.encode()).decode() + + conf = { + "provider": provider, + "submission_id": submission_id, + "process": process, + "dag_id": dag_id, + "run_id": run_id, + "ingest_id": ingest_id, + "crypt_auth_tok": crypt_auth_tok, + "src_path": config("connections", "src_path"), + "lz_path": full_path, + } + + if find_dag_runs(session, dag_id, run_id, execution_date): + error_msgs.append({"message": "run_id already exists {}".format(run_id), + "submission_id": submission_id}) + # The run already happened?? + continue + + try: + dr = trigger_dag(dag_id, run_id, conf, execution_date=execution_date, replace_microseconds=False) + except AirflowException as err: + LOGGER.error(err) + error_msgs.append({"message": "Attempt to trigger run produced an error: {}".format(err), + "submission_id": submission_id}) + continue + success_msgs.append({"ingest_id": ingest_id, "run_id": run_id, "submission_id": submission_id}) + LOGGER.info("dagrun follows: {}".format(dr)) + session.close() + except HubmapApiInputException as e: + error_msgs.append({"message": str(e), "submission_id": submission_id}) + except ValueError as e: + error_msgs.append({"message": str(e), "submission_id": submission_id}) + except AirflowException as e: + error_msgs.append({"message": str(e), "submission_id": submission_id}) + except Exception as e: + error_msgs.append({"message": str(e), "submission_id": submission_id}) + + if error_msgs: + return HubmapApiResponse.bad_request_bulk(error_msgs, success_msgs) + if success_msgs: + return HubmapApiResponse.success(success_msgs) + return HubmapApiResponse.bad_request("Nothing to ingest") def generic_invoke_dag_on_uuid(uuid, process_name): @@ -377,71 +486,71 @@ def generic_invoke_dag_on_uuid(uuid, process_name): process = process_name run_id = "empty" try: - dag_id = config('ingest_map', process) + dag_id = config("ingest_map", process) session = settings.Session() # Produce one and only one run - tz = pytz.timezone(config('core', 'timezone')) + tz = pytz.timezone(config("core", "timezone")) execution_date = datetime.now(tz) - LOGGER.info('starting {} with execution_date: {}'.format(dag_id, - execution_date)) + LOGGER.info("starting {} with execution_date: {}".format(dag_id, execution_date)) - run_id = '{}_{}_{}'.format(uuid, process, execution_date.isoformat()) - fernet = Fernet(config('core', 'fernet_key').encode()) + run_id = "{}_{}_{}".format(uuid, process, execution_date.isoformat()) + fernet = Fernet(config("core", "fernet_key").encode()) crypt_auth_tok = fernet.encrypt(auth_tok.encode()).decode() - conf = {'process': process, - 'dag_id': dag_id, - 'run_id': run_id, - 'crypt_auth_tok': crypt_auth_tok, - 'src_path': config('connections', 'src_path'), - 'uuid': uuid - } + conf = { + "process": process, + "dag_id": dag_id, + "run_id": run_id, + "crypt_auth_tok": crypt_auth_tok, + "src_path": config("connections", "src_path"), + "uuid": uuid, + } if find_dag_runs(session, dag_id, run_id, execution_date): # The run already happened?? 
- raise AirflowException('The request happened twice?') + raise AirflowException("The request happened twice?") try: dr = trigger_dag(dag_id, run_id, conf, execution_date=execution_date) except AirflowException as err: LOGGER.error(err) raise AirflowException("Attempt to trigger run produced an error: {}".format(err)) - LOGGER.info('dagrun follows: {}'.format(dr)) + LOGGER.info("dagrun follows: {}".format(dr)) session.close() except HubmapApiConfigException: - return HubmapApiResponse.bad_request(f'{process} does not map to a known DAG') + return HubmapApiResponse.bad_request(f"{process} does not map to a known DAG") except HubmapApiInputException as e: return HubmapApiResponse.bad_request(str(e)) except ValueError as e: return HubmapApiResponse.server_error(str(e)) except KeyError as e: - HubmapApiResponse.not_found(f'{e}') + HubmapApiResponse.not_found(f"{e}") except AirflowException as e: return HubmapApiResponse.server_error(str(e)) except Exception as e: return HubmapApiResponse.server_error(str(e)) - return HubmapApiResponse.success({'run_id': run_id}) + return HubmapApiResponse.success({"run_id": run_id}) @csrf.exempt -@api_bp.route('/uploads//validate', methods=['PUT']) +@api_bp.route("/uploads//validate", methods=["PUT"]) # @secured(groups="HuBMAP-read") def validate_upload_uuid(uuid): - return generic_invoke_dag_on_uuid(uuid, 'validate.upload') + return generic_invoke_dag_on_uuid(uuid, "validate.upload") # auth_tok = _auth_tok_from_request() # process = 'validate.upload' @csrf.exempt -@api_bp.route('/uploads//reorganize', methods=['PUT']) +@api_bp.route("/uploads//reorganize", methods=["PUT"]) # @secured(groups="HuBMAP-read") def reorganize_upload_uuid(uuid): - return generic_invoke_dag_on_uuid(uuid, 'reorganize.upload') - - + return generic_invoke_dag_on_uuid(uuid, "reorganize.upload") + + """ Parameters for this request: None @@ -451,8 +560,8 @@ def reorganize_upload_uuid(uuid): """ -@api_bp.route('get_process_strings') +@api_bp.route("get_process_strings") def get_process_strings(): dct = airflow_conf.as_dict() - psl = [s.upper() for s in dct['ingest_map']] if 'ingest_map' in dct else [] - return HubmapApiResponse.success({'process_strings': psl}) + psl = [s.upper() for s in dct["ingest_map"]] if "ingest_map" in dct else [] + return HubmapApiResponse.success({"process_strings": psl}) diff --git a/src/ingest-pipeline/misc/tools/split_and_create.py b/src/ingest-pipeline/misc/tools/split_and_create.py index b3cf57b5..3b9a3670 100755 --- a/src/ingest-pipeline/misc/tools/split_and_create.py +++ b/src/ingest-pipeline/misc/tools/split_and_create.py @@ -3,6 +3,7 @@ import argparse import json import re +import os import time from pathlib import Path from pprint import pprint @@ -89,19 +90,6 @@ def create_fake_uuid_generator(): yield rslt -# def get_canonical_assay_type(row, entity_factory, default_type): -# """ -# Convert assay type to canonical form, with fallback -# """ -# try: -# rslt = entity_factory.type_client.getAssayType(row["assay_type"]).name -# except Exception: -# print(f"fallback {row['assay_type']} {default_type}") -# rslt = FALLBACK_ASSAY_TYPE_TRANSLATIONS.get(row["assay_type"], default_type) -# print(f"{row['assay_type']} -> {rslt}") -# return rslt - - def get_canonical_assay_type(row, dataset_type=None): # TODO: check if this needs to be rewrite to support old style metadata file_dataset_type = row["assay_type"] if hasattr(row, "assay_type") else row["dataset_type"] @@ -182,19 +170,55 @@ def populate(row, source_entity, entity_factory, dryrun=False, components=None): 
uuid = row["new_uuid"] old_data_path = row["data_path"] row["data_path"] = "." - old_contrib_path = Path(row["contributors_path"]) - new_contrib_path = Path("extras") / old_contrib_path.name - row["contributors_path"] = str(new_contrib_path) - if "antibodies_path" in row: - old_antibodies_path = Path(row["antibodies_path"]) - new_antibodies_path = Path("extras") / old_antibodies_path.name - row["antibodies_path"] = str(new_antibodies_path) + + # Contributors and antibodies should point to the path directly. + old_paths = [] + for path_index in ["contributors_path", "antibodies_path"]: + if old_path := row.get(path_index): + old_path = Path(old_path) + old_paths.append(old_path) + row[path_index] = str(Path("extras") / old_path.name) + print(f"Old paths to copy over {old_paths}") + + # Have to cover two cases + # 1. Case when non_global is set but there is no global/non_global directories + # 2. Case when non_global is not set but there are global/non_global directories + is_shared_upload = {"global", "non_global"} == { + x.name + for x in source_entity.full_path.glob("*global") + if x.is_dir() and x.name in ["global", "non_global"] + } + non_global_files = row.get("non_global_files") + print(f"Is {uuid} part of a shared upload? {is_shared_upload}") + if non_global_files: + print(f"Non global files: {non_global_files}") + # Catch case 1 + assert ( + is_shared_upload + ), f"{uuid} has non_global_files specified but missing global or non_global directories" + + # Generate a list of file paths for where non_global_files live in the upload + # Structure is {source_path: path_passed_in_metadata (destination path relative to root of dataset)} + non_global_files = { + source_entity.full_path / "non_global" / Path(x.strip()): Path(x.strip()) + for x in non_global_files.split(";") + if x.strip() + } + + # Iterate over source_paths and make sure they exist. + for non_global_file in non_global_files.keys(): + assert ( + non_global_file.exists() + ), f"Non global file {non_global_file.as_posix()} does not exist in {source_entity.full_path}" else: - old_antibodies_path = None - new_antibodies_path = None - # row['assay_type'] = row['canonical_assay_type'] + # Catch case 2 + assert ( + not is_shared_upload + ), f"{uuid} has empty non_global_files but has global & non_global directories" + row_df = pd.DataFrame([row]) row_df = row_df.drop(columns=["canonical_assay_type", "new_uuid"]) + if dryrun: kid_path = Path(SCRATCH_PATH) / uuid kid_path.mkdir(0o770, parents=True, exist_ok=True) @@ -204,28 +228,28 @@ def populate(row, source_entity, entity_factory, dryrun=False, components=None): kid_path = Path(entity_factory.get_full_path(uuid)) row_df.to_csv(kid_path / f"{uuid}-metadata.tsv", header=True, sep="\t", index=False) - extras_path = kid_path / "extras" + dest_extras_path = kid_path / "extras" if components is not None: for component in components: component_df = pd.read_csv(component.get("metadata-file"), sep="\t") component_df_cp = component_df.query(f'data_path=="{old_data_path}"').copy() for _, row_component in component_df_cp.iterrows(): - old_component_data_path = row_component["data_path"] + # This loop updates the data_path for the component row_component["data_path"] = "." 
-                old_component_contrib_path = Path(row_component["contributors_path"])
-                new_component_contrib_path = Path("extras") / old_component_contrib_path.name
-                row_component["contributors_path"] = str(new_component_contrib_path)
-                if "antibodies_path" in row_component:
-                    old_component_antibodies_path = Path(row["antibodies_path"])
-                    new_component_antibodies_path = (
-                        Path("extras") / old_component_antibodies_path.name
-                    )
-                    row_component["antibodies_path"] = str(new_component_antibodies_path)
-                    if dryrun:
-                        print(f"copy {old_component_antibodies_path} to {extras_path}")
-                    else:
-                        copy2(source_entity.full_path / old_component_antibodies_path, extras_path)
+                old_component_paths = []
+                for path_index in ["contributors_path", "antibodies_path"]:
+                    if old_component_path := row_component.get(path_index):
+                        old_component_path = Path(old_component_path)
+                        old_component_paths.append(old_component_path)
+                        row_component[path_index] = str(Path("extras") / old_component_path.name)
+
+                print(f"Old component paths to copy over {old_component_paths}")
+
+                copy_contrib_antibodies(
+                    dest_extras_path, source_entity, old_component_paths, dryrun
+                )
+
                 row_component = pd.DataFrame([row_component])
                 row_component.to_csv(
                     kid_path / f"{component.get('assaytype')}-metadata.tsv",
@@ -233,20 +257,48 @@ def populate(row, source_entity, entity_factory, dryrun=False, components=None):
                     sep="\t",
                     index=False,
                 )
-    if extras_path.exists():
-        assert extras_path.is_dir(), f"{extras_path} is not a directory"
+
+    if is_shared_upload:
+        copy_shared_data(kid_path, source_entity, non_global_files, dryrun)
     else:
-        source_extras_path = source_entity.full_path / "extras"
-        if source_extras_path.exists():
-            if dryrun:
-                print(f"copy {source_extras_path} to {extras_path}")
-            else:
-                copytree(source_extras_path, extras_path)
+        # This moves everything in the source_data_path over to the dataset path
+        # So part of non-shared uploads
+        copy_data_path(kid_path, source_entity.full_path / old_data_path, dryrun)
+
+    # START REGION - INDEPENDENT OF SHARED/NON-SHARED STATUS
+    # This moves extras over to the dataset extras directory
+    copy_extras(dest_extras_path, source_entity, dryrun)
+    # This copies contrib and antibodies over to dataset
+    copy_contrib_antibodies(
+        dest_extras_path,
+        source_entity,
+        old_paths,
+        dryrun,
+    )
+    # END REGION - INDEPENDENT OF SHARED/NON-SHARED STATUS
+
+    print(f"{old_data_path} -> {uuid} -> full path: {kid_path}")
+
+
+def copy_shared_data(kid_path, source_entity, non_global_files, dryrun):
+    # Copy global files over to dataset directory
+    if dryrun:
+        print(f"Copy files from {source_entity.full_path / 'global'} to {kid_path}")
+    else:
+        print(f"Copy files from {source_entity.full_path / 'global'} to {kid_path}")
+        copytree(source_entity.full_path / "global", kid_path, dirs_exist_ok=True)
+    # Copy over non-global files to dataset directory
+    for source_non_global_file, dest_relative_non_global_file in non_global_files.items():
+        dest_non_global_file = kid_path / dest_relative_non_global_file
+        if dryrun:
+            print(f"Copy file from {source_non_global_file} to {dest_non_global_file}")
         else:
-            if dryrun:
-                print(f"creating {extras_path}")
-            extras_path.mkdir(0o770)
-    source_data_path = source_entity.full_path / old_data_path
+            print(f"Copy file from {source_non_global_file} to {dest_non_global_file}")
+            dest_non_global_file.parent.mkdir(parents=True, exist_ok=True)
+            copy2(source_non_global_file, dest_non_global_file)
+
+
+def copy_data_path(kid_path, source_data_path, dryrun):
     for elt in source_data_path.glob("*"):
         dst_file = kid_path / elt.name
         if dryrun:
@@ -261,28 +313,42 @@ def populate(row, source_entity, entity_factory, dryrun=False, components=None):
                     sub_elt.rename(kid_path / elt.name / sub_elt.name)
                 continue
             elt.rename(dst_file)
-    if dryrun:
-        print(f"copy {old_contrib_path} to {extras_path}")
+
+
+def copy_extras(dest_extras_path, source_entity, dryrun):
+    if dest_extras_path.exists():
+        assert dest_extras_path.is_dir(), f"{dest_extras_path} is not a directory"
     else:
-        src_path = source_entity.full_path / old_contrib_path
-        if src_path.exists():
-            copy2(src_path, extras_path)
+        source_extras_path = source_entity.full_path / "extras"
+        if source_extras_path.exists():
+            if dryrun:
+                print(f"copy {source_extras_path} to {dest_extras_path}")
+            else:
+                print(f"copy {source_extras_path} to {dest_extras_path}")
+                copytree(source_extras_path, dest_extras_path, dirs_exist_ok=True)
         else:
-            moved_path = kid_path / new_contrib_path
-            print(f"""Probably already copied/moved {src_path}
-            to {moved_path} {"it exists" if moved_path.exists() else "missing file"}""")
-    if old_antibodies_path is not None:
+            if dryrun:
+                print(f"creating {dest_extras_path}")
+            dest_extras_path.mkdir(0o770)
+
+
+def copy_contrib_antibodies(dest_extras_path, source_entity, old_paths, dryrun):
+    for old_path in old_paths:
         if dryrun:
-            print(f"copy {old_antibodies_path} to {extras_path}")
+            print(f"copy {old_path} to {dest_extras_path}")
         else:
-            src_path = source_entity.full_path / old_antibodies_path
+            src_path = source_entity.full_path / old_path
+
             if src_path.exists():
-                copy2(source_entity.full_path / old_antibodies_path, extras_path)
+                dest_extras_path.mkdir(parents=True, exist_ok=True)
+                copy2(src_path, dest_extras_path / old_path.name)
+                print(f"copy {old_path} to {dest_extras_path}")
             else:
-                moved_path = kid_path / new_antibodies_path
-                print(f"""Probably already copied/moved {src_path}
-                to {moved_path} {"it exists" if moved_path.exists() else "missing file"}""")
-    print(f"{old_data_path} -> {uuid} -> full path: {kid_path}")
+                moved_path = dest_extras_path / old_path.name
+                print(
+                    f"""Probably already copied/moved {src_path}
+                    to {moved_path} {"it exists" if moved_path.exists() else "missing file"}"""
+                )


 def apply_special_case_transformations(
diff --git a/src/ingest-pipeline/misc/tools/split_and_create.py b/src/ingest-pipeline/requirements.txt
index 3c214144..4724b1bd 100644
--- a/src/ingest-pipeline/requirements.txt
+++ b/src/ingest-pipeline/requirements.txt
@@ -4,7 +4,7 @@ hubmap-commons==2.1.3
 prov==1.5.1
 # pylibczi>=1.1.1
 # tifffile==2020.12.8
-tifffile==2020.09.3
+tifffile==2021.11.2
 xmltodict==0.13.0
 pyimzml==1.5.2
 apache-airflow[celery,crypto,postgres,redis,ssh,amazon]==2.5.3
diff --git a/src/ingest-pipeline/submodules/ingest-validation-tools b/src/ingest-pipeline/submodules/ingest-validation-tools
index c7b2ae7b..0ed60d63 160000
--- a/src/ingest-pipeline/submodules/ingest-validation-tools
+++ b/src/ingest-pipeline/submodules/ingest-validation-tools
@@ -1 +1 @@
-Subproject commit c7b2ae7b93238a00c3918e950920f357d3329add
+Subproject commit 0ed60d63617a8ae25d29ebf39b0f8485e4c7ab5a
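
Note on the split_and_create.py changes: populate() now treats an upload as "shared" only when both a global/ and a non_global/ directory sit at its top level, and it expands the semicolon-separated non_global_files metadata field into a mapping from source files under non_global/ to destination paths relative to the new dataset root. A minimal sketch of that logic in isolation, assuming an upload_path stand-in for source_entity.full_path; the helper names are hypothetical and not part of the patch:

    from pathlib import Path
    from typing import Dict, Optional


    def is_shared_upload(upload_path: Path) -> bool:
        # Shared uploads carry both "global" and "non_global" directories at top level.
        return {"global", "non_global"} == {
            p.name
            for p in upload_path.glob("*global")
            if p.is_dir() and p.name in ("global", "non_global")
        }


    def map_non_global_files(upload_path: Path, non_global_files: Optional[str]) -> Dict[Path, Path]:
        # Expand "a.tsv; sub/b.tsv" into {<upload>/non_global/a.tsv: Path("a.tsv"), ...},
        # i.e. absolute source path -> destination path relative to the new dataset root,
        # mirroring the dict comprehension in the patch.
        if not non_global_files:
            return {}
        mapping = {
            upload_path / "non_global" / x.strip(): Path(x.strip())
            for x in non_global_files.split(";")
            if x.strip()
        }
        for src in mapping:
            assert src.exists(), f"{src} does not exist under {upload_path / 'non_global'}"
        return mapping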
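
To see the two cases the patch asserts on, the helpers above can be exercised against a throwaway layout (illustrative only; it assumes the sketch above is defined in, or importable into, the same module):

    import tempfile
    from pathlib import Path

    with tempfile.TemporaryDirectory() as tmp:
        upload = Path(tmp)
        # Build the shared-upload shape: global/ plus non_global/ with one file.
        (upload / "global").mkdir()
        (upload / "non_global" / "lab_a").mkdir(parents=True)
        (upload / "non_global" / "lab_a" / "contributors.tsv").write_text("name\n")

        assert is_shared_upload(upload)
        mapping = map_non_global_files(upload, "lab_a/contributors.tsv")
        print(mapping)  # {.../non_global/lab_a/contributors.tsv: PosixPath('lab_a/contributors.tsv')}

Case 1 in the patch rejects a populated non_global_files field without this layout; case 2 rejects the layout when the field is empty.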
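
The new PUT endpoints in the API plugin delegate to generic_invoke_dag_on_uuid, which, per the diff, answers with the triggered run_id on success. A hedged client-side sketch; the base URL, its mount prefix, and the bearer-token header format are deployment assumptions, and only the route shape and HTTP method come from the diff:

    import requests

    BASE_URL = "https://ingest.example.org/api/hubmap"  # placeholder; deployment-specific
    upload_uuid = "0123456789abcdef0123456789abcdef"  # placeholder UUID
    token = "..."  # a valid HuBMAP/Globus token; header format is an assumption

    resp = requests.put(
        f"{BASE_URL}/uploads/{upload_uuid}/validate",  # or .../reorganize
        headers={"Authorization": f"Bearer {token}"},
    )
    resp.raise_for_status()
    print(resp.json())  # expected to include the run_id echoed by HubmapApiResponse.success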