diff --git a/validator.py b/validator.py index d25ab65..3ab43d5 100644 --- a/validator.py +++ b/validator.py @@ -1,93 +1,180 @@ # Requires Python 3.7+ +from __future__ import annotations import dataclasses import struct import sys import argparse import datetime +from typing import BinaryIO, List, Tuple +from pathlib import Path +# Constants MEMORY_LIMIT = 3500 +MAGIC_BYTES = b'PSEQ' +MIN_START_OFFSET = 24 +MIN_STEP_TIME = 15 +MAX_DURATION_SECONDS = 5 * 60 +VALID_CHANNEL_COUNTS = {48, 200} +SUPPORTED_VERSIONS = {(2, 0), (2, 2)} -class ValidationError(Exception): - pass +@dataclasses.dataclass +class FileHeader: + """Represents the header structure of an FSEQ file.""" + major_version: int + minor_version: int + start_offset: int + channel_count: int + frame_count: int + step_time: int + compression_type: int + + @classmethod + def from_file(cls, file: BinaryIO) -> FileHeader: + """Parse file header from binary file.""" + magic = file.read(4) + if magic != MAGIC_BYTES: + raise ValidationError(f"Invalid magic bytes: expected {MAGIC_BYTES}, got {magic}") + + start, minor, major = struct.unpack(" Frame: + """Create a Frame instance from raw bytes.""" + light_state = [(b > 127) for b in lights] + ramp_state = [min((((255 - b) if (b > 127) else b) // 13 + 1) // 2, 3) for b in lights[:14]] + closure_state = [((b // 32 + 1) // 2) for b in closures] + return cls(light_state, ramp_state, closure_state) @dataclasses.dataclass class ValidationResults: + """Results of file validation.""" frame_count: int step_time: int duration_s: int memory_usage: float -def validate(file): - """Calculates the memory usage of the provided .fseq file""" - magic = file.read(4) - start, minor, major = struct.unpack(" bool: + """Check if validation results indicate a valid file.""" + return self.memory_usage <= 1 - if (magic != b'PSEQ') or (start < 24) or (frame_count < 1) or (step_time < 15): - raise ValidationError("Unknown file format, expected FSEQ v2.0") - if channel_count != 48 and channel_count != 200: - raise ValidationError(f"Expected 48 or 200 channels, got {channel_count}") - if compression_type != 0: - raise ValidationError("Expected file format to be V2 Uncompressed") - duration_s = (frame_count * step_time / 1000) - if duration_s > 5*60: - raise ValidationError(f"Expected total duration to be less than 5 minutes, got {datetime.timedelta(seconds=duration_s)}") - if ((minor != 0) and (minor != 2)) or (major != 2): - print("") - print(f"WARNING: FSEQ version is {major}.{minor}. Only version 2.0 and 2.2 have been validated.") - print(f"If the car fails to read this file, download and older version of XLights at https://github.com/smeighan/xLights/releases") - print(f"Please report this message at https://github.com/teslamotors/light-show/issues") - print("") + def format_results(self) -> str: + """Format validation results as human-readable string.""" + return ( + f"Found {self.frame_count} frames, step time of {self.step_time} ms " + f"for a total duration of {datetime.timedelta(seconds=self.duration_s)}.\n" + f"Used {self.memory_usage*100:.2f}% of the available memory" + ) - file.seek(start) +class ValidationError(Exception): + """Raised when file validation fails.""" + pass + +def validate_header(header: FileHeader) -> None: + """Validate file header values.""" + if header.start_offset < MIN_START_OFFSET: + raise ValidationError(f"Start offset too small: {header.start_offset}") + + if header.frame_count < 1: + raise ValidationError("Frame count must be positive") + + if header.step_time < MIN_STEP_TIME: + raise ValidationError(f"Step time too small: {header.step_time}") + + if header.channel_count not in VALID_CHANNEL_COUNTS: + raise ValidationError(f"Invalid channel count: {header.channel_count}") + + if header.compression_type != 0: + raise ValidationError("Only uncompressed V2 format is supported") + + duration_s = (header.frame_count * header.step_time / 1000) + if duration_s > MAX_DURATION_SECONDS: + raise ValidationError( + f"Duration exceeds limit: {datetime.timedelta(seconds=duration_s)} " + f"(max: {datetime.timedelta(seconds=MAX_DURATION_SECONDS)})" + ) - prev_light = None - prev_ramp = None - prev_closure_1 = None - prev_closure_2 = None - count = 0 +def check_version_compatibility(header: FileHeader) -> None: + """Check file version compatibility and print warnings if needed.""" + version = (header.major_version, header.minor_version) + if version not in SUPPORTED_VERSIONS: + print( + f"\nWARNING: FSEQ version is {header.major_version}.{header.minor_version}. " + "Only version 2.0 and 2.2 have been validated.\n" + "If the car fails to read this file, download an older version of XLights " + "at https://github.com/smeighan/xLights/releases\n" + "Please report this message at https://github.com/teslamotors/light-show/issues\n" + ) - for frame_i in range(frame_count): +def validate(file: BinaryIO) -> ValidationResults: + """Validate an FSEQ file and calculate memory usage.""" + header = FileHeader.from_file(file) + validate_header(header) + check_version_compatibility(header) + + file.seek(header.start_offset) + state_changes = 0 + prev_frame = None + + for _ in range(header.frame_count): lights = file.read(30) closures = file.read(16) - file.seek(channel_count - 30 - 16, 1) - - light_state = [(b > 127) for b in lights] - ramp_state = [min((((255 - b) if (b > 127) else (b)) // 13 + 1) // 2, 3) for b in lights[:14]] - closure_state = [((b // 32 + 1) // 2) for b in closures] - - if light_state != prev_light: - prev_light = light_state - count += 1 - if ramp_state != prev_ramp: - prev_ramp = ramp_state - count += 1 - if closure_state[:10] != prev_closure_1: - prev_closure_1 = closure_state[:10] - count += 1 - if closure_state[10:] != prev_closure_2: - prev_closure_2 = closure_state[10:] - count += 1 - - return ValidationResults(frame_count, step_time, duration_s, count / MEMORY_LIMIT) + file.seek(header.channel_count - 46, 1) # Skip remaining channels + + current_frame = Frame.from_bytes(lights, closures) + + if prev_frame is None or any([ + current_frame.light_state != prev_frame.light_state, + current_frame.ramp_state != prev_frame.ramp_state, + current_frame.closure_state[:10] != prev_frame.closure_state[:10], + current_frame.closure_state[10:] != prev_frame.closure_state[10:] + ]): + state_changes += 1 + + prev_frame = current_frame + + duration_s = (header.frame_count * header.step_time / 1000) + return ValidationResults( + frame_count=header.frame_count, + step_time=header.step_time, + duration_s=duration_s, + memory_usage=state_changes / MEMORY_LIMIT + ) -if __name__ == "__main__": - # Expected usage: python3 validator.py lightshow.fseq +def main() -> None: + """Main entry point for the validator script.""" parser = argparse.ArgumentParser(description="Validate .fseq file for Tesla Light Show use") - parser.add_argument("file") + parser.add_argument("file", type=Path, help="Path to the FSEQ file to validate") args = parser.parse_args() - - with open(args.file, "rb") as file: - try: + + try: + with open(args.file, "rb") as file: results = validate(file) - except ValidationError as e: - print(e) - sys.exit(1) - - print(f"Found {results.frame_count} frames, step time of {results.step_time} ms for a total duration of {datetime.timedelta(seconds=results.duration_s)}.") - print(f"Used {results.memory_usage*100:.2f}% of the available memory") - if results.memory_usage > 1: + print(results.format_results()) + sys.exit(0 if results.is_valid() else 1) + except (ValidationError, IOError) as e: + print(f"Error: {e}") sys.exit(1) + +if __name__ == "__main__": + main()