From 7a29771ccddaf63f730e2ac4ef3f4627ad0f9a9a Mon Sep 17 00:00:00 2001 From: Amaar Quadri Date: Thu, 25 Jun 2020 19:51:47 -0400 Subject: [PATCH 1/4] started to add root paralelization implementation --- src/move_selection/MCTS.py | 96 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 96 insertions(+) diff --git a/src/move_selection/MCTS.py b/src/move_selection/MCTS.py index d38a393..88d7456 100644 --- a/src/move_selection/MCTS.py +++ b/src/move_selection/MCTS.py @@ -3,10 +3,102 @@ from time import time +class AsyncMCTSRoot: + """ + Implementation of Monte Carlo Tree Search that uses the other player's time to continue thinking. + This is achieved using multiprocessing, and a Pipe for transferring data to and from the worker process. + The parallelization is done by doing several distinct MCTS searches completely in parallel, and aggregating the + results when a move is requested. + """ + def __init__(self, GameClass, position, c=np.sqrt(2), threads=7): + self.ui_pipe, manager_pipe = Pipe() + manager_pipes, worker_pipes = zip(*[Pipe() for _ in range(threads)]) + self.manger_process = Process(target=self.manager_loop_func, args=(manager_pipe, manager_pipes)) + self.worker_processes = [Process(target=self.worker_loop_func, + args=(worker_pipe, GameClass, c)) for worker_pipe in worker_pipes] + + def start(self): + self.worker_process.start() + + def choose_move(self, user_chosen_position): + """ + Instructs the worker thread that the user has chosen the move specified by the given position. + The worker thread will then continue thinking for time_limit, and then return its chosen move. + + :param user_chosen_position: The board position resulting from the user's move. + :return: The move chosen by monte carlo tree search. + """ + self.parent_pipe.send(user_chosen_position) + return self.parent_pipe.recv() + + def terminate(self): + self.worker_process.terminate() + self.worker_process.join() + + @staticmethod + def manager_loop_func(ui_pipe, worker_pipes, GameClass, position): + root = Node(position, None, GameClass) + + while True: + best_child = root.choose_rollout_node(c) + + if best_child is not None: + best_child.rollout(threads, pool) + + if root.children is not None and worker_pipe.poll(): + user_chosen_position = worker_pipe.recv() + + for child in root.children: + if np.all(child.position == user_chosen_position): + root = child + root.parent = None + break + else: + print(user_chosen_position) + raise Exception('Invalid user chosen move!') + + if GameClass.is_over(root.position): + print('Game Over in Async MCTS: ', GameClass.get_winner(root.position)) + break + + start_time = time() + while time() - start_time < time_limit: + best_node = root.choose_rollout_node(c) + + # best_node will be None if the tree is fully expanded + if best_node is None: + break + + best_node.rollout(threads, pool) + + print(f'MCTS choosing move based on {root.count_expanded_nodes()} expanded nodes and ' + f'{root.rollout_count} rollouts!') + root = root.choose_best_node() + root.parent = None + worker_pipe.send(root.position) + if GameClass.is_over(root.position): + print('Game Over in Async MCTS: ', GameClass.get_winner(root.position)) + break + + if pool is not None: + pool.close() + pool.join() + + @staticmethod + def worker_loop_func(worker_pipe, GameClass, c): + """ + Worker thread workflow: receive MCTS root node, start doing MCTS, once a dummy message is receive, + return root result, wait for new MCTS root node. + """ + pass + + class AsyncMCTS: """ Implementation of Monte Carlo Tree Search that uses the other player's time to continue thinking. This is achieved using multiprocessing, and a Pipe for transferring data to and from the worker process. + The parallelization of MCTS is done by doing several rollouts in parallel each time a rollout is requested, + waiting for all of them to finish, and aggregating the result. """ def __init__(self, GameClass, position, time_limit=3, c=np.sqrt(2), threads=7): self.parent_pipe, worker_pipe = Pipe() @@ -77,6 +169,10 @@ def loop_func(GameClass, position, time_limit, c, threads, worker_pipe): print('Game Over in Async MCTS: ', GameClass.get_winner(root.position)) break + if pool is not None: + pool.close() + pool.join() + class MCTS: """ From fe7adf576b09ce2fdabbc061ff7fbea1dce74855 Mon Sep 17 00:00:00 2001 From: Amaar Quadri Date: Fri, 26 Jun 2020 22:07:56 -0400 Subject: [PATCH 2/4] added node merging recursively, still need to debug --- src/move_selection/MCTS.py | 150 ++++++++++++++++++++++++------------- 1 file changed, 98 insertions(+), 52 deletions(-) diff --git a/src/move_selection/MCTS.py b/src/move_selection/MCTS.py index 88d7456..78dde90 100644 --- a/src/move_selection/MCTS.py +++ b/src/move_selection/MCTS.py @@ -1,6 +1,6 @@ import numpy as np from multiprocessing import Process, Pipe, Pool -from time import time +from time import time, sleep class AsyncMCTSRoot: @@ -9,16 +9,23 @@ class AsyncMCTSRoot: This is achieved using multiprocessing, and a Pipe for transferring data to and from the worker process. The parallelization is done by doing several distinct MCTS searches completely in parallel, and aggregating the results when a move is requested. + + UI Pipe protocol: send user's move, send dummy message to request move, receive move, repeat + Worker Pipe protocol: send root node, send dummy message to request results, receive results, repeat """ - def __init__(self, GameClass, position, c=np.sqrt(2), threads=7): - self.ui_pipe, manager_pipe = Pipe() - manager_pipes, worker_pipes = zip(*[Pipe() for _ in range(threads)]) - self.manger_process = Process(target=self.manager_loop_func, args=(manager_pipe, manager_pipes)) + def __init__(self, GameClass, position, c=np.sqrt(2), threads=7, time_limit=5): + self.time_limit = time_limit + self.ui_pipe, manager_ui_pipe = Pipe() + manager_worker_pipes, worker_pipes = zip(*[Pipe() for _ in range(threads)]) + self.manager_process = Process(target=self.manager_loop_func, + args=(manager_ui_pipe, manager_worker_pipes, GameClass, position)) self.worker_processes = [Process(target=self.worker_loop_func, - args=(worker_pipe, GameClass, c)) for worker_pipe in worker_pipes] + args=(worker_pipe, c)) for worker_pipe in worker_pipes] def start(self): - self.worker_process.start() + self.manager_process.start() + for worker_process in self.worker_processes: + worker_process.start() def choose_move(self, user_chosen_position): """ @@ -26,71 +33,84 @@ def choose_move(self, user_chosen_position): The worker thread will then continue thinking for time_limit, and then return its chosen move. :param user_chosen_position: The board position resulting from the user's move. + :param time_limit: The time to wait before requesting a move. :return: The move chosen by monte carlo tree search. """ - self.parent_pipe.send(user_chosen_position) - return self.parent_pipe.recv() + self.ui_pipe.send(user_chosen_position) + sleep(self.time_limit) + self.ui_pipe.send(None) + return self.ui_pipe.recv() def terminate(self): - self.worker_process.terminate() - self.worker_process.join() + self.manager_process.terminate() + for worker_process in self.worker_processes: + worker_process.terminate() + + self.manager_process.join() + for worker_process in self.worker_processes: + worker_process.join() @staticmethod def manager_loop_func(ui_pipe, worker_pipes, GameClass, position): root = Node(position, None, GameClass) - while True: - best_child = root.choose_rollout_node(c) - - if best_child is not None: - best_child.rollout(threads, pool) - - if root.children is not None and worker_pipe.poll(): - user_chosen_position = worker_pipe.recv() + while not GameClass.is_over(root.position): + for worker_pipe in worker_pipes: + worker_pipe.send(root) - for child in root.children: - if np.all(child.position == user_chosen_position): - root = child - root.parent = None - break - else: - print(user_chosen_position) - raise Exception('Invalid user chosen move!') - - if GameClass.is_over(root.position): - print('Game Over in Async MCTS: ', GameClass.get_winner(root.position)) - break - - start_time = time() - while time() - start_time < time_limit: - best_node = root.choose_rollout_node(c) + user_move_index = ui_pipe.recv() + root.ensure_children() + root = root.children[user_move_index] + root.parent = None - # best_node will be None if the tree is fully expanded - if best_node is None: - break + if GameClass.is_over(root.position): + break + for worker_pipe in worker_pipes: + worker_pipe.send(user_move_index) - best_node.rollout(threads, pool) + ui_pipe.recv() # dummy message + for worker_pipe in worker_pipes: + worker_pipe.send(None) - print(f'MCTS choosing move based on {root.count_expanded_nodes()} expanded nodes and ' - f'{root.rollout_count} rollouts!') - root = root.choose_best_node() - root.parent = None - worker_pipe.send(root.position) - if GameClass.is_over(root.position): - print('Game Over in Async MCTS: ', GameClass.get_winner(root.position)) - break + clones = [worker_pipe.recv() for worker_pipe in worker_pipes] + root.merge(clones) + root = root.choose_best_node() + ui_pipe.send(root.position) + root.parent = None - if pool is not None: - pool.close() - pool.join() + print('Game Over in Async MCTS Root: ', GameClass.get_winner(root.position)) @staticmethod - def worker_loop_func(worker_pipe, GameClass, c): + def worker_loop_func(worker_pipe, c): """ Worker thread workflow: receive MCTS root node, start doing MCTS, once a dummy message is receive, return root result, wait for new MCTS root node. """ - pass + while True: + root = worker_pipe.recv() + + while (not worker_pipe.poll()) or root.children is None: + best_child = root.choose_rollout_node(c) + if best_child is None: + break + + best_child.rollout(rollouts=1, pool=None) + + user_move_index = worker_pipe.recv() + root = root.children[user_move_index] + root.parent = None + + # should in theory check to ensure that game is not over, + # but manager_process would have done that already, so its redundant + + while not worker_pipe.poll(): + best_child = root.choose_rollout_node(c) + if best_child is None: + break + + best_child.rollout(rollouts=1, pool=None) + worker_pipe.recv() # flush dummy message indicated that results should be returned + worker_pipe.send(root) class AsyncMCTS: @@ -301,3 +321,29 @@ def execute_single_rollout(self): state = sub_states[np.random.randint(len(sub_states))] return self.GameClass.get_winner(state) + + def merge(self, clones): + additional_rollouts = 0 + additional_rollout_sum = 0 + for clone in clones: + if clone.rollout_count > self.rollout_count: + additional_rollouts += clone.rollout_count - self.rollout_count + additional_rollout_sum += clone.rollout_sum - self.rollout_sum + self.rollout_count += additional_rollouts + self.rollout_sum += additional_rollout_sum + + if self.children is None: + for i, clone in enumerate(clones): + if clone.children is not None: + # copy the clone's children to self's children, remove all clones with no children + self.children = clone.children + clones = clones[i + 1:] + break + else: + # neither self nor any clones have children so we're done + return + + # self has children (either because it originally had them, + # or they were copied from the first clone that had them + for i, child in enumerate(self.children): + child.merge([clone.children[i] for clone in clones if clone.children is not None]) From e16cb8c5fcb03a02b748e207c29df38fc91ec874 Mon Sep 17 00:00:00 2001 From: Amaar Quadri Date: Mon, 20 Jul 2020 23:15:56 -0400 Subject: [PATCH 3/4] temp changes --- src/move_selection/MCTS.py | 71 ++++++++++++++++++++++++++++++-------- 1 file changed, 57 insertions(+), 14 deletions(-) diff --git a/src/move_selection/MCTS.py b/src/move_selection/MCTS.py index fb52943..f55d3fd 100644 --- a/src/move_selection/MCTS.py +++ b/src/move_selection/MCTS.py @@ -14,17 +14,22 @@ class AsyncMCTSRoot: UI Pipe protocol: send user's move, send dummy message to request move, receive move, repeat Worker Pipe protocol: send root node, send dummy message to request results, receive results, repeat """ - def __init__(self, GameClass, position, c=np.sqrt(2), threads=7, time_limit=5): - self.time_limit = time_limit - self.ui_pipe, manager_ui_pipe = Pipe() - manager_worker_pipes, worker_pipes = zip(*[Pipe() for _ in range(threads)]) - self.manager_process = Process(target=self.manager_loop_func, - args=(manager_ui_pipe, manager_worker_pipes, GameClass, position)) + def __init__(self, GameClass, position, time_limit=5, networks=None, c=np.sqrt(2), d=1, threads=1): + self.GameClass = GameClass + if networks is None: + self.root = RolloutNode(position, None, GameClass, c, rollout_batch_size=1, pool=None) + else: + self.root = HeuristicNode(position, None, GameClass, ) + parent_worker_pipes, child_worker_pipes = zip(*[Pipe() for _ in range(threads)]) + + if networks is not None and threads != 1: + if threads != 1: + raise Exception() + self.worker_processes = [Process(target=self.worker_loop_func, - args=(worker_pipe, c)) for worker_pipe in worker_pipes] + args=(worker_pipe, time_limit)) for worker_pipe in child_worker_pipes] def start(self): - self.manager_process.start() for worker_process in self.worker_processes: worker_process.start() @@ -38,16 +43,12 @@ def choose_move(self, user_chosen_position): :return: The move chosen by monte carlo tree search. """ self.ui_pipe.send(user_chosen_position) - sleep(self.time_limit) - self.ui_pipe.send(None) return self.ui_pipe.recv() def terminate(self): - self.manager_process.terminate() for worker_process in self.worker_processes: worker_process.terminate() - self.manager_process.join() for worker_process in self.worker_processes: worker_process.join() @@ -82,11 +83,14 @@ def manager_loop_func(ui_pipe, worker_pipes, GameClass, position): print('Game Over in Async MCTS Root: ', GameClass.get_winner(root.position)) @staticmethod - def worker_loop_func(worker_pipe, c): + def worker_loop_func(worker_pipe, network, time_limit): """ - Worker thread workflow: receive MCTS root node, start doing MCTS, once a dummy message is receive, + Worker thread workflow: receive MCTS root node, do MCTS for time_limit, return root result, wait for new MCTS root node. """ + if network is not None: + network.initialize() + while True: root = worker_pipe.recv() @@ -158,6 +162,7 @@ def loop_func(GameClass, position, time_limit, network, c, d, threads, worker_pi pool = Pool(threads) if threads > 1 else None root = RolloutNode(position, parent=None, GameClass=GameClass, c=c, rollout_batch_size=threads, pool=pool) else: + pool = None network.initialize() root = HeuristicNode(position, None, GameClass, network, c, d) @@ -386,6 +391,27 @@ def depth_to_end_game(self): return 1 + max(child.depth_to_end_game() for child in self.children if child.fully_expanded and child.get_evaluation() == self.get_evaluation()) + @abstractmethod + def merge(self, clones): + pass + + def merge_children_clones(self, clones): + if self.children is None: + for i, clone in enumerate(clones): + if clone.children is not None: + # copy the clone's children to self's children, and remove it from list of clones + self.children = clone.children + clones = clones[i + 1:] + break + else: + # neither self nor any clones have children so we're done + return + + # self has children (either because it originally had them, + # or they were copied from the first clone that had them + for i, child in enumerate(self.children): + child.merge([clone.children[i] for clone in clones if clone.children is not None]) + class RolloutNode(AbstractNode): def __init__(self, position, parent, GameClass, c=np.sqrt(2), rollout_batch_size=1, pool=None): @@ -441,6 +467,15 @@ def execute_single_rollout(self): return self.GameClass.get_winner(state) + def merge(self, clones): + self.rollout_count += sum([clone.rollout_count - self.rollout_count + for clone in clones if clone.rollout_count > self.rollout_count]) + self.rollout_sum += sum([clone.rollout_sum - self.rollout_sum + for clone in clones if clone.rollout_count > self.rollout_count]) + + # root nodes are cloned first, and changes propagate downwards to leafs + self.merge_children_clones(clones) + class HeuristicNode(AbstractNode): def __init__(self, position, parent, GameClass, network, c=np.sqrt(2), d=1): @@ -513,3 +548,11 @@ def ensure_children(self): for move in self.GameClass.get_possible_moves(self.position)] self.policy = self.network.policy(self.position) self.expansions = 1 + + def merge(self, clones): + # leaf nodes are cloned first, and changes propagate upwards to root + self.merge_children_clones(clones) + + self.heuristic = max([child.heuristic for child in self.children]) if self.is_maximizing else \ + min([child.heuristic for child in self.children]) + self.expansions = 1 + sum([child.expansions for child in self.children]) From a966183e248e73fc6d8d516990992a9dd3d724d2 Mon Sep 17 00:00:00 2001 From: Amaar Quadri Date: Thu, 23 Jul 2020 21:14:58 -0400 Subject: [PATCH 4/4] clean up for updated MCTS code --- src/move_selection/mcts.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/src/move_selection/mcts.py b/src/move_selection/mcts.py index e3fddc8..814e807 100644 --- a/src/move_selection/mcts.py +++ b/src/move_selection/mcts.py @@ -39,7 +39,6 @@ def choose_move(self, user_chosen_position): The worker thread will then continue thinking for time_limit, and then return its chosen move. :param user_chosen_position: The board position resulting from the user's move. - :param time_limit: The time to wait before requesting a move. :return: The move chosen by monte carlo tree search. """ self.ui_pipe.send(user_chosen_position) @@ -83,9 +82,9 @@ def manager_loop_func(ui_pipe, worker_pipes, GameClass, position): print('Game Over in Async MCTS Root: ', GameClass.get_winner(root.position)) @staticmethod - def worker_loop_func(worker_pipe, network, time_limit): + def worker_loop_func(worker_pipe, network): """ - Worker thread workflow: receive MCTS root node, do MCTS for time_limit, + Worker thread workflow: receive MCTS root node, do MCTS until polled, return root result, wait for new MCTS root node. """ if network is not None: @@ -95,11 +94,11 @@ def worker_loop_func(worker_pipe, network, time_limit): root = worker_pipe.recv() while (not worker_pipe.poll()) or root.children is None: - best_child = root.choose_rollout_node(c) + best_child = root.choose_expansion_node() if best_child is None: break - best_child.rollout(rollouts=1, pool=None) + best_child.expand() user_move_index = worker_pipe.recv() root = root.children[user_move_index] @@ -109,11 +108,11 @@ def worker_loop_func(worker_pipe, network, time_limit): # but manager_process would have done that already, so its redundant while not worker_pipe.poll(): - best_child = root.choose_rollout_node(c) + best_child = root.choose_expansion_node() if best_child is None: break - best_child.rollout(rollouts=1, pool=None) + best_child.expand() worker_pipe.recv() # flush dummy message indicated that results should be returned worker_pipe.send(root) @@ -427,7 +426,7 @@ def merge_children_clones(self, clones): return # self has children (either because it originally had them, - # or they were copied from the first clone that had them + # or they were copied from the first clone that had them) for i, child in enumerate(self.children): child.merge([clone.children[i] for clone in clones if clone.children is not None])