Merge branch 'master' into release_3
sunset666 committed Feb 1, 2024
2 parents 9cbd18c + 7110eac commit 6777cbf
Showing 32 changed files with 1,334 additions and 703 deletions.
4 changes: 2 additions & 2 deletions src/ingest-pipeline/airflow/dags/azimuth_annotations.py
@@ -28,7 +28,7 @@
     HMDAG,
     get_queue_resource,
     get_preserve_scratch_resource,
-    get_datatype_previous_version,
+    get_dataset_type_previous_version,
     get_dataname_previous_version,
     build_provenance_function,
     get_assay_previous_version,
@@ -235,7 +235,7 @@ def build_cwltool_cmd4(**kwargs):
         "previous_revision_uuid_callable": get_previous_revision_uuid,
         "http_conn_id": "ingest_api_connection",
         "dataset_name_callable": get_dataname_previous_version,
-        "dataset_types_callable": get_datatype_previous_version,
+        "dataset_type_callable": get_dataset_type_previous_version,
     },
 )
 
2 changes: 1 addition & 1 deletion src/ingest-pipeline/airflow/dags/bulk_atacseq.py
@@ -122,7 +122,7 @@ def build_cwltool_cmd1(**kwargs):
         'previous_revision_uuid_callable': get_previous_revision_uuid,
         'http_conn_id': 'ingest_api_connection',
         'dataset_name_callable': build_dataset_name,
-        'dataset_types': ['bulk_atacseq'],
+        'pipeline_shorthand': 'BWA + MACS2',
     },
 )
 
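
This same substitution of a human-oriented 'pipeline_shorthand' string for a 'dataset_types' list recurs in celldive_deepcell.py, codex_cytokit.py, and gen_pub_ancillary.py below. A minimal sketch of the kind of consumer the new key implies; the helper name and title format here are assumptions for illustration, not code from this commit:

    # Hypothetical illustration only: shows how a 'pipeline_shorthand'
    # string might be folded into a human-readable dataset display name.
    def make_display_name(parent_name: str, pipeline_shorthand: str) -> str:
        # e.g. "Bulk ATAC-seq [BWA + MACS2]"
        return f"{parent_name} [{pipeline_shorthand}]"

    print(make_display_name("Bulk ATAC-seq", "BWA + MACS2"))
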
2 changes: 1 addition & 1 deletion src/ingest-pipeline/airflow/dags/celldive_deepcell.py
@@ -427,7 +427,7 @@ def build_cwltool_cmd_sprm_to_anndata(**kwargs):
         'previous_revision_uuid_callable': get_previous_revision_uuid,
         'http_conn_id': 'ingest_api_connection',
         'dataset_name_callable': build_dataset_name,
-        'dataset_types': ['celldive_deepcell'],
+        'pipeline_shorthand': 'DeepCell + SPRM',
     },
 )
 
2 changes: 1 addition & 1 deletion src/ingest-pipeline/airflow/dags/codex_cytokit.py
@@ -528,7 +528,7 @@ def build_cwltool_cmd_sprm_to_anndata(**kwargs):
         'previous_revision_uuid_callable': get_previous_revision_uuid,
         'http_conn_id': 'ingest_api_connection',
         'dataset_name_callable': build_dataset_name,
-        'dataset_types': ['codex_cytokit']
+        'pipeline_shorthand': 'Cytokit + SPRM'
     }
 )
 
113 changes: 96 additions & 17 deletions src/ingest-pipeline/airflow/dags/extra_utils.py
@@ -1,30 +1,109 @@
-from pprint import pprint
-from typing import Tuple
+import json
+from typing import List, Dict
+from pathlib import Path
+from csv import DictReader
 
 from airflow.providers.http.hooks.http import HttpHook
-from airflow.configuration import conf as airflow_conf
+from pprint import pprint
+from typing import Tuple
 
 
-def check_link_published_drvs(uuid: str) -> Tuple[bool, str]:
+def check_link_published_drvs(uuid: str, auth_tok: str) -> Tuple[bool, str]:
     needs_previous_version = False
-    published_uuid = ''
-    endpoint = f'/children/{uuid}'
-    auth_tok = ''.join(e for e in airflow_conf.as_dict()['connections']['APP_CLIENT_SECRET'] if e.isalnum())
+    published_uuid = ""
+    endpoint = f"/children/{uuid}"
     headers = {
-        'authorization': 'Bearer ' + auth_tok,
-        'content-type': 'application/json',
-        'X-Hubmap-Application': 'ingest-pipeline'}
+        "content-type": "application/json",
+        "X-Hubmap-Application": "ingest-pipeline",
+        "Authorization": f"Bearer {auth_tok}",
+    }
     extra_options = {}
 
-    http_hook = HttpHook('GET', http_conn_id='entity_api_connection')
+    http_hook = HttpHook("GET", http_conn_id="entity_api_connection")
 
-    response = http_hook.run(endpoint,
-                             headers=headers,
-                             extra_options=extra_options)
-    print('response: ')
+    response = http_hook.run(endpoint, headers=headers, extra_options=extra_options)
+    print("response: ")
     pprint(response.json())
     for data in response.json():
-        if data.get('entity_type') in ('Dataset', 'Publication') and data.get('status') == 'Published':
+        if (
+            data.get("entity_type") in ("Dataset", "Publication")
+            and data.get("status") == "Published"
+        ):
             needs_previous_version = True
-            published_uuid = data.get('uuid')
+            published_uuid = data.get("uuid")
     return needs_previous_version, published_uuid
+
+
+class SoftAssayClient:
+    def __init__(self, metadata_files: List, auth_tok: str):
+        self.assay_components = []
+        self.primary_assay = {}
+        self.is_multiassay = True
+        for metadata_file in metadata_files:
+            try:
+                rows = self.__read_rows(metadata_file, encoding="UTF-8")
+            except Exception as e:
+                print(f"Error {e} reading metadata {metadata_file}")
+                return
+            assay_type = self.__get_assaytype_data(row=rows[0], auth_tok=auth_tok)
+            data_component = {
+                "assaytype": assay_type.get("assaytype"),
+                "dataset_type": assay_type.get("dataset-type"),
+                "contains-pii": assay_type.get("contains-pii", True),
+                "primary": assay_type.get("primary", False),
+                "metadata-file": metadata_file,
+            }
+            if not assay_type.get("must-contain"):
+                print(f"Component {assay_type}")
+                self.assay_components.append(data_component)
+            else:
+                print(f"Primary {assay_type}")
+                self.primary_assay = data_component
+        if not self.primary_assay and len(self.assay_components) == 1:
+            self.primary_assay = self.assay_components.pop()
+            self.is_multiassay = False
+
+    def __get_assaytype_data(
+        self,
+        row: Dict,
+        auth_tok: str,
+    ) -> Dict:
+        http_hook = HttpHook("POST", http_conn_id="ingest_api_connection")
+        endpoint = "/assaytype"
+        headers = {
+            "Authorization": f"Bearer {auth_tok}",
+            "Content-Type": "application/json",
+        }
+        response = http_hook.run(endpoint=endpoint, headers=headers, data=json.dumps(row))
+        response.raise_for_status()
+        return response.json()
+
+    def __get_context_of_decode_error(self, e: UnicodeDecodeError) -> str:
+        buffer = 20
+        codec = "latin-1"  # This is not the actual codec of the string!
+        before = e.object[max(e.start - buffer, 0) : max(e.start, 0)].decode(codec)  # noqa
+        problem = e.object[e.start : e.end].decode(codec)  # noqa
+        after = e.object[e.end : min(e.end + buffer, len(e.object))].decode(codec)  # noqa
+        in_context = f"{before} [ {problem} ] {after}"
+        return f'Invalid {e.encoding} because {e.reason}: "{in_context}"'
+
+    def __dict_reader_wrapper(self, path, encoding: str) -> list:
+        with open(path, encoding=encoding) as f:
+            rows = list(DictReader(f, dialect="excel-tab"))
+        return rows
+
+    def __read_rows(self, path: Path, encoding: str) -> List:
+        if not Path(path).exists():
+            raise Exception({"File does not exist": f"{path}"})
+        try:
+            rows = self.__dict_reader_wrapper(path, encoding)
+            if not rows:
+                message = {"File has no data rows": f"{path}"}
+            else:
+                return rows
+        except IsADirectoryError:
+            message = {"Expected a TSV, but found a directory": f"{path}"}
+        except UnicodeDecodeError as e:
+            message = {"Decode Error": self.__get_context_of_decode_error(e)}
+        raise Exception(message)
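
The new SoftAssayClient reads one or more TSV metadata files, posts the first row of each to the ingest API's /assaytype endpoint for classification, and sorts the results into a primary assay plus components; check_link_published_drvs now receives the bearer token as an argument instead of reading APP_CLIENT_SECRET from the Airflow config. A usage sketch under assumptions: the import path, token value, and metadata path are placeholders, and inside a DAG the token would come from the usual auth helpers rather than a literal:

    # Hypothetical usage sketch for the additions above; not code from this commit.
    from extra_utils import SoftAssayClient, check_link_published_drvs

    auth_tok = "REPLACE-WITH-GROUPS-TOKEN"  # placeholder token
    client = SoftAssayClient(["/path/to/metadata.tsv"], auth_tok)  # placeholder path
    print(client.is_multiassay, client.primary_assay.get("dataset_type"))
    for component in client.assay_components:
        print(component["assaytype"], component["metadata-file"])

    # The published-descendant check now takes the same token explicitly:
    needs_prev, published_uuid = check_link_published_drvs("some-uuid", auth_tok)
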
2 changes: 1 addition & 1 deletion src/ingest-pipeline/airflow/dags/gen_pub_ancillary.py
@@ -233,7 +233,7 @@ def build_ancillary_data(**kwargs):
         'previous_revision_uuid_callable': get_previous_revision_uuid,
         'http_conn_id': 'ingest_api_connection',
         'dataset_name_callable': build_dataset_name,
-        'dataset_types': ["publication_ancillary"]
+        'pipeline_shorthand': 'ancillary'
     }
 )
 