Commit

Merge branch 'master' into release_3
sunset666 committed Feb 21, 2024
2 parents 6777cbf + 16790d1 commit 3d005da
Showing 10 changed files with 452 additions and 13 deletions.
2 changes: 1 addition & 1 deletion src/ingest-pipeline/airflow/dags/cwl/salmon-rnaseq
Submodule salmon-rnaseq updated 39 files
+0 −65 bin/analysis/add_slideseq_coordinates.py
+179 −0 bin/analysis/add_spatial_coordinates.py
+17 −9 bin/analysis/alevin_to_anndata.py
+21 −5 bin/analysis/annotate_cells.py
+444 −0 bin/analysis/read_visium_positions.py
+5 −0 bin/analysis/scanpy_entry_point.py
+11 −3 bin/analysis/scvelo_analysis.py
+102 −0 bin/analysis/squidpy_entry_point.py
+8 −0 bin/common/common.py
+31 −4 bin/salmon/salmon_wrapper.py
+1 −1 bin/trim_reads/trim_reads.py
+ data/ensembl_hugo_mapping.json.xz
+4,992 −0 data/visium-v1_coordinates.txt
+4,992 −0 data/visium-v2_coordinates.txt
+4,992 −0 data/visium-v3_coordinates.txt
+4,992 −0 data/visium-v4_coordinates.txt
+14,336 −0 data/visium-v5_coordinates.txt
+1 −1 docker/analysis/Dockerfile
+6 −1 docker/analysis/requirements.txt
+10 −0 docker/salmon/Dockerfile
+1 −0 docker/salmon/requirements.txt
+34 −0 docker/squidpy/Dockerfile
+6 −0 docker/squidpy/requirements.txt
+1 −1 docker/trim_reads/Dockerfile
+1 −0 docker_images.txt
+57 −6 pipeline.cwl
+1 −1 steps/bulk-salmon.cwl
+1 −1 steps/compute-qc-metrics.cwl
+1 −1 steps/expression-matrix.cwl
+1 −1 steps/fastqc.cwl
+11 −1 steps/salmon-quantification.cwl
+1 −1 steps/salmon-quantification/adjust-barcodes.cwl
+6 −2 steps/salmon-quantification/alevin-to-anndata.cwl
+12 −2 steps/salmon-quantification/annotate-cells.cwl
+1 −1 steps/salmon-quantification/salmon.cwl
+1 −1 steps/salmon-quantification/trim-reads.cwl
+2 −2 steps/scanpy-analysis.cwl
+9 −3 steps/scvelo-analysis.cwl
+50 −0 steps/squidpy-analysis.cwl
4 changes: 2 additions & 2 deletions src/ingest-pipeline/airflow/dags/launch_multi_analysis.py
@@ -72,13 +72,13 @@ def check_one_uuid(
print("ds_rslt:")
pprint(ds_rslt)

for key in ["status", "uuid", "data_types", "local_directory_full_path", "metadata"]:
for key in ["status", "uuid", "local_directory_full_path", "metadata"]:
assert key in ds_rslt, f"Dataset status for {uuid} has no {key}"

if not ds_rslt["status"] in ["New", "Error", "QA", "Published"]:
raise AirflowException(f"Dataset {uuid} is not QA or better")

dt = ds_rslt["data_types"]
dt = ds_rslt["dataset_type"]
if isinstance(dt, str) and dt.startswith("[") and dt.endswith("]"):
dt = ast.literal_eval(dt)
print(f"parsed dt: {dt}")
16 changes: 14 additions & 2 deletions src/ingest-pipeline/airflow/dags/scan_and_begin_processing.py
@@ -1,5 +1,6 @@
import os
import sys
+ import inspect
from datetime import datetime, timedelta
from pathlib import Path
from pprint import pprint
@@ -16,16 +17,17 @@
get_auth_tok,
get_preserve_scratch_resource,
get_queue_resource,
+ get_soft_data_assaytype,
make_send_status_msg_function,
pythonop_get_dataset_state,
pythonop_maybe_keep,
- get_soft_data_assaytype,
)

from airflow.configuration import conf as airflow_conf
from airflow.exceptions import AirflowException
from airflow.operators.bash import BashOperator
from airflow.operators.python import BranchPythonOperator, PythonOperator
+ from airflow.providers.http.hooks.http import HttpHook

sys.path.append(airflow_conf.as_dict()["connections"]["SRC_PATH"].strip("'").strip('"'))
from submodules import ingest_validation_tools_upload # noqa E402
@@ -108,6 +110,11 @@ def run_validation(**kwargs):
plugin_path = [path for path in ingest_validation_tests.__path__][0]

ignore_globs = [uuid, "extras", "*metadata.tsv", "validation_report.txt"]
+ app_context = {
+     "entities_url": HttpHook.get_connection("entity_api_connection").host + "/entities/",
+     "ingest_url": os.environ["AIRFLOW_CONN_INGEST_API_CONNECTION"],
+     "request_header": {"X-Hubmap-Application": "ingest-pipeline"},
+ }
#
# Uncomment offline=True below to avoid validating orcid_id URLs &etc
#
@@ -120,6 +127,7 @@
add_notes=False,
ignore_deprecation=True,
globus_token=get_auth_tok(**kwargs),
+ app_context=app_context,
)
# Scan reports an error result
errors = upload.get_errors(plugin_kwargs=kwargs)
@@ -134,6 +142,7 @@
f.write(report.as_text())
return 1
else:
kwargs["ti"].xcom_push(key="ivt_path", value=inspect.getfile(upload.__class__))
return 0

t_run_validation = PythonOperator(
@@ -258,7 +267,10 @@ def flex_maybe_spawn(**kwargs):
"parent_lz_path": lz_path,
"parent_submission_id": uuid,
"metadata": md,
"dag_provenance_list": utils.get_git_provenance_list(__file__),
"dag_provenance_list": utils.get_git_provenance_list(
[__file__,
kwargs["ti"].xcom_pull(task_ids="run_validation", key="ivt_path")]
),
}
for next_dag in utils.downstream_workflow_iter(collectiontype, assay_type):
yield next_dag, payload
@@ -218,6 +218,7 @@ def set_entity_api_status(self) -> Dict:
response = http_hook.run(
endpoint, json.dumps(data), headers, self.extras["extra_options"]
)
+ logging.info(f"""Response: {response.json()}""")
return response.json()
except Exception as e:
raise StatusChangerException(
15 changes: 11 additions & 4 deletions src/ingest-pipeline/airflow/dags/utils.py
@@ -318,7 +318,9 @@ def my_callable(**kwargs):
ds_rslt = pythonop_get_dataset_state(dataset_uuid_callable=my_callable, **kwargs)
organ_list = list(set(ds_rslt["organs"]))
organ_code = organ_list[0] if len(organ_list) == 1 else "multi"
pipeline_shorthand = "Kaggle-1 Glomerulus Segmentation" if organ_code in ["LK", "RK"] else "Image Pyramid"
pipeline_shorthand = (
"Kaggle-1 Glomerulus Segmentation" if organ_code in ["LK", "RK"] else "Image Pyramid"
)

return f"{ds_rslt['dataset_type']} [{pipeline_shorthand}]"

@@ -804,7 +806,7 @@ def pythonop_send_create_dataset(**kwargs) -> str:
"dataset_type": dataset_type,
"group_uuid": parent_group_uuid,
"contains_human_genetic_sequences": False,
"creation_action": "Central Process"
"creation_action": "Central Process",
}
if "previous_revision_uuid_callable" in kwargs:
previous_revision_uuid = kwargs["previous_revision_uuid_callable"](**kwargs)
@@ -942,6 +944,7 @@ def pythonop_get_dataset_state(**kwargs) -> Dict:
headers = {
"authorization": f"Bearer {auth_tok}",
"content-type": "application/json",
"Cache-Control": "no-cache",
"X-Hubmap-Application": "ingest-pipeline",
}
http_hook = HttpHook(method, http_conn_id="entity_api_connection")
@@ -1378,8 +1381,12 @@ def my_callable(**kwargs):
status = "QA"
else:
status = ds_rslt.get("status", "QA")
if status in ["Processing", "New"]:
status = "QA"
if status in ["Processing", "New", "Invalid"]:
status = (
"QA"
if kwargs["dag_run"].conf["dag_id"] == "scan_and_begin_processing"
else "Submitted"
)
if metadata_fun:
if not contacts:
contacts = ds_rslt.get("contacts", [])
14 changes: 12 additions & 2 deletions src/ingest-pipeline/airflow/dags/validate_upload.py
@@ -1,6 +1,7 @@
from __future__ import annotations

import logging
+ import os
import sys
from datetime import datetime, timedelta
from pathlib import Path
@@ -25,6 +26,7 @@
from airflow.configuration import conf as airflow_conf
from airflow.exceptions import AirflowException
from airflow.operators.python import PythonOperator
+ from airflow.providers.http.hooks.http import HttpHook

sys.path.append(airflow_conf.as_dict()["connections"]["SRC_PATH"].strip("'").strip('"'))

@@ -72,7 +74,7 @@ def my_callable(**kwargs):
print("ds_rslt:")
pprint(ds_rslt)

for key in ["entity_type", "status", "uuid", "data_types", "local_directory_full_path"]:
for key in ["entity_type", "status", "uuid", "local_directory_full_path"]:
assert key in ds_rslt, f"Dataset status for {uuid} has no {key}"

if ds_rslt["entity_type"] != "Upload":
@@ -101,6 +103,11 @@ def run_validation(**kwargs):
plugin_path = [path for path in ingest_validation_tests.__path__][0]

ignore_globs = [uuid, "extras", "*metadata.tsv", "validation_report.txt"]
+ app_context = {
+     "entities_url": HttpHook.get_connection("entity_api_connection").host + "/entities/",
+     "ingest_url": os.environ["AIRFLOW_CONN_INGEST_API_CONNECTION"],
+     "request_header": {"X-Hubmap-Application": "ingest-pipeline"},
+ }
#
# Uncomment offline=True below to avoid validating orcid_id URLs &etc
#
@@ -111,8 +118,11 @@
plugin_directory=plugin_path,
# offline=True, # noqa E265
add_notes=False,
extra_parameters={"coreuse": get_threads_resource("validate_upload", "run_validation")},
extra_parameters={
"coreuse": get_threads_resource("validate_upload", "run_validation")
},
globus_token=get_auth_tok(**kwargs),
+ app_context=app_context,
)
# Scan reports an error result
report = ingest_validation_tools_error_report.ErrorReport(