diff --git a/agents/deepvss/deepvss_atk_vs_gk.py b/agents/deepvss/deepvss_atk_vs_gk.py new file mode 100644 index 00000000..0680ddee --- /dev/null +++ b/agents/deepvss/deepvss_atk_vs_gk.py @@ -0,0 +1,248 @@ +#!/usr/bin/env python3 +import argparse +import ctypes +import math +import os +import sys +import threading +import time +# VSS customization: +import traceback +from importlib.machinery import SourceFileLoader + +import gym +import torch +import torch.multiprocessing as mp +from tensorboardX import SummaryWriter + +import rc_gym +import parameters_dqn +import numpy as np + +# Global variables +writer = None +collected_samples = None + + +def dict_to_str(dct): + dict_str = "" + for key, value in dct.items(): + dict_str += "{}: {}\n".format(key, value) + + return dict_str + + +class NormalizedWrapper(gym.Wrapper): + """ + :param env: (gym.Env) Gym environment that will be wrapped + """ + + def __init__(self, env): + # Call the parent constructor, so we can access self.env later + super(NormalizedWrapper, self).__init__(env) + + assert isinstance(self.env.action_space, + gym.spaces.Box), "This wrapper only works with continuous action space (spaces.Box)" + assert isinstance(self.env.observation_space, + gym.spaces.Box), "This wrapper only works with continuous observation space (spaces.Box)" + + # We modify the wrapper action space, so all actions will lie in [-1, 1] + self.action_space = gym.spaces.Box( + low=-1, high=1, shape=self.env.action_space.shape, dtype=np.float32) + self.observation_space = gym.spaces.Box( + low=-1, high=1, shape=self.env.observation_space.shape, dtype=np.float32) + + def rescale_action(self, scaled_action): + """ + Rescale the action from [-1, 1] to [low, high] + (no need for symmetric action space) + :param scaled_action: (np.ndarray) + :return: (np.ndarray) + """ + return self.env.action_space.low + ( + 0.5 * (scaled_action + 1.0) * (self.env.action_space.high - self.env.action_space.low)) + + def scale_observation(self, observation): + """ + Scale the observation to bounds [-1, 1] + """ + return (2 * ((observation - self.env.observation_space.low) / + (self.env.observation_space.high - self.env.observation_space.low))) - 1 + + def reset(self): + """ + Reset the environment + """ + # Reset the counter + return self.scale_observation(self.env.reset()) + + def step(self, action): + """ + :param action: ([float] or int) Action taken by the agent + :return: (np.ndarray, float, bool, dict) observation, reward, is the episode over?, additional informations + """ + # Rescale action from [-1, 1] to original [low, high] interval + rescaled_action = self.rescale_action(action) + obs, reward, done, info = self.env.step(rescaled_action) + return self.scale_observation(obs), reward, done, info + + +if __name__ == "__main__": + env = None + play_threads = [] + train_process = None + mp.set_start_method('spawn') + collected_samples = mp.Value(ctypes.c_ulonglong, 0) + finish_event = mp.Event() + exp_queue = None + + try: + parser = argparse.ArgumentParser() + parser.add_argument("--cuda", default=False, + action="store_true", help="Enable cuda") + parser.add_argument("--name", default='Lambada', + help="Experiment name") + parser.add_argument("--resume", default=[], action='append', + nargs='?', help="Pre-trained model to be loaded") + parser.add_argument("--exp", default=[], action='append', + nargs='?', help="Load experience buffer") + parser.add_argument("--test", default=False, + action='store_true', help="Test mode") + parser.add_argument("--collected", default=-1, type=int, + help="The starting collected samples: defaults to zero or the value stored with the pretrained model") + parser.add_argument("--processed", default=-1, type=int, + help="The starting processed samples: defaults to zero or the value stored with the pretrained model") + parser.add_argument("--real", default=False, + action="store_true", help='set env to vss_real_soccer') + parser.add_argument("--params", default=None, + help="Path to a python parameters file") + + args = parser.parse_args() + + if args.real: + args.simulator = 'real' + + model_params = parameters_dqn.MODEL_HYPERPARAMS + + if args.params is not None: + imported_parameters = SourceFileLoader( + "module.name", args.params).load_module() + model_params = imported_parameters.MODEL_HYPERPARAMS + + run_name = str(args.name) + + if args.test: + model_params['epsilon_high_start'] = model_params['epsilon_high_final'] = model_params['epsilon_low'] = 0.02 + + sys.stdout.flush() + + env = gym.make('VSSAtkVsGk-v0') + state_shape, action_shape = env.observation_space, env.action_space + model_params['state_shape'] = state_shape + model_params['action_shape'] = action_shape + + print(f' === Running: <{run_name}> experiment ===') + print(f'Model params:\n{dict_to_str(model_params)}\n') + + if model_params['agent'] == 'DDPG': + from agents.agentDDPG import train, play, create_actor_model, load_actor_model + elif model_params['agent'] == 'SAC': + from agents.agentSAC import train, play, create_actor_model, load_actor_model + + device = torch.device("cuda" if args.cuda else "cpu") + net = create_actor_model(model_params, state_shape, + action_shape, device) + + checkpoint = {} + + if len(args.resume) > 0: # load checkpoint + args.resume = args.resume[0] + load = True + if args.resume is None: + print("Looking for default pth file") + args.resume = "model/" + run_name + ".pth" + load = os.path.isfile(args.resume) + if not load: + print('File not found:"%s" (nothing to resume)' % + args.resume) + + if load: + print("=> loading checkpoint '%s'" % args.resume) + checkpoint = torch.load(args.resume, map_location=device) + net = load_actor_model(net, checkpoint) + + if args.test: + checkpoint['collected_samples'] = checkpoint['processed_samples'] = 0 + + if args.processed >= 0: # override values in checkpoint + checkpoint['processed_samples'] = args.processed + + if args.collected >= 0: # override values in checkpoint + checkpoint['collected_samples'] = args.collected + + if 'collected_samples' in checkpoint: + collected_samples.value = checkpoint['collected_samples'] + + print("=> collected samples: %d, processed_samples: %d" % + (collected_samples.value, checkpoint['processed_samples'])) + + if len(args.exp) > 0: # load experience buffer + checkpoint['exp'] = args.exp[0] + + writer_path = model_params['data_path'] + \ + '/logs/' + run_name + writer = SummaryWriter(log_dir=writer_path+"/agents", comment="-agent") + + queue_size = model_params['batch_size'] + exp_queue = mp.Queue(maxsize=queue_size) + + torch.set_num_threads(20) + print("Threads available: %d" % torch.get_num_threads()) + + th_a = threading.Thread(target=play, args=( + model_params, net, device, exp_queue, env, args.test, writer, collected_samples, finish_event)) + play_threads.append(th_a) + th_a.start() + + if args.test: + while True: + time.sleep(0.01) + + else: # crate train process: + model_params['run_name'] = run_name + model_params['writer_path'] = writer_path + model_params['action_format'] = '2f' + model_params['state_format'] = f"{state_shape.shape[0]}f" + net.share_memory() + train_process = mp.Process(target=train, args=( + model_params, net, device, exp_queue, finish_event, checkpoint)) + train_process.start() + train_process.join() + print("Train process joined.") + + except KeyboardInterrupt: + print("...Finishing...") + finish_event.set() + + except Exception: + print("!!! Exception caught on main !!!") + print(traceback.format_exc()) + + finally: + + finish_event.set() + if exp_queue: + while exp_queue.qsize() > 0: + exp_queue.get() + + print('queue is empty') + + if train_process is not None: + train_process.join() + + print("Waiting for threads to finish...") + play_threads = [t.join(1) + for t in play_threads if t is not None and t.isAlive()] + + env.close() + sys.exit(1) diff --git a/envs/rc_gym/__init__.py b/envs/rc_gym/__init__.py index bbe8f197..714e3ce4 100644 --- a/envs/rc_gym/__init__.py +++ b/envs/rc_gym/__init__.py @@ -19,3 +19,7 @@ register(id='VSS3v3FIRA-v0', entry_point='rc_gym.vss.env_3v3:VSS3v3FIRAEnv' ) + +register(id='VSSAtkVsGk-v0', + entry_point='rc_gym.vss.env_atk_vs_gk:VSSAtkVsGkEnv' + ) diff --git a/envs/rc_gym/vss/env_atk_vs_gk/__init__.py b/envs/rc_gym/vss/env_atk_vs_gk/__init__.py new file mode 100644 index 00000000..72f12e34 --- /dev/null +++ b/envs/rc_gym/vss/env_atk_vs_gk/__init__.py @@ -0,0 +1 @@ +from rc_gym.vss.env_atk_vs_gk.vss_atk_vs_gk import VSSAtkVsGkEnv diff --git a/envs/rc_gym/vss/env_atk_vs_gk/goalkeeper/models.py b/envs/rc_gym/vss/env_atk_vs_gk/goalkeeper/models.py new file mode 100644 index 00000000..e7581c8e --- /dev/null +++ b/envs/rc_gym/vss/env_atk_vs_gk/goalkeeper/models.py @@ -0,0 +1,81 @@ +import torch +import torch.nn as nn +import torch.nn.functional as F +from torch.distributions import Normal + +LOG_SIG_MAX = 2 +LOG_SIG_MIN = -20 +epsilon = 1e-6 + + +def weights_init_(m): + if isinstance(m, nn.Linear): + nn.init.xavier_uniform_(m.weight, gain=1) + nn.init.constant_(m.bias, 0) + + +class DDPGActor(nn.Module): + def __init__(self, obs_size, act_size): + super(DDPGActor, self).__init__() + self.net = nn.Sequential( + nn.Linear(obs_size, 400), + nn.ReLU(), + nn.Linear(400, 300), + nn.ReLU(), + nn.Linear(300, act_size), + nn.Tanh() + ) + + def forward(self, x): + return self.net(x) + + def get_action(self, state): + x = torch.FloatTensor(state) + action = self.forward(x) + action = action.cpu().detach().numpy() + return action + + +class GaussianPolicy(nn.Module): + def __init__(self, num_inputs, num_actions, hidden_dim=256, action_space=None): + super(GaussianPolicy, self).__init__() + + self.linear1 = nn.Linear(num_inputs, hidden_dim) + self.linear2 = nn.Linear(hidden_dim, hidden_dim) + + self.mean_linear = nn.Linear(hidden_dim, num_actions) + self.log_std_linear = nn.Linear(hidden_dim, num_actions) + + self.apply(weights_init_) + + # action rescaling + if action_space is None: + self.action_scale = torch.tensor(1.) + self.action_bias = torch.tensor(0.) + else: + self.action_scale = torch.FloatTensor( + (action_space.high - action_space.low) / 2.) + self.action_bias = torch.FloatTensor( + (action_space.high + action_space.low) / 2.) + + def _forward(self, state): + x = F.relu(self.linear1(state)) + x = F.relu(self.linear2(x)) + mean = self.mean_linear(x) + log_std = self.log_std_linear(x) + log_std = torch.clamp(log_std, min=LOG_SIG_MIN, max=LOG_SIG_MAX) + return mean, log_std + + def forward(self, state): + mean, log_std = self._forward(state) + std = log_std.exp() + normal = Normal(mean, std) + x_t = normal.rsample() # for reparameterization trick (mean + std * N(0,1)) + y_t = torch.tanh(x_t) + action = y_t * self.action_scale + self.action_bias + return action + + def to(self, device): + self.action_scale = self.action_scale.to(device) + self.action_bias = self.action_bias.to(device) + return super(GaussianPolicy, self).to(device) diff --git a/envs/rc_gym/vss/env_atk_vs_gk/vss_atk_vs_gk.py b/envs/rc_gym/vss/env_atk_vs_gk/vss_atk_vs_gk.py new file mode 100644 index 00000000..87a70a3f --- /dev/null +++ b/envs/rc_gym/vss/env_atk_vs_gk/vss_atk_vs_gk.py @@ -0,0 +1,393 @@ +import math +import random +import torch +import os +from rc_gym.Utils.Utils import OrnsteinUhlenbeckAction +from typing import Dict + +import gym +import numpy as np +from rc_gym.Entities import Frame, Robot +from rc_gym.vss.vss_gym_base import VSSBaseEnv +from rc_gym.vss.env_atk_vs_gk.goalkeeper.models import DDPGActor + + +class VSSAtkVsGkEnv(VSSBaseEnv): + """This environment controls a single robot in a VSS soccer League 3v3 match + + + Description: + Observation: + Type: Box(40) + Normalized Bounds to [-1.25, 1.25] + Num Observation normalized + 0 Ball X + 1 Ball Y + 2 Ball Vx + 3 Ball Vy + 4 + (7 * i) id i Blue Robot X + 5 + (7 * i) id i Blue Robot Y + 6 + (7 * i) id i Blue Robot sin(theta) + 7 + (7 * i) id i Blue Robot cos(theta) + 8 + (7 * i) id i Blue Robot Vx + 9 + (7 * i) id i Blue Robot Vy + 10 + (7 * i) id i Blue Robot v_theta + 25 + (5 * i) id i Yellow Robot X + 26 + (5 * i) id i Yellow Robot Y + 27 + (5 * i) id i Yellow Robot Vx + 28 + (5 * i) id i Yellow Robot Vy + 29 + (5 * i) id i Yellow Robot v_theta + Actions: + Type: Box(2, ) + Num Action + 0 id 0 Blue Left Wheel Speed (%) + 1 id 0 Blue Right Wheel Speed (%) + Reward: + Sum of Rewards: + Goal + Ball Potential Gradient + Move to Ball + Energy Penalty + Starting State: + Randomized Robots and Ball initial Position + Episode Termination: + 5 minutes match time + """ + + def __init__(self): + super().__init__(field_type=0, n_robots_blue=3, n_robots_yellow=3, + time_step=0.032) + + self.action_space = gym.spaces.Box(low=-1, high=1, + shape=(2, ), dtype=np.float32) + self.observation_space = gym.spaces.Box(low=-self.NORM_BOUNDS, + high=self.NORM_BOUNDS, + shape=(40, ), dtype=np.float32) + + # Initialize Class Atributes + self.previous_ball_potential = None + self.actions: Dict = None + self.reward_shaping_total = None + self.v_wheel_deadzone = 0.05 + self.goalkeeper = None + + self.ou_actions = [] + for i in range(self.n_robots_blue + self.n_robots_yellow): + self.ou_actions.append( + OrnsteinUhlenbeckAction(self.action_space, dt=self.time_step) + ) + + self.load_gk() + print('Environment initialized') + + def reset(self): + self.actions = None + self.reward_shaping_total = None + self.previous_ball_potential = None + for ou in self.ou_actions: + ou.reset() + + return super().reset() + + def step(self, action): + observation, reward, done, _ = super().step(action) + return observation, reward, done, self.reward_shaping_total + + def load_gk(self): + device = torch.device('cuda') + gk_path = os.path.dirname(os.path.realpath( + __file__)) + '/goalkeeper/gk_model.pth' + self.goalkeeper = DDPGActor(40, 2) + print(gk_path) + gk_checkpoint = torch.load(gk_path, map_location=device) + # print("-------> type: ", type(gk_checkpoint)) + if 'state_dict_act' in gk_checkpoint: + self.goalkeeper.load_state_dict(gk_checkpoint['state_dict_act']) + else: + self.goalkeeper.load_state_dict(gk_checkpoint) + self.goalkeeper.eval() + + def _gk_obs(self): + observation = [] + observation.append(self.norm_pos(-self.frame.ball.x)) + observation.append(self.norm_pos(self.frame.ball.y)) + observation.append(self.norm_v(-self.frame.ball.v_x)) + observation.append(self.norm_v(self.frame.ball.v_y)) + + # we reflect the side that the attacker is attacking, + # so that he will attack towards the goal where the goalkeeper is + for i in range(self.n_robots_yellow): + observation.append(self.norm_pos(-self.frame.robots_yellow[i].x)) + observation.append(self.norm_pos(self.frame.robots_yellow[i].y)) + observation.append( + np.sin(np.deg2rad(self.frame.robots_yellow[i].theta)) + ) + observation.append( + -np.cos(np.deg2rad(self.frame.robots_yellow[i].theta)) + ) + observation.append(self.norm_v(-self.frame.robots_yellow[i].v_x)) + observation.append(self.norm_v(self.frame.robots_yellow[i].v_y)) + observation.append(self.norm_w(-self.frame.robots_yellow[i].v_theta)) + + for i in range(self.n_robots_blue): + observation.append(self.norm_pos(-self.frame.robots_blue[i].x)) + observation.append(self.norm_pos(self.frame.robots_blue[i].y)) + observation.append(self.norm_v(-self.frame.robots_blue[i].v_x)) + observation.append(self.norm_v(self.frame.robots_blue[i].v_y)) + observation.append(self.norm_w(-self.frame.robots_blue[i].v_theta)) + + return np.array(observation) + + def _frame_to_observations(self): + + observation = [] + + observation.append(self.norm_pos(self.frame.ball.x)) + observation.append(self.norm_pos(self.frame.ball.y)) + observation.append(self.norm_v(self.frame.ball.v_x)) + observation.append(self.norm_v(self.frame.ball.v_y)) + + for i in range(self.n_robots_blue): + observation.append(self.norm_pos(self.frame.robots_blue[i].x)) + observation.append(self.norm_pos(self.frame.robots_blue[i].y)) + observation.append( + np.sin(np.deg2rad(self.frame.robots_blue[i].theta)) + ) + observation.append( + np.cos(np.deg2rad(self.frame.robots_blue[i].theta)) + ) + observation.append(self.norm_v(self.frame.robots_blue[i].v_x)) + observation.append(self.norm_v(self.frame.robots_blue[i].v_y)) + observation.append(self.norm_w(self.frame.robots_blue[i].v_theta)) + + for i in range(self.n_robots_yellow): + observation.append(self.norm_pos(self.frame.robots_yellow[i].x)) + observation.append(self.norm_pos(self.frame.robots_yellow[i].y)) + observation.append(self.norm_v(self.frame.robots_yellow[i].v_x)) + observation.append(self.norm_v(self.frame.robots_yellow[i].v_y)) + observation.append( + self.norm_w(self.frame.robots_yellow[i].v_theta) + ) + + return np.array(observation) + + def _get_commands(self, actions): + commands = [] + self.actions = {} + + self.actions[0] = actions + v_wheel1, v_wheel2 = self._actions_to_v_wheels(actions) + commands.append(Robot(yellow=False, id=0, v_wheel1=v_wheel1, + v_wheel2=v_wheel2)) + + # Send random commands to the other robots + for i in range(1, self.n_robots_blue): + actions = self.ou_actions[i].sample() + self.actions[i] = actions + v_wheel1, v_wheel2 = self._actions_to_v_wheels(actions) + commands.append(Robot(yellow=False, id=i, v_wheel1=v_wheel1, + v_wheel2=v_wheel2)) + + gk_action = self.goalkeeper.get_action(self._gk_obs()) + # we invert the speed on the wheels because of the goalkeeper's reflection on the Y axis. + commands.append(Robot(yellow=True, id=0, v_wheel1=gk_action[1], + v_wheel2=gk_action[0])) + for i in range(1, self.n_robots_yellow): + actions = self.ou_actions[self.n_robots_blue+i].sample() + v_wheel1, v_wheel2 = self._actions_to_v_wheels(actions) + commands.append(Robot(yellow=True, id=i, v_wheel1=v_wheel1, + v_wheel2=v_wheel2)) + + return commands + + def __ball_grad(self): + '''Calculate ball potential gradient + Difference of potential of the ball in time_step seconds. + ''' + # Calculate ball potential + length_cm = self.field_params['field_length'] * 100 + half_lenght = (self.field_params['field_length'] / 2.0)\ + + self.field_params['goal_depth'] + + # distance to defence + dx_d = (half_lenght + self.frame.ball.x) * 100 + # distance to attack + dx_a = (half_lenght - self.frame.ball.x) * 100 + dy = (self.frame.ball.y) * 100 + + dist_1 = -math.sqrt(dx_a ** 2 + 2 * dy ** 2) + dist_2 = math.sqrt(dx_d ** 2 + 2 * dy ** 2) + ball_potential = ((dist_1 + dist_2) / length_cm - 1) / 2 + + grad_ball_potential = 0 + # Calculate ball potential gradient + # = actual_potential - previous_potential + if self.previous_ball_potential is not None: + diff = ball_potential - self.previous_ball_potential + grad_ball_potential = np.clip(diff * 3 / self.time_step, + -5.0, 5.0) + + self.previous_ball_potential = ball_potential + + return grad_ball_potential + + def __move_reward(self): + '''Calculate Move to ball reward + + Cosine between the robot vel vector and the vector robot -> ball. + This indicates rather the robot is moving towards the ball or not. + ''' + + ball = np.array([self.frame.ball.x, self.frame.ball.y]) + robot = np.array([self.frame.robots_blue[0].x, + self.frame.robots_blue[0].y]) + robot_vel = np.array([self.frame.robots_blue[0].v_x, + self.frame.robots_blue[0].v_y]) + robot_ball = ball - robot + robot_ball = robot_ball/np.linalg.norm(robot_ball) + + move_reward = np.dot(robot_ball, robot_vel) + + move_reward = np.clip(move_reward / 0.4, -5.0, 5.0) + return move_reward + + def __energy_penalty(self): + '''Calculates the energy penalty''' + + en_penalty_1 = abs(self.sent_commands[0].v_wheel1) + en_penalty_2 = abs(self.sent_commands[0].v_wheel2) + energy_penalty = - (en_penalty_1 + en_penalty_2) + energy_penalty /= self.rsim.robot_wheel_radius + return energy_penalty + + def _calculate_reward_and_done(self): + reward = 0 + goal = False + w_move = 0.2 + w_ball_grad = 0.8 + w_energy = 2e-4 + if self.reward_shaping_total is None: + self.reward_shaping_total = {'goal_score': 0, 'move': 0, + 'ball_grad': 0, 'energy': 0, + 'goals_blue': 0, 'goals_yellow': 0} + + # Check if goal ocurred + if self.frame.ball.x > (self.field_params['field_length'] / 2): + self.reward_shaping_total['goal_score'] += 1 + self.reward_shaping_total['goals_blue'] += 1 + reward = 10 + goal = True + elif self.frame.ball.x < -(self.field_params['field_length'] / 2): + self.reward_shaping_total['goal_score'] -= 1 + self.reward_shaping_total['goals_yellow'] += 1 + reward = -10 + goal = True + else: + + if self.last_frame is not None: + # Calculate ball potential + grad_ball_potential = self.__ball_grad() + # Calculate Move ball + move_reward = self.__move_reward() + # Calculate Energy penalty + energy_penalty = self.__energy_penalty() + + reward = w_move * move_reward + \ + w_ball_grad * grad_ball_potential + \ + w_energy * energy_penalty + + self.reward_shaping_total['move'] += w_move * move_reward + self.reward_shaping_total['ball_grad'] += w_ball_grad \ + * grad_ball_potential + self.reward_shaping_total['energy'] += w_energy \ + * energy_penalty + + if goal: + initial_pos_frame: Frame = self._get_initial_positions_frame() + self.rsim.reset(initial_pos_frame) + self.frame = self.rsim.get_frame() + self.previous_ball_potential = None + self.last_frame = None + + done = self.steps * self.time_step >= 300 + + return reward, done + + def _get_initial_positions_frame(self): + """ + Goalie starts at the center of the goal, Attaker and ball randomly. + Other robots also starts at random positions. + """ + field_half_length = self.field_params['field_length'] / 2 + field_half_width = self.field_params['field_width'] / 2 + + def x(): return random.uniform(-field_half_length + 0.1, + field_half_length - 0.1) + + def y(): return random.uniform(-field_half_width + 0.1, + field_half_width - 0.1) + + def theta(): return random.uniform(-180, 180) + + pos_frame: Frame = Frame() + + pos_frame.ball.x = x() + pos_frame.ball.y = y() + pos_frame.ball.v_x = 0. + pos_frame.ball.v_y = 0. + + agents = [] + for i in range(self.n_robots_blue): + pos_frame.robots_blue[i] = Robot(x=x(), y=y(), theta=theta()) + agents.append(pos_frame.robots_blue[i]) + + pos_frame.robots_yellow[0] = Robot(x= field_half_length - 0.05, + y=0., + theta=0) + agents.append(pos_frame.robots_yellow[0]) + for i in range(1, self.n_robots_yellow): + pos_frame.robots_yellow[i] = Robot(x=x(), y=y(), theta=theta()) + agents.append(pos_frame.robots_yellow[i]) + + def same_position_ref(obj, ref, radius): + if obj.x >= ref.x - radius and obj.x <= ref.x + radius and \ + obj.y >= ref.y - radius and obj.y <= ref.y + radius: + return True + return False + + radius_ball = 0.04 + radius_robot = 0.07 + + for i in range(len(agents)): + while same_position_ref(agents[i], pos_frame.ball, radius_ball): + agents[i] = Robot(x=x(), y=y(), theta=theta()) + for j in range(i): + while same_position_ref(agents[i], agents[j], radius_robot): + agents[i] = Robot(x=x(), y=y(), theta=theta()) + + for i in range(self.n_robots_blue): + pos_frame.robots_blue[i] = agents[i] + + for i in range(self.n_robots_yellow): + pos_frame.robots_yellow[i] = agents[i+self.n_robots_blue] + + return pos_frame + + def _actions_to_v_wheels(self, actions): + left_wheel_speed = actions[0] * self.rsim.linear_speed_range + right_wheel_speed = actions[1] * self.rsim.linear_speed_range + + left_wheel_speed, right_wheel_speed = np.clip( + (left_wheel_speed, right_wheel_speed), + -self.rsim.linear_speed_range, + self.rsim.linear_speed_range + ) + + # Deadzone + if -self.v_wheel_deadzone < left_wheel_speed < self.v_wheel_deadzone: + left_wheel_speed = 0 + + if -self.v_wheel_deadzone < right_wheel_speed < self.v_wheel_deadzone: + right_wheel_speed = 0 + + return left_wheel_speed, right_wheel_speed