Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

create vae on pytorch #572

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 145 additions & 0 deletions pyod/models/vae.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@
from sklearn.utils import check_array
from sklearn.utils.validation import check_is_fitted

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader

from .base import BaseDetector
from .base_dl import _get_tensorflow_version
from ..utils.stat_models import pairwise_distances_no_broadcast
Expand Down Expand Up @@ -384,3 +389,143 @@ def decision_function(self, X):
# Predict on X and return the reconstruction errors
pred_scores = self.model_.predict(X_norm)
return pairwise_distances_no_broadcast(X_norm, pred_scores)


class TorchVAE(nn.Module):
def __init__(self, encoder_neurons=None, decoder_neurons=None,
latent_dim=2,
epochs=100, batch_size=32, dropout_rate=0.2,
l2_regularizer=0.1, validation_size=0.1, preprocessing=True,
verbose=1, random_state=None, contamination=0.1,
gamma=1.0, capacity=0.0):
super(TorchVAE, self).__init__()
self.encoder_neurons = encoder_neurons
self.decoder_neurons = decoder_neurons
self.loss = nn.MSELoss()
self.epochs = epochs
self.batch_size = batch_size
self.dropout_rate = dropout_rate
self.l2_regularizer = l2_regularizer
self.validation_size = validation_size
self.preprocessing = preprocessing
self.verbose = verbose
self.random_state = random_state
self.latent_dim = latent_dim
self.gamma = gamma
self.capacity = capacity

# default values
if self.encoder_neurons is None:
self.encoder_neurons = [128, 64, 32]

if self.decoder_neurons is None:
self.decoder_neurons = [32, 64, 128]

self.encoder = nn.Sequential(
nn.Linear(self.n_features_, self.encoder_neurons[0]),
nn.ReLU(),
nn.Dropout(self.dropout_rate)
)
for i in range(1, len(self.encoder_neurons)):
self.encoder.add_module('encoder_{}'.format(i), nn.Sequential(
nn.Linear(self.encoder_neurons[i-1], self.encoder_neurons[i]),
nn.ReLU(),
nn.Dropout(self.dropout_rate)
))
self.z_mean = nn.Linear(self.encoder_neurons[-1], self.latent_dim)
self.z_log = nn.Linear(self.encoder_neurons[-1], self.latent_dim)

self.decoder = nn.Sequential(
nn.Linear(self.latent_dim, self.decoder_neurons[0]),
nn.Sigmoid(),
nn.Dropout(self.dropout_rate)
)
for i in range(1, len(self.decoder_neurons)):
self.decoder.add_module('decoder_{}'.format(i), nn.Sequential(
nn.Linear(self.decoder_neurons[i-1], self.decoder_neurons[i]),
nn.Sigmoid(),
nn.Dropout(self.dropout_rate)
))
self.output = nn.Linear(self.decoder_neurons[-1], self.n_features_)

def sampling(self, args):
z_mean, z_log = args
batch = z_mean.size(0)
dim = z_mean.size(1)
epsilon = torch.randn(batch, dim)
return z_mean + torch.exp(0.5 * z_log) * epsilon

def vae_loss(self, inputs, outputs, z_mean, z_log):
reconstruction_loss = self.loss(inputs, outputs)
reconstruction_loss *= self.n_features_
kl_loss = 1 + z_log - z_mean.pow(2) - z_log.exp()
kl_loss = -0.5 * kl_loss.sum(dim=-1)
kl_loss = self.gamma * torch.abs(kl_loss - self.capacity)
return torch.mean(reconstruction_loss + kl_loss)

def forward(self, x):
x = self.encoder(x)
z_mean = self.z_mean(x)
z_log = self.z_log(x)
z = self.sampling((z_mean, z_log))
x = self.decoder(z)
x = self.output(x)
return x, z_mean, z_log

def fit(self, X, y=None):
# validate inputs X and y (optional)
X = torch.from_numpy(X)
self._set_n_classes(y)

# Verify and construct the hidden units
self.n_samples_, self.n_features_ = X.size(0), X.size(1)

# Standardize data for better performance
if self.preprocessing:
self.scaler_ = StandardScaler()
X_norm = torch.from_numpy(self.scaler_.fit_transform(X.numpy()))
else:
X_norm = X

# Shuffle the data for validation as PyTorch do not shuffling for
# Validation Split
X_norm = X_norm.shuffle()

# Build VAE model & fit with X
dataset = TensorDataset(X_norm)
dataloader = DataLoader(dataset, batch_size=self.batch_size, shuffle=True)

optimizer = torch.optim.Adam(self.parameters())

for epoch in range(self.epochs):
for data in dataloader:
optimizer.zero_grad()
outputs, z_mean, z_log = self(data[0])
loss = self.vae_loss(data[0], outputs, z_mean, z_log)
loss.backward()
optimizer.step()

# Predict on X itself and calculate the reconstruction error as
# the outlier scores. Noted X_norm was shuffled has to recreate
if self.preprocessing:
X_norm = torch.from_numpy(self.scaler_.transform(X.numpy()))
else:
X_norm = X

pred_scores = self.forward(X_norm)[0]
self.decision_scores_ = pairwise_distances_no_broadcast(X_norm.numpy(), pred_scores.numpy())
self._process_decision_scores()
return self

def decision_function(self, X):
check_is_fitted(self, ['model_', 'history_'])
X = torch.from_numpy(X)

if self.preprocessing:
X_norm = torch.from_numpy(self.scaler_.transform(X.numpy()))
else:
X_norm = X

# Predict on X and return the reconstruction errors
pred_scores = self.forward(X_norm)[0]
return pairwise_distances_no_broadcast(X_norm.numpy(), pred_scores.numpy())
Loading