diff --git a/.gitignore b/.gitignore index 5ed2c21..0945eec 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ .signac .signac_shell_history .signac_sp_cache.json.gz +*__pycache__ notebooks/projects notebooks/static signac.rc diff --git a/environment.yml b/environment.yml index 5152abe..c0cc7f9 100644 --- a/environment.yml +++ b/environment.yml @@ -22,3 +22,6 @@ dependencies: - signac-dashboard>=0.5 - signac-flow>=0.25 - signac>=2.0 + - pytorch + - torchvision + - tqdm diff --git a/projects/flow.pytorch.HyperparameterOptimize/.gitignore b/projects/flow.pytorch.HyperparameterOptimize/.gitignore new file mode 100644 index 0000000..676c406 --- /dev/null +++ b/projects/flow.pytorch.HyperparameterOptimize/.gitignore @@ -0,0 +1 @@ +source/data diff --git a/projects/flow.pytorch.HyperparameterOptimize/README.md b/projects/flow.pytorch.HyperparameterOptimize/README.md new file mode 100644 index 0000000..cef8b85 --- /dev/null +++ b/projects/flow.pytorch.HyperparameterOptimize/README.md @@ -0,0 +1,80 @@ +An Example of Using signac with [Pytorch] to train [Variational Autoencoder] (VAE). + +This example project demonstrate how to use signac to optimize the hyperparameters of VAE trained on the popular [MNIST] datasets. + +[Variational Autoencoder]: https://arxiv.org/pdf/1312.6114.pdf +[Pytorch]: https://pytorch.org/ +[MNIST]: https://pytorch.org/vision/main/generated/torchvision.datasets.MNIST.html + +## Prerequisites + +This example use the following python packages: + +```conda create --name pytorch python=3.10 matplotlib numpy signac-flow signac-dashboard h5py umap-learn -c conda-forge``` +* [Numpy](https://github.com/numpy/numpy) +* [matplotlib](https://github.com/matplotlib/matplotlib) +* [signac](https://github.com/glotzerlab/signac) +* [signac-flow](https://github.com/glotzerlab/signac-flow) +* [signac-dashboard](https://github.com/glotzerlab/signac-dashboard) + +```conda activate pytorch``` +```conda install pytorch torchvision==0.13.0 -c pytorch``` +* [Pytorch](https://github.com/pytorch/pytorch) +* [torchvision](https://github.com/pytorch/vision) + + +Conda users can install these from [conda-forge](https://conda-forge.org/): + +# Usage + +1. Initialize the project with + + ``` + python init.py + ``` + + In `init.py`, you can define VAE's hyperparameters you would like to try. + +- This checks if MNIST dataset has been downloaded yet. If it's not, it will automatically download it and store in `/source/data/MNIST`. +- This creates the `workspace/`, which holds all of our `jobs`. Each `job` has its own directory, named by the job's unique `id` (something like `87c7fccdea3531da704bbae95e95e914`).\ +- If you look in these directories, you'll see `signac_statepoint.json`. This is a json file that contains the statepoint parameters (hyperparameters of VAE) for that job. +- NOTE: The job's `id` is generated specifically for the dict containing the statepoint parameters, so do not edit the directory name or `signac_statepoint.json`. + +2. All of the operations we will be performing on [MNIST] dataset are defined in `project.py`. An operation is a function with `job` as its only argument that signac-flow recognizes as a part of your workflow. + - You can tell which methods are operations because they will have the `@Project.operation` decorator, which tells signac-flow that this method is to be treated as an operation associated with `Project`. + - `operation`s typically have pre- and post-conditions. This is how signac-flow knows when the operation should be run. For example, the `train()` function has no pre-condition because it must be run first, but has `@Project.post(labels.check_train_complete)` as a post-condition. + +3. All of the functions used in `project.py` are defined in three python scripts: `source/labels.py`, `source/workflow.py`, and `source/vae.py`. + - `source/labels.py` defines the functions that are used to identify whether an signac project level operation is done or not in `project.py`. + - `source/workflow.py` defines the functions that used to manage the workflow of training protocols including post-evaluation on the training. + - `source/vae.py` defines any functions that relate to the training and post-evaluation of VAE using pytorch, which includes functions for, e.g. download [MNIST] dataset, and plot learning curve, etc. + +4. Now let's run the operations: + +- First run a status check: + + ``` + python project.py status -d + ``` + +- Now run eligible jobs: + + ``` + python project.py run -o train + ``` + +- After the training is done, you can run post-evaluation: + + ``` + python project.py run -o evaluation + ``` + +5. For visualizing the post-evaluation, you can launch the [signac-dashbboard](https://github.com/glotzerlab/signac-dashboard): + + ``` + python dashborad.py run + ``` + + And open `http://localhost:8888/` in your web browser. + +**NOTE**: If you want to run this tutorial from scratch, just run `rm -rf workspace/` to delete the workspace. diff --git a/projects/flow.pytorch.HyperparameterOptimize/dashboard.py b/projects/flow.pytorch.HyperparameterOptimize/dashboard.py new file mode 100644 index 0000000..56c5cdc --- /dev/null +++ b/projects/flow.pytorch.HyperparameterOptimize/dashboard.py @@ -0,0 +1,34 @@ +from signac_dashboard import Dashboard +from signac_dashboard.modules import ( + DocumentList, + ImageViewer, + Notes, + StatepointList, + VideoViewer, +) + + +class MyDashboard(Dashboard): + def job_title(self, job): + return ( + f"Total epochs={job.sp.epochs}, Learning rate={job.sp.lr}, " + f"Latent space dimension={job.sp.latent_dim}" + ) + + def job_sorter(self, job): + return (-job.sp.lr, job.sp.hidden_dim, job.sp.epochs) + + +if __name__ == "__main__": + MyDashboard( + modules=[ + ImageViewer(img_globs=["Loss.jpg"], name="Learning Curve"), + ImageViewer(img_globs=["latent.jpg"], name="Latent Space"), + ImageViewer(img_globs=["digits_recon.jpg"], name="Reconstructed"), + ImageViewer(img_globs=["digits_orig.jpg"], name="Original"), + StatepointList(), + VideoViewer(), + DocumentList(), + Notes(), + ] + ).main() diff --git a/projects/flow.pytorch.HyperparameterOptimize/init.py b/projects/flow.pytorch.HyperparameterOptimize/init.py new file mode 100644 index 0000000..fc8873c --- /dev/null +++ b/projects/flow.pytorch.HyperparameterOptimize/init.py @@ -0,0 +1,48 @@ +import itertools +import os + +import signac +import torchvision.transforms as transforms +from torchvision import datasets +from tqdm import tqdm + +PR = signac.init_project() +HYPER_PARAMS = { + "seed": [1], + "epochs": [10, 20, 30, 40], + "batch_size": [64], + "lr": [0.0001, 0.0005, 0.001, 0.005, 0.01], + "hidden_dim": [128, 256, 512], + "latent_dim": [16], +} + + +def download_MNIST(): + # transforms + transform = transforms.Compose( + [ + transforms.ToTensor(), + ] + ) + + # train and validation data + datasets.MNIST(root="./source/data", train=True, download=True, transform=transform) + datasets.MNIST( + root="./source/data", train=False, download=True, transform=transform + ) + + +def generate_workspace(pr, hyper_params): + for sp in cartesian(**hyper_params): + pr.open_job(sp).init() + + +def cartesian(**kwargs): + for combo in tqdm(itertools.product(*kwargs.values())): + yield dict(zip(kwargs.keys(), combo)) + + +if __name__ == "__main__": + generate_workspace(PR, HYPER_PARAMS) + if not os.path.exists("./source/data/MNIST"): + download_MNIST() diff --git a/projects/flow.pytorch.HyperparameterOptimize/project.py b/projects/flow.pytorch.HyperparameterOptimize/project.py new file mode 100644 index 0000000..54e6b35 --- /dev/null +++ b/projects/flow.pytorch.HyperparameterOptimize/project.py @@ -0,0 +1,67 @@ +import torch +from flow import FlowProject +from source import vae + + +class Project(FlowProject): + pass + + +@Project.label +def status_label(job): + return ", ".join( + [ + f"{check_point}_completed" + for check_point in ("train", "evaluation") + if job.doc.get(check_point + "_done", False) + ] + ) + + +def gpu_directives(walltime: float = 0.5, n_gpu: int = 1): + return {"nranks": n_gpu, "ngpu": n_gpu, "walltime": walltime} + + +def cpu_directives(walltime: float = 0.5, n_cpu: int = 1): + return {"nranks": n_cpu, "walltime": walltime} + + +def store_success_to_doc(operation_name, job): + job.doc.update({f"{operation_name}_done": True}) + + +training_group = Project.make_group(name="trainings") + +TRAIN_WALLTIME = 1 +EVAL_WALLTIME = 0.5 + + +@training_group +@Project.operation_hooks.on_success(store_success_to_doc) +@Project.post.true("train_done") +@Project.operation(directives=gpu_directives(walltime=TRAIN_WALLTIME)) +def train(job): + device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + train_loader, val_loader = vae.load_data(job) + vae.fit(job=job, train_loader=train_loader, val_loader=val_loader, device=device) + + +@training_group +@Project.operation_hooks.on_success(store_success_to_doc) +@Project.post.true("evaluation_done") +@Project.pre.after(train) +@Project.operation(directives=gpu_directives(walltime=EVAL_WALLTIME)) +def evaluation(job): + vae.plot_loss(job) + + device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + _, val_loader = vae.load_data(job) + dataset = val_loader.dataset + vae.plot_reconstruction( + job=job, dataset=dataset, plot_arrangement=(3, 3), device=device + ) + vae.plot_latent(job=job, dataset=dataset, device=device) + + +if __name__ == "__main__": + Project().main() diff --git a/projects/flow.pytorch.HyperparameterOptimize/source/vae.py b/projects/flow.pytorch.HyperparameterOptimize/source/vae.py new file mode 100644 index 0000000..7bb0871 --- /dev/null +++ b/projects/flow.pytorch.HyperparameterOptimize/source/vae.py @@ -0,0 +1,257 @@ +import matplotlib as mpl +import matplotlib.pyplot as plt +import numpy as np +import torch +import torchvision + + +class LinearVAE(torch.nn.Module): + def __init__(self, features_dim, latent_dim, hidden_dim): + super().__init__() + + self.latent_dim = latent_dim + # encoder + self.enc1 = torch.nn.Linear(in_features=features_dim, out_features=hidden_dim) + self.enc2 = torch.nn.Linear(in_features=hidden_dim, out_features=latent_dim * 2) + + # decoder + self.dec1 = torch.nn.Linear(in_features=latent_dim, out_features=hidden_dim) + self.dec2 = torch.nn.Linear(in_features=hidden_dim, out_features=features_dim) + + def reparameterize(self, mu, log_var): + """ + :param mu: mean from the encoder's latent space + :param log_var: log variance from the encoder's latent space + """ + std = torch.exp(0.5 * log_var) # standard deviation + eps = torch.randn_like(std) # "randn_like" as we need the same size + sample = mu + (eps * std) # sampling as if coming from the input space + + return sample + + def sample(self, x): + with torch.no_grad(): + return self.reparameterize(*self.encode(x)) + + def forward(self, x): + mu, log_var = self.encode(x) + z = self.reparameterize(mu, log_var) + reconstruction = self.decode(z) + + return reconstruction, mu, log_var + + def encode(self, x): + x = torch.nn.functional.relu(self.enc1(x)) + x = self.enc2(x).view(-1, self.latent_dim, 2) + + mu = x[:, :, 0] # the first feature values as mean + log_var = x[:, :, 1] # the other feature values as variance + return mu, log_var + + def decode(self, z): + x = torch.nn.functional.relu(self.dec1(z)) + reconstruction = torch.sigmoid(self.dec2(x)) + return reconstruction + + @classmethod + def from_job(cls, job, device, state_file="model.pth"): + model = cls( + job.doc["features_dim"], job.sp["latent_dim"], job.sp["hidden_dim"] + ).to(device) + if job.isfile("model.pth"): + model.load_state_dict(torch.load(job.fn("model.pth"), map_location=device)) + return model + + +class VAELoss: + def __init__(self): + self.bce = torch.nn.BCELoss() + + def __call__(self, input, reconstruction, mu, logvar): + kl_divergence = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp()) + return self.bce(reconstruction, input) + kl_divergence + + +def load_data(job): + torch.manual_seed(job.sp["seed"]) + batch_size = job.sp.get("batch_size", 64) + + transform = torchvision.transforms.Compose( + [ + torchvision.transforms.ToTensor(), + ] + ) + train_data = torchvision.datasets.MNIST( + root=job.fn("../../source/data"), + train=True, + download=False, + transform=transform, + ) + val_data = torchvision.datasets.MNIST( + root=job.fn("../../source/data"), + train=False, + download=False, + transform=transform, + ) + job.doc.setdefault( + "features_dim", train_data[0][0].shape[1] * train_data[0][0].shape[2] + ) + + train_loader = torch.utils.data.DataLoader( + train_data, batch_size=batch_size, shuffle=True + ) + val_loader = torch.utils.data.DataLoader( + val_data, batch_size=batch_size, shuffle=False + ) + return train_loader, val_loader + + +def fit(job, train_loader, val_loader, device): + import time + + torch.manual_seed(job.sp["seed"]) + print(f"Job id: {job.id}") + start_time = time.time() + epochs = job.sp.get("epochs", 1) + + model = LinearVAE.from_job(job, device) + optimizer = torch.optim.Adam(model.parameters(), lr=job.sp.get("lr", 0.000001)) + loss_compute = VAELoss() + + train_loss = [] + val_loss = [] + for epoch in range(epochs): + print("--------------------------------\n") + print(f"Current epoch: {epoch+1}/{epochs}\n") + train_epoch_loss = training( + job, train_loader, model, device, optimizer, loss_compute + ) + val_epoch_loss = validate(val_loader, model, device, loss_compute) + train_loss.append(train_epoch_loss) + val_loss.append(val_epoch_loss) + + with job.data: + job.data["training/loss"] = np.hstack(train_loss) + job.data["validation/loss"] = np.hstack(val_loss) + if torch.cuda.is_available(): + torch.cuda.synchronize() + end_time = time.time() + job.doc["elapsed_time"] = end_time - start_time + + +def get_feature(data, device): + feature, label = data + return feature.to(device).view(feature.size(0), -1) + + +def step(data, device, model, loss_compute): + feature = get_feature(data, device) + reconstruction, mu, logvar = model(feature) + return loss_compute(feature, reconstruction, mu, logvar) + + +def training(job, dataloader, model, device, optimizer, loss_compute): + model.train() + running_loss = 0.0 + for i, data in enumerate(dataloader): + optimizer.zero_grad() + loss = step(data, device, model, loss_compute) + loss.backward() + optimizer.step() + running_loss += loss.item() + + torch.save(model.state_dict(), job.fn("model.pth")) + torch.save(optimizer.state_dict(), job.fn("optimizer.ptpyth")) + + train_loss = running_loss / len(dataloader.dataset) + print(f"Training set: Avg. loss {train_loss:.4f}\n") + return train_loss + + +def validate(dataloader, model, device, loss_compute): + model.eval() + running_loss = 0.0 + with torch.no_grad(): + for i, data in enumerate(dataloader): + running_loss += step(data, device, model, loss_compute).item() + + val_loss = running_loss / len(dataloader.dataset) + print(f"Validation set: Avg. loss {val_loss:.4f}\n") + return val_loss + + +def plot_loss(job): + with job.data: + training_loss = job.data["training/loss"][:] + val_loss = job.data["validation/loss"][:] + + epochs = np.arange(1, job.sp["epochs"] + 1) + fig, ax = plt.subplots(figsize=(10, 8)) + ax.plot(epochs, training_loss, label="Training") + ax.plot(epochs, val_loss, linestyle="--", label="Validation") + ax.legend(fontsize=20) + ax.set_xlabel("Epochs") + ax.set_ylabel("KL-divergence + Reconstruction loss", fontsize=20) + ax.set_title("Loss") + fig.savefig(job.fn("Loss.jpg")) + + +def plot_latent(job, dataset, device): + torch.manual_seed(job.sp["seed"]) + model = LinearVAE.from_job(job, device) + + x_arr = [] + label_arr = [] + for x, label in dataset: + x_arr.append(x) + label_arr.append(label) + + z_arr = np.vstack( + [ + model.sample(xi.to(device)[0].view(1, -1)).detach().cpu().numpy()[0] + for xi in x_arr + ] + ) + + colormap = mpl.cm.get_cmap("tab10") + sm = plt.cm.ScalarMappable(cmap=colormap) + sm.set_array([]) + + # use 10 colors to represent a given digit + colors = np.linspace(0, 1, 10, endpoint=False) + c_arr = colormap(colors[np.asarray(label_arr)]) + + fig, ax = plt.subplots(figsize=(12, 10)) + ax.scatter(z_arr[:, 0], z_arr[:, 1], c=c_arr, s=5) + plt.colorbar(sm, ax=ax, ticks=np.arange(10), label="Ground truth") + ax.set_xlabel(r"$z_1$") + ax.set_ylabel(r"$z_2$") + ax.set_title("Latent space of val set") + fig.savefig(job.fn("latent.jpg")) + + +def plot_reconstruction(job, dataset, plot_arrangement, device): + torch.manual_seed(job.sp["seed"]) + model = LinearVAE.from_job(job, device) + + fig_orig, ax_orig = plt.subplots(*plot_arrangement, figsize=(14, 10)) + fig_orig.suptitle("Ground Truth") + fig_recon, ax_recon = plt.subplots(*plot_arrangement, figsize=(14, 10)) + fig_recon.suptitle("VAE Reconstruction") + + def plot_image(torch_arr, ax): + ax.imshow( + torch_arr.reshape(28, 28).to("cpu").detach().numpy(), alpha=0.8, cmap="grey" + ) + + rng = np.random.default_rng(job.sp["seed"]) + samples = rng.integers(0, len(dataset) - 1, np.product(plot_arrangement)) + for i, ax1, ax2 in zip(samples, ax_orig.flat, ax_recon.flat): + feature, _ = dataset[i] + plot_image(feature, ax1) + with torch.no_grad(): + feature = feature.to(device).view(feature.size(0), -1) + reconstruction, _, _ = model(feature) + plot_image(reconstruction, ax2) + fig_orig.savefig(job.fn("digits_orig.jpg")) + fig_recon.savefig(job.fn("digits_recon.jpg"))