Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions Code/Server_WebUI/Thread.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import threading
import time
import inspect
import ctypes


def _async_raise(tid, exctype):
"""raises the exception, performs cleanup if needed"""
tid = ctypes.c_long(tid)
if not inspect.isclass(exctype):
exctype = type(exctype)
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
if res == 0:
raise ValueError("invalid thread id")
elif res != 1:
ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
raise SystemError("PyThreadState_SetAsyncExc failed")


def stop_thread(thread):
for i in range(7):
_async_raise(thread.ident, SystemExit)


def test():
while True:
print('-------')
time.sleep(1)


if __name__ == "__main__":
t = threading.Thread(target=test)
t.start()
time.sleep(5)
print("main thread sleep finish")
stop_thread(t)
56 changes: 56 additions & 0 deletions Code/Server_WebUI/adc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import smbus # Import the smbus module for I2C communication
import time # Import the time module for sleep functionality
from parameter import ParameterManager # Import the ParameterManager class from the parameter module

class ADC:
def __init__(self):
"""Initialize the ADC class."""
self.I2C_ADDRESS = 0x48 # Set the I2C address of the ADC
self.ADS7830_COMMAND = 0x84 # Set the command byte for ADS7830
self.parameter_manager = ParameterManager() # Create an instance of ParameterManager
self.pcb_version = self.parameter_manager.get_pcb_version() # Get the PCB version
self.adc_voltage_coefficient = 3.3 if self.pcb_version == 1 else 5.2 # Set the ADC voltage coefficient based on the PCB version
self.i2c_bus = smbus.SMBus(1) # Initialize the I2C bus

def _read_stable_byte(self) -> int:
"""Read a stable byte from the ADC."""
while True:
value1 = self.i2c_bus.read_byte(self.I2C_ADDRESS) # Read the first byte from the ADC
value2 = self.i2c_bus.read_byte(self.I2C_ADDRESS) # Read the second byte from the ADC
if value1 == value2:
return value1 # Return the value if both reads are the same

def read_adc(self, channel: int) -> float:
"""Read the ADC value for the specified channel using ADS7830."""
command_set = self.ADS7830_COMMAND | ((((channel << 2) | (channel >> 1)) & 0x07) << 4) # Calculate the command set for the specified channel
self.i2c_bus.write_byte(self.I2C_ADDRESS, command_set) # Write the command set to the ADC
value = self._read_stable_byte() # Read a stable byte from the ADC
voltage = value / 255.0 * self.adc_voltage_coefficient # Convert the ADC value to voltage
return round(voltage, 2) # Return the voltage rounded to 2 decimal places

def scan_i2c_bus(self) -> None:
"""Scan the I2C bus for connected devices."""
print("Scanning I2C bus...") # Print a message indicating the start of I2C bus scanning
for device in range(128): # Iterate over possible I2C addresses (0 to 127)
try:
self.i2c_bus.read_byte_data(device, 0) # Try to read data from the current device address
print(f"Device found at address: 0x{device:02X}") # Print the address of the found device
except OSError:
pass # Ignore any OSError exceptions

def close_i2c(self) -> None:
"""Close the I2C bus."""
self.i2c_bus.close() # Close the I2C bus

if __name__ == '__main__':
print('Program is starting ... ') # Print a message indicating the start of the program
adc = ADC() # Create an instance of the ADC class
try:
while True:
left_idr = adc.read_adc(0) # Read the left photoresistor value
right_idr = adc.read_adc(1) # Read the right photoresistor value
power = adc.read_adc(2) * (3 if adc.pcb_version == 1 else 2) # Calculate the power value based on the PCB version
print(f"Left IDR: {left_idr}V, Right IDR: {right_idr}V, Power: {power}V") # Print the values of left IDR, right IDR, and power
time.sleep(1) # Wait for 1 second
except KeyboardInterrupt:
adc.close_i2c() # Close the I2C bus when the program is interrupted
28 changes: 28 additions & 0 deletions Code/Server_WebUI/buzzer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import time
from gpiozero import OutputDevice

class Buzzer:
def __init__(self):
"""Initialize the Buzzer class."""
self.PIN = 17 # Set the GPIO pin for the buzzer
self.buzzer_pin = OutputDevice(self.PIN) # Initialize the buzzer pin

def set_state(self, state: bool) -> None:
"""Set the state of the buzzer."""
self.buzzer_pin.on() if state else self.buzzer_pin.off() # Turn on or off the buzzer based on the state

def close(self) -> None:
"""Close the buzzer pin."""
self.buzzer_pin.close() # Close the buzzer pin to release the GPIO resource

if __name__ == '__main__':
print('Program is starting ... ') # Print a message indicating the start of the program
buzzer = Buzzer() # Create an instance of the Buzzer class
try:
for _ in range(3):
buzzer.set_state(True) # Turn on the buzzer
time.sleep(0.1) # Wait for 0.1 second
buzzer.set_state(False) # Turn off the buzzer
time.sleep(0.1) # Wait for 0.1 second
finally:
buzzer.close() # Ensure the buzzer pin is closed when the program is interrupted
120 changes: 120 additions & 0 deletions Code/Server_WebUI/camera.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import time
from picamera2 import Picamera2, Preview
from picamera2.encoders import H264Encoder, JpegEncoder
from picamera2.outputs import FileOutput
from libcamera import Transform
from threading import Condition
import io

class StreamingOutput(io.BufferedIOBase):
def __init__(self):
"""Initialize the StreamingOutput class."""
self.frame = None
self.condition = Condition() # Initialize the condition variable for thread synchronization

def write(self, buf: bytes) -> int:
"""Write a buffer to the frame and notify all waiting threads."""
with self.condition:
self.frame = buf # Update the frame buffer with new data
self.condition.notify_all() # Notify all waiting threads that new data is available
return len(buf)

class Camera:
def __init__(self, preview_size: tuple = (640, 480), hflip: bool = True, vflip: bool = True, stream_size: tuple = (400, 300)):
"""Initialize the Camera class."""
self.camera = Picamera2() # Initialize the Picamera2 object
self.transform = Transform(hflip=1 if hflip else 0, vflip=1 if vflip else 0) # Set the transformation for flipping the image
preview_config = self.camera.create_preview_configuration(main={"size": preview_size}, transform=self.transform) # Create the preview configuration
self.camera.configure(preview_config) # Configure the camera with the preview settings

# Configure video stream
self.stream_size = stream_size # Set the size of the video stream
self.stream_config = self.camera.create_video_configuration(main={"size": stream_size}, transform=self.transform) # Create the video configuration
self.streaming_output = StreamingOutput() # Initialize the streaming output object
self.streaming = False # Initialize the streaming flag

def start_image(self) -> None:
"""Start the camera preview and capture."""
self.camera.start_preview(Preview.QTGL) # Start the camera preview using the QTGL backend
self.camera.start() # Start the camera

def save_image(self, filename: str) -> dict:
"""Capture and save an image to the specified file."""
try:
metadata = self.camera.capture_file(filename) # Capture an image and save it to the specified file
return metadata # Return the metadata of the captured image
except Exception as e:
print(f"Error capturing image: {e}") # Print error message if capturing fails
return None # Return None if capturing fails

def start_stream(self, filename: str = None) -> None:
"""Start the video stream or recording."""
if not self.streaming:
if self.camera.started:
self.camera.stop() # Stop the camera if it is currently running

self.camera.configure(self.stream_config) # Configure the camera with the video stream settings
if filename:
encoder = H264Encoder() # Use H264 encoder for video recording
output = FileOutput(filename) # Set the output file for the recorded video
else:
encoder = JpegEncoder() # Use Jpeg encoder for streaming
output = FileOutput(self.streaming_output) # Set the streaming output object
self.camera.start_recording(encoder, output) # Start recording or streaming
self.streaming = True # Set the streaming flag to True

def stop_stream(self) -> None:
"""Stop the video stream or recording."""
if self.streaming:
try:
self.camera.stop_recording() # Stop the recording or streaming
self.streaming = False # Set the streaming flag to False
except Exception as e:
print(f"Error stopping stream: {e}") # Print error message if stopping fails

def get_frame(self) -> bytes:
"""Get the current frame from the streaming output."""
with self.streaming_output.condition:
self.streaming_output.condition.wait() # Wait for a new frame to be available
return self.streaming_output.frame # Return the current frame

def save_video(self, filename: str, duration: int = 10) -> None:
"""Save a video for the specified duration."""
self.start_stream(filename) # Start the video recording
time.sleep(duration) # Record for the specified duration
self.stop_stream() # Stop the video recording

def close(self) -> None:
"""Close the camera."""
if self.streaming:
self.stop_stream() # Stop the streaming if it is active
self.camera.close() # Close the camera

if __name__ == '__main__':
print('Program is starting ... ') # Print a message indicating the start of the program
camera = Camera() # Create a Camera instance

print("View image...")
camera.start_image() # Start the camera preview
time.sleep(10) # Wait for 10 seconds

print("Capture image...")
camera.save_image(filename="image.jpg") # Capture and save an image
time.sleep(1) # Wait for 1 second

'''
print("Stream video...")
camera.start_stream() # Start the video stream
time.sleep(3) # Stream for 3 seconds

print("Stop video...")
camera.stop_stream() # Stop the video stream
time.sleep(1) # Wait for 1 second

print("Save video...")
camera.save_video("video.h264", duration=3) # Save a video for 3 seconds
time.sleep(1) # Wait for 1 second

print("Close camera...")
camera.close() # Close the camera
'''
Loading