Action Server and Brain Client
The pipeline uses a single ROS 2 Action (RunVision) as the interface between the robot's brain and the entire perception system. This keeps the robot controller decoupled from the complexity of individual vision nodes.
Brain Client ──── RunVision Action ────▶ Perception Dispatcher
(sends goal)  ◀── (feedback, result) ──  (manages lifecycle nodes)
Located in my_robot_interfaces/action/RunVision.action:
# --- GOAL ---
string task_name # 'car_objects', 'table_height', etc.
string object_class # optional class filter (e.g. 'motor')
float32 time_duration
float32 radius
---
# --- RESULT ---
bool success
string message
geometry_msgs/PoseStamped[] poses # 1 pose (most tasks) or 4 (subdoor)
float32 estimated_value # Used for table height
---
# --- FEEDBACK ---
string current_phase # e.g. "Activating YOLO Node...", "Waiting for detection..."
float32 time_elapsed
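To make the field roles concrete, here is a plain-Python mirror of the interface. These dataclasses are illustrative only; the real message classes are generated from RunVision.action by rosidl at build time.

```python
from dataclasses import dataclass, field
from typing import List

# Illustrative stand-ins for the rosidl-generated RunVision types.

@dataclass
class RunVisionGoal:
    task_name: str = ''       # 'car_objects', 'table_height', etc.
    object_class: str = ''    # optional class filter (e.g. 'motor')
    time_duration: float = 0.0
    radius: float = 0.0

@dataclass
class RunVisionResult:
    success: bool = False
    message: str = ''
    poses: List[object] = field(default_factory=list)  # geometry_msgs/PoseStamped[]
    estimated_value: float = 0.0                       # used for table height

@dataclass
class RunVisionFeedback:
    current_phase: str = ''
    time_elapsed: float = 0.0

goal = RunVisionGoal(task_name='car_objects', object_class='motor')
```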
# Terminal 1 — launch with lazy loading enabled (saves VRAM, slower per task)
ros2 launch perception action_launch.py lazy_loading:=true

# Terminal 2 — trigger a task
ros2 run perception brain_client <task_name> [object_class] [radius]

# Estimate table height
ros2 run perception brain_client table_height
# Detect screws
ros2 run perception brain_client detect_screws
# Detect car objects (any class)
ros2 run perception brain_client car_objects
# Detect a specific car part class
ros2 run perception brain_client car_objects motor
# Detect the screwdriver
ros2 run perception brain_client detect_screwdriver
# Find a safe placement zone
ros2 run perception brain_client place_object
# Estimate subdoor panel corners
ros2 run perception brain_client subdoor_pose

The brain client:

- Connects to the run_perception_pipeline action server
- Sends a goal with task_name (and optional object_class)
- Receives feedback messages printed to the terminal as the dispatcher works through lifecycle transitions
- Receives a result: success/failure, a human-readable message, and optionally raw PoseStamped[] data
- Shuts itself down after the result is received

Example output:
[brain_client_node]: Brain Client initialized. Ready to dispatch tasks.
[brain_client_node]: Waiting for Perception Dispatcher to come online...
[brain_client_node]: Sending goal request for task: 'car_objects', class='motor'
[brain_client_node]: Goal accepted!
[brain_client_node]: Dispatcher Feedback: [Activating Pipeline (car object)...]
[brain_client_node]: Dispatcher Feedback: [Waiting for motor detection...]
[brain_client_node]: Dispatcher Feedback: [Deactivating Pipeline (car object)...]
[brain_client_node]: ✅ VISION TASK SUCCESSFUL
[brain_client_node]: DATA: Successfully detected motor at [x: 0.312, y: -0.045, z: 0.723]
[brain_client_node]: Raw Coordinates: x=0.312, y=-0.045, z=0.723
[brain_client_node]: Task complete. Shutting down Brain Client.
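The goal/feedback/result exchange in the log above can be sketched without ROS as a plain asyncio round trip. The helpers below are placeholders, not the real rclpy ActionClient or dispatcher code; they only model the message flow.

```python
import asyncio

async def fake_dispatcher(goal, feedback_cb):
    # Stand-in for the Perception Dispatcher: emit phase feedback, then a result.
    for phase in ('Activating Pipeline...',
                  'Waiting for detection...',
                  'Deactivating Pipeline...'):
        feedback_cb(phase)
        await asyncio.sleep(0)  # yield, as the real node yields to the executor
    return {'success': True, 'message': f"detected {goal['object_class']}"}

async def brain_client(task_name, object_class=''):
    # Send a goal, collect streamed feedback, and return the final result.
    goal = {'task_name': task_name, 'object_class': object_class}
    phases = []
    result = await fake_dispatcher(goal, phases.append)
    return result, phases

result, phases = asyncio.run(brain_client('car_objects', 'motor'))
```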
You can trigger any task directly from the terminal using the ROS 2 CLI — no need to run brain_client.py. This is useful for quick testing and debugging.
The full goal structure is:
{task_name: '<task>', object_class: '<class_or_empty>', radius: 0.0}
Add --feedback to stream live phase updates from the dispatcher.
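For scripting, the same goal string can be assembled programmatically. A minimal sketch using only the Python standard library (the command is only printed here, not executed); JSON is valid YAML flow syntax, which is what the ros2 CLI parses:

```python
import json
import shlex

# Build the goal as a dict and serialize it for the command line.
goal = {'task_name': 'car_objects', 'object_class': 'motor', 'radius': 0.0}
cmd = ('ros2 action send_goal /run_perception_pipeline '
       'my_robot_interfaces/action/RunVision '
       + shlex.quote(json.dumps(goal)) + ' --feedback')
print(cmd)
```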
# Table Height
ros2 action send_goal /run_perception_pipeline \
my_robot_interfaces/action/RunVision \
"{task_name: 'table_height'}" \
--feedback
# Screw Detection
ros2 action send_goal /run_perception_pipeline \
my_robot_interfaces/action/RunVision \
"{task_name: 'detect_screws'}" \
--feedback
# Car Object Detection (specific class)
ros2 action send_goal /run_perception_pipeline \
my_robot_interfaces/action/RunVision \
"{task_name: 'car_objects', object_class: 'motor'}" \
--feedback
# Subdoor Pose Estimation
ros2 action send_goal /run_perception_pipeline \
my_robot_interfaces/action/RunVision \
"{task_name: 'subdoor_pose'}" \
--feedback
# Screwdriver Detection
ros2 action send_goal /run_perception_pipeline \
my_robot_interfaces/action/RunVision \
"{task_name: 'detect_screwdriver'}" \
--feedback
# Place Object (find safe placement zone)
ros2 action send_goal /run_perception_pipeline \
my_robot_interfaces/action/RunVision \
"{task_name: 'place_object', object_class: '', radius: 0.7}" \
--feedback

The dispatcher (action_vision_manager.py → PerceptionDispatcher) handles one task at a time using a ReentrantCallbackGroup and a MultiThreadedExecutor. Each task runs in its own asyncio event loop, allowing it to await lifecycle state transitions without blocking the ROS executor.
Task routing is handled in execute_callback:
if task == 'table_height':
result = await self.handle_table_height(goal_handle)
elif task == 'detect_screws':
result = await self.handle_detect_yolo(goal_handle, ...)
elif task == 'car_objects':
result = await self.handle_detect_yolo(goal_handle, ...)
...
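The if/elif chain above can equivalently be written as a dispatch table, which keeps task routing in one place as tasks are added. A minimal plain-Python sketch with placeholder handlers (not the actual PerceptionDispatcher methods):

```python
import asyncio

# Placeholder handlers standing in for the dispatcher's task coroutines.
async def handle_table_height(goal):
    return {'success': True, 'task': 'table_height'}

async def handle_detect_yolo(goal, pipeline):
    return {'success': True, 'task': goal['task_name'], 'pipeline': pipeline}

# Map task_name to a handler, mirroring execute_callback's routing.
TASK_TABLE = {
    'table_height': lambda g: handle_table_height(g),
    'detect_screws': lambda g: handle_detect_yolo(g, 'screw'),
    'car_objects':   lambda g: handle_detect_yolo(g, 'car'),
}

async def execute(goal):
    handler = TASK_TABLE.get(goal['task_name'])
    if handler is None:
        return {'success': False, 'message': f"unknown task {goal['task_name']!r}"}
    return await handler(goal)

out = asyncio.run(execute({'task_name': 'car_objects'}))
```

The lambdas carry per-task extra arguments, so two tasks can share one handler just as car_objects and detect_screws both route to handle_detect_yolo above.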