diff --git a/Code/Server_WebUI/Thread.py b/Code/Server_WebUI/Thread.py new file mode 100644 index 00000000..a8463a5b --- /dev/null +++ b/Code/Server_WebUI/Thread.py @@ -0,0 +1,36 @@ +import threading +import time +import inspect +import ctypes + + +def _async_raise(tid, exctype): + """raises the exception, performs cleanup if needed""" + tid = ctypes.c_long(tid) + if not inspect.isclass(exctype): + exctype = type(exctype) + res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype)) + if res == 0: + raise ValueError("invalid thread id") + elif res != 1: + ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None) + raise SystemError("PyThreadState_SetAsyncExc failed") + + +def stop_thread(thread): + for i in range(7): + _async_raise(thread.ident, SystemExit) + + +def test(): + while True: + print('-------') + time.sleep(1) + + +if __name__ == "__main__": + t = threading.Thread(target=test) + t.start() + time.sleep(5) + print("main thread sleep finish") + stop_thread(t) diff --git a/Code/Server_WebUI/adc.py b/Code/Server_WebUI/adc.py new file mode 100644 index 00000000..d13aa630 --- /dev/null +++ b/Code/Server_WebUI/adc.py @@ -0,0 +1,56 @@ +import smbus # Import the smbus module for I2C communication +import time # Import the time module for sleep functionality +from parameter import ParameterManager # Import the ParameterManager class from the parameter module + +class ADC: + def __init__(self): + """Initialize the ADC class.""" + self.I2C_ADDRESS = 0x48 # Set the I2C address of the ADC + self.ADS7830_COMMAND = 0x84 # Set the command byte for ADS7830 + self.parameter_manager = ParameterManager() # Create an instance of ParameterManager + self.pcb_version = self.parameter_manager.get_pcb_version() # Get the PCB version + self.adc_voltage_coefficient = 3.3 if self.pcb_version == 1 else 5.2 # Set the ADC voltage coefficient based on the PCB version + self.i2c_bus = smbus.SMBus(1) # Initialize the I2C bus + + def _read_stable_byte(self) -> int: + """Read a stable byte from the ADC.""" + while True: + value1 = self.i2c_bus.read_byte(self.I2C_ADDRESS) # Read the first byte from the ADC + value2 = self.i2c_bus.read_byte(self.I2C_ADDRESS) # Read the second byte from the ADC + if value1 == value2: + return value1 # Return the value if both reads are the same + + def read_adc(self, channel: int) -> float: + """Read the ADC value for the specified channel using ADS7830.""" + command_set = self.ADS7830_COMMAND | ((((channel << 2) | (channel >> 1)) & 0x07) << 4) # Calculate the command set for the specified channel + self.i2c_bus.write_byte(self.I2C_ADDRESS, command_set) # Write the command set to the ADC + value = self._read_stable_byte() # Read a stable byte from the ADC + voltage = value / 255.0 * self.adc_voltage_coefficient # Convert the ADC value to voltage + return round(voltage, 2) # Return the voltage rounded to 2 decimal places + + def scan_i2c_bus(self) -> None: + """Scan the I2C bus for connected devices.""" + print("Scanning I2C bus...") # Print a message indicating the start of I2C bus scanning + for device in range(128): # Iterate over possible I2C addresses (0 to 127) + try: + self.i2c_bus.read_byte_data(device, 0) # Try to read data from the current device address + print(f"Device found at address: 0x{device:02X}") # Print the address of the found device + except OSError: + pass # Ignore any OSError exceptions + + def close_i2c(self) -> None: + """Close the I2C bus.""" + self.i2c_bus.close() # Close the I2C bus + +if __name__ == '__main__': + print('Program is starting ... ') # Print a message indicating the start of the program + adc = ADC() # Create an instance of the ADC class + try: + while True: + left_idr = adc.read_adc(0) # Read the left photoresistor value + right_idr = adc.read_adc(1) # Read the right photoresistor value + power = adc.read_adc(2) * (3 if adc.pcb_version == 1 else 2) # Calculate the power value based on the PCB version + print(f"Left IDR: {left_idr}V, Right IDR: {right_idr}V, Power: {power}V") # Print the values of left IDR, right IDR, and power + time.sleep(1) # Wait for 1 second + except KeyboardInterrupt: + adc.close_i2c() # Close the I2C bus when the program is interrupted \ No newline at end of file diff --git a/Code/Server_WebUI/buzzer.py b/Code/Server_WebUI/buzzer.py new file mode 100644 index 00000000..7f33a28d --- /dev/null +++ b/Code/Server_WebUI/buzzer.py @@ -0,0 +1,28 @@ +import time +from gpiozero import OutputDevice + +class Buzzer: + def __init__(self): + """Initialize the Buzzer class.""" + self.PIN = 17 # Set the GPIO pin for the buzzer + self.buzzer_pin = OutputDevice(self.PIN) # Initialize the buzzer pin + + def set_state(self, state: bool) -> None: + """Set the state of the buzzer.""" + self.buzzer_pin.on() if state else self.buzzer_pin.off() # Turn on or off the buzzer based on the state + + def close(self) -> None: + """Close the buzzer pin.""" + self.buzzer_pin.close() # Close the buzzer pin to release the GPIO resource + +if __name__ == '__main__': + print('Program is starting ... ') # Print a message indicating the start of the program + buzzer = Buzzer() # Create an instance of the Buzzer class + try: + for _ in range(3): + buzzer.set_state(True) # Turn on the buzzer + time.sleep(0.1) # Wait for 0.1 second + buzzer.set_state(False) # Turn off the buzzer + time.sleep(0.1) # Wait for 0.1 second + finally: + buzzer.close() # Ensure the buzzer pin is closed when the program is interrupted \ No newline at end of file diff --git a/Code/Server_WebUI/camera.py b/Code/Server_WebUI/camera.py new file mode 100644 index 00000000..a429602b --- /dev/null +++ b/Code/Server_WebUI/camera.py @@ -0,0 +1,120 @@ +import time +from picamera2 import Picamera2, Preview +from picamera2.encoders import H264Encoder, JpegEncoder +from picamera2.outputs import FileOutput +from libcamera import Transform +from threading import Condition +import io + +class StreamingOutput(io.BufferedIOBase): + def __init__(self): + """Initialize the StreamingOutput class.""" + self.frame = None + self.condition = Condition() # Initialize the condition variable for thread synchronization + + def write(self, buf: bytes) -> int: + """Write a buffer to the frame and notify all waiting threads.""" + with self.condition: + self.frame = buf # Update the frame buffer with new data + self.condition.notify_all() # Notify all waiting threads that new data is available + return len(buf) + +class Camera: + def __init__(self, preview_size: tuple = (640, 480), hflip: bool = True, vflip: bool = True, stream_size: tuple = (400, 300)): + """Initialize the Camera class.""" + self.camera = Picamera2() # Initialize the Picamera2 object + self.transform = Transform(hflip=1 if hflip else 0, vflip=1 if vflip else 0) # Set the transformation for flipping the image + preview_config = self.camera.create_preview_configuration(main={"size": preview_size}, transform=self.transform) # Create the preview configuration + self.camera.configure(preview_config) # Configure the camera with the preview settings + + # Configure video stream + self.stream_size = stream_size # Set the size of the video stream + self.stream_config = self.camera.create_video_configuration(main={"size": stream_size}, transform=self.transform) # Create the video configuration + self.streaming_output = StreamingOutput() # Initialize the streaming output object + self.streaming = False # Initialize the streaming flag + + def start_image(self) -> None: + """Start the camera preview and capture.""" + self.camera.start_preview(Preview.QTGL) # Start the camera preview using the QTGL backend + self.camera.start() # Start the camera + + def save_image(self, filename: str) -> dict: + """Capture and save an image to the specified file.""" + try: + metadata = self.camera.capture_file(filename) # Capture an image and save it to the specified file + return metadata # Return the metadata of the captured image + except Exception as e: + print(f"Error capturing image: {e}") # Print error message if capturing fails + return None # Return None if capturing fails + + def start_stream(self, filename: str = None) -> None: + """Start the video stream or recording.""" + if not self.streaming: + if self.camera.started: + self.camera.stop() # Stop the camera if it is currently running + + self.camera.configure(self.stream_config) # Configure the camera with the video stream settings + if filename: + encoder = H264Encoder() # Use H264 encoder for video recording + output = FileOutput(filename) # Set the output file for the recorded video + else: + encoder = JpegEncoder() # Use Jpeg encoder for streaming + output = FileOutput(self.streaming_output) # Set the streaming output object + self.camera.start_recording(encoder, output) # Start recording or streaming + self.streaming = True # Set the streaming flag to True + + def stop_stream(self) -> None: + """Stop the video stream or recording.""" + if self.streaming: + try: + self.camera.stop_recording() # Stop the recording or streaming + self.streaming = False # Set the streaming flag to False + except Exception as e: + print(f"Error stopping stream: {e}") # Print error message if stopping fails + + def get_frame(self) -> bytes: + """Get the current frame from the streaming output.""" + with self.streaming_output.condition: + self.streaming_output.condition.wait() # Wait for a new frame to be available + return self.streaming_output.frame # Return the current frame + + def save_video(self, filename: str, duration: int = 10) -> None: + """Save a video for the specified duration.""" + self.start_stream(filename) # Start the video recording + time.sleep(duration) # Record for the specified duration + self.stop_stream() # Stop the video recording + + def close(self) -> None: + """Close the camera.""" + if self.streaming: + self.stop_stream() # Stop the streaming if it is active + self.camera.close() # Close the camera + +if __name__ == '__main__': + print('Program is starting ... ') # Print a message indicating the start of the program + camera = Camera() # Create a Camera instance + + print("View image...") + camera.start_image() # Start the camera preview + time.sleep(10) # Wait for 10 seconds + + print("Capture image...") + camera.save_image(filename="image.jpg") # Capture and save an image + time.sleep(1) # Wait for 1 second + + ''' + print("Stream video...") + camera.start_stream() # Start the video stream + time.sleep(3) # Stream for 3 seconds + + print("Stop video...") + camera.stop_stream() # Stop the video stream + time.sleep(1) # Wait for 1 second + + print("Save video...") + camera.save_video("video.h264", duration=3) # Save a video for 3 seconds + time.sleep(1) # Wait for 1 second + + print("Close camera...") + camera.close() # Close the camera + ''' \ No newline at end of file diff --git a/Code/Server_WebUI/car.py b/Code/Server_WebUI/car.py new file mode 100644 index 00000000..bc909ab9 --- /dev/null +++ b/Code/Server_WebUI/car.py @@ -0,0 +1,191 @@ +from ultrasonic import Ultrasonic +from motor import Ordinary_Car +from servo import Servo +from infrared import Infrared +from adc import ADC +import time +import math + +class Car: + def __init__(self): + self.servo = None + self.sonic = None + self.motor = None + self.infrared = None + self.adc = None + self.car_record_time = time.time() + self.car_sonic_servo_angle = 30 + self.car_sonic_servo_dir = 1 + self.car_sonic_distance = [30, 30, 30] + self.time_compensate = 3 #Depend on your own car,If you want to get the best out of the rotation mode, change the value by experimenting. + self.start() + def start(self): + if self.servo is None: + self.servo = Servo() + if self.sonic is None: + self.sonic = Ultrasonic() + if self.motor is None: + self.motor = Ordinary_Car() + if self.infrared is None: + self.infrared = Infrared() + if self.adc is None: + self.adc = ADC() + + def close(self): + self.motor.set_motor_model(0,0,0,0) + self.sonic.close() + self.motor.close() + self.infrared.close() + self.adc.close_i2c() + self.servo = None + self.sonic = None + self.motor = None + self.infrared = None + self.adc = None + + def run_motor_ultrasonic(self, distance): + if (distance[0] < 30 and distance[1] < 30 and distance[2] <30) or distance[1] < 30 : + self.motor.set_motor_model(-1450,-1450,-1450,-1450) + time.sleep(0.1) + if distance[0] < distance[2]: + self.motor.set_motor_model(1450,1450,-1450,-1450) + else : + self.motor.set_motor_model(-1450,-1450,1450,1450) + elif distance[0] < 30 and distance[1] < 30: + self.motor.set_motor_model(1500,1500,-1500,-1500) + elif distance[2] < 30 and distance[1] < 30: + self.motor.set_motor_model(-1500,-1500,1500,1500) + elif distance[0] < 20 : + self.motor.set_motor_model(2000,2000,-500,-500) + if distance[0] < 10 : + self.motor.set_motor_model(1500,1500,-1000,-1000) + elif distance[2] < 20 : + self.motor.set_motor_model(-500,-500,2000,2000) + if distance[2] < 10 : + self.motor.set_motor_model(-1500,-1500,1500,1500) + else : + self.motor.set_motor_model(600,600,600,600) + + def mode_ultrasonic(self): + if (time.time() - self.car_record_time) > 0.2: + self.car_record_time = time.time() + self.servo.set_servo_pwm('0', self.car_sonic_servo_angle) + if self.car_sonic_servo_angle == 30: + self.car_sonic_distance[0] = self.sonic.get_distance() + elif self.car_sonic_servo_angle == 90: + self.car_sonic_distance[1] = self.sonic.get_distance() + elif self.car_sonic_servo_angle == 150: + self.car_sonic_distance[2] = self.sonic.get_distance() + #print("L:{}, M:{}, R:{}".format(self.car_sonic_distance[0], self.car_sonic_distance[1], self.car_sonic_distance[2])) + self.run_motor_ultrasonic(self.car_sonic_distance) + if self.car_sonic_servo_angle <= 30: + self.car_sonic_servo_dir = 1 + elif self.car_sonic_servo_angle >= 150: + self.car_sonic_servo_dir = 0 + if self.car_sonic_servo_dir == 1: + self.car_sonic_servo_angle += 60 + elif self.car_sonic_servo_dir == 0: + self.car_sonic_servo_angle -= 60 + + def mode_infrared(self): + if (time.time() - self.car_record_time) > 0.2: + self.car_record_time = time.time() + infrared_value = self.infrared.read_all_infrared() + #print("infrared_value: " + str(infrared_value)) + if infrared_value == 2: + self.motor.set_motor_model(800,800,800,800) + elif infrared_value == 4: + self.motor.set_motor_model(-1500,-1500,2500,2500) + elif infrared_value == 6: + self.motor.set_motor_model(-2000,-2000,4000,4000) + elif infrared_value == 1: + self.motor.set_motor_model(2500,2500,-1500,-1500) + elif infrared_value == 3: + self.motor.set_motor_model(4000,4000,-2000,-2000) + elif infrared_value == 7: + self.motor.set_motor_model(0,0,0,0) + + def mode_light(self): + if (time.time() - self.car_record_time) > 0.2: + self.car_record_time = time.time() + self.motor.set_motor_model(0,0,0,0) + L = self.adc.read_adc(0) + R = self.adc.read_adc(1) + #print("L: {}, R: {}".format(L, R)) + if L < 2.99 and R < 2.99 : + self.motor.set_motor_model(600,600,600,600) + elif abs(L-R)<0.15: + self.motor.set_motor_model(0,0,0,0) + elif L > 3 or R > 3: + if L > R : + self.motor.set_motor_model(-1200,-1200,1400,1400) + elif R > L : + self.motor.set_motor_model(1400,1400,-1200,-1200) + + def mode_rotate(self, n): + angle = n + bat_compensate = 7.5 / (self.adc.read_adc(2) * (3 if self.adc.pcb_version == 1 else 2)) + while True: + W = 2000 + VY = int(2000 * math.cos(math.radians(angle))) + VX = -int(2000 * math.sin(math.radians(angle))) + FR = VY - VX + W + FL = VY + VX - W + BL = VY - VX - W + BR = VY + VX + W + print("rotating") + self.motor.set_motor_model(FL, BL, FR, BR) + time.sleep(5*self.time_compensate*bat_compensate/1000) + angle -= 5 + +def test_car_sonic(): + car = Car() + try: + while True: + car.mode_ultrasonic() + except KeyboardInterrupt: + car.close() + print("\nEnd of program") + +def test_car_infrared(): + car = Car() + try: + while True: + car.mode_infrared() + except KeyboardInterrupt: + car.close() + print("\nEnd of program") + +def test_car_light(): + car = Car() + try: + print("Program is starting...") + while True: + car.mode_light() + except KeyboardInterrupt: + car.close() + print("\nEnd of program") + +def test_car_rotate(): + car = Car() + print("Program is starting...") + try: + car.mode_rotate(0) + except KeyboardInterrupt: + print ("\nEnd of program") + car.motor.set_motor_model(0,0,0,0) + car.close() + +if __name__ == '__main__': + import sys + if len(sys.argv) < 2: + print("Parameter error: Please assign the device") + exit() + if sys.argv[1] == 'Sonic' or sys.argv[1] == 'sonic': + test_car_sonic() + elif sys.argv[1] == 'Infrared' or sys.argv[1] == 'infrared': + test_car_infrared() + elif sys.argv[1] == 'Light' or sys.argv[1] == 'light': + test_car_light() + elif sys.argv[1] == 'Rotate' or sys.argv[1] == 'rotate': + test_car_rotate() \ No newline at end of file diff --git a/Code/Server_WebUI/command.py b/Code/Server_WebUI/command.py new file mode 100644 index 00000000..1322ff42 --- /dev/null +++ b/Code/Server_WebUI/command.py @@ -0,0 +1,14 @@ +class Command: + def __init__(self): + self.CMD_MOTOR = "CMD_MOTOR" + self.CMD_M_MOTOR = "CMD_M_MOTOR" + self.CMD_CAR_ROTATE = "CMD_CAR_ROTATE" + self.CMD_LED = "CMD_LED" + self.CMD_LED_MOD = "CMD_LED_MOD" + self.CMD_SERVO = "CMD_SERVO" + self.CMD_BUZZER = "CMD_BUZZER" + self.CMD_SONIC = "CMD_SONIC" + self.CMD_LIGHT = "CMD_LIGHT" + self.CMD_POWER = "CMD_POWER" + self.CMD_MODE = "CMD_MODE" + self.CMD_LINE = "CMD_LINE" \ No newline at end of file diff --git a/Code/Server_WebUI/infrared.py b/Code/Server_WebUI/infrared.py new file mode 100644 index 00000000..43b65b09 --- /dev/null +++ b/Code/Server_WebUI/infrared.py @@ -0,0 +1,46 @@ +# Import the LineSensor class from gpiozero for reading infrared sensors +from gpiozero import LineSensor +import time + +# Define the Infrared class to manage infrared sensors +class Infrared: + def __init__(self): + # Define the GPIO pins for each infrared sensor + self.IR_PINS = { + 1: 14, + 2: 15, + 3: 23 + } + # Initialize LineSensor objects for each infrared sensor + self.sensors = {channel: LineSensor(pin) for channel, pin in self.IR_PINS.items()} + + def read_one_infrared(self, channel: int) -> int: + """Read the value of a single infrared sensor.""" + if channel in self.sensors: + return 1 if self.sensors[channel].value else 0 + else: + raise ValueError(f"Invalid channel: {channel}. Valid channels are {list(self.IR_PINS.keys())}.") + + def read_all_infrared(self) -> int: + """Combine the values of all three infrared sensors into a single integer.""" + return (self.read_one_infrared(1) << 2) | (self.read_one_infrared(2) << 1) | self.read_one_infrared(3) + + def close(self) -> None: + """Close each LineSensor object to release GPIO resources.""" + for sensor in self.sensors.values(): + sensor.close() + +# Main entry point for testing the Infrared class +if __name__ == '__main__': + # Create an Infrared object + infrared = Infrared() + try: + # Continuously read and print the combined value of all infrared sensors + while True: + infrared_value = infrared.read_all_infrared() + print(f"Infrared value: {infrared_value}") + time.sleep(0.5) + except KeyboardInterrupt: + # Close the Infrared object and print a message when interrupted + infrared.close() + print("\nEnd of program") \ No newline at end of file diff --git a/Code/Server_WebUI/led.py b/Code/Server_WebUI/led.py new file mode 100644 index 00000000..93f82968 --- /dev/null +++ b/Code/Server_WebUI/led.py @@ -0,0 +1,190 @@ +# -*-coding: utf-8 -*- +import time +from parameter import ParameterManager +from rpi_ledpixel import Freenove_RPI_WS281X +from spi_ledpixel import Freenove_SPI_LedPixel + +class Led: + def __init__(self): + """Initialize the Led class and set up LED strip based on PCB and Raspberry Pi versions.""" + # Initialize the ParameterManager instance + self.param = ParameterManager() + # Get the Connect version from the parameter file + self.connect_version = self.param.get_connect_version() + # Get the Raspberry Pi version from the parameter file + self.pi_version = self.param.get_raspberry_pi_version() + + # Set up the LED strip based on PCB and Raspberry Pi versions + if self.connect_version == 1 and self.pi_version == 1: + self.strip = Freenove_RPI_WS281X(8, 255, 'RGB') + self.is_support_led_function = True + + elif self.connect_version == 2 and (self.pi_version == 1 or self.pi_version == 2): + self.strip = Freenove_SPI_LedPixel(8, 255, 'GRB') + self.is_support_led_function = True + + elif self.connect_version == 1 and self.pi_version == 2: + # Print an error message and disable LED function if unsupported combination + print("Connect Version 1.0 is not supported on Raspberry PI 5.") + self.is_support_led_function = False + + self.start = time.time() + self.next = 0 + self.color_wheel_value = 100 + self.color_chase_rainbow_index = 0 + self.color_wipe_index = 0 + self.rainbowbreathing_brightness = 0 + + def colorBlink(self, state=1, wait_ms=300): + """Wipe color across display a pixel at a time.""" + if self.is_support_led_function == False: + return + else: + if state == 1: + color = [[255, 0, 0],[0, 0, 0],[0, 255, 0],[0, 0, 0],[0, 0, 255],[0, 0, 0]] + self.next = time.time() + if (self.next - self.start) > wait_ms / 1000.0: + self.start = self.next + for i in range(self.strip.get_led_count()): + self.strip.set_led_rgb_data(i, color[self.color_wipe_index%4]) + self.strip.show() + self.color_wipe_index += 1 + else: + self.strip.set_all_led_color(0, 0, 0) + self.strip.show() + + def wheel(self, pos): + """Generate rainbow colors across 0-255 positions.""" + if self.is_support_led_function == False: + return + else: + if pos < 0 or pos > 255: + r = g = b = 0 + elif pos < 85: + r = pos * 3 + g = 255 - pos * 3 + b = 0 + elif pos < 170: + pos -= 85 + r = 255 - pos * 3 + g = 0 + b = pos * 3 + else: + pos -= 170 + r = 0 + g = pos * 3 + b = 255 - pos * 3 + return (r, g, b) + + def rainbowbreathing(self, wait_ms=10): + """Draw rainbowbreathing that fades across all pixels at once.""" + if self.is_support_led_function == False: + return + else: + self.next = time.time() + if (self.next - self.start) > wait_ms / 1000.0: + self.start = self.next + color1 = self.wheel((self.color_wheel_value%255) & 255) + if (self.rainbowbreathing_brightness%200) > 100: + brightness = 200 - self.rainbowbreathing_brightness + else: + brightness = self.rainbowbreathing_brightness + color2 = [int(color1[0] * brightness / 100), int(color1[1] * brightness / 100), int(color1[2] * brightness / 100)] + for i in range(self.strip.get_led_count()): + self.strip.set_led_rgb_data(i, color2) + self.strip.show() + + self.rainbowbreathing_brightness += 1 + if self.rainbowbreathing_brightness >= 200: + self.rainbowbreathing_brightness = 0 + self.color_wheel_value += 32 + if self.color_wheel_value >= 256: + self.color_wheel_value = 0 + + + + def rainbowCycle(self, wait_ms = 20): + """Draw rainbow that uniformly distributes itself across all pixels.""" + if not self.is_support_led_function: + return + else: + self.next = time.time() + if (self.next - self.start > wait_ms / 1000.0): + self.start = self.next + for i in range(self.strip.get_led_count()): + self.strip.set_led_rgb_data(i, self.wheel((int(i * 256 / self.strip.get_led_count()) + self.color_wheel_value) & 255)) + self.strip.show() + self.color_wheel_value += 1 + if self.color_wheel_value >= 256: + self.color_wheel_value = 0 + + def following(self, wait_ms=50): + """Rainbow movie theater light style chaser animation.""" + if self.is_support_led_function == False: + return + else: + self.next = time.time() + if (self.next - self.start > wait_ms / 1000.0): + self.start = self.next + for i in range(self.strip.get_led_count()): + self.strip.set_led_rgb_data(i, [0, 0, 0]) + self.strip.set_led_rgb_data(self.color_chase_rainbow_index, self.wheel((self.color_wheel_value) & 255)) + self.strip.show() + self.color_chase_rainbow_index += 1 + if self.color_chase_rainbow_index >= self.strip.get_led_count(): + self.color_chase_rainbow_index = 0 + self.color_wheel_value += 5 + if self.color_wheel_value >= 256: + self.color_wheel_value = 0 + + + def ledIndex(self, index, R, G, B): + """Set the color of specific LEDs based on the index.""" + if self.is_support_led_function == False: + return + else: + color = (R, G, B) + for i in range(8): + if index & 0x01 == 1: + self.strip.set_led_rgb_data(i, color) + self.strip.show() + index = index >> 1 + +# Main program logic follows: +if __name__ == '__main__': + print ('Program is starting ... ') + led = Led() + try: + print ("colorBlink animation") + start = time.time() + while (time.time() - start) < 5: + led.colorBlink(1) + + print ("following animation") + start = time.time() + while (time.time() - start) < 5: + led.following(50) + + print ("rainbowbreathing animation") + start = time.time() + while (time.time() - start) < 5: + led.rainbowbreathing(10) + + print ("rainbowCycle animation") + start = time.time() + while (time.time() - start) < 10: + led.rainbowCycle(20) + + led.colorBlink(0) + except KeyboardInterrupt: # When 'Ctrl+C' is pressed, the child program destroy() will be executed. + led.colorBlink(0) + finally: + print ("\nEnd of program") + + + + + + + + diff --git a/Code/Server_WebUI/main.py b/Code/Server_WebUI/main.py new file mode 100644 index 00000000..471650fc --- /dev/null +++ b/Code/Server_WebUI/main.py @@ -0,0 +1,82 @@ +#!/usr/bin/env python3 + +""" +Headless launcher for the web-based FreeKap 4WD controller. + +- No Qt +- No legacy TCP server +- Exposes: + * HTTP web UI on HTTP_PORT (default 8080) + * /video/mjpeg from Camera + * /api/telemetry, /api/drive/mecanum, /api/servo, /api/leds, etc. +""" + +import sys +import time + +from car import Car +from camera import Camera +from led import Led +from buzzer import Buzzer + +from server import ( + start_http_server, + set_app_context, + HTTP_PORT, +) + + +def main(): + print("[MAIN_WEB] Starting headless web server...") + + # --- Instantiate hardware objects --------------------------------------- + car = Car() + camera = Camera(stream_size=(400, 300)) + led = Led() + buzzer = Buzzer() + + # --- Expose them to server.py via APP_CONTEXT --------------------------- + set_app_context( + car=car, + camera=camera, + led=led, + buzzer=buzzer, + ) + + print(f"[MAIN_WEB] Web UI available at http://:{HTTP_PORT}/") + print("[MAIN_WEB] Press Ctrl+C to stop.") + + try: + # This blocks in the main thread; Ctrl+C raises KeyboardInterrupt here + start_http_server(port=HTTP_PORT) + except KeyboardInterrupt: + print("\n[MAIN_WEB] Caught Ctrl+C, shutting down...") + finally: + # Clean up hardware on exit + try: + camera.stop_stream() + camera.close() + except Exception: + pass + + try: + car.close() + except Exception: + pass + + try: + led.colorBlink(0) + except Exception: + pass + + try: + buzzer.set_state(0) + except Exception: + pass + + print("[MAIN_WEB] Shutdown complete.") + sys.exit(0) + + +if __name__ == "__main__": + main() diff --git a/Code/Server_WebUI/main_web.py b/Code/Server_WebUI/main_web.py new file mode 100644 index 00000000..471650fc --- /dev/null +++ b/Code/Server_WebUI/main_web.py @@ -0,0 +1,82 @@ +#!/usr/bin/env python3 + +""" +Headless launcher for the web-based FreeKap 4WD controller. + +- No Qt +- No legacy TCP server +- Exposes: + * HTTP web UI on HTTP_PORT (default 8080) + * /video/mjpeg from Camera + * /api/telemetry, /api/drive/mecanum, /api/servo, /api/leds, etc. +""" + +import sys +import time + +from car import Car +from camera import Camera +from led import Led +from buzzer import Buzzer + +from server import ( + start_http_server, + set_app_context, + HTTP_PORT, +) + + +def main(): + print("[MAIN_WEB] Starting headless web server...") + + # --- Instantiate hardware objects --------------------------------------- + car = Car() + camera = Camera(stream_size=(400, 300)) + led = Led() + buzzer = Buzzer() + + # --- Expose them to server.py via APP_CONTEXT --------------------------- + set_app_context( + car=car, + camera=camera, + led=led, + buzzer=buzzer, + ) + + print(f"[MAIN_WEB] Web UI available at http://:{HTTP_PORT}/") + print("[MAIN_WEB] Press Ctrl+C to stop.") + + try: + # This blocks in the main thread; Ctrl+C raises KeyboardInterrupt here + start_http_server(port=HTTP_PORT) + except KeyboardInterrupt: + print("\n[MAIN_WEB] Caught Ctrl+C, shutting down...") + finally: + # Clean up hardware on exit + try: + camera.stop_stream() + camera.close() + except Exception: + pass + + try: + car.close() + except Exception: + pass + + try: + led.colorBlink(0) + except Exception: + pass + + try: + buzzer.set_state(0) + except Exception: + pass + + print("[MAIN_WEB] Shutdown complete.") + sys.exit(0) + + +if __name__ == "__main__": + main() diff --git a/Code/Server_WebUI/message.py b/Code/Server_WebUI/message.py new file mode 100644 index 00000000..788d2f3f --- /dev/null +++ b/Code/Server_WebUI/message.py @@ -0,0 +1,80 @@ +import queue + +class Message_Parse: + def __init__(self): + """Initialize the Message_Parse class with empty parameters.""" + self.input_string = None # The raw input string + self.string_parameter = [] # List to store string parameters + self.int_parameter = [] # List to store integer parameters + self.command_string = None # The command string extracted from the input + + def clear_parameters(self): + """Clear all parsed parameters.""" + self.input_string = None # Reset the input string + self.string_parameter.clear() # Clear the list of string parameters + self.int_parameter.clear() # Clear the list of integer parameters + self.command_string = None # Reset the command string + + def parse(self, msg: str) -> bool: + """ + Parse the input message and extract command and parameters. + Parameters: + msg (str): The input message to parse. + Returns: + bool: True if parsing is successful, False otherwise. + """ + try: + self.clear_parameters() # Clear any existing parameters + self.input_string = msg.strip() # Remove leading and trailing whitespace from the input message + self.string_parameter = self.input_string.split("#") # Split the input string by '#' to get parameters + self.command_string = self.string_parameter[0] # The first element is the command string + buf_string_parameter = self.string_parameter[1:] # Remaining elements are parameters + if len(buf_string_parameter) > 0: + for x in buf_string_parameter: + if x != "" and x != '': # Ensure the parameter is not an empty string + try: + self.int_parameter.append(round(float(x))) # Convert the parameter to an integer and append to the list + except: + if x == 'one': + self.int_parameter.append(0) + elif x == 'two': + self.int_parameter.append(1) + elif x == 'three': + self.int_parameter.append(3) + elif x == 'four': + self.int_parameter.append(2) + else: + self.int_parameter.append(0) + return True + except Exception as e: + print("Error: Invalid command or parameter.") # Print an error message if parsing fails + print("msg:{}".format(msg)) # Print the original message + self.clear_parameters() # Clear parameters in case of error + print("Error:", e) # Print the exception details + return False + +if __name__ == '__main__': + print('Program is starting ... ') # Print a message indicating the start of the program + msg_parse = Message_Parse() # Create an instance of the Message_Parse class + my_queue = queue.Queue() # Create a queue to hold messages + + print("Message Parse Test") # Print a test start message + print("Put message to queue") # Indicate that a message is being added to the queue + my_queue.put("CMD_LED#0#255#0#0#15#") # Add a test message to the queue + + print("Get message from queue\n") # Indicate that messages are being processed from the queue + while not my_queue.empty(): # Process messages until the queue is empty + print("Queue size: " + str(my_queue.qsize())) # Print the current size of the queue + if msg_parse.parse(my_queue.get()): # Parse the message from the queue + if len(msg_parse.int_parameter) > 0 and len(msg_parse.string_parameter) > 0: + print("msg.input_string: {}".format(msg_parse.input_string)) # Print the raw input string + print("msg.string_parameter: {}".format(msg_parse.string_parameter)) # Print the list of string parameters + print("msg.command_string: {}".format(msg_parse.command_string)) # Print the command string + print("msg.int_parameter:{}\n".format(msg_parse.int_parameter)) # Print the list of integer parameters + elif len(msg_parse.string_parameter) > 0 and len(msg_parse.int_parameter) == 0: + print("msg.input_string: {}".format(msg_parse.input_string)) # Print the raw input string + print("msg.command_string: {}\n".format(msg_parse.command_string)) # Print the command string + else: + print("msg.input_string: {}".format(msg_parse.input_string)) # Print the raw input string + + print("Test end") # Indicate the end of the test \ No newline at end of file diff --git a/Code/Server_WebUI/motor.py b/Code/Server_WebUI/motor.py new file mode 100644 index 00000000..b37ed743 --- /dev/null +++ b/Code/Server_WebUI/motor.py @@ -0,0 +1,93 @@ +import time +from pca9685 import PCA9685 + +class Ordinary_Car: + def __init__(self): + self.pwm = PCA9685(0x40, debug=True) + self.pwm.set_pwm_freq(50) + def duty_range(self, duty1, duty2, duty3, duty4): + if duty1 > 4095: + duty1 = 4095 + elif duty1 < -4095: + duty1 = -4095 + if duty2 > 4095: + duty2 = 4095 + elif duty2 < -4095: + duty2 = -4095 + if duty3 > 4095: + duty3 = 4095 + elif duty3 < -4095: + duty3 = -4095 + if duty4 > 4095: + duty4 = 4095 + elif duty4 < -4095: + duty4 = -4095 + return duty1,duty2,duty3,duty4 + def left_upper_wheel(self,duty): + if duty>0: + self.pwm.set_motor_pwm(0,0) + self.pwm.set_motor_pwm(1,duty) + elif duty<0: + self.pwm.set_motor_pwm(1,0) + self.pwm.set_motor_pwm(0,abs(duty)) + else: + self.pwm.set_motor_pwm(0,4095) + self.pwm.set_motor_pwm(1,4095) + def left_lower_wheel(self,duty): + if duty>0: + self.pwm.set_motor_pwm(3,0) + self.pwm.set_motor_pwm(2,duty) + elif duty<0: + self.pwm.set_motor_pwm(2,0) + self.pwm.set_motor_pwm(3,abs(duty)) + else: + self.pwm.set_motor_pwm(2,4095) + self.pwm.set_motor_pwm(3,4095) + def right_upper_wheel(self,duty): + if duty>0: + self.pwm.set_motor_pwm(6,0) + self.pwm.set_motor_pwm(7,duty) + elif duty<0: + self.pwm.set_motor_pwm(7,0) + self.pwm.set_motor_pwm(6,abs(duty)) + else: + self.pwm.set_motor_pwm(6,4095) + self.pwm.set_motor_pwm(7,4095) + def right_lower_wheel(self,duty): + if duty>0: + self.pwm.set_motor_pwm(4,0) + self.pwm.set_motor_pwm(5,duty) + elif duty<0: + self.pwm.set_motor_pwm(5,0) + self.pwm.set_motor_pwm(4,abs(duty)) + else: + self.pwm.set_motor_pwm(4,4095) + self.pwm.set_motor_pwm(5,4095) + def set_motor_model(self, duty1, duty2, duty3, duty4): + duty1,duty2,duty3,duty4=self.duty_range(duty1,duty2,duty3,duty4) + self.left_upper_wheel(duty1) + self.left_lower_wheel(duty2) + self.right_upper_wheel(duty3) + self.right_lower_wheel(duty4) + + def close(self): + self.set_motor_model(0,0,0,0) + self.pwm.close() + +if __name__=='__main__': + PWM = Ordinary_Car() + try: + PWM.set_motor_model(2000,2000,2000,2000) #Forward + time.sleep(1) + PWM.set_motor_model(-2000,-2000,-2000,-2000) #Back + time.sleep(1) + PWM.set_motor_model(-2000,-2000,2000,2000) #Left + time.sleep(1) + PWM.set_motor_model(2000,2000,-2000,-2000) #Right + time.sleep(1) + PWM.set_motor_model(0,0,0,0) #Stop + except KeyboardInterrupt: # When 'Ctrl+C' is pressed, the child program destroy() will be executed. + print ("\nEnd of program") + finally: + PWM.close() + diff --git a/Code/Server_WebUI/parameter.py b/Code/Server_WebUI/parameter.py new file mode 100644 index 00000000..b2af9282 --- /dev/null +++ b/Code/Server_WebUI/parameter.py @@ -0,0 +1,150 @@ +# Import necessary modules +import os +import json +import subprocess + +class ParameterManager: + # Define the default parameter file name + PARAM_FILE = 'params.json' + + def __init__(self): + # Initialize the file path to the default parameter file + if self.file_exists() == False or self.validate_params() == False: + self.deal_with_param() + + def file_exists(self, file_path: str = PARAM_FILE) -> bool: + """Check if the specified file exists.""" + return os.path.exists(file_path) + + def validate_params(self, file_path: str = PARAM_FILE) -> bool: + """Validate that the parameter file exists and contains valid parameters.""" + if not self.file_exists(file_path): + return False + try: + with open(file_path, 'r') as file: + params = json.load(file) + # Check if required parameters are present and valid + required_params = { + 'Connect_Version': [1, 2], + 'Pcb_Version': [1, 2], + 'Pi_Version': [1, 2] + } + for param, valid_values in required_params.items(): + if param not in params or params[param] not in valid_values: + return False + return True + except json.JSONDecodeError: + print("Error decoding JSON file.") + return False + except Exception as e: + print(f"Error reading file: {e}") + return False + + def get_param(self, param_name: str, file_path: str = PARAM_FILE) -> any: + """Get the value of a specified parameter from the parameter file.""" + if self.validate_params(file_path): + with open(file_path, 'r') as file: + params = json.load(file) + return params.get(param_name) + return None + + def set_param(self, param_name: str, value: any, file_path: str = PARAM_FILE) -> None: + """Set the value of a specified parameter in the parameter file.""" + params = {} + if self.file_exists(file_path): + with open(file_path, 'r') as file: + params = json.load(file) + params[param_name] = value + with open(file_path, 'w') as file: + json.dump(params, file, indent=4) + + def delete_param_file(self, file_path: str = PARAM_FILE) -> None: + """Delete the specified parameter file.""" + if self.file_exists(file_path): + os.remove(file_path) + print(f"Deleted {file_path}") + else: + print(f"File {file_path} does not exist") + + def create_param_file(self, file_path: str = PARAM_FILE) -> None: + """Create a parameter file and set default parameters.""" + default_params = { + 'Connect_Version': 2, + 'Pcb_Version': 1, + 'Pi_Version': self.get_raspberry_pi_version() + } + with open(file_path, 'w') as file: + json.dump(default_params, file, indent=4) + + def get_raspberry_pi_version(self) -> int: + """Get the version of the Raspberry Pi.""" + try: + result = subprocess.run(['cat', '/sys/firmware/devicetree/base/model'], capture_output=True, text=True) + if result.returncode == 0: + model = result.stdout.strip() + if "Raspberry Pi 5" in model: + return 2 + else: + return 1 + else: + print("Failed to get Raspberry Pi model information.") + return 1 + except Exception as e: + print(f"Error getting Raspberry Pi version: {e}") + return 1 + + def deal_with_param(self) -> None: + """Main function to manage parameter file.""" + if not self.file_exists() or not self.validate_params(): + print(f"Parameter file {self.PARAM_FILE} does not exist or contains invalid parameters.") + user_input_required = True + else: + user_choice = input("Do you want to re-enter the parameters? (yes/no): ").strip().lower() + user_input_required = user_choice == 'yes' + + if user_input_required: + connect_version = self.get_valid_input("Enter Connect Version (1 or 2): ", [1, 2]) + pcb_version = self.get_valid_input("Enter PCB Version (1 or 2): ", [1, 2]) + pi_version = self.get_raspberry_pi_version() + self.create_param_file() + self.set_param('Connect_Version', connect_version) + self.set_param('Pcb_Version', pcb_version) + self.set_param('Pi_Version', pi_version) + else: + print("Do not modify the hardware version. Skipping...") + + def get_valid_input(self, prompt: str, valid_values: list) -> any: + """Get valid input from the user.""" + while True: + try: + value = int(input(prompt)) + if value in valid_values: + return value + else: + print(f"Invalid input. Please enter one of {valid_values}.") + except ValueError: + print("Invalid input. Please enter a number.") + + def get_connect_version(self) -> int: + """Get the Connect version from the parameter file.""" + return self.get_param('Connect_Version') + + def get_pcb_version(self) -> int: + """Get the PCB version from the parameter file.""" + return self.get_param('Pcb_Version') + + def get_pi_version(self) -> int: + """Get the Raspberry Pi version from the parameter file.""" + return self.get_param('Pi_Version') + +if __name__ == '__main__': + # Entry point of the script + manager = ParameterManager() + manager.deal_with_param() + if manager.file_exists("params.json") and manager.validate_params("params.json"): + connect_version = manager.get_connect_version() + print(f"Connect Version: {connect_version}.0") + pcb_version = manager.get_pcb_version() + print(f"PCB Version: {pcb_version}.0") + pi_version = manager.get_pi_version() + print(f"Raspberry PI version is {'less than 5' if pi_version == 1 else '5'}.") \ No newline at end of file diff --git a/Code/Server_WebUI/pca9685.py b/Code/Server_WebUI/pca9685.py new file mode 100644 index 00000000..9ed8ed56 --- /dev/null +++ b/Code/Server_WebUI/pca9685.py @@ -0,0 +1,82 @@ +#!/usr/bin/python + +import time +import math +import smbus + +# ============================================================================ +# Raspi PCA9685 16-Channel PWM Servo Driver +# ============================================================================ + +class PCA9685: + # Registers/etc. + __SUBADR1 = 0x02 + __SUBADR2 = 0x03 + __SUBADR3 = 0x04 + __MODE1 = 0x00 + __PRESCALE = 0xFE + __LED0_ON_L = 0x06 + __LED0_ON_H = 0x07 + __LED0_OFF_L = 0x08 + __LED0_OFF_H = 0x09 + __ALLLED_ON_L = 0xFA + __ALLLED_ON_H = 0xFB + __ALLLED_OFF_L = 0xFC + __ALLLED_OFF_H = 0xFD + + def __init__(self, address: int = 0x40, debug: bool = False): + self.bus = smbus.SMBus(1) + self.address = address + self.debug = debug + self.write(self.__MODE1, 0x00) + + def write(self, reg: int, value: int) -> None: + """Writes an 8-bit value to the specified register/address.""" + self.bus.write_byte_data(self.address, reg, value) + + def read(self, reg: int) -> int: + """Read an unsigned byte from the I2C device.""" + result = self.bus.read_byte_data(self.address, reg) + return result + + def set_pwm_freq(self, freq: float) -> None: + """Sets the PWM frequency.""" + prescaleval = 25000000.0 # 25MHz + prescaleval /= 4096.0 # 12-bit + prescaleval /= float(freq) + prescaleval -= 1.0 + prescale = math.floor(prescaleval + 0.5) + + oldmode = self.read(self.__MODE1) + newmode = (oldmode & 0x7F) | 0x10 # sleep + self.write(self.__MODE1, newmode) # go to sleep + self.write(self.__PRESCALE, int(math.floor(prescale))) + self.write(self.__MODE1, oldmode) + time.sleep(0.005) + self.write(self.__MODE1, oldmode | 0x80) + + + def set_pwm(self, channel: int, on: int, off: int) -> None: + """Sets a single PWM channel.""" + self.write(self.__LED0_ON_L + 4 * channel, on & 0xFF) + self.write(self.__LED0_ON_H + 4 * channel, on >> 8) + self.write(self.__LED0_OFF_L + 4 * channel, off & 0xFF) + self.write(self.__LED0_OFF_H + 4 * channel, off >> 8) + def set_motor_pwm(self, channel: int, duty: int) -> None: + """Sets the PWM duty cycle for a motor.""" + self.set_pwm(channel, 0, duty) + + def set_servo_pulse(self, channel: int, pulse: float) -> None: + """Sets the Servo Pulse, The PWM frequency must be 50HZ.""" + pulse = pulse * 4096 / 20000 # PWM frequency is 50HZ, the period is 20000us + self.set_pwm(channel, 0, int(pulse)) + + def close(self) -> None: + """Close the I2C bus.""" + self.bus.close() + + +if __name__=='__main__': + pass + + diff --git a/Code/Server_WebUI/photoresistor.py b/Code/Server_WebUI/photoresistor.py new file mode 100644 index 00000000..93dfea93 --- /dev/null +++ b/Code/Server_WebUI/photoresistor.py @@ -0,0 +1,41 @@ +from adc import Adc +import time + +class Photoresistor: + def __init__(self): + """Initialize the Photoresistor class and create an Adc instance.""" + self.adc = Adc() + + def read_left_photoresistor(self) -> float: + """Read the value from the left photoresistor.""" + try: + return self.adc.recvADC(0) + except Exception as e: + print(f"Error reading left photoresistor: {e}") + return None + + def read_right_photoresistor(self) -> float: + """Read the value from the right photoresistor.""" + try: + return self.adc.recvADC(1) + except Exception as e: + print(f"Error reading right photoresistor: {e}") + return None + + def stop(self) -> None: + """Close the I2C bus.""" + self.adc.i2cClose() + +if __name__ == '__main__': + print('Program is starting ... ') + photoresistor = Photoresistor() + try: + while True: + left_value = photoresistor.read_left_photoresistor() + right_value = photoresistor.read_right_photoresistor() + if left_value is not None and right_value is not None: + print(f"The photoresistor L is {left_value}V, R is {right_value}V") + time.sleep(0.3) + except KeyboardInterrupt: # When 'Ctrl+C' is pressed, the program will be stopped. + print('\nProgram is stopped! ') + photoresistor.stop() \ No newline at end of file diff --git a/Code/Server_WebUI/readme/Kapbotics_WebUI_Layout_for_FNK0043.jpg b/Code/Server_WebUI/readme/Kapbotics_WebUI_Layout_for_FNK0043.jpg new file mode 100644 index 00000000..5ae84041 Binary files /dev/null and b/Code/Server_WebUI/readme/Kapbotics_WebUI_Layout_for_FNK0043.jpg differ diff --git a/Code/Server_WebUI/readme/README b/Code/Server_WebUI/readme/README new file mode 100644 index 00000000..5fb1f64a --- /dev/null +++ b/Code/Server_WebUI/readme/README @@ -0,0 +1,71 @@ +# BEFORE YOU INSTALL/USE THE WebUI APP +1. Clone the Repo "Freenove_4WD_Smart_Car_Kit_for_Raspberry_Pi" +2. Make sure you have built the car according to FNK0043 instructions: Tutorial(ordinary wheels).pdf, + available at the https://docs.freenove.com/projects/fnk0066/en/latest/index.html#download), +3. You can launch successfully from the console the script "main.py located in the folder "Code/Server" (as described at page 95 of the tutorial) +4. Note the local address oy the RaspberryPi, (e.g. 192.164.168.30, assigned in your local WiFi network) + +# HOW TO USE THE WebUI APP +1. From the console, navigate to the folder "Code/Server_WebUI" +2. Launch the script "main_web.py" +3. On your Tablet / PC / Handy open the browser and navigate to: :8080 +4. the WebUI will be displayed. + +# WEB-UI CONCEPT ARCHITECTURE +The FreeKap 4WD web controller is a browser-based control panel for a Raspberry Pi 4WD robot. +The main idea: + ++ The browser shows the UI (video, drive pad, servos, telemetry). ++ The Raspberry Pi runs a Python HTTP server. ++ The Python server talks to the robot hardware (motors, servos, sensors, camera, buzzer). + +All communication between the browser and the robot happens over HTTP (GET/POST). + ++------------------------------+ +| Browser UI | +| (PC / tablet / phone) | +| | +| - index.html, app.js | +| - Video, drive pad, servos | +| - Telemetry, BEEP, RESET | ++--------------+---------------+ + | + | HTTP GET: + | / (HTML, CSS, JS) + | /video/mjpeg (MJPEG stream) + | /api/telemetry (JSON telemetry) + | + | HTTP POST: + | /api/drive/mecanum (wheel speeds) + | /api/servo (servo angle) + | /api/buzzer (beep) + v ++----------------------------------+ +| HTTP Server (server.py) | +| WebAppHandler / main_web | ++---------+-----------+----------+-+ + | | + | | + v v ++-------------------+ +------------------------+ +| Camera | | Robot Control | +| (libcamera) | | (Car, Buzzer, sensors) | +| - provides JPEG | | - motors, servos | +| frames | | - sonar, light, IR | ++-------------------+ | - buzzer, LEDs | + +------------------------+ + +# REMOVED FEATURES +- UI/CAR: Control of the Leds +- UI/CAR: Control of the Line Follower sensor + +# NEW FEATURES +- UI: WebUI removes the need of a Client running on a PC +- UI: Use of "HOLD" + direction buttons (FWRD, REAR, RIGHT, LEFT) will hold the direction command until "STOP" is pressed +- UI: Simpler & useful telemetry data +- CAR: Automatic speed reduction based on distance from obstacle (as measured by the sonar) +- CAR: Hard brake with automatic buzzer beeping when distance to obstacle <30 cm + + + + diff --git a/Code/Server_WebUI/rpi_ledpixel.py b/Code/Server_WebUI/rpi_ledpixel.py new file mode 100644 index 00000000..969ba2f8 --- /dev/null +++ b/Code/Server_WebUI/rpi_ledpixel.py @@ -0,0 +1,193 @@ +import time +from rpi_ws281x import Adafruit_NeoPixel, Color + +class Freenove_RPI_WS281X: + def __init__(self, led_count: int = 4, brightness: int = 255, sequence: str = "RGB"): + """Initialize the LED strip with default parameters.""" + self.set_led_type(sequence) + self.set_led_count(led_count) + self.set_led_brightness(brightness) + self.led_begin() + self.set_all_led_color(0, 0, 0) + + def led_begin(self) -> None: + """Initialize the NeoPixel strip.""" + self.strip = Adafruit_NeoPixel( + self.get_led_count(), 18, 800000, 10, False, self.led_brightness, 0 + ) + self.led_init_state = 0 if self.strip.begin() else 1 + + def check_rpi_ws281x_state(self) -> int: + """Check the initialization state of the NeoPixel strip.""" + return self.led_init_state + + def led_close(self) -> None: + """Turn off all LEDs.""" + self.set_all_led_rgb([0, 0, 0]) + + def set_led_count(self, count: int) -> None: + """Set the number of LEDs in the strip.""" + self.led_count = count + self.led_color = [0, 0, 0] * self.led_count + self.led_original_color = [0, 0, 0] * self.led_count + + def get_led_count(self) -> int: + """Get the number of LEDs in the strip.""" + return self.led_count + + def set_led_type(self, rgb_type: str) -> int: + """Set the RGB sequence type for the LEDs.""" + try: + led_type = ['RGB', 'RBG', 'GRB', 'GBR', 'BRG', 'BGR'] + led_type_offset = [0x06, 0x09, 0x12, 0x21, 0x18, 0x24] + index = led_type.index(rgb_type) + self.led_red_offset = (led_type_offset[index] >> 4) & 0x03 + self.led_green_offset = (led_type_offset[index] >> 2) & 0x03 + self.led_blue_offset = (led_type_offset[index] >> 0) & 0x03 + return index + except ValueError: + self.led_red_offset = 1 + self.led_green_offset = 0 + self.led_blue_offset = 2 + return -1 + + def set_led_brightness(self, brightness: int) -> None: + """Set the brightness of the LEDs.""" + self.led_brightness = brightness + for i in range(self.get_led_count()): + self.set_led_rgb_data(i, self.led_original_color) + + def set_led_pixel(self, index: int, r: int, g: int, b: int) -> None: + """Set the color of a specific LED.""" + p = [0, 0, 0] + p[self.led_red_offset] = round(r * self.led_brightness / 255) + p[self.led_green_offset] = round(g * self.led_brightness / 255) + p[self.led_blue_offset] = round(b * self.led_brightness / 255) + self.led_original_color[index * 3 + self.led_red_offset] = r + self.led_original_color[index * 3 + self.led_green_offset] = g + self.led_original_color[index * 3 + self.led_blue_offset] = b + for i in range(3): + self.led_color[index * 3 + i] = p[i] + + def set_led_color_data(self, index: int, r: int, g: int, b: int) -> None: + """Set the color data of a specific LED.""" + self.set_led_pixel(index, r, g, b) + + def set_led_rgb_data(self, index: int, color: list) -> None: + """Set the RGB data of a specific LED.""" + self.set_led_pixel(index, color[0], color[1], color[2]) + + def set_led_color(self, index: int, r: int, g: int, b: int) -> None: + """Set the color of a specific LED and update the display.""" + self.set_led_pixel(index, r, g, b) + self.show() + + def set_led_rgb(self, index: int, color: list) -> None: + """Set the RGB color of a specific LED and update the display.""" + self.set_led_rgb_data(index, color) + self.show() + + def set_all_led_color_data(self, r: int, g: int, b: int) -> None: + """Set the color data of all LEDs.""" + for i in range(self.get_led_count()): + self.set_led_color_data(i, r, g, b) + + def set_all_led_rgb_data(self, color: list) -> None: + """Set the RGB data of all LEDs.""" + for i in range(self.get_led_count()): + self.set_led_rgb_data(i, color) + + def set_all_led_color(self, r: int, g: int, b: int) -> None: + """Set the color of all LEDs and update the display.""" + for i in range(self.get_led_count()): + self.set_led_color_data(i, r, g, b) + self.show() + + def set_all_led_rgb(self, color: list) -> None: + """Set the RGB color of all LEDs and update the display.""" + for i in range(self.get_led_count()): + self.set_led_rgb_data(i, color) + self.show() + + def show(self) -> None: + """Update the LED strip with the current color data.""" + for i in range(self.get_led_count()): + self.strip.setPixelColor( + i, Color(self.led_color[i * 3], self.led_color[i * 3 + 1], self.led_color[i * 3 + 2]) + ) + self.strip.show() + + def wheel(self, pos: int) -> list: + """Generate a color wheel value based on the position.""" + if pos < 85: + return [255 - pos * 3, pos * 3, 0] + elif pos < 170: + pos -= 85 + return [0, 255 - pos * 3, pos * 3] + else: + pos -= 170 + return [pos * 3, 0, 255 - pos * 3] + + def hsv2rgb(self, h: int, s: int, v: int) -> list: + """Convert HSV to RGB.""" + h = h % 360 + rgb_max = round(v * 2.55) + rgb_min = round(rgb_max * (100 - s) / 100) + i = round(h / 60) + diff = round(h % 60) + rgb_adj = round((rgb_max - rgb_min) * diff / 60) + if i == 0: + r = rgb_max + g = rgb_min + rgb_adj + b = rgb_min + elif i == 1: + r = rgb_max - rgb_adj + g = rgb_max + b = rgb_min + elif i == 2: + r = rgb_min + g = rgb_max + b = rgb_min + rgb_adj + elif i == 3: + r = rgb_min + g = rgb_max - rgb_adj + b = rgb_max + elif i == 4: + r = rgb_min + rgb_adj + g = rgb_min + b = rgb_max + else: + r = rgb_max + g = rgb_min + b = rgb_max - rgb_adj + return [r, g, b] + +if __name__ == '__main__': + led = FreenoveRPIWS281X(4, 255, "RGB") + try: + if led.check_rpi_ws281x_state() != 0: + led.set_led_count(4) + led.set_all_led_color_data(255, 0, 0) + led.show() + time.sleep(0.5) + led.set_all_led_rgb_data([0, 255, 0]) + led.show() + time.sleep(0.5) + led.set_all_led_color(0, 0, 255) + time.sleep(0.5) + led.set_all_led_rgb([255, 255, 255]) + time.sleep(0.5) + led.set_all_led_rgb([0, 0, 0]) + time.sleep(0.5) + + led.set_led_brightness(20) + while True: + for j in range(255): + for i in range(led.get_led_count()): + led.set_led_rgb_data(i, led.wheel((round(i * 255 / led.get_led_count()) + j) % 256)) + led.show() + time.sleep(0.002) + else: + led.led_close() + except KeyboardInterrupt: + led.led_close() \ No newline at end of file diff --git a/Code/Server_WebUI/server.py b/Code/Server_WebUI/server.py new file mode 100644 index 00000000..0c2fdf43 --- /dev/null +++ b/Code/Server_WebUI/server.py @@ -0,0 +1,594 @@ +#!/usr/bin/env python3 + +import socket # For network communication +import fcntl # For I/O control (Linux-specific) +import struct # For packing/unpacking data +import threading # To run HTTP server alongside TCP servers (if needed) +import queue # For type hints and queue handling +import json # For JSON encode/decode +import urllib.parse # For parsing request paths +import time # For buzzer beep timing +from http.server import SimpleHTTPRequestHandler +from socketserver import ThreadingTCPServer +from pathlib import Path + +from tcp_server import TCPServer # Your existing TCP server class + +# ----------------------------------------------------------------------------- +# Configuration +# ----------------------------------------------------------------------------- + +WEB_ROOT = Path(__file__).resolve().parent / "webapp" + +HTTP_PORT = 8080 +TELMY_PORT = 5000 +VIDEO_PORT = 8000 + +# ----------------------------------------------------------------------------- +# Custom ThreadingTCPServer with address reuse +# ----------------------------------------------------------------------------- + +class WebThreadingTCPServer(ThreadingTCPServer): + # Allow quick restart on the same port after Ctrl+C + allow_reuse_address = True + +# ----------------------------------------------------------------------------- +# Shared application context +# Populated from main.py or main_web.py via set_app_context(...) +# ----------------------------------------------------------------------------- + +APP_CONTEXT = { + "car": None, + "camera": None, + "tcp_server": None, + "led": None, + "buzzer": None, + "sensors": { + "sonic": True, + "light": True, + }, + "mode": None, + "led_mode": 0, +} + + +def set_app_context(car=None, camera=None, tcp_server=None, led=None, buzzer=None): + """ + Called from main.py or main_web.py to give the HTTP handler access + to Car/Camera/Server/etc. Any argument can be None; we only overwrite + what is provided. + """ + global APP_CONTEXT + if car is not None: + APP_CONTEXT["car"] = car + if camera is not None: + APP_CONTEXT["camera"] = camera + if tcp_server is not None: + APP_CONTEXT["tcp_server"] = tcp_server + if led is not None: + APP_CONTEXT["led"] = led + if buzzer is not None: + APP_CONTEXT["buzzer"] = buzzer + +# ----------------------------------------------------------------------------- +# HTTP handler: static web UI + API + video stream +# ----------------------------------------------------------------------------- + +class WebAppHandler(SimpleHTTPRequestHandler): + """HTTP handler serving static files from WEB_ROOT and handling /api/* & /video/*.""" + + def __init__(self, *args, **kwargs): + super().__init__(*args, directory=str(WEB_ROOT), **kwargs) + + def log_message(self, format, *args): + print(f"[HTTP] {self.address_string()} - {format % args}") + + # ------------------------------------------------------------------------- + # Helper: JSON response + # ------------------------------------------------------------------------- + def _send_json(self, status_code: int, payload: dict): + data = json.dumps(payload).encode("utf-8") + self.send_response(status_code) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(data))) + self.end_headers() + self.wfile.write(data) + + # ------------------------------------------------------------------------- + # Helper: telemetry snapshot from car + # ------------------------------------------------------------------------- + def _get_telemetry_snapshot(self): + """ + Build a telemetry snapshot from APP_CONTEXT['car']. + Returns a dict or raises RuntimeError if car is missing. + """ + car = APP_CONTEXT.get("car") + if car is None: + raise RuntimeError("Car object not available") + + try: + adc = car.adc + sonic = car.sonic + except AttributeError: + raise RuntimeError("Car object missing adc/sonic attributes") + + # Power / voltage + try: + raw_power = adc.read_adc(2) + factor = 3 if getattr(adc, "pcb_version", 1) == 1 else 2 + voltage = raw_power * factor + except Exception: + voltage = 0.0 + + # Distance + try: + distance = sonic.get_distance() + except Exception: + distance = 0.0 + + # Light sensors + try: + light_left = adc.read_adc(0) + light_right = adc.read_adc(1) + except Exception: + light_left = 0.0 + light_right = 0.0 + + power_pct = max(0, min(100, int((voltage / 12.0) * 100))) # rough guess + + return { + "power": power_pct, + "voltage": round(voltage, 2), + "sonic": round(distance, 2), + "light_left": round(light_left, 2), + "light_right": round(light_right, 2), + "status": "OK", + } + + # ------------------------------------------------------------------------- + # GET handler + # ------------------------------------------------------------------------- + def do_GET(self): + parsed = urllib.parse.urlparse(self.path) + path = parsed.path + + # Simple health check + if path == "/api/health": + self._send_json(200, {"status": "ok"}) + return + + # Telemetry snapshot (used by app.js polling) + if path == "/api/telemetry": + try: + snapshot = self._get_telemetry_snapshot() + self._send_json(200, snapshot) + except RuntimeError as e: + self._send_json(503, {"error": str(e)}) + return + + # MJPEG video stream + if path.startswith("/video/mjpeg"): + camera = APP_CONTEXT.get("camera") + if camera is None: + self.send_error(503, "Camera not available") + return + + boundary = "FRAME" + self.send_response(200) + self.send_header("Age", "0") + self.send_header("Cache-Control", "no-cache, private") + self.send_header("Pragma", "no-cache") + self.send_header("Content-Type", f"multipart/x-mixed-replace; boundary={boundary}") + self.end_headers() + + print("[VIDEO] MJPEG client connected") + + try: + try: + camera.start_stream() + except Exception as e: + print(f"[VIDEO] camera.start_stream() error: {e}") + + while True: + frame = camera.get_frame() + if not frame: + continue + + try: + self.wfile.write(b"--" + boundary.encode("ascii") + b"\r\n") + self.wfile.write(b"Content-Type: image/jpeg\r\n") + self.wfile.write( + b"Content-Length: " + str(len(frame)).encode("ascii") + b"\r\n\r\n" + ) + self.wfile.write(frame) + self.wfile.write(b"\r\n") + self.wfile.flush() + except (BrokenPipeError, ConnectionResetError): + print("[VIDEO] MJPEG client disconnected") + break + except Exception as e: + print(f"[VIDEO] stream error: {e}") + break + finally: + try: + camera.stop_stream() + except Exception: + pass + + return + + # Fallback: static files (index.html, app.js, styles.css, ...) + return super().do_GET() + + # ------------------------------------------------------------------------- + # POST handler: API endpoints + # ------------------------------------------------------------------------- + def do_POST(self): + parsed = urllib.parse.urlparse(self.path) + path = parsed.path + + length = int(self.headers.get("Content-Length", "0") or "0") + body = self.rfile.read(length) if length > 0 else b"{}" + + try: + payload = json.loads(body.decode("utf-8") or "{}") + except Exception: + payload = {} + + print(f"[API] {path} payload: {payload}") + + # ===== API: DRIVE (mecanum) ========================================== + if path == "/api/drive/mecanum": + car = APP_CONTEXT.get("car") + if car is None or getattr(car, "motor", None) is None: + self._send_json(503, {"error": "Car motor not available"}) + return + + try: + fl = int(payload.get("fl", 0)) + fr = int(payload.get("fr", 0)) + bl = int(payload.get("bl", 0)) + br = int(payload.get("br", 0)) + except Exception: + self._send_json(400, {"error": "Invalid drive payload"}) + return + + try: + # Freenove order: left front, left rear, right front, right rear + car.motor.set_motor_model(fl, bl, fr, br) + except Exception as e: + print(f"[API] drive error: {e}") + self._send_json(500, {"error": str(e)}) + return + + self._send_json(200, { + "ok": True, + "action": "drive_mecanum", + "fl": fl, "fr": fr, "bl": bl, "br": br, + }) + return + + # ===== API: SERVO ===================================================== + if path == "/api/servo": + car = APP_CONTEXT.get("car") + if car is None or getattr(car, "servo", None) is None: + self._send_json(503, {"error": "Servo controller not available"}) + return + + try: + channel = int(payload.get("channel", 0)) + angle = int(payload.get("angle", 90)) + except Exception: + self._send_json(400, {"error": "Invalid servo payload"}) + return + + try: + # original Freenove code uses string channel + car.servo.set_servo_pwm(str(channel), angle) + except Exception as e: + print(f"[API] servo error: {e}") + self._send_json(500, {"error": str(e)}) + return + + self._send_json(200, { + "ok": True, + "action": "servo", + "channel": channel, + "angle": angle, + }) + return + + # ===== API: LEDS (mask + RGB) ======================================== + if path == "/api/leds": + led = APP_CONTEXT.get("led") + mask = int(payload.get("mask", 0)) + r = int(payload.get("r", 0)) + g = int(payload.get("g", 0)) + b = int(payload.get("b", 0)) + + if led is None: + # Just acknowledge so the UI shows success even without hardware + self._send_json(200, { + "ok": True, + "action": "leds_stub_no_hw", + "mask": mask, + "rgb": [r, g, b], + }) + return + + # Simple implementation: set each LED in the mask to the same color + try: + # Assuming Led.ledIndex(index, r, g, b) with 1-based index + for idx in range(8): + if mask & (1 << idx): + led.ledIndex(idx + 1, r, g, b) + except Exception as e: + print(f"[API] leds error: {e}") + self._send_json(500, {"error": str(e)}) + return + + self._send_json(200, { + "ok": True, + "action": "leds", + "mask": mask, + "rgb": [r, g, b], + }) + return + + # ===== API: LED MODE (simple state, hook for your LED process) ======== + if path == "/api/leds/mode": + mode = int(payload.get("mode", 0)) + APP_CONTEXT["led_mode"] = mode + led = APP_CONTEXT.get("led") + + if led is None: + self._send_json(200, { + "ok": True, + "action": "leds_mode_stub_no_hw", + "mode": mode, + }) + return + + # Here we avoid long blocking patterns; you can integrate this + # with your process_led_running via APP_CONTEXT["led_mode"]. + try: + if mode == 0: + # Off + led.colorBlink(0) + elif mode == 1: + # Simple blink / placeholder + led.colorBlink(1) + # Modes 2..4 left as TODO for your own patterns + except Exception as e: + print(f"[API] leds mode error: {e}") + self._send_json(500, {"error": str(e)}) + return + + self._send_json(200, { + "ok": True, + "action": "leds_mode", + "mode": mode, + }) + return + + # ===== API: BUZZER (1 second beep) =================================== + if path == "/api/buzzer": + buzzer = APP_CONTEXT.get("buzzer") + if buzzer is None: + self._send_json(503, {"error": "Buzzer not available"}) + return + + # Ignore payload; always produce a 1-second beep + try: + buzzer.set_state(1) + time.sleep(1.0) # 1 second beep + buzzer.set_state(0) + except Exception as e: + print(f"[API] buzzer error: {e}") + self._send_json(500, {"error": str(e)}) + return + + self._send_json(200, { + "ok": True, + "action": "buzzer", + "beep": True, + "duration_ms": 1000, + }) + return + + # ===== API: MODE (car high-level mode; stored only for now) ========== + if path == "/api/mode": + mode = payload.get("mode") + APP_CONTEXT["mode"] = mode + # You can integrate this with car_mode in main.py if desired. + self._send_json(200, {"ok": True, "action": "mode", "mode": mode}) + return + + # ===== API: SENSORS TOGGLE (stores flags) ============================ + if path.startswith("/api/sensors/"): + sensor_name = path.rsplit("/", 1)[-1] + enabled = bool(payload.get("enabled", True)) + APP_CONTEXT["sensors"][sensor_name] = enabled + self._send_json( + 200, + { + "ok": True, + "action": "sensor", + "sensor": sensor_name, + "enabled": enabled, + }, + ) + return + + # ===== API: RAW ====================================================== + if path == "/api/raw": + cmd = payload.get("command", "") + # For now, just echo. You can route this into your existing + # command-processing stack if you want complete legacy compatibility. + self._send_json( + 200, + { + "ok": True, + "action": "raw", + "echo": cmd, + }, + ) + return + + # Unknown API + self.send_error(404, f"Unknown API endpoint: {path}") + + +# ----------------------------------------------------------------------------- +# HTTP server startup +# ----------------------------------------------------------------------------- + +def start_http_server(port: int = HTTP_PORT) -> None: + """Start a simple multithreaded HTTP server for the web UI.""" + with WebThreadingTCPServer(("", port), WebAppHandler) as httpd: + print(f"[HTTP] Serving {WEB_ROOT} on http://0.0.0.0:{port}/") + httpd.serve_forever() + +# ----------------------------------------------------------------------------- +# Existing robot TCP server logic (for legacy TCP clients, ports 5000/8000) +# ----------------------------------------------------------------------------- + +class Server: + def __init__(self): + """Initialize the Server class (command + video TCP servers).""" + self.ip_address = self.get_interface_ip() + self.command_server = TCPServer() + self.video_server = TCPServer() + self.command_server_is_busy = False + self.video_server_is_busy = False + + def get_interface_ip(self) -> str: + """Get the IP address of the wlan0 interface.""" + try: + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + ip = socket.inet_ntoa(fcntl.ioctl( + s.fileno(), + 0x8915, + struct.pack('256s', b'wlan0'[:15]) + )[20:24]) + return ip + except Exception as e: + print(f"Error getting IP address: {e}") + return "127.0.0.1" + + def start_tcp_servers( + self, + command_port: int = TELMY_PORT, + video_port: int = VIDEO_PORT, + max_clients: int = 1, + listen_count: int = 1 + ) -> None: + """Start the TCP servers on specified ports.""" + try: + print(f"[TCP] Starting command server on {self.ip_address}:{command_port}") + self.command_server.start(self.ip_address, command_port, max_clients, listen_count) + + print(f"[TCP] Starting video server on {self.ip_address}:{video_port}") + self.video_server.start(self.ip_address, video_port, max_clients, listen_count) + except Exception as e: + print(f"Error starting TCP servers: {e}") + + def stop_tcp_servers(self) -> None: + """Stop the TCP servers.""" + try: + print("[TCP] Stopping TCP servers...") + self.command_server.close() + self.video_server.close() + except Exception as e: + print(f"Error stopping TCP servers: {e}") + + def set_command_server_busy(self, state: bool) -> None: + self.command_server_is_busy = state + + def set_video_server_busy(self, state: bool) -> None: + self.video_server_is_busy = state + + def get_command_server_busy(self) -> bool: + return self.command_server_is_busy + + def get_video_server_busy(self) -> bool: + return self.video_server_is_busy + + def send_data_to_command_client(self, data: bytes, ip_address: str = None) -> None: + self.set_command_server_busy(True) + try: + if ip_address is not None: + self.command_server.send_to_client(ip_address, data) + else: + self.command_server.send_to_all_client(data) + except Exception as e: + print(e) + finally: + self.set_command_server_busy(False) + + def send_data_to_video_client(self, data: bytes, ip_address: str = None) -> None: + self.set_video_server_busy(True) + try: + if ip_address is not None: + self.video_server.send_to_client(ip_address, data) + else: + self.video_server.send_to_all_client(data) + finally: + self.set_video_server_busy(False) + + def read_data_from_command_server(self) -> 'queue.Queue': + return self.command_server.message_queue + + def read_data_from_video_server(self) -> 'queue.Queue': + return self.video_server.message_queue + + def is_command_server_connected(self) -> bool: + return self.command_server.active_connections > 0 + + def is_video_server_connected(self) -> bool: + return self.video_server.active_connections > 0 + + def get_command_server_client_ips(self) -> list: + return self.command_server.get_client_ips() + + def get_video_server_client_ips(self) -> list: + return self.video_server.get_client_ips() + + +# ----------------------------------------------------------------------------- +# Standalone mode (optional: if you ever run server.py directly) +# ----------------------------------------------------------------------------- + +if __name__ == '__main__': + print('Program is starting ... ') + server = Server() + server.start_tcp_servers(command_port=TELMY_PORT, video_port=VIDEO_PORT) + + http_thread = threading.Thread( + target=start_http_server, + kwargs={"port": HTTP_PORT}, + daemon=True + ) + http_thread.start() + + print(f"[MAIN] Command server on {server.ip_address}:{TELMY_PORT}") + print(f"[MAIN] Video server on {server.ip_address}:{VIDEO_PORT}") + print(f"[MAIN] Web UI on http://{server.ip_address}:{HTTP_PORT}/") + + try: + while True: + cmd_queue = server.read_data_from_command_server() + if cmd_queue.qsize() > 0: + client_address, message = cmd_queue.get() + print("[CMD]", client_address, message) + server.send_data_to_command_client(message, client_address) + + video_queue = server.read_data_from_video_server() + if video_queue.qsize() > 0: + client_address, message = video_queue.get() + print("[VIDEO]", client_address, "") + server.send_data_to_video_client(message, client_address) + + except KeyboardInterrupt: + print("\nReceived interrupt signal, stopping server...") + server.stop_tcp_servers() diff --git a/Code/Server_WebUI/server_ui.py b/Code/Server_WebUI/server_ui.py new file mode 100644 index 00000000..34fd50f0 --- /dev/null +++ b/Code/Server_WebUI/server_ui.py @@ -0,0 +1,98 @@ +# -*- coding: utf-8 -*- + +# Form implementation generated from reading ui file +# +# Created by: PyQt5 UI code generator 5.11.3 +# +# WARNING! All changes made in this file will be lost! + +from PyQt5 import QtCore, QtGui, QtWidgets + +class Ui_server_ui(object): + def setupUi(self, server_ui): + server_ui.setObjectName("server_ui") + server_ui.resize(300, 200) + font = QtGui.QFont() + font.setFamily("3ds") + server_ui.setFont(font) + server_ui.setStyleSheet("QWidget{\n" +"background:#484848;\n" +"}\n" +"QAbstractButton{\n" +"border-style:none;\n" +"border-radius:0px;\n" +"padding:5px;\n" +"color:#DCDCDC;\n" +"background:qlineargradient(spread:pad,x1:0,y1:0,x2:0,y2:1,stop:0 #484848,stop:1 #383838);\n" +"}\n" +"QAbstractButton:hover{\n" +"color:#FFFFFF;\n" +"background-color:#00BB9E;\n" +"}\n" +"QAbstractButton:pressed{\n" +"color:#DCDCDC;\n" +"border-style:solid;\n" +"border-width:0px 0px 0px 2px;\n" +"padding:4px 4px 4px 2px;\n" +"border-color:#00BB9E;\n" +"background-color:#444444;\n" +"}\n" +"QLabel{\n" +"color:#DCDCDC;\n" +"border:1px solid #484848;\n" +"background:qlineargradient(spread:pad,x1:0,y1:0,x2:0,y2:1,stop:0 #484848,stop:1 #383838);\n" +"}\n" +"QLabel:focus{\n" +"border:1px solid #00BB9E;\n" +"background:qlineargradient(spread:pad,x1:0,y1:0,x2:0,y2:1,stop:0 #646464,stop:1 #525252);\n" +"}\n" +"QLineEdit{\n" +"border:1px solid #242424;\n" +"border-radius:3px;\n" +"padding:2px;\n" +"background:none;\n" +"selection-background-color:#484848;\n" +"selection-color:#DCDCDC;\n" +"}\n" +"QLineEdit:focus,QLineEdit:hover{\n" +"border:1px solid #242424;\n" +"}\n" +"QLineEdit{\n" +"border:1px solid #242424;\n" +"border-radius:3px;\n" +"padding:2px;\n" +"background:none;\n" +"selection-background-color:#484848;\n" +"selection-color:#DCDCDC;\n" +"}\n" +"\n" +"QLineEdit:focus,QLineEdit:hover{\n" +"border:1px solid #242424;\n" +"}\n" +"QLineEdit{\n" +"lineedit-password-character:9679;\n" +"}") + self.label = QtWidgets.QLabel(server_ui) + self.label.setGeometry(QtCore.QRect(50, 50, 200, 42)) + font = QtGui.QFont() + font.setFamily("3ds") + font.setPointSize(26) + self.label.setFont(font) + self.label.setObjectName("label") + self.Button_Server = QtWidgets.QPushButton(server_ui) + self.Button_Server.setGeometry(QtCore.QRect(100, 120, 100, 40)) + font = QtGui.QFont() + font.setFamily("3ds") + font.setPointSize(12) + self.Button_Server.setFont(font) + self.Button_Server.setObjectName("Button_Server") + + self.retranslateUi(server_ui) + QtCore.QMetaObject.connectSlotsByName(server_ui) + + def retranslateUi(self, server_ui): + _translate = QtCore.QCoreApplication.translate + server_ui.setWindowTitle(_translate("server_ui", "Freenove")) + self.label.setText(_translate("server_ui", "Server Off")) + self.Button_Server.setText(_translate("server_ui", "On")) + diff --git a/Code/Server_WebUI/servo.py b/Code/Server_WebUI/servo.py new file mode 100644 index 00000000..673d2ff5 --- /dev/null +++ b/Code/Server_WebUI/servo.py @@ -0,0 +1,41 @@ +from pca9685 import PCA9685 + +class Servo: + def __init__(self): + self.pwm_frequency = 50 + self.initial_pulse = 1500 + self.pwm_channel_map = { + '0': 8, + '1': 9, + '2': 10, + '3': 11, + '4': 12, + '5': 13, + '6': 14, + '7': 15 + } + self.pwm_servo = PCA9685(0x40, debug=True) + self.pwm_servo.set_pwm_freq(self.pwm_frequency) + for channel in self.pwm_channel_map.values(): + self.pwm_servo.set_servo_pulse(channel, self.initial_pulse) + + def set_servo_pwm(self, channel: str, angle: int, error: int = 10) -> None: + angle = int(angle) + if channel not in self.pwm_channel_map: + raise ValueError(f"Invalid channel: {channel}. Valid channels are {list(self.pwm_channel_map.keys())}.") + pulse = 2500 - int((angle + error) / 0.09) if channel == '0' else 500 + int((angle + error) / 0.09) + self.pwm_servo.set_servo_pulse(self.pwm_channel_map[channel], pulse) + +# Main program logic follows: +if __name__ == '__main__': + print("Now servos will rotate to 90 degree.") + print("If they have already been at 90 degree, nothing will be observed.") + print("Please keep the program running when installing the servos.") + print("After that, you can press ctrl-C to end the program.") + pwm_servo = Servo() + try: + while True: + pwm_servo.set_servo_pwm('0', 90) + pwm_servo.set_servo_pwm('1', 90) + except KeyboardInterrupt: + print("\nEnd of program") \ No newline at end of file diff --git a/Code/Server_WebUI/spi_ledpixel.py b/Code/Server_WebUI/spi_ledpixel.py new file mode 100644 index 00000000..155f8e86 --- /dev/null +++ b/Code/Server_WebUI/spi_ledpixel.py @@ -0,0 +1,299 @@ +# Import necessary modules +import spidev +import numpy + +# Define the Freenove_SPI_LedPixel class +class Freenove_SPI_LedPixel(object): + def __init__(self, count=8, bright=255, sequence='GRB', bus=0, device=0): + # Initialize LED type + self.set_led_type(sequence) + # Set the number of LEDs + self.set_led_count(count) + # Set the brightness of the LEDs + self.set_led_brightness(bright) + # Initialize the SPI connection + self.led_begin(bus, device) + # Set all LEDs to off + self.set_all_led_color(0, 0, 0) + + def led_begin(self, bus=0, device=0): + # Store the bus and device numbers + self.bus = bus + self.device = device + try: + # Initialize the SPI device + self.spi = spidev.SpiDev() + self.spi.open(self.bus, self.device) + self.spi.mode = 0 + # Set initialization state to success + self.led_init_state = 1 + except OSError: + # Handle SPI initialization errors + print("Please check the configuration in /boot/firmware/config.txt.") + if self.bus == 0: + print("You can turn on the 'SPI' in 'Interface Options' by using 'sudo raspi-config'.") + print("Or make sure that 'dtparam=spi=on' is not commented, then reboot the Raspberry Pi. Otherwise spi0 will not be available.") + else: + print("Please add 'dtoverlay=spi{}-2cs' at the bottom of the /boot/firmware/config.txt, then reboot the Raspberry Pi. Otherwise spi{} will not be available.".format(self.bus, self.bus)) + # Set initialization state to failure + self.led_init_state = 0 + + def check_spi_state(self): + # Return the current SPI initialization state + return self.led_init_state + + def spi_gpio_info(self): + # Print the GPIO pin information for the specified SPI bus + if self.bus == 0: + print("SPI0-MOSI: GPIO10(WS2812-PIN) SPI0-MISO: GPIO9 SPI0-SCLK: GPIO11 SPI0-CE0: GPIO8 SPI0-CE1: GPIO7") + elif self.bus == 1: + print("SPI1-MOSI: GPIO20(WS2812-PIN) SPI1-MISO: GPIO19 SPI1-SCLK: GPIO21 SPI1-CE0: GPIO18 SPI1-CE1: GPIO17 SPI0-CE1: GPIO16") + elif self.bus == 2: + print("SPI2-MOSI: GPIO41(WS2812-PIN) SPI2-MISO: GPIO40 SPI2-SCLK: GPIO42 SPI2-CE0: GPIO43 SPI2-CE1: GPIO44 SPI2-CE1: GPIO45") + elif self.bus == 3: + print("SPI3-MOSI: GPIO2(WS2812-PIN) SPI3-MISO: GPIO1 SPI3-SCLK: GPIO3 SPI3-CE0: GPIO0 SPI3-CE1: GPIO24") + elif self.bus == 4: + print("SPI4-MOSI: GPIO6(WS2812-PIN) SPI4-MISO: GPIO5 SPI4-SCLK: GPIO7 SPI4-CE0: GPIO4 SPI4-CE1: GPIO25") + elif self.bus == 5: + print("SPI5-MOSI: GPIO14(WS2812-PIN) SPI5-MISO: GPIO13 SPI5-SCLK: GPIO15 SPI5-CE0: GPIO12 SPI5-CE1: GPIO26") + elif self.bus == 6: + print("SPI6-MOSI: GPIO20(WS2812-PIN) SPI6-MISO: GPIO19 SPI6-SCLK: GPIO21 SPI6-CE0: GPIO18 SPI6-CE1: GPIO27") + + def led_close(self): + # Turn off all LEDs and close the SPI connection + self.set_all_led_rgb([0, 0, 0]) + self.spi.close() + + def set_led_count(self, count): + # Set the number of LEDs + self.led_count = count + # Initialize the color arrays + self.led_color = [0, 0, 0] * self.led_count + self.led_original_color = [0, 0, 0] * self.led_count + + def get_led_count(self): + # Return the number of LEDs + return self.led_count + + def set_led_type(self, rgb_type): + # Set the LED color sequence (RGB, GRB, etc.) + try: + led_type = ['RGB', 'RBG', 'GRB', 'GBR', 'BRG', 'BGR'] + led_type_offset = [0x06, 0x09, 0x12, 0x21, 0x18, 0x24] + index = led_type.index(rgb_type) + self.led_red_offset = (led_type_offset[index] >> 4) & 0x03 + self.led_green_offset = (led_type_offset[index] >> 2) & 0x03 + self.led_blue_offset = (led_type_offset[index] >> 0) & 0x03 + return index + except ValueError: + self.led_red_offset = 1 + self.led_green_offset = 0 + self.led_blue_offset = 2 + return -1 + + def set_led_brightness(self, brightness): + # Set the brightness of all LEDs + self.led_brightness = brightness + for i in range(self.get_led_count()): + self.set_led_rgb_data(i, self.led_original_color) + + def set_ledpixel(self, index, r, g, b): + # Set the color of a specific LED + p = [0, 0, 0] + p[self.led_red_offset] = round(r * self.led_brightness / 255) + p[self.led_green_offset] = round(g * self.led_brightness / 255) + p[self.led_blue_offset] = round(b * self.led_brightness / 255) + self.led_original_color[index * 3 + self.led_red_offset] = r + self.led_original_color[index * 3 + self.led_green_offset] = g + self.led_original_color[index * 3 + self.led_blue_offset] = b + for i in range(3): + self.led_color[index * 3 + i] = p[i] + + def set_led_color_data(self, index, r, g, b): + # Set the color data of a specific LED + self.set_ledpixel(index, r, g, b) + + def set_led_rgb_data(self, index, color): + # Set the RGB data of a specific LED + self.set_ledpixel(index, color[0], color[1], color[2]) + + def set_led_color(self, index, r, g, b): + # Set the color of a specific LED and update the display + self.set_ledpixel(index, r, g, b) + self.show() + + def set_led_rgb(self, index, color): + # Set the RGB color of a specific LED and update the display + self.set_led_rgb_data(index, color) + self.show() + + def set_all_led_color_data(self, r, g, b): + # Set the color data of all LEDs + for i in range(self.get_led_count()): + self.set_led_color_data(i, r, g, b) + + def set_all_led_rgb_data(self, color): + # Set the RGB data of all LEDs + for i in range(self.get_led_count()): + self.set_led_rgb_data(i, color) + + def set_all_led_color(self, r, g, b): + # Set the color of all LEDs and update the display + for i in range(self.get_led_count()): + self.set_led_color_data(i, r, g, b) + self.show() + + def set_all_led_rgb(self, color): + # Set the RGB color of all LEDs and update the display + for i in range(self.get_led_count()): + self.set_led_rgb_data(i, color) + self.show() + + def write_ws2812_numpy8(self): + # Convert the color data to a format suitable for WS2812 LEDs + d = numpy.array(self.led_color).ravel() # Convert data into a one-dimensional array + tx = numpy.zeros(len(d) * 8, dtype=numpy.uint8) # Each RGB color has 8 bits, each represented by a uint8 type data + for ibit in range(8): # Convert each bit of data to the data that the SPI will send + tx[7 - ibit::8] = ((d >> ibit) & 1) * 0x78 + 0x80 # T0H=1,T0L=7, T1H=5,T1L=3 #0b11111000 mean T1(0.78125us), 0b10000000 mean T0(0.15625us) + if self.led_init_state != 0: + if self.bus == 0: + self.spi.xfer(tx.tolist(), int(8 / 1.25e-6)) # Send color data at a frequency of 6.4Mhz + else: + self.spi.xfer(tx.tolist(), int(8 / 1.0e-6)) # Send color data at a frequency of 8Mhz + + def write_ws2812_numpy4(self): + # Convert the color data to a format suitable for WS2812 LEDs (4-bit mode) + d = numpy.array(self.led_color).ravel() + tx = numpy.zeros(len(d) * 4, dtype=numpy.uint8) + for ibit in range(4): + tx[3 - ibit::4] = ((d >> (2 * ibit + 1)) & 1) * 0x60 + ((d >> (2 * ibit + 0)) & 1) * 0x06 + 0x88 + if self.led_init_state != 0: + if self.bus == 0: + self.spi.xfer(tx.tolist(), int(4 / 1.25e-6)) + else: + self.spi.xfer(tx.tolist(), int(4 / 1.0e-6)) + + def show(self, mode=1): + # Update the display with the current color data + if mode == 1: + write_ws2812 = self.write_ws2812_numpy8 + else: + write_ws2812 = self.write_ws2812_numpy4 + write_ws2812() + + def wheel(self, pos): + # Generate a color based on the position in the color wheel + if pos < 85: + return [(255 - pos * 3), (pos * 3), 0] + elif pos < 170: + pos = pos - 85 + return [0, (255 - pos * 3), (pos * 3)] + else: + pos = pos - 170 + return [(pos * 3), 0, (255 - pos * 3)] + + def hsv2rgb(self, h, s, v): + # Convert HSV to RGB + h = h % 360 + rgb_max = round(v * 2.55) + rgb_min = round(rgb_max * (100 - s) / 100) + i = round(h / 60) + diff = round(h % 60) + rgb_adj = round((rgb_max - rgb_min) * diff / 60) + if i == 0: + r = rgb_max + g = rgb_min + rgb_adj + b = rgb_min + elif i == 1: + r = rgb_max - rgb_adj + g = rgb_max + b = rgb_min + elif i == 2: + r = rgb_min + g = rgb_max + b = rgb_min + rgb_adj + elif i == 3: + r = rgb_min + g = rgb_max - rgb_adj + b = rgb_max + elif i == 4: + r = rgb_min + rgb_adj + g = rgb_min + b = rgb_max + else: + r = rgb_max + g = rgb_min + b = rgb_max - rgb_adj + return [r, g, b] + +if __name__ == '__main__': + import time + import os + # Print the version of the spidev module + print("spidev version is ", spidev.__version__) + # Print the available SPI devices + print("spidev device as show:") + os.system("ls /dev/spi*") + + # Create an instance of Freenove_SPI_LedPixel with 8 LEDs and maximum brightness + led = Freenove_SPI_LedPixel(8, 255) # Use MOSI for /dev/spidev0 to drive the lights + # Alternative configurations for different SPI buses (commented out) + # led = Freenove_SPI_LedPixel(8, 255, 'GRB', 0) # Use MOSI for /dev/spidev0 to drive the lights + # led = Freenove_SPI_LedPixel(8, 255, 'GRB', 1) # Use MOSI for /dev/spidev1 to drive the lights + try: + if led.check_spi_state() != 0: + # Set the number of LEDs to 8 + led.set_led_count(8) + # Set all LEDs to red + led.set_all_led_color_data(255, 0, 0) + led.show() + time.sleep(0.5) + # Set all LEDs to green + led.set_all_led_rgb_data([0, 255, 0]) + led.show() + time.sleep(0.5) + # Set all LEDs to blue + led.set_all_led_color(0, 0, 255) + time.sleep(0.5) + # Set all LEDs to cyan + led.set_all_led_rgb([0, 255, 255]) + time.sleep(0.5) + + # Set the number of LEDs to 12 + led.set_led_count(12) + # Set all LEDs to yellow + led.set_all_led_color_data(255, 255, 0) + # Fade in the brightness from 0 to 255 + for i in range(255): + led.set_led_brightness(i) + led.show() + time.sleep(0.005) + # Fade out the brightness from 255 to 0 + for i in range(255): + led.set_led_brightness(255 - i) + led.show() + time.sleep(0.005) + + # Set the brightness to 20 + led.set_led_brightness(20) + # Infinite loop to create a color wheel effect + while True: + for j in range(255): + for i in range(led.led_count): + # Set the color of each LED based on the color wheel + led.set_led_rgb_data(i, led.wheel((round(i * 255 / led.led_count) + j) % 256)) + led.show() + time.sleep(0.002) + else: + # Close the SPI connection if initialization failed + led.led_close() + except KeyboardInterrupt: + # Close the SPI connection on keyboard interrupt + led.led_close() + + + + + + + diff --git a/Code/Server_WebUI/tcp_server.py b/Code/Server_WebUI/tcp_server.py new file mode 100644 index 00000000..820ff4ba --- /dev/null +++ b/Code/Server_WebUI/tcp_server.py @@ -0,0 +1,169 @@ +import socket +import select +import threading +import fcntl +import struct +import queue + +class TCPServer: + def __init__(self): + # Initialize server and client sockets + self.server_socket = None + self.client_sockets = {} + # Message queue for incoming messages + self.message_queue = queue.Queue() + # Maximum number of clients allowed + self.max_clients = 1 + # Current number of active connections + self.active_connections = 0 + # Thread for accepting new connections + self.accept_thread = None + # Event to signal the server to stop + self.stop_event = threading.Event() + # Pipe for stopping the server + self.stop_pipe_r, self.stop_pipe_w = socket.socketpair() + self.stop_pipe_r.setblocking(0) + self.stop_pipe_w.setblocking(0) + + def start(self, ip, port, max_clients=1, listen_count=1): + # Set the maximum number of clients + self.max_clients = max_clients + # Create the server socket + self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.server_socket.bind((ip, port)) + self.server_socket.listen(listen_count) + self.server_socket.setblocking(0) + print(f"Server started, listening on {ip}:{port}") + + # Start the thread for accepting connections + self.accept_thread = threading.Thread(target=self.accept_connections, daemon=True) + self.accept_thread.start() + + def accept_connections(self): + # Accept new connections until the server is stopped + while not self.stop_event.is_set(): + # Use select to monitor the server socket and the stop pipe + readable, writable, exceptional = select.select([self.server_socket, self.stop_pipe_r] + list(self.client_sockets.keys()), [], []) + for s in readable: + if s == self.server_socket and self.active_connections < self.max_clients: + # Accept a new connection if the maximum number of clients is not reached + client_socket, client_address = s.accept() + client_socket.setblocking(0) + self.client_sockets[client_socket] = client_address + self.active_connections += 1 + print(f"New connection from {client_address}, {self.active_connections} active connections.") + elif s == self.server_socket and self.active_connections >= self.max_clients: + # Reject new connections if the maximum number of clients is reached + client_socket, client_address = s.accept() + client_socket.close() + print(f"Rejected connection from {client_address}, max connections ({self.max_clients}) reached.") + elif s == self.stop_pipe_r: + # Stop the server if the stop pipe is read + self.stop_event.set() + break + else: + try: + # Receive data from the client + data = s.recv(1024) + if data: + client_address = self.client_sockets[s] + self.message_queue.put((client_address, data.decode('utf-8'))) + else: + # Remove the client if no data is received + client_address = self.client_sockets[s] + print(client_address, "disconnected") + self.remove_client(s) + except OSError as e: + if e.errno == 9 or e.errno == 32: + # Handle broken pipe errors + client_address = self.client_sockets[s] + print(client_address, "disconnected") + self.remove_client(s) + else: + print(f"Unexpected error: {e}") + for s in exceptional: + # Handle exceptional conditions + client_address = self.client_sockets[s] + print(client_address, "disconnected") + self.remove_client(s) + print("Closing accept_connections...") + + def stop_pipe(self): + # Send a byte to the stop pipe to signal the server to stop + self.stop_pipe_w.send(b'\x00') + + def send_to_all_client(self, message): + # Send a message to all connected clients + for client_socket in list(self.client_sockets.keys()): + try: + if isinstance(message, str): + encoded_message = message.encode('utf-8') + else: + encoded_message = message + client_socket.sendall(encoded_message) + except socket.error as e: + print(f"Error sending data to {self.client_sockets[client_socket]}: {e}") + self.remove_client(client_socket) + + def send_to_client(self, client_address, message): + # Send a message to a specific client + for client_socket, addr in self.client_sockets.items(): + if addr == client_address: + try: + if isinstance(message, str): + encoded_message = message.encode('utf-8') + else: + encoded_message = message + client_socket.sendall(encoded_message) + except socket.error as e: + print(f"Error sending data to {client_address}: {e}") + self.remove_client(client_socket) + return + print(f"Client at {client_address} not found.") + + def remove_client(self, client_socket): + # Remove a client from the server + if client_socket in self.client_sockets: + del self.client_sockets[client_socket] + client_socket.close() + self.active_connections -= 1 + + def close(self): + # Close the server and all client connections + self.stop_pipe() + if self.accept_thread is not None: + self.accept_thread.join() + if self.server_socket is not None: + self.server_socket.close() + for s in list(self.client_sockets): + s.close() + self.client_sockets.clear() + print("Server stopped.") + + def get_client_ips(self): + # Get a list of IP addresses of connected clients + return [addr[0] for addr in self.client_sockets.values()] + +def get_interface_ip(): + # Get the IP address of the specified network interface + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + return socket.inet_ntoa(fcntl.ioctl(s.fileno(), 0x8915, struct.pack('256s', b'wlan0'[:15]))[20:24]) + +if __name__ == "__main__": + server = TCPServer() + ip = get_interface_ip() + port = 12345 + server.start(ip, port) + + try: + while True: + # Process incoming messages + while not server.message_queue.empty(): + client_address, message = server.message_queue.get() + print(f"Received message from {client_address}: {message}") + server.send_to_client(client_address, message) + except KeyboardInterrupt: + print("Server interrupted by user.") + finally: + server.close() \ No newline at end of file diff --git a/Code/Server_WebUI/telemetry_ws.py b/Code/Server_WebUI/telemetry_ws.py new file mode 100644 index 00000000..4be9e30a --- /dev/null +++ b/Code/Server_WebUI/telemetry_ws.py @@ -0,0 +1,193 @@ +#!/usr/bin/env python3 +# telemetry_ws.py +# +# WebSocket server for telemetry, using the `websockets` library. +# +# Requires: +# pip3 install websockets +# +# Listens on TELEMETRY_WS_PORT (default 8090) and serves telemetry on: +# ws://:8090/ (path is ignored, port is dedicated to telemetry) +# +# It periodically reads sensor values from the shared `car` object +# and pushes JSON messages to all connected clients. + +import asyncio +import json +import threading +import time + +import websockets # pip3 install websockets + +TELEMETRY_WS_PORT = 8090 + +# --------------------------------------------------------------------------- +# CONFIG: set this to True to test with dummy data instead of real sensors. +# If the WS becomes stable with dummy data, the issue is in the sensor read +# path (Car/ADC/Sonic/etc). If it's still unstable, it's WS/network-level. +# --------------------------------------------------------------------------- +USE_DUMMY_SNAPSHOT = True + +# Global references set from main.py +_car = None +_started = False + +TELEMETRY_INTERVAL = 0.5 # seconds between telemetry updates (slightly slower) + + +def set_car(car): + """Set the car object used for telemetry snapshots.""" + global _car + _car = car + + +def _get_dummy_snapshot(): + """Return a simple, safe snapshot for debugging the WebSocket itself.""" + now = time.time() + return { + "power": 42, + "voltage": 11.1, + "sonic": 123.4, + "light_left": 1.23, + "light_right": 4.56, + "status": "DUMMY", + "t": round(now, 2), + } + + +def _get_real_snapshot(): + """ + Build a telemetry snapshot from the global `_car`. + + Returns a dict with keys: + power, voltage, sonic, light_left, light_right, status + """ + if _car is None: + raise RuntimeError("Car object not set in telemetry_ws") + + car = _car + + try: + adc = car.adc + sonic = car.sonic + except AttributeError: + # Car object exists but doesn't have the expected attributes + raise RuntimeError("Car object missing adc/sonic attributes") + + # Power / voltage + try: + raw_power = adc.read_adc(2) + factor = 3 if getattr(adc, "pcb_version", 1) == 1 else 2 + voltage = raw_power * factor + except Exception: + voltage = 0.0 + + # Distance + try: + distance = sonic.get_distance() + except Exception: + distance = 0.0 + + # Light sensors + try: + light_left = adc.read_adc(0) + light_right = adc.read_adc(1) + except Exception: + light_left = 0.0 + light_right = 0.0 + + # Crude power percentage estimate (12V nominal) + power_pct = max(0, min(100, int((voltage / 12.0) * 100))) + + return { + "power": power_pct, + "voltage": round(voltage, 2), + "sonic": round(distance, 2), + "light_left": round(light_left, 2), + "light_right": round(light_right, 2), + "status": "OK", + } + + +def _get_telemetry_snapshot(): + """Select dummy or real snapshot depending on USE_DUMMY_SNAPSHOT.""" + if USE_DUMMY_SNAPSHOT: + return _get_dummy_snapshot() + return _get_real_snapshot() + + +# IMPORTANT: your websockets version calls handler(websocket), not (websocket, path). +# We ignore the path entirely and just treat any connection to this port as telemetry. +async def _telemetry_handler(websocket): + """ + WebSocket handler for telemetry. + + Sends periodic JSON telemetry snapshots until the + client disconnects. + """ + path = getattr(websocket, "path", "/") + print(f"[WS] Telemetry client connected (path reported as: {path})") + + try: + while True: + # Be very defensive about sensor errors + try: + snapshot = _get_telemetry_snapshot() + except Exception as e: + # Don't kill the WS connection if hardware glitches + print(f"[WS] Telemetry snapshot error: {e}") + snapshot = {"error": str(e), "status": "ERROR"} + + try: + await websocket.send(json.dumps(snapshot)) + except websockets.ConnectionClosed as e: + # LOG CLOSE CODE + REASON + code = getattr(e, "code", None) + reason = getattr(e, "reason", "") + print(f"[WS] Telemetry client disconnected (code={code}, reason={reason})") + break + except Exception as e: + print(f"[WS] Error sending telemetry: {e}") + break + + await asyncio.sleep(TELEMETRY_INTERVAL) + + except Exception as e: + # This should be rare now + print(f"[WS] Telemetry error (outer): {e}") + + +async def _run_ws_server(host: str, port: int): + """Async entrypoint for the telemetry WebSocket server.""" + print(f"[WS] Starting telemetry WebSocket on ws://{host}:{port}/ (path ignored)") + # Your websockets version calls handler(websocket), so we pass _telemetry_handler + async with websockets.serve(_telemetry_handler, host, port): + await asyncio.Future() # run forever + + +def _ws_thread_entry(host: str, port: int): + """Thread entry: creates and runs its own asyncio loop.""" + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + loop.run_until_complete(_run_ws_server(host, port)) + finally: + loop.close() + + +def start_telemetry_ws_server(car, host: str = "0.0.0.0", port: int = TELEMETRY_WS_PORT): + """ + Start the telemetry WebSocket server in a background thread. + + Safe to call multiple times; it will only start once. + """ + global _started + if _started: + print("[WS] Telemetry WS server already started, skipping") + return + + set_car(car) + t = threading.Thread(target=_ws_thread_entry, args=(host, port), daemon=True) + t.start() + _started = True + print(f"[WS] Telemetry WS server thread started on port {port}") diff --git a/Code/Server_WebUI/test.py b/Code/Server_WebUI/test.py new file mode 100644 index 00000000..f5d76339 --- /dev/null +++ b/Code/Server_WebUI/test.py @@ -0,0 +1,169 @@ +def test_Led(): + import time + from led import Led + led=Led() + try: + led.ledIndex(0x01, 255, 0, 0) #Red + led.ledIndex(0x02, 255, 125, 0) #orange + led.ledIndex(0x04, 255, 255, 0) #yellow + led.ledIndex(0x08, 0, 255, 0) #green + led.ledIndex(0x10, 0, 255, 255) #cyan-blue + led.ledIndex(0x20, 0, 0, 255) #blue + led.ledIndex(0x40, 128, 0, 128) #purple + led.ledIndex(0x80, 255, 255, 255) #white + print ("The LED has been lit, the color is red orange yellow green cyan-blue blue white") + time.sleep(3) #wait 3s + led.colorBlink(0) #turn off the light + print ("\nEnd of program") + except KeyboardInterrupt: + led.colorBlink(0) #turn off the light + print ("\nEnd of program") + +def test_Motor(): + import time + from motor import Ordinary_Car + PWM = Ordinary_Car() + try: + PWM.set_motor_model(1000,1000,1000,1000) #Forward + print ("The car is moving forward") + time.sleep(1) + PWM.set_motor_model(-1000,-1000,-1000,-1000) #Back + print ("The car is going backwards") + time.sleep(1) + PWM.set_motor_model(-1500,-1500,2000,2000) #Turn left + print ("The car is turning left") + time.sleep(1) + PWM.set_motor_model(2000,2000,-1500,-1500) #Turn right + print ("The car is turning right") + time.sleep(1) + PWM.set_motor_model(0,0,0,0) #Stop + print ("\nEnd of program") + except KeyboardInterrupt: + print ("\nEnd of program") + finally: + PWM.close() # Close the PWM instance + +def test_Ultrasonic(): + import time + from ultrasonic import Ultrasonic + # Initialize the Ultrasonic instance with default pin numbers and max distance + ultrasonic = Ultrasonic() + try: + print("Program is starting ...") + while True: + distance = ultrasonic.get_distance() # Get the distance measurement in centimeters + if distance is not None: + print(f"Ultrasonic distance: {distance}cm") # Print the distance measurement + time.sleep(0.5) # Wait for 0.5 seconds + except KeyboardInterrupt: # Handle keyboard interrupt (Ctrl+C) + ultrasonic.close() + finally: + print("\nEnd of program") # Print an end message + +def test_Infrared(): + from infrared import Infrared + # Create an Infrared object + infrared = Infrared() + try: + # Continuously read and print the combined value of all infrared sensors + while True: + ir1_value = infrared.read_one_infrared(1) + ir2_value = infrared.read_one_infrared(2) + ir3_value = infrared.read_one_infrared(3) + if ir1_value != 1 and ir2_value == 1 and ir3_value != 1: + print ('Middle') + elif ir1_value != 1 and ir2_value != 1 and ir3_value == 1: + print ('Right') + elif ir1_value == 1 and ir2_value != 1 and ir3_value != 1: + print ('Left') + except KeyboardInterrupt: + # Close the Infrared object and print a message when interrupted + infrared.close() + print("\nEnd of program") + +def test_Servo(): + import time + from servo import Servo + servo = Servo() + try: + print ("Program is starting ...") + while True: + for i in range(50,110,1): + servo.set_servo_pwm('0',i) + time.sleep(0.01) + for i in range(110,50,-1): + servo.set_servo_pwm('0',i) + time.sleep(0.01) + for i in range(80,150,1): + servo.set_servo_pwm('1',i) + time.sleep(0.01) + for i in range(150,80,-1): + servo.set_servo_pwm('1',i) + time.sleep(0.01) + except KeyboardInterrupt: + servo.set_servo_pwm('0',90) + servo.set_servo_pwm('1',90) + finally: + print ("\nEnd of program") + +def test_Adc(): + import time + from adc import ADC + adc = ADC() + try: + print ("Program is starting ...") + while True: + Left_IDR = adc.read_adc(0) + print ("The photoresistor voltage on the left is "+str(Left_IDR)+"V") + Right_IDR = adc.read_adc(1) + print ("The photoresistor voltage on the right is "+str(Right_IDR)+"V") + Power = adc.read_adc(2) * (3 if adc.pcb_version == 1 else 2) + print ("The battery voltage is "+str(Power)+"V") + time.sleep(1) + print ('\n') + except KeyboardInterrupt: + print ("\nEnd of program") + +def test_Buzzer(): + import time + from buzzer import Buzzer + buzzer = Buzzer() + try: + print ("Program is starting ...") + buzzer.set_state(True) + time.sleep(1) + print ("1S") + time.sleep(1) + print ("2S") + time.sleep(1) + print ("3S") + buzzer.set_state(False) + except KeyboardInterrupt: + buzzer.set_state(False) + finally: + print ("\nEnd of program") + +# Main program logic follows: +if __name__ == '__main__': + print ('Program is starting ... ') + import sys + if len(sys.argv)<2: + print ("Parameter error: Please assign the device") + exit() + if sys.argv[1] == 'Led': + test_Led() + elif sys.argv[1] == 'Motor': + test_Motor() + elif sys.argv[1] == 'Ultrasonic': + test_Ultrasonic() + elif sys.argv[1] == 'Infrared': + test_Infrared() + elif sys.argv[1] == 'Servo': + test_Servo() + elif sys.argv[1] == 'ADC': + test_Adc() + elif sys.argv[1] == 'Buzzer': + test_Buzzer() + + + diff --git a/Code/Server_WebUI/ultrasonic.py b/Code/Server_WebUI/ultrasonic.py new file mode 100644 index 00000000..b468eb9b --- /dev/null +++ b/Code/Server_WebUI/ultrasonic.py @@ -0,0 +1,49 @@ +from gpiozero import DistanceSensor, PWMSoftwareFallback, DistanceSensorNoEcho +import warnings +import time + +class Ultrasonic: + def __init__(self, trigger_pin: int = 27, echo_pin: int = 22, max_distance: float = 3.0): + # Initialize the Ultrasonic class and set up the distance sensor. + warnings.filterwarnings("ignore", category = DistanceSensorNoEcho) + warnings.filterwarnings("ignore", category = PWMSoftwareFallback) # Ignore PWM software fallback warnings + self.trigger_pin = trigger_pin # Set the trigger pin number + self.echo_pin = echo_pin # Set the echo pin number + self.max_distance = max_distance # Set the maximum distance + self.sensor = DistanceSensor(echo=self.echo_pin, trigger=self.trigger_pin, max_distance=self.max_distance) # Initialize the distance sensor + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.close() + + def get_distance(self) -> float: + """ + Get the distance measurement from the ultrasonic sensor. + + Returns: + float: The distance measurement in centimeters, rounded to one decimal place. + """ + try: + distance = self.sensor.distance * 100 # Get the distance in centimeters + return round(float(distance), 1) # Return the distance rounded to one decimal place + except RuntimeWarning as e: + print(f"Warning: {e}") + return None + + def close(self): + # Close the distance sensor. + self.sensor.close() # Close the sensor to release resources + +if __name__ == '__main__': + # Initialize the Ultrasonic instance with default pin numbers and max distance + with Ultrasonic() as ultrasonic: + try: + while True: + distance = ultrasonic.get_distance() # Get the distance measurement in centimeters + if distance is not None: + print(f"Ultrasonic distance: {distance}cm") # Print the distance measurement + time.sleep(0.5) # Wait for 0.5 seconds + except KeyboardInterrupt: # Handle keyboard interrupt (Ctrl+C) + print("\nEnd of program") # Print an end message \ No newline at end of file diff --git a/Code/Server_WebUI/webapp/RaspURL.txt b/Code/Server_WebUI/webapp/RaspURL.txt new file mode 100644 index 00000000..123b0275 --- /dev/null +++ b/Code/Server_WebUI/webapp/RaspURL.txt @@ -0,0 +1 @@ +192.164.168.30:8080 diff --git a/Code/Server_WebUI/webapp/app.js b/Code/Server_WebUI/webapp/app.js new file mode 100644 index 00000000..2e020c44 --- /dev/null +++ b/Code/Server_WebUI/webapp/app.js @@ -0,0 +1,544 @@ +// app.js - FreeKap 4WD minimal web UI + smart assist + +const DEFAULT_TELEMETRY = ""; +const DEFAULT_VIDEO = ""; + +const state = { + telemetryUrl: localStorage.getItem("fk_telemetry") || DEFAULT_TELEMETRY, + videoUrl: localStorage.getItem("fk_video") || DEFAULT_VIDEO, + videoConnected: false, + + driveActive: null, // "front", "rear", "left", "right", "stop"… + holdMode: false, // HOLD button toggle + + // Smart assist state + sonicDistance: null, // latest distance in cm + lastBuzzerTime: 0, // ms timestamp + smartStopped: false, // already hard-stopped for obstacle +}; + +const el = {}; + +let lastVideoBase = null; +let telemetryTimer = null; + +console.log("FreeKap minimal UI app.js loaded"); + +// --------------------------------------------------------------------------- +// Boot +// --------------------------------------------------------------------------- + +document.addEventListener("DOMContentLoaded", () => { + cacheElements(); + hydrateEndpoints(); + loadRaspUrlFile() + .catch(() => {}) + .finally(() => { + bindEvents(); + preventContextMenus(); + setVideoFeed(); + startTelemetryPolling(); + }); +}); + +// --------------------------------------------------------------------------- +// Element cache +// --------------------------------------------------------------------------- + +function cacheElements() { + el.telemetryUrl = document.getElementById("telemetry-url"); + el.videoUrl = document.getElementById("video-url"); + el.connectBtn = document.getElementById("connect-btn"); + + el.video = document.getElementById("video-feed"); + el.videoOverlay = document.getElementById("video-overlay"); + el.autoReconnect = document.getElementById("auto-reconnect"); + + el.driveButtons = [...document.querySelectorAll("[data-drive]")]; + el.holdBtn = document.getElementById("hold-btn"); + + el.servoPan = document.getElementById("servo-pan"); + el.servoTilt = document.getElementById("servo-tilt"); + el.servoPanValue = document.getElementById("servo-pan-value"); + el.servoTiltValue = document.getElementById("servo-tilt-value"); + + el.buzzerToggle = document.getElementById("buzzer-toggle"); + el.cmd1Btn = document.getElementById("cmd1-btn"); // RESET / SERVO + el.cmd2Btn = document.getElementById("cmd2-btn"); + + el.telemetryPower = document.getElementById("power"); + el.telemetrySonic = document.getElementById("sonic"); + el.telemetryLight = document.getElementById("light"); + el.status = document.getElementById("status"); + + el.batterySummary = document.getElementById("battery-summary"); + el.apiStatus = document.getElementById("api-status"); + el.wsStatus = document.getElementById("ws-status"); +} + +// --------------------------------------------------------------------------- +// URL helpers +// --------------------------------------------------------------------------- + +function hydrateEndpoints() { + el.telemetryUrl.value = state.telemetryUrl; + el.videoUrl.value = state.videoUrl; +} + +function normalizeBase(input, fallback) { + if (!input) return fallback || state.telemetryUrl; + let url = input.trim(); + if (!/^https?:\/\//i.test(url)) { + url = `http://${url}`; + } + return url.replace(/\/$/, ""); +} + +// --------------------------------------------------------------------------- +// Event binding +// --------------------------------------------------------------------------- + +function bindEvents() { + el.connectBtn.addEventListener("click", () => { + state.telemetryUrl = normalizeBase(el.telemetryUrl.value, state.telemetryUrl); + state.videoUrl = normalizeBase(el.videoUrl.value, state.videoUrl); + localStorage.setItem("fk_telemetry", state.telemetryUrl); + localStorage.setItem("fk_video", state.videoUrl); + setVideoFeed(); + startTelemetryPolling(true); + }); + + el.video.addEventListener("load", () => setVideoState(true)); + el.video.addEventListener("error", () => setVideoState(false)); + + // Drive buttons + el.driveButtons.forEach((btn) => { + const kind = btn.dataset.drive; + + if (kind === "hold") { + btn.addEventListener("click", toggleHoldMode); + } else if (kind === "stop") { + btn.addEventListener("click", () => { + state.driveActive = null; + state.holdMode = false; + state.smartStopped = false; + updateHoldIndicator(false); + handleDrive("stop"); + }); + } else { + // Directional buttons – press to move, release to stop (unless HOLD) + btn.addEventListener("pointerdown", () => handleDrive(kind)); + btn.addEventListener("pointerup", () => { + if (!state.holdMode) { + state.smartStopped = false; + handleDrive("stop"); + } + }); + btn.addEventListener("pointerleave", () => { + if (!state.holdMode) { + state.smartStopped = false; + handleDrive("stop"); + } + }); + } + }); + + const setServoFromSlider = (slider, label, channel) => { + label.textContent = `${slider.value}°`; + sendServo(channel, Number(slider.value)); + }; + + el.servoPan.addEventListener("input", () => + setServoFromSlider(el.servoPan, el.servoPanValue, 0) + ); + el.servoTilt.addEventListener("input", () => + setServoFromSlider(el.servoTilt, el.servoTiltValue, 1) + ); + + // BEEP + el.buzzerToggle.addEventListener("click", () => { + sendJson("/api/buzzer", {}) + .then(() => setStatus("Beep!")) + .catch((err) => console.error("Buzzer error:", err)); + }); + + // RESET / SERVO + el.cmd1Btn.addEventListener("click", () => { + const neutral = 90; + el.servoPan.value = neutral; + el.servoTilt.value = neutral; + el.servoPanValue.textContent = `${neutral}°`; + el.servoTiltValue.textContent = `${neutral}°`; + sendServo(0, neutral); + sendServo(1, neutral); + setStatus("Servos centered"); + }); + + // CMD2 placeholder + el.cmd2Btn.addEventListener("click", () => { + console.log("CMD2 pressed (placeholder)"); + }); +} + +// --------------------------------------------------------------------------- +// Disable context menus / long-press menus on controls +// --------------------------------------------------------------------------- + +function preventContextMenus() { + const selectors = [ + "button", + ".drive-pad", + ".action-buttons", + ".servo-controls", + ".video-servos", + ]; + const blockers = document.querySelectorAll(selectors.join(", ")); + + blockers.forEach((el) => { + el.addEventListener("contextmenu", (e) => { + e.preventDefault(); + }); + el.addEventListener("pointerdown", (e) => { + if (e.button === 2) { + e.preventDefault(); + } + }); + }); +} + +// --------------------------------------------------------------------------- +// Video +// --------------------------------------------------------------------------- + +function setVideoFeed() { + const base = state.videoUrl || state.telemetryUrl; + + if (!base) { + el.videoOverlay.textContent = "No video base URL configured"; + el.videoOverlay.style.display = "grid"; + return; + } + + if (base === lastVideoBase && state.videoConnected) { + // Already streaming from this base + return; + } + + lastVideoBase = base; + const src = `${withPath(base, "/video/mjpeg")}?ts=${Date.now()}`; + el.video.src = src; + el.videoOverlay.textContent = "Connecting…"; + el.videoOverlay.style.display = "grid"; +} + +function setVideoState(ok) { + state.videoConnected = ok; + if (ok) { + el.videoOverlay.textContent = ""; + el.videoOverlay.style.display = "none"; + } else { + el.videoOverlay.textContent = "Stream unavailable"; + el.videoOverlay.style.display = "grid"; + if (el.autoReconnect.checked) { + setTimeout(setVideoFeed, 1500); + } + } +} + +// --------------------------------------------------------------------------- +// Telemetry (HTTP polling) +// --------------------------------------------------------------------------- + +async function fetchTelemetryOnce() { + if (!state.telemetryUrl) return; + try { + const url = withPath(state.telemetryUrl, "/api/telemetry"); + const res = await fetch(url); + if (!res.ok) throw new Error(`status ${res.status}`); + const data = await res.json(); + updateTelemetryFromJson(data); + setBadge(el.wsStatus, "online", "success"); + setBadge(el.apiStatus, "online", "success"); + } catch (err) { + setBadge(el.wsStatus, "offline", "error"); + setBadge(el.apiStatus, "offline", "error"); + } +} + +function startTelemetryPolling(force = false) { + if (telemetryTimer && !force) return; + if (telemetryTimer) { + clearInterval(telemetryTimer); + telemetryTimer = null; + } + setBadge(el.wsStatus, "probing…", "warn"); + setBadge(el.apiStatus, "probing…", "warn"); + telemetryTimer = setInterval(fetchTelemetryOnce, 500); + fetchTelemetryOnce(); +} + +function stopTelemetryPolling() { + if (telemetryTimer) { + clearInterval(telemetryTimer); + telemetryTimer = null; + } + setBadge(el.wsStatus, "offline", "error"); + setBadge(el.apiStatus, "offline", "error"); +} + +// --------------------------------------------------------------------------- +// Telemetry parsing + Smart assist +// --------------------------------------------------------------------------- + +function updateTelemetryFromJson(msg) { + if (msg.power !== undefined) { + const powerText = `${msg.power}%${msg.voltage ? ` (${msg.voltage}V)` : ""}`; + el.telemetryPower.textContent = powerText; + if (el.batterySummary) { + el.batterySummary.textContent = powerText; + } + } + if (msg.sonic !== undefined) { + el.telemetrySonic.textContent = `${msg.sonic} cm`; + state.sonicDistance = Number(msg.sonic); + } + if (msg.light_left !== undefined && msg.light_right !== undefined) { + el.telemetryLight.textContent = `${msg.light_left} / ${msg.light_right} V`; + } + if (msg.status) { + el.status.textContent = msg.status; + } + + // Smart assist logic runs after updating telemetry + maybeSmartAssist(); +} + +// Smart assist: distance-based forward braking +function maybeSmartAssist() { + const d = state.sonicDistance; + + // Only assist when actively driving forward + if (state.driveActive !== "front") { + state.smartStopped = false; + return; + } + if (d == null || isNaN(d) || d <= 0) return; + + // Emergency stop at very close range + if (d <= 15) { + if (!state.smartStopped) { + console.log("[ASSIST] Obstacle ≤15 cm -> STOP"); + state.smartStopped = true; + state.holdMode = false; + updateHoldIndicator(false); + handleDrive("stop"); + maybeSmartBeep(); + } + return; + } + + // From here on, we are not in hard-stop + state.smartStopped = false; + + // Decide speed level based on distance + let factor; + let modeLabel; + + if (d > 80) { + // Far away: let manual command dominate, no override + return; + } else if (d > 50) { + factor = 0.7; + modeLabel = "ASSIST: CAUTION"; + } else if (d > 30) { + factor = 0.4; + modeLabel = "ASSIST: BRAKE"; + } else { + // 30 >= d > 15 + factor = 0.2; + modeLabel = "ASSIST: HARD BRAKE"; + maybeSmartBeep(); + } + + factor = Math.max(0, Math.min(1, factor)); + + const base = drivePayload("front"); + const scaled = { + fl: Math.round(base.fl * factor), + fr: Math.round(base.fr * factor), + bl: Math.round(base.bl * factor), + br: Math.round(base.br * factor), + }; + + sendDrive( + scaled, + `${modeLabel} – FRONT @ ${Math.round(d)} cm (${Math.round(factor * 100)}%)` + ); +} + +// Cooldown beep to avoid spam +function maybeSmartBeep() { + const now = Date.now(); + const intervalMs = 1500; + if (now - state.lastBuzzerTime < intervalMs) return; + state.lastBuzzerTime = now; + + console.log("[ASSIST] Beep: obstacle close"); + sendJson("/api/buzzer", {}) + .then(() => setStatus("Assist: Beep (obstacle close)")) + .catch((err) => console.error("Smart assist buzzer error:", err)); +} + +// --------------------------------------------------------------------------- +// Drive logic +// --------------------------------------------------------------------------- + +function toggleHoldMode() { + state.holdMode = !state.holdMode; + updateHoldIndicator(state.holdMode); +} + +function updateHoldIndicator(on) { + if (!el.holdBtn) return; + if (on) el.holdBtn.classList.add("active"); + else el.holdBtn.classList.remove("active"); +} + +// Single place to send drive commands +function sendDrive(body, statusText) { + return sendJson("/api/drive/mecanum", body) + .then(() => { + if (statusText) setStatus(statusText); + }) + .catch((err) => console.error("Drive error:", err)); +} + +function drivePayload(kind) { + // Base speed (tuned for Freenove; adjust if needed) + const fast = 1500; + const zero = { fl: 0, fr: 0, bl: 0, br: 0 }; + + const mappings = { + front: { fl: fast, fr: fast, bl: fast, br: fast }, + rear: { fl: -fast, fr: -fast, bl: -fast, br: -fast }, + left: { fl: -fast, fr: fast, bl: 0, br: fast }, + right: { fl: fast, fr: -fast, bl: fast, br: 0 }, + diagLeft: { fl: 0, fr: fast, bl: 0, br: 0 }, + diagRight: { fl: fast, fr: 0, bl: 0, br: 0 }, + diagBackLeft: { fl: 0, fr: 0, bl: 0, br: -fast }, + diagBackRight: { fl: 0, fr: 0, bl: -fast, br: 0 }, + stop: zero, + }; + + const key = camel(kind); + return mappings[key] || zero; +} + +function handleDrive(kind) { + const payload = drivePayload(kind); + if (!payload) return; + + if (kind === "stop") { + state.driveActive = null; + } else { + state.driveActive = kind; + } + + sendDrive(payload, `Drive: ${kind}`); +} + +// --------------------------------------------------------------------------- +// Servos +// --------------------------------------------------------------------------- + +function sendServo(channel, angle) { + return sendJson("/api/servo", { channel, angle }) + .then(() => setStatus(`Servo ${channel} → ${angle}`)) + .catch((err) => console.error("Servo error:", err)); +} + +// --------------------------------------------------------------------------- +// HTTP helper +// --------------------------------------------------------------------------- + +async function sendJson(path, body) { + if (!state.telemetryUrl) throw new Error("No telemetry URL set"); + const url = withPath(state.telemetryUrl, path); + const res = await fetch(url, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(body || {}), + }); + if (!res.ok) { + const text = await res.text(); + throw new Error(text || res.statusText); + } + try { + return await res.json(); + } catch (_) { + return {}; + } +} + +// --------------------------------------------------------------------------- +// UI helpers +// --------------------------------------------------------------------------- + +function setBadge(elm, text, variant) { + if (!elm) return; + elm.textContent = text; + elm.classList.remove("success", "error", "warn"); + if (variant) elm.classList.add(variant); +} + +function setStatus(text) { + if (!el.status) return; + el.status.textContent = text; +} + +// --------------------------------------------------------------------------- +// Misc helpers +// --------------------------------------------------------------------------- + +function camel(text) { + return text.replace(/-([a-z])/g, (_, c) => c.toUpperCase()); +} + +async function loadRaspUrlFile() { + try { + const res = await fetch("RaspURL.txt", { cache: "no-cache" }); + if (!res.ok) return; + const text = await res.text(); + const [telemetryLine, videoLine] = text + .split(/\r?\n/) + .map((line) => line.trim()) + .filter(Boolean); + if (telemetryLine) { + state.telemetryUrl = normalizeBase(telemetryLine, DEFAULT_TELEMETRY); + localStorage.setItem("fk_telemetry", state.telemetryUrl); + } + if (videoLine) { + state.videoUrl = normalizeBase(videoLine, state.telemetryUrl || DEFAULT_VIDEO); + localStorage.setItem("fk_video", state.videoUrl); + } + hydrateEndpoints(); + setVideoFeed(); + } catch (err) { + console.warn("Could not read RaspURL.txt:", err.message); + } +} + +function withPath(base, path) { + try { + const url = new URL(base); + const cleanPath = path.startsWith("/") ? path : `/${path}`; + if (url.pathname === "/" || url.pathname === "") { + url.pathname = cleanPath; + } else if (!url.pathname.endsWith(path)) { + url.pathname = `${url.pathname.replace(/\/$/, "")}${cleanPath}`; + } + return url.toString(); + } catch (_) { + return `${base}${path}`; + } +} diff --git a/Code/Server_WebUI/webapp/index.html b/Code/Server_WebUI/webapp/index.html new file mode 100644 index 00000000..237f89a9 --- /dev/null +++ b/Code/Server_WebUI/webapp/index.html @@ -0,0 +1,139 @@ + + + + + + FreeKap 4WD Controller + + + +
+
+ FreeKap logo +
+

FreeKap 4WD

+

Web controller

+
+
+ +
+
+ + +
+
+ + +
+ +
+
+ +
+ +
+
+

Telemetry

+
+
+ Power + +
+
+ Ultrasonic + +
+
+ Light + +
+
+ Status + Idle +
+
+
+ +
+

Drive

+
+ + + + + + +
+
+
+ + +
+
+

Video

+ +
+
+ video feed +
Waiting for stream…
+
+ + +
+

Camera servos

+
+
+ + + 90° +
+
+ + + 90° +
+
+
+
+ + +
+
+

Status

+
+
+ Battery + +
+
+ API + offline +
+
+ Telemetry + offline +
+
+
+ +
+

Actions

+
+ + + +
+
+ + +
+
+
+ + + + + diff --git a/Code/Server_WebUI/webapp/log_errors b/Code/Server_WebUI/webapp/log_errors new file mode 100644 index 00000000..d3f5a12f --- /dev/null +++ b/Code/Server_WebUI/webapp/log_errors @@ -0,0 +1 @@ + diff --git a/Code/Server_WebUI/webapp/logo_Mini.png b/Code/Server_WebUI/webapp/logo_Mini.png new file mode 100644 index 00000000..8c607a0e Binary files /dev/null and b/Code/Server_WebUI/webapp/logo_Mini.png differ diff --git a/Code/Server_WebUI/webapp/styles.css b/Code/Server_WebUI/webapp/styles.css new file mode 100644 index 00000000..d2bf089a --- /dev/null +++ b/Code/Server_WebUI/webapp/styles.css @@ -0,0 +1,612 @@ +:root { + --bg: #020617; + --bg-elevated: #020617; + --panel: #020617; + --panel-soft: #050b16; + --panel-strong: #020617; + --text: #e5e7eb; + --muted: #9ca3af; + --accent: #3b82f6; + --accent-soft: rgba(59, 130, 246, 0.16); + --danger: #ef4444; + --border: #1f2937; + --border-soft: rgba(15, 23, 42, 0.8); + --shadow: 0 18px 45px rgba(0, 0, 0, 0.85); +} + +*, +*::before, +*::after { + box-sizing: border-box; +} + +body { + margin: 0; + min-height: 100vh; + font-family: system-ui, -apple-system, BlinkMacSystemFont, "Segoe UI", sans-serif; + font-size: 13px; /* unified base size */ + color: var(--text); + background: + radial-gradient(circle at top left, #1e293b 0, transparent 45%), + radial-gradient(circle at top right, #020617 0, #020617 60%), + #020617; + display: flex; + flex-direction: column; +} + +/* -------------------------------------------------------------------------- */ +/* Top bar */ +/* -------------------------------------------------------------------------- */ + +.top-bar { + display: flex; + align-items: center; + justify-content: space-between; + padding: 10px 18px; + background: rgba(9, 14, 24, 0.96); + border-bottom: 1px solid rgba(15, 23, 42, 0.9); + box-shadow: 0 12px 28px rgba(0, 0, 0, 0.75); + backdrop-filter: blur(16px); + position: sticky; + top: 0; + z-index: 20; +} + +.brand { + display: flex; + align-items: center; + gap: 10px; +} + +.brand img { + width: 34px; + height: 34px; + border-radius: 10px; + box-shadow: + 0 0 0 1px rgba(15, 23, 42, 0.95), + 0 0 0 2px rgba(37, 99, 235, 0.45); +} + +.brand h1 { + margin: 0; + font-size: 15px; + letter-spacing: 0.05em; + text-transform: uppercase; +} + +.brand p { + margin: 0; + font-size: 13px; + color: var(--muted); +} + +.connection-panel { + display: flex; + flex-wrap: wrap; + align-items: flex-end; + gap: 10px 14px; +} + +.connection-row { + display: flex; + flex-direction: column; + gap: 3px; +} + +.connection-row label { + font-size: 13px; + color: var(--muted); +} + +.connection-row input { + min-width: 210px; + padding: 6px 10px; + border-radius: 999px; + border: 1px solid rgba(31, 41, 55, 0.9); + background: radial-gradient(circle at top, #020617, #020617); + color: var(--text); + font-size: 13px; +} + +#connect-btn { + padding: 7px 16px; + border-radius: 999px; + border: none; + background: linear-gradient(135deg, #2563eb, #3b82f6); + color: #f9fafb; + font-weight: 600; + font-size: 13px; + cursor: pointer; + box-shadow: 0 10px 24px rgba(37, 99, 235, 0.7); + letter-spacing: 0.06em; + text-transform: uppercase; +} + +#connect-btn:hover { + filter: brightness(1.06); +} + +#connect-btn:active { + transform: translateY(1px); +} + +/* -------------------------------------------------------------------------- */ +/* Main layout */ +/* -------------------------------------------------------------------------- */ + +.switch-layout { + flex: 1; + display: grid; + grid-template-columns: minmax(240px, 1.1fr) minmax(420px, 2.3fr) minmax(260px, 1.2fr); + grid-template-rows: minmax(0, 1fr); + grid-template-areas: "left video right"; + gap: 18px; + padding: 16px 18px 18px; +} + +.pad-left { + grid-area: left; +} + +.video-center { + grid-area: video; +} + +.pad-right { + grid-area: right; +} + +.pad, +.card.video-center { + background: radial-gradient(circle at top, #020617, #020617); + border-radius: 26px; + border: 1px solid var(--border-soft); + box-shadow: var(--shadow); + padding: 12px; + position: relative; + overflow: hidden; +} + +.pad::before, +.card.video-center::before { + content: ""; + position: absolute; + inset: 0; + background: + radial-gradient(circle at top, rgba(148, 163, 184, 0.07), transparent 55%), + radial-gradient(circle at bottom, rgba(15, 23, 42, 0.6), transparent 60%); + mix-blend-mode: soft-light; + pointer-events: none; + opacity: 0.9; +} + +.pad > *, +.card.video-center > * { + position: relative; + z-index: 1; +} + +.pad-title { + margin: 0 0 6px; + font-size: 13px; + letter-spacing: 0.15em; + text-transform: uppercase; + color: #6b7280; +} + +.pad-top, +.pad-middle, +.pad-bottom { + padding: 6px 4px; +} + +.pad-top { + margin-bottom: 4px; +} + +.pad-bottom { + margin-top: 6px; +} + +/* -------------------------------------------------------------------------- */ +/* Center video */ +/* -------------------------------------------------------------------------- */ + +.card.video-center { + display: flex; + flex-direction: column; +} + +.video-header { + display: flex; + justify-content: space-between; + align-items: center; + margin-bottom: 8px; +} + +.inline-checkbox { + display: inline-flex; + align-items: center; + gap: 6px; + font-size: 13px; + color: var(--muted); +} + +.video-frame { + position: relative; + border-radius: 20px; + border: 1px solid rgba(31, 41, 55, 0.9); + background: radial-gradient(circle at center, #020617, #020617); + overflow: hidden; + min-height: 50vh; +} + +#video-feed { + width: 100%; + height: 100%; + object-fit: cover; + display: block; +} + +.overlay { + position: absolute; + inset: 0; + display: grid; + place-items: center; + color: var(--muted); + font-size: 13px; + background: radial-gradient(circle at center, rgba(15, 23, 42, 0.96), rgba(15, 23, 42, 0.96)); +} + +/* Servo section under video */ + +.video-servos { + margin-top: 10px; +} + +.servo-controls { + border-radius: 18px; + border: 1px solid rgba(31, 41, 55, 0.9); + background: linear-gradient(145deg, #020617, #020617); + padding: 8px 10px; + display: grid; + gap: 8px; + font-size: 13px; +} + +.slider-row { + display: grid; + grid-template-columns: 70px 1fr 48px; + gap: 8px; + align-items: center; +} + +.slider-row label { + font-size: 13px; + color: var(--muted); +} + +.slider-row input[type="range"] { + width: 100%; +} + +.slider-row .value { + text-align: right; + font-size: 13px; + color: var(--muted); +} + +/* -------------------------------------------------------------------------- */ +/* Telemetry */ +/* -------------------------------------------------------------------------- */ + +.telemetry-panel { + border-radius: 18px; + border: 1px solid rgba(31, 41, 55, 0.9); + background: linear-gradient(145deg, #020617, #020617); + padding: 10px 11px; + display: grid; + gap: 8px; /* more vertical spacing */ + font-size: 13px; +} + +.telemetry-row { + display: flex; + justify-content: space-between; + gap: 12px; + line-height: 1.4; /* more breathing room */ +} + +.telemetry-label { + color: var(--muted); +} + +/* -------------------------------------------------------------------------- */ +/* Drive pad – wind-rose cross */ +/* -------------------------------------------------------------------------- */ + +.drive-pad { + display: grid; + grid-template-areas: + ". front ." + "left hold right" + ". stop ." + ". rear ."; + grid-template-columns: repeat(3, minmax(0, 1fr)); + grid-auto-rows: 44px; + gap: 10px; + padding: 10px 4px 4px; +} + +.drive-front { grid-area: front; } +.drive-left { grid-area: left; } +.drive-right { grid-area: right; } +.drive-rear { grid-area: rear; } +.drive-hold { grid-area: hold; } +.drive-stop { grid-area: stop; } + +.drive-btn { + border-radius: 999px; + border: 1px solid rgba(31, 41, 55, 0.95); + background: radial-gradient(circle at top, #020617, #020617); + color: var(--text); + font-size: 13px; + font-weight: 600; + letter-spacing: 0.16em; + text-transform: uppercase; + cursor: pointer; + box-shadow: 0 10px 22px rgba(0, 0, 0, 0.85); + transition: + background 0.12s ease, + transform 0.08s ease, + box-shadow 0.12s ease, + border-color 0.12s ease; +} + +/* Directional buttons */ + +.drive-front, +.drive-left, +.drive-right, +.drive-rear { + background: radial-gradient(circle at top, #111827, #020617); +} + +/* Stop / Hold emphasis */ + +.drive-stop { + background: #7f1d1d; + border-color: rgba(248, 113, 113, 0.9); + color: #fee2e2; +} + +.drive-stop:hover { + background: #991b1b; +} + +.drive-hold { + background: #1d4ed8; + border-color: #60a5fa; + color: #e5f0ff; +} + +.drive-btn:hover { + transform: translateY(1px); + box-shadow: 0 6px 16px rgba(0, 0, 0, 0.85); +} + +.drive-btn:active { + transform: translateY(2px); + box-shadow: 0 4px 10px rgba(0, 0, 0, 0.9); +} + +.drive-hold.active { + box-shadow: + 0 0 0 1px rgba(37, 99, 235, 0.9), + 0 0 0 4px var(--accent-soft); +} + +/* -------------------------------------------------------------------------- */ +/* Status (right pad) */ +/* -------------------------------------------------------------------------- */ + +.status-grid { + display: grid; + grid-template-columns: repeat(2, minmax(0, 1fr)); + gap: 8px; + font-size: 13px; +} + +.status-block { + border-radius: 18px; + border: 1px solid rgba(31, 41, 55, 0.9); + background: linear-gradient(145deg, #020617, #020617); + padding: 7px 9px; + display: flex; + flex-direction: column; + gap: 3px; +} + +.status-label { + color: var(--muted); + font-size: 13px; +} + +.status-value { + font-weight: 600; +} + +.badge { + display: inline-flex; + align-items: center; + justify-content: center; + padding: 2px 8px; + border-radius: 999px; + border: 1px solid var(--border); + font-size: 13px; + min-height: 18px; + text-transform: uppercase; + letter-spacing: 0.12em; +} + +.badge.success { + border-color: #4ade80; + color: #bbf7d0; + background: rgba(22, 163, 74, 0.25); +} + +.badge.error { + border-color: #f97373; + color: #fecaca; + background: rgba(153, 27, 27, 0.4); +} + +.badge.warn { + border-color: #fb923c; + color: #fed7aa; + background: rgba(180, 83, 9, 0.35); +} + +/* -------------------------------------------------------------------------- */ +/* Actions (right middle) */ +/* -------------------------------------------------------------------------- */ + +.pad-middle { + margin-top: 6px; +} + +/* Make control text non-selectable and discourage long-press menus */ +button, +.drive-pad, +.action-buttons, +.video-servos, +.servo-controls { + -webkit-user-select: none; /* Safari / iOS */ + -moz-user-select: none; /* Firefox */ + -ms-user-select: none; /* old IE/Edge */ + user-select: none; /* standard */ + -webkit-touch-callout: none; /* iOS long-press menu */ + touch-action: manipulation; /* hint: it's an interactive control */ +} + +.action-buttons { + display: flex; + gap: 8px; + justify-content: center; + margin-top: 6px; +} + +.action-buttons button { + padding: 8px 12px; + border-radius: 999px; + border: 1px solid rgba(31, 41, 55, 0.9); + background: radial-gradient(circle at top, #020617, #020617); + color: var(--text); + font-size: 13px; + font-weight: 600; + cursor: pointer; + min-width: 70px; + box-shadow: 0 8px 18px rgba(0, 0, 0, 0.85); + text-transform: uppercase; + letter-spacing: 0.14em; +} + +.action-buttons button:hover { + transform: translateY(1px); +} + +.action-buttons button:active { + transform: translateY(2px); +} + +.action-buttons #buzzer-toggle { + background: #f97316; + border-color: #fed7aa; + color: #111827; +} + +button.ghost { + background: transparent; + color: var(--muted); +} + +/* -------------------------------------------------------------------------- */ +/* Responsive – tablet & mobile */ +/* -------------------------------------------------------------------------- */ + +/* Tablet / small desktop: video full width on top, pads below side-by-side */ +@media (max-width: 1080px) { + .switch-layout { + grid-template-columns: minmax(0, 1fr) minmax(0, 1fr); + grid-template-rows: auto minmax(0, 1fr); + grid-template-areas: + "video video" + "left right"; + gap: 14px; + padding: 12px 14px 14px; + } + + .video-frame { + min-height: 45vh; + } +} + +/* Phone / portrait tablet: everything stacked in one column */ +@media (max-width: 720px) { + .top-bar { + flex-direction: column; + align-items: flex-start; + gap: 8px; + padding: 8px 12px; + } + + .connection-panel { + width: 100%; + justify-content: flex-start; + } + + .connection-row input { + width: 100%; + min-width: 0; + } + + .switch-layout { + grid-template-columns: 1fr; + grid-template-rows: auto auto auto; + grid-template-areas: + "video" + "left" + "right"; + gap: 12px; + padding: 10px; + } + + .video-frame { + min-height: 38vh; + } + + /* Bigger buttons for thumbs */ + .drive-pad { + grid-auto-rows: 56px; + gap: 12px; + padding: 10px 2px 6px; + } + + .drive-btn { + font-size: 15px; + padding-inline: 16px; + } + + .action-buttons button { + font-size: 14px; + padding: 9px 16px; + min-width: 90px; + } + + .telemetry-panel { + padding: 10px; + gap: 6px; + } +} + +