From 9a824b0a6494e94cd9b376bbc6967f25e5cf8b44 Mon Sep 17 00:00:00 2001 From: Morgan Rockett Date: Fri, 30 Aug 2024 02:27:11 -0400 Subject: [PATCH] feat: adding flexible num agents and shards to parsec run local #276 This commit made with the assistance of github copilot Signed-off-by: Morgan Rockett --- scripts/parsec-run-local.py | 646 ++++++++++++++++++++++++++++++++++++ 1 file changed, 646 insertions(+) create mode 100755 scripts/parsec-run-local.py diff --git a/scripts/parsec-run-local.py b/scripts/parsec-run-local.py new file mode 100755 index 000000000..8a09d4045 --- /dev/null +++ b/scripts/parsec-run-local.py @@ -0,0 +1,646 @@ +# #!/usr/bin/env python3 + +import os +import sys +import argparse +import logging +import time +import subprocess +import shlex +import math +import heapq +# FIXME: either use or remove unused imports +import multiprocessing +import threading +import asyncio +import concurrent.futures + + +# pylint: disable=C0103 +# using caps to mimic the bash script +IP = "localhost" +PORT = 8888 +LOG_LEVEL = "WARN" +RUNNER_TYPE = "evm" # lua, evm, pyrunner (future) + +NUM_AGENTS, NUM_SHARDS, NUM_TICKET_MACHINES, REPL_FACTOR = 1, 1, 1, 1 +MAX_SHARDS, MAX_AGENTS, MAX_TICKET_MACHINES = 5000, 5000, 5000 + +# mac doesn't connect to ports above 10k it seems, memory error +# look into this more, possibly a firewall issue +SHARD_EP_PORT = 50000 # to 54999 +SHARD_RAFT_EP_PORT = 55000 # to 59999 +AGENT_EP_PORT = 60000 # to 64999 +TMC_PORT_ID = 49990 +AGENT_PORT_ID = 50000 +SHARD_EP_PORT_ID = 55000 +SHARD_RAFT_EP_PORT_ID = 60000 +# pylint: enable=C0103 + + +pids_other_min_heap = heapq.heapify([]) +pids_shards_min_heap = heapq.heapify([]) +pids_agents_min_heap = heapq.heapify([]) + + +logger = logging.getLogger(__name__) + + +def parse_args() -> argparse.Namespace: + """ + Overwrite the default values if a user chooses + """ + parser = argparse.ArgumentParser(description="Run a local Parsec agent") + # pylint: disable=C0301 + parser.add_argument("--ip", type=str, default="localhost", dest="IP", + help="The IP address to use. Default is localhost.") + parser.add_argument("--port", type=int, default=8888, dest="PORT", + help="The port number to use. Default is 8888.") + parser.add_argument("--log_level", type=str, default="WARN", dest="LOG_LEVEL", + help="The log level to use. Default is WARN. \ + Choose from DEBUG, INFO, WARN, ERROR, CRITICAL.") + parser.add_argument("--runner_type", type=str, default="evm", dest="RUNNER_TYPE", + help="The runner type to use in the agent. Defaults to EVM.") + parser.add_argument("--num_agents", type=int, default=1, dest="NUM_AGENTS", + help="The number of agents to run. Defaults to 1.") + parser.add_argument("--num_shards", type=int, default=1, dest="NUM_SHARDS", + help="The number of shards to run. Defaults to 1.") + parser.add_argument("--num_ticket_machines", type=int, default=1, dest="NUM_TICKET_MACHINES", + help="The number of ticket machines to run. Defaults to 1.") + parser.add_argument("--replication_factor", type=int, default=1, dest="REPLICATION_FACTOR", + help="The replication factor to use. Defaults to 1.") + # pylint: enable=C0301 + return parser.parse_args() + + +def initiate_logging(): + """ + Set up and log the initial configuration message. + + This function logs various configuration details including: + - IP address and port + - Log level + - Number of agents and shards + """ + logging.basicConfig(level=logging.WARN) + if LOG_LEVEL in ["DEBUG", "INFO", "WARN", "ERROR", "CRITICAL"]: + logging.basicConfig(level=getattr(logging, LOG_LEVEL)) + # log file + handler = logging.FileHandler("logs/parsec-run-local.log") + # log format + formatter = logging.Formatter(\ + "%(asctime)s - %(name)s - %(levelname)s - %(message)s") + handler.setFormatter(formatter) + logger.addHandler(handler) + + +def setup_msg(): + """ + Set up and log the initial configuration message. + + This function logs various configuration details including: + - IP address and port + - Log level + - Number of agents and shards + - Runner type + - Port numbers for shard endpoints, raft endpoints, and agent endpoints + + The function doesn't take any parameters and doesn't return anything. + It uses the global logger object to log the information. + """ + logger.info("Running agent on %s:%s", IP, PORT) + logger.info("log level = %s", LOG_LEVEL) + logger.info("Running %s agents on %s shards", NUM_AGENTS, NUM_SHARDS) + logger.info("Runner type = %s", RUNNER_TYPE) + logger.info("Shard endpoint port begin = %s", SHARD_EP_PORT) + logger.info("Shard raft endpoint port begin = %s", SHARD_RAFT_EP_PORT) + logger.info("Agent endpoint port begin = %s", AGENT_EP_PORT) + logger.info("Number of logical shards = %s", NUM_SHARDS) + logger.info("Replication factor = %s", REPL_FACTOR) + logger.info("Number of physical shards = %s", NUM_SHARDS * REPL_FACTOR) + logger.info("Number of agents = %s", NUM_AGENTS) + + +def check_port(port_id, max_attempts=100): + """ + Find an open port starting from port_id. + Try to find an open port in the range of max_num_test_ports. + Return the open port if found, otherwise sys exit on -1. + """ + for i in range(max_attempts): + if subprocess.run(["nc", "-z", "localhost", str(port_id + i)], \ + check=False).returncode != 0: + open_port = port_id + i + logger.info("Found open port %d", open_port) + return open_port + logger.error("Could not find an open port in range %d to %d", + port_id, port_id + max_attempts) + sys.exit(1) + + +def setup() -> None: + global PORT + initiate_logging() + os.makedirs("logs", exist_ok=True) + setup_msg() + open_port = check_port(PORT) + PORT = open_port + + +def calc_num_phys_shards(): + """ + Calculate the total number of shards based on the number of shards + aka logical shards, multiplied by the replication factor + """ + num_physical_shards = NUM_SHARDS * REPL_FACTOR + if num_physical_shards > MAX_SHARDS: + logger.error("Total number of shards exceeds 5000") + sys.exit(1) + + return num_physical_shards + + +def validate_args() -> bool: + """ + Validate the arguments provided by the user + # FIXME: add type checking and check bounds for new variables + """ + if NUM_AGENTS < 1 or NUM_AGENTS > MAX_AGENTS: + logger.error("Number of agents must be between 1 and %d", MAX_AGENTS) + sys.exit(1) + if NUM_SHARDS < 1 or NUM_SHARDS > MAX_SHARDS: + logger.error("Number of shards must be between 1 and %d", MAX_SHARDS) + sys.exit(1) + if REPL_FACTOR < 1: + logger.error("Replication factor must be at least 1") + sys.exit(1) + + NUM_PHYS_SHARDS = calc_num_phys_shards() + if NUM_PHYS_SHARDS > MAX_SHARDS: + logger.error("Number of agents must be less than or equal to " + "the number of shards") + sys.exit(1) + + return True + + +def check_cmd_return_pid(result_proc: subprocess.CompletedProcess, machine_name: str) -> int: + """ + Check the return code of the process and return the pid if successful + """ + # validate input type + if not isinstance(result_proc, subprocess.CompletedProcess): + logger.error("Invalid input type for result_proc") + return None + + if result_proc.returncode == 0: + pid = int(result_proc.stdout.strip()) + return pid + else: + logger.error("Failed to launch %s process: %s", machine_name, result_proc.stderr) + + return None + + +def launch_one_runtime_locking_shardd(): + """ + designed for 1 shard, 1 shard cluster, 1 agent + """ + cmd = [ + "./build/src/parsec/runtime_locking_shard/runtime_locking_shardd", + "--shard_count=1", + "--shard0_count=1", + f"--shard00_endpoint={IP}:5556", + f"--shard00_raft_endpoint={IP}:5557", + "--node_id=0", + "--component_id=0", + "--agent_count=1", + f"--agent0_endpoint={IP}:6666", + "--ticket_machine_count=1", + f"--ticket_machine0_endpoint={IP}:7777", + f"--log_level={LOG_LEVEL}" + ] + result = subprocess.run(cmd, + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + check=True, text=True, timeout=5) + pid = int(result.stdout.strip()) + logger.info("Shardd pid = %s", pid) + heapq.heappush(pids_shards_min_heap, pid) + # FIXME: create log file for each category of machine (shardd, ticket_machined, agentd)? + + +def launch_one_ticket_machined(): + """ + designed for 1 shard, 1 shard cluster, 1 agent + """ + cmd = [ + "./scripts/wait-for-it.sh", "-s", f"{IP}:5556", "-t", "60", "--", + "./build/src/parsec/ticket_machine/ticket_machined", + "--shard_count=1", + "--shard0_count=1", + f"--shard00_endpoint={IP}:5556", + "--node_id=0", + "--component_id=0", + "--agent_count=1", + f"--agent0_endpoint={IP}:6666", + "--ticket_machine_count=1", + f"--ticket_machine0_endpoint={IP}:7777", + f"--log_level={LOG_LEVEL}" + ] + result = subprocess.run(cmd, \ + stdout=subprocess.PIPE, stderr=subprocess.PIPE, \ + check=True, text=True, timeout=5) + pid = int(result.stdout.strip()) + logger.info("Ticket machined pid = %s", pid) + heapq.heappush(pids_other_min_heap, pid) + # with open("logs/ticket_machined.log", "w", encoding="utf-8") as log_file: + # result = subprocess.run(cmd, stdout=log_file, check=True) + # FIXME: create log file for each category of machine (shardd, ticket_machined, agentd)? + + +def launch_one_agentd() -> int: + """ + designed for 1 shard, 1 shard cluster, 1 agent + """ + agent_port = check_port(AGENT_EP_PORT + 1, max_attempts=5) + + cmd = [ + "./scripts/wait-for-it.sh", "-s", f"{IP}:{TMC_PORT_ID}", "-t", "60", "--", + "./scripts/wait-for-it.sh", "-s", f"{IP}:{agent_port}", "-t", "60", "--", + "./build/src/parsec/agent/agentd", + "--shard_count=1", + "--shard0_count=1", + f"--shard00_endpoint={IP}:5556", + "--node_id=0", + "--component_id=0", + "--agent_count=1", + f"--agent0_endpoint={IP}:{PORT}", + "--ticket_machine_count=1", + f"--ticket_machine0_endpoint={IP}:{TMC_PORT_ID}", + f"--log_level={LOG_LEVEL}", + f"--runner_type={RUNNER_TYPE}" + ] + result = subprocess.run(cmd, \ + stdout=subprocess.PIPE, stderr=subprocess.PIPE, \ + check=True, text=True, timeout=5) + pid = int(result.stdout.strip()) + logger.info("Agentd pid = %s", pid) + heapq.heappush(pids_agents_min_heap, pid) + # FIXME: create log file for each category of machine (shardd, ticket_machined, agentd)? + + +def launch_one_parsec(): + """ + designed for 1 shard, 1 shard cluster, 1 agent + """ + + agent_port_id = check_port(AGENT_EP_PORT, max_attempts=10) + shard_ep_port_id = check_port(SHARD_EP_PORT, max_attempts=10) + shard_raft_ep_port_id = check_port(SHARD_RAFT_EP_PORT, max_attempts=10) + tmc_port_id = check_port(TMC_PORT_ID, max_attempts=10) + + # generate the commands to launch the processes for each shard, agent, ticket machine + + cmd_shards = [ + "./build/src/parsec/runtime_locking_shard/runtime_locking_shardd", + "--shard_count=1", + "--shard0_count=1", + f"--shard00_endpoint={IP}:{shard_ep_port_id}", + f"--shard00_raft_endpoint={IP}:{shard_raft_ep_port_id}", + "--node_id=0", + "--component_id=0", + "--agent_count=1", + f"--agent0_endpoint={IP}:{agent_port_id}", + "--ticket_machine_count=1", + f"--ticket_machine0_endpoint={IP}:{tmc_port_id}", + f"--log_level={LOG_LEVEL}" + ] + + cmd_tmc = [ + "./scripts/wait-for-it.sh", "-s", f"{IP}:{SHARD_EP_PORT_ID}", "-t", "60", "--", + "./build/src/parsec/ticket_machine/ticket_machined", + "--shard_count=1", + "--shard0_count=1", + f"--shard00_endpoint={IP}:5556", + "--node_id=0", + "--component_id=0", + "--agent_count=1", + f"--agent0_endpoint={IP}:{agent_port_id}", + "--ticket_machine_count=1", + f"--ticket_machine0_endpoint={IP}:{tmc_port_id}", + f"--log_level={LOG_LEVEL}" + ] + + cmd_agent = [ + "./scripts/wait-for-it.sh", "-s", f"{IP}:{TMC_PORT_ID}", "-t", "60", "--", + "./scripts/wait-for-it.sh", "-s", f"{IP}:{agent_port_id}", "-t", "60", "--", + "./build/src/parsec/agent/agentd", + "--shard_count=1", + "--shard0_count=1", + f"--shard00_endpoint={IP}:{shard_ep_port_id}", + "--node_id=0", + "--component_id=0", + "--agent_count=1", + f"--agent0_endpoint={IP}:{agent_port_id}", + "--ticket_machine_count=1", + f"--ticket_machine0_endpoint={IP}:{tmc_port_id}", + f"--log_level={LOG_LEVEL}", + f"--runner_type={RUNNER_TYPE}" + ] + + # execute the commands + logger.info("Attempting to launch shardd") + result_shards = subprocess.run(cmd_shards, \ + stdout=subprocess.PIPE, stderr=subprocess.PIPE, \ + check=True, text=True, timeout=5) + time.sleep(1) + logger.info("Attempting to launch ticket machine") + result_tmc = subprocess.run(cmd_tmc, \ + stdout=subprocess.PIPE, stderr=subprocess.PIPE, \ + check=True, text=True, timeout=5) + time.sleep(1) + logger.info("Attempting to launch agent") + result_agent = subprocess.run(cmd_agent, \ + stdout=subprocess.PIPE, stderr=subprocess.PIPE, \ + check=True, text=True, timeout=5) + + + pid_shards = check_cmd_return_pid(result_shards, "shardd") + pid_tmc = check_cmd_return_pid(result_tmc, "ticket_machined") + pid_agent = check_cmd_return_pid(result_agent, "agentd") + + if any([result_shards.returncode, result_tmc.returncode, result_agent.returncode]) is None: + logger.error("Failed to launch all processes") + logger.error("Undoing this batch of launches") + for pid in [pid_shards, pid_tmc, pid_agent]: + if pid is not None: + kill_pid(pid) + else: + heapq.heappush(pids_shards_min_heap, pid_shards) + heapq.heappush(pids_other_min_heap, pid_tmc) + heapq.heappush(pids_agents_min_heap, pid_agent) + + logger.info("pid = %s created; shardd", pid_shards) + logger.info("pid = %s created; ticket_machined", pid_tmc) + logger.info("pid = %s created; agentd", pid_agent) + + # FIXME: create log file for each category of machine (shardd, ticket_machined, agentd)? + # think about user argument and do we want to do teardown automatically or let user choose + # we can also run teardown to supercede the user choice and kill all processes + # matching runtime_locking_shardd, ticket_machined, agentd + # at the start of the program - but proceed with caution + # or even specify how long we want parsec processes active before killing this script + all processes + + +def show_min_heap(min_heap: list[int]) -> None: + """ + Show the min heap in binary search tree format + """ + def print_tree(heap, index=0, prefix="", is_left=True): + if index < len(heap): + print(prefix + ("└── " if is_left else "┌── ") + str(heap[index])) + new_prefix = prefix + (" " if is_left else "│ ") + print_tree(heap, 2 * index + 2, new_prefix, False) + print_tree(heap, 2 * index + 1, new_prefix, True) + + print("Min Heap as Binary Search Tree:") + print_tree(min_heap) + + + +def launch_full_parsec(): + """ + FIXME: explain what this function does + # poor runtime complexity - can split into asyncio or threading to speed up + # do several batches of launches and specify starting port IDs / ranges + # if our user commands demand a decent amount of machines like 100 + + """ + + num_active_agents = 0 + prev_agent_port = AGENT_EP_PORT + + num_active_tmcs = 0 + prev_tmc_port = TMC_PORT_ID + + num_active_shards = 0 + prev_shard_port = SHARD_EP_PORT + prev_shard_raft_port = SHARD_RAFT_EP_PORT + + for agent_num in range(NUM_AGENTS): + agent_port = check_port(prev_agent_port + 1, max_attempts=5) + + # FUXNE: unused variables ticket_machine_num, repl_tmc_num + for ticket_machine_num in range(NUM_TICKET_MACHINES): + # FIXME: we don't have support for ticket machine replication yet in the bash code + for repl_tmc_num in range(REPL_FACTOR): + curr_tmc_num = num_active_tmcs + tmc_port = check_port(prev_tmc_port + 1, max_attempts=5) + + ### + # shard_count - the number of shard clusters + # shardN_count - the number of shard replicas in the Nth cluster + # node_id - which node the cluster is this shard + # component_id - which cluster is this shard in + ### + + for shard_lgc_num in range(NUM_SHARDS): # aka cluster ID + for shard_repl_num in range(REPL_FACTOR): # replica count, shard ID within cluster + + cluster_id = shard_lgc_num + shard_phys_num = shard_repl_num + shard_repl_factor = REPL_FACTOR + shard_ep_port = check_port(prev_shard_port + 1, max_attempts=5) + shard_cluster_raft_ep_port = check_port(prev_shard_raft_port + 1, max_attempts=5) + + # FIXME: validate user input before subprocess runs as that can be a security risk + # pylint: disable=C0301 + # gemerate the commands to launch the processes for each shard, agent, ticket machine + cmd_shards = [ + "./build/src/parsec/runtime_locking_shard/replicated_shard", + f"--shard_count={NUM_SHARDS}", + f"--shard{shard_lgc_num}_count={shard_repl_factor}", # number of shard replicas per cluster + f"--shard{shard_lgc_num}-{shard_phys_num}_endpoint={IP}:{shard_ep_port}", + f"--shard{shard_lgc_num}-{shard_phys_num}_raft_endpoint={IP}:{shard_cluster_raft_ep_port}", + f"--node_id={num_active_shards}", + f"--component_id={cluster_id}", + f"--agent_count={NUM_AGENTS} --agent{agent_num}_endpoint={IP}:{agent_port}", + f"--ticket_machine_count={NUM_TICKET_MACHINES}" + f"--ticket_machine{curr_tmc_num}_endpoint={IP}:{tmc_port}", + f"--log_level={LOG_LEVEL} > logs/shardd_{shard_lgc_num}_{shard_lgc_num}.log", + "&" + ] + + cmd_tmc = [ + "-s", f"{IP}:{SHARD_EP_PORT_ID}", "-t", "60", "--", + "./build/src/parsec/ticket_machine/ticket_machined", + f"--shard_count={NUM_SHARDS}", + f"--shard{shard_lgc_num}_count={shard_repl_factor}", + f"--shard{shard_lgc_num}-{shard_phys_num}_endpoint={IP}:{shard_ep_port}", + "--node_id={num_active_shards}", + "--component_id={cluster_id}", + f"--agent_count={NUM_AGENTS}", + f"--agent0_endpoint={IP}:{agent_port}", + f"--ticket_machine_count={NUM_TICKET_MACHINES}", + f"--ticket_machine{curr_tmc_num}_endpoint={IP}:{tmc_port}", + f"--log_level={LOG_LEVEL} > logs/ticket_machined_{curr_tmc_num}.log", + "&" + ] + + cmd_agent = [ + "-s", f"{IP}:{TMC_PORT_ID}", "-t", "60", "--", + "-s", f"{IP}:{shard_ep_port}", "-t", "60", "--", + "./build/src/parsec/agent/agentd", + f"--shard_count={NUM_SHARDS}", + f"--shard{shard_lgc_num}_count={shard_repl_factor}", + f"--shard{shard_lgc_num}_{shard_phys_num}_endpoint={IP}:{shard_ep_port}", + f"--node_id={num_active_shards}", + f"--component_id={cluster_id}", + f"--agent_count={NUM_AGENTS}", + f"--agent{agent_num}_endpoint={IP}:{agent_port}", + f"--ticket_machine_count={NUM_TICKET_MACHINES}", + f"--ticket_machine{curr_tmc_num}_endpoint={IP}:{tmc_port}", + f"--log_level={LOG_LEVEL}", + f"--runner_type={RUNNER_TYPE}" + ] + # pylint: enable=C0301 + + # execute the commands + # either all work and update variables and pid min-heaps + # or gen new ports and retry, unroll any processes that were launched + result_shards = subprocess.run(cmd_shards, \ + stdout=subprocess.PIPE, stderr=subprocess.PIPE, \ + check=True, text=True, timeout=5) + + result_tmc = subprocess.run(cmd_tmc, \ + stdout=subprocess.PIPE, stderr=subprocess.PIPE, \ + check=True, text=True, timeout=5) + + result_agent = subprocess.run(cmd_agent, \ + stdout=subprocess.PIPE, stderr=subprocess.PIPE, \ + check=True, text=True, timeout=5) + + + pid_shards = check_cmd_return_pid(result_shards, "shardd") + pid_tmc = check_cmd_return_pid(result_tmc, "ticket_machined") + pid_agent = check_cmd_return_pid(result_agent, "agentd") + + # by doing this we can accurately hit our target counts for each machine + # FIXME: add in a while loop countdown after this to launch any remaining process batches + if any([result_shards.returncode, result_tmc.returncode, result_agent.returncode]) is None: + logger.error("Failed to launch all processes") + logger.error("Undoing this batch of launches") + for pid in [pid_shards, pid_tmc, pid_agent]: + if pid is not None: + kill_pid(pid) + + # successful launch + else: + heapq.heappush(pids_shards_min_heap, pid_shards) + heapq.heappush(pids_other_min_heap, pid_tmc) + heapq.heappush(pids_agents_min_heap, pid_agent) + + num_active_shards += 1 + num_active_agents += 1 + num_active_tmcs += 1 + + logger.info("pid = %s created; shardd #%i", pid_shards, num_active_shards) + logger.info("pid = %s created; ticket_machined #%i", pid_tmc, num_active_tmcs) + logger.info("pid = %s created; agentd #%i", pid_agent, num_active_agents) + + prev_agent_port, prev_tmc_port, prev_shard_port, prev_shard_raft_port = \ + agent_port, tmc_port, shard_ep_port, shard_cluster_raft_ep_port + + + # did we create the desired number of agents and ticket machines and shards? + if num_active_tmcs < NUM_TICKET_MACHINES * REPL_FACTOR: + logger.error("Total ticket machine count does not match expected ticket machine count") + + if num_active_agents < NUM_AGENTS: + logger.error("Total agent count does not match expected agent count") + + if num_active_shards < NUM_SHARDS * REPL_FACTOR: + logger.error("Total shard count does not match expected shard count") + + # FIXME: we can do a while loop here to retry the failed launches or just exit or change the for-loops + # could build in redundancy for the port allocation and process launching like ports wanted * 1.2 + + +def kill_pid(pid: int) -> None: + """ + Kill a pid and log the error if it fails + """ + try: + subprocess.run(["kill", str(pid)], check=True) + except (subprocess.SubprocessError, OSError) as e: + logger.error("Failed to kill pid %s: %s", pid, e) + + +def kill_pids_in_parallel(pids_min_heap: list[int]) -> None: + """ + Kill a pids in parallel by popping from the pids min-heap + """ + num_processes = math.ceil(math.sqrt(len(pids_min_heap))) + with concurrent.futures.ThreadPoolExecutor(max_workers=num_processes) \ + as executor: + while pids_min_heap: + pid = heapq.heappop(pids_min_heap) + executor.submit(kill_pid, pid) + + +def teardown() -> None: + """ + Kill all running processes + """ + logger.info("Shutting down processes") + logger.info("Killing shards") + kill_pids_in_parallel(pids_shards_min_heap) + logger.info("Killing agents") + kill_pids_in_parallel(pids_agents_min_heap) + logger.info("Killing other processes") + kill_pids_in_parallel(pids_other_min_heap) + logger.info("Processes killed") + logger.info("Shutting down done") + # close log files + # for log_file in ["logs/shardd.log", "logs/ticket_machined.log", "logs/agentd.log"]: + for log_file in ["logs/parsec-run-local.log"]: + with open(log_file, "w", encoding="utf-8") as log_file: + log_file.write("Log file closed\n") + + +def main() -> None: + """ + Main function to run the program + """ + args = parse_args() + + global IP, PORT, LOG_LEVEL, RUNNER_TYPE + IP = args.IP + PORT = args.PORT + LOG_LEVEL = args.LOG_LEVEL + RUNNER_TYPE = args.RUNNER_TYPE + + global NUM_AGENTS, NUM_SHARDS, REPL_FACTOR + NUM_AGENTS = args.NUM_AGENTS + NUM_SHARDS = args.NUM_SHARDS + REPL_FACTOR = args.REPLICATION_FACTOR + + setup() + + validate_args() + + if max(NUM_AGENTS, NUM_SHARDS, REPL_FACTOR, NUM_TICKET_MACHINES) == 1: + launch_one_parsec() + else: + launch_full_parsec() + + if LOG_LEVEL == "DEBUG": + show_min_heap(pids_shards_min_heap) + show_min_heap(pids_agents_min_heap) + show_min_heap(pids_other_min_heap) + + teardown() + + +if __name__ == "__main__": + main() +