Skip to content

Commit

Permalink
Merge pull request #14 from dimitri-yatsenko/main
Browse files Browse the repository at this point in the history
refactored for better package design
  • Loading branch information
dimitri-yatsenko authored Mar 5, 2024
2 parents 5177cae + 6d3ac18 commit e5406f0
Show file tree
Hide file tree
Showing 6 changed files with 300 additions and 168 deletions.
251 changes: 187 additions & 64 deletions examples/workbook.ipynb

Large diffs are not rendered by default.

5 changes: 2 additions & 3 deletions src/poisson_numcodecs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from .poisson import Poisson

__all__ = ["Poisson"]
from .codec import PoissonCodec

__all__ = ["PoissonCodec"]
98 changes: 98 additions & 0 deletions src/poisson_numcodecs/codec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
"""
Numcodecs Codec implementation for Poisson noise calibration
"""
import numpy as np
import numcodecs
from numcodecs.abc import Codec

def make_anscombe_lookup(
sensitivity: float,
input_max: int=0x7fff,
zero_level: int=0,
beta: float=0.5,
output_type='uint8'):
"""
Compute the Anscombe lookup table.
The lookup converts a linear grayscale image into a uniform variance image.
:param sensitivity: the size of one photon in the linear input image.
:param input_max: the maximum value in the input
:param beta: the grayscale quantization step expressed in units of noise std dev
"""
xx = (np.r_[:input_max + 1] - zero_level) / sensitivity # input expressed in photon rates
zero_slope = 1 / beta / np.sqrt(3/8) # slope for negative values
offset = zero_level * zero_slope / sensitivity
lookup_table = np.round(offset +
(xx < 0) * (xx * zero_slope) +
(xx >= 0) * (2.0 / beta * (np.sqrt(np.maximum(0, xx) + 3/8) - np.sqrt(3/8))))
lookup = lookup_table.astype(output_type)
assert np.diff(lookup_table).min() >= 0, "non-monotonic lookup generated"
return lookup


def make_inverse_lookup(lookup_table: np.ndarray, output_type='int16') -> np.ndarray:
"""Compute the inverse lookup table for a monotonic forward lookup table."""
_, inv1 = np.unique(lookup_table, return_index=True) # first entry
_, inv2 = np.unique(lookup_table[::-1], return_index=True) # last entry
inverse = (inv1 + lookup_table.size - 1 - inv2)/2
return inverse.astype(output_type)


def lookup(movie: np.ndarray, lookup_table: np.ndarray) -> np.ndarray:
"""Apply lookup table to movie"""
return lookup_table[np.maximum(0, np.minimum(movie, lookup_table.size-1))]


class PoissonCodec(Codec):
"""Codec for 3-dimensional Filter. The codec assumes that input data are of shape:
(time, x, y).
Parameters
----------
zero_level : float
Signal level when no photons are recorded.
This should pre-computed or measured directly on the instrument.
photon_sensitivity : float
Conversion scalor to convert the measure signal into absolute photon numbers.
This should pre-computed or measured directly on the instrument.
"""
codec_id = "poisson"

def __init__(self,
zero_level,
photon_sensitivity,
encoded_dtype='int8',
decoded_dtype='int16',
):
self.zero_level = zero_level
self.photon_sensitivity = photon_sensitivity
self.encoded_dtype = encoded_dtype
self.decoded_dtype = decoded_dtype

def encode(self, buf: np.ndarray) -> np.ndarray:
lookup_table = make_anscombe_lookup(
self.photon_sensitivity,
output_type=self.encoded_dtype,
zero_level=self.zero_level,
)
encoded = lookup(buf, lookup_table)
shape = [encoded.ndim] + list(encoded.shape)
shape = np.array(shape, dtype='uint8')
return shape.tobytes() + encoded.astype(self.encoded_dtype).tobytes()

def decode(self, buf: bytes, out=None) -> np.ndarray:
lookup_table = make_anscombe_lookup(
self.photon_sensitivity,
output_type=self.encoded_dtype,
zero_level=self.zero_level,
)
inverse_table = make_inverse_lookup(
lookup_table,
output_type=self.decoded_dtype
)
ndims = int(buf[0])
shape = [int(_) for _ in buf[1:ndims+1]]
decoded = np.frombuffer(buf[ndims+1:], dtype=self.encoded_dtype).reshape(shape)
return lookup(decoded, inverse_table).astype(self.decoded_dtype)


numcodecs.register_codec(PoissonCodec)
45 changes: 4 additions & 41 deletions src/poisson_numcodecs/estimate.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import numpy as np
from sklearn.linear_model import HuberRegressor as Regressor
import imageio as io


def _longest_run(bool_array: np.ndarray) -> slice:
Expand All @@ -22,7 +21,7 @@ def compute_sensitivity(movie: np.array, count_weight_gamma: float=0.2) -> dict:
"""Calculate photon sensitivity
Args:
movie (np.array): A movie in the format (height, width, time).
movie (np.array): A movie in the format (time, height, width).
count_weight_gamma: 0.00001=weigh each intensity level equally,
1.0=weigh each intensity in proportion to pixel counts.
Expand All @@ -40,10 +39,10 @@ def compute_sensitivity(movie: np.array, count_weight_gamma: float=0.2) -> dict:
), f"A three dimensional (Height, Width, Time) grayscale movie is expected, got {movie.ndim}"

movie = np.maximum(0, movie.astype(np.int32, copy=False))
intensity = (movie[:, :, :-1] + movie[:, :, 1:] + 1) // 2
difference = movie[:, :, :-1].astype(np.float32) - movie[:, :, 1:]
intensity = (movie[:-1, :, :] + movie[1:, :, :] + 1) // 2
difference = movie[:-1, :, :].astype(np.float32) - movie[1:, :, :]

select = intensity > 0
select = intensity > 0 # discard non-positive values
intensity = intensity[select]
difference = difference[select]

Expand Down Expand Up @@ -77,39 +76,3 @@ def compute_sensitivity(movie: np.array, count_weight_gamma: float=0.2) -> dict:
sensitivity=sensitivity,
zero_level=zero_level,
)

def make_anscombe_lookup(sensitivity: float, input_max: int=0x7fff, beta: float=0.5):
"""
Compute the Anscombe lookup table.
The lookup converts a linear grayscale image into a uniform variance image.
:param sensitivity: the size of one photon in the linear input image.
:param input_max: the maximum value in the input
:param beta: the grayscale quantization step expressed in units of noise std dev
"""
# produce anscombe lookup_table
xx = np.r_[:input_max + 1] / sensitivity
lookup = np.uint8(
2.0 / beta * (np.sqrt(np.maximum(0, xx) + 3/8) - np.sqrt(3/8)))
return lookup


def make_inverse_lookup(lookup):
_, inverse = np.unique(lookup, return_index=True)
inverse += (np.r_[:inverse.size] / inverse.size * (inverse[-1] - inverse[-2])/2).astype(np.int16)
return inverse


def lookup(movie, LUT):
"""
Apply lookup table LUT to input movie
"""
return LUT[np.maximum(0, np.minimum(movie, LUT.size-1))]


def save_movie(movie, path, scale=1, format='gif'):
if format == "gif":
with io.get_writer(path, mode='I', duration=.01, loop=False) as f:
for frame in movie:
f.append_data(scale * frame)
else:
raise NotImplementedError(f"Format {format} is not implemented")
53 changes: 0 additions & 53 deletions src/poisson_numcodecs/poisson.py

This file was deleted.

16 changes: 9 additions & 7 deletions tests/test_poisson_codec.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from poisson_numcodecs import Poisson
from poisson_numcodecs import PoissonCodec
import numpy as np
import pytest

Expand All @@ -11,28 +11,30 @@ def make_poisson_ramp_signals(shape=(10, 1, 1), min_rate=1, max_rate=5, dtype="i
output_array = np.zeros(shape, dtype=dtype)
for x_ind in range(x):
for y_ind in range(y):
output_array[x_ind, y_ind, :] = np.random.poisson(
output_array[x_ind, y_ind, :] = sensitivity * np.random.poisson(
np.arange(min_rate, max_rate, (max_rate-min_rate)/times))
return output_array
return output_array.astype('int16')

sensitivity = 100.0

@pytest.fixture
def test_data(dtype="int16"):
test2d = make_poisson_ramp_signals(shape=(50, 1, 1), min_rate=1, max_rate=5, dtype=dtype)
test2d = make_poisson_ramp_signals(shape=(50, 1, 1), min_rate=1, max_rate=5, dtype=dtype)
test2d_long = make_poisson_ramp_signals(shape=(1, 50, 1), min_rate=1, max_rate=5, dtype=dtype)
return [test2d, test2d_long]

def test_poisson_encode_decode(test_data):
poisson_codec = Poisson(
poisson_codec = PoissonCodec(
zero_level=0,
photon_sensitivity=1.0,
photon_sensitivity=sensitivity,
encoded_dtype='uint8',
decoded_dtype='int16'
)

for example_data in test_data:
encoded = poisson_codec.encode(example_data)
decoded = poisson_codec.decode(encoded)
np.testing.assert_allclose(decoded, example_data, atol=1e-3)
assert np.abs(decoded - example_data).max() < sensitivity / 2

if __name__ == '__main__':
list_data = test_data("int16")
Expand Down

0 comments on commit e5406f0

Please sign in to comment.