yaml_to_sql.py
import yaml

# Mapping from the column types used in the YAML schema definitions to SQL
# column types; "default_size" is used when the YAML entry does not give an
# explicit length (e.g. "str:128").
TYPES_MAPPING = {
    "int": {"sql_type": "INT"},
    "float": {"sql_type": "DOUBLE"},
    "str": {"sql_type": "VARCHAR", "default_size": 255},
    "fixedstr": {"sql_type": "CHAR", "default_size": 5},
    "intdate": {"sql_type": "INT"},
    "geocode": {"sql_type": "VARCHAR", "default_size": 32},
    "bool": {"sql_type": "TINYINT", "default_size": 1},
}

# Separator used to join column and key definitions inside the CREATE TABLE body.
NEWLINE = "\n\t"
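
# A minimal sketch of the YAML structure this reader expects, inferred from the
# keys accessed below (TABLE_NAME, KEY_COLS, AGGREGATE_KEY_COLS,
# ORDERED_CSV_COLUMNS, UNIQUE_INDEXES, INDEXES). The column and index names are
# illustrative, not taken from covid_hosp_schemadefs.yaml:
#
#   covid_hosp_facility:
#     TABLE_NAME: covid_hosp_facility
#     KEY_COLS: [hospital_pk]
#     AGGREGATE_KEY_COLS: [hospital_pk, state]
#     ORDERED_CSV_COLUMNS:
#       - [str:128, hospital_pk, null]   # [type[:length], csv_name, sql_name]
#       - [fixedstr:2, state, null]
#       - [int, total_beds, null]
#     UNIQUE_INDEXES:
#       uk_facility: [hospital_pk]
#     INDEXES:
#       idx_state: [state]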


class YamlReader:
    """Reads table schema definitions from a YAML file and generates MySQL DDL."""

    def __init__(self, schemadef_yaml_path):
        self.schemadef_yaml_path = schemadef_yaml_path
        self.tables_info = self.yaml_to_json()

    def yaml_to_json(self):
        # Despite the name, this returns the parsed YAML as Python objects
        # (a dict of table definitions), or None if parsing fails.
        with open(self.schemadef_yaml_path, "r") as f:
            try:
                return yaml.safe_load(f)
            except yaml.YAMLError as exc:
                print(exc)

    def get_table_info(self, entry_name):
        return self.tables_info.get(entry_name)

    def get_table_key_columns(self, table_info):
        return table_info["KEY_COLS"]

    def get_table_ordered_csv_columns(self, table_info):
        return table_info["ORDERED_CSV_COLUMNS"]

    def get_table_unique_indexes(self, table_info):
        return table_info["UNIQUE_INDEXES"]

    def get_table_indexes(self, table_info):
        return table_info["INDEXES"]

    def get_table_name(self, table_info):
        return table_info["TABLE_NAME"]
    def generate_create_table_statement(self, entry_name):
        table_info = self.get_table_info(entry_name)
        columns = []
        for column in self.get_table_ordered_csv_columns(table_info):
            # Each column entry is [type[:length], csv_name, sql_name]; the SQL
            # name falls back to the CSV name when sql_name is null.
            column_name = column[1] if column[2] is None else column[2]
            column_type_split = column[0].split(":")
            column_type = column_type_split[0]
            column_length = column_type_split[1] if len(column_type_split) == 2 else None
            sql_column_type = TYPES_MAPPING[column_type]["sql_type"]
            if column_length:
                sql_column_type += f"({column_length})"
            elif TYPES_MAPPING[column_type].get("default_size"):
                sql_column_type += f"({TYPES_MAPPING[column_type]['default_size']})"
            not_null = "NOT NULL" if column_name in self.get_table_key_columns(table_info) else ""
            columns.append(f"`{column_name}` {sql_column_type} {not_null}".strip())
        unique_keys = []
        for k, v in table_info["UNIQUE_INDEXES"].items():
            cols = ", ".join([f"`{col}`" for col in v])
            unique_keys.append(f"UNIQUE KEY `{k}` ({cols})")
        keys = []
        for k, v in table_info["INDEXES"].items():
            cols = ", ".join([f"`{col}`" for col in v])
            keys.append(f"KEY `{k}` ({cols})")
        # Note: the template assumes at least one unique index and one regular
        # index; empty lists would leave dangling commas in the generated DDL.
        sql_statement = f"""
        CREATE TABLE `{self.get_table_name(table_info)}` (
            `id` INT NOT NULL AUTO_INCREMENT,
            `issue` INT NOT NULL,
            {f",{NEWLINE}".join(columns)},
            PRIMARY KEY (`id`),
            {f",{NEWLINE}".join(unique_keys)},
            {f",{NEWLINE}".join(keys)}
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
        """
        return sql_statement
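
    # Illustrative only (the table, column, and index names below are made up,
    # not read from covid_hosp_schemadefs.yaml): generate_create_table_statement
    # produces DDL shaped roughly like
    #
    #   CREATE TABLE `covid_hosp_facility` (
    #       `id` INT NOT NULL AUTO_INCREMENT,
    #       `issue` INT NOT NULL,
    #       `hospital_pk` VARCHAR(128) NOT NULL,
    #       `state` CHAR(2),
    #       `total_beds` INT,
    #       PRIMARY KEY (`id`),
    #       UNIQUE KEY `uk_facility` (`hospital_pk`),
    #       KEY `idx_state` (`state`)
    #   ) ENGINE=InnoDB DEFAULT CHARSET=utf8;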

    def generate_create_metadata_table_statement(self, entry_name):
        table_info = self.get_table_info(entry_name)
        # Keep only the columns whose CSV name appears in AGGREGATE_KEY_COLS.
        aggregated_key_cols = [
            column
            for column in table_info["ORDERED_CSV_COLUMNS"]
            if column[1] in table_info["AGGREGATE_KEY_COLS"]
        ]
        columns = []
        for column in aggregated_key_cols:
            column_name = column[1] if column[2] is None else column[2]
            column_type_split = column[0].split(":")
            column_type = column_type_split[0]
            column_length = column_type_split[1] if len(column_type_split) == 2 else None
            sql_column_type = TYPES_MAPPING[column_type]["sql_type"]
            if column_length:
                sql_column_type += f"({column_length})"
            elif TYPES_MAPPING[column_type].get("default_size"):
                sql_column_type += f"({TYPES_MAPPING[column_type]['default_size']})"
            not_null = "NOT NULL" if column_name in self.get_table_key_columns(table_info) else ""
            columns.append(f"`{column_name}` {sql_column_type} {not_null}".strip())
        unique_keys = []
        for k, v in table_info["UNIQUE_INDEXES"].items():
            if k not in table_info["AGGREGATE_KEY_COLS"]:
                continue
            cols = [col for col in v if col in table_info["AGGREGATE_KEY_COLS"]]
            cols = ", ".join([f"`{col}`" for col in cols])
            unique_keys.append(f"UNIQUE KEY `{k}` ({cols})")
        keys = []
        for k, v in table_info["INDEXES"].items():
            if k not in table_info["AGGREGATE_KEY_COLS"]:
                continue
            cols = [col for col in v if col in table_info["AGGREGATE_KEY_COLS"]]
            cols = ", ".join([f"`{col}`" for col in cols])
            keys.append(f"KEY `{k}` ({cols})")
        sql_statement = f"""
        CREATE TABLE `{self.get_table_name(table_info)}` (
            `id` INT NOT NULL AUTO_INCREMENT,
            {f",{NEWLINE}".join(columns)},
            PRIMARY KEY (`id`),
            {f",{NEWLINE}".join(unique_keys)},
            {f",{NEWLINE}".join(keys)}
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
        """
        return sql_statement


def main():
    yaml_reader = YamlReader("covid_hosp_schemadefs.yaml")
    ddl = yaml_reader.generate_create_table_statement("covid_hosp_facility")
    print(ddl)
    metadata = yaml_reader.generate_create_metadata_table_statement("covid_hosp_facility")
    print("*" * 20)
    print(metadata)
    # TODO: generate covid_hosp_facility_key


if __name__ == "__main__":
    main()
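
# A minimal usage sketch beyond main() above, assuming every top-level entry in
# covid_hosp_schemadefs.yaml defines the same keys as covid_hosp_facility:
#
#   reader = YamlReader("covid_hosp_schemadefs.yaml")
#   for entry_name in reader.tables_info:
#       print(reader.generate_create_table_statement(entry_name))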