
Commit

Merge pull request #302 from coderxio/gcp_cleanup
cleanup GCP work
jrlegrand authored Jul 2, 2024
2 parents 905e66a + 04ac0d8 commit 1d4231f
Showing 6 changed files with 51 additions and 52 deletions.
15 changes: 14 additions & 1 deletion README.md
@@ -90,12 +90,25 @@ The access and secret-access keys can be found in 2 ways:

### Integration with Google Cloud Platform (GCP)

.env variables
Currently, we are using two GCP products: Google Cloud Storage (GCS) and BigQuery (BQ).

In the current workflow, all of the dbt tables are created locally and only the final products are pushed to GCP. This reduces computational expense, especially as we test out new data sources and need to run dbt more frequently.
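
As a rough sketch of that pattern (dbt builds the tables locally, then only the finished marts are copied from Postgres to GCS and loaded into BigQuery), the snippet below loosely mirrors the dbt_gcp DAG changed in this commit. The DAG, schema, and table names are illustrative placeholders, and it assumes the environment variables described below are set in .env:

```python
# Sketch only: push one locally built mart table to GCS, then load it into BigQuery.
# DAG id, schema, and table names are illustrative; GCS_BUCKET, GCP_PROJECT, and
# GCP_DATASET are assumed to be present in the container environment.
from datetime import datetime
from os import environ

from airflow import DAG
from airflow.providers.google.cloud.transfers.postgres_to_gcs import PostgresToGCSOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

table_name = "example_mart_table"  # hypothetical table already built locally by dbt

with DAG(dag_id="push_marts_to_gcp_sketch", start_date=datetime(2024, 1, 1), schedule=None):
    to_gcs = PostgresToGCSOperator(
        task_id=f"postgres_to_gcs_{table_name}",
        postgres_conn_id="postgres_default",
        sql=f"SELECT * FROM sagerx_dev.{table_name}",  # schema name is a placeholder
        bucket=environ.get("GCS_BUCKET"),
        filename=table_name,
        export_format="csv",
    )
    to_bq = GCSToBigQueryOperator(
        task_id=f"bq_load_{table_name}",
        bucket=environ.get("GCS_BUCKET"),
        source_objects=[table_name],
        destination_project_dataset_table=f"{environ.get('GCP_PROJECT')}.{environ.get('GCP_DATASET')}.{table_name}",
        autodetect=True,
        write_disposition="WRITE_TRUNCATE",
        gcp_conn_id="google_cloud_default",
    )
    to_gcs >> to_bq
```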

To accomplish this yourself, you need to follow these steps:

1. Set up a GCP account (including billing)
2. Make a new storage bucket under GCS
3. Enable the IAM API
4. Create a new [service account](https://cloud.google.com/iam/docs/service-account-overview?hl=en) with permissions to add and delete content for both GCS and BQ ([dbt example](https://docs.getdbt.com/guides/bigquery?step=4))
5. Create a new key for the service account and download it as JSON; this gets added to the root directory of the project as gcp.json
6. Add the necessary environment variables to .env (a small verification sketch follows these steps)

- GCS_BUCKET
- GCP_PROJECT
- GCP_DATASET

7. Rebuild docker containers
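
Once the containers are rebuilt, a quick way to confirm the settings are visible is a small check like the sketch below (illustrative only; the variable names match the list above, and gcp.json is the key file from step 5):

```python
# Illustrative sanity check: print the GCP-related environment variables and
# confirm the service account key from step 5 is present in the project root.
from os import environ
from pathlib import Path

for var in ("GCS_BUCKET", "GCP_PROJECT", "GCP_DATASET"):
    print(var, "=", environ.get(var))

print("service account key present:", Path("gcp.json").exists())
```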

### Troubleshooting

If you get issues on folder permissions:
15 changes: 5 additions & 10 deletions airflow/dags/build_marts/dag.py
@@ -4,7 +4,8 @@
from common_dag_tasks import get_most_recent_dag_run
from airflow.decorators import dag,task
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.hooks.subprocess import SubprocessHook

from common_dag_tasks import run_subprocess_command

def run_dag_condition(dag_id):
last_run = get_most_recent_dag_run(dag_id)
@@ -54,7 +55,8 @@ def execute_external_dag_list(**kwargs):
# Once DBT freshness metrics are implemented, this task can be updated
@task
def transform_tasks():
ndc_subprocess = SubprocessHook()
result = ndc_subprocess.run_command(['dbt', 'run', '--select', '+models/marts/ndc'], cwd='/dbt/sagerx')
print("Result from dbt:", result)
atc_subprocess = SubprocessHook()
result = atc_subprocess.run_command(['dbt', 'run', '--select', '+models/marts/classification'], cwd='/dbt/sagerx')
print("Result from dbt:", result)
products_subprocess = SubprocessHook()
result = products_subprocess.run_command(['dbt', 'run', '--select', '+models/marts/products'], cwd='/dbt/sagerx')
print("Result from dbt:", result)
run_subprocess_command(['dbt', 'run', '--select', '+models/marts/ndc'], cwd='/dbt/sagerx')
run_subprocess_command(['dbt', 'run', '--select', '+models/marts/classification'], cwd='/dbt/sagerx')
run_subprocess_command(['dbt', 'run', '--select', '+models/marts/products'], cwd='/dbt/sagerx')

execute_external_dag_list() >> transform_tasks()
23 changes: 16 additions & 7 deletions airflow/dags/common_dag_tasks.py
@@ -93,6 +93,18 @@ def upload_csv_to_gcs(dag_id):
gcp_tasks.append(gcp_task)
return gcp_tasks

def run_subprocess_command(command:list, cwd:str, success_code:int = 0) -> None:
from airflow.hooks.subprocess import SubprocessHook
from airflow.exceptions import AirflowException

subprocess = SubprocessHook()
run_results = subprocess.run_command(command, cwd=cwd)
if run_results.exit_code in [0,success_code]: #dbt default success code is 0
print(f"Command succeeded with output: {run_results.output}")
else:
raise AirflowException(f"Command failed with return code {run_results.exit_code}: {run_results.output}")


@task
def extract(dag_id,url) -> str:
# Task to download data from web location
@@ -106,11 +118,8 @@ def extract(dag_id,url) -> str:
@task
def transform(dag_id, models_subdir='staging',task_id="") -> None:
# Task to transform data using dbt
from airflow.hooks.subprocess import SubprocessHook
from airflow.exceptions import AirflowException

subprocess = SubprocessHook()
result = subprocess.run_command(['docker', 'exec', 'dbt','dbt', 'run', '--select', f'models/{models_subdir}/{dag_id}'], cwd='/dbt/sagerx')
if result.exit_code != 0:
raise AirflowException(f"Command failed with return code {result.exit_code}: {result.output}")
print("Result from dbt:", result)
run_subprocess_command(
command=['docker', 'exec', 'dbt','dbt', 'run', '--select', f'models/{models_subdir}/{dag_id}'],
cwd='/dbt/sagerx'
)
36 changes: 10 additions & 26 deletions airflow/dags/dbt_gcp/dag.py
@@ -4,9 +4,12 @@

from airflow.providers.google.cloud.transfers.postgres_to_gcs import PostgresToGCSOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

from airflow.operators.python_operator import PythonOperator

from common_dag_tasks import run_subprocess_command

from os import environ

dag_id = "dbt_gcp"

dag = create_dag(
@@ -19,26 +22,8 @@

with dag:
def run_dbt():
from airflow.hooks.subprocess import SubprocessHook
from airflow.exceptions import AirflowException

subprocess = SubprocessHook()

run_results = subprocess.run_command(['docker','exec','dbt','dbt', 'run'], cwd='/dbt/sagerx')
if run_results.exit_code != 1:
raise AirflowException(f"Command failed with return code {run_results.exit_code}: {run_results.output}")
print(f"Command succeeded with output: {run_results.output}")

docs_results = subprocess.run_command(['docker','exec','dbt','dbt', "docs", "generate"], cwd='/dbt/sagerx')
if docs_results.exit_code != 0:
raise AirflowException(f"Command failed with return code {docs_results.exit_code}: {docs_results.output}")
print(f"Command succeeded with output: {docs_results.output}")

check_results = subprocess.run_command(['docker','exec','dbt','dbt', 'run-operation', 'check_data_availability'], cwd='/dbt/sagerx')
if check_results.exit_code != 0:
raise AirflowException(f"Command failed with return code {check_results.exit_code}: {check_results.output}")
print(f"Command succeeded with output: {check_results.output}")

run_subprocess_command(command=['docker','exec','dbt','dbt', 'run'], cwd='/dbt/sagerx', success_code=1)
run_subprocess_command(command=['docker','exec','dbt','dbt', 'run-operation', 'check_data_availability'], cwd='/dbt/sagerx')

def get_dbt_models():
from sagerx import run_query_to_df
@@ -70,7 +55,7 @@ def load_to_gcs(run_dbt_task):
task_id=f'postgres_to_gcs_{schema_name}.{table_name}',
postgres_conn_id='postgres_default',
sql=f"SELECT * FROM {schema_name}.{table_name}",
bucket="sagerx_bucket",
bucket=environ.get("GCS_BUCKET"),
filename=f'{table_name}',
export_format='csv',
gzip=False,
@@ -80,15 +65,14 @@ def load_to_gcs(run_dbt_task):
# Populate BigQuery tables with data from Cloud Storage Bucket
cs2bq_task = GCSToBigQueryOperator(
task_id=f"bq_load_{schema_name}.{table_name}",
bucket="sagerx_bucket",
bucket=environ.get("GCS_BUCKET"),
source_objects=[f"{table_name}"],
destination_project_dataset_table=f"sagerx-420700.sagerx_lake.{table_name}",
destination_project_dataset_table=f"{environ.get('GCP_PROJECT')}.{environ.get('GCP_DATASET')}.{table_name}",
autodetect = True,
external_table=False,
create_disposition= "CREATE_IF_NEEDED",
write_disposition= "WRITE_TRUNCATE",
gcp_conn_id = 'google_cloud_default',
location='us-east5',
gcp_conn_id = 'google_cloud_default',
dag = dag,
)

7 changes: 3 additions & 4 deletions airflow/dags/nadac/dag.py
@@ -7,7 +7,8 @@
from airflow.decorators import dag, task

from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.hooks.subprocess import SubprocessHook

from common_dag_tasks import run_subprocess_command

starting_date = pendulum.parse("2013-12-01")

@@ -79,9 +80,7 @@ def get_download_url(self, year):
# Task to transform data using dbt
@task
def transform():
subprocess = SubprocessHook()
result = subprocess.run_command(['dbt', 'run', '--select', 'models/staging/nadac'], cwd='/dbt/sagerx')
print("Result from dbt:", result)
run_subprocess_command(['dbt', 'run', '--select', 'models/staging/nadac'], cwd='/dbt/sagerx')

extract() >> load >> transform()

7 changes: 3 additions & 4 deletions airflow/dags/rxnorm/dag.py
@@ -8,7 +8,8 @@
from airflow.operators.python import get_current_context
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.hooks.postgres_hook import PostgresHook
from airflow.hooks.subprocess import SubprocessHook

from common_dag_tasks import run_subprocess_command


@dag(
@@ -75,9 +76,7 @@ def extract(st: str):
# Task to transform data using dbt
@task
def transform():
subprocess = SubprocessHook()
result = subprocess.run_command(['dbt', 'run', '--select', 'models/staging/rxnorm', 'models/intermediate/rxnorm'], cwd='/dbt/sagerx')
print("Result from dbt:", result)
run_subprocess_command(['dbt', 'run', '--select', 'models/staging/rxnorm', 'models/intermediate/rxnorm'], cwd='/dbt/sagerx')

extract(get_st(get_tgt())) >> load >> transform()

