Add ml cryo from ioSPI #17

Open · wants to merge 2 commits into master
2 changes: 1 addition & 1 deletion .flake8
@@ -2,5 +2,5 @@
 docstring-convention = numpy
 import_order_style = smarkets
 max-line-length = 88
-extend-ignore = E203
+extend-ignore = I202, E203
 exclude = reduceSPI/__init__.py,tests/__init__.py
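(For context, I202 is flake8-import-order's "additional newline in a group of imports" check; it is presumably ignored here because the new module's import layout would otherwise trip it.)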
177 changes: 177 additions & 0 deletions reduceSPI/ml_cryo.py
@@ -0,0 +1,177 @@
"""Open datasets and process them to be used by a neural network."""

import functools
import json
import os

import h5py
import numpy as np
import torch
from PIL import Image
from torch.utils.data import DataLoader, random_split

CUDA = torch.cuda.is_available()

KWARGS = {"num_workers": 1, "pin_memory": True} if CUDA else {}


def open_dataset(path, size, is_3d):
"""Open datasets and process data in order to make tensors.

Parameters
----------
path : string
Path (myfile.h5 or myfile.npy).
size : int
Length of the image side.
    is_3d : boolean
        Whether the data is 3D (True) or 2D (False).

Returns
-------
    dataset : torch.Tensor
        Greyscale images.
"""
if not os.path.exists(path):
        raise OSError(f"File not found: {path}")
if path.lower().endswith(".h5"):
data_dict = h5py.File(path, "r")
all_datasets = data_dict["particles"][:]
else:
all_datasets = np.load(path)
dataset = np.asarray(all_datasets)
img_shape = dataset.shape
n_imgs = img_shape[0]
new_dataset = []
if is_3d:
dataset = torch.Tensor(dataset)
dataset = normalize_torch(dataset)
if len(dataset.shape) == 4:
dataset = dataset.reshape((len(dataset),) + (1,) + img_shape[1:])
else:
if len(img_shape) == 3:
for i in range(n_imgs):
image = Image.fromarray(dataset[i]).resize([size, size])
new_dataset.append(np.asarray(image))
elif len(img_shape) == 4:
for i in range(n_imgs):
image = Image.fromarray(dataset[i][0]).resize([size, size])
new_dataset.append(np.asarray(image))
        dataset = torch.Tensor(np.asarray(new_dataset))
dataset = normalize_torch(dataset)
if len(img_shape) != 4:
dataset = dataset.reshape((img_shape[0], 1, size, size))
return dataset


def normalize_torch(dataset, scale="linear"):
"""Normalize a tensor.

Parameters
----------
dataset : torch tensor
Images.
    scale : string
        Normalization method; only "linear" is implemented.

Returns
-------
dataset : torch tensor
Normalized images.
"""
if scale == "linear":
for i, data in enumerate(dataset):
min_data = torch.min(data)
max_data = torch.max(data)
if max_data == min_data:
                raise ZeroDivisionError("Cannot normalize a constant image (max == min).")
dataset[i] = (data - min_data) / (max_data - min_data)
return dataset


def split_dataset(dataset, batch_size, frac_val):
"""Separate data in train and validation sets.

Parameters
----------
dataset : torch tensor
Images.
    batch_size : int
        Number of images per batch.
    frac_val : float
        Fraction of the dataset held out for validation.

    Returns
    -------
    trainset : torch.utils.data.Subset
        Training images.
    testset : torch.utils.data.Subset
        Validation images.
    trainloader : torch.utils.data.DataLoader
        Loader of training images, ready to be used by the NN.
    testloader : torch.utils.data.DataLoader
        Loader of validation images, ready to be used by the NN.
"""
n_imgs = len(dataset)
n_val = int(n_imgs * frac_val)
trainset, testset = random_split(dataset, [n_imgs - n_val, n_val])

trainloader = DataLoader(trainset, batch_size=batch_size, shuffle=True, **KWARGS)
testloader = DataLoader(testset, batch_size=batch_size, shuffle=False, **KWARGS)
return trainset, testset, trainloader, testloader


def hinted_tuple_hook(obj):
"""Transform a list into tuple.

Parameters
----------
obj : *
Value of a dic.

Returns
-------
tuple,
Transform the value of a dic into dic.
obj : *
Value of a dic.
"""
if "__tuple__" in obj:
return tuple(obj["items"])
return obj


def load_parameters(path):
"""Load metadata for the VAE.

Parameters
----------
    path : string
        Path to the file (myfile.json).

    Returns
    -------
    paths : dict
        Paths to the data.
    shapes : dict
        Shape of every dataset.
    constants : dict
        Meta information for the VAE.
    search_space : dict
        Hyperparameter search space for the VAE.
    meta_param_names : dict
        Names of the meta parameters.
"""
with open(path) as json_file:
parameters = json.load(json_file, object_hook=hinted_tuple_hook)
paths = parameters["paths"]
shapes = parameters["shape"]
constants = parameters["constants"]
search_space = parameters["search_space"]
meta_param_names = parameters["meta_param_names"]
constants["conv_dim"] = len(constants["img_shape"][1:])
constants["dataset_name"] = paths["simulated_2d"]
constants["dim_data"] = functools.reduce(
(lambda x, y: x * y), constants["img_shape"]
)
return paths, shapes, constants, search_space, meta_param_names
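A minimal usage sketch of how these helpers chain together, assuming a hypothetical my_particles.npy holding a (n_imgs, height, width) stack of 2D particle images:

import torch

from reduceSPI import ml_cryo

# Load the stack; images are resized to 64x64, normalized to [0, 1],
# and reshaped to (n_imgs, 1, 64, 64).
dataset = ml_cryo.open_dataset("my_particles.npy", size=64, is_3d=False)

# Hold out 20% of the images for validation, 20 images per batch.
trainset, testset, trainloader, testloader = ml_cryo.split_dataset(
    dataset, batch_size=20, frac_val=0.2
)

for batch in trainloader:
    # Each batch is a (batch_size, 1, 64, 64) tensor of normalized images.
    assert batch.shape[1:] == torch.Size([1, 64, 64])
    break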
74 changes: 74 additions & 0 deletions tests/test_ml_cryo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"""Test ml_cryos."""

import numpy as np
import torch

from reduceSPI import ml_cryo


class TestDataset:
"""Test Dataset."""

@staticmethod
def test_normalize_torch():
"""Test test_normalize_torch."""
dataset = torch.Tensor(
[[3.0, 7.0, 2.0, 7.0], [3.0, 0.0, 8.0, 3.0], [6.0, 7.0, 4.0, 2.0]]
)
dataset = dataset.reshape((1, 4, 3))
result = ml_cryo.normalize_torch(dataset)
expected = torch.Tensor(
[
[0.375, 0.875, 0.25, 0.875],
[0.375, 0.0, 1.0, 0.375],
[0.75, 0.875, 0.5, 0.25],
]
).reshape((1, 4, 3))

assert torch.equal(result, expected)
assert type(result) is torch.Tensor

@staticmethod
def test_split_dataset():
"""Test test_split_dataset."""
frac_val = 0.2
batch_size = 20
dataset = torch.Tensor(np.ones((2000, 1, 64, 64)))
tr_s, ts_s, tr_l, ts_l = ml_cryo.split_dataset(dataset, batch_size, frac_val)
assert len(tr_s) == 1600
assert len(ts_s) == 400
assert len(tr_l) == 80
assert len(ts_l) == 20
assert type(tr_l) is torch.utils.data.dataloader.DataLoader
assert type(ts_l) is torch.utils.data.dataloader.DataLoader
assert type(tr_s) is torch.utils.data.dataset.Subset
assert type(ts_s) is torch.utils.data.dataset.Subset

@staticmethod
def test_hinted_tuple_hook():
"""Test test_hinted_tuple_hook."""
dic1 = {"items": [4, 6], "__tuple__": True}
list1 = [4, 6]
assert ml_cryo.hinted_tuple_hook(dic1) == (4, 6)
assert ml_cryo.hinted_tuple_hook(list1) == [4, 6]

@staticmethod
def test_open_dataset():
"""Test test_open_dataset."""
path = "./tests/data/test_ml_cryo.npy"
dataset1 = ml_cryo.open_dataset(path, size=64, is_3d=False)
dataset2 = ml_cryo.open_dataset(path, size=32, is_3d=False)
assert type(dataset1) is torch.Tensor
assert dataset1.shape == torch.Size([1, 1, 64, 64])
assert dataset2.shape == torch.Size([1, 1, 32, 32])

@staticmethod
def test_load_parameters():
"""Test test_load_parameters."""
path = "./tests/vae_parameters.json"
parameters = ml_cryo.load_parameters(path)
assert len(parameters) == 5
assert "skip_z" in parameters[2].keys()
assert "enc_c" in parameters[2].keys()
assert "is_3d" in parameters[2].keys()
assert "img_shape" in parameters[2].keys()