From d07aeb7b6bbda07a70d6a8bd6380226f5692a655 Mon Sep 17 00:00:00 2001
From: Juan Puerto <=>
Date: Mon, 6 May 2024 15:35:45 -0400
Subject: [PATCH 1/9] General: First attempt at a bulk update DAG for data curators.

---
 .../airflow/dags/bulk_update_entities.py     |  98 ++++++++++
 .../airflow/dags/devtest_step2.py            | 184 ------------------
 .../airflow/dags/reset_submission_to_new.py  |  79 --------
 .../schemata/bulk_update_entities_schema.yml |  35 ++++
 4 files changed, 133 insertions(+), 263 deletions(-)
 create mode 100644 src/ingest-pipeline/airflow/dags/bulk_update_entities.py
 delete mode 100644 src/ingest-pipeline/airflow/dags/devtest_step2.py
 delete mode 100644 src/ingest-pipeline/airflow/dags/reset_submission_to_new.py
 create mode 100644 src/ingest-pipeline/schemata/bulk_update_entities_schema.yml

diff --git a/src/ingest-pipeline/airflow/dags/bulk_update_entities.py b/src/ingest-pipeline/airflow/dags/bulk_update_entities.py
new file mode 100644
index 00000000..1c3de6bc
--- /dev/null
+++ b/src/ingest-pipeline/airflow/dags/bulk_update_entities.py
@@ -0,0 +1,98 @@
+from pprint import pprint
+import time
+
+from airflow.operators.python import PythonOperator
+from airflow.configuration import conf as airflow_conf
+from datetime import datetime
+from airflow import DAG
+from airflow.hooks.http_hook import HttpHook
+
+from utils import (
+    localized_assert_json_matches_schema as assert_json_matches_schema,
+    get_preserve_scratch_resource,
+    get_tmp_dir_path,
+    encrypt_tok,
+    pythonop_get_dataset_state,
+    get_auth_tok,
+)
+
+default_args = {
+    "start_date": datetime(2019, 1, 1),
+}
+
+with DAG(
+    "bulk_update_entities",
+    schedule_interval=None,
+    is_paused_upon_creation=False,
+    default_args=default_args,
+    user_defined_macros={
+        "tmp_dir_path": get_tmp_dir_path,
+        "preserve_scratch": get_preserve_scratch_resource("rebuild_metadata"),
+    },
+) as dag:
+
+    def check_uuids(**kwargs):
+        print("dag_run conf follows:")
+        pprint(kwargs["dag_run"].conf)
+
+        try:
+            assert_json_matches_schema(kwargs["dag_run"].conf, "launch_multi_metadata_schema.yml")
+        except AssertionError as e:
+            print("invalid DAG metadata follows:")
+            pprint(kwargs["dag_run"].conf)
+            raise
+
+        uuids = kwargs["dag_run"].conf["uuids"]
+        filtered_uuids = []
+        for uuid in uuids:
+            # If this fails out then we know its something other than an upload or dataset
+            try:
+                pythonop_get_dataset_state(dataset_uuid_callable=lambda **kwargs: uuid, **kwargs)
+                filtered_uuids.append(uuid)
+            except Exception as e:
+                print(f"{uuid} is neither a dataset nor an upload and will be skipped.")
+                print(repr(e))
+
+        kwargs["ti"].xcom_push(key="uuids", value=filtered_uuids)
+
+    check_uuids_t = PythonOperator(
+        task_id="check_uuids",
+        python_callable=check_uuids,
+        provide_context=True,
+        op_kwargs={
+            "crypt_auth_tok": encrypt_tok(
+                airflow_conf.as_dict()["connections"]["APP_CLIENT_SECRET"]
+            ).decode(),
+        },
+    )
+
+    def update_uuids(**kwargs):
+        auth_tok = get_auth_tok(**kwargs)
+        headers = {
+            "content-type": "application/json",
+            "X-Hubmap-Application": "ingest-pipeline",
+            "Authorization": f"Bearer {auth_tok}",
+        }
+
+        http_hook = HttpHook("PUT", http_conn_id="entity_api_connection")
+        uuids = kwargs["dag_run"].conf["uuids"]
+
+        for uuid in uuids:
+            endpoint = f"entities/{uuid}"
+            response = http_hook.run(endpoint, headers=headers)
+            print("response: ")
+            pprint(response.json())
+            time.sleep(10)
+
+    update_uuids_t = PythonOperator(
+        task_id="update_uuids",
+        python_callable=update_uuids,
+        provide_context=True,
+        op_kwargs={
+            "crypt_auth_tok": encrypt_tok(
+                airflow_conf.as_dict()["connections"]["APP_CLIENT_SECRET"]
+            ).decode(),
+        },
+    )
+
+    check_uuids_t >> update_uuids_t
diff --git a/src/ingest-pipeline/airflow/dags/devtest_step2.py b/src/ingest-pipeline/airflow/dags/devtest_step2.py
deleted file mode 100644
index d2d5b74e..00000000
--- a/src/ingest-pipeline/airflow/dags/devtest_step2.py
+++ /dev/null
@@ -1,184 +0,0 @@
-
-from datetime import datetime, timedelta
-from pprint import pprint
-
-from airflow.operators.bash import BashOperator
-from airflow.operators.python import PythonOperator, BranchPythonOperator
-from hubmap_operators.common_operators import (
-    LogInfoOperator,
-    JoinOperator,
-    CreateTmpDirOperator,
-    CleanupTmpDirOperator,
-    SetDatasetProcessingOperator,
-)
-
-import utils
-
-from utils import (
-    get_dataset_uuid,
-    get_parent_dataset_uuids_list,
-    get_parent_data_dir,
-    build_dataset_name as inner_build_dataset_name,
-    get_previous_revision_uuid,
-    get_uuid_for_error,
-    join_quote_command_str,
-    make_send_status_msg_function,
-    get_tmp_dir_path,
-    HMDAG,
-    get_queue_resource,
-    get_preserve_scratch_resource,
-)
-
-default_args = {
-    'owner': 'hubmap',
-    'depends_on_past': False,
-    'start_date': datetime(2019, 1, 1),
-    'email': ['joel.welling@gmail.com'],
-    'email_on_failure': False,
-    'email_on_retry': False,
-    'retries': 1,
-    'retry_delay': timedelta(minutes=1),
-    'xcom_push': True,
-    'queue': get_queue_resource('devtest_step2'),
-    'on_failure_callback': utils.create_dataset_state_error_callback(get_uuid_for_error),
-}
-
-with HMDAG('devtest_step2',
-           schedule_interval=None,
-           is_paused_upon_creation=False,
-           default_args=default_args,
-           user_defined_macros={
-               'tmp_dir_path': get_tmp_dir_path,
-               'preserve_scratch': get_preserve_scratch_resource('devtest_step2'),
-           }) as dag:
-    pipeline_name = 'devtest-step2-pipeline'
-
-    def build_dataset_name(**kwargs):
-        return inner_build_dataset_name(dag.dag_id, pipeline_name, **kwargs)
-
-    def build_cwltool_cmd1(**kwargs):
-        ctx = kwargs['dag_run'].conf
-        run_id = kwargs['run_id']
-        tmpdir = get_tmp_dir_path(run_id)
-        tmp_subdir = tmpdir / 'cwl_out'
-        data_dir = get_parent_data_dir(**kwargs)
-
-        try:
-            delay_sec = int(ctx['metadata']['delay_sec'])
-        except ValueError:
-            print('Could not parse delay_sec '
-                  '{} ; defaulting to 30 sec'.format(ctx['metadata']['delay_sec']))
-            delay_sec = 30
-        for fname in ctx['metadata']['files_to_copy']:
-            print(fname)
-
-        commands = [
-            [f'tmp_dir={tmpdir}'],
-            ['sleep', delay_sec],
-            ['cd', data_dir],
-            ['mkdir', '-p', tmp_subdir],
-        ]
-
-        if ctx['metadata']['files_to_copy']:
-            commands.append(['cp', *ctx['metadata']['files_to_copy'], tmp_subdir])
-
-        print('command list:')
-        pprint(commands)
-
-        command_strs = [join_quote_command_str(command) for command in commands]
-        command_str = ' ; '.join(command_strs)
-        print('overall command_str: ', command_str)
-        return command_str
-
-    t_build_cmd1 = PythonOperator(
-        task_id='build_cmd1',
-        python_callable=build_cwltool_cmd1,
-        provide_context=True,
-    )
-
-    t_pipeline_exec = BashOperator(
-        task_id='pipeline_exec',
-        bash_command=""" \
-        tmp_dir={{tmp_dir_path(run_id)}} ; \
-        {{ti.xcom_pull(task_ids='build_cmd1')}} > $tmp_dir/session.log 2>&1 ; \
-        echo $?
-        """,
-    )
-
-    t_maybe_keep_cwl1 = BranchPythonOperator(
-        task_id='maybe_keep_cwl1',
-        python_callable=utils.pythonop_maybe_keep,
-        provide_context=True,
-        op_kwargs={
-            'next_op': 'move_data',
-            'bail_op': 'set_dataset_error',
-            'test_op': 'pipeline_exec',
-        },
-    )
-
-    t_send_create_dataset = PythonOperator(
-        task_id='send_create_dataset',
-        python_callable=utils.pythonop_send_create_dataset,
-        provide_context=True,
-        op_kwargs={
-            'parent_dataset_uuid_callable': get_parent_dataset_uuids_list,
-            'previous_revision_uuid_callable': get_previous_revision_uuid,
-            'http_conn_id': 'ingest_api_connection',
-            'dataset_name_callable': build_dataset_name,
-            'dataset_types': ['devtest'],
-        },
-    )
-
-    t_set_dataset_error = PythonOperator(
-        task_id='set_dataset_error',
-        python_callable=utils.pythonop_set_dataset_state,
-        provide_context=True,
-        trigger_rule='all_done',
-        op_kwargs={
-            'dataset_uuid_callable': get_dataset_uuid,
-            'ds_state': 'Error',
-            'message': f'An error occurred in {pipeline_name}',
-        },
-    )
-
-    t_move_data = BashOperator(
-        task_id='move_data',
-        bash_command="""
-        tmp_dir='{{tmp_dir_path(run_id)}}' ; \
-        ds_dir='{{ti.xcom_pull(task_ids='send_create_dataset')}}' ; \
-        groupname='{{conf.as_dict()['connections']['OUTPUT_GROUP_NAME']}}' ; \
-        pushd '$ds_dir' ; \
-        sudo chown airflow . ; \
-        sudo chgrp $groupname . ; \
-        popd ; \
-        mv '$tmp_dir'/cwl_out/* '$ds_dir' >> '$tmp_dir/session.log' 2>&1 ; \
-        echo $?
-        """,
-    )
-
-    send_status_msg = make_send_status_msg_function(
-        dag_file=__file__,
-        retcode_ops=['pipeline_exec', 'move_data'],
-        cwl_workflows=[],
-    )
-
-    t_send_status = PythonOperator(
-        task_id='send_status_msg',
-        python_callable=send_status_msg,
-        provide_context=True,
-    )
-
-    t_log_info = LogInfoOperator(task_id='log_info')
-    t_join = JoinOperator(task_id='join')
-    t_create_tmpdir = CreateTmpDirOperator(task_id='create_tmpdir')
-    t_cleanup_tmpdir = CleanupTmpDirOperator(task_id='cleanup_tmpdir')
-    t_set_dataset_processing = SetDatasetProcessingOperator(task_id='set_dataset_processing')
-
-    (
-        t_log_info >> t_create_tmpdir
-        >> t_send_create_dataset >> t_set_dataset_processing
-        >> t_build_cmd1 >> t_pipeline_exec >> t_maybe_keep_cwl1
-        >> t_move_data >> t_send_status >> t_join
-    )
-    t_maybe_keep_cwl1 >> t_set_dataset_error >> t_join
-    t_join >> t_cleanup_tmpdir
diff --git a/src/ingest-pipeline/airflow/dags/reset_submission_to_new.py b/src/ingest-pipeline/airflow/dags/reset_submission_to_new.py
deleted file mode 100644
index 9fd03ab7..00000000
--- a/src/ingest-pipeline/airflow/dags/reset_submission_to_new.py
+++ /dev/null
@@ -1,79 +0,0 @@
-from datetime import datetime, timedelta
-
-from airflow.operators.python import PythonOperator
-from airflow.configuration import conf as airflow_conf
-
-import utils
-
-from utils import (
-    HMDAG,
-    get_queue_resource,
-    get_preserve_scratch_resource,
-)
-
-UUIDS_TO_RESET = [
-    # '2c467ffa1d01c41effb7057d7d329c8f',
-    # '48c8dd2ad06aa23e36c095c9088a4913',
-    # '08ee9f5575339641eb9f8fb17cc1d1bd'
-    # "0eb5e457b4855ce28531bc97147196b6",  # HCA scRNAseq-10x-v2
-    # "33874bc3c95b6b41cbc387d2826d88eb",  # San Diego scRNAseq-SNARE
-    # "4758c75194e26ef598ec611916042f51",  # sample Upload on DEV
-    # "060dfa0fdf2b840864f62d2cd1a7a456",  # GE CellDIVE
-    # "488f364142c308a9692e0b529f6697dd",  # GUDMAP scRNAseq disguised as UCSD scRNAseq
-    '2c467ffa1d01c41effb7057d7d329c8f',
-    '48c8dd2ad06aa23e36c095c9088a4913',
-    '08ee9f5575339641eb9f8fb17cc1d1bd'
-
-    ]
-
-
-def get_uuid_for_error(**kwargs):
-    """
-    Return the uuid for the derived dataset if it exists, and of the parent dataset otherwise.
-    """
-    return None
-
-
-default_args = {
-    'owner': 'hubmap',
-    'depends_on_past': False,
-    'start_date': datetime(2019, 1, 1),
-    'email': ['joel.welling@gmail.com'],
-    'email_on_failure': False,
-    'email_on_retry': False,
-    'retries': 1,
-    'retry_delay': timedelta(minutes=1),
-    'xcom_push': True,
-    'queue': get_queue_resource('reset_submission_to_new'),
-    'on_failure_callback': utils.create_dataset_state_error_callback(get_uuid_for_error)
-}
-
-
-def uuid_fun(**kwargs):
-    return kwargs['uuid']
-
-
-with HMDAG('reset_submission_to_new',
-           schedule_interval=None,
-           is_paused_upon_creation=False,
-           default_args=default_args,
-           user_defined_macros={
-               'tmp_dir_path': utils.get_tmp_dir_path,
-               'preserve_scratch': get_preserve_scratch_resource('reset_submission_to_new'),
-           }) as dag:
-
-    for idx, uuid in enumerate(UUIDS_TO_RESET):
-        this_t = PythonOperator(
-            task_id=f'set_dataset_new_{idx}',
-            python_callable=utils.pythonop_set_dataset_state,
-            provide_context=True,
-            op_kwargs={'dataset_uuid_callable': uuid_fun,
-                       'ds_state': 'New',
-                       'message': 'Resetting state to NEW',
-                       'crypt_auth_tok': utils.encrypt_tok(airflow_conf.as_dict()
-                                                           ['connections']['APP_CLIENT_SECRET']).decode(),
-                       'uuid': uuid
-                       }
-        )
-
-        this_t
diff --git a/src/ingest-pipeline/schemata/bulk_update_entities_schema.yml b/src/ingest-pipeline/schemata/bulk_update_entities_schema.yml
new file mode 100644
index 00000000..a68145b7
--- /dev/null
+++ b/src/ingest-pipeline/schemata/bulk_update_entities_schema.yml
@@ -0,0 +1,35 @@
+'$schema': 'http://json-schema.org/schema#'
+'$id': 'http://schemata.hubmapconsortium.org/bulk_update_entities_schema.yml'
+'title': 'bulk_update_entities metadata schema'
+'description': 'bulk_update_entities metadata schema'
+
+'allOf': [{'$ref': '#/definitions/bulk_update_entities'}]
+
+'definitions':
+
+  'bulk_update_entities':
+    'type': 'object'
+    'properties':
+      'uuids':
+        'type': 'array'
+        'items':
+          'type': 'string'
+          'description': 'a dataset/upload uuid or DOI'
+        'minItems': 1
+      'metadata':
+        'type': 'object'
+        'properties':
+          'description':
+            'type': 'string'
+            'description': 'Entity description'
+          'status':
+            'type': 'string'
+            'description': 'Entity status'
+          'assigned_to_group_name':
+            'type': 'string'
+            'description': 'Group name for assignee'
+          'ingest_task':
+            'type': 'string'
+            'description': 'Task description for assignee'
+    'required': ['uuids', 'metadata']
+

From 62ffe40dab8f107270c647cd52e6d6000f4872e0 Mon Sep 17 00:00:00 2001
From: Juan Puerto <=>
Date: Tue, 7 May 2024 12:58:57 -0400
Subject: [PATCH 2/9] General: Use HMDAG

---
 .../airflow/dags/bulk_update_entities.py | 30 ++++++++++++++++---
 1 file changed, 26 insertions(+), 4 deletions(-)

diff --git a/src/ingest-pipeline/airflow/dags/bulk_update_entities.py b/src/ingest-pipeline/airflow/dags/bulk_update_entities.py
index 1c3de6bc..1590805e 100644
--- a/src/ingest-pipeline/airflow/dags/bulk_update_entities.py
+++ b/src/ingest-pipeline/airflow/dags/bulk_update_entities.py
@@ -1,33 +1,55 @@
 from pprint import pprint
 import time
 
+from datetime import timedelta
+
 from airflow.operators.python import PythonOperator
 from airflow.configuration import conf as airflow_conf
 from datetime import datetime
-from airflow import DAG
 from airflow.hooks.http_hook import HttpHook
 
 from utils import (
     localized_assert_json_matches_schema as assert_json_matches_schema,
     get_preserve_scratch_resource,
     get_tmp_dir_path,
+    HMDAG,
     encrypt_tok,
     pythonop_get_dataset_state,
     get_auth_tok,
+    get_queue_resource,
+    create_dataset_state_error_callback,
 )
+
+
+def get_uuid_for_error(**kwargs) -> str:
+    """
+    Return the uuid for the derived dataset if it exists, and of the parent dataset otherwise.
+    """
+    return ""
+
 
 default_args = {
+    "owner": "hubmap",
+    "depends_on_past": False,
     "start_date": datetime(2019, 1, 1),
+    "email": ["joel.welling@gmail.com"],
+    "email_on_failure": False,
+    "email_on_retry": False,
+    "retries": 1,
+    "retry_delay": timedelta(minutes=1),
+    "xcom_push": True,
+    "queue": get_queue_resource("bulk_update_entities"),
+    "on_failure_callback": create_dataset_state_error_callback(get_uuid_for_error),
 }
 
-with DAG(
-    "bulk_update_entities",
+with HMDAG(
+    "bulk_process",
     schedule_interval=None,
     is_paused_upon_creation=False,
     default_args=default_args,
     user_defined_macros={
         "tmp_dir_path": get_tmp_dir_path,
-        "preserve_scratch": get_preserve_scratch_resource("rebuild_metadata"),
+        "preserve_scratch": get_preserve_scratch_resource("launch_multi_analysis"),
     },
 ) as dag:
 

From 657ad32744842cc12d4e58a20f6eba0c78c07404 Mon Sep 17 00:00:00 2001
From: Juan Puerto <=>
Date: Tue, 7 May 2024 13:01:29 -0400
Subject: [PATCH 3/9] General: Fix parameter names

---
 src/ingest-pipeline/airflow/dags/bulk_update_entities.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/ingest-pipeline/airflow/dags/bulk_update_entities.py b/src/ingest-pipeline/airflow/dags/bulk_update_entities.py
index 1590805e..b6bb342f 100644
--- a/src/ingest-pipeline/airflow/dags/bulk_update_entities.py
+++ b/src/ingest-pipeline/airflow/dags/bulk_update_entities.py
@@ -43,13 +43,13 @@ with HMDAG(
-    "bulk_process",
+    "bulk_update_entities",
     schedule_interval=None,
     is_paused_upon_creation=False,
     default_args=default_args,
     user_defined_macros={
         "tmp_dir_path": get_tmp_dir_path,
-        "preserve_scratch": get_preserve_scratch_resource("launch_multi_analysis"),
+        "preserve_scratch": get_preserve_scratch_resource("bulk_update_entities"),
     },
 ) as dag:
 

From 2109fc271a3a4bd659b109f7ea9a692d3af2aba3 Mon Sep 17 00:00:00 2001
From: Juan Puerto <=>
Date: Tue, 7 May 2024 13:03:35 -0400
Subject: [PATCH 4/9] General: Fix schema yaml name

---
 src/ingest-pipeline/airflow/dags/bulk_update_entities.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/ingest-pipeline/airflow/dags/bulk_update_entities.py b/src/ingest-pipeline/airflow/dags/bulk_update_entities.py
index b6bb342f..63b0c684 100644
--- a/src/ingest-pipeline/airflow/dags/bulk_update_entities.py
+++ b/src/ingest-pipeline/airflow/dags/bulk_update_entities.py
@@ -58,7 +58,7 @@ def check_uuids(**kwargs):
         pprint(kwargs["dag_run"].conf)
 
         try:
-            assert_json_matches_schema(kwargs["dag_run"].conf, "launch_multi_metadata_schema.yml")
+            assert_json_matches_schema(kwargs["dag_run"].conf, "bulk_update_entities_schema.yml")
         except AssertionError as e:
             print("invalid DAG metadata follows:")
             pprint(kwargs["dag_run"].conf)

From 0787a68dd5acda75517533831f41a7dab10aaa75 Mon Sep 17 00:00:00 2001
From: Juan Puerto <=>
Date: Tue, 7 May 2024 13:08:07 -0400
Subject: [PATCH 5/9] General: Actually send data

---
 src/ingest-pipeline/airflow/dags/bulk_update_entities.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/ingest-pipeline/airflow/dags/bulk_update_entities.py b/src/ingest-pipeline/airflow/dags/bulk_update_entities.py
index 63b0c684..8035e067 100644
--- a/src/ingest-pipeline/airflow/dags/bulk_update_entities.py
+++ b/src/ingest-pipeline/airflow/dags/bulk_update_entities.py
@@ -1,5 +1,6 @@
 from pprint import pprint
 import time
+import json
 
 from datetime import timedelta
 
@@ -98,10 +99,11 @@ def update_uuids(**kwargs):
 
         http_hook = HttpHook("PUT", http_conn_id="entity_api_connection")
         uuids = kwargs["dag_run"].conf["uuids"]
+        metadata = kwargs["dag_run"].conf["metadata"]
 
         for uuid in uuids:
             endpoint = f"entities/{uuid}"
-            response = http_hook.run(endpoint, headers=headers)
+            response = http_hook.run(endpoint, headers=headers, data=json.dumps(metadata))
             print("response: ")
             pprint(response.json())
             time.sleep(10)

From d6f5e5d6c18d44fd57abf5f95dd94d6f7350ac97 Mon Sep 17 00:00:00 2001
From: Juan Puerto <=>
Date: Tue, 7 May 2024 13:15:22 -0400
Subject: [PATCH 6/9] General: Try to update every uuid in the array

---
 .../airflow/dags/bulk_update_entities.py | 17 +++++++++++++----
 1 file changed, 13 insertions(+), 4 deletions(-)

diff --git a/src/ingest-pipeline/airflow/dags/bulk_update_entities.py b/src/ingest-pipeline/airflow/dags/bulk_update_entities.py
index 8035e067..37bddc45 100644
--- a/src/ingest-pipeline/airflow/dags/bulk_update_entities.py
+++ b/src/ingest-pipeline/airflow/dags/bulk_update_entities.py
@@ -101,12 +101,21 @@ def update_uuids(**kwargs):
         uuids = kwargs["dag_run"].conf["uuids"]
         metadata = kwargs["dag_run"].conf["metadata"]
 
+        error_dict = {}
+
         for uuid in uuids:
             endpoint = f"entities/{uuid}"
-            response = http_hook.run(endpoint, headers=headers, data=json.dumps(metadata))
-            print("response: ")
-            pprint(response.json())
-            time.sleep(10)
+            try:
+                response = http_hook.run(endpoint, headers=headers, data=json.dumps(metadata))
+                print("response: ")
+                pprint(response.json())
+                time.sleep(10)
+            except Exception as e:
+                error_dict[uuid] = repr(e)
+
+        if error_dict:
+            print("The following uuids could not be updated for the following reasons: ")
+            print(json.dumps(error_dict))

From 9fe2e3861f0567aef4d23046408f005ba22435af Mon Sep 17 00:00:00 2001
From: Juan Puerto <=>
Date: Tue, 7 May 2024 13:21:23 -0400
Subject: [PATCH 7/9] General: Try to get all the details in the error dict

---
 src/ingest-pipeline/airflow/dags/bulk_update_entities.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/ingest-pipeline/airflow/dags/bulk_update_entities.py b/src/ingest-pipeline/airflow/dags/bulk_update_entities.py
index 37bddc45..4fa098d8 100644
--- a/src/ingest-pipeline/airflow/dags/bulk_update_entities.py
+++ b/src/ingest-pipeline/airflow/dags/bulk_update_entities.py
@@ -111,7 +111,7 @@ def update_uuids(**kwargs):
                 pprint(response.json())
                 time.sleep(10)
             except Exception as e:
-                error_dict[uuid] = repr(e)
+                error_dict[uuid] = f"Exception: {repr(e)}, \nResponse: {response.json()}"

From 6a56f29e8145d3ad42a4d558bb47ed5116d18a1b Mon Sep 17 00:00:00 2001
From: Juan Puerto <=>
Date: Tue, 7 May 2024 13:23:15 -0400
Subject: [PATCH 8/9] General: Just print

---
 src/ingest-pipeline/airflow/dags/bulk_update_entities.py | 8 ++------
 1 file changed, 2 insertions(+), 6 deletions(-)

diff --git a/src/ingest-pipeline/airflow/dags/bulk_update_entities.py b/src/ingest-pipeline/airflow/dags/bulk_update_entities.py
index 4fa098d8..17d0e787 100644
--- a/src/ingest-pipeline/airflow/dags/bulk_update_entities.py
+++ b/src/ingest-pipeline/airflow/dags/bulk_update_entities.py
@@ -110,12 +110,8 @@ def update_uuids(**kwargs):
                 print("response: ")
                 pprint(response.json())
                 time.sleep(10)
-            except Exception as e:
-                error_dict[uuid] = f"Exception: {repr(e)}, \nResponse: {response.json()}"
-
-        if error_dict:
-            print("The following uuids could not be updated for the following reasons: ")
-            print(json.dumps(error_dict))
+            except:
+                print(f"ERROR: UUID {uuid} could not be updated.")

From 55c140383302ea646833c0e4370aee83ffa3b859 Mon Sep 17 00:00:00 2001
From: Juan Puerto <=>
Date: Tue, 7 May 2024 13:47:02 -0400
Subject: [PATCH 9/9] General: Fixes to address Sunset's comments

---
 src/ingest-pipeline/airflow/dags/bulk_update_entities.py | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/src/ingest-pipeline/airflow/dags/bulk_update_entities.py b/src/ingest-pipeline/airflow/dags/bulk_update_entities.py
index 17d0e787..ca952893 100644
--- a/src/ingest-pipeline/airflow/dags/bulk_update_entities.py
+++ b/src/ingest-pipeline/airflow/dags/bulk_update_entities.py
@@ -76,7 +76,7 @@ def check_uuids(**kwargs):
                 print(f"{uuid} is neither a dataset nor an upload and will be skipped.")
                 print(repr(e))
 
-        kwargs["ti"].xcom_push(key="uuids", value=filtered_uuids)
+        kwargs["dag_run"].conf["uuids"] = filtered_uuids
 
     check_uuids_t = PythonOperator(
         task_id="check_uuids",
@@ -101,18 +101,17 @@ def update_uuids(**kwargs):
         uuids = kwargs["dag_run"].conf["uuids"]
         metadata = kwargs["dag_run"].conf["metadata"]
 
-        error_dict = {}
-
         for uuid in uuids:
             endpoint = f"entities/{uuid}"
             try:
                 response = http_hook.run(endpoint, headers=headers, data=json.dumps(metadata))
                 print("response: ")
                 pprint(response.json())
-                time.sleep(10)
             except:
                 print(f"ERROR: UUID {uuid} could not be updated.")
-
+
+            time.sleep(10)
+
     update_uuids_t = PythonOperator(
         task_id="update_uuids",
         python_callable=update_uuids,