Skip to content

Commit

Permalink
Fix most of the typing issues
Browse files Browse the repository at this point in the history
  • Loading branch information
Flova committed Dec 20, 2024
1 parent 9b73d06 commit 0e73b26
Show file tree
Hide file tree
Showing 21 changed files with 68 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@


class BodyBlackboard:
def __init__(self, node: Node, tf_buffer: tf2.Buffer):
def __init__(self, node: Node, tf_buffer: tf2.BufferInterface):
# References
self.node = node
self.tf_buffer = tf_buffer
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import TYPE_CHECKING, Optional

from bitbots_utils.utils import nobeartype
from rclpy.node import Node

if TYPE_CHECKING:
Expand All @@ -9,6 +10,7 @@
class AbstractBlackboardCapsule:
"""Abstract class for blackboard capsules."""

@nobeartype
def __init__(self, node: Node, blackboard: Optional["BodyBlackboard"] = None):
self._node = node
self._blackboard = blackboard
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def get_pass_regions(self) -> np.ndarray:
# Smooth obstacle map
return gaussian_filter(costmap, pass_smooth)

def field_2_costmap_coord(self, x: float, y: float) -> Tuple[float, float]:
def field_2_costmap_coord(self, x: float, y: float) -> Tuple[int, int]:
"""
Converts a field position to the corresponding indices for the costmap.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ class PathfindingCapsule(AbstractBlackboardCapsule):

def __init__(self, node, blackboard):
super().__init__(node, blackboard)
self.position_threshold: str = self._node.get_parameter("pathfinding_position_threshold").value
self.orientation_threshold: str = self._node.get_parameter("pathfinding_orientation_threshold").value
self.position_threshold: float = self._node.get_parameter("pathfinding_position_threshold").value
self.orientation_threshold: float = self._node.get_parameter("pathfinding_orientation_threshold").value

self.direct_cmd_vel_pub = self._node.create_publisher(Twist, "cmd_vel", 1)
self.pathfinding_pub = self._node.create_publisher(PoseStamped, "goal_pose", 1)
Expand All @@ -48,7 +48,7 @@ def publish(self, msg: PoseStamped):
self.goal = msg
self.pathfinding_pub.publish(msg)

def get_goal(self) -> PoseStamped:
def get_goal(self) -> Optional[PoseStamped]:
"""
Returns the latest goal that was send to the pathfinding.
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def __init__(self, node, blackboard):

# Data
# indexed with one to match robot ids
self.team_data: Dict[TeamData] = {}
self.team_data: Dict[int, TeamData] = {}
for i in range(1, 7):
self.team_data[i] = TeamData()
self.times_to_ball = dict()
Expand Down Expand Up @@ -65,8 +65,8 @@ def __init__(self, node, blackboard):
self.action_update: float = 0.0

# Config
self.data_timeout: float = self._node.get_parameter("team_data_timeout").value
self.ball_max_covariance: float = self._node.get_parameter("ball_max_covariance").value
self.data_timeout: float = float(self._node.get_parameter("team_data_timeout").value)
self.ball_max_covariance: float = float(self._node.get_parameter("ball_max_covariance").value)

def is_valid(self, data: TeamData) -> bool:
"""
Expand Down Expand Up @@ -227,7 +227,7 @@ def get_teammate_ball(self) -> Optional[PointStamped]:
team_data_infos = filter(self.is_valid, self.team_data.values())

def is_ball_good_enough(team_data: TeamData) -> bool:
return (
return bool(
team_data.ball_absolute.covariance[0] < self.ball_max_covariance
and team_data.ball_absolute.covariance[7] < self.ball_max_covariance
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def __init__(self, blackboard, dsd, parameters):
self.max_speed = parameters.get("max_speed", 0.4)

# Check if the have a duration
self.duration: Optional[float] = parameters.get("duration", None)
self.duration: Optional[float] = float(parameters.get("duration", None))
self.start_time = self.blackboard.node.get_clock().now()

def perform(self, reevaluate=False):
Expand Down
2 changes: 1 addition & 1 deletion bitbots_misc/bitbots_utils/bitbots_utils/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,5 +126,5 @@ def compute_imu_orientation_from_world(robot_quat_in_world):
return [rp[0], rp[1], 0], yaw_quat


def quat_from_yaw(yaw: float) -> Quaternion:
def quat_from_yaw(yaw: float | int) -> Quaternion:
return msgify(Quaternion, wxyz2xyzw(euler2quat(yaw, 0, 0, axes="szxy")))
4 changes: 4 additions & 0 deletions bitbots_misc/bitbots_utils/bitbots_utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@
import rclpy
import yaml
from ament_index_python import get_package_share_directory
from beartype import BeartypeConf, BeartypeStrategy, beartype
from rcl_interfaces.msg import Parameter as ParameterMsg
from rcl_interfaces.msg import ParameterType as ParameterTypeMsg
from rcl_interfaces.msg import ParameterValue as ParameterValueMsg
from rcl_interfaces.srv import GetParameters, SetParameters
from rclpy.node import Node
from rclpy.parameter import PARAMETER_SEPARATOR_STRING, Parameter, parameter_value_to_python

# Create a new @nobeartype decorator disabling type-checking.
nobeartype = beartype(conf=BeartypeConf(strategy=BeartypeStrategy.O0))


def read_urdf(robot_name):
urdf = os.popen(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ def __init__(self):
self.key_frames: list[dict] = []
self.author: str = "Unknown"
self.description: str = "Edit me!"
self.last_edited: datetime = datetime.now().isoformat(" ")
self.last_edited: str = datetime.now().isoformat(" ")
self.name: str = "None yet"
self.version: int = 0
self.version: str = "0"


class Recorder:
Expand Down Expand Up @@ -79,14 +79,14 @@ def get_keyframe_index(self, name: str) -> Optional[int]:
return None
return self.get_keyframes().index(key_frame)

def get_meta_data(self) -> tuple[str, int, str, str]:
def get_meta_data(self) -> tuple[str, str, str, str]:
"""
Returns the meta data of the current animation
"""
data = self.current_state
return data.name, data.version, data.author, data.description

def set_meta_data(self, name: str, version: int, author: str, description: str) -> None:
def set_meta_data(self, name: str, version: str, author: str, description: str) -> None:
self.current_state.name = name
self.current_state.version = version
self.current_state.author = author
Expand Down Expand Up @@ -174,7 +174,7 @@ def record(
self.save_step(f"Overriding keyframe at position {seq_pos} with '{frame_name}'", frozen=frozen)
return True

def save_animation(self, path: str, file_name: Optional[str] = None) -> None:
def save_animation(self, path: str, file_name: Optional[str] = None) -> str:
"""Dumps all keyframe data to an animation .json file
:param path: The folder where the file should be saved
Expand Down Expand Up @@ -233,7 +233,7 @@ def load_animation(self, path: str) -> None:
# Set the current state to the loaded animation
self.current_state.key_frames = data["keyframes"]
self.current_state.name = data["name"]
self.current_state.version = data["version"] if "version" in data else 0
self.current_state.version = data["version"] if "version" in data else "0"
self.current_state.last_edited = data["last_edited"] if "last_edited" in data else datetime.now().isoformat(" ")
self.current_state.author = data["author"] if "author" in data else "Unknown"
self.current_state.description = data["description"] if "description" in data else ""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ def connect_gui_callbacks(self) -> None:
self._widget.frameList.key_pressed.connect(self.delete)
self._widget.frameList.itemSelectionChanged.connect(self.frame_select)

def play_walkready(self) -> None:
def play_walkready(self, _) -> None:
"""
Plays the walkready animation on the robot
"""
Expand Down Expand Up @@ -401,7 +401,7 @@ def save(self, new_location: bool = False) -> None:
else:
self._widget.statusBar.showMessage("There is nothing to save!")

def open(self) -> None:
def open(self, _) -> None:
"""
Deletes all current frames and instead loads an animation from a json file
"""
Expand Down Expand Up @@ -664,10 +664,15 @@ def frame_select(self):
current_keyframe_goals = self._recorder.get_keyframe(self._selected_frame)["goals"]

for motor_name, text_field in self._motor_controller_text_fields.items():
# Get the angle from the textfield
angle = text_field.value()
# compare with angles in current keyframe
if not current_keyframe_goals[motor_name] == math.radians(angle):
# Check if the motor active state has changed
currently_active = (
self._motor_switcher_active_checkbox[motor_name].checkState(0) == Qt.CheckState.Checked
)
active_in_current_keyframe = motor_name in current_keyframe_goals
motor_active_changed = currently_active != active_in_current_keyframe
# Check if the motor goal has changed
motor_goal_changed = current_keyframe_goals.get(motor_name, 0) != math.radians(text_field.value())
if motor_active_changed or motor_goal_changed:
unrecorded_changes.append(motor_name)

# warn user about unrecorded changes
Expand All @@ -678,6 +683,10 @@ def frame_select(self):
sure = QMessageBox.question(self._widget, "Sure?", message, QMessageBox.Yes | QMessageBox.No)
# Cancel the open if the user does not want to discard the changes
if sure == QMessageBox.No:
# Update the UI to reselect the current frame (blocking signals to avoid infinite loop)
self._widget.frameList.blockSignals(True)
self._widget.frameList.setCurrentRow(self._recorder.get_keyframe_index(self._selected_frame))
self._widget.frameList.blockSignals(False)
return
# Update state so we have a new selected frame
self._selected_frame = selected_frame_name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def picked_up(self) -> bool:
mean_long = np.mean(buffer_long[..., 2])
absolute_diff = abs(mean_long - mean)

return absolute_diff > 1.0
return bool(absolute_diff > 1.0)

def get_imu_yaw(self) -> float:
"""Returns the current yaw of the IMU (this is not an absolute measurement!!! It drifts over time!)"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class Controller:
A simple follow the carrot controller which controls the robots command velocity to stay on a given path.
"""

def __init__(self, node: Node, buffer: tf2.Buffer) -> None:
def __init__(self, node: Node, buffer: tf2.BufferInterface) -> None:
self.node = node
self.buffer = buffer

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class Map:
Costmap that keeps track of obstacles like the ball or other robots.
"""

def __init__(self, node: Node, buffer: tf2.Buffer) -> None:
def __init__(self, node: Node, buffer: tf2.BufferInterface) -> None:
self.node = node
self.buffer = buffer
self.resolution: int = self.node.config.map.resolution
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class Planner:
A simple grid based A* interface
"""

def __init__(self, node: Node, buffer: tf2.Buffer, map: Map) -> None:
def __init__(self, node: Node, buffer: tf2.BufferInterface, map: Map) -> None:
self.node = node
self.buffer = buffer
self.map = map
Expand Down Expand Up @@ -103,7 +103,7 @@ def get_path(self) -> Path | None:


class DummyPlanner(Planner):
def __init__(self, node: Node, buffer: tf2.Buffer, map: Map) -> None:
def __init__(self, node: Node, buffer: tf2.BufferInterface, map: Map) -> None:
super().__init__(node, buffer, map)

def step(self) -> Path:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def test_step_cmd_vel_smoothing(snapshot, node, tf2_buffer, config, pose_opponen
assert str(controller.last_cmd_vel) == snapshot


def setup_controller(node: Node, buffer: tf2.Buffer) -> Controller:
def setup_controller(node: Node, buffer: tf2.BufferInterface) -> Controller:
return Controller(node, buffer)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def set_state_defaults(self):
self.cmd_vel: Optional[Twist] = None
self.cmd_vel_time = Time(clock_type=self.node.get_clock().clock_type)
self.ball: Optional[PointStamped] = None
self.ball_velocity: Tuple[float, float, float] = (0, 0, 0)
self.ball_velocity: Tuple[float, float, float] = (0.0, 0.0, 0.0)
self.ball_covariance: List[double] = []
self.strategy: Optional[Strategy] = None
self.strategy_time = Time(clock_type=self.node.get_clock().clock_type)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import math
from typing import Callable, List, Optional, Tuple
from typing import Callable, Optional, Tuple

import numpy as np
import transforms3d
from builtin_interfaces.msg import Time
from game_controller_hl_interfaces.msg import GameState
from geometry_msgs.msg import PointStamped, PoseStamped, PoseWithCovarianceStamped, Quaternion, Twist
from numpy import double
from rclpy.time import Time
from jaxtyping import Float64
from soccer_vision_3d_msgs.msg import Robot, RobotArray

import bitbots_team_communication.robocup_extension_pb2 as Proto # noqa: N812
Expand Down Expand Up @@ -63,7 +64,7 @@ def convert_target_position(target_position: Optional[PoseStamped], message):
def convert_ball_position(
ball_position: Optional[PointStamped],
ball_velocity: Tuple[float, float, float],
ball_covariance: List[double],
ball_covariance: Float64[np.ndarray, "36"],
message,
):
if ball_position is not None and is_still_valid_checker(ball_position.header.stamp):
Expand Down Expand Up @@ -130,8 +131,8 @@ def calculate_time_to_ball(
def convert_time_to_ball(
time_to_ball: Optional[float],
time_to_ball_time: Time,
ball_position: PointStamped,
current_pose: PoseWithCovarianceStamped,
ball_position: Optional[PointStamped],
current_pose: Optional[PoseWithCovarianceStamped],
walking_speed: float,
message: Proto.Message,
):
Expand Down Expand Up @@ -174,7 +175,9 @@ def extract_orientation_yaw_angle(self, quaternion: Quaternion):
def convert_to_euler(self, quaternion: Quaternion):
return transforms3d.euler.quat2euler([quaternion.w, quaternion.x, quaternion.y, quaternion.z])

def convert_to_covariance_matrix(self, covariance_matrix: Proto.fmat3, row_major_covariance: List[double]):
def convert_to_covariance_matrix(
self, covariance_matrix: Proto.fmat3, row_major_covariance: Float64[np.ndarray, "36"]
):
# ROS covariance is row-major 36 x float, while protobuf covariance
# is column-major 9 x float [x, y, θ]
covariance_matrix.x.x = row_major_covariance[0]
Expand Down
8 changes: 5 additions & 3 deletions bitbots_vision/bitbots_vision/vision_modules/ros_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import re
from typing import Union
from typing import TypeAlias, Union

from bitbots_utils.utils import get_parameters_from_other_node
from cv_bridge import CvBridge
Expand Down Expand Up @@ -32,6 +32,8 @@
global own_team_color
own_team_color = None

T_RobotAttributes_Team: TypeAlias = int # Type for RobotAttributes.team


def create_or_update_publisher(
node, old_config, new_config, publisher_object, topic_key, data_class, qos_profile=1, callback_group=None
Expand Down Expand Up @@ -325,7 +327,7 @@ def update_own_team_color(vision_node: Node):
vision_node._logger.debug(f"Own team color is: {own_team_color}")


def get_team_from_robot_color(color: int) -> RobotAttributes.team:
def get_team_from_robot_color(color: int) -> T_RobotAttributes_Team:
"""
Maps the detected robot color to the current team.
If the color is the same as the current team, returns own team, else returns opponent team.
Expand All @@ -343,7 +345,7 @@ def get_team_from_robot_color(color: int) -> RobotAttributes.team:
return RobotAttributes.TEAM_OPPONENT


def get_robot_color_for_team(team: RobotAttributes.team) -> Union[int, None]:
def get_robot_color_for_team(team: T_RobotAttributes_Team) -> Union[int, None]:
"""
Maps team (own, opponent, unknown) to the current robot color.
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,9 @@ def _add_robots_to(self, robot_msgs: list[Robot]) -> None:
robot_msgs.extend(robot_candidate_messages)

@staticmethod
def _create_robot_messages(robot_type: Robot, candidates: list[candidate.Candidate]) -> list[Robot]:
def _create_robot_messages(
robot_type: ros_utils.T_RobotAttributes_Team, candidates: list[candidate.Candidate]
) -> list[Robot]:
return [ros_utils.build_robot_msg(robot_candidate, robot_type) for robot_candidate in candidates]

def _publish_robots_message(self, image_msg: Image, robot_msgs: list[Robot]) -> None:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#! /usr/bin/env python3
from typing import Optional, Tuple
from typing import Optional

import numpy as np
import rclpy
Expand Down Expand Up @@ -81,7 +81,7 @@ def reset_ball(self) -> None:
self.ball_state_position = np.zeros(3)
self.ball_state_covariance = np.eye(3) * 1000

def reset_filter_cb(self, req, response) -> Tuple[bool, str]:
def reset_filter_cb(self, _: Trigger.Request, response: Trigger.Response) -> Trigger.Response:
self.logger.info("Resetting bitbots ball filter...")
self.reset_ball()
response.success = True
Expand Down Expand Up @@ -224,8 +224,8 @@ def is_estimate_in_fov(self, header: Header) -> bool:
# Make sure that the transformed pixel is on the sensor and not too close to the border
border_fraction = self.config.filter.covariance.negative_observation.ignore_border
border_px = np.array([self.camera_info.width, self.camera_info.height]) / 2 * border_fraction
in_fov_horizontal = border_px[0] < pixel[0] <= self.camera_info.width - border_px[0]
in_fov_vertical = border_px[1] < pixel[1] <= self.camera_info.height - border_px[1]
in_fov_horizontal = bool(border_px[0] < pixel[0] <= self.camera_info.width - border_px[0])
in_fov_vertical = bool(border_px[1] < pixel[1] <= self.camera_info.height - border_px[1])
return in_fov_horizontal and in_fov_vertical

def filter_step(self) -> None:
Expand Down
Loading

0 comments on commit 0e73b26

Please sign in to comment.