
Commit

Merge branch 'main' into jrlegrand/synonyms
jrlegrand authored Jan 16, 2025
2 parents fbb06a5 + 5c1a981 commit cdf56b5
Showing 39 changed files with 855 additions and 166 deletions.
193 changes: 193 additions & 0 deletions airflow/dags/ashp/dag.py
@@ -0,0 +1,193 @@
import json
import logging
import os
import re
from datetime import date, datetime
from time import sleep

import requests
from bs4 import BeautifulSoup
import pandas as pd

import pendulum

from airflow_operator import create_dag
from airflow.providers.postgres.operators.postgres import PostgresOperator

from common_dag_tasks import extract, transform, generate_sql_list, get_ds_folder
from sagerx import read_sql_file, load_df_to_pg

from airflow.decorators import task


dag_id = "ashp"

dag = create_dag(
    dag_id=dag_id,
    schedule="0 4 * * *",
    start_date=pendulum.yesterday(),
    catchup=False,
    concurrency=2,
)

with dag:
    landing_url = "https://www.ashp.org/drug-shortages/current-shortages/drug-shortages-list?page=CurrentShortages"
    base_url = "https://www.ashp.org/drug-shortages/current-shortages/"
    ndc_regex = re.compile(r"\d{5}\-\d{4}\-\d{2}") # ASHP shortage pages always have 5-4-2 format NDCs
    created_regex = re.compile(r"Created (\w+ \d+, \d+)")
    updated_regex = re.compile(r"Updated (\w+ \d+, \d+)")

    ds_folder = get_ds_folder(dag_id)

    transform_task = transform(dag_id)

    @task
    def extract_load_shortage_list():
        logging.basicConfig(level=logging.INFO, format='%(asctime)s : %(levelname)s : %(message)s')

        logging.info('Checking ASHP website for updates')
        shortage_list = requests.get(landing_url)

        if shortage_list.status_code != 200:
            logging.error('ASHP website unreachable')
            exit()

        ashp_drugs = []
        soup = BeautifulSoup(shortage_list.content, 'html.parser')
        for link in soup.find(id='1_dsGridView').find_all('a'):
            ashp_drugs.append({
                'name': link.get_text(),
                'detail_url': link.get('href')
            })

        for shortage in ashp_drugs:
            shortage_detail_data = requests.get(base_url + shortage['detail_url'])
            soup = BeautifulSoup(shortage_detail_data.content, 'html.parser')

            # Get shortage reasons
            shortage_reasons = []
            try:
                for reason in soup.find(id='1_lblReason').find_all('li'):
                    shortage_reasons.append(reason.get_text())
            except AttributeError:
                logging.debug(f'No shortage reasons for {shortage.get("name")}')
                shortage['shortage_reasons'] = None
            else:
                shortage['shortage_reasons'] = json.dumps(shortage_reasons)

            # Get resupply dates
            resupply_dates = []
            try:
                for date_info in soup.find(id='1_lblResupply').find_all('li'):
                    resupply_dates.append(date_info.get_text())
            except AttributeError:
                logging.debug(f'No resupply dates for {shortage.get("name")}')
                shortage['resupply_dates'] = None
            else:
                shortage['resupply_dates'] = json.dumps(resupply_dates)

            # Get implications on patient care
            care_implications = []
            try:
                for implication in soup.find(id='1_lblImplications').find_all('li'):
                    care_implications.append(implication.get_text())
            except AttributeError:
                logging.debug(f'No care implications for {shortage.get("name")}')
                shortage['care_implications'] = None
            else:
                shortage['care_implications'] = json.dumps(care_implications)

            # Get safety information
            safety_notices = []
            try:
                for notice in soup.find(id='1_lblSafety').find_all('li'):
                    safety_notices.append(notice.get_text())
            except AttributeError:
                logging.debug(f'No safety notices for {shortage.get("name")}')
                shortage['safety_notices'] = None
            else:
                shortage['safety_notices'] = json.dumps(safety_notices)

            # Get alternative agents and management info
            alternatives = []
            try:
                for alternative in soup.find(id='1_lblAlternatives').find_all('li'):
                    alternatives.append(alternative.get_text())
            except AttributeError:
                logging.debug(f'No alternatives/management information for {shortage.get("name")}')
                shortage['alternatives_and_management'] = None
            else:
                shortage['alternatives_and_management'] = json.dumps(alternatives)

            # Get affected NDCs
            affected_ndcs = []
            try:
                for ndc_description in soup.find(id='1_lblProducts').find_all('li'):
                    ndc = re.search(ndc_regex, ndc_description.get_text())[0]
                    affected_ndcs.append(ndc)
                shortage['affected_ndcs'] = affected_ndcs
            except (TypeError, AttributeError):
                logging.debug(f'No affected NDCs for {shortage.get("name")}')

            # Get currently available NDCs
            available_ndcs = []
            try:
                for ndc_description in soup.find(id='1_lblAvailable').find_all('li'):
                    ndc = ndc_regex.search(ndc_description.get_text())[0]
                    available_ndcs.append(ndc)
                shortage['available_ndcs'] = available_ndcs
            except (TypeError, AttributeError):
                logging.debug(f'No available NDCs for {shortage.get("name")}')

            # Get created date
            stamp = soup.find(id='1_lblUpdated').find('p').get_text()
            try:
                created_date = created_regex.search(stamp).group(1)
                created_date = datetime.strptime(created_date, '%B %d, %Y')
                shortage['created_date'] = created_date
            except AttributeError:
                logging.debug(f'Missing ASHP created date for {shortage.get("name")}')
                shortage['created_date'] = None
            except ValueError:
                logging.error(f'Could not parse created date for {shortage.get("name")}')
                shortage['created_date'] = None

            # Get last updated date
            try:
                updated_date = updated_regex.search(stamp).group(1)
                updated_date = datetime.strptime(updated_date, '%B %d, %Y')
                shortage['updated_date'] = updated_date
            except AttributeError:
                logging.debug(f'Missing ASHP update date for {shortage.get("name")}')
                shortage['updated_date'] = None
            except ValueError:
                logging.error(f'Could not parse update date for {shortage.get("name")}')
                shortage['updated_date'] = None

            sleep(0.2)

        if len(ashp_drugs) > 0:
            # Load the main shortage table
            shortage_columns = ['name', 'detail_url', 'shortage_reasons', 'resupply_dates',
                                'alternatives_and_management', 'care_implications', 'safety_notices',
                                'created_date', 'updated_date']
            shortages = pd.DataFrame(ashp_drugs, columns=shortage_columns)
            load_df_to_pg(shortages, "sagerx_lake", "ashp_shortage_list", "replace", index=False)

            # Load the table of affected and available NDCs
            affected_ndcs = pd.DataFrame(ashp_drugs, columns=['detail_url', 'affected_ndcs']).explode('affected_ndcs')
            affected_ndcs['ndc_type'] = 'affected'
            affected_ndcs = affected_ndcs.rename(columns={'affected_ndcs': 'ndc'})

            available_ndcs = pd.DataFrame(ashp_drugs, columns=['detail_url', 'available_ndcs']).explode('available_ndcs')
            available_ndcs['ndc_type'] = 'available'
            available_ndcs = available_ndcs.rename(columns={'available_ndcs': 'ndc'})

            ndcs = pd.concat([affected_ndcs, available_ndcs])
            ndcs = ndcs[~ndcs['ndc'].isnull()] # Remove shortages that have no associated NDCs
            load_df_to_pg(ndcs, "sagerx_lake", "ashp_shortage_list_ndcs", "replace", index=False)
        else:
            logging.error('Drug shortage list not found')

    extract_load_shortage_list() >> transform_task
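
As an aside, a minimal sketch of the 5-4-2 NDC extraction this task applies to each product listing; the sample text and NDC below are made up:

import re

ndc_regex = re.compile(r"\d{5}\-\d{4}\-\d{2}")

# Hypothetical product description; real text comes from the ASHP detail pages.
text = "Amoxicillin 500 mg capsules, 100 count bottle (NDC 12345-6789-01)"
match = ndc_regex.search(text)
if match:
    print(match[0]) # 12345-6789-01
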
1 change: 1 addition & 0 deletions airflow/dags/build_marts/dag.py
@@ -55,6 +55,7 @@ def execute_external_dag_list(**kwargs):
# Once DBT freshness metrics are implemented, this task can be updated
@task
def transform_tasks():
run_subprocess_command(['docker', 'exec', 'dbt', 'dbt', 'seed'], cwd='/dbt/sagerx')
run_subprocess_command(['docker', 'exec', 'dbt', 'dbt', 'run', '--select', '+models/marts/ndc'], cwd='/dbt/sagerx')
run_subprocess_command(['docker', 'exec', 'dbt', 'dbt', 'run', '--select', '+models/marts/classification'], cwd='/dbt/sagerx')
run_subprocess_command(['docker', 'exec', 'dbt', 'dbt', 'run', '--select', '+models/marts/products'], cwd='/dbt/sagerx')
6 changes: 4 additions & 2 deletions airflow/dags/common_dag_tasks.py
@@ -116,10 +116,12 @@ def extract(dag_id,url) -> str:


@task
def transform(dag_id, models_subdir='staging',task_id="") -> None:
def transform(dag_id, models_subdir=['staging'], task_id="") -> None:
# Task to transform data using dbt
models = [f'models/{model_subdir}/{dag_id}' for model_subdir in models_subdir]
command = ['docker', 'exec', 'dbt', 'dbt', 'run', '--select'] + models

run_subprocess_command(
command=['docker', 'exec', 'dbt','dbt', 'run', '--select', f'models/{models_subdir}/{dag_id}'],
command=command,
cwd='/dbt/sagerx'
)
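
For orientation, a minimal sketch (not part of this commit) of how a DAG could call the updated transform task with more than one model subdirectory; the dag_id and subdirectory names below are illustrative:

# Hypothetical usage of the updated transform() task; "dailymed" and the
# subdirectory names are examples only.
transform_task = transform("dailymed", models_subdir=["staging", "intermediate"])

# Inside the dbt container this resolves to:
#   dbt run --select models/staging/dailymed models/intermediate/dailymed
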
85 changes: 52 additions & 33 deletions airflow/dags/dailymed/dag.py
@@ -1,5 +1,7 @@
from pathlib import Path
import pendulum
import zipfile
import os

from airflow.decorators import dag, task
from airflow.hooks.subprocess import SubprocessHook
@@ -18,8 +20,15 @@ def dailymed():

ds_folder = Path("/opt/airflow/dags") / dag_id
data_folder = Path("/opt/airflow/data") / dag_id
file_set = "dm_spl_release_human_rx_part"
#file_set = "dm_spl_daily_update_07092024"

# NOTE: "dm_spl_release_human" accounts for both
# rx and otc SPLs (but no other types of SPLs)
# - "dm_spl_release_human_rx" for rx meds only
# - "dm_spl_release_human_otc" for otc meds only
# - "dm_spl_release_human_rx_part1" for a given part
# - "dm_spl_daily_update_MMDDYYYY" for a given date
# (replace MMDDYYYY with your month, day, and year)
file_set = "dm_spl_release_human_rx"

def connect_to_ftp_dir(ftp_str: str, dir: str):
import ftplib
@@ -41,16 +50,14 @@ def obtain_ftp_file_list(ftp)
return file_list

def get_dailymed_files(ftp, file_name: str):
import zipfile
import os

zip_path = create_path(data_folder) / file_name

with open(zip_path, "wb") as file:
ftp.retrbinary(f"RETR {file_name}", file.write)

with zipfile.ZipFile(zip_path, "r") as zip_ref:
zip_ref.extractall(data_folder.with_suffix(""))

os.remove(zip_path)

def transform_xml(input_xml, xslt):
@@ -63,21 +70,8 @@ def transform_xml(input_xml, xslt):
new_xml = xslt_transformer(dom)
return etree.tostring(new_xml, pretty_print=True).decode("utf-8")

@task
def extract():
dailymed_ftp = "public.nlm.nih.gov"
ftp_dir = "/nlmdata/.dailymed/"

ftp = connect_to_ftp_dir(dailymed_ftp, ftp_dir)

for file_name in obtain_ftp_file_list(ftp):
get_dailymed_files(ftp, file_name)

@task
def load():
import zipfile
def load_xml_data(spl_type_data_folder: Path):
import re
import os
import pandas as pd
import sqlalchemy

@@ -86,38 +80,63 @@ def load():
db_conn_string = os.environ["AIRFLOW_CONN_POSTGRES_DEFAULT"]
db_conn = sqlalchemy.create_engine(db_conn_string)

prescription_data_folder = (
data_folder
/ "prescription"
)

data = []
for zip_folder in prescription_data_folder.iterdir():
# logging.info(zip_folder)
for zip_folder in spl_type_data_folder.iterdir():
with zipfile.ZipFile(zip_folder) as unzipped_folder:
folder_name = zip_folder.stem
zip_file = zip_folder.stem
set_id = zip_file.split('_')[1]
for subfile in unzipped_folder.infolist():
if re.search("\.xml$", subfile.filename):
new_file = unzipped_folder.extract(subfile, prescription_data_folder)
xml_file = subfile.filename

# xslt transform
xml_content = transform_xml(new_file, xslt)
os.remove(new_file)
temp_xml_file = unzipped_folder.extract(subfile, spl_type_data_folder)
xml_content = transform_xml(temp_xml_file, xslt)
os.remove(temp_xml_file)

# append row to the data list
data.append({"spl": folder_name, "file_name": subfile.filename, "xml_content": xml_content})
data.append({"set_id": set_id, "zip_file": zip_file, "xml_file": xml_file, "xml_content": xml_content})

df = pd.DataFrame(
data,
columns=["spl", "file_name", "xml_content"],
columns=["set_id", "zip_file", "xml_file", "xml_content"],
)

load_df_to_pg(
df,
schema_name="sagerx_lake",
table_name="dailymed",
if_exists="replace",
if_exists="append", # TODO: make this better - maybe don't put stuff in multiple folders?
index=False,
)

@task
def extract():
dailymed_ftp = "public.nlm.nih.gov"
ftp_dir = "/nlmdata/.dailymed/"

ftp = connect_to_ftp_dir(dailymed_ftp, ftp_dir)

file_list = obtain_ftp_file_list(ftp)
print(f'Extracting {file_list}')

for file_name in file_list:
get_dailymed_files(ftp, file_name)

@task
def load():
spl_types = ['prescription', 'otc']

for spl_type in spl_types:
spl_type_data_folder = (
data_folder
/ spl_type
)
if os.path.exists(spl_type_data_folder):
print(f'Loading {spl_type} SPLs...')
load_xml_data(spl_type_data_folder)


# Task to transform data using dbt
@task
def transform():
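For context, a minimal sketch of the lxml XSLT pattern that transform_xml in the dailymed DAG relies on; the function name and file paths below are hypothetical placeholders, since the DAG's actual stylesheet is not shown in this diff:

from lxml import etree

def apply_xslt(input_xml: str, xslt_path: str) -> str:
    dom = etree.parse(input_xml) # parse the source SPL document
    xslt_transformer = etree.XSLT(etree.parse(xslt_path)) # compile the stylesheet
    new_xml = xslt_transformer(dom) # apply the transform
    return etree.tostring(new_xml, pretty_print=True).decode("utf-8")

# Example with placeholder paths: apply_xslt("spl_document.xml", "spl_stylesheet.xsl")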
