Merge pull request #850 from hubmapconsortium/sunset666/component_metadata

Building metadata for multi-assay components
sunset666 authored Mar 4, 2024
2 parents 9147819 + d7080aa commit 54395f1
Showing 10 changed files with 487 additions and 173 deletions.
23 changes: 22 additions & 1 deletion src/ingest-pipeline/airflow/dags/extra_utils.py
@@ -34,6 +34,27 @@ def check_link_published_drvs(uuid: str, auth_tok: str) -> Tuple[bool, str]:
return needs_previous_version, published_uuid


def get_component_uuids(uuid: str, auth_tok: str) -> List:
    """Return the uuids of child datasets created by a Multi-Assay Split."""
    children = []
endpoint = f"/children/{uuid}"
headers = {
"content-type": "application/json",
"X-Hubmap-Application": "ingest-pipeline",
"Authorization": f"Bearer {auth_tok}"
}
extra_options = {}

http_hook = HttpHook("GET", http_conn_id="entity_api_connection")

response = http_hook.run(endpoint, headers=headers, extra_options=extra_options)
print("response: ")
pprint(response.json())
for data in response.json():
if data.get("creation_action") == "Multi-Assay Split":
children.append(data.get("uuid"))
return children


class SoftAssayClient:
def __init__(self, metadata_files: List, auth_tok: str):
self.assay_components = []
@@ -48,7 +69,7 @@ def __init__(self, metadata_files: List, auth_tok: str):
assay_type = self.__get_assaytype_data(row=rows[0], auth_tok=auth_tok)
data_component = {
"assaytype": assay_type.get("assaytype"),
"dataset_type": assay_type.get("dataset-type"),
"dataset-type": assay_type.get("dataset-type"),
"contains-pii": assay_type.get("contains-pii", True),
"primary": assay_type.get("primary", False),
"metadata-file": metadata_file,
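The new get_component_uuids helper above asks the entity-api /children/{uuid} endpoint for a dataset's children and keeps only those created by a "Multi-Assay Split". A minimal usage sketch follows (not part of this commit; the parent uuid and token are placeholders, and the import assumes the DAGs directory is on the import path):

    from extra_utils import get_component_uuids

    # Placeholders; in the pipeline this runs inside an Airflow task that has
    # a valid entity-api bearer token and the "entity_api_connection" HttpHook
    # connection configured.
    parent_uuid = "<multi-assay parent uuid>"
    auth_tok = "<entity-api bearer token>"

    # Only children whose creation_action is "Multi-Assay Split" come back.
    for component_uuid in get_component_uuids(parent_uuid, auth_tok):
        print(f"component dataset: {component_uuid}")

The other change in this file renames the dataset_type key of each assay component record to dataset-type, matching the hyphenated keys already used for contains-pii and metadata-file.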
233 changes: 233 additions & 0 deletions src/ingest-pipeline/airflow/dags/multiassay_component_metadata.py
@@ -0,0 +1,233 @@
import os
import yaml
import utils
from pprint import pprint

from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.exceptions import AirflowException
from airflow.configuration import conf as airflow_conf
from datetime import datetime, timedelta

from utils import (
HMDAG,
get_queue_resource,
get_preserve_scratch_resource,
create_dataset_state_error_callback,
pythonop_md_consistency_tests,
make_send_status_msg_function,
get_tmp_dir_path,
localized_assert_json_matches_schema as assert_json_matches_schema,
pythonop_get_dataset_state,
encrypt_tok,
)

from hubmap_operators.common_operators import (
CreateTmpDirOperator,
CleanupTmpDirOperator,
)


def get_uuid_for_error(**kwargs):
"""
    Return the uuid to report via the dataset-state error callback; here it is always None.
"""
return None


def get_dataset_uuid(**kwargs):
return kwargs["dag_run"].conf["uuid"]


def get_dataset_lz_path(**kwargs):
ctx = kwargs["dag_run"].conf
return ctx["lz_path"]


default_args = {
"owner": "hubmap",
"depends_on_past": False,
"start_date": datetime(2019, 1, 1),
"email": ["[email protected]"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=1),
"xcom_push": True,
"queue": get_queue_resource("rebuild_metadata"),
"on_failure_callback": create_dataset_state_error_callback(get_uuid_for_error),
}

with HMDAG(
"multiassay_component_metadata",
schedule_interval=None,
is_paused_upon_creation=False,
default_args=default_args,
user_defined_macros={
"tmp_dir_path": get_tmp_dir_path,
"preserve_scratch": get_preserve_scratch_resource("rebuild_metadata"),
},
) as dag:

t_create_tmpdir = CreateTmpDirOperator(task_id="create_temp_dir")

def check_one_uuid(uuid, **kwargs):
"""
Look up information on the given uuid or HuBMAP identifier.
        Returns:
        - the uuid, translated from an identifier if necessary
        - local directory full path of the dataset
        - metadata of the dataset
        - dataset type of the dataset
"""
print(f"Starting uuid {uuid}")
my_callable = lambda **kwargs: uuid
ds_rslt = pythonop_get_dataset_state(dataset_uuid_callable=my_callable, **kwargs)
if not ds_rslt:
raise AirflowException(f"Invalid uuid/doi for group: {uuid}")
print("ds_rslt:")
pprint(ds_rslt)

for key in ["status", "uuid", "local_directory_full_path", "metadata", "dataset_type"]:
assert key in ds_rslt, f"Dataset status for {uuid} has no {key}"

if not ds_rslt["status"] in ["New", "Error", "QA", "Published"]:
raise AirflowException(f"Dataset {uuid} is not QA or better")

return (
ds_rslt["uuid"],
ds_rslt["local_directory_full_path"],
ds_rslt["metadata"],
ds_rslt["dataset_type"],
)

def check_uuids(**kwargs):
print("dag_run conf follows:")
pprint(kwargs["dag_run"].conf)

try:
assert_json_matches_schema(
kwargs["dag_run"].conf, "launch_checksums_metadata_schema.yml"
)
except AssertionError as e:
print("invalid metadata follows:")
pprint(kwargs["dag_run"].conf)
raise

uuid, lz_path, metadata, dataset_type = check_one_uuid(
kwargs["dag_run"].conf["uuid"], **kwargs
)
print(f"filtered metadata: {metadata}")
print(f"filtered paths: {lz_path}")
kwargs["dag_run"].conf["lz_path"] = lz_path
kwargs["dag_run"].conf["src_path"] = airflow_conf.as_dict()["connections"][
"src_path"
].strip("'")
kwargs["dag_run"].conf["dataset_type"] = dataset_type
kwargs["ti"].xcom_push(key="dataset_type", value=dataset_type)

t_check_uuids = PythonOperator(
task_id="check_uuids",
python_callable=check_uuids,
provide_context=True,
op_kwargs={
"crypt_auth_tok": encrypt_tok(
airflow_conf.as_dict()["connections"]["APP_CLIENT_SECRET"]
).decode(),
},
)

t_run_md_extract = BashOperator(
task_id="run_md_extract",
bash_command=""" \
lz_dir="{{dag_run.conf.lz_path}}" ; \
component_type="{{dag_run.conf.dataset_type}}" ; \
src_dir="{{dag_run.conf.src_path}}/md" ; \
top_dir="{{dag_run.conf.src_path}}" ; \
work_dir="{{tmp_dir_path(run_id)}}" ; \
cd $work_dir ; \
env PYTHONPATH=${PYTHONPATH}:$top_dir \
${PYTHON_EXE} $src_dir/metadata_extract.py --out ./"$component_type"-rslt.yml --yaml "$lz_dir" \
--component "$component_type" >> session.log 2> error.log ; \
echo $? ; \
if [ -s error.log ] ; \
then echo 'ERROR!' `cat error.log` >> session.log ; \
else rm error.log ; \
fi
""",
env={
"AUTH_TOK": (
utils.get_auth_tok(
**{
"crypt_auth_tok": utils.encrypt_tok(
airflow_conf.as_dict()["connections"]["APP_CLIENT_SECRET"]
).decode()
}
)
),
"PYTHON_EXE": os.environ["CONDA_PREFIX"] + "/bin/python",
"INGEST_API_URL": os.environ["AIRFLOW_CONN_INGEST_API_CONNECTION"],
"COMPONENTS_ASSAY_TYPE": "1",
},
)
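For readers skimming the templated bash above, the run_md_extract task amounts to roughly the following per run (a sketch, not part of the commit; the placeholder strings stand in for the Jinja-templated dag_run.conf values, and the real task also prepends the source path to PYTHONPATH and exports AUTH_TOK, INGEST_API_URL, and COMPONENTS_ASSAY_TYPE=1 into the environment):

    import os
    import subprocess

    # Values Airflow templates in at run time (placeholders here).
    lz_dir = "<dag_run.conf.lz_path>"
    component_type = "<dag_run.conf.dataset_type>"
    src_dir = "<dag_run.conf.src_path>/md"
    work_dir = "<tmp_dir_path(run_id)>"

    # stdout is appended to session.log and stderr goes to error.log; the bash
    # version then writes "ERROR!" plus error.log's contents into session.log
    # when error.log is non-empty, and deletes error.log otherwise.
    with open(os.path.join(work_dir, "session.log"), "a") as out, \
         open(os.path.join(work_dir, "error.log"), "w") as err:
        subprocess.run(
            [
                os.environ["PYTHON_EXE"],
                f"{src_dir}/metadata_extract.py",
                "--out", f"./{component_type}-rslt.yml",
                "--yaml", lz_dir,
                "--component", component_type,
            ],
            cwd=work_dir,
            stdout=out,
            stderr=err,
        )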

def xcom_consistency_puller(**kwargs):
return kwargs["ti"].xcom_pull(task_ids="check_uuids", key="dataset_type")

t_md_consistency_tests = PythonOperator(
task_id="md_consistency_tests",
python_callable=pythonop_md_consistency_tests,
provide_context=True,
op_kwargs={"metadata_fname": "rslt.yml", "component": xcom_consistency_puller},
)

def read_metadata_file(**kwargs):
md_fname = os.path.join(
get_tmp_dir_path(kwargs["run_id"]),
kwargs["ti"].xcom_pull(task_ids="check_uuids", key="dataset_type") + "-rslt.yml",
)
with open(md_fname, "r") as f:
scanned_md = yaml.safe_load(f)
return scanned_md

send_status_msg = make_send_status_msg_function(
dag_file=__file__,
retcode_ops=["run_md_extract", "md_consistency_tests"],
cwl_workflows=[],
dataset_uuid_fun=get_dataset_uuid,
dataset_lz_path_fun=get_dataset_lz_path,
metadata_fun=read_metadata_file,
include_file_metadata=False,
)

def wrapped_send_status_msg(**kwargs):
if send_status_msg(**kwargs):
scanned_md = read_metadata_file(**kwargs) # Yes, it's getting re-read
kwargs["ti"].xcom_push(
key="collectiontype",
value=(scanned_md["collectiontype"] if "collectiontype" in scanned_md else None),
)
if "assay_type" in scanned_md:
assay_type = scanned_md["assay_type"]
elif "metadata" in scanned_md and "assay_type" in scanned_md["metadata"]:
assay_type = scanned_md["metadata"]["assay_type"]
else:
assay_type = None
kwargs["ti"].xcom_push(key="assay_type", value=assay_type)
else:
kwargs["ti"].xcom_push(key="collectiontype", value=None)

t_send_status = PythonOperator(
task_id="send_status_msg",
python_callable=wrapped_send_status_msg,
provide_context=True,
trigger_rule="all_done",
op_kwargs={
"crypt_auth_tok": encrypt_tok(
airflow_conf.as_dict()["connections"]["APP_CLIENT_SECRET"]
).decode(),
},
)

t_cleanup_tmpdir = CleanupTmpDirOperator(task_id="cleanup_temp_dir")

t_check_uuids >> t_create_tmpdir >> t_run_md_extract >> t_md_consistency_tests >> t_send_status >> t_cleanup_tmpdir
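Taken together, the DAG expects only a dataset uuid in dag_run.conf when triggered; check_uuids validates and enriches it before the extract, consistency, and status tasks run. A sketch of the payload (field names taken from the code above; the uuid value is a placeholder):

    # Minimal conf for triggering "multiassay_component_metadata"; it is
    # validated against launch_checksums_metadata_schema.yml in check_uuids.
    example_conf = {
        "uuid": "<component dataset uuid>",  # placeholder
    }

    # check_uuids() then fills in, from the dataset-state lookup:
    #   conf["lz_path"]      - local_directory_full_path of the dataset
    #   conf["src_path"]     - ingest-pipeline source path from the Airflow config
    #   conf["dataset_type"] - also pushed to XCom for run_md_extract,
    #                          md_consistency_tests, and read_metadata_file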