Skip to content

Commit

Permalink
Expand type hints
Browse files Browse the repository at this point in the history
  • Loading branch information
Flova committed Dec 20, 2024
1 parent 0e73b26 commit e435c7d
Show file tree
Hide file tree
Showing 14 changed files with 106 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def __init__(self, node, blackboard):
self.calc_base_costmap()
self.calc_gradients()

def robot_callback(self, msg: RobotArray):
def robot_callback(self, msg: RobotArray) -> None:
"""
Callback with new robot detections
"""
Expand All @@ -77,7 +77,7 @@ def robot_callback(self, msg: RobotArray):
# Publish debug costmap
self.publish_costmap()

def publish_costmap(self):
def publish_costmap(self) -> None:
"""
Publishes the costmap for rviz
"""
Expand Down Expand Up @@ -153,7 +153,7 @@ def field_2_costmap_coord(self, x: float, y: float) -> Tuple[int, int]:
)
return idx_x, idx_y

def calc_gradients(self):
def calc_gradients(self) -> None:
"""
Recalculates the gradient map based on the current costmap.
"""
Expand Down Expand Up @@ -186,7 +186,7 @@ def cost_at_relative_xy(self, x: float, y: float) -> float:

return self.get_cost_at_field_position(point.point.x, point.point.y)

def calc_base_costmap(self):
def calc_base_costmap(self) -> None:
"""
Builds the base costmap based on the behavior parameters.
This costmap includes a gradient towards the enemy goal and high costs outside the playable area
Expand Down Expand Up @@ -299,7 +299,7 @@ def get_cost_at_field_position(self, x: float, y: float) -> float:
idx_x, idx_y = self.field_2_costmap_coord(x, y)
return self.costmap[idx_x, idx_y]

def get_gradient_direction_at_field_position(self, x: float, y: float):
def get_gradient_direction_at_field_position(self, x: float, y: float) -> float:
"""
Returns the gradient direction at the given position
:param x: Field coordinate in the x direction
Expand All @@ -318,7 +318,9 @@ def get_gradient_direction_at_field_position(self, x: float, y: float):
grad = self.get_gradient_at_field_position(x, y)
return math.atan2(grad[1], grad[0])

def get_cost_of_kick_relative(self, x: float, y: float, direction: float, kick_length: float, angular_range: float):
def get_cost_of_kick_relative(
self, x: float, y: float, direction: float, kick_length: float, angular_range: float
) -> float:
"""
Returns the cost of a kick at the given position and direction in base footprint frame
:param x: Field coordinate in the x direction
Expand Down Expand Up @@ -386,7 +388,7 @@ def get_cost_of_kick(self, x: float, y: float, direction: float, kick_length: fl
# This should contribute way less than the max and should have an impact if the max values are similar in all directions.
return masked_costmap.max() * 0.75 + masked_costmap.min() * 0.25

def get_current_cost_of_kick(self, direction: float, kick_length: float, angular_range: float):
def get_current_cost_of_kick(self, direction: float, kick_length: float, angular_range: float) -> float:
"""
Returns the cost of the kick at the current position
:param direction: The direction of the kick
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Optional

from bitbots_utils.utils import get_parameters_from_other_node
from game_controller_hl_interfaces.msg import GameState

Expand All @@ -9,96 +11,86 @@ class GameStatusCapsule(AbstractBlackboardCapsule):

def __init__(self, node, blackboard=None):
super().__init__(node, blackboard)
self.team_id = get_parameters_from_other_node(self._node, "parameter_blackboard", ["team_id"])["team_id"]
self.team_id: int = get_parameters_from_other_node(self._node, "parameter_blackboard", ["team_id"])["team_id"]
self.gamestate = GameState()
self.last_update = 0
self.unpenalized_time = 0
self.last_update: float = 0.0
self.unpenalized_time: float = 0.0
self.last_goal_from_us_time = -86400
self.last_goal_time = -86400
self.free_kick_kickoff_team = None

def is_game_state_equals(self, value):
assert value in [
GameState.GAMESTATE_PLAYING,
GameState.GAMESTATE_FINISHED,
GameState.GAMESTATE_INITIAL,
GameState.GAMESTATE_READY,
GameState.GAMESTATE_SET,
]
return value == self.get_gamestate()

def get_gamestate(self):
self.free_kick_kickoff_team: Optional[bool] = None

def get_gamestate(self) -> int:
return self.gamestate.game_state

def get_secondary_state(self):
def get_secondary_state(self) -> int:
return self.gamestate.secondary_state

def get_secondary_state_mode(self):
def get_secondary_state_mode(self) -> int:
return self.gamestate.secondary_state_mode

def get_secondary_team(self):
def get_secondary_team(self) -> int:
return self.gamestate.secondary_state_team

def has_kickoff(self):
def has_kickoff(self) -> bool:
return self.gamestate.has_kick_off

def has_penalty_kick(self):
def has_penalty_kick(self) -> bool:
return (
self.gamestate.secondary_state == GameState.STATE_PENALTYKICK
or self.gamestate.secondary_state == GameState.STATE_PENALTYSHOOT
) and self.gamestate._secondary_state_team == self.team_id

def get_own_goals(self):
def get_our_goals(self) -> int:
return self.gamestate.own_score

def get_opp_goals(self):
def get_opp_goals(self) -> int:
return self.gamestate.rival_score

def get_seconds_since_own_goal(self):
def get_seconds_since_own_goal(self) -> float:
return self._node.get_clock().now().nanoseconds / 1e9 - self.last_goal_from_us_time

def get_seconds_since_any_goal(self):
def get_seconds_since_any_goal(self) -> float:
return self._node.get_clock().now().nanoseconds / 1e9 - self.last_goal_time

def get_seconds_remaining(self):
def get_seconds_remaining(self) -> float:
# Time from the message minus time passed since receiving it
return max(
self.gamestate.seconds_remaining - (self._node.get_clock().now().nanoseconds / 1e9 - self.last_update), 0
self.gamestate.seconds_remaining - (self._node.get_clock().now().nanoseconds / 1e9 - self.last_update), 0.0
)

def get_secondary_seconds_remaining(self):
def get_secondary_seconds_remaining(self) -> float:
"""Seconds remaining for things like kickoff"""
# Time from the message minus time passed since receiving it
return max(
self.gamestate.secondary_seconds_remaining
- (self._node.get_clock().now().nanoseconds / 1e9 - self.last_update),
0,
0.0,
)

def get_seconds_since_last_drop_ball(self):
def get_seconds_since_last_drop_ball(self) -> Optional[float]:
"""Returns the seconds since the last drop in"""
if self.gamestate.drop_in_time == -1:
return None
else:
# Time from the message plus seconds passed since receiving it
return self.gamestate.drop_in_time + (self._node.get_clock().now().nanoseconds / 1e9 - self.last_update)

def get_seconds_since_unpenalized(self):
def get_seconds_since_unpenalized(self) -> float:
return self._node.get_clock().now().nanoseconds / 1e9 - self.unpenalized_time

def get_is_penalized(self):
def get_is_penalized(self) -> bool:
return self.gamestate.penalized

def received_gamestate(self):
return self.last_update != 0
def received_gamestate(self) -> bool:
return self.last_update != 0.0

def get_team_id(self):
def get_team_id(self) -> int:
return self.team_id

def get_red_cards(self):
def get_red_cards(self) -> int:
return self.gamestate.team_mates_with_red_card

def gamestate_callback(self, gamestate_msg: GameState):
def gamestate_callback(self, gamestate_msg: GameState) -> None:
if self.gamestate.penalized and not gamestate_msg.penalized:
self.unpenalized_time = self._node.get_clock().now().nanoseconds / 1e9

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,22 +43,22 @@ def __init__(self, node, blackboard):
self.walk_kick_pub = self._node.create_publisher(Bool, "/kick", 1)
# self.connect_dynamic_kick() Do not connect if dynamic_kick is disabled

def walk_kick(self, target: WalkKickTargets):
def walk_kick(self, target: WalkKickTargets) -> None:
"""
Kick the ball while walking
:param target: Target for the walk kick (e.g. left or right foot)
"""
self.walk_kick_pub.publish(Bool(data=target.value))

def connect_dynamic_kick(self):
def connect_dynamic_kick(self) -> None:
topic = self._blackboard.config["dynamic_kick"]["topic"]
self.__action_client = ActionClient(self._node, Kick, topic, callback_group=ReentrantCallbackGroup())
self.__connected = self.__action_client.wait_for_server(self._blackboard.config["dynamic_kick"]["wait_time"])

if not self.__connected:
self._node.get_logger().error(f"No dynamic_kick server running on {topic}")

def dynamic_kick(self, goal: Kick.Goal):
def dynamic_kick(self, goal: Kick.Goal) -> None:
"""
:param goal: Goal to kick to
:type goal: KickGoal
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Optional
from typing import Literal, Optional, TypeAlias

from bitbots_utils.utils import get_parameters_from_other_node
from rclpy.duration import Duration
Expand All @@ -7,6 +7,15 @@
from bitbots_blackboard.capsules import AbstractBlackboardCapsule
from bitbots_msgs.msg import Audio, HeadMode, RobotControlState

THeadMode: TypeAlias = Literal[
HeadMode.BALL_MODE,
HeadMode.FIELD_FEATURES,
HeadMode.LOOK_FORWARD,
HeadMode.DONT_MOVE,
HeadMode.BALL_MODE_PENALTY,
HeadMode.LOOK_FRONT,
]


class MiscCapsule(AbstractBlackboardCapsule):
"""Capsule for miscellaneous functions."""
Expand All @@ -33,7 +42,10 @@ def __init__(self, node, blackboard):
# ## Tracking Part ##
#####################

def set_head_duty(self, head_duty: int):
def set_head_duty(
self,
head_duty: THeadMode,
) -> None:
head_duty_msg = HeadMode()
head_duty_msg.head_mode = head_duty
self.head_pub.publish(head_duty_msg)
Expand All @@ -42,7 +54,7 @@ def set_head_duty(self, head_duty: int):
# ## Robot state ##
###################

def robot_state_callback(self, msg: RobotControlState):
def robot_state_callback(self, msg: RobotControlState) -> None:
self.robot_control_state = msg

def is_currently_walking(self) -> bool:
Expand All @@ -52,7 +64,7 @@ def is_currently_walking(self) -> bool:
# ## Timer ##
#############

def start_timer(self, timer_name: str, duration_secs: int):
def start_timer(self, timer_name: str, duration_secs: int) -> None:
"""
Starts a timer
:param timer_name: Name of the timer
Expand All @@ -61,7 +73,7 @@ def start_timer(self, timer_name: str, duration_secs: int):
"""
self.timers[timer_name] = self._node.get_clock().now() + Duration(seconds=duration_secs)

def end_timer(self, timer_name: str):
def end_timer(self, timer_name: str) -> None:
"""
Ends a timer
:param timer_name: Name of the timer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@


# Type of pathfinding goal relative to the ball
class BallGoalType(Enum):
class BallGoalType(str, Enum):
GRADIENT = "gradient"
MAP = "map"
CLOSE = "close"
Expand All @@ -41,7 +41,7 @@ def __init__(self, node, blackboard):
self._node, "bitbots_path_planning", ["controller.orient_to_goal_distance"]
)["controller.orient_to_goal_distance"]

def publish(self, msg: PoseStamped):
def publish(self, msg: PoseStamped) -> None:
"""
Sends a goal to the pathfinding.
"""
Expand All @@ -54,15 +54,15 @@ def get_goal(self) -> Optional[PoseStamped]:
"""
return self.goal

def cancel_goal(self):
def cancel_goal(self) -> None:
"""
This function cancels the current goal of the pathfinding,
which will stop sending cmd_vel messages to the walking.
This does not stop the walking itself.
"""
self.pathfinding_cancel_pub.publish(Empty())

def cmd_vel_cb(self, msg: Twist):
def cmd_vel_cb(self, msg: Twist) -> None:
self.current_cmd_vel = msg

def get_current_cmd_vel(self) -> Twist:
Expand All @@ -73,7 +73,7 @@ def get_current_cmd_vel(self) -> Twist:
"""
return self.current_cmd_vel

def stop_walk(self):
def stop_walk(self) -> None:
"""
This function stops the walking. It does not cancel the current goal of the
pathfinding and the walking will start again if the pathfinding sends a new message.
Expand All @@ -86,7 +86,7 @@ def stop_walk(self):
# Publish the stop command
self.direct_cmd_vel_pub.publish(msg)

def calculate_time_to_ball(self):
def calculate_time_to_ball(self) -> None:
"""
Calculates the time to ball and saves it in the team data capsule.
"""
Expand Down
Loading

0 comments on commit e435c7d

Please sign in to comment.