|
| 1 | +#scripts/ms_dd.py |
| 2 | +import os |
| 3 | +import pandas as pd |
| 4 | +import pytd |
| 5 | +import requests |
| 6 | + |
# Seconds to wait for the master segment config endpoint before giving up,
# so a stalled connection cannot hang the whole job.
_CONFIG_REQUEST_TIMEOUT = 60


def _describe_table(td, cache, db, tbl):
    """Return the DESCRIBE output of ``db.tbl`` as a DataFrame.

    Results are memoised in ``cache`` (keyed by ``(db, tbl)``) so that each
    table is described at most once per run.
    """
    key = (db, tbl)
    if key not in cache:
        # The query result is expanded into DataFrame keyword arguments.
        cache[key] = pd.DataFrame(**td.query(f'DESCRIBE {db}.{tbl}'))
    return cache[key]


def _schema_rows(describe_df, db, tbl, col_specs):
    """Build one schema row per ``(column_alias, column)`` pair of a table.

    Args:
        describe_df: DESCRIBE output with Column, Type, Extra, Comment columns.
        db: Database name written into every row.
        tbl: Table name written into every row.
        col_specs: Iterable of ``(column_alias, column_name)`` tuples.

    Returns:
        A list of dicts with the keys database, table, column_alias, column,
        type and comment.

    Raises:
        ValueError: If a requested column is not part of ``describe_df``.
    """
    # Column name -> (type, comment); the first DESCRIBE row for a name wins.
    col_meta = {}
    for name, col_type, comment in zip(
        describe_df['Column'], describe_df['Type'], describe_df['Comment']
    ):
        col_meta.setdefault(name, (col_type, comment))

    rows = []
    for col_alias, col_name in col_specs:
        if col_name not in col_meta:
            raise ValueError(
                f'Column {col_name!r} (alias {col_alias!r}) '
                f'not found in {db}.{tbl}'
            )
        col_type, comment = col_meta[col_name]
        rows.append({
            'database': db,
            'table': tbl,
            'column_alias': col_alias,
            'column': col_name,
            'type': col_type,
            'comment': comment,
        })
    return rows


def _append_schema(td, cache, dest_tbl, cols_by_tbl):
    """Append the schema rows of every table in ``cols_by_tbl`` to ``dest_tbl``.

    ``cols_by_tbl`` maps ``(db, tbl)`` to a list of ``(column_alias, column)``
    tuples; tables are processed in the mapping's insertion order.
    """
    for (db, tbl), col_specs in cols_by_tbl.items():
        if not col_specs:
            # No columns selected for this table: nothing to upload.
            continue
        describe_df = _describe_table(td, cache, db, tbl)
        rows = _schema_rows(describe_df, db, tbl, col_specs)
        td.load_table_from_dataframe(
            pd.DataFrame(rows),
            dest_tbl,
            writer='bulk_import',
            if_exists='append',
        )


def main(**kwargs):
    """Write a master segment's name and source-table schema to TD tables.

    Fetches the master segment config from ``ms_config_ep + str(ms_id)``,
    stores the segment name in ``temp_db.temp_ms_conf_tbl`` and then fills
    ``temp_db.temp_schema_tbl`` with one row per column of the master table,
    of the configured attribute columns and of the behavior columns.

    Keyword Args:
        td_api_ep: Endpoint passed to ``pytd.Client`` (may be omitted).
        ms_config_ep: URL prefix of the master segment config endpoint.
        ms_id: Master segment id, appended to ``ms_config_ep``.
        temp_db: Database holding both output tables.
        temp_schema_tbl: Output table for the schema rows (overwritten).
        temp_ms_conf_tbl: Output table for the segment name (overwritten).

    The API key is read from the ``TD_API_KEY`` environment variable.

    Raises:
        ValueError: If a required setting is missing, the config names no
            master table, or a configured column does not exist in its table.
        requests.HTTPError: If the config endpoint answers with an error
            status.
    """
    # passed params
    td_api_key = os.getenv('TD_API_KEY')
    # params from config.yml
    td_api_ep = kwargs.get('td_api_ep')
    ms_config_ep = kwargs.get('ms_config_ep')
    ms_id = kwargs.get('ms_id')
    temp_db = kwargs.get('temp_db')
    temp_schema_tbl = kwargs.get('temp_schema_tbl')
    temp_ms_conf_tbl = kwargs.get('temp_ms_conf_tbl')

    # Fail early with a readable message instead of a TypeError or a bogus
    # 'None.None' table name further down.
    required = {
        'TD_API_KEY (environment variable)': td_api_key,
        'ms_config_ep': ms_config_ep,
        'ms_id': ms_id,
        'temp_db': temp_db,
        'temp_schema_tbl': temp_schema_tbl,
        'temp_ms_conf_tbl': temp_ms_conf_tbl,
    }
    missing = [name for name, value in required.items() if value is None]
    if missing:
        raise ValueError('Missing required setting(s): ' + ', '.join(missing))

    # Init import from / export to TD table
    td = pytd.Client(
        apikey=td_api_key,
        endpoint=td_api_ep,
        database=temp_db,
        default_engine='presto',
    )

    # Fetch master segment config
    url = ms_config_ep + str(ms_id)
    headers = {'Authorization': f'TD1 {td_api_key}'}
    res = requests.get(url, headers=headers, timeout=_CONFIG_REQUEST_TIMEOUT)
    # Stop on 4xx/5xx rather than parsing an error payload as a config.
    res.raise_for_status()
    ms_conf = res.json()

    # Extract and store master segment's name
    ms_name = ms_conf.get('name', 'N/A')
    ms_conf_df = pd.DataFrame({'name': [ms_name]})
    td.load_table_from_dataframe(
        ms_conf_df,
        f'{temp_db}.{temp_ms_conf_tbl}',
        writer='bulk_import',
        if_exists='overwrite',
    )

    # Extract master table database and table
    master = ms_conf.get('master') or {}
    master_db = master.get('parentDatabaseName')
    master_tbl = master.get('parentTableName')
    if not master_db or not master_tbl:
        raise ValueError(
            'Master segment config has no master parentDatabaseName/'
            'parentTableName'
        )

    schema_dest = f'{temp_db}.{temp_schema_tbl}'
    describe_cache = {}

    # Fetch master table schema
    # Describe schema: Column, Type, Extra, Comment
    master_df = _describe_table(td, describe_cache, master_db, master_tbl)
    master_df = master_df.drop(columns='Extra')
    # Lower-case the DESCRIBE headers so the master rows use the same column
    # names as the attribute/behavior rows appended below.
    master_df = master_df.rename(columns=str.lower)
    master_df['database'] = master_db
    master_df['table'] = master_tbl
    master_df['column_alias'] = ''
    # Send data to TD table
    td.load_table_from_dataframe(
        master_df, schema_dest, writer='bulk_import', if_exists='overwrite'
    )

    # Group attribute columns by (db, table); a dict keeps first-seen order,
    # which makes the order of the appends deterministic.
    attr_cols = {}
    for attr in ms_conf.get('attributes') or []:
        key = (attr['parentDatabaseName'], attr['parentTableName'])
        attr_cols.setdefault(key, []).append(
            (attr['name'], attr['parentColumn'])
        )
    _append_schema(td, describe_cache, schema_dest, attr_cols)

    # Group behavior columns by (db, table)
    behav_cols = {}
    for behav in ms_conf.get('behaviors') or []:
        behav_db = behav['parentDatabaseName']
        behav_tbl = behav['parentTableName']
        col_specs = behav_cols.setdefault((behav_db, behav_tbl), [])
        if behav.get('allColumns', True):
            # Every column of the table is listed, without an alias.
            describe_df = _describe_table(
                td, describe_cache, behav_db, behav_tbl
            )
            col_specs.extend(('', col) for col in describe_df['Column'])
        else:
            col_specs.extend(
                (schema['name'], schema['parentColumn'])
                for schema in behav.get('schema') or []
            )
    _append_schema(td, describe_cache, schema_dest, behav_cols)
| 120 | + |
# Script entry point: run the export only when this file is executed directly,
# not when it is imported as a module.
# NOTE(review): main() is called without keyword arguments here, so every
# value it reads from kwargs is None on this path.
if __name__ == "__main__":
    main()
0 commit comments