diff --git a/src/ingest-pipeline/airflow/dags/cwl/portal-containers b/src/ingest-pipeline/airflow/dags/cwl/portal-containers
index f6190efc..3bec682c 160000
--- a/src/ingest-pipeline/airflow/dags/cwl/portal-containers
+++ b/src/ingest-pipeline/airflow/dags/cwl/portal-containers
@@ -1 +1 @@
-Subproject commit f6190efc24b11543da13e4a6f20db43bb16d69be
+Subproject commit 3bec682c5329af2647aa1fc0fb58b153053ade6d
diff --git a/src/ingest-pipeline/airflow/dags/launch_multi_analysis.py b/src/ingest-pipeline/airflow/dags/launch_multi_analysis.py
index 745ea3e4..36a1db65 100644
--- a/src/ingest-pipeline/airflow/dags/launch_multi_analysis.py
+++ b/src/ingest-pipeline/airflow/dags/launch_multi_analysis.py
@@ -73,15 +73,13 @@ def check_uuids(**kwargs):
     for uuid in uuid_l:
         print(f'Starting uuid {uuid}')
         my_callable = lambda **kwargs: uuid
-        rslt=utils.pythonop_get_dataset_state(dataset_uuid_callable=my_callable,
-                                              http_conn_id='ingest_api_connection',
-                                              **kwargs)
-        if not rslt:
+        ds_rslt=utils.pythonop_get_dataset_state(dataset_uuid_callable=my_callable,
+                                                 http_conn_id='ingest_api_connection',
+                                                 **kwargs)
+        if not ds_rslt:
             raise AirflowException(f'Invalid uuid/doi for group: {uuid}')
-        print('rslt:')
-        pprint(rslt)
-        assert 'dataset' in rslt, f"Status for {uuid} has no dataset entry"
-        ds_rslt = rslt['dataset']
+        print('ds_rslt:')
+        pprint(ds_rslt)
 
         for key in ['status', 'uuid', 'data_types', 'local_directory_full_path']:
             assert key in ds_rslt, f"Dataset status for {uuid} has no {key}"
diff --git a/src/ingest-pipeline/airflow/dags/scan_and_begin_processing.py b/src/ingest-pipeline/airflow/dags/scan_and_begin_processing.py
index d35c94dc..35c14fdc 100644
--- a/src/ingest-pipeline/airflow/dags/scan_and_begin_processing.py
+++ b/src/ingest-pipeline/airflow/dags/scan_and_begin_processing.py
@@ -18,7 +18,10 @@ from hubmap_operators.flex_multi_dag_run import FlexMultiDagRunOperator
 
 import utils
 
-from utils import localized_assert_json_matches_schema as assert_json_matches_schema
+from utils import (
+    localized_assert_json_matches_schema as assert_json_matches_schema,
+    make_send_status_msg_function
+    )
 
 
 def get_dataset_uuid(**kwargs):
@@ -48,53 +51,29 @@ def get_dataset_uuid(**kwargs):
          default_args=default_args,
          user_defined_macros={'tmp_dir_path' : utils.get_tmp_dir_path}
          ) as dag:
-
-    def send_status_msg(**kwargs):
-        ctx = kwargs['dag_run'].conf
-        retcode_ops = ['run_md_extract', 'md_consistency_tests']
-        print('raw: ', [kwargs['ti'].xcom_pull(task_ids=op) for op in retcode_ops])
-        retcodes = [int(kwargs['ti'].xcom_pull(task_ids=op))
-                    for op in retcode_ops]
-        retcode_dct = {k:v for k, v in zip(retcode_ops, retcodes)}
-        print('retcodes: ', retcode_dct)
-        success = all([rc == 0 for rc in retcodes])
-        ds_dir = ctx['lz_path']
-        http_conn_id='ingest_api_connection'
-        endpoint='/datasets/status'
-        method='PUT'
-        headers={'authorization' : 'Bearer ' + utils.decrypt_tok(ctx['crypt_auth_tok'].encode()),
-                 'content-type' : 'application/json'}
-        print('headers:')
-        pprint(headers)
-        extra_options=[]
-
-        http = HttpHook(method,
-                        http_conn_id=http_conn_id)
-
-        if success:
-            md_fname = os.path.join(utils.get_tmp_dir_path(kwargs['run_id']),
-                                    'rslt.yml')
-            with open(md_fname, 'r') as f:
-                scanned_md = yaml.safe_load(f)
-            dag_prv = utils.get_git_provenance_list([__file__])
-            md = {'dag_provenance_list' : dag_prv,
-                  'metadata' : scanned_md}
-            # Inclusion of files information in this message is getting disabled due to size
-            #md.update(utils.get_file_metadata_dict(ds_dir,
-            #                                       utils.get_tmp_dir_path(kwargs['run_id'])))
-            try:
-                assert_json_matches_schema(md, 'dataset_metadata_schema.yml')
-                data = {'dataset_id' : ctx['submission_id'],
-                        'status' : 'QA',
-                        'message' : 'the process ran',
-                        'metadata': md}
-            except AssertionError as e:
-                print('invalid metadata follows:')
-                pprint(md)
-                data = {'dataset_id' : ctx['submission_id'],
-                        'status' : 'Error',
-                        'message' : 'internal error; schema violation: {}'.format(e),
-                        'metadata': {}}
+
+    def read_metadata_file(**kwargs):
+        md_fname = os.path.join(utils.get_tmp_dir_path(kwargs['run_id']),
+                                'rslt.yml')
+        with open(md_fname, 'r') as f:
+            scanned_md = yaml.safe_load(f)
+        return scanned_md
+
+    def get_dataset_lz_path(**kwargs):
+        return kwargs['dag_run'].conf['lz_path']
+
+    send_status_msg = make_send_status_msg_function(
+        dag_file=__file__,
+        retcode_ops=['run_md_extract', 'md_consistency_tests'],
+        cwl_workflows=[],
+        dataset_uuid_fun=get_dataset_uuid,
+        dataset_lz_path_fun=get_dataset_lz_path,
+        metadata_fun=read_metadata_file
+    )
+
+    def wrapped_send_status_msg(**kwargs):
+        if send_status_msg(**kwargs):
+            scanned_md = read_metadata_file(**kwargs)  # Yes, it's getting re-read
             kwargs['ti'].xcom_push(key='collectiontype',
                                    value=(scanned_md['collectiontype']
                                           if 'collectiontype' in scanned_md
@@ -107,34 +86,7 @@ def send_status_msg(**kwargs):
                 assay_type = None
             kwargs['ti'].xcom_push(key='assay_type', value=assay_type)
         else:
-            for op in retcode_ops:
-                if retcode_dct[op]:
-                    if op == 'run_md_extract':
-                        msg_fname = os.path.join(utils.get_tmp_dir_path(kwargs['run_id']),
-                                                 'error.log')
-                        if not os.path.exists(msg_fname):
-                            msg_fname = os.path.join(utils.get_tmp_dir_path(kwargs['run_id']),
-                                                     'session.log')
-                        with open(msg_fname, 'r') as f:
-                            err_txt = '\n'.join(f.readlines())
-                    else:
-                        err_txt = kwargs['ti'].xcom_pull(task_ids=op, key='err_msg')
-                    break
-            else:
-                err_txt = 'Unknown error'
-            data = {'dataset_id' : ctx['submission_id'],
-                    'status' : 'Invalid',
-                    'message' : err_txt}
             kwargs['ti'].xcom_push(key='collectiontype', value=None)
-        print('data: ')
-        pprint(data)
-
-        response = http.run(endpoint,
-                            json.dumps(data),
-                            headers,
-                            extra_options)
-        print('response: ')
-        pprint(response.json())
 
     t_run_md_extract = BashOperator(
         task_id='run_md_extract',
@@ -164,9 +116,9 @@ def send_status_msg(**kwargs):
 
     t_send_status = PythonOperator(
         task_id='send_status_msg',
-        python_callable=send_status_msg,
+        python_callable=wrapped_send_status_msg,
         provide_context=True
-    )
+        )
 
     t_create_tmpdir = BashOperator(
         task_id='create_temp_dir',
@@ -176,7 +128,7 @@ def send_status_msg(**kwargs):
 
     t_cleanup_tmpdir = BashOperator(
         task_id='cleanup_temp_dir',
-        bash_command='echo rm -r {{tmp_dir_path(run_id)}}',
+        bash_command='rm -r {{tmp_dir_path(run_id)}}',
        trigger_rule='all_success'
         )
 
diff --git a/src/ingest-pipeline/airflow/dags/utils.py b/src/ingest-pipeline/airflow/dags/utils.py
index 1a9bde5c..95bfc5d6 100644
--- a/src/ingest-pipeline/airflow/dags/utils.py
+++ b/src/ingest-pipeline/airflow/dags/utils.py
@@ -35,6 +35,9 @@
                            'schemata')
 SCHEMA_BASE_URI = 'http://schemata.hubmapconsortium.org/'
 
+# one of 'INGEST_LEGACY_API' or 'INGEST_REFACTOR_API'
+INGEST_API_MODE = 'INGEST_REFACTOR_API'
+
 # Some constants
 PIPELINE_BASE_DIR = Path(__file__).resolve().parent / 'cwl'
 
@@ -484,6 +487,18 @@ def pythonop_maybe_keep(**kwargs) -> str:
     return bail_op
 
 
+def _get_auth_tok(**kwargs) -> str:
+    """
+    Recover the authorization token from the environment, and
+    decrypt it.
+ """ + crypt_auth_tok = (kwargs['crypt_auth_tok'] if 'crypt_auth_tok' in kwargs + else kwargs['dag_run'].conf['crypt_auth_tok']) + auth_tok = ''.join(e for e in decrypt_tok(crypt_auth_tok.encode()) + if e.isalnum()) # strip out non-alnum characters + return auth_tok + + def pythonop_send_create_dataset(**kwargs) -> str: """ Requests creation of a new dataset. Returns dataset info via XCOM @@ -516,18 +531,13 @@ def pythonop_send_create_dataset(**kwargs) -> str: ctx = kwargs['dag_run'].conf method = 'POST' - crypt_auth_tok = (kwargs['crypt_auth_tok'] if 'crypt_auth_tok' in kwargs - else kwargs['dag_run'].conf['crypt_auth_tok']) - auth_tok = ''.join(e for e in decrypt_tok(crypt_auth_tok.encode()) - if e.isalnum()) # strip out non-alnum characters headers = { - 'authorization' : 'Bearer ' + auth_tok, + 'authorization' : 'Bearer ' + _get_auth_tok(**kwargs), 'content-type' : 'application/json'} #print('headers:') #pprint(headers) # Reduce exposure of auth_tok extra_options = [] - http = HttpHook(method, - http_conn_id=http_conn_id) + http_hook = HttpHook(method, http_conn_id=http_conn_id) if 'dataset_types' in kwargs: dataset_types = kwargs['dataset_types'] else: @@ -540,10 +550,10 @@ def pythonop_send_create_dataset(**kwargs) -> str: } print('data:') pprint(data) - response = http.run(endpoint, - json.dumps(data), - headers, - extra_options) + response = http_hook.run(endpoint, + json.dumps(data), + headers, + extra_options) print('response: ') pprint(response.json()) data_dir_path = response.json()['full_path'] @@ -574,17 +584,15 @@ def pythonop_set_dataset_state(**kwargs) -> None: ds_state = kwargs['ds_state'] if 'ds_state' in kwargs else 'Processing' message = kwargs['message'] if 'message' in kwargs else 'update state' method = 'PUT' - crypt_auth_tok = (kwargs['crypt_auth_tok'] if 'crypt_auth_tok' in kwargs - else kwargs['dag_run'].conf['crypt_auth_tok']) headers = { - 'authorization' : 'Bearer ' + decrypt_tok(crypt_auth_tok.encode()), + 'authorization' : 'Bearer ' + _get_auth_tok(**kwargs), 'content-type' : 'application/json'} # print('headers:') # pprint(headers) # reduce visibility of auth_tok extra_options = [] - http = HttpHook(method, - http_conn_id=http_conn_id) + http_hook = HttpHook(method, + http_conn_id=http_conn_id) data = {'dataset_id' : dataset_uuid, 'status' : ds_state, @@ -593,9 +601,9 @@ def pythonop_set_dataset_state(**kwargs) -> None: print('data: ') pprint(data) - response = http.run(endpoint, - json.dumps(data), - headers, + response = http_hook.run(endpoint, + json.dumps(data), + headers, extra_options) print('response: ') pprint(response.json()) @@ -612,26 +620,31 @@ def pythonop_get_dataset_state(**kwargs) -> JSONType: """ for arg in ['dataset_uuid_callable', 'http_conn_id']: assert arg in kwargs, "missing required argument {}".format(arg) - dataset_uuid = kwargs['dataset_uuid_callable'](**kwargs) + uuid = kwargs['dataset_uuid_callable'](**kwargs) http_conn_id = kwargs['http_conn_id'] - endpoint = f'datasets/{dataset_uuid}' method = 'GET' - crypt_auth_tok = (kwargs['crypt_auth_tok'] if 'crypt_auth_tok' in kwargs - else kwargs['dag_run'].conf['crypt_auth_tok']) - auth_tok = ''.join(e for e in decrypt_tok(crypt_auth_tok.encode()) - if e.isalnum()) # strip out non-alnum characters + auth_tok = _get_auth_tok(**kwargs) headers = { 'authorization' : f'Bearer {auth_tok}', - 'content-type' : 'application/json'} + 'content-type' : 'application/json' + } + http_hook = HttpHook(method, http_conn_id=http_conn_id) - try: - http = HttpHook(method, - http_conn_id=http_conn_id) + 
+    if INGEST_API_MODE == 'INGEST_LEGACY_API':
+        endpoint = f'datasets/{uuid}'
+    elif INGEST_API_MODE == 'INGEST_REFACTOR_API':
+        endpoint = f'entities/{uuid}'
+    else:
+        raise RuntimeError(f'Unknown INGEST_API_MODE {INGEST_API_MODE}')
 
-        response = http.run(endpoint,
-                            headers=headers,
-                            extra_options={'check_response': False})
+    try:
+        response = http_hook.run(endpoint,
+                                 headers=headers,
+                                 extra_options={'check_response': False})
         response.raise_for_status()
+        query_rslt = response.json()
+        print('query rslt:')
+        pprint(query_rslt)
     except HTTPError as e:
         print(f'ERROR: {e}')
         if e.response.status_code == codes.unauthorized:
@@ -639,27 +652,63 @@
         else:
             print('benign error')
             return {}
-    return response.json()
+
+    if INGEST_API_MODE == 'INGEST_LEGACY_API':
+        assert 'dataset' in query_rslt, f"Status for {uuid} has no dataset entry"
+        ds_rslt = query_rslt['dataset']
+        key = 'local_directory_full_path'
+        assert key in ds_rslt, f"Dataset status for {uuid} has no {key}"
+        full_path = ds_rslt[key]
+    elif INGEST_API_MODE == 'INGEST_REFACTOR_API':
+        ds_rslt = query_rslt
+        endpoint = f'datasets/{uuid}/file-system-abs-path'
+        try:
+            response = http_hook.run(endpoint,
+                                     headers=headers,
+                                     extra_options={'check_response': False})
+            response.raise_for_status()
+            path_query_rslt = response.json()
+            print('path_query rslt:')
+            pprint(path_query_rslt)
+        except HTTPError as e:
+            print(f'ERROR: {e}')
+            if e.response.status_code == codes.unauthorized:
+                raise RuntimeError('entity database authorization was rejected?')
+            else:
+                print('benign error')
+                return {}
+        assert 'path' in path_query_rslt, f"Dataset path for {uuid} produced no path"
+        full_path = path_query_rslt['path']
+    else:
+        raise RuntimeError(f'Unknown INGEST_API_MODE {INGEST_API_MODE}')
+
+    for key in ['status', 'uuid', 'data_types']:
+        assert key in ds_rslt, f"Dataset status for {uuid} has no {key}"
+    rslt = {
+        'status': ds_rslt['status'],
+        'uuid': ds_rslt['uuid'],
+        'data_types': ds_rslt['data_types'],
+        'local_directory_full_path': full_path
+    }
+    return rslt
 
 
 def _uuid_lookup(uuid, **kwargs):
     http_conn_id = 'uuid_api_connection'
     endpoint = 'hmuuid/{}'.format(uuid)
     method = 'GET'
-    crypt_auth_tok = (kwargs['crypt_auth_tok'] if 'crypt_auth_tok' in kwargs
-                      else kwargs['dag_run'].conf['crypt_auth_tok'])
-    headers = {'authorization' : 'Bearer ' + decrypt_tok(crypt_auth_tok.encode())}
+    headers = {'authorization' : 'Bearer ' + _get_auth_tok(**kwargs)}
     # print('headers:')
     # pprint(headers)
     extra_options = []
-    http = HttpHook(method,
-                    http_conn_id=http_conn_id)
+    http_hook = HttpHook(method,
+                         http_conn_id=http_conn_id)
 
-    response = http.run(endpoint,
-                        None,
-                        headers,
-                        extra_options)
+    response = http_hook.run(endpoint,
+                             None,
+                             headers,
+                             extra_options)
     # print('response: ')
     # pprint(response.json())
     return response.json()
@@ -780,40 +829,69 @@ def make_send_status_msg_function(
     cwl_workflows: List[Path],
     http_conn_id: str = 'ingest_api_connection',
     uuid_src_task_id: str = 'send_create_dataset',
-):
+    dataset_uuid_fun: Optional[Callable[..., str]] = None,
+    dataset_lz_path_fun: Optional[Callable[..., str]] = None,
+    metadata_fun: Optional[Callable[..., dict]] = None
+) -> Callable[..., bool]:
     """
+    The function generated by this call returns a boolean: True if the
+    message ultimately sent reported success, and False otherwise. This
+    return value is not needed in most circumstances, but it is useful
+    when the generated function is being wrapped.
+
+    The user can specify dataset_uuid_fun and dataset_lz_path_fun, or leave
+    both at their default value of None and specify 'uuid_src_task_id'.
+
     `dag_file` should always be `__file__` wherever this function is used,
     to include the DAG file in the provenance. This could be "automated" with
     something like `sys._getframe(1).f_code.co_filename`, but that doesn't
     seem worth it at the moment
 
     'http_conn_id' is the Airflow connection id associated with the /datasets/status
     service.
-    'uuid_src_task_id' is the Airflow task_id of a task providing the uuid via the XCOM
-    key 'derived_dataset_uuid' and the dataset data directory via
-    the None key
+
+    'dataset_uuid_fun' is a function which returns the uuid of the dataset to be
+    updated, or None. If given, it will be called with **kwargs arguments.
+
+    'dataset_lz_path_fun' is a function which returns the full path of the dataset
+    data directory, or None. If given, it will be called with **kwargs arguments.
+
+    'uuid_src_task_id' is the Airflow task_id of a task providing the uuid via
+    the XCOM key 'derived_dataset_uuid' and the dataset data directory
+    via the None key. This is used only if dataset_uuid_fun is None or
+    dataset_lz_path_fun is None.
+
+    'metadata_fun' is a function which returns additional metadata in JSON form,
+    or None. If given, it will be called with **kwargs arguments. This function
+    will only be evaluated if retcode_ops have all returned 0.
     """
-    def send_status_msg(**kwargs):
+    def send_status_msg(**kwargs) -> bool:
         retcodes = [
             int(kwargs['ti'].xcom_pull(task_ids=op))
             for op in retcode_ops
         ]
         print('retcodes: ', {k: v for k, v in zip(retcode_ops, retcodes)})
         success = all(rc == 0 for rc in retcodes)
-        derived_dataset_uuid = kwargs['ti'].xcom_pull(
-            key='derived_dataset_uuid',
-            task_ids=uuid_src_task_id,
-        )
-        ds_dir = kwargs['ti'].xcom_pull(task_ids=uuid_src_task_id)
+        if dataset_uuid_fun is None:
+            dataset_uuid = kwargs['ti'].xcom_pull(
+                key='derived_dataset_uuid',
+                task_ids=uuid_src_task_id,
+            )
+        else:
+            dataset_uuid = dataset_uuid_fun(**kwargs)
+        if dataset_lz_path_fun is None:
+            ds_dir = kwargs['ti'].xcom_pull(task_ids=uuid_src_task_id)
+        else:
+            ds_dir = dataset_lz_path_fun(**kwargs)
         endpoint = '/datasets/status'
         method = 'PUT'
-        crypt_auth_tok = kwargs['dag_run'].conf['crypt_auth_tok']
         headers = {
-            'authorization': 'Bearer ' + decrypt_tok(crypt_auth_tok.encode()),
+            'authorization': 'Bearer ' + _get_auth_tok(**kwargs),
            'content-type': 'application/json',
         }
         extra_options = []
+        return_status = True  # mark false on failure
 
-        http = HttpHook(method, http_conn_id=http_conn_id)
+        http_hook = HttpHook(method, http_conn_id=http_conn_id)
 
         if success:
             md = {}
@@ -830,6 +908,9 @@ def send_status_msg(**kwargs):
             dag_prv.extend(get_git_provenance_list(files_for_provenance))
             md['dag_provenance_list'] = dag_prv
 
+            if metadata_fun:
+                md['metadata'] = metadata_fun(**kwargs)
+
             manifest_files = find_pipeline_manifests(cwl_workflows)
             md.update(
                 get_file_metadata_dict(
@@ -841,7 +922,7 @@ def send_status_msg(**kwargs):
             try:
                 assert_json_matches_schema(md, 'dataset_metadata_schema.yml')
                 data = {
-                    'dataset_id': derived_dataset_uuid,
+                    'dataset_id': dataset_uuid,
                     'status': 'QA',
                     'message': 'the process ran',
                     'metadata': md,
@@ -850,24 +931,26 @@ def send_status_msg(**kwargs):
                 print('invalid metadata follows:')
                 pprint(md)
                 data = {
-                    'dataset_id': derived_dataset_uuid,
+                    'dataset_id': dataset_uuid,
                     'status': 'Error',
                     'message': 'internal error; schema violation: {}'.format(e),
                     'metadata': {},
                 }
+                return_status = False
         else:
             log_fname = Path(get_tmp_dir_path(kwargs['run_id']), 'session.log')
             with open(log_fname, 'r') as f:
                 err_txt = '\n'.join(f.readlines())
             data = {
-                'dataset_id': derived_dataset_uuid,
+                'dataset_id': dataset_uuid,
                 'status': 'Invalid',
                 'message': err_txt,
             }
+            return_status = False
         print('data: ')
         pprint(data)
 
-        response = http.run(
+        response = http_hook.run(
             endpoint,
             json.dumps(data),
             headers,
@@ -876,6 +959,8 @@
             extra_options,
         )
         print('response: ')
         pprint(response.json())
 
+        return return_status
+
     return send_status_msg
 
 
@@ -904,7 +989,7 @@ def set_dataset_state_error(contextDict: Mapping, **kwargs) -> None:
     new_kwargs = kwargs.copy()
     new_kwargs.update(contextDict)
     new_kwargs.update({'dataset_uuid_callable' : dataset_uuid_callable,
                        'http_conn_id' : 'ingest_api_connection',
                        'endpoint' : '/datasets/status',
                        'ds_state' : 'Error',
                        'message' : msg
diff --git a/src/ingest-pipeline/airflow/dags/validation_test.py b/src/ingest-pipeline/airflow/dags/validation_test.py
index 21d3cba6..e26883c7 100644
--- a/src/ingest-pipeline/airflow/dags/validation_test.py
+++ b/src/ingest-pipeline/airflow/dags/validation_test.py
@@ -57,17 +57,15 @@ def find_uuid(**kwargs):
     def my_callable(**kwargs):
         return uuid
 
-    rslt = utils.pythonop_get_dataset_state(
+    ds_rslt = utils.pythonop_get_dataset_state(
         dataset_uuid_callable=my_callable,
         http_conn_id='ingest_api_connection',
         **kwargs
     )
-    if not rslt:
+    if not ds_rslt:
         raise AirflowException(f'Invalid uuid/doi for group: {uuid}')
-    print('rslt:')
-    pprint(rslt)
-    assert 'dataset' in rslt, f"Status for {uuid} has no dataset entry"
-    ds_rslt = rslt['dataset']
+    print('ds_rslt:')
+    pprint(ds_rslt)
 
     for key in ['status', 'uuid', 'data_types', 'local_directory_full_path']:
diff --git a/src/ingest-pipeline/airflow/dags/workflow_map.yml b/src/ingest-pipeline/airflow/dags/workflow_map.yml
index 3cbe5cc6..3b09203d 100644
--- a/src/ingest-pipeline/airflow/dags/workflow_map.yml
+++ b/src/ingest-pipeline/airflow/dags/workflow_map.yml
@@ -26,22 +26,22 @@ workflow_map:
   - 'collection_type': '.*'
     'assay_type': 'AF'
     'workflow': 'ometiff_pyramid'
-#  - 'collection_type': 'single_metadatatsv'
+#  - 'collection_type': 'generic_metadatatsv'
 #    'assay_type': 'SNARE-seq2'
 #    'workflow': 'sc_atac_seq_snare'
   - 'collection_type': 'snare_atac_collection'
     'assay_type': 'SNAREseq'
     'workflow': 'sc_atac_seq_snare'
-  - 'collection_type': 'single_metadatatsv'
+  - 'collection_type': 'generic_metadatatsv'
     'assay_type': 'sciATACseq'
     'workflow': 'sc_atac_seq_sci'
-  - 'collection_type': 'single_metadatatsv'
+  - 'collection_type': 'generic_metadatatsv'
     'assay_type': 'bulk RNA'
     'workflow': 'salmon_rnaseq_bulk'
-  - 'collection_type': 'single_metadatatsv'
+  - 'collection_type': 'generic_metadatatsv'
     'assay_type': 'sciRNAseq'
     'workflow': 'salmon_rnaseq_sciseq'
-#  - 'collection_type': 'single_metadatatsv'
+#  - 'collection_type': 'generic_metadatatsv'
 #    'assay_type': 'SNARE2-RNAseq'
 #    'workflow': 'salmon_rnaseq_snareseq'
   - 'collection_type': 'snare_rna_collection'
@@ -53,12 +53,12 @@ workflow_map:
   - 'collection_type': '.*'
     'assay_type': 'seqFISH'
     'workflow': 'ometiff_pyramid'
-  - 'collection_type': 'single_metadatatsv'
+  - 'collection_type': 'generic_metadatatsv'
     'assay_type': 'snATACseq'
     'workflow': 'sc_atac_seq_sn'
-  - 'collection_type': 'single_metadatatsv'
+  - 'collection_type': 'generic_metadatatsv'
     'assay_type': 'snRNAseq'
-    'workflow': 'salmon_sn_rnaseq_10x'
+    'workflow': 'salmon_rnaseq_10x'
   - 'collection_type': 'bulkatacseq_collection'
     'assay_type': 'ATACseq-bulk'
     'workflow': 'bulk_atacseq'
diff --git a/src/ingest-pipeline/md/data_collection_types/generic_metadatatsv_data_collection.py b/src/ingest-pipeline/md/data_collection_types/generic_metadatatsv_data_collection.py
index 37355b1b..d9541718 100755
--- a/src/ingest-pipeline/md/data_collection_types/generic_metadatatsv_data_collection.py
+++ b/src/ingest-pipeline/md/data_collection_types/generic_metadatatsv_data_collection.py
@@ -21,7 +21,7 @@ class GenericMetadataTSVDataCollection(DataCollection):
     dir_regex = None
 
     # expected_file pairs are (globable name, filetype key)
-    expected_files = [('*-metadata.tsv', 'METADATATSV')]
+    expected_files = [('*metadata.tsv', 'METADATATSV')]
 
     optional_files = []
 
diff --git a/src/ingest-pipeline/misc/tools/new_dataset_survey.py b/src/ingest-pipeline/misc/tools/new_dataset_survey.py
new file mode 100755
index 00000000..85099d20
--- /dev/null
+++ b/src/ingest-pipeline/misc/tools/new_dataset_survey.py
@@ -0,0 +1,82 @@
+#! /usr/bin/env python
+
+import sys
+import argparse
+import requests
+import json
+from pprint import pprint
+import pandas as pd
+
+from survey import Entity, Dataset, Sample, EntityFactory, is_uuid
+
+
+def detect_metadatatsv(ds):
+    """
+    Returns (True, nrecs) if there is a useable metadata.tsv file in the dataset
+    top level directory, or (False, 0) otherwise
+    """
+    for path in ds.full_path.glob('*metadata.tsv'):
+        md_df = pd.read_csv(path, sep='\t')
+        print(len(md_df), 'assay_type' in md_df.columns)
+        if 'assay_type' in md_df.columns:
+            return (True, len(md_df))
+    return (False, 0)
+
+
+def main():
+    """
+    main
+    """
+    parser = argparse.ArgumentParser()
+    parser.add_argument("uuid_txt", help="input files containing uuids")
+    parser.add_argument("--out", help="name of the output .tsv file", required=True)
+    args = parser.parse_args()
+    auth_tok = input('auth_tok: ')
+    entity_factory = EntityFactory(auth_tok)
+
+    uuid_l = []
+    with open(args.uuid_txt) as f:
+        for line in f:
+            uuid = None
+            if is_uuid(line.strip()):
+                uuid = line.strip()
+            else:
+                words = line.strip().split()
+                for word in words:
+                    a, b = word.split(':')
+                    if a.lower() == 'uuid':
+                        uuid = b
+                        break
+            if uuid:
+                uuid_l.append(uuid)
+                print(f'{uuid}')
+            else:
+                print(f'cannot find uuid in {line.strip()}')
+
+    out_recs = []
+
+    known_uuids = set()
+    for uuid in uuid_l:
+        ds = entity_factory.get(uuid)
+        ds.describe()
+        new_uuids = ds.all_uuids()
+        rec = ds.build_rec()
+        rec['has_metadata'], rec['n_md_recs'] = detect_metadatatsv(ds)
+        if any([uuid in known_uuids for uuid in new_uuids]):
+            rec['note'] = 'UUID COLLISION! '
+        known_uuids = known_uuids.union(new_uuids)
+        out_recs.append(rec)
+    out_df = pd.DataFrame(out_recs).rename(columns={'sample_display_doi':'sample_doi',
+                                                    'sample_hubmap_display_id':'sample_display_id',
+                                                    'qa_child_uuid':'derived_uuid',
+                                                    'qa_child_display_doi':'derived_doi',
+                                                    'qa_child_data_type':'derived_data_type',
+                                                    'qa_child_status':'derived_status'})
+    out_df.to_csv(args.out, sep='\t', index=False,
+                  columns=['uuid', 'group_name', 'display_doi', 'status', 'data_types',
+                           'has_metadata', 'n_md_recs'])
+
+
+if __name__ == '__main__':
+    main()
+
diff --git a/src/ingest-pipeline/misc/tools/survey.py b/src/ingest-pipeline/misc/tools/survey.py
index a9ca3f74..85b8599e 100755
--- a/src/ingest-pipeline/misc/tools/survey.py
+++ b/src/ingest-pipeline/misc/tools/survey.py
@@ -4,6 +4,7 @@
 import argparse
 import requests
 import json
+from pathlib import Path
 from pprint import pprint
 
 import pandas as pd
@@ -125,9 +126,11 @@ def __init__(self, prop_dct, entity_factory):
         self.kid_uuids = [elt['uuid'] for elt in prop_dct['immediate_descendants']]
         self.kid_dataset_uuids = [elt['uuid'] for elt in prop_dct['immediate_descendants']
                                   if elt['entity_type'] == 'Dataset']
-        self.data_types = prop_dct['data_types']
+        self.data_types = prop_dct['data_types'] if 'data_types' in prop_dct else []
         self.donor_uuid = prop_dct['donor']['uuid']
-
+        self.group_name = prop_dct['group_name']
+        self.contains_human_genetic_sequences = (prop_dct['contains_human_genetic_sequences'].lower() in
+                                                 ['yes', 'true'])
         self._kid_dct = None
         self._parent_dct = None
 
@@ -143,6 +146,14 @@ def parents(self):
             self._parent_dct = {uuid: self.entity_factory.get(uuid) for uuid in self.parent_uuids}
         return self._parent_dct
 
+    @property
+    def full_path(self):
+        assert self.status == 'New', f'full_path is not yet implemented for {self.status} files'
+        if self.contains_human_genetic_sequences:
+            return Path('/hive/hubmap/data/protected') / self.group_name / self.uuid
+        else:
+            return Path('/hive/hubmap/data/consortium') / self.group_name / self.uuid
+
     def describe(self, prefix='', file=sys.stdout):
         print(f"{prefix}Dataset {self.uuid}: "
               f"{self.display_doi} "
@@ -168,10 +179,12 @@ def build_rec(self):
             QA_child.status (which must be QA or Published)
             note
         """
-        rec = {'uuid': self.uuid, 'display_doi': self.display_doi, 'status': self.status}
-        assert self.data_types, f"No data_types found?"
-        if len(self.data_types) == 1:
-            rec['data_type'] = self.data_types[0]
+        rec = {'uuid': self.uuid, 'display_doi': self.display_doi, 'status': self.status,
+               'group_name': self.group_name}
+        if not self.data_types:
+            rec['data_types'] = "[]"
+        elif len(self.data_types) == 1:
+            rec['data_types'] = self.data_types[0]
         else:
             rec['data_types'] = f"[{','.join(self.data_types)}]"
         other_parent_uuids = [uuid for uuid in self.parent_uuids if uuid not in self.parent_dataset_uuids]
diff --git a/src/ingest-pipeline/requirements.txt b/src/ingest-pipeline/requirements.txt
index 05f8da9b..04af7d54 100644
--- a/src/ingest-pipeline/requirements.txt
+++ b/src/ingest-pipeline/requirements.txt
@@ -6,7 +6,7 @@ tifffile
 xmltodict>=0.12.0
 pyimzml>=1.2.6
 apache-airflow[celery,crypto,postgres,redis,ssh]<2.0.0
-airflow-multi-dagrun>=1.2
+airflow-multi-dagrun>=1.2,<2.0
 jsonschema==3.2.0
 fastjsonschema==2.14.2
 requests>=1.2
@@ -15,7 +15,6 @@ PyYAML>=5.3.1
 rdflib==4.2.2
 rdflib-jsonld==0.4.0
 Flask-OAuthlib==0.9.6
-globus-sdk==1.10.0
 git+git://github.com/hubmapconsortium/cwltool.git@docker-gpu#egg=cwltool
 # We need the dependencies of ingest-validation tools, but relative paths don't work
 # -r ${CWD}/submodules/ingest-validation-tools/requirements.txt
@@ -25,3 +24,4 @@ tableschema==1.15.0
 goodtables==2.4.9
 globus-cli==1.12.0
 yattag==1.14.0
+frictionless==4.0.0
diff --git a/src/ingest-pipeline/submodules/ingest-validation-tools b/src/ingest-pipeline/submodules/ingest-validation-tools
index 9ceef0e4..9685fd46 160000
--- a/src/ingest-pipeline/submodules/ingest-validation-tools
+++ b/src/ingest-pipeline/submodules/ingest-validation-tools
@@ -1 +1 @@
-Subproject commit 9ceef0e4f914806bdd3e163d278151746bbd8d93
+Subproject commit 9685fd46bdcb0802661ebc69a9abe25f4bbaffef
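
Usage sketch: the refactored `make_send_status_msg_function` in `utils.py` can now be driven by callables instead of XCom pulls, and the function it generates reports success as a boolean. The fragment below mirrors the wiring this patch adds to `scan_and_begin_processing.py`; it is a minimal sketch rather than part of the patch itself, and the `get_dataset_uuid` body shown here is a hypothetical stand-in for the DAG's real helper.

```python
import os
import yaml

import utils
from utils import make_send_status_msg_function


def get_dataset_uuid(**kwargs):
    # Hypothetical stand-in: the real DAG file defines its own get_dataset_uuid().
    return kwargs['dag_run'].conf['submission_id']


def get_dataset_lz_path(**kwargs):
    # The landing-zone path arrives in the triggering dag_run's conf.
    return kwargs['dag_run'].conf['lz_path']


def read_metadata_file(**kwargs):
    # rslt.yml is written into the DAG's temp dir by the run_md_extract task.
    md_fname = os.path.join(utils.get_tmp_dir_path(kwargs['run_id']), 'rslt.yml')
    with open(md_fname, 'r') as f:
        return yaml.safe_load(f)


send_status_msg = make_send_status_msg_function(
    dag_file=__file__,
    retcode_ops=['run_md_extract', 'md_consistency_tests'],
    cwl_workflows=[],                        # no CWL manifests for this DAG
    dataset_uuid_fun=get_dataset_uuid,       # bypasses the uuid_src_task_id XCom pull
    dataset_lz_path_fun=get_dataset_lz_path,
    metadata_fun=read_metadata_file,         # evaluated only when all retcodes are 0
)


def wrapped_send_status_msg(**kwargs):
    # The generated function returns True on success, so a wrapper can push
    # follow-up XCom values only when the status message reported 'QA'.
    if send_status_msg(**kwargs):
        scanned_md = read_metadata_file(**kwargs)
        kwargs['ti'].xcom_push(key='collectiontype',
                               value=scanned_md.get('collectiontype'))
    else:
        kwargs['ti'].xcom_push(key='collectiontype', value=None)
```

The wrapper pattern is the reason the generated function gained a return value: the status message itself is still sent by `send_status_msg`, while the wrapper only layers DAG-specific bookkeeping on top of the success/failure outcome.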