diff --git a/LICENSE b/LICENSE index d8f2a22..e9f5af8 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2022 Allen Institute for Neural Dynamics +Copyright (c) 2024, Allen Institute for Neural Dynamics, DataJoint Inc, CatalystNeuro, and contributors Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/examples/workbook.ipynb b/examples/workbook.ipynb index 077ea28..0cbc48b 100644 --- a/examples/workbook.ipynb +++ b/examples/workbook.ipynb @@ -9,12 +9,12 @@ "source": [ "%matplotlib inline\n", "from matplotlib import pyplot as plt\n", - "\n", + "from numcodecs import Blosc\n", "import os\n", "import numpy as np\n", "from poisson_numcodecs.Poisson import Poisson\n", "from poisson_numcodecs import estimate\n", - "import zarr" + "import zarr\n" ] }, { @@ -79,92 +79,90 @@ "source": [ "# make compression lookup tables\n", "zero = np.int16(np.round(qs['zero_level']))\n", - "LUT1, LUT2 = estimate.make_luts(\n", - " zero_level=0, \n", - " sensitivity=qs['sensitivity'],\n", - " input_max=data.max() - zero,\n", - " beta=0.5\n", - ")" + "LUT1 = estimate.make_anscombe_lookup(sensitivity)\n", + "LUT2 = estimate.make_inverse_lookup(LUT1)" ] }, { "cell_type": "code", "execution_count": null, - "id": "cf56b085", + "id": "a6d75d83", "metadata": {}, "outputs": [], "source": [ - "plt.plot(LUT1)" + "# save compressed video as .gif\n", + "compressed = estimate.lookup(data - zero, LUT1)\n", + "gif_path = 'test.gif'\n", + "scale = 255//np.max(compressed) # this makes the gif brighter. Use scale=1 normally\n", + "estimate.save_movie(compressed, gif_path, scale=scale) \n", + "print(f'Compression ratio: {np.prod(data.shape)*2 / os.path.getsize(gif_path):0.2f}')" ] }, { "cell_type": "code", "execution_count": null, - "id": "a6d75d83", + "id": "6645b70f", "metadata": {}, "outputs": [], "source": [ - "# save compressed video as .gif\n", - "compressed = estimate.lookup(data - zero, LUT1)\n", - "gif_path = 'test.gif'\n", - "scale = 255//np.max(compressed) # this makes the gif brighter. Use scale=1 normally\n", - "estimate.save_movie(compressed, gif_path, scale=scale) \n", - "print(f'Compression ratio: {np.prod(data.shape)*2 / os.path.getsize(gif_path):0.2f}')" + "# instantiate Poisson object\n", + "poisson_filter = Poisson(zero_level, sensitivity)" ] }, { "cell_type": "code", "execution_count": null, - "id": "f5dbd468", + "id": "560e02fa", "metadata": {}, "outputs": [], "source": [ - "!open test.gif" + "# using default Zarr compressor\n", + "img = zarr.array(data, filters=[poisson_filter], compressor=Blosc(cname='zstd', clevel=1))\n", + "img.info" ] }, { "cell_type": "code", "execution_count": null, - "id": "c58b5a86", + "id": "0243f913", "metadata": {}, "outputs": [], "source": [ - "dark_signal = 0\n", - "signal_to_photon_gain = 96.0\n", - "encoded_dtype = np.int16\n", - "decoded_dtype = np.int8\n", - "integer_per_photon = 2" + "img = zarr.open('im.zarr')" ] }, { "cell_type": "code", "execution_count": null, - "id": "7f3535d2", + "id": "87de828b", "metadata": {}, "outputs": [], "source": [ - "# instantiate Poisson object\n", - "poisson_filter = Poisson(zero_level, sensitivity)\n", - "\n", - "# using default Zarr compressor\n", - "photon_data = zarr.array(data, filters=[poisson_filter])\n", - "data_read = photon_data[:]" + "plt.imshow(img[10,:,:])" ] }, { "cell_type": "code", "execution_count": null, - "id": "328f8c2f", + "id": "e283c924", "metadata": {}, "outputs": [], + "source": [ + "ls -l" + ] + }, + { + "cell_type": "markdown", + "id": "ae009d7c", + "metadata": {}, "source": [] } ], "metadata": { "kernelspec": { - "display_name": "benv", + "display_name": "Python 3", "language": "python", - "name": "benv" + "name": "python3" }, "language_info": { "codemirror_mode": { diff --git a/src/poisson_numcodecs/Poisson.py b/src/poisson_numcodecs/Poisson.py index 49f3082..da0a385 100644 --- a/src/poisson_numcodecs/Poisson.py +++ b/src/poisson_numcodecs/Poisson.py @@ -5,6 +5,7 @@ import numcodecs from numcodecs.abc import Codec from numcodecs.compat import ndarray_copy +from . import estimate ### NUMCODECS Codec ### class Poisson(Codec): @@ -13,50 +14,41 @@ class Poisson(Codec): Parameters ---------- - dark_signal : float + zero_level : float Signal level when no photons are recorded. This should pre-computed or measured directly on the instrument. - signal_to_photon_gain : float + photon_sensitivity : float Conversion scalor to convert the measure signal into absolute photon numbers. This should pre-computed or measured directly on the instrument. """ codec_id = "poisson" - def __init__(self, dark_signal, - signal_to_photon_gain, + def __init__(self, + zero_level, + photon_sensitivity, encoded_dtype='int8', decoded_dtype='int16', - integer_per_photon=4, - ): - - self.dark_signal = dark_signal - self.signal_to_photon_gain = signal_to_photon_gain + ): + self.zero_level = zero_level + self.photon_sensitivity = photon_sensitivity self.encoded_dtype = encoded_dtype self.decoded_dtype = decoded_dtype - self.integer_per_photon = integer_per_photon - def encode(self, buf): - enc = np.zeros(buf.shape, dtype=self.encoded_dtype) - centered = (buf.astype('float') - self.dark_signal) / self.signal_to_photon_gain - enc = self.integer_per_photon * (np.sqrt(np.maximum(0, centered))) - enc = enc.astype(self.encoded_dtype) - return enc + def encode(self, buf: np.array) -> bytes: + lookup = estimate.make_anscombe_lookup(self.photon_sensitivity) + encoded = estimate.lookup(buf, lookup) + shape = [encoded.ndim] + list(encoded.shape) + shape = np.array(shape, dtype='uint8') + return shape.tobytes() + encoded.astype(self.encoded_dtype).tobytes() - def decode(self, buf, out=None): - dec = ((buf.astype('float') / self.integer_per_photon)**2) * self.signal_to_photon_gain + self.dark_signal - outarray = np.round(dec) - outarray = ndarray_copy(outarray, out) - return outarray.astype(self.decoded_dtype) + def decode(self, buf: bytes, out=None) -> np.array: + lookup = estimate.make_anscombe_lookup(self.photon_sensitivity) + inverse = estimate.make_inverse_lookup(lookup) + ndims = int(buf[0]) + shape = [int(_) for _ in buf[1:ndims+1]] + arr = np.frombuffer(buf[ndims+1:], dtype='uint8').reshape(shape) + decoded = estimate.lookup(arr, inverse) + return decoded.astype(self.decoded_dtype) - def get_config(self): - # override to handle encoding dtypes - return dict( - id=self.codec_id, - dark_signal=self.dark_signal, - signal_to_photon_gain=self.signal_to_photon_gain, - encoded_dtype=self.encoded_dtype, - decoded_dtype=self.decoded_dtype, - integer_per_photon=self.integer_per_photon - ) numcodecs.register_codec(Poisson) \ No newline at end of file diff --git a/src/poisson_numcodecs/estimate.py b/src/poisson_numcodecs/estimate.py index c8397ff..f99e3c3 100644 --- a/src/poisson_numcodecs/estimate.py +++ b/src/poisson_numcodecs/estimate.py @@ -78,26 +78,25 @@ def compute_sensitivity(movie: np.array, count_weight_gamma: float=0.2) -> dict: zero_level=zero_level, ) -def make_luts(zero_level: int, sensitivity: float, input_max: int, beta: float=0.5): +def make_anscombe_lookup(sensitivity: float, input_max: int=0x7fff, beta: float=0.5): """ - Compute lookup tables LUT1 and LUT2. - LUT1 converts a linear grayscale image into a uniform variance image. - LUT2 is the inverse of LUT1 - :param zero_level: the input level correspoding to zero photon rate + Compute the Anscombe lookup table. + The lookup converts a linear grayscale image into a uniform variance image. :param sensitivity: the size of one photon in the linear input image. - :param beta: the grayscale quantization step in units of noise std dev + :param input_max: the maximum value in the input + :param beta: the grayscale quantization step expressed in units of noise std dev """ - # produce anscombe LUT1 and LUT2 - xx = (np.r_[:input_max + 1] - zero_level) / sensitivity - zero_slope = 1 / beta / np.sqrt(3/8) - offset = -xx.min() * zero_slope - LUT1 = np.uint8( - offset + - (xx < 0) * (xx * zero_slope) + - (xx >= 0) * (2.0 / beta * (np.sqrt(np.maximum(0, xx) + 3/8) - np.sqrt(3/8)))) - _, LUT2 = np.unique(LUT1, return_index=True) - LUT2 += (np.r_[:LUT2.size] / LUT2.size * (LUT2[-1] - LUT2[-2])/2).astype(np.int16) - return LUT1, LUT2.astype(np.int16) + # produce anscombe lookup_table + xx = np.r_[:input_max + 1] / sensitivity + lookup = np.uint8( + 2.0 / beta * (np.sqrt(np.maximum(0, xx) + 3/8) - np.sqrt(3/8))) + return lookup + + +def make_inverse_lookup(lookup): + _, inverse = np.unique(lookup, return_index=True) + inverse += (np.r_[:inverse.size] / inverse.size * (inverse[-1] - inverse[-2])/2).astype(np.int16) + return inverse def lookup(movie, LUT): diff --git a/tests/test_poisson_codec.py b/tests/test_poisson_codec.py index de80cdd..97b411b 100644 --- a/tests/test_poisson_codec.py +++ b/tests/test_poisson_codec.py @@ -11,25 +11,28 @@ def make_poisson_ramp_signals(shape=(10, 1, 1), min_rate=1, max_rate=5, dtype="i output_array = np.zeros(shape, dtype=dtype) for x_ind in range(x): for y_ind in range(y): - output_array[x_ind, y_ind, :] = np.random.poisson(np.arange(min_rate, max_rate, (max_rate-min_rate)/times)) + output_array[x_ind, y_ind, :] = np.random.poisson( + np.arange(min_rate, max_rate, (max_rate-min_rate)/times)) return output_array @pytest.fixture def test_data(dtype="int16"): test2d = make_poisson_ramp_signals(shape=(50, 1, 1), min_rate=1, max_rate=5, dtype=dtype) test2d_long = make_poisson_ramp_signals(shape=(1, 50, 1), min_rate=1, max_rate=5, dtype=dtype) - return [test2d, test2d_long] def test_poisson_encode_decode(test_data): - for integer_per_photon in [8, 9, 10]: - poisson_codec = Poisson(dark_signal=0, signal_to_photon_gain=1.0 - ,encoded_dtype='int16', decoded_dtype='int16', - integer_per_photon=integer_per_photon) - for example_data in test_data: - encoded = poisson_codec.encode(example_data) - decoded = poisson_codec.decode(encoded) - np.testing.assert_allclose(decoded, example_data, atol=1e-3) + poisson_codec = Poisson( + zero_level=0, + photon_sensitivity=1.0, + encoded_dtype='uint8', + decoded_dtype='int16' + ) + + for example_data in test_data: + encoded = poisson_codec.encode(example_data) + decoded = poisson_codec.decode(encoded) + np.testing.assert_allclose(decoded, example_data, atol=1e-3) if __name__ == '__main__': list_data = test_data("int16")