From e4e1125e8b4649a11eaf05c7c325f5a521f1fff9 Mon Sep 17 00:00:00 2001
From: Prachi Chavan
Date: Wed, 18 Sep 2019 16:38:51 -0700
Subject: [PATCH] Add source level schema detection as a new box

---
 tool-box/schema-detection/README.md          | 48 ++++++++++
 tool-box/schema-detection/config-s3.yml      | 15 +++
 tool-box/schema-detection/runtime_linking.py | 91 +++++++++++++++++++
 .../schema-detection/wf_dynamic_mapping.dig  | 23 +++++
 4 files changed, 177 insertions(+)
 create mode 100644 tool-box/schema-detection/README.md
 create mode 100644 tool-box/schema-detection/config-s3.yml
 create mode 100644 tool-box/schema-detection/runtime_linking.py
 create mode 100644 tool-box/schema-detection/wf_dynamic_mapping.dig

diff --git a/tool-box/schema-detection/README.md b/tool-box/schema-detection/README.md
new file mode 100644
index 00000000..4da6da53
--- /dev/null
+++ b/tool-box/schema-detection/README.md
@@ -0,0 +1,48 @@
+# Source level schema detection
+With the evolution of cloud storage and ever-changing applications, handling schema changes is a critical part of any data pipeline. Ensuring that our customers can bring data of any shape, size, or type onto our Customer Data Platform is a key requirement for us. In this example, we have created a workflow that detects schema changes at the source at run time.
+
+# Getting started
+1. Download the folder locally
+2. Use [TD Toolbelt](https://support.treasuredata.com/hc/en-us/articles/360001262207) to upload this folder as a Treasure Workflow project into your Treasure Data account
+3. Provide a YAML file with the input source and the output Treasure Data database information
+4. Provide the database name and apikey as variables in the .dig file
+5. Run the wf_dynamic_mapping.dig file
+
+# How it works
+
+Here is a brief description of what each task in the workflow does:
+
+1. The config-s3.yml file contains all the input source and output TD database information (in this example an AWS S3 bucket is the source)
+2. The database name and apikey are provided as variables in the .dig file
+3. The custom Python script runtime_linking.py runs in the workflow, taking config-s3.yml as its input
+4. A POST request to "https://api.treasuredata.com/v3/bulk_loads/guess" generates a guessed schema from the provided config file
+5. The data types of all columns in the guessed schema are converted to string (see the sketch after this list)
+6. When the workflow runs for the first time, the schema information is stored in a table in the TD database; on subsequent runs the workflow checks the number of columns in that table and, if there is a change, notifies users by email
+7. To receive the email notifications, an SMTP connection must be configured
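+
+The following minimal sketch illustrates steps 4 and 5 only. The endpoint, payload shape, and the `apikey` environment variable mirror runtime_linking.py included in this box; running it standalone outside the workflow is an assumption of the sketch, not part of the box itself.
+
+```python
+# Ask the guess endpoint for a schema for the source described in config-s3.yml,
+# then coerce every guessed column type to string (steps 4 and 5).
+import json
+import os
+
+import requests
+import yaml
+
+with open("config-s3.yml") as f:
+    payload = yaml.safe_load(f)
+
+headers = {
+    "content-type": "application/json; charset=utf-8",
+    "authorization": "TD1 " + os.environ["apikey"],
+}
+res = requests.post(
+    "https://api.treasuredata.com/v3/bulk_loads/guess",
+    json.dumps(payload).encode("utf-8"),
+    headers=headers,
+)
+res.raise_for_status()
+columns = res.json()["config"]["in"]["parser"]["columns"]
+
+for col in columns:
+    col["type"] = "string"
+    col.pop("format", None)  # drop the date/time format hint once the type is string
+
+print(json.dumps(columns, indent=2))
+```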
+
+# Output
+
+The YAML file that needs to be provided
+
+[![config-file.png](https://i.postimg.cc/CdTVbJvb/config-file.png)](https://postimg.cc/BPgy05fv)
+
+The table in the Treasure Data platform
+
+[![Screen-Shot-2019-09-18-at-3-47-49-PM.png](https://i.postimg.cc/gjHq6xXz/Screen-Shot-2019-09-18-at-3-47-49-PM.png)](https://postimg.cc/XBXyR7km)
+
+Addition of an extra column to the table
+
+[![Screen-Shot-2019-09-18-at-3-47-55-PM.png](https://i.postimg.cc/66M8TVT7/Screen-Shot-2019-09-18-at-3-47-55-PM.png)](https://postimg.cc/MXj6FQ7x)
+
+Email notification of a change in the schema
+
+[![Screen-Shot-2019-09-18-at-3-58-21-PM.png](https://i.postimg.cc/DwZHnTHJ/Screen-Shot-2019-09-18-at-3-58-21-PM.png)](https://postimg.cc/VdxDW3Ym)
+
+# Questions
+
+Please feel free to reach out to support@treasure-data.com with any questions you have about using this code for schema detection.
diff --git a/tool-box/schema-detection/config-s3.yml b/tool-box/schema-detection/config-s3.yml
new file mode 100644
index 00000000..55f530f1
--- /dev/null
+++ b/tool-box/schema-detection/config-s3.yml
@@ -0,0 +1,15 @@
+config:
+  type: s3
+  access_key_id: "your access_key_id"
+  secret_access_key: "your secret_access_key"
+  bucket: "your bucket name"
+  path_prefix: "your path_prefix"
+  path_match_pattern: .csv
+  incremental: false
+out:
+  type: td_internal
+  apikey: apikey
+  endpoint: api.treasuredata.com
+  database: "your database name"
+  table: "your table name"
+  mode: append
\ No newline at end of file
diff --git a/tool-box/schema-detection/runtime_linking.py b/tool-box/schema-detection/runtime_linking.py
new file mode 100644
index 00000000..0f333532
--- /dev/null
+++ b/tool-box/schema-detection/runtime_linking.py
@@ -0,0 +1,91 @@
+def use_df():
+    import sys
+    import os
+    import digdag
+    # The py> container ships a minimal Python; install the remaining
+    # dependencies into the user site-packages at run time.
+    sys.path.append('/home/td-user/.local/lib/python3.6/site-packages')
+    os.system(f"{sys.executable} -m pip install --user pandas")
+    os.system(f"{sys.executable} -m pip install --user -U git+https://github.com/treasure-data/pytd#egg=pytd[spark]")
+    os.system(f"{sys.executable} -m pip install --user pyyaml")
+    os.system(f"{sys.executable} -m pip install --user requests")
+    import yaml
+    import pandas as pd
+    import pytd
+    import tdclient
+    import json
+    import requests
+
+    class InConnector:
+        apikey = os.environ['apikey']
+
+        def __init__(self):
+            self.apikey = os.environ['apikey']
+
+        def convert_to_dict(self, seed_file):
+            # Load the seed config (config-s3.yml) into a dict.
+            with open(seed_file, 'r') as f:
+                self.payload = yaml.safe_load(f)
+            return self.payload
+
+        def connector_guess(self, job):
+            # Ask the bulk_loads/guess API to infer the source schema.
+            headers = {"content-type": "application/json; charset=utf-8", "authorization": "TD1 " + self.apikey}
+            payload = json.dumps(job).encode("utf-8") if isinstance(job, dict) else job
+            print(payload)
+            with requests.post("https://api.treasuredata.com/v3/bulk_loads/guess", payload, headers=headers) as res:
+                code, body = res.status_code, res.content
+                if code != 200:
+                    print("DataConnector job preview failed", res, body)
+            return body.decode("utf-8")
+
+        def change_to_string(self, body):
+            # Coerce every guessed column type to string so the comparison in
+            # write_schema only tracks column names and count, not type changes.
+            body = json.loads(body)
+            list_of_col = body['config']['in']['parser']['columns']
+            for col in list_of_col:
+                if col['type'] != 'string':
+                    col['type'] = 'string'
+                    col.pop('format', None)  # a date/time format hint is invalid for a string column
+            return body
+
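+        # write_schema persists the guessed (string-typed) schema as rows in the
+        # destination table and compares its column count with the schema stored
+        # by the previous run; a difference sets change_status to true, which the
+        # if>/mail> task in wf_dynamic_mapping.dig picks up.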
+        def write_schema(self, body, payload):
+            list_of_col = body['config']['in']['parser']['columns']
+            df = pd.DataFrame(list_of_col, columns=['name', 'type'])
+            dest_table = payload['out']['table']
+            try:
+                # Count the columns recorded by the most recent run. The database
+                # names below are hard-coded and should match the workflow database.
+                with tdclient.connect(db='demo_schema', type='presto', wait_callback='on_waiting', apikey=InConnector.apikey) as td:
+                    val = pd.read_sql('select count(*) as cnt from {} where time in (select max(time) from {})'.format(dest_table, dest_table), td)
+                prev_col = val['cnt'][0]
+                if prev_col - len(df) != 0:
+                    print('Schema changed since the last run')
+                    digdag.env.store({'change_status': 'true'})
+                    client = pytd.Client(apikey=InConnector.apikey, endpoint='https://api.treasuredata.com/', database='demo_schema')
+                    client.load_table_from_dataframe(df, dest_table, writer='bulk_import', if_exists='overwrite')
+                else:
+                    print('No change detected')
+                    digdag.env.store({'change_status': 'false'})
+            except Exception:
+                # First run: the destination table does not exist yet, so just store the schema.
+                client = pytd.Client(apikey=InConnector.apikey, endpoint='https://api.treasuredata.com/', database='demo_schema')
+                client.load_table_from_dataframe(df, dest_table, writer='bulk_import', if_exists='overwrite')
+
+    connector = InConnector()
+    seed = connector.convert_to_dict('config-s3.yml')
+    guessed = connector.connector_guess(seed)
+    schema = connector.change_to_string(guessed)
+    connector.write_schema(schema, seed)
diff --git a/tool-box/schema-detection/wf_dynamic_mapping.dig b/tool-box/schema-detection/wf_dynamic_mapping.dig
new file mode 100644
index 00000000..012d1aa3
--- /dev/null
+++ b/tool-box/schema-detection/wf_dynamic_mapping.dig
@@ -0,0 +1,23 @@
+_export:
+  td:
+    database: demo_schema
+  table: change_status
+  from: "source email address"
+
+# Pass the apikey as a secret
++secrets2:
+  docker:
+    image: "digdag/digdag-python:3.7.3-stretch"
+  py>: runtime_linking.use_df
+  _env:
+    apikey: ${secret:apikey}
+
+# The mail> operator requires an SMTP connection to be configured (see README)
++task1:
+  if>: ${this.change_status}
+  _do:
+    mail>: {data: "Hello, this is to notify you that the definition of the table you are about to insert into has been changed"}
+    subject: Table definition has been changed
+    to: ["destination email address"]
\ No newline at end of file