From add62a4d93801b36671bf17b5a2911293314e6b7 Mon Sep 17 00:00:00 2001 From: Maxine Hartnett Date: Fri, 30 Aug 2024 09:17:46 -0600 Subject: [PATCH] Updating ruff and mypy errors --- examples/Dockerfile.processing | 3 +- imap_processing/glows/l1b/glows_l1b_data.py | 4 +- imap_processing/mag/l0/mag_l0_data.py | 16 +- imap_processing/mag/l1a/mag_l1a_data.py | 267 +++++++++++--------- imap_processing/tests/mag/test_mag_decom.py | 7 - imap_processing/tests/mag/test_mag_l1a.py | 127 ++++++++-- 6 files changed, 255 insertions(+), 169 deletions(-) diff --git a/examples/Dockerfile.processing b/examples/Dockerfile.processing index 6c2ad65a0..4e2c1604b 100644 --- a/examples/Dockerfile.processing +++ b/examples/Dockerfile.processing @@ -4,7 +4,8 @@ FROM public.ecr.aws/docker/library/python:3.10-slim # TODO: delete this section once imap_processing is released ARG DEBIAN_FRONTEND=noninteractive RUN apt-get update && apt-get install -y git -RUN pip install git+https://github.com/IMAP-Science-Operations-Center/imap_processing.git@dev +# RUN pip install git+https://github.com/IMAP-Science-Operations-Center/imap_processing.git@dev +RUN pip install git+https://github.com/maxinelasp/imap_processing.git@mag_l1a_compression # Uncomment this once imap_processing is released # RUN pip install imap_processing diff --git a/imap_processing/glows/l1b/glows_l1b_data.py b/imap_processing/glows/l1b/glows_l1b_data.py index b615431c5..3c776c97c 100644 --- a/imap_processing/glows/l1b/glows_l1b_data.py +++ b/imap_processing/glows/l1b/glows_l1b_data.py @@ -495,8 +495,8 @@ class HistogramL1B: spin_period_ground_std_dev: np.double = field(init=False) # Spin table position_angle_offset_average: np.double = field(init=False) # retrieved from SPICE position_angle_offset_std_dev: np.double = field(init=False) # retrieved from SPICE - spin_axis_orientation_std_dev: np.double = field(init=False) # retrieved from SPICE - CK - spin_axis_orientation_average: np.double = field(init=False) # retrieved from SPICE - CK + spin_axis_orientation_std_dev: np.double = field(init=False) # SPICE - CK + spin_axis_orientation_average: np.double = field(init=False) # SPICE - CK spacecraft_location_average: np.ndarray = field(init=False) # retrieved from SPICE spacecraft_location_std_dev: np.ndarray = field(init=False) # retrieved from SPICE spacecraft_velocity_average: np.ndarray = field(init=False) # retrieved from SPICE diff --git a/imap_processing/mag/l0/mag_l0_data.py b/imap_processing/mag/l0/mag_l0_data.py index b8c524e4d..927fe092c 100644 --- a/imap_processing/mag/l0/mag_l0_data.py +++ b/imap_processing/mag/l0/mag_l0_data.py @@ -106,15 +106,13 @@ def __post_init__(self) -> None: Also convert encoded "VECSEC" (vectors per second) into proper vectors per second values. """ - # Convert string output from space_packet_parser to numpy array of - # big-endian bytes - self.VECTORS = np.frombuffer( - int(self.VECTORS, 2).to_bytes(len(self.VECTORS) // 8, "big"), - # type: ignore[arg-type] - # TODO Check MYPY Error: Argument 1 to "int" has incompatible type - # "Union[ndarray[Any, Any], str]"; expected "Union[str, bytes, bytearray]" - dtype=np.dtype(">B"), - ) + if isinstance(self.VECTORS, str): + # Convert string output from space_packet_parser to numpy array of + # big-endian bytes + self.VECTORS = np.frombuffer( + int(self.VECTORS, 2).to_bytes(len(self.VECTORS) // 8, "big"), + dtype=np.dtype(">B"), + ) # Remove buffer from end of vectors. Vector data needs to be in 50 bit chunks, # and may have an extra byte at the end from CCSDS padding. diff --git a/imap_processing/mag/l1a/mag_l1a_data.py b/imap_processing/mag/l1a/mag_l1a_data.py index 2df9f5e74..757e365c8 100644 --- a/imap_processing/mag/l1a/mag_l1a_data.py +++ b/imap_processing/mag/l1a/mag_l1a_data.py @@ -73,7 +73,7 @@ def to_seconds(self) -> float: seconds : float Time in seconds. """ - return self.coarse_time + self.fine_time / MAX_FINE_TIME + return float(self.coarse_time + self.fine_time / MAX_FINE_TIME) @dataclass @@ -289,27 +289,27 @@ def calculate_vector_time( @staticmethod def process_vector_data( - vector_data: np.ndarray, - primary_count: int, - secondary_count: int, - compression: int, + vector_data: np.ndarray, + primary_count: int, + secondary_count: int, + compression: int, ) -> tuple[np.ndarray, np.ndarray]: """ - Process raw vector data into Vectors. + Transform raw vector data into Vectors. Vectors are grouped into primary sensor and secondary sensor, and returned as a tuple (primary sensor vectors, secondary sensor vectors). Parameters ---------- - vector_data: numpy.ndarray + vector_data : numpy.ndarray Raw vector data, in bytes. Contains both primary and secondary vector data. Can be either compressed or uncompressed. - primary_count: int + primary_count : int Count of the number of primary vectors. - secondary_count: int + secondary_count : int Count of the number of secondary vectors. - compression: int + compression : int Flag indicating if the data is compressed (1) or uncompressed (0). Returns @@ -317,7 +317,6 @@ def process_vector_data( (primary, secondary): (numpy.ndarray, numpy.ndarray) Two arrays, each containing tuples of (x, y, z, sample_range) for each vector sample. - """ if compression: return MagL1a.process_compressed_vectors( @@ -330,7 +329,7 @@ def process_vector_data( @staticmethod def process_uncompressed_vectors( - vector_data: np.ndarray, primary_count: int, secondary_count: int + vector_data: np.ndarray, primary_count: int, secondary_count: int ) -> tuple[np.ndarray, np.ndarray]: """ Given raw uncompressed packet data, process into Vectors. @@ -393,74 +392,74 @@ def to_signed16(n: int) -> int: if i % 4 == 0: # start at bit 0, take 8 bits + 8bits # pos = 0, 25, 50... x = ( - ((vector_data[pos + 0] & 0xFF) << 8) - | ((vector_data[pos + 1] & 0xFF) << 0) - ) & 0xFFFF + ((vector_data[pos + 0] & 0xFF) << 8) + | ((vector_data[pos + 1] & 0xFF) << 0) + ) & 0xFFFF y = ( - ((vector_data[pos + 2] & 0xFF) << 8) - | ((vector_data[pos + 3] & 0xFF) << 0) - ) & 0xFFFF + ((vector_data[pos + 2] & 0xFF) << 8) + | ((vector_data[pos + 3] & 0xFF) << 0) + ) & 0xFFFF z = ( - ((vector_data[pos + 4] & 0xFF) << 8) - | ((vector_data[pos + 5] & 0xFF) << 0) - ) & 0xFFFF + ((vector_data[pos + 4] & 0xFF) << 8) + | ((vector_data[pos + 5] & 0xFF) << 0) + ) & 0xFFFF rng = (vector_data[pos + 6] >> 6) & 0x3 pos += 6 elif i % 4 == 1: # start at bit 2, take 6 bits, 8 bit, 2 bits per vector # pos = 6, 31... x = ( - ((vector_data[pos + 0] & 0x3F) << 10) - | ((vector_data[pos + 1] & 0xFF) << 2) - | ((vector_data[pos + 2] >> 6) & 0x03) - ) & 0xFFFF + ((vector_data[pos + 0] & 0x3F) << 10) + | ((vector_data[pos + 1] & 0xFF) << 2) + | ((vector_data[pos + 2] >> 6) & 0x03) + ) & 0xFFFF y = ( - ((vector_data[pos + 2] & 0x3F) << 10) - | ((vector_data[pos + 3] & 0xFF) << 2) - | ((vector_data[pos + 4] >> 6) & 0x03) - ) & 0xFFFF + ((vector_data[pos + 2] & 0x3F) << 10) + | ((vector_data[pos + 3] & 0xFF) << 2) + | ((vector_data[pos + 4] >> 6) & 0x03) + ) & 0xFFFF z = ( - ((vector_data[pos + 4] & 0x3F) << 10) - | ((vector_data[pos + 5] & 0xFF) << 2) - | ((vector_data[pos + 6] >> 6) & 0x03) - ) & 0xFFFF + ((vector_data[pos + 4] & 0x3F) << 10) + | ((vector_data[pos + 5] & 0xFF) << 2) + | ((vector_data[pos + 6] >> 6) & 0x03) + ) & 0xFFFF rng = (vector_data[pos + 6] >> 4) & 0x3 pos += 6 elif i % 4 == 2: # start at bit 4, take 4 bits, 8 bits, 4 bits per vector # pos = 12, 37... x = ( - ((vector_data[pos + 0] & 0x0F) << 12) - | ((vector_data[pos + 1] & 0xFF) << 4) - | ((vector_data[pos + 2] >> 4) & 0x0F) - ) & 0xFFFF + ((vector_data[pos + 0] & 0x0F) << 12) + | ((vector_data[pos + 1] & 0xFF) << 4) + | ((vector_data[pos + 2] >> 4) & 0x0F) + ) & 0xFFFF y = ( - ((vector_data[pos + 2] & 0x0F) << 12) - | ((vector_data[pos + 3] & 0xFF) << 4) - | ((vector_data[pos + 4] >> 4) & 0x0F) - ) & 0xFFFF + ((vector_data[pos + 2] & 0x0F) << 12) + | ((vector_data[pos + 3] & 0xFF) << 4) + | ((vector_data[pos + 4] >> 4) & 0x0F) + ) & 0xFFFF z = ( - ((vector_data[pos + 4] & 0x0F) << 12) - | ((vector_data[pos + 5] & 0xFF) << 4) - | ((vector_data[pos + 6] >> 4) & 0x0F) - ) & 0xFFFF + ((vector_data[pos + 4] & 0x0F) << 12) + | ((vector_data[pos + 5] & 0xFF) << 4) + | ((vector_data[pos + 6] >> 4) & 0x0F) + ) & 0xFFFF rng = (vector_data[pos + 6] >> 2) & 0x3 pos += 6 elif i % 4 == 3: # start at bit 6, take 2 bits, 8 bits, 6 bits per vector # pos = 18, 43... x = ( - ((vector_data[pos + 0] & 0x03) << 14) - | ((vector_data[pos + 1] & 0xFF) << 6) - | ((vector_data[pos + 2] >> 2) & 0x3F) - ) & 0xFFFF + ((vector_data[pos + 0] & 0x03) << 14) + | ((vector_data[pos + 1] & 0xFF) << 6) + | ((vector_data[pos + 2] >> 2) & 0x3F) + ) & 0xFFFF y = ( - ((vector_data[pos + 2] & 0x03) << 14) - | ((vector_data[pos + 3] & 0xFF) << 6) - | ((vector_data[pos + 4] >> 2) & 0x3F) - ) & 0xFFFF + ((vector_data[pos + 2] & 0x03) << 14) + | ((vector_data[pos + 3] & 0xFF) << 6) + | ((vector_data[pos + 4] >> 2) & 0x3F) + ) & 0xFFFF z = ( - ((vector_data[pos + 4] & 0x03) << 14) - | ((vector_data[pos + 5] & 0xFF) << 6) - | ((vector_data[pos + 6] >> 2) & 0x3F) - ) & 0xFFFF + ((vector_data[pos + 4] & 0x03) << 14) + | ((vector_data[pos + 5] & 0xFF) << 6) + | ((vector_data[pos + 6] >> 2) & 0x3F) + ) & 0xFFFF rng = (vector_data[pos + 6] >> 0) & 0x3 pos += 7 @@ -476,8 +475,8 @@ def to_signed16(n: int) -> int: ) @staticmethod - def process_compressed_vectors( - vector_data: np.ndarray, primary_count: int, secondary_count: int + def process_compressed_vectors( # noqa: PLR0912, PLR0915 + vector_data: np.ndarray, primary_count: int, secondary_count: int ) -> tuple[np.ndarray, np.ndarray]: """ Given raw compressed packet data, process into Vectors. @@ -511,11 +510,11 @@ def process_compressed_vectors( Parameters ---------- - vector_data: numpy.ndarray + vector_data : numpy.ndarray Raw vector data, in bytes. Contains both primary and secondary vector data. - primary_count: int + primary_count : int Count of the number of primary vectors. - secondary_count: int + secondary_count : int Count of the number of secondary vectors. Returns @@ -524,7 +523,6 @@ def process_compressed_vectors( Two arrays, each containing tuples of (x, y, z, sample_range) for each vector sample. """ - bit_array = np.unpackbits(vector_data) # The first 8 bits are a header - 6 bits to indicate the compression width, # 1 bit to indicate if there is a range data section, and 1 bit spare. @@ -541,11 +539,13 @@ def process_compressed_vectors( bit_array[8:first_vector_width], compression_width, True ) - end_vector = len(bit_array) - ( - primary_count + secondary_count) * 2 * has_range_data_section + end_vector = ( + len(bit_array) + - (primary_count + secondary_count) * 2 * has_range_data_section + ) # Cut off the first vector width and the end range data section if it exists. - vector_bits = bit_array[first_vector_width - 1: end_vector] + vector_bits = bit_array[first_vector_width - 1 : end_vector] # Shift the bit array over one to the left, then sum them up. This is used to # find all the places where two 1s occur next to each other, because the sum @@ -568,7 +568,7 @@ def process_compressed_vectors( primary_boundaries = [sequential_ones[0] + 1] secondary_boundaries = [] vector_count = 1 - end_primary_vector = None + end_primary_vector = 0 for seq_val in sequential_ones: if vector_count > primary_count + secondary_count: @@ -576,8 +576,9 @@ def process_compressed_vectors( # Add the end indices of each primary vector to primary_boundaries # If we have 3 ones in a row, we should skip that index - if (vector_count < primary_count and - (seq_val - primary_boundaries[-1] - 1 > 1)): + if vector_count < primary_count and ( + seq_val - primary_boundaries[-1] - 1 > 1 + ): primary_boundaries.append(seq_val + 1) # 3 boundaries equal one vector @@ -585,14 +586,17 @@ def process_compressed_vectors( vector_count += 1 # If the vector length is >60 bits, we switch to uncompressed. # So we skip past all the remaining seq_ones. - if (len(primary_boundaries) > 4) and ( - primary_boundaries[-1] - primary_boundaries[ - -4] > 60) or (vector_count == 2 and primary_boundaries[-1] > 60): + if ( + (len(primary_boundaries) > 4) + and (primary_boundaries[-1] - primary_boundaries[-4] > 60) + or (vector_count == 2 and primary_boundaries[-1] > 60) + ): # Since we know how long each uncompressed vector is, # we can determine the end of the primary vectors. - end_primary_vector = (primary_boundaries[-1] + - (primary_count - vector_count) * - uncompressed_vector_size) + end_primary_vector = ( + primary_boundaries[-1] + + (primary_count - vector_count) * uncompressed_vector_size + ) vector_count = primary_count # If the vector count is equal to the primary count, we are in the first @@ -601,8 +605,11 @@ def process_compressed_vectors( # We won't have assigned end_primary_vector unless we hit uncompressed # vectors in the primary path. If there are no uncompressed values, # we can use the end of primary_boundaries. - end_primary_vector = primary_boundaries[ - -1] if end_primary_vector is None else end_primary_vector + end_primary_vector = ( + primary_boundaries[-1] + if end_primary_vector == 0 + else end_primary_vector + ) if seq_val >= end_primary_vector + uncompressed_vector_size: # We have found the first secondary vector secondary_boundaries = [seq_val] @@ -610,8 +617,10 @@ def process_compressed_vectors( # If we're greater than primary_count, we are in the secondary vectors. # Like before, we skip indices with 3 ones. - if (vector_count > primary_count and - seq_val - secondary_boundaries[-1] - 1 > 1): + if ( + vector_count > primary_count + and seq_val - secondary_boundaries[-1] - 1 > 1 + ): secondary_boundaries.append(seq_val + 1) # We have the start of the secondary vectors in # secondary_boundaries, so we need to subtract one to determine @@ -625,7 +634,7 @@ def process_compressed_vectors( # Split along the boundaries of the primary vectors. This gives us a list of # bit arrays, each corresponding to a primary value (1/3 of a vector). primary_split_bits = np.split( - vector_bits[:primary_boundaries[-1]], + vector_bits[: primary_boundaries[-1]], primary_boundaries[:-1], ) @@ -640,12 +649,14 @@ def process_compressed_vectors( primary_vector_missing = primary_count - len(primary_split_bits) // 3 - 1 vector_index = primary_count - primary_vector_missing if primary_vector_missing: - uncompressed_vectors = vector_bits[ - primary_boundaries[-1]:primary_boundaries[ - -1] + uncompressed_vector_size * primary_vector_missing] + primary_end = ( + primary_boundaries[-1] + + uncompressed_vector_size * primary_vector_missing + ) + uncompressed_vectors = vector_bits[primary_boundaries[-1] : primary_end] for i in range(0, len(uncompressed_vectors), uncompressed_vector_size): decoded_vector = MagL1a.unpack_one_vector( - uncompressed_vectors[i:i + uncompressed_vector_size], + uncompressed_vectors[i : i + uncompressed_vector_size], compression_width, True, ) @@ -655,7 +666,8 @@ def process_compressed_vectors( # Secondary vector processing first_secondary_vector = MagL1a.unpack_one_vector( vector_bits[ - end_primary_vector:end_primary_vector + uncompressed_vector_size], + end_primary_vector : end_primary_vector + uncompressed_vector_size + ], compression_width, True, ) @@ -663,7 +675,7 @@ def process_compressed_vectors( # Split up the bit array, skipping past the primary vector and uncompressed # starting vector secondary_split_bits = np.split( - vector_bits[:secondary_boundaries[-1]], secondary_boundaries[:-1] + vector_bits[: secondary_boundaries[-1]], secondary_boundaries[:-1] )[1:] vector_diffs = list(map(MagL1a.decode_fib_zig_zag, secondary_split_bits)) @@ -675,12 +687,14 @@ def process_compressed_vectors( secondary_vector_missing = secondary_count - len(secondary_split_bits) // 3 - 1 if secondary_vector_missing: vector_index = secondary_count - secondary_vector_missing - uncompressed_vectors = vector_bits[ - secondary_boundaries[-1]:secondary_boundaries[ - -1] + uncompressed_vector_size * secondary_vector_missing] + secondary_end = ( + secondary_boundaries[-1] + + uncompressed_vector_size * secondary_vector_missing + ) + uncompressed_vectors = vector_bits[secondary_boundaries[-1] : secondary_end] for i in range(0, len(uncompressed_vectors), uncompressed_vector_size): decoded_vector = MagL1a.unpack_one_vector( - uncompressed_vectors[i:i + uncompressed_vector_size], + uncompressed_vectors[i : i + uncompressed_vector_size], compression_width, True, ) @@ -690,29 +704,33 @@ def process_compressed_vectors( # TODO: should the range vectors include the first vector? if has_range_data_section: primary_vectors = MagL1a.process_range_data_section( - bit_array[end_vector:end_vector + primary_count * 2], primary_vectors + bit_array[end_vector : end_vector + primary_count * 2], primary_vectors ) secondary_vectors = MagL1a.process_range_data_section( - bit_array[end_vector + primary_count * 2:end_vector + ( - primary_count + secondary_count) * 2], secondary_vectors) + bit_array[ + end_vector + primary_count * 2 : end_vector + + (primary_count + secondary_count) * 2 + ], + secondary_vectors, + ) return primary_vectors, secondary_vectors @staticmethod - def process_range_data_section(range_data: np.ndarray, - vectors: np.ndarray) -> np.ndarray: + def process_range_data_section( + range_data: np.ndarray, vectors: np.ndarray + ) -> np.ndarray: """ Given a range data section and vectors, return an updated vector array. Each range value has 2 bits. range_data will have a length of n*2, where n is the number of vectors in vectors. - Parameters ---------- - range_data: numpy.ndarray + range_data : numpy.ndarray Array of range values, where each value is one bit. - vectors: + vectors : numpy.ndarray Array of vectors, where each vector is a tuple of (x, y, z, range). The range value will be overwritten by range_data, and x, y, z will remain the same. @@ -728,19 +746,19 @@ def process_range_data_section(range_data: np.ndarray, "Incorrect length for range_data, there should be two bits per vector." ) - updated_vectors = np.copy(vectors) + updated_vectors: np.ndarray = np.copy(vectors) range_str = "".join([str(i) for i in range_data]) - for i, vector in enumerate(vectors): - range_int = int(range_str[i * 2:i * 2 + 2], 2) + for i in range(len(vectors)): + range_int = int(range_str[i * 2 : i * 2 + 2], 2) updated_vectors[i][3] = range_int return updated_vectors @staticmethod def accumulate_vectors( - first_vector: np.ndarray, - vector_differences: list[int], - vector_count: int, + first_vector: np.ndarray, + vector_differences: list[int], + vector_count: int, ) -> np.ndarray: """ Given a list of differences and the first vector, return calculated vectors. @@ -756,13 +774,13 @@ def accumulate_vectors( Parameters ---------- - first_vector: numpy.ndarray + first_vector : numpy.ndarray A numpy array of 3 signed integers and a range value, representing the start vector. - vector_differences: numpy.ndarray + vector_differences : numpy.ndarray A numpy array of shape (expected_vector_count, 4) of signed integers, representing the differences between vectors. - vector_count: int + vector_count : int The expected number of vectors in the output. Returns @@ -771,7 +789,7 @@ def accumulate_vectors( A numpy array of shape (expected_vector_count, 4) of signed integers, representing the calculated vectors. """ - vectors = np.empty((vector_count, 4), dtype=np.int32) + vectors: np.ndarray = np.empty((vector_count, 4), dtype=np.int32) vectors[0] = first_vector index = 0 @@ -789,7 +807,7 @@ def accumulate_vectors( @staticmethod def unpack_one_vector( - vector_data: np.ndarray, width: int, has_range: int + vector_data: np.ndarray, width: int, has_range: int ) -> np.ndarray: """ Unpack a single vector from the vector data. @@ -799,13 +817,13 @@ def unpack_one_vector( Parameters ---------- - vector_data: numpy.ndarray + vector_data : numpy.ndarray Vector data for the vector to unpack. This is uncompressed data as a numpy array of bits (the output of np.unpackbits). - width: int + width : int The width of each vector component in bits. This needs to be a multiple of 8 (including only whole bytes). - has_range: int + has_range : int 1 if the vector data includes range data, 0 if not. The first vector always has range data, it is only if the compression fails and we revert to uncompressed data partway through that we skip the range. @@ -823,22 +841,23 @@ def unpack_one_vector( if len(vector_data) != width * 3 + 2 * has_range: raise ValueError( - f"Invalid length {len(vector_data)} for vector data. Expected {width * 3}" - f" or {width * 3 + 2} if has_range.") + f"Invalid length {len(vector_data)} for vector data. Expected " + f"{width * 3} or {width * 3 + 2} if has_range." + ) padding = np.zeros(8 - (width % 8), dtype=np.uint8) # take slices of the input data and pack from an array of bits to an array of # uint8 bytes x = np.packbits(np.concatenate((padding, vector_data[:width]))) - y = np.packbits(np.concatenate((padding, vector_data[width: 2 * width]))) - z = np.packbits(np.concatenate((padding, vector_data[2 * width: 3 * width]))) + y = np.packbits(np.concatenate((padding, vector_data[width : 2 * width]))) + z = np.packbits(np.concatenate((padding, vector_data[2 * width : 3 * width]))) range_string = "".join([str(i) for i in vector_data[-2:]]) rng = int(range_string, 2) if has_range else 0 # Convert to signed integers using twos complement - return np.array( + signed_vals: np.ndarray = np.array( [ MagL1a.twos_complement(x, width), MagL1a.twos_complement(y, width), @@ -847,10 +866,12 @@ def unpack_one_vector( ], dtype=np.int32, ) + return signed_vals @staticmethod def twos_complement(value: np.ndarray, bits: int) -> np.int32: - """Compute the two's complement of an integer. + """ + Compute the two's complement of an integer. This function will return the two's complement of a given bytearray value. The input value should be a bytearray or a numpy array of uint8 values. @@ -869,7 +890,7 @@ def twos_complement(value: np.ndarray, bits: int) -> np.int32: Returns ------- numpy.int32 - two's complement of the input value, as a signed int. + Two's complement of the input value, as a signed int. """ integer_value = int.from_bytes(value, "big") if (integer_value & (1 << (bits - 1))) != 0: @@ -885,10 +906,10 @@ def decode_fib_zig_zag(code: np.ndarray) -> int: Parameters ---------- - code: numpy.ndarray + code : numpy.ndarray The code to decode, in the form of an array of bits (eg [0, 1, 0, 1, 1]). This should always end in 2 ones (which indicates the end of a fibonacci - encoding.) + encoding). Returns ------- @@ -903,9 +924,9 @@ def decode_fib_zig_zag(code: np.ndarray) -> int: # Fibonacci decoding code = code[:-1] - value = sum(FIBONACCI_SEQUENCE[:len(code)] * code) - 1 + value: int = sum(FIBONACCI_SEQUENCE[: len(code)] * code) - 1 # Zig-zag decode (to go from uint to signed int) - value = (value >> 1) ^ (-(value & 1)) + value = int((value >> 1) ^ (-(value & 1))) return value diff --git a/imap_processing/tests/mag/test_mag_decom.py b/imap_processing/tests/mag/test_mag_decom.py index 4616860bd..0e790b528 100644 --- a/imap_processing/tests/mag/test_mag_decom.py +++ b/imap_processing/tests/mag/test_mag_decom.py @@ -1,6 +1,5 @@ from pathlib import Path -import numpy as np import pandas as pd import pytest @@ -25,12 +24,6 @@ def test_mag_decom(): packets = decom_packets(str(burst_test_file)) l0 = packets["burst"] + packets["norm"] - print(str(packets["norm"][0].VECTORS).replace(' ', ' ').replace(' ', ' ').replace(' ', ', ')) - bit_string = [str(x) for x in np.unpackbits(packets["norm"][0].VECTORS)] - print(''.join(bit_string)) - - print(packets["norm"][0]) - expected_output = pd.read_csv(current_directory / "mag_l0_test_output.csv") for index, test in enumerate(l0): diff --git a/imap_processing/tests/mag/test_mag_l1a.py b/imap_processing/tests/mag/test_mag_l1a.py index 336f9fa84..fcb54079e 100644 --- a/imap_processing/tests/mag/test_mag_l1a.py +++ b/imap_processing/tests/mag/test_mag_l1a.py @@ -371,8 +371,15 @@ def test_compressed_vector_data(expected_vectors): # TODO: if there is a number of vectors that is not a multiple of 8, # is there also padding at the end of the range section? input_data = np.array( - [int(i) for i in headers + primary_compressed + secondary_compressed + - padding + range_primary + range_secondary], + [ + int(i) + for i in headers + + primary_compressed + + secondary_compressed + + padding + + range_primary + + range_secondary + ], dtype=np.uint8, ) @@ -382,7 +389,9 @@ def test_compressed_vector_data(expected_vectors): primary_expected[i][3] = expected_range_primary[i] secondary_expected[i][3] = expected_range_secondary[i] - (primary_with_range, secondary_with_range) = MagL1a.process_compressed_vectors(input_data, 16, 16) + (primary_with_range, secondary_with_range) = MagL1a.process_compressed_vectors( + input_data, 16, 16 + ) assert primary_with_range.shape[0] == 16 assert secondary_with_range.shape[0] == 16 @@ -391,7 +400,9 @@ def test_compressed_vector_data(expected_vectors): assert np.array_equal(secondary_with_range, secondary_expected) -def test_switch_to_uncompressed_vector_data(expected_vectors, uncompressed_vector_bytearray): +def test_switch_to_uncompressed_vector_data( + expected_vectors, uncompressed_vector_bytearray +): primary_compressed = ( "000000100000010000001000000100000001000000100000110101110010" "011100101011010111001001110010101101011100100110000000011100" @@ -399,14 +410,17 @@ def test_switch_to_uncompressed_vector_data(expected_vectors, uncompressed_vecto "010111000111001001110010101101011100100110000000011010111001" "001110010101101011100100111001010111000110000101100000000110" "101110010011100101011010111001001110010101110001110010011000" - "000001100000000000000000000010111000000000000000000000010011000000000000000000000000100101011" + "000001100000000000000000000010111000000000000000000000010011" + "000000000000000000000000100101011" ) # 4 uncompressed vectors from uncompressed_vector_bytearray - uncompressed_bits = ("00000010000001000000100000010000000100000010000011" - "00000010000001110000100000011101000100000011101011" - "00000010000010100000100000101010000100000101010011" - "00000010000011010000100000110111000100000110111111") + uncompressed_bits = ( + "00000010000001000000100000010000000100000010000011" + "00000010000001110000100000011101000100000011101011" + "00000010000010100000100000101010000100000101010011" + "00000010000011010000100000110111000100000110111111" + ) secondary_compressed = ( "0000001000000011000010000000111100010000000111111110001110" @@ -415,8 +429,8 @@ def test_switch_to_uncompressed_vector_data(expected_vectors, uncompressed_vecto "0111001010110101110010011100101011100011000010110000000011" "0101110010011100101011010111001001100000000111000111001001" "1100101011010111001001110010101101011000010110000000011100" - "011100100111001010110000000000000010111000000000000000000000010011100101" - "000000000011" + "0111001001110010101100000000000000101110000000000000000000" + "00010011100101000000000011" ) uncompressed_expected_vectors = expected_vectors[0][:4] @@ -424,7 +438,14 @@ def test_switch_to_uncompressed_vector_data(expected_vectors, uncompressed_vecto headers = "01000000" input_data = np.array( - [int(i) for i in headers + primary_compressed + uncompressed_bits + secondary_compressed + uncompressed_bits], + [ + int(i) + for i in headers + + primary_compressed + + uncompressed_bits + + secondary_compressed + + uncompressed_bits + ], dtype=np.uint8, ) @@ -440,12 +461,21 @@ def test_switch_to_uncompressed_vector_data(expected_vectors, uncompressed_vecto # Test if first primary vector is too long primary_first_vector = "00000010000001000000100000010000000100000010000011" - primary_long_second_vector = ("0000000000000000000001011100000000000000000000001" - "0011000000000000000000000000100101011") + primary_long_second_vector = ( + "0000000000000000000001011100000000000000000000001" + "0011000000000000000000000000100101011" + ) input_data = np.array( - [int(i) for i in headers + primary_first_vector + primary_long_second_vector + uncompressed_bits + secondary_compressed], - dtype=np.uint8 + [ + int(i) + for i in headers + + primary_first_vector + + primary_long_second_vector + + uncompressed_bits + + secondary_compressed + ], + dtype=np.uint8, ) input_data = np.packbits(input_data) @@ -454,6 +484,7 @@ def test_switch_to_uncompressed_vector_data(expected_vectors, uncompressed_vecto assert np.array_equal(primary[0], expected_vectors[0][0]) assert np.array_equal(primary[2:], uncompressed_expected_vectors) + def test_different_compression_width(): # Compression headers - indicating a 12 bit width and no range section headers = "00110000" @@ -487,14 +518,20 @@ def test_different_compression_width(): padding = "00000" # Pad to byte boundary input_data = np.array( - [int(i) for i in headers + first_primary_vector + primary_compressed + - first_secondary_vector + secondary_compressed + padding], + [ + int(i) + for i in headers + + first_primary_vector + + primary_compressed + + first_secondary_vector + + secondary_compressed + + padding + ], dtype=np.uint8, ) input_data = np.packbits(input_data) - (primary, secondary) = MagL1a.process_compressed_vectors(input_data, 16, - 16) + (primary, secondary) = MagL1a.process_compressed_vectors(input_data, 16, 16) assert np.array_equal(primary[0], expected_first_vector) assert np.array_equal(secondary[0], expected_second_vector) @@ -522,10 +559,9 @@ def test_accumulate_vectors(): diff_vectors = [1, 1, 1, 3, 0, -3, -1, -10, 1] - expected_vectors = np.array([[1, 2, 3, 4], - [2, 3, 4, 4], - [5, 3, 1, 4], - [4, -7, 2, 4]]) + expected_vectors = np.array( + [[1, 2, 3, 4], [2, 3, 4, 4], [5, 3, 1, 4], [4, -7, 2, 4]] + ) test_vectors = MagL1a.accumulate_vectors(start_vector, diff_vectors, 4) @@ -547,14 +583,51 @@ def test_unpack_one_vector(uncompressed_vector_bytearray, expected_vectors): assert all(test_output == expected_vectors) test_12bit_vector = np.array( - [0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 1, 0, - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]) + [ + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 1, + 0, + 1, + 1, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + ] + ) test_output = MagL1a.unpack_one_vector(test_12bit_vector, 12, 0) expected_vectors = [22, 0, -1, 0] assert all(test_output == expected_vectors) + def test_twos_complement(): # -19 in binary input_test = np.array([1, 1, 1, 0, 1, 1, 0, 1], dtype=np.uint8)