nalu_implementation.py
import numpy as np
import tensorflow as tf
import keras
import matplotlib.pyplot as plt
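
# Note: this script uses the TensorFlow 1.x graph-mode API (tf.get_variable,
# tf.placeholder, tf.Session). Under TensorFlow 2.x it would have to be run
# through the compatibility layer, e.g. `import tensorflow.compat.v1 as tf`
# followed by `tf.disable_v2_behavior()`.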
# Define a Neural Accumulator (NAC) for addition/subtraction -> useful to learn the addition/subtraction operation
def nac_simple_single_layer(x_in, out_units):
    '''
    Define a Neural Accumulator (NAC) for addition/subtraction -> useful to learn the addition/subtraction operation

    Attributes:
        x_in -> input vector
        out_units -> number of output neurons
    Return:
        Output tensor of the mentioned shape and the associated weights
    '''
    in_features = x_in.shape[1]

    # Define W_hat and M_hat
    W_hat = tf.get_variable(name="W_hat",
                            initializer=tf.initializers.random_uniform(minval=-2, maxval=2),
                            shape=[in_features, out_units], trainable=True)
    M_hat = tf.get_variable(name="M_hat",
                            initializer=tf.initializers.random_uniform(minval=-2, maxval=2),
                            shape=[in_features, out_units], trainable=True)

    # Get W
    W = tf.nn.tanh(W_hat) * tf.nn.sigmoid(M_hat)

    y_out = tf.matmul(x_in, W)

    return y_out, W
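
# In the simple NAC, the effective weight matrix W = tanh(W_hat) * sigmoid(M_hat)
# is biased towards values close to {-1, 0, +1}, so tf.matmul(x_in, W) tends to
# learn signed sums of the inputs (i.e. additions and subtractions) rather than
# arbitrary scalings.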
# Define a complex NAC in log space -> for more complex arithmetic functions such as
# multiplication, division and power
def nac_complex_single_layer(x_in, out_units, epsilon=0.000001):
    '''
    :param x_in: input feature vector
    :param out_units: number of output units of the cell
    :param epsilon: small value to avoid log(0) in the output result
    :return: output tensor and associated weight matrix
    '''
    in_shape = x_in.shape[1]

    W_hat = tf.get_variable(shape=[in_shape, out_units],
                            initializer=tf.initializers.random_uniform(minval=-2, maxval=2),
                            trainable=True, name="W_hat2")
    M_hat = tf.get_variable(shape=[in_shape, out_units],
                            initializer=tf.initializers.random_uniform(minval=-2, maxval=2),
                            trainable=True, name="M_hat2")

    W = tf.nn.tanh(W_hat) * tf.nn.sigmoid(M_hat)

    # Express the input features in log space to learn complex functions
    x_modified = tf.log(tf.abs(x_in) + epsilon)

    m = tf.exp(tf.matmul(x_modified, W))

    return m, W
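
# In log space the same weight construction turns sums into products:
# each output is exp(sum_i W_i * log(|x_i| + eps)) = prod_i (|x_i| + eps)^(W_i),
# so the complex NAC can express multiplication, division and powers of its inputs.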
# Define a NALU as a combination of NAC1 and NAC2
def nalu(x_in, out_units, epsilon=0.000001, get_weights=False):
    '''
    :param x_in: input feature vector
    :param out_units: number of output units of the cell
    :param epsilon: small value to avoid log(0) in the output result
    :param get_weights: True to also return the weights of the model
    :return: output tensor
    :return: gate weight matrix
    :return: NAC1 (simple NAC) weight matrix
    :return: NAC2 (complex NAC) weight matrix
    '''
    in_shape = x_in.shape[1]

    # Get the output of NAC1
    a, W_simple = nac_simple_single_layer(x_in, out_units)

    # Get the output of NAC2
    m, W_complex = nac_complex_single_layer(x_in, out_units, epsilon=epsilon)

    # Gate signal layer
    G = tf.get_variable(initializer=tf.random_normal_initializer(stddev=1.0),
                        shape=[in_shape, out_units], name="Gate_weights")
    g = tf.nn.sigmoid(tf.matmul(x_in, G))

    y_out = g * a + (1 - g) * m

    if get_weights:
        return y_out, G, W_simple, W_complex
    else:
        return y_out
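
# The NALU output is a learned, input-dependent interpolation between the two cells:
# y = g * a + (1 - g) * m, with g = sigmoid(x_in . G). When g is close to 1 the cell
# behaves like the additive NAC; when g is close to 0 it behaves like the
# multiplicative (log-space) NAC.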
# Test the network by learning an arithmetic function of the inputs
# Generate series of input numbers X1, X2 and X3 for training
x1 = np.arange(1000, 11000, step=5, dtype=np.float32)
x2 = np.arange(500, 6500, step=3, dtype=np.float32)
x3 = np.arange(0, 2000, step=1, dtype=np.float32)

# Make any function of x1, x2 and x3 to try the network on
y_train = (x1 / 4) + (x2 / 2) + x3**2
#y_train = x1 + x2 + x3

x_train = np.column_stack((x1, x2, x3))

print(x_train.shape)
print(y_train.shape)
# Generate series of input numbers X1, X2 and X3 for testing
x1 = np.random.randint(0, 1000, size=200).astype(np.float32)
x2 = np.random.randint(1, 500, size=200).astype(np.float32)
x3 = np.random.randint(50, 150, size=200).astype(np.float32)

x_test = np.column_stack((x1, x2, x3))
y_test = (x1 / 4) + (x2 / 2) + x3**2
#y_test = x1 + x2 + x3

print()
print(x_test.shape)
print(y_test.shape)
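
# Note: x1 and x2 in the test set are sampled from ranges below the training
# ranges, so the evaluation also probes how well the learned weights
# extrapolate beyond the training data.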
# Define the placeholders to feed the values at run time
X = tf.placeholder(dtype=tf.float32, shape=[None, 3])  # number of samples x number of input features
Y = tf.placeholder(dtype=tf.float32, shape=[None, ])

# Define the network
# Here the network contains only one NALU cell (for testing)
y_pred = nalu(X, out_units=1)
y_pred = tf.squeeze(y_pred)  # remove extra dimensions if any

# Mean Squared Error (MSE)
loss = tf.reduce_mean((y_pred - Y)**2)
#loss = tf.losses.mean_squared_error(labels=y_train, predictions=y_pred)

# Training parameters
alpha = 0.005  # learning rate
epochs = 30000

optimize = tf.train.AdamOptimizer(learning_rate=alpha).minimize(loss)
with tf.Session() as sess:

    # Initialize all trainable variables
    sess.run(tf.global_variables_initializer())

    # Pre-training evaluation
    print("Pre training MSE: ", sess.run(loss, feed_dict={X: x_test, Y: y_test}))
    print()

    cost_history = []

    for i in range(epochs):
        _, cost = sess.run([optimize, loss], feed_dict={X: x_train, Y: y_train})
        print("epoch: {}, MSE: {}".format(i, cost))
        cost_history.append(cost)

    # Plot the MSE over each iteration
    plt.plot(np.arange(epochs), np.log(cost_history))  # plot MSE on a log scale
    plt.xlabel("Epoch")
    plt.ylabel("MSE")
    plt.show()

    print()
    #print(W.eval())
    #print()

    # Post-training loss
    print("Post training MSE: ", sess.run(loss, feed_dict={X: x_test, Y: y_test}))

    print("Actual values: ", y_test[0:10])
    print()

    y_hat = sess.run(y_pred, feed_dict={X: x_test, Y: y_test})

    print("Predicted values: ", y_hat[0:10])