Commit b308097
several fixes and last minute adds
astooke committed Aug 22, 2019
1 parent 6821c53 commit b308097
Showing 22 changed files with 101 additions and 77 deletions.
12 changes: 5 additions & 7 deletions examples/atari_dqn_async_cpu.py
@@ -1,21 +1,19 @@

"""
DQN in async mode with CPU parallel sampler.
"""


from rlpyt.utils.launching.affinity import make_affinity
# from rlpyt.samplers.gpu.parallel_sampler import GpuParallelSampler
# from rlpyt.samplers.async_.async_serial_sampler import AsyncSerialSampler
from rlpyt.samplers.async_.cpu_sampler import AsyncCpuSampler
# from rlpyt.samplers.cpu.collectors import ResetCollector
from rlpyt.envs.atari.atari_env import AtariEnv
from rlpyt.algos.dqn.dqn import DQN
from rlpyt.agents.dqn.atari.atari_dqn_agent import AtariDqnAgent
# from rlpyt.agents.pg.atari import AtariFfAgent
# from rlpyt.runners.multigpu_sync import MultiGpuRl
from rlpyt.runners.async_rl import AsyncRlEval
from rlpyt.utils.logging.context import logger_context


def build_and_train(game="pong", run_ID=0):
# Seems like we should be able to skip the intermediate step of the code,
# but so far have just always run that way.
# Change these inputs to match local machine and desired parallelism.
affinity = make_affinity(
run_slot=0,
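Below the fold, this example presumably finishes build_and_train by wiring the sampler, algorithm, agent, and runner together. A hedged sketch of that continuation follows, using only names imported above; the batch sizes, step counts, and log-dir name are illustrative assumptions, not the file's actual settings:

    # Sketch only -- all numeric values below are assumptions.
    sampler = AsyncCpuSampler(
        EnvCls=AtariEnv,
        env_kwargs=dict(game=game),
        batch_T=4,                        # time steps per sampler iteration
        batch_B=8,                        # parallel environment instances
        max_decorrelation_steps=100,
        eval_env_kwargs=dict(game=game),
        eval_n_envs=2,
        eval_max_steps=int(10e3),
    )
    algo = DQN(min_steps_learn=1e4)       # replay-based algo runs in the optimizer process
    agent = AtariDqnAgent()
    runner = AsyncRlEval(                 # async runner coordinates sampler and optimizer
        algo=algo,
        agent=agent,
        sampler=sampler,
        n_steps=2e6,
        log_interval_steps=1e4,
        affinity=affinity,
    )
    config = dict(game=game)
    with logger_context("atari_dqn_async_cpu", run_ID, "dqn_" + game, config):
        runner.train()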
13 changes: 5 additions & 8 deletions examples/atari_dqn_async_gpu.py
@@ -1,22 +1,19 @@

"""
DQN in asynchronous mode with GPU sampler.
(Or could use alternating GPU sampler).
"""

from rlpyt.utils.launching.affinity import make_affinity
# from rlpyt.samplers.gpu.parallel_sampler import GpuParallelSampler
# from rlpyt.samplers.async_.async_serial_sampler import AsyncSerialSampler
from rlpyt.samplers.async_.gpu_sampler import AsyncGpuSampler
# from rlpyt.samplers.cpu.collectors import ResetCollector
# from rlpyt.samplers.async_.collectors import DbGpuResetCollector
from rlpyt.envs.atari.atari_env import AtariEnv
from rlpyt.algos.dqn.dqn import DQN
from rlpyt.agents.dqn.atari.atari_dqn_agent import AtariDqnAgent
# from rlpyt.agents.pg.atari import AtariFfAgent
# from rlpyt.runners.multigpu_sync import MultiGpuRl
from rlpyt.runners.async_rl import AsyncRlEval
from rlpyt.utils.logging.context import logger_context


def build_and_train(game="pong", run_ID=0):
# Seems like we should be able to skip the intermediate step of the code,
# but so far have just always run that way.
# Change these inputs to match local machine and desired parallelism.
affinity = make_affinity(
run_slot=0,
17 changes: 9 additions & 8 deletions examples/atari_dqn_async_serial.py
@@ -1,31 +1,32 @@

"""
Runs DQN in asynchronous mode, with separate processes for sampling and
optimization. Serial sampling here. Inputs and outputs from the affinity
constructors will be different for this mode.
"""


from rlpyt.utils.launching.affinity import make_affinity
# from rlpyt.samplers.gpu.parallel_sampler import GpuParallelSampler
from rlpyt.samplers.async_.async_serial_sampler import AsyncSerialSampler
# from rlpyt.samplers.cpu.collectors import ResetCollector
from rlpyt.samplers.async_.collectors import DbCpuResetCollector
from rlpyt.envs.atari.atari_env import AtariEnv
from rlpyt.algos.dqn.dqn import DQN
from rlpyt.agents.dqn.atari.atari_dqn_agent import AtariDqnAgent
# from rlpyt.agents.pg.atari import AtariFfAgent
# from rlpyt.runners.multigpu_sync import MultiGpuRl
from rlpyt.runners.async_rl import AsyncRlEval
from rlpyt.utils.logging.context import logger_context


def build_and_train(game="pong", run_ID=0):
# Seems like we should be able to skip the intermediate step of the code,
# but so far have just always run that way.
# Change these inputs to match local machine and desired parallelism.
affinity = make_affinity(
run_slot=0,
n_cpu_core=2, # Use 16 cores across all experiments.
n_gpu=1, # Use 8 gpus across all experiments.
sample_gpu_per_run=0,
async_sample=True,
async_sample=True, # Different affinity structure for async.
# hyperthread_offset=24, # If machine has 24 cores.
# n_socket=2, # Presume CPU socket affinity to lower/upper half GPUs.
# gpu_per_run=2, # How many GPUs to parallelize one run across.
# gpu_per_run=2, # How many optimizer GPUs to parallelize one run.
# cpu_per_run=1,
)

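The docstring notes that the affinity constructor's inputs and outputs differ in async mode. Going only by how the code changed in this commit reads the result (async_rl.py and the async samplers), a rough sketch of building and inspecting it; the argument values mirror the snippet above and everything else is an assumption:

# Hedged sketch: inspect the async-mode affinity before handing it to the runner.
affinity = make_affinity(
    run_slot=0,
    n_cpu_core=2,
    n_gpu=1,
    sample_gpu_per_run=0,
    async_sample=True,
)
print(affinity)                 # whole structure, split between optimizer and sampler halves
print(affinity.optimizer[0])    # per-GPU optimizer dict; async_rl.py reads "cpus" and "torch_threads"
# The sampler half carries keys such as "master_cpus" and "workers_cpus",
# which the async samplers in this commit read during initialize().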
9 changes: 8 additions & 1 deletion examples/example_1.py
@@ -4,6 +4,13 @@
Can use a GPU for the agent (applies to both sample and train). No parallelism
employed, so everything happens in one python process; can be easier to debug.
The kwarg snapshot_mode="last" to logger context will save the latest model at
every log point (see inside the logger for other options).
In viskit, whatever (nested) key-value pairs appear in config will become plottable
keys for showing several experiments. If you need to add more after an experiment,
use rlpyt.utils.logging.context.add_exp_param().
"""

from rlpyt.samplers.serial.sampler import SerialSampler
@@ -39,7 +46,7 @@ def build_and_train(game="pong", run_ID=0, cuda_idx=None):
config = dict(game=game)
name = "dqn_" + game
log_dir = "example_1"
with logger_context(log_dir, run_ID, name, config):
with logger_context(log_dir, run_ID, name, config, snapshot_mode="last"):
runner.train()


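With snapshot_mode="last" the logger overwrites a single latest snapshot at each log point. A hedged sketch of reloading it after training; the params.pkl filename, directory layout, and dict keys are assumptions about the logger's output, not taken from this diff:

import torch

# Assumed path layout and filename -- not taken from this diff.
snapshot = torch.load("data/local/<date>/example_1/run_0/params.pkl")
print(snapshot.keys())   # expected something like: itr, cum_steps, agent_state_dict, optimizer_state_dict
# After building and initializing an agent the same way as in the example, its
# weights could be restored with agent.load_state_dict(snapshot["agent_state_dict"]).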
10 changes: 10 additions & 0 deletions examples/example_3.py
@@ -4,10 +4,15 @@
algorithm. Can choose between configurations for use of CPU/GPU for sampling
(serial or parallel) and optimization (serial).
Alternating sampler is another option. For recurrent agents, a different mixin
is required for alternating sampling (see rlpyt.agents.base.py); feedforward
agents remain unaffected.
"""
from rlpyt.samplers.serial.sampler import SerialSampler
from rlpyt.samplers.parallel.cpu.sampler import CpuSampler
from rlpyt.samplers.parallel.gpu.sampler import GpuSampler
from rlpyt.samplers.parallel.gpu.alternating_sampler import AlternatingSampler
from rlpyt.envs.atari.atari_env import AtariEnv
from rlpyt.algos.pg.a2c import A2C
from rlpyt.agents.pg.atari import AtariFfAgent
@@ -27,6 +32,11 @@ def build_and_train(game="pong", run_ID=0, cuda_idx=None, sample_mode="serial",
elif sample_mode == "gpu":
Sampler = GpuParallelSampler
print(f"Using GPU parallel sampler (agent in master), {gpu_cpu} for sampling and optimizing.")
elif sample_mode == "alternating":
Sampler = AlternatingSampler
affinity["workers_cpus"] += affinity["workers_cpus"] # (Double list)
affinity["alternating"] = True # Sampler will check for this.
print(f"Using Alternating GPU parallel sampler, {gpu_cpu} for sampling and optimizing.")

sampler = Sampler(
EnvCls=AtariEnv,
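A hedged sketch of the recurrent-agent pairing the docstring points to; the mixin and LSTM agent class names below are assumptions about rlpyt.agents.base.py and rlpyt.agents.pg.atari, not shown in this diff:

# Sketch only -- class names are assumed; check rlpyt.agents.base.py.
from rlpyt.agents.base import AlternatingRecurrentAgentMixin
from rlpyt.agents.pg.atari import AtariLstmAgent

class AlternatingAtariLstmAgent(AlternatingRecurrentAgentMixin, AtariLstmAgent):
    """Maintains separate recurrent state for each alternating half of the envs."""
    pass

# Feedforward agents (e.g. the AtariFfAgent used in this example) need no mixin;
# only the workers_cpus doubling and affinity["alternating"] = True shown above.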
7 changes: 5 additions & 2 deletions examples/example_6.py
@@ -7,6 +7,8 @@
experiments than fit on the machine, and they will run in order over time.
To understand rules and settings for affinities, try using
affinity = affinity.make_affinity(..)
OR
code = affinity.encode_affinity(..)
slot_code = affinity.prepend_run_slot(code, slot)
affinity = affinity.affinity_from_code(slot_code)
@@ -25,9 +27,10 @@
affinity_code = encode_affinity(
n_cpu_core=2,
n_gpu=0,
# hyperthread_offset=8,
# n_socket=1,
# hyperthread_offset=8, # if auto-detect doesn't work, number of CPU cores
# n_socket=1, # if auto-detect doesn't work, can force (or force to 1)
cpu_per_run=1,
set_affinity=True, # it can help to restrict workers to individual CPUs
)
# Or try an automatic one, but results may vary:
# affinity_code = quick_affinity_code(n_parallel=None, use_gpu=True)
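The three-step affinity flow named in the docstring above, written out as a runnable sketch; only the three function names come from this file, while the argument values and the printed structure are assumptions:

from rlpyt.utils.launching.affinity import (encode_affinity, prepend_run_slot,
    affinity_from_code)

# 1) Encode machine/parallelism settings into a string at launch time.
code = encode_affinity(n_cpu_core=2, n_gpu=0, cpu_per_run=1, set_affinity=True)
# 2) Stamp it with the run slot assigned by the launcher (argument order as
#    written in the docstring above).
slot_code = prepend_run_slot(code, 0)
# 3) Decode it inside the experiment script to get the per-run affinity.
affinity = affinity_from_code(slot_code)
print(affinity)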
19 changes: 5 additions & 14 deletions examples/example_6a.py
@@ -1,25 +1,16 @@

"""
Runs multiple instances of the Atari environment and optimizes using A2C
algorithm and a recurrent agent. Uses GPU parallel sampler, with option for
algorithm and a feed-forward agent. Uses GPU parallel sampler, with option for
whether to reset environments in middle of sampling batch.
Standard recurrent agents cannot train with a reset in the middle of a
sequence, so all data after the environment 'done' signal will be ignored (see
variable 'valid' in algo). So it may be preferable to pause those environments
and wait to reset them for the beginning of the next iteration.
If the environment takes a long time to reset relative to step, this may also
give a slight speed boost, as resets will happen in the workers while the master
is optimizing. Feedforward agents are compatible with this arrangement by same
use of 'valid' mask.
"""
import sys

from rlpyt.utils.launching.affinity import affinity_from_code
from rlpyt.samplers.gpu.parallel_sampler import GpuParallelSampler
from rlpyt.samplers.gpu.collectors import WaitResetCollector
from rlpyt.samplers.parallel.gpu.sampler import GpuSampler
from rlpyt.samplers.parallel.gpu.collectors import GpuWaitResetCollector
from rlpyt.envs.atari.atari_env import AtariEnv
from rlpyt.algos.pg.a2c import A2C
from rlpyt.agents.pg.atari import AtariFfAgent
@@ -41,10 +32,10 @@ def build_and_train(slot_affinity_code, log_dir, run_ID):
global config
config = update_config(config, variant)

sampler = GpuParallelSampler(
sampler = GpuSampler(
EnvCls=AtariEnv,
env_kwargs=config["env"],
CollectorCls=WaitResetCollector,
CollectorCls=GpuWaitResetCollector,
batch_T=5,
# batch_B=16, # Get from config.
max_decorrelation_steps=400,
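The 'valid' mask mentioned in this docstring zeroes out time steps that follow an environment 'done' within a sampled segment, so neither recurrent nor feedforward agents train on data that straddles a reset. A standalone sketch of the idea (a re-implementation for illustration, not rlpyt's own code):

import torch

def valid_from_done(done):
    """done: [T, B] float tensor of 0/1. Returns a mask that is 1 up to and
    including the first done in each column, 0 afterwards."""
    done = done.type(torch.float)
    valid = torch.ones_like(done)
    # Everything after a done (exclusive) becomes invalid.
    valid[1:] = 1 - torch.clamp(torch.cumsum(done[:-1], dim=0), max=1)
    return valid

# e.g. a done column [0,0,1,0,0] -> valid [1,1,1,0,0]; the loss is multiplied
# by this mask before averaging.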
18 changes: 9 additions & 9 deletions examples/example_7.py
@@ -1,16 +1,16 @@

"""
Runs one experiment using multiple GPUs. In the MultiGPU_Sync configuration,
the entire template of sampler and optimizer on a GPU and sampler CPUs is
replicated across the machine, with a separate python process for each GPU. So
each parallel runner gathers its own samples for optimization. The models are
parallelized using Torch's DistributedDataParallel, which all-reduces every
gradient computed, pipelined with backpropagation; all GPUs maintain identical
copies of the model throughout. The same technique can be applied to any
algorithm, PG, QPG, DQN for multi-GPU training synchronous with sampling.
Runs one experiment using multiple GPUs. In the SyncRl runner, the entire
template of optimizer and sampler GPU and CPUs is replicated across the
machine, with a separate python process for each GPU. So each parallel runner
gathers its own samples for optimization. The models are parallelized using
Torch's DistributedDataParallel, which all-reduces every gradient computed,
pipelined with backpropagation; all GPUs maintain identical copies of the
model throughout. The same technique can be applied to any algorithm, PG,
QPG, DQN for multi-GPU training synchronous with sampling.
Currently, the batch size specified to the sampler/algo is used on each process,
so batch sizes grow with the number of parallel runners (might change this).
so total batch size grows with the number of parallel runners.
Try different affinity inputs to see where the jobs run on the machine.
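The docstring describes DistributedDataParallel all-reducing gradients so every GPU's model copy stays identical. A generic PyTorch sketch of that mechanism, independent of rlpyt; the process-group address, port, and model are placeholders:

import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

def run(rank, world_size):
    # One process per GPU; the address/port here are placeholders.
    dist.init_process_group("nccl", init_method="tcp://127.0.0.1:29500",
                            rank=rank, world_size=world_size)
    model = torch.nn.Linear(4, 2).to(rank)
    ddp_model = DDP(model, device_ids=[rank])
    # Each process computes gradients on its own batch; backward() triggers the
    # bucketed all-reduce, so after the optimizer step every replica is identical.
    out = ddp_model(torch.randn(16, 4).to(rank))
    out.sum().backward()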
2 changes: 1 addition & 1 deletion linux_cpu.yml
@@ -3,7 +3,7 @@ channels:
- pytorch
dependencies:
- python=3.7
- pytorch-cpu=1.1
- pytorch-cpu=1.2
- numpy
- psutil
- opencv # atari.
2 changes: 1 addition & 1 deletion linux_cuda10.yml
@@ -3,7 +3,7 @@ channels:
- pytorch
dependencies:
- python=3.7
- pytorch=1.1
- pytorch=1.2
- cudatoolkit=10.
- numpy
- psutil
2 changes: 1 addition & 1 deletion linux_cuda9.yml
@@ -3,7 +3,7 @@ channels:
- pytorch
dependencies:
- python=3.7
- pytorch=1.1
- pytorch=1.2
- cudatoolkit=9.
- numpy
- psutil
(additional changed file; file name not shown in this view)
@@ -11,7 +11,7 @@
n_socket=2,
# cpu_per_run=2,
)
runs_per_setting = 3
runs_per_setting = 5
variant_levels = list()

env_ids = ["Hopper-v3", "HalfCheetah-v3",
8 changes: 5 additions & 3 deletions rlpyt/runners/async_rl.py
@@ -111,7 +111,8 @@ def startup(self):
def optim_startup(self):
main_affinity = self.affinity.optimizer[0]
p = psutil.Process()
p.cpu_affinity(main_affinity["cpus"])
if main_affinity.get("set_affinity", True):
p.cpu_affinity(main_affinity["cpus"])
logger.log(f"Optimizer master CPU affinity: {p.cpu_affinity()}.")
torch.set_num_threads(main_affinity["torch_threads"])
logger.log(f"Optimizer master Torch threads: {torch.get_num_threads()}.")
@@ -293,7 +294,7 @@ def log_diagnostics(self, itr, sampler_itr, throttle_time):
logger.record_tabular('CumUpdates', self.algo.update_counter)
logger.record_tabular('ReplayRatio', replay_ratio)
logger.record_tabular('CumReplayRatio', cum_replay_ratio)
logger.record_tabular('SamplesPerSecond', samples_per_second)
logger.record_tabular('StepsPerSecond', samples_per_second)
if self._eval:
logger.record_tabular('NonEvalSamplesPerSecond', non_eval_samples_per_second)
logger.record_tabular('UpdatesPerSecond', updates_per_second)
@@ -415,7 +416,8 @@ def startup(self):
init_method=f"tcp://127.0.0.1:{self.port}",
)
p = psutil.Process()
p.cpu_affinity(self.affinity["cpus"])
if self.affinity.get("set_affinity", True):
p.cpu_affinity(self.affinity["cpus"])
logger.log(f"Optimizer rank {self.rank} CPU affinity: {p.cpu_affinity()}.")
torch.set_num_threads(self.affinity["torch_threads"])
logger.log(f"Optimizer rank {self.rank} Torch threads: {torch.get_num_threads()}.")
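The guards added in this file mean CPU pinning is skipped whenever the affinity dict carries set_affinity=False. A hedged sketch of requesting that from the launch side; whether make_affinity forwards the kwarg exactly as encode_affinity does (see example_6.py above) is an assumption:

# Sketch only: disable CPU pinning for a run, e.g. on a shared machine.
affinity = make_affinity(
    run_slot=0,
    n_cpu_core=2,
    n_gpu=1,
    async_sample=True,
    set_affinity=False,   # runner and samplers then skip p.cpu_affinity(...)
)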
5 changes: 3 additions & 2 deletions rlpyt/runners/minibatch_rl.py
@@ -34,7 +34,8 @@ def __init__(
def startup(self):
p = psutil.Process()
try:
if self.affinity.get("master_cpus", None) is not None:
if (self.affinity.get("master_cpus", None) is not None and
self.affinity.get("set_affinity", True)):
p.cpu_affinity(self.affinity["master_cpus"])
cpu_affin = p.cpu_affinity()
except AttributeError:
@@ -149,7 +150,7 @@ def log_diagnostics(self, itr, traj_infos=None, eval_time=0):
logger.record_tabular('CumSteps', cum_steps)
logger.record_tabular('CumCompletedTrajs', self._cum_completed_trajs)
logger.record_tabular('CumUpdates', self.algo.update_counter)
logger.record_tabular('SamplesPerSecond', samples_per_second)
logger.record_tabular('StepsPerSecond', samples_per_second)
logger.record_tabular('UpdatesPerSecond', updates_per_second)
logger.record_tabular('ReplayRatio', replay_ratio)
logger.record_tabular('CumReplayRatio', cum_replay_ratio)
3 changes: 2 additions & 1 deletion rlpyt/samplers/async_/cpu_sampler.py
@@ -27,7 +27,8 @@ def __init__(self, *args, CollectorCls=DbCpuResetCollector,

def initialize(self, affinity):
p = psutil.Process()
p.cpu_affinity(affinity["master_cpus"])
if affinity.get("set_affinity", True):
p.cpu_affinity(affinity["master_cpus"])
torch.set_num_threads(1) # Needed to prevent MKL hang :( .
self.agent.async_cpu(share_memory=True)
super().initialize(
3 changes: 2 additions & 1 deletion rlpyt/samplers/async_/gpu_sampler.py
@@ -119,7 +119,8 @@ def action_server_process(self, rank, env_ranks, double_buffer_slice,
pass args to env worker processes, forked from here."""
self.rank = rank
p = psutil.Process()
p.cpu_affinity(affinity["master_cpus"])
if affinity.get("set_affinity", True):
p.cpu_affinity(affinity["master_cpus"])
# torch.set_num_threads(affinity["master_torch_threads"])
torch.set_num_threads(1) # Possibly needed to avoid MKL hang.
self.launch_workers(double_buffer_slice, affinity, seed, n_envs_list)
3 changes: 2 additions & 1 deletion rlpyt/samplers/async_/serial_sampler.py
@@ -23,7 +23,8 @@ def __init__(self, *args, CollectorCls=DbCpuResetCollector,

def initialize(self, affinity):
p = psutil.Process()
p.cpu_affinity(affinity["master_cpus"])
if affinity.get("set_affinity", True):
p.cpu_affinity(affinity["master_cpus"])
# torch.set_num_threads(affinity["master_torch_threads"])
torch.set_num_threads(1) # Needed to prevent MKL hang :( .
B = self.batch_spec.B
5 changes: 3 additions & 2 deletions rlpyt/samplers/parallel/base.py
@@ -168,7 +168,7 @@ def _assemble_common_kwargs(self, affinity, global_B=1):
traj_infos_queue=self.traj_infos_queue,
ctrl=self.ctrl,
max_decorrelation_steps=self.max_decorrelation_steps,
torch_threads=affinity.get("worker_torch_threads", None),
torch_threads=affinity.get("worker_torch_threads", 1),
global_B=global_B,
)
if self.eval_n_envs > 0:
@@ -194,7 +194,8 @@ def _assemble_workers_kwargs(self, affinity, seed, n_envs_list):
rank=rank,
env_ranks=env_ranks,
seed=seed + rank,
cpus=affinity["workers_cpus"][rank],
cpus=(affinity["workers_cpus"][rank]
if affinity.get("set_affinity", True) else None),
n_envs=n_envs,
samples_np=self.samples_np[:, slice_B],
sync=self.sync, # Only for eval, on CPU.
