Add source level schema detection as a new box #213

Open · wants to merge 1 commit into base: master
48 changes: 48 additions & 0 deletions tool-box/schema-detection/README.md
@@ -0,0 +1,48 @@
# Source-level schema detection
With the evolution of cloud storage and ever-changing applications, handling schema changes is a critical part of any data pipeline. Ensuring that our customers can bring data of any shape, size, or type onto our Customer Data Platform is a critical requirement for us. In this example, we have created a workflow that handles schema changes at the source at run time.

# Getting started
1. Download this folder locally.
2. Use [TD Toolbelt](https://support.treasuredata.com/hc/en-us/articles/360001262207) to upload the folder as a Treasure Workflow project to your Treasure Data account.
3. Provide a YAML file with the input file and the output Treasure Data database information.
4. Provide the database name and API key as variables in the .dig file (see the sketch below).
5. Run the wf_dynamic_mapping.dig file.
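
For step 4, the variables are declared in the workflow file itself, which keeps the API key out of version control. A condensed sketch based on the wf_dynamic_mapping.dig file included in this project; it assumes the API key has already been registered as a workflow secret named `apikey`:

```
_export:
  td:
    database: demo_schema

+secrets2:
  docker:
    image: "digdag/digdag-python:3.7"
  py>: runtime_linking.use_df
  _env:
    apikey: ${secret:apikey}
```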

# How it works

Here is a brief description of what each task in the workflow does:

1. The config-s3.yml file contains all of the input and output TD database storage information (in this example, an AWS S3 bucket is the source).
2. The database name and API key are provided as variables in the .dig file.
3. The custom Python script runtime_linking.py runs in the workflow, with config-s3.yml provided as its input.
4. A POST request to the "https://api.treasuredata.com/v3/bulk_loads/guess" API creates a guess file from the provided config file (see the sketch after this list).
5. The data types of all the columns in the guess file are converted to string.
6. When the workflow runs for the first time, the schema information is stored in a table in the TD database; on subsequent runs, the workflow checks the number of columns present in that table and notifies users by email if there is a change.
7. To receive the email notifications, an SMTP connection must be set up.
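
A minimal sketch of steps 4 and 5, adapted from the runtime_linking.py shown later in this diff; it assumes the TD API key is available in the `apikey` environment variable and that config-s3.yml sits in the working directory:

```python
import json
import os

import requests
import yaml

# Load the connector config (config-s3.yml) into a dict.
with open("config-s3.yml") as f:
    payload = yaml.safe_load(f)

# Ask Treasure Data's guess API to infer a schema for the source.
headers = {
    "content-type": "application/json; charset=utf-8",
    "authorization": "TD1 " + os.environ["apikey"],
}
res = requests.post(
    "https://api.treasuredata.com/v3/bulk_loads/guess",
    data=json.dumps(payload).encode("utf-8"),
    headers=headers,
)
res.raise_for_status()
guessed = res.json()

# Cast every guessed column type to string so later comparisons
# depend only on which columns exist, not on inferred types.
for col in guessed["config"]["in"]["parser"]["columns"]:
    col["type"] = "string"
    col.pop("format", None)  # date formats don't apply to strings
```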

# Output

The YAML file that needs to be provided:

[![config-file.png](https://i.postimg.cc/CdTVbJvb/config-file.png)](https://postimg.cc/BPgy05fv)

The table in the Treasure Data platform:

[![Screen-Shot-2019-09-18-at-3-47-49-PM.png](https://i.postimg.cc/gjHq6xXz/Screen-Shot-2019-09-18-at-3-47-49-PM.png)](https://postimg.cc/XBXyR7km)

Addition of an extra column to the table:

[![Screen-Shot-2019-09-18-at-3-47-55-PM.png](https://i.postimg.cc/66M8TVT7/Screen-Shot-2019-09-18-at-3-47-55-PM.png)](https://postimg.cc/MXj6FQ7x)

Email notification of a change to the schema:

[![Screen-Shot-2019-09-18-at-3-58-21-PM.png](https://i.postimg.cc/DwZHnTHJ/Screen-Shot-2019-09-18-at-3-58-21-PM.png)](https://postimg.cc/VdxDW3Ym)

# Questions

Please feel free to reach out to [email protected] with any questions you have about using this code for schema detection.





15 changes: 15 additions & 0 deletions tool-box/schema-detection/config-s3.yml
@@ -0,0 +1,15 @@
config:
  type: s3
  access_key_id: "your access_key_id"
  secret_access_key: "your secret_access_key"
  bucket: "your bucket name"
  path_prefix: "your path_prefix"
  path_match_pattern: .csv
  incremental: false
out:
  type: td_internal
  apikey: apikey
Collaborator: Need to parameterize.
  endpoint: api.treasuredata.com
Collaborator: Need to parameterize.
  database: "your database name"
  table: "your table name"
  mode: append
91 changes: 91 additions & 0 deletions tool-box/schema-detection/runtime_linking.py
@@ -0,0 +1,91 @@
def use_df():
    import sys
    import os
    import digdag
    sys.path.append('/home/td-user/.local/lib/python3.6/site-packages')
Collaborator: This seems strange. It is for local, not server side one.

Member: No need to have this line for the latest image digdag/digdag-python:3.7.

    os.system(f"{sys.executable} -m pip install --user pandas")
    os.system(f"{sys.executable} -m pip install --user -U git+https://github.com/treasure-data/pytd#egg=pytd[spark]")
    os.system(f"{sys.executable} -m pip install --user time")
    os.system(f"{sys.executable} -m pip install --user pyyaml")
    os.system(f"{sys.executable} -m pip install --user json")
    os.system(f"{sys.executable} -m pip install --user requests")
    os.system(f"{sys.executable} -m pip install --user digdag")
Collaborator: Remove this pip command. Instead of it, please add
import digdag

Member: With digdag/digdag-python:3.7, you don't need to use the --user option anymore.

The following packages are already installed:

  • pandas
  • requests
  • pytd (old version, so it would be better to use pip install)

You should also remove the following packages:

  • json (Python standard module)
  • time (Python standard module)
  • digdag (not on PyPI)

You should write it like the following:

    os.system(f"{sys.executable} -m pip install -U pytd==0.8.0")
    os.system(f"{sys.executable} -m pip install pyyaml")
    import time
    import yaml
    import pandas as pd
    import pytd
    import tdclient
    import json
    import requests

    class InConnector:
        apikey = os.environ['apikey']

        def __init__(self):
            self.apikey = os.environ['apikey']

        def convert_to_dict(self, seed_file):
            # Read the connector config (e.g. config-s3.yml) into a dict.
            with open(seed_file, 'r') as f:
                self.payload = yaml.safe_load(f)
            return self.payload

        def connector_guess(self, job):
            headers = {"content-type": "application/json; charset=utf-8",
                       "authorization": "TD1 " + self.apikey}
Collaborator: Use apikey variable instead of writing apikey directly.
            payload = json.dumps(job).encode("utf-8") if isinstance(job, dict) else job
            print(payload)
            with requests.post("https://api.treasuredata.com/v3/bulk_loads/guess", payload, headers=headers) as res:
Collaborator: Endpoint shouldn't be static. Please allow users to configure other endpoints.
                code, body = res.status_code, res.content
                # print(body)
                if code != 200:
                    print("DataConnector job preview failed", res, body)
                return body.decode("utf-8")

        def change_to_string(self, body):
            # Force every guessed column type to string so schema
            # comparisons depend only on the set of columns.
            body = json.loads(body)
            list_of_col = body['config']['in']['parser']['columns']
            for i in range(len(list_of_col)):
                if list_of_col[i]['type'] != 'string':
                    list_of_col[i]['type'] = 'string'
                    try:
                        del list_of_col[i]['format']
                    except KeyError:
                        continue
            return body


        def write_schema(self, body, payload):
            list_of_col = body['config']['in']['parser']['columns']
            df = pd.DataFrame(list_of_col, columns=['name', 'type'])
            dest_table = payload['out']['table']
            try:
                # Compare the column count from the previous run with the
                # current guess; a difference means the source schema changed.
                with tdclient.connect(db='demo_schema', type='presto', wait_callback='on_waiting', apikey=InConnector.apikey) as td:
                    val = pd.read_sql('select count(*) as cnt from {} where time in (select max(time) from {})'.format(dest_table, dest_table), td)
                prev_col = val['cnt'][0]
                if prev_col - len(df) != 0:
                    print('Schema Changed from last run')
                    digdag.env.store({'change_status': 'true'})
                    client = pytd.Client(apikey=InConnector.apikey, endpoint='https://api.treasuredata.com/', database='database_name')
                    client.load_table_from_dataframe(df, dest_table, writer='bulk_import', if_exists='overwrite')
                else:
                    print('No Change detected')
                    digdag.env.store({'change_status': 'false'})
                    client = pytd.Client(apikey=InConnector.apikey, endpoint='https://api.treasuredata.com/', database='database_name')
            except:
                # First run: the destination table doesn't exist yet,
                # so just write the current schema.
                client = pytd.Client(apikey=InConnector.apikey, endpoint='https://api.treasuredata.com/', database='database_name')
                client.load_table_from_dataframe(df, dest_table, writer='bulk_import', if_exists='overwrite')





    # Driver: load the config, guess the schema, normalize types to
    # string, then write/compare the schema table.
    test = InConnector()
    a = test.convert_to_dict('config-s3.yml')
    b = test.connector_guess(a)
    c = test.change_to_string(b)
    test.write_schema(c, a)






23 changes: 23 additions & 0 deletions tool-box/schema-detection/wf_dynamic_mapping.dig
@@ -0,0 +1,23 @@
_export:
  td:
    database: demo_schema
    table: change_status
  from: "source email address"

# Pass apikey as a secret
+secrets2:
  docker:
    image: "digdag/digdag-python:3.7.3-stretch"
Member: Should use the latest image.

Suggested change:
-    image: "digdag/digdag-python:3.7.3-stretch"
+    image: "digdag/digdag-python:3.7"
  py>: runtime_linking.use_df
  _env:
    apikey: ${secret:apikey}


+task1:
  if>: ${this.change_status}
  _do:
    mail>: {data: "Hello, this is to notify you that the definition of the table you are about to insert into has changed."}
    subject: Table definition has been changed
    to: ["destination email address"]
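
The README's step 7 requires an SMTP connection for the mail> notification above. A minimal sketch of one way to configure it with digdag's mail> operator settings; the host and port values here are illustrative assumptions, not part of this project:

    _export:
      mail:
        from: "source email address"
        host: smtp.example.com   # assumption: your SMTP server
        port: 587
        tls: true

    # Credentials go in as secrets rather than into the file, e.g.:
    #   digdag secrets --project=<your project> --set mail.username --set mail.password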