Skip to content

Commit

Permalink
Merge pull request #268 from coderxio/schema2
Browse files (browse the repository at this point in the history)
Update DB Schemas [Attempt 2]
  • Loading branch information
jrlegrand authored Mar 27, 2024
2 parents 7dec1d1 + 1d7dc3f commit 4f6c4df
Show file tree
Hide file tree
Showing 100 changed files with 26,055 additions and 331 deletions.
2 changes: 1 addition & 1 deletion airflow/dags/dailymed_daily/dailymed_daily_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def process_dailymed(data_folder, xslt, ti):
)
df.to_sql(
"dailymed_daily",
schema="datasource",
schema="sagerx_lake",
con=db_conn,
if_exists="append",
index=False,
Expand Down
6 changes: 3 additions & 3 deletions airflow/dags/dailymed_daily/organization_metrics.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ SELECT o.set_id
THEN 'Yes' ELSE '' END AS labeler_only
,COUNT(*)

FROM staging.dailymed_organization o
INNER JOIN staging.dailymed_main ma ON o.set_id = ma.set_id
LEFT JOIN staging.dailymed_organization_text ot ON o.set_id = ot.set_id
FROM sagerx.dailymed_organization o
INNER JOIN sagerx.dailymed_main ma ON o.set_id = ma.set_id
LEFT JOIN sagerx.dailymed_organization_text ot ON o.set_id = ot.set_id

GROUP BY o.set_id, ma.market_status
10 changes: 5 additions & 5 deletions airflow/dags/dailymed_daily/staging-dailymed_interaction.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* staging.dailymed_interaction */
DROP TABLE IF EXISTS staging.dailymed_interaction CASCADE;
/* sagerx.dailymed_interaction */
DROP TABLE IF EXISTS sagerx.dailymed_interaction CASCADE;

CREATE TABLE IF NOT EXISTS staging.dailymed_interaction (
CREATE TABLE IF NOT EXISTS sagerx.dailymed_interaction (
spl TEXT NOT NULL,
document_id TEXT NOT NULL,
set_id TEXT,
Expand All @@ -12,10 +12,10 @@
with xml_table as
(
select spl, xml_content::xml as xml_column
from datasource.dailymed_daily
from sagerx_lake.dailymed_daily
)

INSERT INTO staging.dailymed_interaction
INSERT INTO sagerx.dailymed_interaction
SELECT spl, y.*
FROM xml_table x,
XMLTABLE('dailymed/InteractionText'
Expand Down
10 changes: 5 additions & 5 deletions airflow/dags/dailymed_daily/staging-dailymed_main.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* staging.dailymed_main */
DROP TABLE IF EXISTS staging.dailymed_main CASCADE;
/* sagerx.dailymed_main */
DROP TABLE IF EXISTS sagerx.dailymed_main CASCADE;

CREATE TABLE IF NOT EXISTS staging.dailymed_main (
CREATE TABLE IF NOT EXISTS sagerx.dailymed_main (
spl TEXT NOT NULL,
document_id TEXT NOT NULL,
set_id TEXT,
Expand All @@ -15,10 +15,10 @@
with xml_table as
(
select spl, xml_content::xml as xml_column
from datasource.dailymed_daily
from sagerx_lake.dailymed_daily
)

INSERT INTO staging.dailymed_main
INSERT INTO sagerx.dailymed_main
SELECT spl, y.*, 'https://dailymed.nlm.nih.gov/dailymed/drugInfo.cfm?setid=' || y.set_id
FROM xml_table x,
XMLTABLE('dailymed'
Expand Down
10 changes: 5 additions & 5 deletions airflow/dags/dailymed_daily/staging-dailymed_ndc.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* staging.dailymed_ndc */
--DROP TABLE IF EXISTS staging.dailymed_ndc CASCADE;
/* sagerx.dailymed_ndc */
--DROP TABLE IF EXISTS sagerx.dailymed_ndc CASCADE;

CREATE TABLE IF NOT EXISTS staging.dailymed_ndc (
CREATE TABLE IF NOT EXISTS sagerx.dailymed_ndc (
spl TEXT NOT NULL,
document_id TEXT NOT NULL,
set_id TEXT,
Expand All @@ -12,10 +12,10 @@
with xml_table as
(
select spl, xml_content::xml as xml_column
from datasource.dailymed_daily
from sagerx_lake.dailymed_daily
)

INSERT INTO staging.dailymed_ndc
INSERT INTO sagerx.dailymed_ndc
SELECT spl, y.*, ndc_to_11(y.ndc) AS ndc11
FROM xml_table x,
XMLTABLE('dailymed/ndc_list/NDC'
Expand Down
10 changes: 5 additions & 5 deletions airflow/dags/dailymed_daily/staging-dailymed_organization.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* staging.dailymed_organization */
--DROP TABLE IF EXISTS staging.dailymed_organization CASCADE;
/* sagerx.dailymed_organization */
--DROP TABLE IF EXISTS sagerx.dailymed_organization CASCADE;

CREATE TABLE IF NOT EXISTS staging.dailymed_organization (
CREATE TABLE IF NOT EXISTS sagerx.dailymed_organization (
spl TEXT NOT NULL,
document_id TEXT NOT NULL,
set_id TEXT,
Expand All @@ -13,10 +13,10 @@
with xml_table as
(
select spl, xml_content::xml as xml_column
from datasource.dailymed_daily
from sagerx_lake.dailymed_daily
)

INSERT INTO staging.dailymed_organization
INSERT INTO sagerx.dailymed_organization
SELECT spl, y.*
FROM xml_table x,
XMLTABLE('/dailymed/Organizations/establishment'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* staging.dailymed_organization_activity */
--DROP TABLE IF EXISTS staging.dailymed_organization_activity CASCADE;
/* sagerx.dailymed_organization_activity */
--DROP TABLE IF EXISTS sagerx.dailymed_organization_activity CASCADE;

CREATE TABLE IF NOT EXISTS staging.dailymed_organization_activity (
CREATE TABLE IF NOT EXISTS sagerx.dailymed_organization_activity (
spl TEXT NOT NULL,
document_id TEXT NOT NULL,
set_id TEXT,
Expand All @@ -12,10 +12,10 @@
WITH xml_table as
(
select spl, xml_content::xml as xml_column
from datasource.dailymed_daily
from sagerx_lake.dailymed_daily
)

INSERT INTO staging.dailymed_organization_activity
INSERT INTO sagerx.dailymed_organization_activity
SELECT spl, y.*
FROM xml_table x,
XMLTABLE('/dailymed/Organizations/establishment/function'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* staging.dailymed_organization_item */
--DROP TABLE IF EXISTS staging.dailymed_organization_item CASCADE;
/* sagerx.dailymed_organization_item */
--DROP TABLE IF EXISTS sagerx.dailymed_organization_item CASCADE;

CREATE TABLE IF NOT EXISTS staging.dailymed_organization_item (
CREATE TABLE IF NOT EXISTS sagerx.dailymed_organization_item (
spl TEXT NOT NULL,
document_id TEXT NOT NULL,
set_id TEXT,
Expand All @@ -13,10 +13,10 @@
WITH xml_table as
(
select spl, xml_content::xml as xml_column
from datasource.dailymed_daily
from sagerx_lake.dailymed_daily
)

INSERT INTO staging.dailymed_organization_item
INSERT INTO sagerx.dailymed_organization_item
SELECT spl, y.*
FROM xml_table x,
XMLTABLE('/dailymed/Organizations/establishment/function/item_list/item'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* staging.dailymed_organization_text */
--DROP TABLE IF EXISTS staging.dailymed_organization_text CASCADE;
/* sagerx.dailymed_organization_text */
--DROP TABLE IF EXISTS sagerx.dailymed_organization_text CASCADE;

CREATE TABLE IF NOT EXISTS staging.dailymed_organization_text (
CREATE TABLE IF NOT EXISTS sagerx.dailymed_organization_text (
spl TEXT NOT NULL,
document_id TEXT NOT NULL,
set_id TEXT,
Expand All @@ -12,10 +12,10 @@
with xml_table as
(
select spl, xml_content::xml as xml_column
from datasource.dailymed_daily
from sagerx_lake.dailymed_daily
)

INSERT INTO staging.dailymed_organization_text
INSERT INTO sagerx.dailymed_organization_text
SELECT spl
,document_id
,set_id
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/* datasource.dailymed_pharm_class */
DROP TABLE IF EXISTS datasource.dailymed_pharm_class;
/* sagerx_lake.dailymed_pharm_class */
DROP TABLE IF EXISTS sagerx_lake.dailymed_pharm_class;

CREATE TABLE datasource.dailymed_pharm_class (
CREATE TABLE sagerx_lake.dailymed_pharm_class (
spl_setid TEXT,
spl_version TEXT,
pharma_setid TEXT,
pharma_version TEXT
);

COPY datasource.dailymed_pharm_class
COPY sagerx_lake.dailymed_pharm_class
FROM '{data_path}/pharmacologic_class_mappings.txt' DELIMITER '|' QUOTE E'\b' CSV HEADER;
2 changes: 1 addition & 1 deletion airflow/dags/dailymed_rx_full/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def process_dailymed(data_folder, xslt, ti):
)
df.to_sql(
"dailymed_rx_full",
schema="datasource",
schema="sagerx_lake",
con=db_conn,
if_exists="append",
index=False,
Expand Down
10 changes: 5 additions & 5 deletions airflow/dags/dailymed_rx_full/staging-dailymed_interaction.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* staging.dailymed_interaction */
DROP TABLE IF EXISTS staging.dailymed_interaction CASCADE;
/* sagerx.dailymed_interaction */
DROP TABLE IF EXISTS sagerx.dailymed_interaction CASCADE;

CREATE TABLE IF NOT EXISTS staging.dailymed_interaction (
CREATE TABLE IF NOT EXISTS sagerx.dailymed_interaction (
spl TEXT NOT NULL,
document_id TEXT NOT NULL,
set_id TEXT,
Expand All @@ -12,10 +12,10 @@
with xml_table as
(
select spl, xml_content::xml as xml_column
from datasource.dailymed_rx_full
from sagerx_lake.dailymed_rx_full
)

INSERT INTO staging.dailymed_interaction
INSERT INTO sagerx.dailymed_interaction
SELECT spl, y.*
FROM xml_table x,
XMLTABLE('dailymed/InteractionText'
Expand Down
10 changes: 5 additions & 5 deletions airflow/dags/dailymed_rx_full/staging-dailymed_main.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* staging.dailymed_main */
DROP TABLE IF EXISTS staging.dailymed_main CASCADE;
/* sagerx.dailymed_main */
DROP TABLE IF EXISTS sagerx.dailymed_main CASCADE;

CREATE TABLE IF NOT EXISTS staging.dailymed_main (
CREATE TABLE IF NOT EXISTS sagerx.dailymed_main (
spl TEXT NOT NULL,
document_id TEXT NOT NULL,
set_id TEXT,
Expand All @@ -15,10 +15,10 @@
with xml_table as
(
select spl, xml_content::xml as xml_column
from datasource.dailymed_rx_full
from sagerx_lake.dailymed_rx_full
)

INSERT INTO staging.dailymed_main
INSERT INTO sagerx.dailymed_main
SELECT spl, y.*, 'https://dailymed.nlm.nih.gov/dailymed/drugInfo.cfm?setid=' || y.set_id
FROM xml_table x,
XMLTABLE('dailymed'
Expand Down
10 changes: 5 additions & 5 deletions airflow/dags/dailymed_rx_full/staging-dailymed_ndc.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* staging.dailymed_ndc */
--DROP TABLE IF EXISTS staging.dailymed_ndc CASCADE;
/* sagerx.dailymed_ndc */
--DROP TABLE IF EXISTS sagerx.dailymed_ndc CASCADE;

CREATE TABLE IF NOT EXISTS staging.dailymed_ndc (
CREATE TABLE IF NOT EXISTS sagerx.dailymed_ndc (
spl TEXT NOT NULL,
document_id TEXT NOT NULL,
set_id TEXT,
Expand All @@ -12,10 +12,10 @@
with xml_table as
(
select spl, xml_content::xml as xml_column
from datasource.dailymed_rx_full
from sagerx_lake.dailymed_rx_full
)

INSERT INTO staging.dailymed_ndc
INSERT INTO sagerx.dailymed_ndc
SELECT spl, y.*, ndc_to_11(y.ndc) AS ndc11
FROM xml_table x,
XMLTABLE('dailymed/ndc_list/NDC'
Expand Down
10 changes: 5 additions & 5 deletions airflow/dags/dailymed_rx_full/staging-dailymed_organization.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* staging.dailymed_organization */
--DROP TABLE IF EXISTS staging.dailymed_organization CASCADE;
/* sagerx.dailymed_organization */
--DROP TABLE IF EXISTS sagerx.dailymed_organization CASCADE;

CREATE TABLE IF NOT EXISTS staging.dailymed_organization (
CREATE TABLE IF NOT EXISTS sagerx.dailymed_organization (
spl TEXT NOT NULL,
document_id TEXT NOT NULL,
set_id TEXT,
Expand All @@ -13,10 +13,10 @@
with xml_table as
(
select spl, xml_content::xml as xml_column
from datasource.dailymed_rx_full
from sagerx_lake.dailymed_rx_full
)

INSERT INTO staging.dailymed_organization
INSERT INTO sagerx.dailymed_organization
SELECT spl, y.*
FROM xml_table x,
XMLTABLE('/dailymed/Organizations/establishment'
Expand Down
8 changes: 4 additions & 4 deletions airflow/dags/dailymed_rxnorm/load-dailymed_rxnorm.sql
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
/* datasource.dailymed_rxnorm */
DROP TABLE IF EXISTS datasource.dailymed_rxnorm CASCADE;
/* sagerx_lake.dailymed_rxnorm */
DROP TABLE IF EXISTS sagerx_lake.dailymed_rxnorm CASCADE;

CREATE TABLE datasource.dailymed_rxnorm (
CREATE TABLE sagerx_lake.dailymed_rxnorm (
setid TEXT,
spl_version TEXT,
rxcui TEXT,
rxstr TEXT,
rxtty TEXT
);

COPY datasource.dailymed_rxnorm
COPY sagerx_lake.dailymed_rxnorm
FROM '{data_path}/rxnorm_mappings.txt' DELIMITER '|' QUOTE E'\b' CSV HEADER;
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
/* datasource.dailymed_zip_file_metadata */
DROP TABLE IF EXISTS datasource.dailymed_zip_file_metadata;
/* sagerx_lake.dailymed_zip_file_metadata */
DROP TABLE IF EXISTS sagerx_lake.dailymed_zip_file_metadata;

CREATE TABLE datasource.dailymed_zip_file_metadata (
CREATE TABLE sagerx_lake.dailymed_zip_file_metadata (
setid TEXT,
zip_file_name TEXT,
upload_date TEXT,
spl_version TEXT,
title TEXT
);

COPY datasource.dailymed_zip_file_metadata
COPY sagerx_lake.dailymed_zip_file_metadata
FROM '{data_path}/dm_spl_zip_files_meta_data.txt' DELIMITER '|' QUOTE E'\b' CSV HEADER;
2 changes: 1 addition & 1 deletion airflow/dags/fda_enforcement/dag_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,4 @@ def load_json(data_path):
df = pd.DataFrame(json_object["results"])
df.set_index("recall_number")
print(f"Dataframe loaded. Number of rows: {len(df)}")
load_df_to_pg(df,"datasource","fda_enforcement","replace",dtype_name="openfda")
load_df_to_pg(df,"sagerx_lake","fda_enforcement","replace",dtype_name="openfda")
8 changes: 4 additions & 4 deletions airflow/dags/fda_excluded/load_package.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* datasource.fda_excluded_package */
DROP TABLE IF EXISTS datasource.fda_excluded_package CASCADE;
/* sagerx_lake.fda_excluded_package */
DROP TABLE IF EXISTS sagerx_lake.fda_excluded_package CASCADE;

CREATE TABLE datasource.fda_excluded_package (
CREATE TABLE sagerx_lake.fda_excluded_package (
productid TEXT NOT NULL,
productndc TEXT NOT NULL,
ndcpackagecode TEXT,
Expand All @@ -12,5 +12,5 @@ ndc_exclude_flag TEXT,
sample_package TEXT
);

COPY datasource.fda_excluded_package
COPY sagerx_lake.fda_excluded_package
FROM '{data_path}/Packages_excluded.txt' DELIMITER E'\t' CSV HEADER ENCODING 'WIN1252';;
8 changes: 4 additions & 4 deletions airflow/dags/fda_excluded/load_product.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* datasource.fda_excluded_package */
DROP TABLE IF EXISTS datasource.fda_excluded_product CASCADE;
/* sagerx_lake.fda_excluded_product */
DROP TABLE IF EXISTS sagerx_lake.fda_excluded_product CASCADE;

CREATE TABLE datasource.fda_excluded_product (
CREATE TABLE sagerx_lake.fda_excluded_product (
productid TEXT,
productndc TEXT,
producttypename TEXT,
Expand All @@ -25,5 +25,5 @@ listing_record_certified_through TEXT,
PRIMARY KEY (productid)
);

COPY datasource.fda_excluded_product
COPY sagerx_lake.fda_excluded_product
FROM '{data_path}/Products_excluded.txt' DELIMITER E'\t' CSV HEADER ENCODING 'WIN1252';;
Loading

0 comments on commit 4f6c4df

Please sign in to comment.