Skip to content

Commit

Permalink
Merge pull request #316 from coderxio/jrlegrand/index-bug
Browse files Browse the repository at this point in the history
Fix index bug
  • Loading branch information
jrlegrand authored Jul 31, 2024
2 parents b0cf084 + 1fa0a24 commit d574ce9
Showing 1 changed file with 4 additions and 12 deletions.
16 changes: 4 additions & 12 deletions airflow/dags/sagerx.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,18 +158,6 @@ def load_df_to_pg(df,schema_name:str,table_name:str,if_exists:str,dtype_name:str
pg_hook = PostgresHook(postgres_conn_id="postgres_default")
engine = pg_hook.get_sqlalchemy_engine()

if if_exists == "replace":
engine.execute(f'DROP TABLE IF EXISTS {schema_name}.{table_name} cascade')

# Create index before loading data if specified
if create_index and index_columns:
if len(index_columns) == 1:
engine.execute(f'CREATE INDEX IF NOT EXISTS idx_{table_name}_{index_columns[0]} ON {schema_name}.{table_name} ({index_columns[0]})')
else:
columns_str = ', '.join(index_columns)
engine.execute(f'CREATE INDEX IF NOT EXISTS idx_{table_name}_{"_".join(index_columns)} ON {schema_name}.{table_name} ({columns_str})')


if dtype_name:
dtype = {dtype_name:sqlalchemy.types.JSON}
else:
Expand All @@ -189,6 +177,10 @@ def load_df_to_pg(df,schema_name:str,table_name:str,if_exists:str,dtype_name:str
index=index
)

if create_index and index_columns:
columns_str = ', '.join(index_columns)
engine.execute(f'CREATE INDEX IF NOT EXISTS idx_{table_name}_{"_".join(index_columns)} ON {schema_name}.{table_name} ({columns_str})')

def run_query_to_df(query:str) -> pd.DataFrame:
from airflow.hooks.postgres_hook import PostgresHook

Expand Down

0 comments on commit d574ce9

Please sign in to comment.