diff --git a/.gitignore b/.gitignore index b63ec90f2..d03a1f8db 100644 --- a/.gitignore +++ b/.gitignore @@ -16,8 +16,7 @@ cov_html/* dev/* results/* temp/* -client_data/* -remote_data/* +rlberry_data/* # Byte-compiled / optimized / DLL files __pycache__/ diff --git a/CITATION.cff b/CITATION.cff index 527a9b76e..f05179896 100644 --- a/CITATION.cff +++ b/CITATION.cff @@ -16,7 +16,7 @@ authors: title: "rlberry - A Reinforcement Learning Library for Research and Education" abbreviation: rlberry -version: 0.2 -doi: 10.5281/zenodo.5544540 +version: 0.2.1 +doi: 10.5281/zenodo.5223307 date-released: 2021-10-01 url: "https://github.com/rlberry-py/rlberry" \ No newline at end of file diff --git a/README.md b/README.md index eb21e1a9f..ba66faf50 100644 --- a/README.md +++ b/README.md @@ -25,9 +25,9 @@ codecov - +

diff --git a/examples/demo_adaptiveql.py b/examples/demo_adaptiveql.py index e761e2a43..145102d23 100644 --- a/examples/demo_adaptiveql.py +++ b/examples/demo_adaptiveql.py @@ -57,7 +57,7 @@ preprocess_func=np.cumsum, title='Cumulative Rewards') for stats in multimanagers.managers: - agent = stats.agent_handlers[0] + agent = stats.get_agent_instances()[0] try: agent.Qtree.plot(0, 25) except AttributeError: diff --git a/examples/demo_agent_manager.py b/examples/demo_agent_manager.py index 526b7fda7..88fb46855 100644 --- a/examples/demo_agent_manager.py +++ b/examples/demo_agent_manager.py @@ -57,7 +57,12 @@ init_kwargs=params, eval_kwargs=eval_kwargs, n_fit=4, - seed=123) + seed=123, + enable_tensorboard=True, + default_writer_kwargs=dict( + maxlen=N_EPISODES - 10, + log_interval=5.0, + )) rskernel_stats = AgentManager( RSKernelUCBVIAgent, train_env, @@ -65,7 +70,8 @@ init_kwargs=params_kernel, eval_kwargs=eval_kwargs, n_fit=4, - seed=123) + seed=123, + enable_tensorboard=True) a2c_stats = AgentManager( A2CAgent, train_env, @@ -81,16 +87,26 @@ for st in agent_manager_list: st.fit() + # Fit RSUCBVI for 50 more episodes + rsucbvi_stats.fit(budget=50) + # learning curves plot_writer_data(agent_manager_list, tag='episode_rewards', preprocess_func=np.cumsum, title='cumulative rewards', show=False) + + plot_writer_data(agent_manager_list, + tag='episode_rewards', + title='episode rewards', + show=False) # compare final policies output = evaluate_agents(agent_manager_list) + print(output) - for st in agent_manager_list: - st.clear_output_dir() + # uncomment to delete output directories + # for st in agent_manager_list: + # st.clear_output_dir() diff --git a/examples/demo_dqn.py b/examples/demo_dqn.py index 771d75ac7..caed348b8 100644 --- a/examples/demo_dqn.py +++ b/examples/demo_dqn.py @@ -12,8 +12,8 @@ agent.set_writer(SummaryWriter()) print(f"Running DQN on {env}") -print(f"Visualize with tensorboard by \ -running:\n$tensorboard --logdir {Path(agent.writer.log_dir).parent}") +print("Visualize with tensorboard by " + f"running:\n$tensorboard --logdir {Path(agent.writer.log_dir).parent}") agent.fit(budget=50) diff --git a/examples/demo_experiment/run.py b/examples/demo_experiment/run.py index 5cc99156e..1ee513d0a 100644 --- a/examples/demo_experiment/run.py +++ b/examples/demo_experiment/run.py @@ -26,4 +26,9 @@ del multimanagers data = load_experiment_results('results', 'params_experiment') + print(data) + + # Fit one of the managers for a few more episodes + # If tensorboard is enabled, you should see more episodes ran for 'rsucbvi_alternative' + data['manager']['rsucbvi_alternative'].fit(50) diff --git a/examples/demo_hyperparam_optim.py b/examples/demo_hyperparam_optim.py index 58e8a4082..2d22bfa51 100644 --- a/examples/demo_hyperparam_optim.py +++ b/examples/demo_hyperparam_optim.py @@ -1,5 +1,5 @@ from rlberry.envs.benchmarks.ball_exploration import PBall2D -from rlberry.agents.torch.ppo import PPOAgent +from rlberry.agents.torch import REINFORCEAgent from rlberry.manager import AgentManager if __name__ == '__main__': @@ -11,54 +11,54 @@ # ----------------------------- # Parameters # ----------------------------- - N_EPISODES = 100 + N_EPISODES = 10 GAMMA = 0.99 HORIZON = 50 BONUS_SCALE_FACTOR = 0.1 MIN_DIST = 0.1 - params_ppo = {"gamma": GAMMA, - "horizon": HORIZON, - "learning_rate": 0.0003} + params = {"gamma": GAMMA, + "horizon": HORIZON, + "learning_rate": 0.0003} eval_kwargs = dict(eval_horizon=HORIZON, n_simulations=20) # ------------------------------- # Run AgentManager and save 
results # -------------------------------- - ppo_stats = AgentManager( - PPOAgent, train_env, fit_budget=N_EPISODES, - init_kwargs=params_ppo, + manager = AgentManager( + REINFORCEAgent, train_env, fit_budget=N_EPISODES, + init_kwargs=params, eval_kwargs=eval_kwargs, - n_fit=4, - output_dir='dev/') + n_fit=4) # hyperparam optim with multiple threads - ppo_stats.optimize_hyperparams( + manager.optimize_hyperparams( n_trials=5, timeout=None, n_fit=2, sampler_method='optuna_default', optuna_parallelization='thread') - initial_n_trials = len(ppo_stats.optuna_study.trials) + initial_n_trials = len(manager.optuna_study.trials) # save - ppo_stats_fname = ppo_stats.save() - del ppo_stats + manager_fname = manager.save() + del manager # load - ppo_stats = AgentManager.load(ppo_stats_fname) + manager = AgentManager.load(manager_fname) # continue previous optimization, now with 120s of timeout and multiprocessing - ppo_stats.optimize_hyperparams( + manager.optimize_hyperparams( n_trials=512, timeout=120, - n_fit=2, + n_fit=8, continue_previous=True, - optuna_parallelization='process') + optuna_parallelization='process', + n_optuna_workers=4) print("number of initial trials = ", initial_n_trials) - print("number of trials after continuing= ", len(ppo_stats.optuna_study.trials)) + print("number of trials after continuing= ", len(manager.optuna_study.trials)) print("----") print("fitting agents after choosing hyperparams...") - ppo_stats.fit() # fit the 4 agents + manager.fit() # fit the 4 agents diff --git a/examples/demo_network/run_remote_manager.py b/examples/demo_network/run_remote_manager.py index 530766f21..71a395945 100644 --- a/examples/demo_network/run_remote_manager.py +++ b/examples/demo_network/run_remote_manager.py @@ -14,7 +14,7 @@ port = int(input("Select server port: ")) client = BerryClient(port=port) - FIT_BUDGET = 1000 + FIT_BUDGET = 500 local_manager = AgentManager( agent_class=REINFORCEAgent, @@ -35,10 +35,11 @@ fit_budget=FIT_BUDGET, init_kwargs=dict(gamma=0.99), eval_kwargs=dict(eval_horizon=200, n_simulations=20), - n_fit=2, + n_fit=3, seed=10, agent_name='REINFORCE(remote)', - parallelization='process' + parallelization='process', + enable_tensorboard=True, ) remote_manager.set_writer( @@ -48,7 +49,7 @@ ) # Optimize hyperparams of remote agent - best_params = remote_manager.optimize_hyperparams(timeout=120, optuna_parallelization='process') + best_params = remote_manager.optimize_hyperparams(timeout=60, optuna_parallelization='process') print(f'best params = {best_params}') # Test save/load @@ -62,6 +63,9 @@ mmanagers.append(remote_manager) mmanagers.run() + # Fit remotely for a few more episodes + remote_manager.fit(budget=100) + # plot plot_writer_data(mmanagers.managers, tag='episode_rewards', show=False) evaluate_agents(mmanagers.managers, n_simulations=10, show=True) @@ -69,6 +73,7 @@ # Test some methods print([manager.eval_agents() for manager in mmanagers.managers]) - for manager in mmanagers.managers: - manager.clear_handlers() - manager.clear_output_dir() + # # uncomment to clear output files + # for manager in mmanagers.managers: + # manager.clear_handlers() + # manager.clear_output_dir() diff --git a/rlberry/agents/adaptiveql/adaptiveql.py b/rlberry/agents/adaptiveql/adaptiveql.py index ce4b92159..83b97f824 100644 --- a/rlberry/agents/adaptiveql/adaptiveql.py +++ b/rlberry/agents/adaptiveql/adaptiveql.py @@ -2,7 +2,6 @@ import gym.spaces as spaces import numpy as np from rlberry.agents import AgentWithSimplePolicy -from rlberry.utils.writers import DefaultWriter from 
rlberry.agents.adaptiveql.tree import MDPTreePartition logger = logging.getLogger(__name__) @@ -85,9 +84,6 @@ def reset(self): # info self.episode = 0 - # default writer - self.writer = DefaultWriter(self.name, metadata=self._metadata) - def policy(self, observation): action, _ = self.Qtree.get_argmax_and_node(observation, 0) return action diff --git a/rlberry/agents/agent.py b/rlberry/agents/agent.py index d542c8a0b..b26699223 100644 --- a/rlberry/agents/agent.py +++ b/rlberry/agents/agent.py @@ -5,11 +5,14 @@ import numpy as np from inspect import signature from pathlib import Path +from rlberry import metadata_utils from rlberry import types from rlberry.seeding.seeder import Seeder from rlberry.seeding import safe_reseed from rlberry.envs.utils import process_env -from typing import Any, Optional, Mapping +from rlberry.utils.writers import DefaultWriter +from typing import Optional + logger = logging.getLogger(__name__) @@ -27,8 +30,10 @@ class Agent(ABC): If true, makes a deep copy of the environment. seeder : rlberry.seeding.Seeder, int, or None Object for random number generation. - _metadata : dict - Extra information (e.g. about which is the process id where the agent is running). + _execution_metadata : ExecutionMetadata (optional) + Extra information about agent execution (e.g. about which is the process id where the agent is running). + _default_writer_kwargs : dict (optional) + Parameters to initialize DefaultWriter (attribute self.writer). .. note:: Classes that implement this interface should send ``**kwargs`` to :code:`Agent.__init__()` @@ -45,6 +50,11 @@ class Agent(ABC): Writer object (e.g. tensorboard SummaryWriter). seeder : rlberry.seeding.Seeder, int, or None Object for random number generation. + output_dir : str or Path + Directory that the agent can use to store data. + unique_id : str + Unique identifier for the agent instance. Can be used, for example, + to create files/directories for the agent to log data safely. """ name = "" @@ -54,7 +64,9 @@ def __init__(self, eval_env: Optional[types.Env] = None, copy_env: bool = True, seeder: Optional[types.Seed] = None, - _metadata: Optional[Mapping[str, Any]] = None, + output_dir: Optional[str] = None, + _execution_metadata: Optional[metadata_utils.ExecutionMetadata] = None, + _default_writer_kwargs: Optional[dict] = None, **kwargs): # Check if wrong parameters have been sent to an agent. 
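Since the writer is now created in Agent.__init__() itself, a subclass can log scalars without any extra setup. A minimal sketch in the spirit of the DummyAgent used in the tests; the agent name, the tag and the random rewards are purely illustrative:

from rlberry.agents import AgentWithSimplePolicy
from rlberry.envs import GridWorld


class MyAgent(AgentWithSimplePolicy):
    name = 'MyAgent'

    def __init__(self, env, **kwargs):
        AgentWithSimplePolicy.__init__(self, env, **kwargs)

    def fit(self, budget, **kwargs):
        del kwargs
        for ep in range(budget):
            episode_reward = self.rng.uniform()  # placeholder for a real rollout
            # self.writer is the DefaultWriter built by Agent.__init__()
            self.writer.add_scalar('episode_rewards', episode_reward, ep)

    def policy(self, observation):
        return 0


agent = MyAgent(GridWorld(), seeder=42)
agent.fit(budget=10)
print(agent.unique_id)   # hashed id built from metadata_utils.get_unique_id()
print(agent.output_dir)  # defaults to 'output_' + unique_id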
assert kwargs == {}, \ @@ -62,14 +74,37 @@ def __init__(self, self.seeder = Seeder(seeder) self.env = process_env(env, self.seeder, copy_env=copy_env) - self.writer = None # evaluation environment eval_env = eval_env or env self.eval_env = process_env(eval_env, self.seeder, copy_env=True) # metadata - self._metadata = _metadata or dict() + self._execution_metadata = _execution_metadata or metadata_utils.ExecutionMetadata() + self._unique_id = metadata_utils.get_unique_id(self) + if self.name: + self._unique_id = self.name + '_' + self._unique_id + + # create writer + _default_writer_kwargs = _default_writer_kwargs or dict( + name=self.name, execution_metadata=self._execution_metadata) + self._writer = DefaultWriter(**_default_writer_kwargs) + + # output directory for the agent instance + self._output_dir = output_dir or f"output_{self._unique_id}" + self._output_dir = Path(self._output_dir) + + @property + def writer(self): + return self._writer + + @property + def unique_id(self): + return self._unique_id + + @property + def output_dir(self): + return self._output_dir @abstractmethod def fit(self, budget: int, **kwargs): @@ -87,6 +122,8 @@ def fit(self, budget: int, **kwargs): optimization (by allowing early stopping), but it is not strictly required elsewhere in the library. + If the agent does not require a budget, set it to -1. + Parameters ---------- budget: int @@ -110,9 +147,9 @@ def eval(self, **kwargs): pass def set_writer(self, writer): - self.writer = writer + self._writer = writer - if self.writer: + if self._writer: init_args = signature(self.__init__).parameters kwargs = [f"| {key} | {getattr(self, key, None)} |" for key in init_args] writer.add_text( diff --git a/rlberry/agents/jax/dqn/dqn.py b/rlberry/agents/jax/dqn/dqn.py index e92d641d2..7f889f260 100644 --- a/rlberry/agents/jax/dqn/dqn.py +++ b/rlberry/agents/jax/dqn/dqn.py @@ -41,7 +41,6 @@ from rlberry import types from rlberry.agents import AgentWithSimplePolicy from rlberry.agents.jax.utils.replay_buffer import ReplayBuffer -from rlberry.utils.writers import DefaultWriter from typing import Any, Callable, Mapping, Optional logger = logging.getLogger(__name__) @@ -135,7 +134,6 @@ def __init__( AgentWithSimplePolicy.__init__(self, env, **kwargs) env = self.env self.rng_key = jax.random.PRNGKey(self.rng.integers(2 ** 32).item()) - self.writer = DefaultWriter(name=self.name, metadata=self._metadata) # checks if not isinstance(self.env.observation_space, spaces.Box): @@ -433,7 +431,7 @@ def load(cls, filename, **kwargs): agent._all_states = agent_data['states'] writer = agent_data['writer'] if writer: - agent.writer = writer + agent._writer = writer return agent # diff --git a/rlberry/agents/kernel_based/rs_kernel_ucbvi.py b/rlberry/agents/kernel_based/rs_kernel_ucbvi.py index 5e676ae22..e376fdbab 100644 --- a/rlberry/agents/kernel_based/rs_kernel_ucbvi.py +++ b/rlberry/agents/kernel_based/rs_kernel_ucbvi.py @@ -10,7 +10,6 @@ from rlberry.utils.metrics import metric_lp from rlberry.agents.kernel_based.kernels import kernel_func from rlberry.agents.kernel_based.common import map_to_representative -from rlberry.utils.writers import DefaultWriter logger = logging.getLogger(__name__) @@ -250,9 +249,6 @@ def reset(self, **kwargs): self.episode = 0 - # default writer - self.writer = DefaultWriter(self.name, metadata=self._metadata) - def policy(self, observation): state = observation assert self.Q_policy is not None diff --git a/rlberry/agents/kernel_based/rs_ucbvi.py b/rlberry/agents/kernel_based/rs_ucbvi.py index 
9f15fff08..a9908c0e1 100644 --- a/rlberry/agents/kernel_based/rs_ucbvi.py +++ b/rlberry/agents/kernel_based/rs_ucbvi.py @@ -6,7 +6,6 @@ from rlberry.agents.dynprog.utils import backward_induction from rlberry.agents.dynprog.utils import backward_induction_in_place from rlberry.agents.kernel_based.common import map_to_representative -from rlberry.utils.writers import DefaultWriter logger = logging.getLogger(__name__) @@ -191,9 +190,6 @@ def reset(self, **kwargs): self.episode = 0 - # default writer - self.writer = DefaultWriter(self.name, metadata=self._metadata) - def policy(self, observation): state = observation assert self.Q_policy is not None diff --git a/rlberry/agents/linear/lsvi_ucb.py b/rlberry/agents/linear/lsvi_ucb.py index e8890dfac..438b39e3c 100644 --- a/rlberry/agents/linear/lsvi_ucb.py +++ b/rlberry/agents/linear/lsvi_ucb.py @@ -2,7 +2,6 @@ import numpy as np from rlberry.agents import AgentWithSimplePolicy from gym.spaces import Discrete -from rlberry.utils.writers import DefaultWriter from rlberry.utils.jit_setup import numba_jit logger = logging.getLogger(__name__) @@ -203,9 +202,6 @@ def reset(self): # self.w_policy = None - # default writer - self.writer = DefaultWriter(name=self.name, metadata=self._metadata) - def fit(self, budget, **kwargs): del kwargs diff --git a/rlberry/agents/optql/optql.py b/rlberry/agents/optql/optql.py index 8e71e5999..f0fc66750 100644 --- a/rlberry/agents/optql/optql.py +++ b/rlberry/agents/optql/optql.py @@ -4,7 +4,6 @@ import gym.spaces as spaces from rlberry.agents import AgentWithSimplePolicy from rlberry.exploration_tools.discrete_counter import DiscreteCounter -from rlberry.utils.writers import DefaultWriter logger = logging.getLogger(__name__) @@ -103,9 +102,6 @@ def reset(self, **kwargs): self.counter = DiscreteCounter(self.env.observation_space, self.env.action_space) - # default writer - self.writer = DefaultWriter(self.name, metadata=self._metadata) - def policy(self, observation): """ Recommended policy. 
""" state = observation diff --git a/rlberry/agents/torch/a2c/a2c.py b/rlberry/agents/torch/a2c/a2c.py index d444140c5..6fe9b475e 100644 --- a/rlberry/agents/torch/a2c/a2c.py +++ b/rlberry/agents/torch/a2c/a2c.py @@ -9,7 +9,6 @@ from rlberry.agents.torch.utils.models import default_policy_net_fn from rlberry.agents.torch.utils.models import default_value_net_fn from rlberry.utils.torch import choose_device -from rlberry.utils.writers import DefaultWriter from rlberry.wrappers.uncertainty_estimator_wrapper import UncertaintyEstimatorWrapper logger = logging.getLogger(__name__) @@ -144,9 +143,6 @@ def reset(self, **kwargs): self.episode = 0 - # default writer - self.writer = DefaultWriter(self.name, metadata=self._metadata) - def policy(self, observation): state = observation assert self.cat_policy is not None diff --git a/rlberry/agents/torch/avec/avec_ppo.py b/rlberry/agents/torch/avec/avec_ppo.py index a5351aa0f..a3176f34a 100644 --- a/rlberry/agents/torch/avec/avec_ppo.py +++ b/rlberry/agents/torch/avec/avec_ppo.py @@ -9,7 +9,6 @@ from rlberry.agents.torch.utils.models import default_policy_net_fn from rlberry.agents.torch.utils.models import default_value_net_fn from rlberry.utils.torch import choose_device -from rlberry.utils.writers import DefaultWriter from rlberry.wrappers.uncertainty_estimator_wrapper import UncertaintyEstimatorWrapper logger = logging.getLogger(__name__) @@ -183,9 +182,6 @@ def reset(self, **kwargs): self.episode = 0 - # default writer - self.writer = DefaultWriter(self.name, metadata=self._metadata) - def policy(self, observation): state = observation assert self.cat_policy is not None diff --git a/rlberry/agents/torch/dqn/dqn.py b/rlberry/agents/torch/dqn/dqn.py index da4958485..df8ada635 100644 --- a/rlberry/agents/torch/dqn/dqn.py +++ b/rlberry/agents/torch/dqn/dqn.py @@ -135,7 +135,6 @@ def __init__(self, self.training = True self.steps = 0 self.episode = 0 - self.writer = None self.optimizer_kwargs = {'optimizer_type': optimizer_type, 'lr': learning_rate} @@ -465,7 +464,7 @@ def initialize_model(self): self.value_net.reset() def set_writer(self, writer): - self.writer = writer + self._writer = writer try: self.exploration_policy.set_writer(writer) except AttributeError: diff --git a/rlberry/agents/torch/ppo/ppo.py b/rlberry/agents/torch/ppo/ppo.py index 3a73e2e45..5b32c5b97 100644 --- a/rlberry/agents/torch/ppo/ppo.py +++ b/rlberry/agents/torch/ppo/ppo.py @@ -10,7 +10,6 @@ from rlberry.agents.torch.utils.models import default_policy_net_fn from rlberry.agents.torch.utils.models import default_value_net_fn from rlberry.utils.torch import choose_device -from rlberry.utils.writers import DefaultWriter from rlberry.wrappers.uncertainty_estimator_wrapper import UncertaintyEstimatorWrapper @@ -174,9 +173,6 @@ def reset(self, **kwargs): self.episode = 0 - # default writer - self.writer = DefaultWriter(self.name, metadata=self._metadata) - def policy(self, observation): state = observation assert self.cat_policy is not None diff --git a/rlberry/agents/torch/reinforce/reinforce.py b/rlberry/agents/torch/reinforce/reinforce.py index 6d2843536..80255877c 100644 --- a/rlberry/agents/torch/reinforce/reinforce.py +++ b/rlberry/agents/torch/reinforce/reinforce.py @@ -7,7 +7,6 @@ from rlberry.agents.torch.utils.training import optimizer_factory from rlberry.agents.torch.utils.models import default_policy_net_fn from rlberry.utils.torch import choose_device -from rlberry.utils.writers import DefaultWriter logger = logging.getLogger(__name__) @@ -113,9 +112,6 @@ def 
reset(self, **kwargs): self.episode = 0 - # default writer - self.writer = DefaultWriter(self.name, metadata=self._metadata) - def policy(self, observation): state = observation assert self.policy_net is not None diff --git a/rlberry/agents/torch/tests/test_actor_critic_algos.py b/rlberry/agents/torch/tests/test_actor_critic_algos.py index 3d8dc1db7..989cf63dd 100644 --- a/rlberry/agents/torch/tests/test_actor_critic_algos.py +++ b/rlberry/agents/torch/tests/test_actor_critic_algos.py @@ -88,7 +88,6 @@ def test_ppo_agent_partial_fit(): eps_clip=0.2, k_epochs=4, use_bonus=False) - agent._log_interval = 0 agent.fit(budget=n_episodes // 2) agent.policy(env.observation_space.sample()) diff --git a/rlberry/agents/ucbvi/ucbvi.py b/rlberry/agents/ucbvi/ucbvi.py index cd69b8d60..99476b5f4 100644 --- a/rlberry/agents/ucbvi/ucbvi.py +++ b/rlberry/agents/ucbvi/ucbvi.py @@ -7,7 +7,6 @@ from rlberry.exploration_tools.discrete_counter import DiscreteCounter from rlberry.agents.dynprog.utils import backward_induction_sd from rlberry.agents.dynprog.utils import backward_induction_in_place -from rlberry.utils.writers import DefaultWriter logger = logging.getLogger(__name__) @@ -154,9 +153,6 @@ def reset(self, **kwargs): if self.real_time_dp: self.name = 'UCBVI-RTDP' - # default writer - self.writer = DefaultWriter(self.name, metadata=self._metadata) - def policy(self, observation): state = observation assert self.Q_policy is not None diff --git a/rlberry/check_packages.py b/rlberry/check_packages.py index df8d2bd98..8bac791e1 100644 --- a/rlberry/check_packages.py +++ b/rlberry/check_packages.py @@ -5,6 +5,12 @@ except ModuleNotFoundError: TORCH_INSTALLED = False +TENSORBOARD_INSTALLED = True +try: + import torch.utils.tensorboard +except ModuleNotFoundError: + TENSORBOARD_INSTALLED = False + NUMBA_INSTALLED = True try: import numba diff --git a/rlberry/envs/basewrapper.py b/rlberry/envs/basewrapper.py index 6f6c90a03..f0bb3c6e4 100644 --- a/rlberry/envs/basewrapper.py +++ b/rlberry/envs/basewrapper.py @@ -1,6 +1,7 @@ from rlberry.seeding import Seeder, safe_reseed import numpy as np from rlberry.envs.interface import Model +from rlberry.spaces.from_gym import convert_space_from_gym class Wrapper(Model): @@ -12,23 +13,34 @@ class Wrapper(Model): The input environment is not copied (Wrapper.env points to the input env). + Parameters + ---------- + env: gym.Env + Environment to be wrapped. + wrap_spaces: bool, default = False + If True, gym.spaces are converted to rlberry.spaces, which defined a reseed() method. 
+ See also: https://stackoverflow.com/questions/1443129/completely-wrap-an-object-in-python [1] https://github.com/openai/gym/blob/master/gym/core.py """ - def __init__(self, env): + def __init__(self, env, wrap_spaces=False): # Init base class Model.__init__(self) # Save reference to env self.env = env - - self.observation_space = self.env.observation_space - self.action_space = self.env.action_space self.metadata = self.env.metadata + if wrap_spaces: + self.observation_space = convert_space_from_gym(self.env.observation_space) + self.action_space = convert_space_from_gym(self.env.action_space) + else: + self.observation_space = self.env.observation_space + self.action_space = self.env.action_space + try: self.reward_range = self.env.reward_range except AttributeError: @@ -65,8 +77,8 @@ def reseed(self, seed_seq=None): self.seeder = Seeder(seed_seq) # seed gym.Env that is not a rlberry Model if not isinstance(self.env, Model): - # get a seed for gym environment - safe_reseed(self.env, self.seeder) + # get a seed for gym environment; spaces are reseeded below. + safe_reseed(self.env, self.seeder, reseed_spaces=False) # seed rlberry Model else: self.env.reseed(self.seeder) diff --git a/rlberry/envs/finite/gridworld.py b/rlberry/envs/finite/gridworld.py index 1f965d6d1..082ed41ef 100644 --- a/rlberry/envs/finite/gridworld.py +++ b/rlberry/envs/finite/gridworld.py @@ -67,7 +67,7 @@ def __init__(self, if terminal_states is not None: self.terminal_states = terminal_states else: - self.terminal_states = ((nrows - 1, ncols - 1),) + self.terminal_states = () # Probability of going left/right/up/down when choosing the # correspondent action @@ -354,7 +354,8 @@ def get_layout_img( # map data to [0.0, 1.0] if state_data is not None: state_data = state_data - state_data.min() - state_data = state_data / state_data.max() + if state_data.max() > 0.0: + state_data = state_data / state_data.max() colormap_fn = plt.get_cmap(colormap_name) layout = self.get_layout_array(state_data, fill_walls_with=np.nan) @@ -364,9 +365,9 @@ def get_layout_img( for rr in range(layout.shape[0]): for cc in range(layout.shape[1]): if np.isnan(layout[rr, cc]): - img[rr, cc, :] = wall_color + img[self.nrows - 1 - rr, cc, :] = wall_color else: - img[rr, cc, :3] = scalar_map.to_rgba(layout[rr, cc])[:3] + img[self.nrows - 1 - rr, cc, :3] = scalar_map.to_rgba(layout[rr, cc])[:3] return img def get_background(self): diff --git a/rlberry/envs/gym_make.py b/rlberry/envs/gym_make.py index dd57c7bba..93766e049 100644 --- a/rlberry/envs/gym_make.py +++ b/rlberry/envs/gym_make.py @@ -2,10 +2,20 @@ from rlberry.envs.basewrapper import Wrapper -def gym_make(id, **kwargs): +def gym_make(id, wrap_spaces=False, **kwargs): """ Same as gym.make, but wraps the environment to ensure unified seeding with rlberry. + + Parameters + ---------- + id : str + Environment id. + wrap_spaces : bool, default = False + If true, also wraps observation_space and action_space using classes in rlberry.spaces, + that define a reseed() method. + **kwargs + Optional arguments to configure the environment. 
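As a usage sketch of the new flag (assuming gym_make is imported from rlberry.envs as in the library's examples; 'CartPole-v0' is just an example of a registered gym id):

from rlberry.envs import gym_make

# wrap_spaces=True converts observation_space and action_space to rlberry.spaces,
# whose reseed() method plugs them into rlberry's seeding machinery.
env = gym_make('CartPole-v0', wrap_spaces=True)
env.reseed(123)                     # reseeds the wrapped env and its spaces
observation = env.reset()
action = env.action_space.sample()  # sampled with rlberry's Seeder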
""" if "module_import" in kwargs: __import__(kwargs.pop("module_import")) @@ -14,7 +24,7 @@ def gym_make(id, **kwargs): env.configure(kwargs) except AttributeError: pass - return Wrapper(env) + return Wrapper(env, wrap_spaces=wrap_spaces) def atari_make(id, scalarize=True, **kwargs): diff --git a/rlberry/envs/utils.py b/rlberry/envs/utils.py index 3d9cd3678..e29a4cdde 100644 --- a/rlberry/envs/utils.py +++ b/rlberry/envs/utils.py @@ -20,5 +20,6 @@ def process_env(env, seeder, copy_env=True): else: processed_env = env reseeded = safe_reseed(processed_env, seeder) - assert reseeded + if not reseeded: + logger.warning("[Agent] Not possible to reseed environment.") return processed_env diff --git a/rlberry/experiment/generator.py b/rlberry/experiment/generator.py index a93625423..816016a7e 100644 --- a/rlberry/experiment/generator.py +++ b/rlberry/experiment/generator.py @@ -1,27 +1,22 @@ """Run experiments. Usage: - run.py [--writer] [--n_fit=] [--output_dir=
] [--parallelization=] + run.py [--enable_tensorboard] [--n_fit=] [--output_dir=] [--parallelization=] run.py (-h | --help) Options: - -h --help Show this screen. - --writer Use a tensorboard writer. - --n_fit= Number of times each agent is fit [default: 4]. - --output_dir= Directory to save the results [default: results]. + -h --help Show this screen. + --enable_tensorboard Enable tensorboard writer in AgentManager. + --n_fit= Number of times each agent is fit [default: 4]. + --output_dir= Directory to save the results [default: results]. --parallelization= Either 'thread' or 'process' [default: process]. """ import logging from docopt import docopt from pathlib import Path -from datetime import datetime from rlberry.experiment.yaml_utils import parse_experiment_config - -_TENSORBOARD_INSTALLED = True -try: - from torch.utils.tensorboard import SummaryWriter -except ImportError: - _TENSORBOARD_INSTALLED = False +from rlberry.manager import AgentManager +from rlberry import check_packages logger = logging.getLogger(__name__) @@ -31,17 +26,15 @@ def experiment_generator(): Parse command line arguments and yields AgentManager instances. """ args = docopt(__doc__) - for (_, agent_manager) in parse_experiment_config( + for (_, agent_manager_kwargs) in parse_experiment_config( Path(args[""]), n_fit=int(args["--n_fit"]), output_base_dir=args["--output_dir"], parallelization=args["--parallelization"]): - if args["--writer"]: - if _TENSORBOARD_INSTALLED: - for idx in range(agent_manager.n_fit): - logdir = agent_manager.output_dir / f"run_{idx + 1}_{datetime.now().strftime('%b%d_%H-%M-%S')}" - agent_manager.set_writer(idx=idx, writer_fn=SummaryWriter, writer_kwargs={'log_dir': logdir}) + if args["--enable_tensorboard"]: + if check_packages.TENSORBOARD_INSTALLED: + agent_manager_kwargs.update(dict(enable_tensorboard=True)) else: - logger.warning('Option --writer is not available: tensorboard is not installed.') + logger.warning('Option --enable_tensorboard is not available: tensorboard is not installed.') - yield agent_manager + yield AgentManager(**agent_manager_kwargs) diff --git a/rlberry/experiment/tests/test_experiment_generator.py b/rlberry/experiment/tests/test_experiment_generator.py index bf4340bdc..0d6e6c8ab 100644 --- a/rlberry/experiment/tests/test_experiment_generator.py +++ b/rlberry/experiment/tests/test_experiment_generator.py @@ -16,25 +16,25 @@ def test_mock_args(monkeypatch): random_numbers.append(rng.uniform(size=10)) assert agent_manager.agent_class is RSUCBVIAgent - assert agent_manager.init_kwargs['horizon'] == 51 + assert agent_manager._base_init_kwargs['horizon'] == 51 assert agent_manager.fit_budget == 10 assert agent_manager.eval_kwargs['eval_horizon'] == 51 - assert agent_manager.init_kwargs['lp_metric'] == 2 - assert agent_manager.init_kwargs['min_dist'] == 0.0 - assert agent_manager.init_kwargs['max_repr'] == 800 - assert agent_manager.init_kwargs['bonus_scale_factor'] == 1.0 - assert agent_manager.init_kwargs['reward_free'] is True + assert agent_manager._base_init_kwargs['lp_metric'] == 2 + assert agent_manager._base_init_kwargs['min_dist'] == 0.0 + assert agent_manager._base_init_kwargs['max_repr'] == 800 + assert agent_manager._base_init_kwargs['bonus_scale_factor'] == 1.0 + assert agent_manager._base_init_kwargs['reward_free'] is True train_env = agent_manager.train_env[0](**agent_manager.train_env[1]) assert train_env.reward_free is False assert train_env.array_observation is True if agent_manager.agent_name == 'rsucbvi': - assert 
agent_manager.init_kwargs['gamma'] == 1.0 + assert agent_manager._base_init_kwargs['gamma'] == 1.0 elif agent_manager.agent_name == 'rsucbvi_alternative': - assert agent_manager.init_kwargs['gamma'] == 0.9 + assert agent_manager._base_init_kwargs['gamma'] == 0.9 else: raise ValueError() diff --git a/rlberry/experiment/yaml_utils.py b/rlberry/experiment/yaml_utils.py index c9250af18..e9852512e 100644 --- a/rlberry/experiment/yaml_utils.py +++ b/rlberry/experiment/yaml_utils.py @@ -1,8 +1,6 @@ from pathlib import Path from typing import Generator, Tuple import yaml - -from rlberry.manager import AgentManager from rlberry.utils.factory import load _AGENT_KEYS = ('init_kwargs', 'eval_kwargs', 'fit_kwargs') @@ -102,7 +100,7 @@ def read_env_config(config_path): def parse_experiment_config(path: Path, n_fit: int = 4, output_base_dir: str = 'results', - parallelization: str = 'process') -> Generator[Tuple[int, AgentManager], None, None]: + parallelization: str = 'process') -> Generator[Tuple[int, dict], None, None]: """ Read .yaml files. set global seed and convert to AgentManager instances. @@ -133,8 +131,8 @@ def parse_experiment_config(path: Path, ------- seed: int global seed - agent_manager: AgentManager - the Agent Stats to fit + agent_manager_kwargs: + parameters to create an AgentManager instance. """ with path.open() as file: config = yaml.safe_load(file) @@ -185,7 +183,7 @@ def parse_experiment_config(path: Path, # append run index to dir output_dir = output_dir / str(last + 1) - yield seed, AgentManager( + yield seed, dict( agent_class=agent_class, init_kwargs=init_kwargs, eval_kwargs=eval_kwargs, diff --git a/rlberry/manager/agent_manager.py b/rlberry/manager/agent_manager.py index c1c63ec80..9c4ec685c 100644 --- a/rlberry/manager/agent_manager.py +++ b/rlberry/manager/agent_manager.py @@ -1,10 +1,10 @@ import concurrent.futures from copy import deepcopy -from datetime import datetime from pathlib import Path from rlberry.seeding import safe_reseed, set_external_seed from rlberry.seeding import Seeder +from rlberry import metadata_utils import functools import json @@ -17,7 +17,6 @@ import threading import multiprocessing import numpy as np -import uuid from rlberry.envs.utils import process_env from rlberry.utils.logging import configure_logging from rlberry.utils.writers import DefaultWriter @@ -55,7 +54,7 @@ class AgentHandler: Class of the agent to be wrapped agent_instance: An instance of agent_class, or None (if not loaded). - **agent_kwargs: + agent_kwargs: Arguments required by __init__ method of agent_class. """ @@ -65,13 +64,13 @@ def __init__(self, seeder, agent_class, agent_instance=None, - **agent_kwargs) -> None: + agent_kwargs=None) -> None: self._id = id self._fname = Path(filename) self._seeder = seeder self._agent_class = agent_class self._agent_instance = agent_instance - self._agent_kwargs = agent_kwargs + self._agent_kwargs = agent_kwargs or {} @property def id(self): @@ -80,6 +79,11 @@ def id(self): def set_instance(self, agent_instance): self._agent_instance = agent_instance + def get_instance(self): + if not self.is_loaded(): + self.load() + return self._agent_instance + def is_empty(self): return self._agent_instance is None and (not self._fname.exists()) @@ -156,7 +160,7 @@ class AgentManager: eval_env : Tuple (constructor, kwargs) Environment used to evaluate the agent. If None, set train_env. init_kwargs : dict - Arguments required by the agent's constructor. + Arguments required by the agent's constructor. Shared across all n_fit instances. 
fit_kwargs : dict Extra required to call agent.fit(bugdet, **fit_kwargs). eval_kwargs : dict @@ -169,16 +173,26 @@ class AgentManager: Directory where to store data. parallelization: {'thread', 'process'}, default: 'process' Whether to parallelize agent training using threads or processes. - thread_logging_level : str, default: 'INFO' - Logging level in each of the threads used to fit agents. + worker_logging_level : str, default: 'INFO' + Logging level in each of the threads/processes used to fit agents. seed : np.random.SeedSequence, rlberry.seeding.Seeder or int, default : None Seed sequence from which to spawn the random number generator. If None, generate random seed. If int, use as entropy for SeedSequence. If seeder, use seeder.seed_seq + enable_tensorboard : bool, default = False + If True, enable tensorboard logging in Agent's DefaultWriter. create_unique_out_dir : bool, default = True If true, data is saved to output_dir/manager_data/ Otherwise, data is saved to output_dir/manager_data + default_writer_kwargs : dict + Optional arguments for DefaultWriter. + init_kwargs_per_instance : List[dict] (optional) + List of length n_fit containing the params to be passed to each of + the n_fit agent instances. It can be useful if different instances + require different parameters. If the same parameter is defined by + init_kwargs and init_kwargs_per_instance, the value given by + init_kwargs_per_instance will be used. """ def __init__(self, @@ -193,9 +207,12 @@ def __init__(self, n_fit=4, output_dir=None, parallelization='thread', - thread_logging_level='INFO', + worker_logging_level='INFO', seed=None, - create_unique_out_dir=True): + enable_tensorboard=False, + create_unique_out_dir=True, + default_writer_kwargs=None, + init_kwargs_per_instance=None): # agent_class should only be None when the constructor is called # by the class method AgentManager.load(), since the agent class # will be loaded. 
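To see how the new options combine, a hedged sketch along the lines of the demos in this changeset (REINFORCEAgent and PBall2D are paired as in demo_hyperparam_optim.py, assuming PBall2D's default constructor; budgets and learning rates are arbitrary, and enable_tensorboard assumes tensorboard is installed):

from rlberry.envs.benchmarks.ball_exploration import PBall2D
from rlberry.agents.torch import REINFORCEAgent
from rlberry.manager import AgentManager

manager = AgentManager(
    REINFORCEAgent,
    (PBall2D, {}),                                # train_env as (constructor, kwargs)
    fit_budget=10,
    init_kwargs=dict(gamma=0.99, horizon=50),     # shared by all n_fit instances
    init_kwargs_per_instance=[                    # per-instance overrides, len == n_fit
        dict(learning_rate=lr) for lr in (1e-4, 3e-4, 1e-3)
    ],
    eval_kwargs=dict(eval_horizon=50, n_simulations=10),
    n_fit=3,
    enable_tensorboard=True,                      # tensorboard logs go under the manager's output_dir
    default_writer_kwargs=dict(log_interval=10),  # forwarded to each agent's DefaultWriter
    seed=123,
)
manager.fit()
trained_agents = manager.get_agent_instances()    # new accessor for the fitted instances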
@@ -218,9 +235,7 @@ def __init__(self, eval_env, Tuple), "[AgentManager]train_env must be Tuple (constructor, kwargs)" # create oject identifier - timestamp = datetime.timestamp(datetime.now()) - self.timestamp = str(timestamp).replace('.', '') - self.unique_id = str(id(self)) + self.timestamp + self.unique_id = metadata_utils.get_unique_id(self) # Agent class self.agent_class = agent_class @@ -235,17 +250,17 @@ def __init__(self, self._eval_env = eval_env # check kwargs - init_kwargs = init_kwargs or {} fit_kwargs = fit_kwargs or {} eval_kwargs = eval_kwargs or {} # params - self.init_kwargs = deepcopy(init_kwargs) + base_init_kwargs = init_kwargs or {} + self._base_init_kwargs = deepcopy(base_init_kwargs) self.fit_kwargs = deepcopy(fit_kwargs) self.eval_kwargs = deepcopy(eval_kwargs) self.n_fit = n_fit self.parallelization = parallelization - self.thread_logging_level = thread_logging_level + self.worker_logging_level = worker_logging_level if fit_budget is not None: self.fit_budget = fit_budget else: @@ -253,18 +268,50 @@ def __init__(self, self.fit_budget = self.fit_kwargs.pop('fit_budget') except KeyError: raise ValueError('[AgentManager] fit_budget missing in __init__().') + # extra params per instance + if init_kwargs_per_instance is not None: + assert len(init_kwargs_per_instance) == n_fit + init_kwargs_per_instance = deepcopy(init_kwargs_per_instance) + self.init_kwargs_per_instance = init_kwargs_per_instance or [dict() for _ in range(n_fit)] # output dir if output_dir is None: - output_dir = 'temp/' + output_dir = metadata_utils.RLBERRY_TEMP_DATA_DIR self.output_dir = Path(output_dir) / 'manager_data' if create_unique_out_dir: - self.output_dir = self.output_dir / (self.agent_name + '_id' + self.unique_id) + self.output_dir = self.output_dir / (self.agent_name + '_' + self.unique_id) # Create list of writers for each agent that will be trained + # 'default' will keep Agent's use of DefaultWriter. self.writers = [('default', None) for _ in range(n_fit)] - # + # Parameters to setup Agent's DefaultWriter + self.agent_default_writer_kwargs = [ + dict( + name=self.agent_name, + log_interval=3, + tensorboard_kwargs=None, + execution_metadata=metadata_utils.ExecutionMetadata(obj_worker_id=idx) + ) + for idx in range(n_fit) + ] + self.tensorboard_dir = None + if enable_tensorboard: + self.tensorboard_dir = self.output_dir / 'tensorboard' + for idx, params in enumerate(self.agent_default_writer_kwargs): + params['tensorboard_kwargs'] = dict( + log_dir=self.tensorboard_dir / str(idx) + ) + # Update DefaultWriter according to user's settings. + default_writer_kwargs = default_writer_kwargs or {} + if default_writer_kwargs: + logger.warning('(Re)defining the following DefaultWriter' + f' parameters in AgentManager: {list(default_writer_kwargs.keys())}') + for ii in range(n_fit): + self.agent_default_writer_kwargs[ii].update(default_writer_kwargs) + + # agent handlers and init kwargs + self._set_init_kwargs() # init_kwargs for each agent self.agent_handlers = None self._reset_agent_handlers() self.default_writer_data = None @@ -285,6 +332,26 @@ def _init_optuna_storage_url(self): self.optuna_storage_url = "sqlite:///:memory:" logger.warning(f'Unable to create databate {self.db_filename}. 
Using sqlite:///:memory:') + def _set_init_kwargs(self): + init_seeders = self.seeder.spawn(self.n_fit, squeeze=False) + self.init_kwargs = [] + for ii in range(self.n_fit): + kwargs_ii = deepcopy(self._base_init_kwargs) + kwargs_ii.update( + dict( + env=self.train_env, + eval_env=self._eval_env, + copy_env=False, + seeder=init_seeders[ii], + output_dir=Path(self.output_dir) / f"output_{ii}", + _execution_metadata=self.agent_default_writer_kwargs[ii]['execution_metadata'], + _default_writer_kwargs=self.agent_default_writer_kwargs[ii], + ) + ) + per_instance_kwargs = self.init_kwargs_per_instance[ii] + kwargs_ii.update(per_instance_kwargs) + self.init_kwargs.append(kwargs_ii) + def _reset_agent_handlers(self): handlers_seeders = self.seeder.spawn(self.n_fit, squeeze=False) self.agent_handlers = [ @@ -295,8 +362,7 @@ def _reset_agent_handlers(self): agent_class=self.agent_class, agent_instance=None, # kwargs - env=self.train_env, - **self.init_kwargs, + agent_kwargs=self.init_kwargs[ii], ) for ii in range(self.n_fit) ] @@ -311,6 +377,11 @@ def build_eval_env(self): def get_writer_data(self): return self.default_writer_data + def get_agent_instances(self): + if self.agent_handlers: + return [agent_handler.get_instance() for agent_handler in self.agent_handlers] + return [] + def eval_agents(self, n_simulations: Optional[int] = None) -> list: """ Call .eval() method in fitted agents and returns a list with the results. @@ -405,20 +476,17 @@ def fit(self, budget=None, **kwargs): raise ValueError(f'Invalid backend for parallelization: {self.parallelization}') args = [( - idx, lock, handler, self.agent_class, - self.train_env, - self._eval_env, budget, - deepcopy(self.init_kwargs), + init_kwargs, deepcopy(self.fit_kwargs), writer, - self.thread_logging_level, + self.worker_logging_level, seeder) - for idx, (handler, seeder, writer) - in enumerate(zip(self.agent_handlers, seeders, self.writers))] + for init_kwargs, handler, seeder, writer + in zip(self.init_kwargs, self.agent_handlers, seeders, self.writers)] if len(args) == 1: workers_output = [_fit_worker(args[0])] @@ -667,7 +735,7 @@ def optimize_hyperparams(self, # objective = functools.partial( _optuna_objective, - init_kwargs=self.init_kwargs, # self.init_kwargs + base_init_kwargs=self._base_init_kwargs, # self._base_init_kwargs agent_class=self.agent_class, # self.agent_class train_env=self.train_env, # self.train_env eval_env=self._eval_env, @@ -731,9 +799,10 @@ def optimize_hyperparams(self, self.best_hyperparams = best_trial.params # update using best parameters - self.init_kwargs.update(best_trial.params) + self._base_init_kwargs.update(best_trial.params) - # reset agent handlers, so that they take the new parameters + # reset init_kwargs and agent handlers, so that they take the new parameters + self._set_init_kwargs() self._reset_agent_handlers() return deepcopy(best_trial.params) @@ -748,37 +817,29 @@ def _fit_worker(args): """ Create and fit an agent instance """ - idx, lock, agent_handler, agent_class, train_env, eval_env, fit_budget, init_kwargs, \ - fit_kwargs, writer, thread_logging_level, seeder = args + (lock, agent_handler, agent_class, fit_budget, init_kwargs, + fit_kwargs, writer, worker_logging_level, seeder) = args # reseed external libraries set_external_seed(seeder) # logging level in thread - configure_logging(thread_logging_level) + configure_logging(worker_logging_level) # Using a lock when creating envs and agents, to avoid problems # as here: https://github.com/openai/gym/issues/281 with lock: if 
agent_handler.is_empty(): # create agent - agent = agent_class( - env=train_env, - eval_env=eval_env, - copy_env=False, - seeder=seeder, - _metadata=dict( - worker=idx, - ), - **init_kwargs) + agent = agent_class(**init_kwargs) # seed agent - agent.reseed(seeder) + agent.reseed(seeder) # TODO: check if extra reseeding here is necessary agent_handler.set_instance(agent) # set writer if writer[0] is None: agent_handler.set_writer(None) - elif writer[0] != 'default': + elif writer[0] != 'default': # 'default' corresponds to DefaultWriter created by Agent.__init__() writer_fn = writer[0] writer_kwargs = writer[1] agent_handler.set_writer(writer_fn(**writer_kwargs)) @@ -813,7 +874,7 @@ def default(obj): def _optuna_objective( trial, - init_kwargs, # self.init_kwargs + base_init_kwargs, # self._base_init_kwargs agent_class, # self.agent_class train_env, # self.train_env eval_env, @@ -824,7 +885,7 @@ def _optuna_objective( disable_evaluation_writers, fit_fraction ): - kwargs = deepcopy(init_kwargs) + kwargs = deepcopy(base_init_kwargs) # will raise exception if sample_parameters() is not # implemented by the agent class @@ -841,11 +902,12 @@ def _optuna_objective( eval_env=eval_env, init_kwargs=kwargs, # kwargs are being optimized eval_kwargs=deepcopy(eval_kwargs), - agent_name='optim_' + uuid.uuid4().hex, + agent_name='optim', n_fit=n_fit, - thread_logging_level='INFO', + worker_logging_level='INFO', parallelization='thread', output_dir=temp_dir, + enable_tensorboard=False, create_unique_out_dir=True) if disable_evaluation_writers: diff --git a/rlberry/manager/evaluation.py b/rlberry/manager/evaluation.py index 785cb31ae..dd3a64b47 100644 --- a/rlberry/manager/evaluation.py +++ b/rlberry/manager/evaluation.py @@ -87,6 +87,7 @@ def evaluate_agents(agent_manager_list, def plot_writer_data(agent_manager, tag, + xtag=None, fignum=None, show=True, preprocess_func=None, @@ -100,7 +101,9 @@ def plot_writer_data(agent_manager, ---------- agent_manager : AgentManager, or list of AgentManager tag : str - Tag of data to plot. + Tag of data to plot on y-axis. + xtag : str + Tag of data to plot on x-axis. If None, use 'global_step'. fignum: string or int Identifier of plot figure. show: bool @@ -112,6 +115,10 @@ def plot_writer_data(agent_manager, Optional title to plot. If None, set to tag. sns_kwargs: dict Optional extra params for seaborn lineplot. + + Returns + ------- + Pandas DataFrame with processed data used by seaborn's lineplot. 
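A sketch of how the returned DataFrame and the new xtag argument could be used, reusing the manager from the sketch above. 'elapsed_time' is a hypothetical tag: it only works if the agents logged a scalar under that name, and xtag can simply be omitted to fall back to global_step:

import numpy as np
from rlberry.manager import plot_writer_data

df = plot_writer_data(manager,
                      tag='episode_rewards',
                      xtag='elapsed_time',        # hypothetical x-axis tag
                      preprocess_func=np.cumsum,
                      title='cumulative rewards',
                      show=False)
# the processed data is now returned, so it can be inspected or re-plotted
print(df.groupby('name')['value'].last())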
""" sns_kwargs = sns_kwargs or {'ci': 'sd'} @@ -136,11 +143,15 @@ def plot_writer_data(agent_manager, if writer_data is not None: for idx in writer_data: df = writer_data[idx] - df = pd.DataFrame(df[df['tag'] == tag]) - df['value'] = preprocess_func(df['value'].values) + processed_df = pd.DataFrame(df[df['tag'] == tag]) + processed_df['value'] = preprocess_func(processed_df['value'].values) # update name according to AgentManager name - df['name'] = agent_name - data_list.append(df) + processed_df['name'] = agent_name + # add column with xtag, if given + if xtag is not None: + df_xtag = pd.DataFrame(df[df['tag'] == xtag]) + processed_df[xtag] = df_xtag['value'].values + data_list.append(processed_df) if len(data_list) == 0: logger.error('[plot_writer_data]: No data to be plotted.') return @@ -148,17 +159,24 @@ def plot_writer_data(agent_manager, all_writer_data = pd.concat(data_list, ignore_index=True) data = all_writer_data[all_writer_data['tag'] == tag] - if data['global_step'].notnull().sum() > 0: - xx = 'global_step' + if xtag is None: + xtag = 'global_step' + + if data[xtag].notnull().sum() > 0: + xx = xtag if data['global_step'].isna().sum() > 0: - logger.warning(f'Plotting {tag} vs global_step, but global_step might be missing for some agents.') + logger.warning(f'Plotting {tag} vs {xtag}, but {xtag} might be missing for some agents.') else: xx = data.index plt.figure(fignum) - sns.lineplot(x=xx, y='value', hue='name', style='name', data=data, **sns_kwargs) + lineplot_kwargs = dict(x=xx, y='value', hue='name', style='name', data=data) + lineplot_kwargs.update(sns_kwargs) + sns.lineplot(**lineplot_kwargs) plt.title(title) plt.ylabel(ylabel) if show: plt.show() + + return data diff --git a/rlberry/manager/remote_agent_manager.py b/rlberry/manager/remote_agent_manager.py index fffaaa8fb..5d296bf34 100644 --- a/rlberry/manager/remote_agent_manager.py +++ b/rlberry/manager/remote_agent_manager.py @@ -1,9 +1,11 @@ +import base64 import dill import io import logging import pandas as pd import pathlib import pickle +import zipfile from typing import Any, Mapping, Optional from rlberry.network import interface from rlberry.network.client import BerryClient @@ -41,6 +43,9 @@ def __init__( data=None, ) ) + if msg.command == interface.Command.RAISE_EXCEPTION: + raise Exception(msg.message) + self._remote_agent_manager_filename = pathlib.Path( msg.info['filename'] ) @@ -57,6 +62,11 @@ def remote_file(self): return str(self._remote_agent_manager_filename) def get_writer_data(self): + """ + * Calls get_writer_data() in the remote AgentManager and returns the result locally. + * If tensorboard data is available in the remote AgentManager, the data is zipped, + received locally and unzipped. + """ msg = self._client.send( interface.Message.create( command=interface.Command.AGENT_MANAGER_GET_WRITER_DATA, @@ -65,18 +75,32 @@ def get_writer_data(self): ) if msg.command == interface.Command.RAISE_EXCEPTION: raise Exception(msg.message) - raw_data = msg.data + raw_data = msg.data['writer_data'] writer_data = dict() for idx in raw_data: csv_content = raw_data[idx] writer_data[idx] = pd.read_csv(io.StringIO(csv_content), sep=',') + + # check if tensorboard data was received + # If so, read file and unzip it. 
+ tensorboard_bin_data = msg.data['tensorboard_bin_data'] + if tensorboard_bin_data is not None: + tensorboard_bin_data = base64.b64decode(tensorboard_bin_data.encode('ascii')) + zip_file = open(self.output_dir / 'tensorboard_data.zip', "wb") + zip_file.write(tensorboard_bin_data) + zip_file.close() + with zipfile.ZipFile(self.output_dir / 'tensorboard_data.zip', 'r') as zip_ref: + zip_ref.extractall(self.output_dir) return writer_data - def fit(self): + def fit(self, budget=None, **kwargs): msg = self._client.send( interface.Message.create( command=interface.Command.AGENT_MANAGER_FIT, - params=dict(filename=self.remote_file), + params=dict( + filename=self.remote_file, + budget=budget, + extra_params=kwargs), data=None, ) ) diff --git a/rlberry/manager/tests/test_agent_manager.py b/rlberry/manager/tests/test_agent_manager.py index f92ec369a..22500a92c 100644 --- a/rlberry/manager/tests/test_agent_manager.py +++ b/rlberry/manager/tests/test_agent_manager.py @@ -2,7 +2,6 @@ from rlberry.envs import GridWorld from rlberry.agents import AgentWithSimplePolicy from rlberry.manager import AgentManager, plot_writer_data, evaluate_agents -from rlberry.utils.writers import DefaultWriter class DummyAgent(AgentWithSimplePolicy): @@ -18,7 +17,6 @@ def __init__(self, self.hyperparameter2 = hyperparameter2 self.total_budget = 0.0 - self.writer = DefaultWriter(self.name, metadata=self._metadata) def fit(self, budget, **kwargs): del kwargs @@ -48,7 +46,7 @@ def test_agent_manager_1(): train_env = (GridWorld, {}) # Parameters - params = {} + params = dict(hyperparameter1=-1, hyperparameter2=100) eval_kwargs = dict(eval_horizon=10) # Check DummyAgent @@ -57,9 +55,10 @@ def test_agent_manager_1(): agent.policy(None) # Run AgentManager + params_per_instance = [dict(hyperparameter2=ii) for ii in range(4)] stats_agent1 = AgentManager( DummyAgent, train_env, fit_budget=5, eval_kwargs=eval_kwargs, - init_kwargs=params, n_fit=4, seed=123) + init_kwargs=params, n_fit=4, seed=123, init_kwargs_per_instance=params_per_instance) stats_agent2 = AgentManager( DummyAgent, train_env, fit_budget=5, eval_kwargs=eval_kwargs, init_kwargs=params, n_fit=4, seed=123) @@ -67,6 +66,14 @@ def test_agent_manager_1(): for st in agent_manager_list: st.fit() + for ii, instance in enumerate(stats_agent1.agent_handlers): + assert instance.hyperparameter1 == -1 + assert instance.hyperparameter2 == ii + + for ii, instance in enumerate(stats_agent2.agent_handlers): + assert instance.hyperparameter1 == -1 + assert instance.hyperparameter2 == 100 + # learning curves plot_writer_data(agent_manager_list, tag='episode_rewards', show=False) diff --git a/rlberry/metadata_utils.py b/rlberry/metadata_utils.py new file mode 100644 index 000000000..7a4302451 --- /dev/null +++ b/rlberry/metadata_utils.py @@ -0,0 +1,42 @@ +from datetime import datetime +import uuid +import hashlib +from typing import Optional, NamedTuple + + +# Default output directory used by the library. +RLBERRY_DEFAULT_DATA_DIR = 'rlberry_data/' + +# Temporary directory used by the library +RLBERRY_TEMP_DATA_DIR = 'rlberry_data/temp/' + + +def get_unique_id(obj): + """ + Get a unique id for an obj. Use it in __init__ methods when necessary. + """ + # id() is guaranteed to be unique among simultaneously existing objects (uses memory address). + # uuid4() is an universal id, but there might be issues if called simultaneously in different processes. + # This function combines id(), uuid4(), and a timestamp in a single ID, and hashes it. 
+ timestamp = datetime.timestamp(datetime.now()) + timestamp = str(timestamp).replace('.', '') + str_id = timestamp + str(id(obj)) + uuid.uuid4().hex + str_id = hashlib.md5(str_id.encode()).hexdigest() + return str_id + + +class ExecutionMetadata(NamedTuple): + """ + Metadata for objects handled by rlberry. + + Attributes + ---------- + obj_worker_id : int, default: -1 + If given, must be >= 0, and inform the worker id (thread or process) where the + object was created. It is not necessarity unique across all the workers launched by + rlberry, it is mainly for debug purposes. + obj_info : dict, default: None + Extra info about the object. + """ + obj_worker_id: int = -1 + obj_info: Optional[dict] = None diff --git a/rlberry/network/interface.py b/rlberry/network/interface.py index 815722887..7ee4b5cc8 100644 --- a/rlberry/network/interface.py +++ b/rlberry/network/interface.py @@ -81,6 +81,7 @@ def send_data(socket, data): """ adapted from: https://stackoverflow.com/a/63532988 """ + print(f'[rlberry.network] sending {len(data)} bytes...') socket.sendall(struct.pack('>I', len(data)) + data) @@ -97,4 +98,5 @@ def receive_data(socket): while remaining_size > 0: received_data += socket.recv(remaining_size) remaining_size = data_size - len(received_data) + print(f'[rlberry.network] ... received {len(received_data)}/{data_size} bytes.') return received_data diff --git a/rlberry/network/server_utils.py b/rlberry/network/server_utils.py index 29dcc8030..e5beed83c 100644 --- a/rlberry/network/server_utils.py +++ b/rlberry/network/server_utils.py @@ -1,5 +1,9 @@ +import pathlib from rlberry.network import interface from rlberry.manager import AgentManager +from rlberry import metadata_utils +import rlberry.utils.io +import base64 def execute_message( @@ -15,25 +19,28 @@ def execute_message( # AGENT_MANAGER_CREATE_INSTANCE elif message.command == interface.Command.AGENT_MANAGER_CREATE_INSTANCE: params = message.params + base_dir = pathlib.Path(metadata_utils.RLBERRY_DEFAULT_DATA_DIR) if 'output_dir' in params: - params['output_dir'] = 'client_data' / params['output_dir'] + params['output_dir'] = base_dir / 'server_data' / params['output_dir'] else: - params['output_dir'] = 'client_data/' + params['output_dir'] = base_dir / 'server_data/' agent_manager = AgentManager(**params) filename = str(agent_manager.save()) response = interface.Message.create( info=dict( filename=filename, agent_name=agent_manager.agent_name, - output_dir=str(agent_manager.output_dir).replace('client_data/', 'remote_data/') + output_dir=str(agent_manager.output_dir).replace('server_data/', 'client_data/') ) ) del agent_manager # AGENT_MANAGER_FIT elif message.command == interface.Command.AGENT_MANAGER_FIT: filename = message.params['filename'] + budget = message.params['budget'] + extra_params = message.params['extra_params'] agent_manager = AgentManager.load(filename) - agent_manager.fit() + agent_manager.fit(budget, **extra_params) agent_manager.save() response = interface.Message.create(command=interface.Command.ECHO) del agent_manager @@ -76,13 +83,27 @@ def execute_message( response = interface.Message.create(data=best_params_dict) # AGENT_MANAGER_GET_WRITER_DATA elif message.command == interface.Command.AGENT_MANAGER_GET_WRITER_DATA: + # writer scalar data filename = message.params['filename'] agent_manager = AgentManager.load(filename) writer_data = agent_manager.get_writer_data() writer_data = writer_data or dict() for idx in writer_data: writer_data[idx] = writer_data[idx].to_csv(index=False) + # tensoboard data + 
tensorboard_bin_data = None + if agent_manager.tensorboard_dir is not None: + tensorboard_zip_file = rlberry.utils.io.zipdir( + agent_manager.tensorboard_dir, + agent_manager.output_dir / 'tensorboard_data.zip') + if tensorboard_zip_file is not None: + tensorboard_bin_data = open(tensorboard_zip_file, "rb").read() + tensorboard_bin_data = base64.b64encode(tensorboard_bin_data).decode('ascii') + response = interface.Message.create( + data=dict( + writer_data=writer_data, + tensorboard_bin_data=tensorboard_bin_data) + ) del agent_manager - response = interface.Message.create(data=writer_data) # end return response diff --git a/rlberry/seeding/seeding.py b/rlberry/seeding/seeding.py index 0f82f07bc..0edce4be2 100644 --- a/rlberry/seeding/seeding.py +++ b/rlberry/seeding/seeding.py @@ -27,7 +27,7 @@ def set_external_seed(seeder): torch.manual_seed(seeder.seed_seq.generate_state(1, dtype=np.uint32)[0]) -def safe_reseed(obj, seeder): +def safe_reseed(obj, seeder, reseed_spaces=True): """ Calls obj.reseed(seed_seq) method if available; If a obj.seed() method is available, call obj.seed(seed_val), @@ -40,6 +40,9 @@ def safe_reseed(obj, seeder): Object to be reseeded. seeder: seeding.Seeder Seeder object from which to generate random seeds. + reseed_spaces: bool, default = True. + If False, do not try to reseed observation_space and action_space (if + they exist as attributes of `obj`). Returns ------- @@ -58,10 +61,11 @@ def safe_reseed(obj, seeder): reseeded = False # check if the object has observation and action spaces to be reseeded. - try: - safe_reseed(obj.observation_space, seeder) - safe_reseed(obj.action_space, seeder) - except AttributeError: - pass + if reseed_spaces: + try: + safe_reseed(obj.observation_space, seeder) + safe_reseed(obj.action_space, seeder) + except AttributeError: + pass return reseeded diff --git a/rlberry/spaces/from_gym.py b/rlberry/spaces/from_gym.py new file mode 100644 index 000000000..939a3740c --- /dev/null +++ b/rlberry/spaces/from_gym.py @@ -0,0 +1,36 @@ +import rlberry.spaces +import gym.spaces + + +def convert_space_from_gym(space): + if isinstance(space, gym.spaces.Box) and (not isinstance(space, rlberry.spaces.Box)): + return rlberry.spaces.Box( + space.low, + space.high, + shape=space.shape, + dtype=space.dtype + ) + if isinstance(space, gym.spaces.Discrete) and (not isinstance(space, rlberry.spaces.Discrete)): + return rlberry.spaces.Discrete( + n=space.n + ) + if isinstance(space, gym.spaces.MultiBinary) and (not isinstance(space, rlberry.spaces.MultiBinary)): + return rlberry.spaces.MultiBinary( + n=space.n + ) + if isinstance(space, gym.spaces.MultiDiscrete) and (not isinstance(space, rlberry.spaces.MultiDiscrete)): + return rlberry.spaces.MultiDiscrete( + nvec=space.nvec, + dtype=space.dtype, + ) + if isinstance(space, gym.spaces.Tuple) and (not isinstance(space, rlberry.spaces.Tuple)): + return rlberry.spaces.Tuple( + spaces=[convert_space_from_gym(sp) for sp in space.spaces] + ) + if isinstance(space, gym.spaces.Dict) and (not isinstance(space, rlberry.spaces.Dict)): + converted_spaces = dict() + for key in space.spaces: + converted_spaces[key] = convert_space_from_gym(space.spaces[key]) + return rlberry.spaces.Dict(spaces=converted_spaces) + + return space diff --git a/rlberry/spaces/multi_discrete.py b/rlberry/spaces/multi_discrete.py index fbe78aaba..36bd836e0 100644 --- a/rlberry/spaces/multi_discrete.py +++ b/rlberry/spaces/multi_discrete.py @@ -1,4 +1,5 @@ import gym +import numpy as np from rlberry.seeding import Seeder @@ -21,8 +22,8 @@ 
class MultiDiscrete(gym.spaces.MultiDiscrete): get new random number generator """ - def __init__(self, nvec): - gym.spaces.MultiDiscrete.__init__(self, nvec) + def __init__(self, nvec, dtype=np.int64): + gym.spaces.MultiDiscrete.__init__(self, nvec, dtype=dtype) self.seeder = Seeder() @property diff --git a/rlberry/spaces/tests/test_from_gym.py b/rlberry/spaces/tests/test_from_gym.py new file mode 100644 index 000000000..779c55597 --- /dev/null +++ b/rlberry/spaces/tests/test_from_gym.py @@ -0,0 +1,136 @@ +import numpy as np +import pytest +import gym.spaces +import rlberry.spaces +from rlberry.spaces.from_gym import convert_space_from_gym + + +@pytest.mark.parametrize("n", list(range(1, 10))) +def test_discrete_space(n): + gym_sp = gym.spaces.Discrete(n) + sp = convert_space_from_gym(gym_sp) + assert isinstance(sp, rlberry.spaces.Discrete) + sp.reseed(123) + for ii in range(n): + assert sp.contains(ii) + + for ii in range(2 * n): + assert sp.contains(sp.sample()) + + +@pytest.mark.parametrize("low, high, dim", + [ + (1.0, 10.0, 1), + (1.0, 10.0, 2), + (1.0, 10.0, 4), + (-10.0, 1.0, 1), + (-10.0, 1.0, 2), + (-10.0, 1.0, 4), + (-np.inf, 1.0, 1), + (-np.inf, 1.0, 2), + (-np.inf, 1.0, 4), + (1.0, np.inf, 1), + (1.0, np.inf, 2), + (1.0, np.inf, 4), + (-np.inf, np.inf, 1), + (-np.inf, np.inf, 2), + (-np.inf, np.inf, 4), + ]) +def test_box_space_case_1(low, high, dim): + shape = (dim, 1) + gym_sp = gym.spaces.Box(low, high, shape=shape) + sp = convert_space_from_gym(gym_sp) + assert isinstance(sp, rlberry.spaces.Box) + sp.reseed(123) + for _ in range(2 ** dim): + assert (sp.contains(sp.sample())) + + +@pytest.mark.parametrize( + "low, high", + [ + (np.array([0.0, 0.0, 0.0]), np.array([1.0, 1.0, 1.0])), + (np.array([-10.0, -10.0, -10.0]), np.array([10.0, 10.0, 10.0])), + (np.array([-10.0, -10.0, -10.0]), np.array([10.0, 10.0, np.inf])), + (np.array([-np.inf, -10.0, -10.0]), np.array([10.0, 10.0, np.inf])), + (np.array([-np.inf, -10.0, -10.0]), np.array([np.inf, 10.0, np.inf])) + ]) +def test_box_space_case_2(low, high): + gym_sp = gym.spaces.Box(low, high, dtype=np.float64) + sp = convert_space_from_gym(gym_sp) + assert isinstance(sp, rlberry.spaces.Box) + sp.reseed(123) + if (-np.inf in low) or (np.inf in high): + assert not sp.is_bounded() + else: + assert sp.is_bounded() + for ii in range(2 ** sp.shape[0]): + assert (sp.contains(sp.sample())) + + +def test_tuple(): + sp1 = gym.spaces.Box(0.0, 1.0, shape=(3, 2)) + sp2 = gym.spaces.Discrete(2) + gym_sp = gym.spaces.Tuple([sp1, sp2]) + sp = convert_space_from_gym(gym_sp) + assert isinstance(sp, rlberry.spaces.Tuple) + assert isinstance(sp.spaces[0], rlberry.spaces.Box) + assert isinstance(sp.spaces[1], rlberry.spaces.Discrete) + sp.reseed(123) + for _ in range(10): + assert sp.contains(sp.sample()) + + +def test_multidiscrete(): + gym_sp = gym.spaces.MultiDiscrete([5, 2, 2]) + sp = convert_space_from_gym(gym_sp) + assert isinstance(sp, rlberry.spaces.MultiDiscrete) + sp.reseed(123) + for _ in range(10): + assert sp.contains(sp.sample()) + + +def test_multibinary(): + for n in [1, 5, [3, 4]]: + gym_sp = gym.spaces.MultiBinary(n) + sp = convert_space_from_gym(gym_sp) + assert isinstance(sp, rlberry.spaces.MultiBinary) + for _ in range(10): + assert sp.contains(sp.sample()) + sp.reseed(123) + + +def test_dict(): + nested_observation_space = gym.spaces.Dict({ + 'sensors': gym.spaces.Dict({ + 'position': gym.spaces.Box(low=-100, high=100, shape=(3,)), + 'velocity': gym.spaces.Box(low=-1, high=1, shape=(3,)), + 'front_cam': gym.spaces.Tuple(( + 
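+                # a pair of identical (10, 10, 3) image-like Box spaces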
gym.spaces.Box(low=0, high=1, shape=(10, 10, 3)),
+                gym.spaces.Box(low=0, high=1, shape=(10, 10, 3))
+            )),
+            'rear_cam': gym.spaces.Box(low=0, high=1, shape=(10, 10, 3)),
+        }),
+        'ext_controller': gym.spaces.MultiDiscrete((5, 2, 2)),
+        'inner_state': gym.spaces.Dict({
+            'charge': gym.spaces.Discrete(100),
+            'system_checks': gym.spaces.MultiBinary(10),
+            'job_status': gym.spaces.Dict({
+                'task': gym.spaces.Discrete(5),
+                'progress': gym.spaces.Box(low=0, high=100, shape=()),
+            })
+        })
+    })
+    gym_sp = nested_observation_space
+    sp = convert_space_from_gym(gym_sp)
+    assert isinstance(sp, rlberry.spaces.Dict)
+    for _ in range(10):
+        assert sp.contains(sp.sample())
+    sp.reseed(123)
+
+    gym_sp2 = gym.spaces.Dict(sp.spaces)
+    sp2 = convert_space_from_gym(gym_sp2)
+    assert isinstance(sp2, rlberry.spaces.Dict)
+    for _ in range(10):
+        assert sp.contains(sp2.sample())
+    sp2.reseed(123)
diff --git a/rlberry/types.py b/rlberry/types.py
index 739b1f396..42f41d1d5 100644
--- a/rlberry/types.py
+++ b/rlberry/types.py
@@ -6,4 +6,4 @@
 Env = Union[gym.Env, Tuple[Callable[..., gym.Env], Mapping[str, Any]]]

 #
-Seed = Union[Seeder, int]
\ No newline at end of file
+Seed = Union[Seeder, int]
diff --git a/rlberry/utils/io.py b/rlberry/utils/io.py
new file mode 100644
index 000000000..94f7092ce
--- /dev/null
+++ b/rlberry/utils/io.py
@@ -0,0 +1,33 @@
+
+import os
+import zipfile
+import pathlib
+
+
+def zipdir(dir_path, output_fname):
+    """
+    Zip a directory.
+
+    Parameters
+    ----------
+    dir_path : Path or str
+        Directory to be compressed.
+    output_fname : str
+        Name of the output zip file.
+
+    Returns
+    -------
+    Path to the zip file, or None if dir_path does not exist.
+    """
+    dir_path = pathlib.Path(dir_path)
+    if not dir_path.exists():
+        return None
+    output_fname = pathlib.Path(output_fname).with_suffix('.zip')
+    zipf = zipfile.ZipFile(output_fname, 'w', zipfile.ZIP_DEFLATED)
+    for root, _, files in os.walk(dir_path):
+        for file in files:
+            zipf.write(os.path.join(root, file),
+                       os.path.relpath(os.path.join(root, file),
+                                       os.path.join(dir_path, '..')))
+    zipf.close()
+    return output_fname
diff --git a/rlberry/utils/writers.py b/rlberry/utils/writers.py
index eea991ab4..7e714117b 100644
--- a/rlberry/utils/writers.py
+++ b/rlberry/utils/writers.py
@@ -1,15 +1,21 @@
 import logging
 import numpy as np
 import pandas as pd
+from collections import deque
 from typing import Optional
 from timeit import default_timer as timer
+from rlberry import check_packages
+from rlberry import metadata_utils
+
+if check_packages.TENSORBOARD_INSTALLED:
+    from torch.utils.tensorboard import SummaryWriter

 logger = logging.getLogger(__name__)


 class DefaultWriter:
     """
-    Default writer to be used by the agents.
+    Default writer to be used by the agents; optionally wraps an instance of tensorboard.SummaryWriter.

     Can be used in the fit() method of the agents, so that
     training data can be handled by AgentManager and RemoteAgentManager.
@@ -19,26 +25,53 @@ class DefaultWriter:
     name : str
         Name of the writer.
     log_interval : int
-        Minimum number of seconds between consecutive logs.
-    metadata : dict
-        Extra information to be logged.
+        Minimum number of seconds between consecutive logs (with the logging module).
+    tensorboard_kwargs : Optional[dict]
+        Parameters for the tensorboard SummaryWriter. If provided, DefaultWriter
+        will also behave as a tensorboard.SummaryWriter, and will keep utilities to handle
+        data added with the add_scalar method.
+    execution_metadata : metadata_utils.ExecutionMetadata
+        Execution metadata about the object that is using the writer.
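+        Only obj_worker_id is currently used by DefaultWriter: it is appended
+        to log messages to identify the worker that produced them.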
+ maxlen : Optional[int], default: None + If given, data stored by self._data (accessed through the property self.data) is limited + to `maxlen` entries. """ - def __init__(self, name: str, log_interval: int = 3, metadata: Optional[dict] = None): + def __init__( + self, name: str, + log_interval: int = 3, + tensorboard_kwargs: Optional[dict] = None, + execution_metadata: Optional[metadata_utils.ExecutionMetadata] = None, + maxlen: Optional[int] = None): self._name = name self._log_interval = log_interval - self._metadata = metadata or dict() + self._execution_metadata = execution_metadata self._data = None self._time_last_log = None self._log_time = True + self._maxlen = maxlen self.reset() + # initialize tensorboard + if (tensorboard_kwargs is not None) and (not check_packages.TENSORBOARD_INSTALLED): + logger.warning('[DefaultWriter]: received tensorboard_kwargs, but tensorboard is not installed.') + self._tensorboard_kwargs = tensorboard_kwargs + self._tensorboard_logdir = None + self._summary_writer = None + if (tensorboard_kwargs is not None) and check_packages.TENSORBOARD_INSTALLED: + self._summary_writer = SummaryWriter(**self._tensorboard_kwargs) + self._tensorboard_logdir = self._summary_writer.get_logdir() + def reset(self): - """Clear all data.""" + """Clear data.""" self._data = dict() self._initial_time = timer() self._time_last_log = timer() + @property + def summary_writer(self): + return self._summary_writer + @property def data(self): df = pd.DataFrame(columns=('name', 'tag', 'value', 'global_step')) @@ -46,9 +79,10 @@ def data(self): df = df.append(pd.DataFrame(self._data[tag]), ignore_index=True) return df - def add_scalar(self, tag: str, scalar_value: float, global_step: Optional[int] = None): + def add_scalar( + self, tag: str, scalar_value: float, global_step: Optional[int] = None, walltime=None, new_style=False): """ - Store scalar value. + Behaves as SummaryWriter.add_scalar(). Note: the tag 'dw_time_elapsed' is reserved and updated internally. It logs automatically the number of seconds elapsed @@ -61,14 +95,27 @@ def add_scalar(self, tag: str, scalar_value: float, global_step: Optional[int] = Value of the scalar. global_step : int Step where scalar was added. If None, global steps will no longer be stored for the current tag. + walltime : float + Optional override default walltime (time.time()) with seconds after epoch of event + new_style : bool + Whether to use new style (tensor field) or old + style (simple_value field). New style could lead to faster data loading. + """ + if self._summary_writer: + self._summary_writer.add_scalar(tag, scalar_value, global_step, walltime, new_style) + self._add_scalar(tag, scalar_value, global_step) + + def _add_scalar(self, tag: str, scalar_value: float, global_step: Optional[int] = None): + """ + Store scalar value in self._data. 
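+
+        Unlike add_scalar(), this method never forwards to the wrapped
+        SummaryWriter; it only updates the internal deques behind the
+        `data` property.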
""" # Update data structures if tag not in self._data: self._data[tag] = dict() - self._data[tag]['name'] = [] - self._data[tag]['tag'] = [] - self._data[tag]['value'] = [] - self._data[tag]['global_step'] = [] + self._data[tag]['name'] = deque(maxlen=self._maxlen) + self._data[tag]['tag'] = deque(maxlen=self._maxlen) + self._data[tag]['value'] = deque(maxlen=self._maxlen) + self._data[tag]['global_step'] = deque(maxlen=self._maxlen) self._data[tag]['name'].append(self._name) # used in plots, when aggregating several writers self._data[tag]['tag'].append(tag) # useful to convert all data to a single DataFrame @@ -82,7 +129,7 @@ def add_scalar(self, tag: str, scalar_value: float, global_step: Optional[int] = if global_step is not None and self._log_time: assert tag != 'dw_time_elapsed', 'The tag dw_time_elapsed is reserved.' self._log_time = False - self.add_scalar(tag='dw_time_elapsed', scalar_value=timer() - self._initial_time, global_step=global_step) + self._add_scalar(tag='dw_time_elapsed', scalar_value=timer() - self._initial_time, global_step=global_step) self._log_time = True # Log @@ -106,21 +153,39 @@ def _log(self): max_global_step = max(max_global_step, gstep) header = self._name - if self._metadata: - header += f' | {self._metadata}' + if self._execution_metadata: + header += f'[worker: {self._execution_metadata.obj_worker_id}]' message = f'[{header}] | max_global_step = {max_global_step} | ' + message logger.info(message) def __getattr__(self, attr): """ - Avoid raising exceptions when invalid method is called, so - that DefaultWriter does not raise exceptions when - the code expects a tensorboard writer. + Calls SummaryWriter methods, if self._summary_writer is not None. + Otherwise, does nothing. """ if attr[:2] == '__': raise AttributeError(attr) + if attr in self.__dict__: + return getattr(self, attr) + if self._summary_writer: + return getattr(self._summary_writer, attr) def method(*args, **kwargs): pass - return method + + # + # For pickle + # + def __getstate__(self): + if self._summary_writer: + self._summary_writer.close() + state = self.__dict__.copy() + return state + + def __setstate__(self, newstate): + # Re-create summary writer with the same logdir + if newstate['_summary_writer']: + newstate['_tensorboard_kwargs'].update(dict(log_dir=newstate['_tensorboard_logdir'])) + newstate['_summary_writer'] = SummaryWriter(**newstate['_tensorboard_kwargs']) + self.__dict__.update(newstate) diff --git a/setup.py b/setup.py index 5134471d7..a1ad82a9d 100644 --- a/setup.py +++ b/setup.py @@ -60,7 +60,7 @@ setup( name='rlberry', - version='0.2', + version='0.2.1', description='An easy-to-use reinforcement learning library for research and education', long_description=long_description, long_description_content_type="text/markdown",