-
Notifications
You must be signed in to change notification settings - Fork 0
/
SQL_Tools.py
512 lines (464 loc) · 23.6 KB
/
SQL_Tools.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
########################################################
# This module deals with SQL and the various adjustments that are required when working with dataframes and SQL.
# It contains the following functions:
# InsertMissingFields - Compares a database table and a dataframe and does the following:
# 1. Add a field to the database table that appears in the dataframe and not in the database
# table.
# 2. If the capitalization of the dataframe is different than in the database then it changes
# the dataframe column
# to fit the database table capitalization.
# 3. Optional. It converts the dataframe columns type to fit the database table columns type.
# BuildSQLAndSend - Allow us to send 1 record to a database table with the following features:
# 1. If the record key already exists then it only UPDATEs the record. If not then it INSERT
# a new record.
# 2. If the record already exists and a table with the same name plus the suffix "_archive" exists in the
# database, then it saves
# the existing record to the archive table and only then updates the record
# FindProc - show the process table of the database (processes from the same user)
# KillDBProc - kill a specific process from the database
# GetSQLasDict - Gets an SQL statement and returns the output as dictionary.
# FromDicToSqlUpdStatement - Gets a dictionary and returns an UPDATE Sql statement
# (also gets table name and where clause)
# OnlyNewRecords - Gets a dataframe and a table name and a key list and returns a dataframe with only the records
# that are in the dataframe but not in the table. (can be use for df.to_sql)
########################################################
# Imports
import pandas as pd
from pandas.api.types import is_string_dtype
from pandas.api.types import is_numeric_dtype
from pandas.api.types import is_datetime64_any_dtype
from pandas.api.types import is_bool_dtype
import datetime as dt
import pytz
import numpy as np
def InsertMissingFields(df, DB_tableName, connection, typeConverDic=None, adjustFieldType=True, errorType='coerce',
                        verbose=True):
    """
    Align a dataframe with a database table and return the adjusted copy.

    The function works on a copy of the dataframe and does the following:
    1. If a column appears in the dataframe but not in the database table, it adds it to the
       database table with ALTER TABLE (but not vice versa). The SQL type of the new column is the
       one returned by __findGenericType for the dataframe column.
    2. If the same column exists with a different capitalization in the dataframe vs. the database
       table, the dataframe column is renamed to the table's spelling.
    3. Optionally, it converts the dataframe column to the generic type of the database column.
       Only the categories string / numeric / date / bool are considered; there is no conversion
       between float and int. Columns whose database type is not in the conversion dictionary are
       left untouched.

    The table columns are read from INFORMATION_SCHEMA.COLUMNS filtered by table name only.
    NOTE(review): a table with the same name in another schema would presumably be mixed into the
    result - confirm whether a TABLE_SCHEMA filter is needed.
    NOTE(review): the table name is concatenated into the SQL text, so it must come from a trusted
    source.

    :param df dataframe. The input dataframe. It is not modified.
    :param DB_tableName string. The name of the database table
    :param connection database connection. It must provide cursor() and be usable by pandas.read_sql
    :param typeConverDic: dictionary. A dictionary that converts the database type to a category of: 'object' for
                          strings, 'Numeric' for numeric types, 'bool' for boolean, 'Datetime' for dates.
                          If None is given, a default dictionary based on MySQL types is used. The default
                          dictionary never yields 'bool', so the bool conversions only happen with a
                          custom dictionary.
    :param adjustFieldType bool. If True (default) it adjusts the dataframe columns to fit the database column types.
                           It supports the following conversions (DB: database type, DF: dataframe type):
                           (DB: string, DF: not string), (DB: Datetime, DF: string), (DB: Numeric, DF: string),
                           (DB: bool, DF: string), (DB: bool, DF: Numeric), (DB: Numeric, DF: bool)
    :param errorType string. Passed as ``errors`` to pandas.to_datetime / pandas.to_numeric.
                     One of: 'ignore', 'raise', 'coerce'
                     If 'raise', then invalid parsing will raise an exception.
                     If 'coerce', then invalid parsing will be set as NaN.
                     If 'ignore', then invalid parsing will return the input.
    :param verbose bool. If True (default) it prints the changes that were done to the database table or
                   the dataframe. Failed conversions are always printed.
    Returns a copy of the dataframe after changes
    """
    df2 = df.copy()
    ConvTypeDic = typeConverDic
    if ConvTypeDic is None:
        ConvTypeDic = {'varchar': 'object', 'char': 'object', 'varbinary': 'object', 'tinytext': 'object',
                       'text': 'object',
                       'mediumtext': 'object', 'longtext': 'object', 'tinyint': 'Numeric', 'smallint': 'Numeric',
                       'mediumint': 'Numeric',
                       'int': 'Numeric', 'integer': 'Numeric', 'bigint': 'Numeric', 'float': 'Numeric',
                       'double': 'Numeric',
                       'double precision': 'Numeric', 'decimal': 'Numeric', 'dec': 'Numeric', 'year': 'Numeric',
                       'date': 'Datetime',
                       'datetime': 'Datetime', 'time': 'Datetime'}

    def _report_converted(name, kind):
        # Success message, shown only in verbose mode
        if verbose:
            print('Column: "' + str(
                name) + '" in dataframe converted to a ' + kind + ' type as in database table.')

    def _report_failed(name, kind, err):
        # Failure message, always shown; the column is left as it was
        print('The conversion of column: "' + str(name) +
              '" to ' + kind + ' type, in the dataframe, did not succeed. The following error generated:'
              + str(err))

    DfCol = list(df2.columns)
    DfCol_lower = [x.lower() for x in DfCol]
    SQL = 'SELECT COLUMN_NAME, DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = "' + DB_tableName + '"'
    DBtable = pd.read_sql(SQL, connection)
    TableColumns = DBtable.COLUMN_NAME.unique()
    # Lower-case name -> spelling used by the table. setdefault keeps the first spelling
    # in case the query returned the same name more than once in different capitalization.
    TableNameByLower = {}
    for tblCol in TableColumns:
        TableNameByLower.setdefault(tblCol.lower(), tblCol)
    TableColumnSet = set(TableColumns)

    currCursor = connection.cursor()
    try:
        for OriginalColName, col in zip(DfCol, DfCol_lower):
            # Find the generic type of the column
            GenerType, DBType = __findGenericType(df2[OriginalColName])

            # The column does not exist in the table: add it and move on
            if col not in TableNameByLower:
                SQL = 'ALTER TABLE ' + str(DB_tableName) + ' ADD `' + str(OriginalColName) + '` ' + str(DBType)
                currCursor.execute(SQL)
                if verbose:
                    print('Column: "' + str(OriginalColName) + '" added to table: ' + str(DB_tableName))
                continue

            # Same column in a different capitalization: adopt the table's spelling in the dataframe
            if OriginalColName not in TableColumnSet:
                DBOriginalColName = TableNameByLower[col]
                df2 = df2.rename({OriginalColName: DBOriginalColName}, axis=1)
                if verbose:
                    print('Dataframe column: "' + str(OriginalColName) + '" changed to "' + str(DBOriginalColName) +
                          '" to fit database table column name')
                OriginalColName = DBOriginalColName

            # Column exists. If asked, adjust the type
            if not adjustFieldType:
                continue
            colType = DBtable[DBtable['COLUMN_NAME'].str.lower() == col]['DATA_TYPE'].iloc[0]
            DbGenerType = __FindTablColGenType(col, colType, ConvTypeDic, DB_tableName)
            if DbGenerType == 'Other':
                continue

            if DbGenerType == 'object' and GenerType != 'object':
                try:
                    df2[OriginalColName] = df2[OriginalColName].astype(str)
                    _report_converted(OriginalColName, 'string')
                except Exception as e:
                    _report_failed(OriginalColName, 'string', e)
            elif DbGenerType == 'Datetime' and GenerType == 'object':
                try:
                    df2[OriginalColName] = pd.to_datetime(df2[OriginalColName], errors=errorType)
                    _report_converted(OriginalColName, 'date')
                except Exception as e:
                    _report_failed(OriginalColName, 'date', e)
            elif DbGenerType == 'Numeric' and GenerType == 'object':
                try:
                    df2[OriginalColName] = pd.to_numeric(df2[OriginalColName], errors=errorType)
                    _report_converted(OriginalColName, 'numeric')
                except Exception as e:
                    _report_failed(OriginalColName, 'numeric', e)
            elif DbGenerType == 'bool' and GenerType == 'object':
                # NOTE(review): astype(bool) on strings presumably maps every non-empty string
                # (including "False") to True - confirm this is the intended conversion.
                try:
                    df2[OriginalColName] = df2[OriginalColName].astype(bool)
                    _report_converted(OriginalColName, 'bool')
                except Exception as e:
                    _report_failed(OriginalColName, 'bool', e)
            elif DbGenerType == 'bool' and GenerType == 'Numeric':
                # Gets True if the numeric value is not zero and False if zero
                df2[OriginalColName] = df2[OriginalColName] != 0
                _report_converted(OriginalColName, 'bool')
            elif DbGenerType == 'Numeric' and GenerType == 'bool':
                df2[OriginalColName] = np.where(df2[OriginalColName], 1, 0)
                _report_converted(OriginalColName, 'numeric')
    finally:
        # Release the cursor even when ALTER TABLE or a conversion raised
        currCursor.close()
    return df2
def __findGenericType(PandasSer):
    """
    Classify a pandas Series into a generic type category and a matching SQL column type.

    The dtype checks run in the order string, bool, numeric, datetime and the first match wins.

    :param PandasSer pandas Series (or anything accepted by the pandas.api.types checks)
    :return: tuple (GenerType, DBType) where GenerType is one of 'object', 'bool', 'Numeric',
             'Datetime' or 'Other', and DBType is 'text', 'bool', 'double', 'Datetime' or
             'text' respectively.
    """
    if is_string_dtype(PandasSer):
        return 'object', 'text'
    if is_bool_dtype(PandasSer):
        return 'bool', 'bool'
    if is_numeric_dtype(PandasSer):
        return 'Numeric', 'double'
    if is_datetime64_any_dtype(PandasSer):
        return 'Datetime', 'Datetime'
    # Anything else is reported as 'Other' and would be stored as text
    return 'Other', 'text'
def __FindTablColGenType(col, colType, ConvTypeDic, DB_tableName):
    """
    Translate a database column type into its generic category.

    :param col string. Column name, used only in the warning message
    :param colType string. The database type of the column (looked up as a key of ConvTypeDic)
    :param ConvTypeDic dict. Maps database types to generic categories
    :param DB_tableName string. Table name, used only in the warning message
    :return: the category stored in ConvTypeDic for colType, or 'Other' (after printing a
             warning) when colType is not a key of ConvTypeDic
    """
    if colType in ConvTypeDic:
        return ConvTypeDic[colType]
    print('Column: ' + str(col) + ' in the database table: ' + str(DB_tableName) + ' has a type ' +
          str(colType) + ' that is not supported.')
    return 'Other'
#### Use the following 2 functions to read the processes on the database (originally made for MySQL) and kill them
def FindProc(conn):
    """
    Read the database process list, longest-running first.

    Runs a SELECT on information_schema.processlist ordered by time descending.
    The ID column allows us to use kill to remove a process.
    conn - connection to the database (anything accepted by pandas.read_sql)
    returns dataframe with the processes
    """
    return pd.read_sql('SELECT * FROM information_schema.processlist ORDER BY time DESC', conn)
def KillDBProc(ProcID, conn, showProcList=True):
    """
    Get a process ID and a connection and kill the process in the database.
    If it gets a list (or tuple/set) it kills every process in it.
    If it gets None then it only returns the list of processes.

    ProcID          number, or list/tuple/set of numbers, that is the process id. None to only list.
    conn            database connection (must provide cursor())
    showProcList    bool. If True then returns the remaining processes (see FindProc),
                    otherwise returns None

    The cursor is closed even when one of the kill statements raises; the exception is
    propagated to the caller.
    """
    if ProcID is None:
        return FindProc(conn)

    # Normalize to a list; a single id becomes a one-element list
    if isinstance(ProcID, (list, tuple, set)):
        ProcIDLst = list(ProcID)
    else:
        ProcIDLst = [ProcID]

    crsr = conn.cursor()
    try:
        for elem in ProcIDLst:
            crsr.execute('kill ' + str(elem))
    finally:
        crsr.close()

    if showProcList:
        return FindProc(conn)
    return None
################ Get table from database and change it to dictionary #############
def GetSQLasDict(SQL, conn):
    """
    Gets an SQL statement and returns the output as a list of dictionaries, one per row.
    Each dictionary (row) contains the header names as keys and the row values as values.

    SQL - string. The SQL string
    conn - DB connection. Must provide execute(SQL) returning an iterable of rows. Each row must
           either offer items() itself or expose a ``_mapping`` attribute that does
           (NOTE(review): ``_mapping`` is presumably how SQLAlchemy 1.4+ rows expose the
           column/value pairs - confirm against the SQLAlchemy version in use).
    Example:
    GetSQLasDict('select * from YourTable',conn)
    """
    Result = conn.execute(SQL)
    rows = []
    for row in Result:
        # Build a fresh dictionary per row so values never leak from the previous row
        mapping = getattr(row, '_mapping', row)
        rows.append(dict(mapping.items()))
    return rows
##################################################################################################################
# The following functions are used for sending SQL messages to update data tables in a database.
# It checks if the record exist by checking if a record with the same keys exists.
# If there is such a record it uses the SQL Update statement to update the record.
# If not it uses the INSERT INTO statement.
# It also copies the record to an archive table (with name: original table name + "_archive"). The archive table
# should contain the same columns + "ArchiveDate" column as Datetime type.
# Use this function to send SQL: BuildSQLAndSend
#
# Example of using this module:
# Table2Upd = 'clients_devices'
# Rec = {'idDevice':DeviceID,'idClients':1,'idSite':1,'comment':'Welcome new Device','Start_date':Now}
# KeyRec={'idDevice':DeviceID}
# sql=BuildSQLAndSend(Rec,KeyRec,Table2Upd,connection)
# The Rec dictionary uses keys as the table columns and the values are the values we want to enter the record.
# The KeyRec dictionary is the dictionary that contains keys of the primary keys in the table. (the algorithm will
# use KeyRec dictionary to look if the record is already exists in the database.)
##################################################################################################################
def CrtUpdateSQL(RecDic, KeyDic, TblName):
    """
    Creates an UPDATE Sql statement that gets the record in RecDic, the table
    name and the KeyDic for the WHERE clause and creates the statement.

    Values are rendered as follows:
      - None becomes NULL (``col IS NULL`` inside the WHERE clause)
      - strings and date / time / datetime values are wrapped in double quotes, with
        embedded backslashes and double quotes backslash-escaped
        (NOTE(review): assumes MySQL's default quoting rules - confirm for other databases)
      - everything else is inserted as str(value)

    Inputs:
        RecDic      dict. Dictionary where the keys are the headers in the table and
                    the values are the new values of the record.
        KeyDic      dict. Dictionary of header:value pairs that identify the record;
                    the pairs are combined with "and" in the WHERE clause.
        TblName     str. table name (inserted as is)
    Return: str. An SQL statement
    """

    def _literal(value):
        # Render one Python value as an SQL literal
        if value is None:
            return 'NULL'
        if isinstance(value, (str, dt.date, dt.time)):
            text = str(value).replace('\\', '\\\\').replace('"', '\\"')
            return '"' + text + '"'
        return str(value)

    # The SET paragraph
    assignments = [str(col) + ' = ' + _literal(val) for col, val in RecDic.items()]

    # The WHERE paragraph. "col = NULL" never matches, so None needs IS NULL.
    conditions = []
    for col, val in KeyDic.items():
        if val is None:
            conditions.append(str(col) + ' IS NULL')
        else:
            conditions.append(str(col) + ' = ' + _literal(val))

    return "UPDATE " + TblName + " SET " + ', '.join(assignments) + " WHERE " + ' and '.join(conditions)
def CrtInsertSQL(RecDic, TblName):
    """
    Creates an INSERT INTO Sql statement that gets the record in RecDic and
    the table name and creates the statement.

    Values are rendered as follows:
      - None becomes NULL
      - strings and date / time / datetime values are wrapped in double quotes, with
        embedded backslashes and double quotes backslash-escaped
        (NOTE(review): assumes MySQL's default quoting rules - confirm for other databases)
      - everything else is inserted as str(value)

    Inputs:
        RecDic      dict. Dictionary where the keys are the headers in the table and
                    the values are the values of the record.
        TblName     str. table name (inserted as is)
    Return str. An SQL statement (it ends with a single space after the closing parenthesis)
    """

    def _literal(value):
        # Render one Python value as an SQL literal
        if value is None:
            return 'NULL'
        if isinstance(value, (str, dt.date, dt.time)):
            text = str(value).replace('\\', '\\\\').replace('"', '\\"')
            return '"' + text + '"'
        return str(value)

    columns = ', '.join(str(col) for col in RecDic)
    values = ', '.join(_literal(val) for val in RecDic.values())
    return "INSERT INTO " + TblName + " (" + columns + ") VALUES (" + values + ") "
# Checks if a record exist based on key parameters
def CheckIfExists(keyDic, TableName, connection, Debug=False):
    """
    This function gets a keyDic and a table name and finds out if there is
    a record in the database that matches these keys.

    The column names of the result are taken from cursor.description, so any cursor that
    fills that attribute after a SELECT can be used. The cursor is always closed.

    Input:
        keyDic      Dict. Contains the headers and the values of a table
                    as keys and values in a dictionary. The pairs are combined with "and"
                    and checked against the DB. A None value is checked with IS NULL;
                    strings and date / time / datetime values are double-quoted with
                    embedded backslashes and double quotes backslash-escaped.
        TableName   str. The name of the table to check (inserted as is)
        connection  database connection (must provide cursor()).
        Debug       bool. If True then prints the SQL statement and the record found
    Return: tuple.  (True, rec) if a record exists, where rec is a dictionary of the first
                    matching row with every None value replaced by the string 'null';
                    (False, 0) if no record matches.
    """

    def _literal(value):
        # Render one Python value as an SQL literal
        if isinstance(value, (str, dt.date, dt.time)):
            text = str(value).replace('\\', '\\\\').replace('"', '\\"')
            return '"' + text + '"'
        return str(value)

    conditions = []
    for col, val in keyDic.items():
        if val is None:
            # "col = NULL" never matches, so None needs IS NULL
            conditions.append(str(col) + ' IS NULL')
        else:
            conditions.append(str(col) + ' = ' + _literal(val))
    sql = 'SELECT * FROM ' + TableName + ' WHERE ' + ' and '.join(conditions)
    if Debug:
        print('CheckIfExists:\n' + sql)

    cursor = connection.cursor()
    try:
        cursor.execute(sql)
        records = cursor.fetchall()
        # The first item of every description entry is the column name
        header = [column[0] for column in cursor.description]
    finally:
        cursor.close()

    if len(records) == 0:
        return False, 0

    rec = dict(zip(header, records[0]))
    for key in rec:
        if rec[key] is None:
            rec[key] = 'null'
    if Debug:
        print('Rec after zip:\n' + str(rec))
    return True, rec
def sendSQL(SQL, connection):
    """
    Gets an SQL statement and a connection object, sends it to the DB and commits.
    The cursor is closed afterwards, also when the statement fails; in that case nothing
    is committed and the exception is propagated to the caller.

    Inputs:
        SQL         str. SQL statement
        connection  database connection (must provide cursor() and commit())
    """
    cursor = connection.cursor()
    try:
        cursor.execute(SQL)
        connection.commit()
    finally:
        cursor.close()
def ArchiveRecord(Record, Archive_table, connection, Debug=False, DefaultTimeZone='Israel'):
    """
    Insert a copy of the given record, stamped with the current time, into an archive table.

    The copy gets an extra "ArchiveDate" entry holding the current time in DefaultTimeZone,
    formatted as "YYYY-MM-DD HH:MM:SS", so the archive table should contain all the record
    fields + "ArchiveDate". The statement is built with CrtInsertSQL and sent with sendSQL.
    The given Record itself is not modified.

    :param Record dict. A dictionary that contains the header and the value for each field
    :param Archive_table: string. The name of the archive table
    :param connection: the connection to the database
    :param Debug: bool. If True then it prints the archived record and the SQL statement
    :param DefaultTimeZone: string. Timezone name (passed to pytz.timezone) used for the archive date
    :return: None
    """
    archive_stamp = dt.datetime.now(pytz.timezone(DefaultTimeZone)).strftime("%Y-%m-%d %H:%M:%S")
    archived = Record.copy()
    archived['ArchiveDate'] = archive_stamp
    if Debug:
        print('Archive record:\n' + str(archived))
    statement = CrtInsertSQL(archived, Archive_table)
    if Debug:
        print('Archive SQL:\n' + statement)
    sendSQL(statement, connection)
def BuildSQLAndSend(Rec, KeyRec, Table2Upd, connection, Archive_table=True, Debug=False, DefaultTimeZone='Israel'):
    """
    Insert a record into a table, or update it when a record with the same keys already exists.

    CheckIfExists decides between the two: when a matching record is found the statement is
    built with CrtUpdateSQL (only the fields given in Rec are overwritten), otherwise with
    CrtInsertSQL. The statement is sent with sendSQL and 'SQL sent' is printed.

    When the record exists and Archive_table is True, Rec is first passed to ArchiveRecord
    with the table name Table2Upd + "_archive".
    NOTE(review): the record that gets archived is the new Rec; the existing record returned by
    CheckIfExists is not used - confirm this is the intended archive content.
    NOTE(review): 'SQL sent' is printed regardless of Debug.

    param: Rec              dict. A dictionary that contains the header and the value for each field
    param: KeyRec           dict. Dictionary that contains only the keys of the table header:value
    param: Table2Upd        string. The name of the table to update
    param: connection       database connection
    param: Archive_table    bool. If True then a record is sent to the archive table before an update
    param: Debug            bool. If True then the SQL statements are printed
    param: DefaultTimeZone  string. The timezone used to calculate now() only for the archive date
    return the SQL of the main (insert or update) statement as a string
    """
    record_exists, _existing = CheckIfExists(KeyRec, Table2Upd, connection, Debug)
    if not record_exists:
        statement = CrtInsertSQL(Rec, Table2Upd)
    else:
        if Archive_table:
            # Archive before the update is built and sent
            ArchiveRecord(Rec, Table2Upd + '_archive', connection, Debug, DefaultTimeZone)
        statement = CrtUpdateSQL(Rec, KeyRec, Table2Upd)
    if Debug:
        print('Main SQL:\n' + statement)
    sendSQL(statement, connection)
    print('SQL sent')
    return statement
def FromDicToSqlUpdStatement(TblName, Dic, WhereClause=None):
    """
    Creates an SQL update statement from a dictionary

    Values are rendered as follows:
      - None becomes NULL
      - strings and date / time / datetime values are wrapped in double quotes, with
        embedded backslashes and double quotes backslash-escaped
        (NOTE(review): assumes MySQL's default quoting rules - confirm for other databases)
      - everything else is inserted as str(value)

    :param TblName: str. The name of the table in the DB (inserted as is)
    :param Dic: dict. The dictionary containing the fields and values to update. Must not be empty.
    :param WhereClause: str. The where clause of the update statement (DON'T include "where").
                        It is appended as is. When None, the statement has no WHERE part.
    :return: str. The SQL update statement
    :raises ValueError: when Dic is empty, since there is nothing to put in the SET part
    """
    if not Dic:
        raise ValueError('Dic must contain at least one field to update')

    def _literal(value):
        # Render one Python value as an SQL literal
        if value is None:
            return 'NULL'
        if isinstance(value, (str, dt.date, dt.time)):
            text = str(value).replace('\\', '\\\\').replace('"', '\\"')
            return '"' + text + '"'
        return str(value)

    assignments = [str(key) + " = " + _literal(value) for key, value in Dic.items()]
    SQL = "UPDATE " + TblName + " SET " + ", ".join(assignments)
    if WhereClause is not None:
        SQL += " WHERE " + WhereClause
    return SQL
def OnlyNewRecords(df, TableName, connection, KeyFields2Check, AdjustDF2TablTypes=True):
    """
    Gets a dataframe and a table name and returns only the records that are not in the table.

    A record counts as "in the table" when the table holds a row with the same values in all
    the key fields. Only the key columns of the table take part in the comparison, so every
    column of the dataframe is returned unchanged, whatever its name is.

    :param df: dataframe. The input dataframe
    :param TableName: string. The name of the table to check (inserted as is into the SELECT)
    :param connection: database connection
    :param KeyFields2Check: list (or a single column name). The fields used to check if the
                            record is in the table. They must exist both in the dataframe and
                            in the table.
    :param AdjustDF2TablTypes bool. If True then the dataframe is first passed through
                              InsertMissingFields: the dataframe column types are adjusted to the
                              table column types, and fields that are in the dataframe and not in
                              the table are added to the table.
    :return: dataframe. The records that are not in the table. When the table is empty the
             (possibly adjusted) dataframe is returned as is.
    """
    df2 = df
    if AdjustDF2TablTypes:
        df2 = InsertMissingFields(df2, TableName, connection)
    SQL = 'SELECT * FROM ' + TableName
    TempDF = pd.read_sql(SQL, connection)
    if len(TempDF) == 0:
        return df2

    if isinstance(KeyFields2Check, str):
        keyCols = [KeyFields2Check]
    else:
        keyCols = list(KeyFields2Check)
    # Merging against the key columns only means no other column can collide with a
    # dataframe column, so nothing has to be suffixed and filtered out afterwards.
    existingKeys = TempDF[keyCols].drop_duplicates()
    merged = df2.merge(existingKeys, how='left', on=keyCols, indicator=True)
    onlyNew = merged[merged['_merge'] == 'left_only']
    return onlyNew.drop(columns=['_merge'])