Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactored for better package design #14

Merged
merged 6 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
251 changes: 187 additions & 64 deletions examples/workbook.ipynb

Large diffs are not rendered by default.

5 changes: 2 additions & 3 deletions src/poisson_numcodecs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from .poisson import Poisson

__all__ = ["Poisson"]
from .codec import PoissonCodec

__all__ = ["PoissonCodec"]
98 changes: 98 additions & 0 deletions src/poisson_numcodecs/codec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
"""
Numcodecs Codec implementation for Poisson noise calibration
"""
import numpy as np
import numcodecs
from numcodecs.abc import Codec

def make_anscombe_lookup(
sensitivity: float,
input_max: int=0x7fff,
zero_level: int=0,
beta: float=0.5,
output_type='uint8'):
"""
Compute the Anscombe lookup table.
The lookup converts a linear grayscale image into a uniform variance image.
:param sensitivity: the size of one photon in the linear input image.
:param input_max: the maximum value in the input
:param beta: the grayscale quantization step expressed in units of noise std dev
"""
xx = (np.r_[:input_max + 1] - zero_level) / sensitivity # input expressed in photon rates
zero_slope = 1 / beta / np.sqrt(3/8) # slope for negative values
offset = zero_level * zero_slope / sensitivity
lookup_table = np.round(offset +
(xx < 0) * (xx * zero_slope) +
(xx >= 0) * (2.0 / beta * (np.sqrt(np.maximum(0, xx) + 3/8) - np.sqrt(3/8))))
lookup = lookup_table.astype(output_type)
assert np.diff(lookup_table).min() >= 0, "non-monotonic lookup generated"
return lookup


def make_inverse_lookup(lookup_table: np.ndarray, output_type='int16') -> np.ndarray:
"""Compute the inverse lookup table for a monotonic forward lookup table."""
_, inv1 = np.unique(lookup_table, return_index=True) # first entry
_, inv2 = np.unique(lookup_table[::-1], return_index=True) # last entry
inverse = (inv1 + lookup_table.size - 1 - inv2)/2
return inverse.astype(output_type)


def lookup(movie: np.ndarray, lookup_table: np.ndarray) -> np.ndarray:
"""Apply lookup table to movie"""
return lookup_table[np.maximum(0, np.minimum(movie, lookup_table.size-1))]


class PoissonCodec(Codec):
"""Codec for 3-dimensional Filter. The codec assumes that input data are of shape:
(time, x, y).

Parameters
----------
zero_level : float
Signal level when no photons are recorded.
This should pre-computed or measured directly on the instrument.
photon_sensitivity : float
Conversion scalor to convert the measure signal into absolute photon numbers.
This should pre-computed or measured directly on the instrument.
"""
codec_id = "poisson"

def __init__(self,
zero_level,
photon_sensitivity,
encoded_dtype='int8',
decoded_dtype='int16',
):
self.zero_level = zero_level
self.photon_sensitivity = photon_sensitivity
self.encoded_dtype = encoded_dtype
self.decoded_dtype = decoded_dtype

def encode(self, buf: np.ndarray) -> np.ndarray:
lookup_table = make_anscombe_lookup(
self.photon_sensitivity,
output_type=self.encoded_dtype,
zero_level=self.zero_level,
)
encoded = lookup(buf, lookup_table)
shape = [encoded.ndim] + list(encoded.shape)
shape = np.array(shape, dtype='uint8')
return shape.tobytes() + encoded.astype(self.encoded_dtype).tobytes()

def decode(self, buf: bytes, out=None) -> np.ndarray:
lookup_table = make_anscombe_lookup(
self.photon_sensitivity,
output_type=self.encoded_dtype,
zero_level=self.zero_level,
)
inverse_table = make_inverse_lookup(
lookup_table,
output_type=self.decoded_dtype
)
ndims = int(buf[0])
shape = [int(_) for _ in buf[1:ndims+1]]
decoded = np.frombuffer(buf[ndims+1:], dtype=self.encoded_dtype).reshape(shape)
return lookup(decoded, inverse_table).astype(self.decoded_dtype)


numcodecs.register_codec(PoissonCodec)
45 changes: 4 additions & 41 deletions src/poisson_numcodecs/estimate.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import numpy as np
from sklearn.linear_model import HuberRegressor as Regressor
import imageio as io


def _longest_run(bool_array: np.ndarray) -> slice:
Expand All @@ -22,7 +21,7 @@ def compute_sensitivity(movie: np.array, count_weight_gamma: float=0.2) -> dict:
"""Calculate photon sensitivity

Args:
movie (np.array): A movie in the format (height, width, time).
movie (np.array): A movie in the format (time, height, width).
count_weight_gamma: 0.00001=weigh each intensity level equally,
1.0=weigh each intensity in proportion to pixel counts.

Expand All @@ -40,10 +39,10 @@ def compute_sensitivity(movie: np.array, count_weight_gamma: float=0.2) -> dict:
), f"A three dimensional (Height, Width, Time) grayscale movie is expected, got {movie.ndim}"

movie = np.maximum(0, movie.astype(np.int32, copy=False))
intensity = (movie[:, :, :-1] + movie[:, :, 1:] + 1) // 2
difference = movie[:, :, :-1].astype(np.float32) - movie[:, :, 1:]
intensity = (movie[:-1, :, :] + movie[1:, :, :] + 1) // 2
difference = movie[:-1, :, :].astype(np.float32) - movie[1:, :, :]

select = intensity > 0
select = intensity > 0 # discard non-positive values
intensity = intensity[select]
difference = difference[select]

Expand Down Expand Up @@ -77,39 +76,3 @@ def compute_sensitivity(movie: np.array, count_weight_gamma: float=0.2) -> dict:
sensitivity=sensitivity,
zero_level=zero_level,
)

def make_anscombe_lookup(sensitivity: float, input_max: int=0x7fff, beta: float=0.5):
"""
Compute the Anscombe lookup table.
The lookup converts a linear grayscale image into a uniform variance image.
:param sensitivity: the size of one photon in the linear input image.
:param input_max: the maximum value in the input
:param beta: the grayscale quantization step expressed in units of noise std dev
"""
# produce anscombe lookup_table
xx = np.r_[:input_max + 1] / sensitivity
lookup = np.uint8(
2.0 / beta * (np.sqrt(np.maximum(0, xx) + 3/8) - np.sqrt(3/8)))
return lookup


def make_inverse_lookup(lookup):
_, inverse = np.unique(lookup, return_index=True)
inverse += (np.r_[:inverse.size] / inverse.size * (inverse[-1] - inverse[-2])/2).astype(np.int16)
return inverse


def lookup(movie, LUT):
"""
Apply lookup table LUT to input movie
"""
return LUT[np.maximum(0, np.minimum(movie, LUT.size-1))]


def save_movie(movie, path, scale=1, format='gif'):
if format == "gif":
with io.get_writer(path, mode='I', duration=.01, loop=False) as f:
for frame in movie:
f.append_data(scale * frame)
else:
raise NotImplementedError(f"Format {format} is not implemented")
53 changes: 0 additions & 53 deletions src/poisson_numcodecs/poisson.py

This file was deleted.

16 changes: 9 additions & 7 deletions tests/test_poisson_codec.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from poisson_numcodecs import Poisson
from poisson_numcodecs import PoissonCodec
import numpy as np
import pytest

Expand All @@ -11,28 +11,30 @@ def make_poisson_ramp_signals(shape=(10, 1, 1), min_rate=1, max_rate=5, dtype="i
output_array = np.zeros(shape, dtype=dtype)
for x_ind in range(x):
for y_ind in range(y):
output_array[x_ind, y_ind, :] = np.random.poisson(
output_array[x_ind, y_ind, :] = sensitivity * np.random.poisson(
np.arange(min_rate, max_rate, (max_rate-min_rate)/times))
return output_array
return output_array.astype('int16')

sensitivity = 100.0

@pytest.fixture
def test_data(dtype="int16"):
test2d = make_poisson_ramp_signals(shape=(50, 1, 1), min_rate=1, max_rate=5, dtype=dtype)
test2d = make_poisson_ramp_signals(shape=(50, 1, 1), min_rate=1, max_rate=5, dtype=dtype)
test2d_long = make_poisson_ramp_signals(shape=(1, 50, 1), min_rate=1, max_rate=5, dtype=dtype)
return [test2d, test2d_long]

def test_poisson_encode_decode(test_data):
poisson_codec = Poisson(
poisson_codec = PoissonCodec(
zero_level=0,
photon_sensitivity=1.0,
photon_sensitivity=sensitivity,
encoded_dtype='uint8',
decoded_dtype='int16'
)

for example_data in test_data:
encoded = poisson_codec.encode(example_data)
decoded = poisson_codec.decode(encoded)
np.testing.assert_allclose(decoded, example_data, atol=1e-3)
assert np.abs(decoded - example_data).max() < sensitivity / 2

if __name__ == '__main__':
list_data = test_data("int16")
Expand Down
Loading