Skip to content

Commit

Permalink
Merge pull request #782 from hubmapconsortium/phillips/status_changer
Browse files Browse the repository at this point in the history
StatusChanger
  • Loading branch information
sunset666 authored Nov 6, 2023
2 parents 72f907b + cc85eca commit 3d2512c
Show file tree
Hide file tree
Showing 13 changed files with 1,369 additions and 1,192 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ __pycache__
db.sqlite3
media/*
media
.envrc

docker/dev-local

Expand Down
164 changes: 0 additions & 164 deletions src/ingest-pipeline/airflow/dags/error_catching/failure_callback.py

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
from requests.exceptions import HTTPError
from requests import codes
import traceback

from airflow.configuration import conf as airflow_conf
from airflow.hooks.http_hook import HttpHook

from export_and_backup.export_and_backup_plugin import ExportAndBackupPlugin, add_path
from status_change.status_utils import get_hubmap_id_from_uuid
from utils import get_auth_tok

from airflow.configuration import conf as airflow_conf

with add_path(airflow_conf.as_dict()["connections"]["SRC_PATH"].strip("'").strip('"')):
from submodules import hubmapinventory

Expand All @@ -18,7 +16,7 @@ class PublishedBackupPlugin(ExportAndBackupPlugin):
def run_plugin(self):
return "PublishedBackupPlugin ran successfully"

## Future functionality
# Future functionality
# Back-up published datasets to appropriate location (S3)
# Also stage for inclusion in 6-month Glacier backup?
# Not sure how datasets are updated post-publication; that is likely a separate process--
Expand All @@ -32,34 +30,8 @@ def __init__(self, **kwargs):
self.kwargs = kwargs
self.token = get_auth_tok(**self.kwargs)

# Should I add this to utils instead, so it can be reused?
def get_hubmap_id(self, uuid):
method = 'GET'
headers = {
'authorization': f'Bearer {self.token}',
'content-type': 'application/json',
'X-Hubmap-Application': 'ingest-pipeline',
}
http_hook = HttpHook(method, http_conn_id='entity_api_connection')

endpoint = f'entities/{uuid}'

try:
response = http_hook.run(endpoint,
headers=headers,
extra_options={'check_response': False})
response.raise_for_status()
return response.json()
except HTTPError as e:
print(f'ERROR: {e}')
if e.response.status_code == codes.unauthorized:
raise RuntimeError('entity database authorization was rejected?')
else:
print('benign error')
return {}

def run_plugin(self):
hubmap_id = self.get_hubmap_id(self.kwargs["uuid"])["hubmap_id"]
hubmap_id = get_hubmap_id_from_uuid(self.token, self.kwargs["uuid"])["hubmap_id"]
dbgap_study_id = self.kwargs.get("dbgap_study_id", None)
# instance will need to be changed
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def test_params(*argv, **kwargs):
pprint(kwargs)
return 'maybe this will make it run only once'

# TODO: this code looks potentially out of date; it has not been updated to use StatusManager yet.
def send_status_msg(**kwargs):
md_extract_retcode = int(kwargs['ti'].xcom_pull(task_ids="run_md_extract"))
print('md_extract_retcode: ', md_extract_retcode)
Expand Down
66 changes: 66 additions & 0 deletions src/ingest-pipeline/airflow/dags/status_change/failure_callback.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import logging
from pprint import pprint

from status_change.status_manager import StatusChanger
from status_change.status_utils import formatted_exception
from utils import get_auth_tok


class FailureCallbackException(Exception):
    """Exception type declared alongside the failure-callback classes."""


class FailureCallback:
    """
    Set an entity's status to "error" after a pipeline task has failed.

    Instantiating the class does all of the work: ``__init__`` gathers what it
    needs from the callback context and then calls ``pre_set_status()``, which
    in turn calls ``set_status()``.

    Subclasses may override ``get_extra_fields()`` to change the message that
    is sent, or ``pre_set_status()`` to adjust attributes before the status
    change is issued.
    """

    def __init__(self, context):
        """
        Collect failure details from ``context`` and trigger the status change.

        :param context: mapping that provides "task_instance" (used to pull the
            "uuid" XCom value), "dag_run", "task" and, when available,
            "exception". It is also passed to ``get_auth_tok`` as keyword
            arguments.
        """
        self.context = context
        self.uuid = self.context.get("task_instance").xcom_pull(key="uuid")
        # The uuid is written back into the context before the context is
        # handed to get_auth_tok below.
        self.context["uuid"] = self.uuid
        self.auth_tok = get_auth_tok(**context)
        self.dag_run = self.context.get("dag_run")
        self.task = self.context.get("task")
        exception = self.context.get("exception")
        self.formatted_exception = formatted_exception(exception)

        self.pre_set_status()

    def get_extra_fields(self):
        """
        Build the extra fields sent along with the status change.

        Error message might need to be overwritten when
        subclassed for various DAGs.
        'Error' is the default for FailureCallback, which indicates a pipeline has failed.

        :return: dict with a single "validation_message" entry naming the DAG,
            its execution date, the failed task and, when one was captured,
            the formatted exception.
        """
        # Everything inside the triple-quoted f-string is emitted verbatim,
        # including leading whitespace, so its lines are deliberately not
        # indented to match the surrounding code.
        return {
            "validation_message": f"""
Process {self.dag_run.dag_id} started {self.dag_run.execution_date}
failed at task {self.task.task_id}.
{f'Error: {self.formatted_exception}' if self.formatted_exception else ""}
""",
        }

    def pre_set_status(self):
        """Hook that runs before the status change; by default just calls ``set_status()``."""
        # Allows for alterations to props, before calling StatusChanger
        # This was added to support some email functions and is perhaps
        # at the moment over-engineered.
        self.set_status()

    def set_status(self):
        """
        The failure callback needs to set the dataset status,
        otherwise it will remain in the "Processing" state
        """
        # Function-scope import so this block does not depend on the
        # module-level imports being changed.
        from pprint import pformat

        data = self.get_extra_fields()
        logging.info("data: ")
        # pprint() prints to stdout and returns None, so logging its result
        # would record the string "None"; pformat() returns the text instead.
        logging.info(pformat(data))
        StatusChanger(
            self.uuid,
            self.auth_tok,
            "error",
            {
                # Reuse the dict that was just logged so the log and the
                # request cannot disagree.
                "extra_fields": data,
                "extra_options": {},
            },
        ).on_status_change()
Loading

0 comments on commit 3d2512c

Please sign in to comment.