Skip to content

Commit e25c692

Browse files
committed
add custom scripting file storage
1 parent fb57634 commit e25c692

File tree

4 files changed

+73
-0
lines changed

4 files changed

+73
-0
lines changed
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Custom scripting file storage
2+
3+
----
4+
## Overview
5+
6+
This project demonstrates how to store a file temporarily on local disk while a custom script runs in a Treasure Data (TD) Workflow.
7+
8+
----
9+
## Implementation
10+
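1. Copy the workflow definition, `config/params.yaml`, and the `scripts/store.py` custom script into a Treasure Workflows project, set `db`, `in_tbl`, and `out_tbl` in `config/params.yaml`, and register your API key as the `td.apikey` secret.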
11+
12+
----
13+
## Considerations
14+
15+
N/A
16+
17+
----
18+
## Questions
19+
20+
Please feel free to reach out to [email protected] with any questions you have about using this code.
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
db: some_db
2+
in_tbl: input_tbl
3+
out_tbl: output_tbl
4+
api_endpoint: https://api.treasuredata.com
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
_export:
2+
!include : config/params.yaml
3+
td:
4+
engine: presto
5+
database: ${db}
6+
7+
+create_db_tbl_if_not_exist:
8+
td_ddl>:
9+
create_tables: [ "${out_tbl}" ]
10+
empty_tables: [ "${out_tbl}" ]
11+
12+
+store:
13+
py>: scripts.store.main
14+
_env:
15+
TD_API_KEY: ${secret:td.apikey}
16+
TD_API_ENDPOINT: ${api_endpoint}
17+
docker:
18+
image: "digdag/digdag-python:3.10"
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import os
2+
import pandas as pd
3+
import pytd
4+
5+
def main(**kwargs):
    """Round-trip a TD table through a temporary CSV file on local disk.

    Runs ``SELECT *`` against ``<db>.<in_tbl>`` with the Presto engine, writes
    the result to ``temp.csv`` in the current working directory, reads that
    file back, and loads the reloaded rows into ``<db>.<out_tbl>`` using the
    ``bulk_import`` writer with ``if_exists='overwrite'``. The CSV file is
    removed before the function returns.

    The API key and endpoint are read from the ``TD_API_KEY`` and
    ``TD_API_ENDPOINT`` environment variables.

    Args:
        **kwargs: Must contain non-empty ``db``, ``in_tbl`` and ``out_tbl``
            entries. Any other keys are ignored.

    Raises:
        ValueError: If ``db``, ``in_tbl`` or ``out_tbl`` is missing or empty.
    """
    # Fail fast, before any network call: otherwise a missing parameter would
    # be interpolated into the SQL below as the literal text "None".
    required = ('db', 'in_tbl', 'out_tbl')
    missing = [name for name in required if not kwargs.get(name)]
    if missing:
        raise ValueError(
            f"Missing required parameter(s): {', '.join(missing)}"
        )

    database = kwargs['db']
    in_table = kwargs['in_tbl']
    out_table = kwargs['out_tbl']
    csv_filename = 'temp.csv'

    td = pytd.Client(
        apikey=os.getenv("TD_API_KEY"),
        endpoint=os.getenv("TD_API_ENDPOINT"),
        database=database,
        default_engine='presto',
    )

    # NOTE(review): the table identifiers are interpolated straight into the
    # SQL text; they are presumably trusted workflow parameters - confirm.
    res = td.query(f'SELECT * FROM {database}.{in_table}')
    df = pd.DataFrame(**res)
    print(df)

    try:
        df.to_csv(csv_filename, sep=',', index=False, encoding='utf-8')
        print("Script directory:", os.path.dirname(os.path.abspath(__file__)))
        print("Stored csv directory:", os.path.dirname(os.path.abspath(csv_filename)))
        out_df = pd.read_csv(csv_filename)
    finally:
        # The file is only scratch storage for this run; do not leave it
        # behind, whether or not the write/read succeeded.
        try:
            os.remove(csv_filename)
        except FileNotFoundError:
            pass

    # Show the frame that was reloaded from disk (the original printed the
    # source frame a second time here).
    print(out_df)

    td.load_table_from_dataframe(
        out_df,
        f'{database}.{out_table}',
        writer='bulk_import',
        if_exists='overwrite',
    )


# Entry point when the file is executed directly.
# NOTE(review): no parameters are supplied here, so main() raises ValueError;
# presumably the workflow's py> task, which passes them, is the intended caller.
if __name__ == "__main__":
    main()

0 commit comments

Comments
 (0)