-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathwaste.py
More file actions
191 lines (161 loc) · 7.25 KB
/
waste.py
File metadata and controls
191 lines (161 loc) · 7.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
from fastapi import FastAPI, Request, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.responses import JSONResponse, FileResponse
import os
import numpy as np
from keras.models import load_model
import tensorflow as tf
from tensorflow.keras.preprocessing import image
import tempfile
from PIL import Image
import uvicorn
import asyncio # For async operations
app = FastAPI()

# --- ML Model Loading ---
# Configured location of the model, relative to the current working directory.
model_path = os.path.join("model", "tf_model.h5")
if not os.path.exists(model_path):
    # The server was probably started from a different directory. Fall back to
    # the copy that lives next to this script (the same anchor this file uses
    # for latest.jpg) so loading does not depend on the working directory.
    model_path = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), "model", "tf_model.h5"
    )

try:
    model = load_model(model_path)
except Exception as e:
    # Deliberately non-fatal: the server still starts, and prediction()
    # checks for `model is None` before using it.
    print(f"ERROR: Failed to load ML model: {e}")
    model = None  # Set model to None if loading fails
else:
    print("INFO: ML model loaded successfully.")
# --- Prediction Function ---
def prediction(image_data, target_size=(150, 150)):
    """
    Classify one encoded image with the loaded ML model.

    Args:
        image_data: Raw bytes of an encoded image (for example a JPEG frame).
        target_size: ``(width, height)`` the image is resized to before it is
            passed to the model. Defaults to the 150x150 this function has
            always used; presumably the model's input size -- confirm against
            the trained model before changing it.

    Returns:
        Whatever ``model.predict`` returns for a batch containing this single
        image, or ``np.array([[0.5]])`` as a neutral value when the model is
        not loaded or any decoding/inference step fails.
    """
    import io  # Local import: only needed here, for the in-memory file object.

    if model is None:
        print("WARNING: ML model is not loaded. Cannot perform prediction.")
        return np.array([[0.5]])  # Return a neutral value if model isn't loaded

    try:
        # Decode straight from memory. This avoids the temporary-file round
        # trip, which could leave a stray file behind when the write failed
        # and could fail to delete a file PIL still held open.
        with Image.open(io.BytesIO(image_data)) as source:
            img = source.convert("RGB").resize(target_size)

        # Add a leading batch axis and scale pixel values to the 0-1 range.
        x = np.expand_dims(np.array(img), axis=0) / 255.0
        print(f"DEBUG: Image prepared for prediction. Shape: {x.shape}")

        # Perform the prediction
        return model.predict(x)
    except Exception as e:
        # Best effort by design: callers always get an array back.
        print(f"ERROR: Error during image prediction: {e}")
        return np.array([[0.5]])  # Return a neutral value on error
# --- WebSocket Client Storage ---
# Registry of the currently connected device sockets, keyed by role.
# A value of None means that no client with that role is connected.
websockets = dict(
    image=None,   # ESP32-CAM
    sensor=None,  # Esp32 (ultrasonic sensor and servo control)
)
# --- HTTP Endpoints ---
@app.get("/health")
def health():
    """Liveness probe: log the access and report that the server is up."""
    print("INFO: /health endpoint accessed. Server is running.")
    payload = {"status": "ok"}
    return payload
@app.get("/latest")
def latest_image():
    """
    Serve the most recently saved camera frame over plain HTTP.

    Looks for ``latest.jpg`` next to this script and returns it as a JPEG
    file response. Raises a 404 ``HTTPException`` when the file does not
    exist. The response is a snapshot; nothing is pushed to the viewer.
    """
    image_path = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), "latest.jpg"
    )

    # Guard clause: bail out early when no frame has been stored yet.
    if not os.path.exists(image_path):
        print(f"WARNING: No latest image found at: {image_path}")
        raise HTTPException(status_code=404, detail="No latest image available.")

    print(f"INFO: Serving latest image from: {image_path}")
    return FileResponse(image_path, media_type="image/jpeg")
# --- WebSocket Endpoints ---
def _label_from_score(score):
    """Map the model's first output value to the label sent to the devices."""
    if score < 0.5:
        print('INFO: Classified as: Biodegradable (0)')
        return "0"
    if score >= 0.5:
        print('INFO: Classified as: Non-biodegradable (1)')
        return "1"
    # Only reachable when both comparisons are false (e.g. a NaN score).
    print('WARNING: Prediction result inconclusive. Defaulting to unknown.')
    return "unknown"


async def _forward_classification_to_sensor(clas):
    """Send the label and the resume signal to the sensor client, if any."""
    sensor = websockets["sensor"]
    if sensor is None:
        print("WARNING: Esp32 (sensor) client not connected. Cannot send classification or resume signal.")
        return
    try:
        print(f"INFO: Sending classification '{clas}' to Esp32 (/ws/sensor).")
        await sensor.send_text(clas)
        # After classification, send a signal to Esp32 to resume sensing
        await sensor.send_text("resume_sensing")
        print("INFO: Sent 'resume_sensing' signal to Esp32 (/ws/sensor).")
    except Exception as e:
        # A broken sensor link must not take the camera connection down with
        # it. The sensor's own handler is responsible for clearing its slot.
        print(f"ERROR: Failed to send classification to Esp32 (/ws/sensor): {e}")


@app.websocket("/ws/image")
async def websocket_image_endpoint(websocket: WebSocket):
    """
    WebSocket endpoint for the ESP32-CAM (image client).

    For every binary frame received: stores it as ``latest.jpg`` next to this
    script, runs ``prediction`` on it, forwards the resulting label plus a
    ``resume_sensing`` signal to the sensor client (when one is connected),
    and replies to the camera with ``{"message": <label>}``.

    The handler registers itself in ``websockets["image"]`` on connect and
    removes that registration when it exits, but only if the slot still
    refers to this connection.
    """
    await websocket.accept()
    websockets["image"] = websocket
    print("INFO: ESP32-CAM (image) connected via WebSocket.")

    script_dir = os.path.dirname(os.path.abspath(__file__))
    image_path = os.path.join(script_dir, "latest.jpg")

    try:
        while True:
            # Wait for binary image data from the camera
            data = await websocket.receive_bytes()
            print(f"INFO: Received image data from /ws/image: {len(data)} bytes")

            # Save the latest image for web viewing
            with open(image_path, "wb") as f:
                f.write(data)
            print("INFO: Latest image saved to latest.jpg")

            # Run the (blocking) prediction in a worker thread so the event
            # loop keeps serving the sensor socket during inference.
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(None, prediction, data)
            print(f"DEBUG: Model Prediction raw output: {result}")

            # Assumes a single output neuron for binary classification.
            clas = _label_from_score(result[0][0])

            # Send the classification to the Esp32 (sensor/actuator) client
            await _forward_classification_to_sensor(clas)

            # Send classification back to the image client (ESP32-CAM) -
            # optional, for its own logging/feedback
            await websocket.send_json({"message": clas})
    except WebSocketDisconnect:
        print("INFO: ESP32-CAM (image) disconnected.")
    except Exception as e:
        print(f"ERROR: Error in /ws/image websocket: {e}")
    finally:
        # Clear the slot only if it is still ours: a camera that reconnected
        # in the meantime has already replaced it with its new socket.
        if websockets["image"] is websocket:
            websockets["image"] = None
@app.websocket("/ws/sensor")
async def websocket_sensor_endpoint(websocket: WebSocket):
    """
    WebSocket endpoint for the Esp32 (sensor/actuator client).

    Reads text messages from the sensor. On ``"object_detected"`` it sends a
    ``"capture"`` message to the camera client when one is connected; every
    other message is only logged.

    The handler registers itself in ``websockets["sensor"]`` on connect and
    removes that registration when it exits, but only if the slot still
    refers to this connection.
    """
    await websocket.accept()
    websockets["sensor"] = websocket
    print("INFO: Esp32 (sensor) connected via WebSocket.")
    try:
        while True:
            # Wait for text data from the sensor (e.g., "object_detected")
            data = await websocket.receive_text()
            print(f"INFO: Received from /ws/sensor: {data}")

            if data != "object_detected":
                continue

            # The sensor detected an object: trigger the camera.
            camera = websockets["image"]
            if camera is None:
                print("WARNING: ESP32-CAM (image) not connected. Cannot send 'capture' signal.")
                continue

            print("INFO: Object detected by sensor. Sending 'capture' signal to ESP32-CAM (/ws/image).")
            try:
                await camera.send_text("capture")
            except Exception as e:
                # A broken camera link must not be mistaken for a sensor
                # disconnect; keep this connection alive and just report it.
                print(f"ERROR: Failed to send 'capture' to ESP32-CAM (/ws/image): {e}")
    except WebSocketDisconnect:
        print("INFO: Node32s (sensor) disconnected.")
    except Exception as e:
        print(f"ERROR: Error in /ws/sensor websocket: {e}")
    finally:
        # Clear the slot only if it is still ours: a sensor that reconnected
        # in the meantime has already replaced it with its new socket.
        if websockets["sensor"] is websocket:
            websockets["sensor"] = None
def _run_server():
    """Start uvicorn for this app, listening on all interfaces, port 8000."""
    uvicorn.run(app, host="0.0.0.0", port=8000)


if __name__ == "__main__":
    # Typical local run from a Windows virtual environment:
    #   env\Scripts\activate
    #   python -m waste
    _run_server()