# general
import io

# data
import numpy as np
import pandas as pd

# machine learning
import keras

# data visualization
import plotly.express as px
from plotly.subplots import make_subplots
import plotly.graph_objects as go
import seaborn as sns
import matplotlib.pyplot as plt


def headers(filename):
    """Read the first line of a CSV file and return its column names as a list."""
    with open(filename, 'r') as f:
        headers = f.readlines()[0].split(',')
    for i in range(len(headers)):
        headers[i] = headers[i].replace('\n', '')
    print("Headers:", str(headers).replace("'", ""))
    return headers
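# Hypothetical usage sketch (the filename "my_data.csv" is an illustration,
# not part of this module; any CSV with a header row would do):
#   column_names = headers("my_data.csv")
#   # -> prints: Headers: [col_a, col_b, ...]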
def make_plots(df, feature_names, label_name, model_output, sample_size=200):
    """Plot the loss curve and the fitted model over a random sample of the data."""
    num_rows = len(df)
    if sample_size > num_rows:
        print("Reduced sample size to match the number of rows: {0} -> {1}".format(sample_size, num_rows))
        sample_size = num_rows

    random_sample = df.sample(n=sample_size).copy()
    random_sample.reset_index(drop=True, inplace=True)

    weights, bias, epochs, rmse = model_output

    is_2d_plot = len(feature_names) == 1
    model_plot_type = "scatter" if is_2d_plot else "surface"
    fig = make_subplots(rows=1, cols=2,
                        subplot_titles=("Loss Curve", "Model Plot"),
                        specs=[[{"type": "scatter"}, {"type": model_plot_type}]])

    plot_data(random_sample, feature_names, label_name, fig)
    plot_model(random_sample, feature_names, weights, bias, fig, label_name)
    plot_loss_curve(epochs, rmse, fig)

    fig.show()
    return
def plot_loss_curve(epochs, rmse, fig):
    """Add the RMSE-per-epoch loss curve to the left subplot."""
    curve = px.line(x=epochs, y=rmse)
    curve.update_traces(line_color='#ff0000', line_width=3)

    fig.add_trace(curve.data[0], row=1, col=1)
    fig.update_xaxes(title_text="Epoch", row=1, col=1)
    fig.update_yaxes(title_text="Root Mean Squared Error", row=1, col=1,
                     range=[rmse.min() * 0.8, rmse.max()])
    return
def plot_data(df, features, label, fig):
    """Add a scatter plot of the raw data to the right subplot (2D or 3D)."""
    if len(features) == 1:
        scatter = px.scatter(df, x=features[0], y=label)
    else:
        scatter = px.scatter_3d(df, x=features[0], y=features[1], z=label)
    fig.add_trace(scatter.data[0], row=1, col=2)

    if len(features) == 1:
        fig.update_xaxes(title_text=features[0], row=1, col=2)
        fig.update_yaxes(title_text=label, row=1, col=2)
    else:
        fig.update_layout(scene1=dict(xaxis_title=features[0],
                                      yaxis_title=features[1],
                                      zaxis_title=label))
    return
def plot_model(df, features, weights, bias, fig, label):
    """Add the fitted model to the right subplot: a line for one feature, a plane for two."""
    # Compute the model's predictions for the sampled rows.
    df[label.upper() + "_PREDICTED"] = bias[0]
    for index, feature in enumerate(features):
        df[label.upper() + "_PREDICTED"] = df[label.upper() + "_PREDICTED"] + weights[index][0] * df[feature]

    if len(features) == 1:
        model = px.line(df, x=features[0], y=label.upper() + "_PREDICTED")
        model.update_traces(line_color='#ff0000', line_width=3)
    else:
        # Build three points of the regression plane by solving the model
        # equation z = w0*x + w1*y + b for x at chosen y and z values.
        z_name, y_name = label.upper() + "_PREDICTED", features[1]
        z = [df[z_name].min(), (df[z_name].max() - df[z_name].min()) / 2, df[z_name].max()]
        y = [df[y_name].min(), (df[y_name].max() - df[y_name].min()) / 2, df[y_name].max()]
        x = []
        for i in range(len(y)):
            x.append((z[i] - weights[1][0] * y[i] - bias[0]) / weights[0][0])

        plane = pd.DataFrame({'x': x, 'y': y, 'z': [z] * 3})

        colorscale = [[0, '#89CFF0'], [1, '#FFDB58']]  # light blue to yellow
        model = go.Figure(data=go.Surface(x=plane['x'], y=plane['y'], z=plane['z'],
                                          colorscale=colorscale))

    fig.add_trace(model.data[0], row=1, col=2)
    return
def model_info(feature_names, label_name, model_output):
    """Return a formatted summary of the trained weights, bias, and model equation."""
    weights = model_output[0]
    bias = model_output[1]

    nl = "\n"
    header = "-" * 80
    banner = header + nl + "|" + "MODEL INFO".center(78) + "|" + nl + header

    info = ""
    equation = label_name + " = "

    for index, feature in enumerate(feature_names):
        info = info + "Weight for feature[{}]: {:.3f}\n".format(feature, weights[index][0])
        equation = equation + "{:.3f} * {} + ".format(weights[index][0], feature)

    info = info + "Bias: {:.3f}\n".format(bias[0])
    equation = equation + "{:.3f}\n".format(bias[0])

    return banner + nl + info + nl + equation
def build_model(my_learning_rate, num_features):
    """Create and compile a simple linear regression model."""
    # Describe the topography of the model.
    # The topography of a simple linear regression model
    # is a single node in a single layer.
    inputs = keras.Input(shape=(num_features,))
    outputs = keras.layers.Dense(units=1)(inputs)
    model = keras.Model(inputs=inputs, outputs=outputs)

    # Compile the model topography into code that Keras can efficiently
    # execute. Configure training to minimize the model's mean squared error.
    model.compile(optimizer=keras.optimizers.RMSprop(learning_rate=my_learning_rate),
                  loss="mean_squared_error",
                  metrics=[keras.metrics.RootMeanSquaredError()])

    return model
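# Hypothetical usage sketch (the learning rate and feature count are illustrative):
#   model = build_model(my_learning_rate=0.001, num_features=2)
#   model.summary()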
def train_model(model, df, features, label, epochs, batch_size):
    """Train the model by feeding it data."""
    # Feed the model the features and the label.
    # The model will train for the specified number of epochs.
    history = model.fit(x=features,
                        y=label,
                        batch_size=batch_size,
                        epochs=epochs)

    # Gather the trained model's weight and bias.
    trained_weight = model.get_weights()[0]
    trained_bias = model.get_weights()[1]

    # The list of epochs is stored separately from the rest of history.
    epochs = history.epoch

    # Isolate the error for each epoch.
    hist = pd.DataFrame(history.history)

    # To track the progression of training, we're going to take a snapshot
    # of the model's root mean squared error at each epoch.
    rmse = hist["root_mean_squared_error"]

    return trained_weight, trained_bias, epochs, rmse
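# Hypothetical usage sketch (assumes a DataFrame `df`; the column names and
# hyperparameters are illustrative, not part of this module):
#   features = df.loc[:, ["col_a", "col_b"]].values
#   label = df["target"].values
#   weights, bias, epoch_list, rmse = train_model(model, df, features, label,
#                                                 epochs=20, batch_size=50)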
def run_experiment(df, feature_names, label_name, learning_rate, epochs, batch_size):
    """Build, train, summarize, and plot a linear regression model on the given DataFrame."""
    if isinstance(feature_names, str):
        feature_names = [feature_names]
    learning_rate = float(learning_rate)

    print('INFO: starting training experiment with features={} and label={}\n'.format(feature_names, label_name))

    num_features = len(feature_names)
    features = df.loc[:, feature_names].values
    label = df[label_name].values

    model = build_model(learning_rate, num_features)
    model_output = train_model(model, df, features, label, epochs, batch_size)

    print('\nSUCCESS: training experiment complete\n')
    print('{}'.format(model_info(feature_names, label_name, model_output)))
    make_plots(df, feature_names, label_name, model_output)

    return model
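# Hypothetical usage sketch (the CSV path, column names, and hyperparameters are
# illustrative; adjust them to your own dataset):
#   df = pd.read_csv("my_data.csv")
#   model = run_experiment(df,
#                          feature_names=["col_a", "col_b"],
#                          label_name="target",
#                          learning_rate=0.001,
#                          epochs=20,
#                          batch_size=50)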
print("SUCCESS: defining plotting functions complete.")
print("SUCCESS: defining linear regression functions complete.\n\n")