diff --git a/pyod/models/vae.py b/pyod/models/vae.py index 4938cbc8c..ac77cb4e6 100644 --- a/pyod/models/vae.py +++ b/pyod/models/vae.py @@ -25,6 +25,11 @@ from sklearn.utils import check_array from sklearn.utils.validation import check_is_fitted +import torch +import torch.nn as nn +import torch.nn.functional as F +from torch.utils.data import TensorDataset, DataLoader + from .base import BaseDetector from .base_dl import _get_tensorflow_version from ..utils.stat_models import pairwise_distances_no_broadcast @@ -384,3 +389,143 @@ def decision_function(self, X): # Predict on X and return the reconstruction errors pred_scores = self.model_.predict(X_norm) return pairwise_distances_no_broadcast(X_norm, pred_scores) + + +class TorchVAE(nn.Module): + def __init__(self, encoder_neurons=None, decoder_neurons=None, + latent_dim=2, + epochs=100, batch_size=32, dropout_rate=0.2, + l2_regularizer=0.1, validation_size=0.1, preprocessing=True, + verbose=1, random_state=None, contamination=0.1, + gamma=1.0, capacity=0.0): + super(TorchVAE, self).__init__() + self.encoder_neurons = encoder_neurons + self.decoder_neurons = decoder_neurons + self.loss = nn.MSELoss() + self.epochs = epochs + self.batch_size = batch_size + self.dropout_rate = dropout_rate + self.l2_regularizer = l2_regularizer + self.validation_size = validation_size + self.preprocessing = preprocessing + self.verbose = verbose + self.random_state = random_state + self.latent_dim = latent_dim + self.gamma = gamma + self.capacity = capacity + + # default values + if self.encoder_neurons is None: + self.encoder_neurons = [128, 64, 32] + + if self.decoder_neurons is None: + self.decoder_neurons = [32, 64, 128] + + self.encoder = nn.Sequential( + nn.Linear(self.n_features_, self.encoder_neurons[0]), + nn.ReLU(), + nn.Dropout(self.dropout_rate) + ) + for i in range(1, len(self.encoder_neurons)): + self.encoder.add_module('encoder_{}'.format(i), nn.Sequential( + nn.Linear(self.encoder_neurons[i-1], self.encoder_neurons[i]), + nn.ReLU(), + nn.Dropout(self.dropout_rate) + )) + self.z_mean = nn.Linear(self.encoder_neurons[-1], self.latent_dim) + self.z_log = nn.Linear(self.encoder_neurons[-1], self.latent_dim) + + self.decoder = nn.Sequential( + nn.Linear(self.latent_dim, self.decoder_neurons[0]), + nn.Sigmoid(), + nn.Dropout(self.dropout_rate) + ) + for i in range(1, len(self.decoder_neurons)): + self.decoder.add_module('decoder_{}'.format(i), nn.Sequential( + nn.Linear(self.decoder_neurons[i-1], self.decoder_neurons[i]), + nn.Sigmoid(), + nn.Dropout(self.dropout_rate) + )) + self.output = nn.Linear(self.decoder_neurons[-1], self.n_features_) + + def sampling(self, args): + z_mean, z_log = args + batch = z_mean.size(0) + dim = z_mean.size(1) + epsilon = torch.randn(batch, dim) + return z_mean + torch.exp(0.5 * z_log) * epsilon + + def vae_loss(self, inputs, outputs, z_mean, z_log): + reconstruction_loss = self.loss(inputs, outputs) + reconstruction_loss *= self.n_features_ + kl_loss = 1 + z_log - z_mean.pow(2) - z_log.exp() + kl_loss = -0.5 * kl_loss.sum(dim=-1) + kl_loss = self.gamma * torch.abs(kl_loss - self.capacity) + return torch.mean(reconstruction_loss + kl_loss) + + def forward(self, x): + x = self.encoder(x) + z_mean = self.z_mean(x) + z_log = self.z_log(x) + z = self.sampling((z_mean, z_log)) + x = self.decoder(z) + x = self.output(x) + return x, z_mean, z_log + + def fit(self, X, y=None): + # validate inputs X and y (optional) + X = torch.from_numpy(X) + self._set_n_classes(y) + + # Verify and construct the hidden units + self.n_samples_, self.n_features_ = X.size(0), X.size(1) + + # Standardize data for better performance + if self.preprocessing: + self.scaler_ = StandardScaler() + X_norm = torch.from_numpy(self.scaler_.fit_transform(X.numpy())) + else: + X_norm = X + + # Shuffle the data for validation as PyTorch do not shuffling for + # Validation Split + X_norm = X_norm.shuffle() + + # Build VAE model & fit with X + dataset = TensorDataset(X_norm) + dataloader = DataLoader(dataset, batch_size=self.batch_size, shuffle=True) + + optimizer = torch.optim.Adam(self.parameters()) + + for epoch in range(self.epochs): + for data in dataloader: + optimizer.zero_grad() + outputs, z_mean, z_log = self(data[0]) + loss = self.vae_loss(data[0], outputs, z_mean, z_log) + loss.backward() + optimizer.step() + + # Predict on X itself and calculate the reconstruction error as + # the outlier scores. Noted X_norm was shuffled has to recreate + if self.preprocessing: + X_norm = torch.from_numpy(self.scaler_.transform(X.numpy())) + else: + X_norm = X + + pred_scores = self.forward(X_norm)[0] + self.decision_scores_ = pairwise_distances_no_broadcast(X_norm.numpy(), pred_scores.numpy()) + self._process_decision_scores() + return self + + def decision_function(self, X): + check_is_fitted(self, ['model_', 'history_']) + X = torch.from_numpy(X) + + if self.preprocessing: + X_norm = torch.from_numpy(self.scaler_.transform(X.numpy())) + else: + X_norm = X + + # Predict on X and return the reconstruction errors + pred_scores = self.forward(X_norm)[0] + return pairwise_distances_no_broadcast(X_norm.numpy(), pred_scores.numpy())