
Commit

Merge branch 'devel' into master.
jswelling committed Mar 8, 2021
2 parents c0f696a + 78f3e66 commit 459b675
Showing 11 changed files with 299 additions and 171 deletions.
14 changes: 6 additions & 8 deletions src/ingest-pipeline/airflow/dags/launch_multi_analysis.py
@@ -73,15 +73,13 @@ def check_uuids(**kwargs):
for uuid in uuid_l:
print(f'Starting uuid {uuid}')
my_callable = lambda **kwargs: uuid
rslt=utils.pythonop_get_dataset_state(dataset_uuid_callable=my_callable,
http_conn_id='ingest_api_connection',
**kwargs)
if not rslt:
ds_rslt=utils.pythonop_get_dataset_state(dataset_uuid_callable=my_callable,
http_conn_id='ingest_api_connection',
**kwargs)
if not ds_rslt:
raise AirflowException(f'Invalid uuid/doi for group: {uuid}')
print('rslt:')
pprint(rslt)
assert 'dataset' in rslt, f"Status for {uuid} has no dataset entry"
ds_rslt = rslt['dataset']
print('ds_rslt:')
pprint(ds_rslt)

for key in ['status', 'uuid', 'data_types', 'local_directory_full_path']:
assert key in ds_rslt, f"Dataset status for {uuid} has no {key}"
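
In short, this hunk renames rslt to ds_rslt because utils.pythonop_get_dataset_state now appears to return the dataset record directly, so the caller no longer unwraps rslt['dataset'] itself. A minimal sketch of the post-merge calling pattern, under that assumption (uuid and kwargs come from the enclosing check_uuids loop shown above):

    # Minimal sketch of the post-merge pattern; assumes (from this hunk)
    # that utils.pythonop_get_dataset_state returns the dataset record
    # itself rather than a wrapper with a 'dataset' key.
    my_callable = lambda **kwargs: uuid  # uuid comes from the enclosing loop
    ds_rslt = utils.pythonop_get_dataset_state(
        dataset_uuid_callable=my_callable,
        http_conn_id='ingest_api_connection',
        **kwargs)
    if not ds_rslt:
        raise AirflowException(f'Invalid uuid/doi for group: {uuid}')
    # These are the only fields the DAG relies on downstream.
    for key in ['status', 'uuid', 'data_types', 'local_directory_full_path']:
        assert key in ds_rslt, f"Dataset status for {uuid} has no {key}"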
108 changes: 30 additions & 78 deletions src/ingest-pipeline/airflow/dags/scan_and_begin_processing.py
@@ -18,7 +18,10 @@
from hubmap_operators.flex_multi_dag_run import FlexMultiDagRunOperator
import utils

from utils import localized_assert_json_matches_schema as assert_json_matches_schema
from utils import (
localized_assert_json_matches_schema as assert_json_matches_schema,
make_send_status_msg_function
)


def get_dataset_uuid(**kwargs):
@@ -48,53 +48,29 @@ def get_dataset_uuid(**kwargs):
default_args=default_args,
user_defined_macros={'tmp_dir_path' : utils.get_tmp_dir_path}
) as dag:

def send_status_msg(**kwargs):
ctx = kwargs['dag_run'].conf
retcode_ops = ['run_md_extract', 'md_consistency_tests']
print('raw: ', [kwargs['ti'].xcom_pull(task_ids=op) for op in retcode_ops])
retcodes = [int(kwargs['ti'].xcom_pull(task_ids=op))
for op in retcode_ops]
retcode_dct = {k:v for k, v in zip(retcode_ops, retcodes)}
print('retcodes: ', retcode_dct)
success = all([rc == 0 for rc in retcodes])
ds_dir = ctx['lz_path']
http_conn_id='ingest_api_connection'
endpoint='/datasets/status'
method='PUT'
headers={'authorization' : 'Bearer ' + utils.decrypt_tok(ctx['crypt_auth_tok'].encode()),
'content-type' : 'application/json'}
print('headers:')
pprint(headers)
extra_options=[]

http = HttpHook(method,
http_conn_id=http_conn_id)

if success:
md_fname = os.path.join(utils.get_tmp_dir_path(kwargs['run_id']),
'rslt.yml')
with open(md_fname, 'r') as f:
scanned_md = yaml.safe_load(f)
dag_prv = utils.get_git_provenance_list([__file__])
md = {'dag_provenance_list' : dag_prv,
'metadata' : scanned_md}
# Inclusion of files information in this message is getting disabled due to size
#md.update(utils.get_file_metadata_dict(ds_dir,
# utils.get_tmp_dir_path(kwargs['run_id'])))
try:
assert_json_matches_schema(md, 'dataset_metadata_schema.yml')
data = {'dataset_id' : ctx['submission_id'],
'status' : 'QA',
'message' : 'the process ran',
'metadata': md}
except AssertionError as e:
print('invalid metadata follows:')
pprint(md)
data = {'dataset_id' : ctx['submission_id'],
'status' : 'Error',
'message' : 'internal error; schema violation: {}'.format(e),
'metadata': {}}

def read_metadata_file(**kwargs):
md_fname = os.path.join(utils.get_tmp_dir_path(kwargs['run_id']),
'rslt.yml')
with open(md_fname, 'r') as f:
scanned_md = yaml.safe_load(f)
return scanned_md

def get_dataset_lz_path(**kwargs):
return kwargs['dag_run'].conf['lz_path']

send_status_msg = make_send_status_msg_function(
dag_file=__file__,
retcode_ops=['run_md_extract', 'md_consistency_tests'],
cwl_workflows=[],
dataset_uuid_fun=get_dataset_uuid,
dataset_lz_path_fun=get_dataset_lz_path,
metadata_fun=read_metadata_file
)

def wrapped_send_status_msg(**kwargs):
if send_status_msg(**kwargs):
scanned_md = read_metadata_file(**kwargs) # Yes, it's getting re-read
kwargs['ti'].xcom_push(key='collectiontype',
value=(scanned_md['collectiontype']
if 'collectiontype' in scanned_md
@@ -107,34 +86,7 @@ def send_status_msg(**kwargs):
assay_type = None
kwargs['ti'].xcom_push(key='assay_type', value=assay_type)
else:
for op in retcode_ops:
if retcode_dct[op]:
if op == 'run_md_extract':
msg_fname = os.path.join(utils.get_tmp_dir_path(kwargs['run_id']),
'error.log')
if not os.path.exists(msg_fname):
msg_fname = os.path.join(utils.get_tmp_dir_path(kwargs['run_id']),
'session.log')
with open(msg_fname, 'r') as f:
err_txt = '\n'.join(f.readlines())
else:
err_txt = kwargs['ti'].xcom_pull(task_ids=op, key='err_msg')
break
else:
err_txt = 'Unknown error'
data = {'dataset_id' : ctx['submission_id'],
'status' : 'Invalid',
'message' : err_txt}
kwargs['ti'].xcom_push(key='collectiontype', value=None)
print('data: ')
pprint(data)

response = http.run(endpoint,
json.dumps(data),
headers,
extra_options)
print('response: ')
pprint(response.json())

t_run_md_extract = BashOperator(
task_id='run_md_extract',
@@ -164,9 +116,9 @@ def send_status_msg(**kwargs):

t_send_status = PythonOperator(
task_id='send_status_msg',
python_callable=send_status_msg,
python_callable=wrapped_send_status_msg,
provide_context=True
)
)

t_create_tmpdir = BashOperator(
task_id='create_temp_dir',
@@ -176,7 +128,7 @@ def send_status_msg(**kwargs):

t_cleanup_tmpdir = BashOperator(
task_id='cleanup_temp_dir',
bash_command='echo rm -r {{tmp_dir_path(run_id)}}',
bash_command='rm -r {{tmp_dir_path(run_id)}}',
trigger_rule='all_success'
)

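
The main change in this file replaces the DAG's hand-rolled send_status_msg (the HttpHook PUT to /datasets/status, return-code collection, and error-log reading deleted above) with a shared factory imported from utils. The sketch below is a hypothetical reconstruction of the factory's observable contract, not the actual utils.make_send_status_msg_function body: parameter names are taken from the call site in this diff, and the callable's truthy return on success is inferred from the fact that wrapped_send_status_msg branches on it.

    # Hypothetical sketch only -- not the real utils implementation.
    # Parameter names come from the call site in this diff; the
    # payload-building step is elided because it is not shown here.
    def make_send_status_msg_function(dag_file, retcode_ops, cwl_workflows,
                                      dataset_uuid_fun, dataset_lz_path_fun,
                                      metadata_fun):
        def send_status_msg(**kwargs):
            # Gather the return codes of the named upstream tasks, as the
            # deleted inline version did.
            retcodes = [int(kwargs['ti'].xcom_pull(task_ids=op))
                        for op in retcode_ops]
            success = all(rc == 0 for rc in retcodes)
            # ... assemble the /datasets/status payload from metadata_fun,
            # dataset_uuid_fun, and dataset_lz_path_fun, then PUT it ...
            return success
        return send_status_msg

That boolean return is the piece the wrapper relies on: on success it re-reads rslt.yml and pushes the collectiontype and assay_type XComs, and on failure it pushes collectiontype=None so downstream branching still works.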
(The remaining 9 changed files are not shown.)
