-
Notifications
You must be signed in to change notification settings - Fork 4
/
Reconstruction.py
107 lines (88 loc) · 4.11 KB
/
Reconstruction.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import numpy as np
import tensorflow as tf
from typing import List, TYPE_CHECKING
from tensorflow.keras.layers import InputLayer, Dense, Conv1D, MaxPooling1D, Conv1DTranspose, Dropout
from tensorflow.keras.models import Sequential, Model
from globals import SPECLENGTH
if TYPE_CHECKING:
from tensorflow.python.framework.ops import EagerTensor
def normalizeSpecSet(specSet: np.ndarray) -> np.ndarray:
    """
    Normalizing Specset to 0.0 -> 1.0 range, for each spectrum individually.
    Floating point arrays are normalized in place, i.e., the passed array is modified and returned.
    Integer and boolean arrays cannot hold the normalized values; they are converted to a float64 copy
    first, so the passed array stays untouched in that case.
    Spectra with constant intensity end up as all zeros.
    :param specSet: (N x M) array of N spectra with M wavenumbers
    :return: normalized specset
    """
    if specSet.dtype.kind in "iub":
        # In-place subtraction/true division is refused by numpy for these dtypes
        specSet = specSet.astype(np.float64)

    # Shift every spectrum so that its minimum sits at 0
    specSet -= specSet.min(axis=1, keepdims=True)

    # Scale every spectrum to a maximum of 1; flat spectra (maximum == 0) are skipped and remain zeros
    maxima: np.ndarray = specSet.max(axis=1, keepdims=True)
    np.divide(specSet, maxima, out=specSet, where=maxima != 0)
    return specSet
def prepareSpecSet(specSet: np.ndarray, transpose: bool = True, addDimension: bool = False, normalize: bool = True):
    """
    Brings a set of spectra into the form used for feeding the network and casts it to a float32 tensor.
    :param specSet: array of spectra; it is transposed first if transpose is True
    :param transpose: whether to transpose the array before any further processing
    :param addDimension: whether to append a trailing axis of length 1 to the 2D array
    :param normalize: whether to run the array through normalizeSpecSet
    :return: result of tf.cast(..., tf.float32) on the processed array
    """
    prepared = specSet.transpose() if transpose else specSet

    if normalize:
        prepared = normalizeSpecSet(prepared)

    if addDimension:
        numRows, numCols = prepared.shape[0], prepared.shape[1]
        prepared = prepared.reshape(numRows, numCols, 1)

    return tf.cast(prepared, tf.float32)
def getConvReconstructor() -> 'Reconstructor':
    """
    Creates and compiles a convolutional Reconstructor.
    The encoder takes inputs of shape (SPECLENGTH, 1) and consists of two Conv1D/MaxPooling1D pairs,
    the decoder consists of two Conv1DTranspose layers followed by a Conv1D layer with a single filter.
    :return: Reconstructor compiled with the adam optimizer and mse loss
    """
    reconstructor: Reconstructor = Reconstructor()

    encoderLayers = [
        InputLayer(input_shape=(SPECLENGTH, 1)),
        Conv1D(32, 4, padding='same', activation="relu"),
        MaxPooling1D(2, padding='same'),
        Conv1D(32, 4, activation="relu", padding="same"),
        MaxPooling1D(2, padding="same"),
    ]
    for layer in encoderLayers:
        reconstructor.encoder.add(layer)

    # NOTE(review): the transposed convolutions use the default stride, so they presumably do not undo the
    # two pooling steps of the encoder - confirm that the output length matches the training targets.
    decoderLayers = [
        Conv1DTranspose(32, 4, activation="relu", padding="same"),
        Conv1DTranspose(32, 4, activation='relu', padding="same"),
        Conv1D(1, 1, activation='relu', padding='same'),
    ]
    for layer in decoderLayers:
        reconstructor.decoder.add(layer)

    reconstructor.compile(optimizer='adam', loss='mse')
    return reconstructor
def getDenseReconstructor(dropout: float = 0.0, encodingDim: int = 128) -> 'Reconstructor':
    """
    Creates and compiles a fully connected Reconstructor. The encoder maps inputs of length SPECLENGTH
    onto an encoding with encodingDim values, the decoder maps that encoding back onto SPECLENGTH values.
    :param dropout: Dropout rate. If > 0, a Dropout layer is placed in front of the Dense layer
                    of both the encoder and the decoder.
    :param encodingDim: Number of units of the encoding layer (= input size of the decoder)
    :return: Reconstructor compiled with the adam optimizer and mse loss
    """
    # One shared condition, so that encoder and decoder always agree on whether Dropout is used
    useDropout: bool = dropout > 0.0
    rec: Reconstructor = Reconstructor()

    # Shapes are passed as real tuples: "(SPECLENGTH)" without trailing comma is just an int
    rec.encoder.add(InputLayer(input_shape=(SPECLENGTH,)))
    if useDropout:
        rec.encoder.add(Dropout(dropout))
    rec.encoder.add(Dense(encodingDim, activation="relu"))

    rec.decoder.add(InputLayer(input_shape=(encodingDim,)))
    if useDropout:
        rec.decoder.add(Dropout(dropout))
    rec.decoder.add(Dense(SPECLENGTH, activation="relu"))

    rec.compile(optimizer='adam', loss='mse')
    return rec
class Reconstructor(Model):
    """
    Keras Model that chains an encoder and a decoder (both Sequential, initially without layers).
    In addition, it can store the encoder output of the training data, in order to identify inputs
    whose encoding lies far away from everything the model has encountered during training.
    """
    def __init__(self):
        super().__init__()
        self.encoder: Sequential = Sequential()
        self.decoder: Sequential = Sequential()
        # Encoder output of the training data; None until calculateEncodedTrainingData was called
        self._encodedTrainData: np.ndarray = None

    def call(self, inputs, training=None, mask=None):
        """
        Passes the inputs through the encoder and subsequently through the decoder.
        :param inputs: Inputs for the encoder
        :param training: If truthy, the stored encoded training data is discarded
        :param mask: Not used
        :return: Output of the decoder
        """
        if training:
            # Encodings stored from an earlier state of the encoder are outdated once training goes on.
            # NOTE(review): when Keras runs this method inside a traced function, this Python-side
            # assignment presumably only executes while tracing - confirm that repeated fit calls
            # still reset the stored encodings.
            self._encodedTrainData = None
        encoded = self.encoder(inputs)
        decoded = self.decoder(encoded)
        return decoded

    def calculateEncodedTrainingData(self, inputs) -> None:
        """
        Runs the inputs through the encoder and stores the result as numpy array, for later use
        in getPoorlyRepresentedIndices.
        :param inputs: Training data, in the form accepted by the encoder
        """
        self._encodedTrainData = self.encoder(inputs).numpy()

    @staticmethod
    def _flattenEncodings(encoded: np.ndarray) -> np.ndarray:
        """
        Reshapes encodings to 2D, i.e., one row per sample. Required for encoders with
        multi-dimensional output, as otherwise no single distance per sample pair is obtained.
        :param encoded: Array with the samples along the first axis
        :return: (N x F) array with all remaining axes merged into F features
        """
        numFeatures: int = int(np.prod(encoded.shape[1:]))
        return encoded.reshape(encoded.shape[0], numFeatures)

    def getPoorlyRepresentedIndices(self, inputs: 'EagerTensor', maxDist: float = 0.1,
                                    numNeighbors: int = 5) -> np.ndarray:
        """
        Calculates distance of encoded inputs to encoded training data. It returns indices of samples
        having a distance > the specified maxDist that is considered safe for good reconstruction.
        The distance of a sample is the mean euclidean distance to its numNeighbors closest training samples.
        An a-priori of finding a good maxDist is necessary...
        :param inputs: Eager Tensor of inputs for network inference
        :param maxDist: Maximum Distance value (in encoded space dimension)
        :param numNeighbors: Number of closest training samples to average the distance over
        :return: integer array of indices of inputs being further away from the training data than maxDist.
                 None is returned if no encoded training data is available.
        :raises ValueError: if numNeighbors is smaller than 1
        """
        if self._encodedTrainData is None:
            print("Call 'calculateEncodedTrainingData' with the training data first.")
            return None
        if numNeighbors < 1:
            raise ValueError("numNeighbors has to be at least 1.")

        trainEncodings: np.ndarray = self._flattenEncodings(self._encodedTrainData)
        inputEncodings: np.ndarray = self._flattenEncodings(self.encoder(inputs).numpy())

        invalidIndices: List[int] = []
        for i, encoding in enumerate(inputEncodings):
            distances: np.ndarray = np.linalg.norm(trainEncodings - encoding, axis=1)
            # Slicing simply takes all distances if there are fewer training samples than numNeighbors
            avgMinDist: float = np.mean(np.sort(distances)[:numNeighbors])
            if avgMinDist > maxDist:
                invalidIndices.append(i)

        # Explicit dtype: an empty list would yield a float array, which cannot be used for indexing
        return np.array(invalidIndices, dtype=int)