-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathRBM.py
181 lines (147 loc) · 7.94 KB
/
RBM.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
import logging
import numpy as np
from tqdm import tqdm
from tqdm import trange
import pickle
class RBM():
    """Bernoulli Restricted Boltzmann Machine (RBM)

    Trained with one-step contrastive divergence; every parameter update is
    scaled with Adam-style running estimates of the first and second moment
    of the gradient.

    Parameters:
    -----------
    n_hidden: int
        The number of processing nodes (neurons) in the hidden layer.
    learning_rate: float
        The step length that will be used when updating the weights.
    batch_size: int
        The size of the mini-batch used to calculate each weight update.
    n_iterations: int
        The number of training iterations (passes over the training data)
        the algorithm will tune the weights for.

    Reference:
        A Practical Guide to Training Restricted Boltzmann Machines
        URL: https://www.cs.toronto.edu/~hinton/absps/guideTR.pdf
    """

    def sigmoid(self, x):
        """Return the logistic function 1 / (1 + exp(-x)) element-wise.

        Evaluated as exp(-log(1 + exp(-x))) so that large negative inputs
        saturate to 0 without triggering an overflow warning in exp.
        """
        return np.exp(-np.logaddexp(0, -x))

    def batch_iterator(self, iterable, batch_size=1):
        """Yield consecutive slices of `iterable` along its first axis.

        `iterable` must expose `.shape[0]` and support slicing. The final
        slice is shorter when the number of rows is not a multiple of
        `batch_size`.
        """
        n_rows = iterable.shape[0]
        for start in range(0, n_rows, batch_size):
            yield iterable[start:min(start + batch_size, n_rows)]

    def __init__(self, n_hidden=128, learning_rate=0.1, batch_size=10, n_iterations=100):
        self.n_iterations = n_iterations
        self.batch_size = batch_size
        self.lr = learning_rate
        self.n_hidden = n_hidden

    def _initialize_weights(self, X):
        """Create the parameters and optimizer state for the width of `X`.

        The number of visible units is taken from `X.shape[1]`.
        """
        n_visible = X.shape[1]
        self.W = np.random.normal(scale=0.1, size=(n_visible, self.n_hidden))
        self.v0 = np.zeros(n_visible)      # Bias visible
        self.h0 = np.zeros(self.n_hidden)  # Bias hidden

        # Running first/second moment estimates, one pair per parameter.
        self.grads_first_moment_W = np.zeros((n_visible, self.n_hidden))
        self.grads_second_moment_W = np.zeros((n_visible, self.n_hidden))
        self.grads_first_moment_v = np.zeros(n_visible)
        self.grads_second_moment_v = np.zeros(n_visible)
        self.grads_first_moment_h = np.zeros(self.n_hidden)
        self.grads_second_moment_h = np.zeros(self.n_hidden)

        self.beta1 = 0.9
        self.beta2 = 0.999
        self.epsilon = 1e-8

        # Number of parameter updates performed so far; drives the bias
        # correction of the moment estimates.
        self.n_updates = 0

    def _adam_update(self, gradient, first_moment, second_moment):
        """Return (step, first_moment, second_moment) for one update.

        The moments are the exponentially decayed averages of `gradient`
        and `gradient ** 2`; `step` is the bias-corrected, learning-rate
        scaled amount to add to the parameter. `self.n_updates` must
        already count the current update (i.e. be >= 1).
        """
        first_moment = self.beta1 * first_moment + (1. - self.beta1) * gradient
        second_moment = self.beta2 * second_moment + (1. - self.beta2) * gradient ** 2
        first_unbiased = first_moment / (1. - self.beta1 ** self.n_updates)
        second_unbiased = second_moment / (1. - self.beta2 ** self.n_updates)
        step = self.lr * first_unbiased / (np.sqrt(second_unbiased) + self.epsilon)
        return step, first_moment, second_moment

    def fit(self, X, y=None, verbose=True):
        '''Contrastive Divergence training procedure

        Parameters:
        -----------
        X:
            Training data; rows are samples and columns are visible units.
        y:
            Unused.
        verbose: bool
            When True (default) a tqdm progress bar showing the latest
            reconstruction error is displayed; when False training is silent.

        After training, `training_errors` holds the mean squared
        reconstruction error of each iteration and
        `training_reconstructions` one reconstructed random batch per
        iteration. Calling `fit` again re-initializes all parameters.
        '''
        if X.shape[0] == 0:
            raise ValueError('X must contain at least one sample')

        self._initialize_weights(X)
        self.training_errors = []
        self.training_reconstructions = []

        if verbose:
            epochs = trange(self.n_iterations, desc='Error: ', leave=True)
        else:
            epochs = range(self.n_iterations)

        for _ in epochs:
            batch_errors = []
            for batch in self.batch_iterator(X, batch_size=self.batch_size):
                # Positive phase
                positive_hidden = self.sigmoid(batch.dot(self.W) + self.h0)
                hidden_states = self._sample(positive_hidden)
                positive_associations = batch.T.dot(positive_hidden)

                # Negative phase
                negative_visible = self.sigmoid(hidden_states.dot(self.W.T) + self.v0)
                negative_visible = self._sample(negative_visible)
                negative_hidden = self.sigmoid(negative_visible.dot(self.W) + self.h0)
                negative_associations = negative_visible.T.dot(negative_hidden)

                # The bias correction must follow the number of updates,
                # not the number of passes over the data.
                self.n_updates += 1

                step, self.grads_first_moment_W, self.grads_second_moment_W = self._adam_update(
                    positive_associations - negative_associations,
                    self.grads_first_moment_W, self.grads_second_moment_W)
                self.W += step

                step, self.grads_first_moment_h, self.grads_second_moment_h = self._adam_update(
                    positive_hidden.sum(axis=0) - negative_hidden.sum(axis=0),
                    self.grads_first_moment_h, self.grads_second_moment_h)
                self.h0 += step

                step, self.grads_first_moment_v, self.grads_second_moment_v = self._adam_update(
                    batch.sum(axis=0) - negative_visible.sum(axis=0),
                    self.grads_first_moment_v, self.grads_second_moment_v)
                self.v0 += step

                batch_errors.append(np.mean((batch - negative_visible) ** 2))

            self.training_errors.append(np.mean(batch_errors))
            if verbose:
                epochs.set_description('Error: {e}'.format(e=self.training_errors[-1]))
                epochs.refresh()  # to show immediately the update

            # Reconstruct a batch of images from the training set
            idx = np.random.choice(range(X.shape[0]), self.batch_size)
            self.training_reconstructions.append(self.reconstruct(X[idx]))

    def predict(self, X):
        """Return the reconstruction of `X`; identical to `reconstruct`."""
        return self.reconstruct(X)

    def predict_count(self, X):
        """Return (reconstruction, sampled hidden states) for `X`.

        The hidden states are sampled from the hidden activation
        probabilities; the reconstruction holds the visible activation
        probabilities given those states (it is not sampled).
        """
        positive_hidden = self.sigmoid(X.dot(self.W) + self.h0)
        hidden_states = self._sample(positive_hidden)
        negative_visible = self.sigmoid(hidden_states.dot(self.W.T) + self.v0)
        return negative_visible, hidden_states

    def _sample(self, X):
        """Return a boolean array that is True where `X` exceeds a fresh
        uniform [0, 1) draw, i.e. True with probability `X` element-wise."""
        return X > np.random.random_sample(size=X.shape)

    def reconstruct(self, X):
        """Return the visible activation probabilities obtained after one
        pass visible -> sampled hidden -> visible."""
        negative_visible, _ = self.predict_count(X)
        return negative_visible

    def compress(self, X):
        """Return the hidden activation probabilities for `X`.

        Deterministic: no sampling is involved.
        """
        return self.sigmoid(X.dot(self.W) + self.h0)

    def save(self, name):
        """Pickle the instance's attribute dictionary to the file `name`."""
        with open(name, 'wb') as handle:
            pickle.dump(self.__dict__, handle, protocol=pickle.HIGHEST_PROTOCOL)

    def load(self, file):
        """Update this instance with the attribute dictionary pickled in `file`.

        NOTE: unpickling can execute arbitrary code; only load files that
        come from a trusted source (e.g. written by `save`).
        """
        with open(file, 'rb') as handle:
            tmp_dict = pickle.load(handle)
        self.__dict__.update(tmp_dict)
if __name__ == "__main__":
    # Small demo: train on six 6-bit patterns, then print the data next to
    # the model's rounded reconstruction of it.
    samples = [
        [1, 1, 1, 0, 0, 0],
        [1, 0, 1, 0, 0, 0],
        [1, 1, 1, 0, 0, 0],
        [0, 0, 1, 1, 1, 0],
        [0, 0, 1, 1, 0, 0],
        [0, 0, 1, 1, 1, 0],
    ]
    X = np.array(samples)

    settings = {
        'n_hidden': 100,
        'n_iterations': 1000,
        'batch_size': 25,
        'learning_rate': 0.1,
    }
    rbm = RBM(**settings)
    rbm.fit(X)

    print('Actual')
    print(X)
    print('Predicted')
    reconstruction = rbm.predict(X)
    print(np.around(reconstruction, decimals=2))