From 8885e77d68f4621020a66d5bd93eef69e6f1f51a Mon Sep 17 00:00:00 2001 From: Joost VandeVondele Date: Thu, 13 Jun 2024 15:05:03 +0200 Subject: [PATCH] Introduce a worker global cache introduce a global_cache to reduce downloads from github and the net server if multiple workers having a shared filesystem are run by one user. This should allow a fleet to run without overloading github. If the user specifies the path that corresponds to the global cache, the network and the source zipball are cached. They are used if available, and only downloaded (and stored) if not. Storing the data in the cache has been done in such a way that it becomes available atomically on most file systems. --- worker/games.py | 97 ++++++++++++++++++++++++++++++++++++++++-------- worker/sri.txt | 2 +- worker/worker.py | 25 ++++++++++++- 3 files changed, 106 insertions(+), 18 deletions(-) diff --git a/worker/games.py b/worker/games.py index 1caa1ed1b..64463a3c5 100644 --- a/worker/games.py +++ b/worker/games.py @@ -135,6 +135,41 @@ def send_sigint(p): p.send_signal(signal.SIGINT) +def cache_read(cache, name): + """Read a binary blob of data from a global cache on disk, None if not available""" + if cache == "": + return None + + try: + return (Path(cache) / name).read_bytes() + except Exception as e: + return None + + +def cache_write(cache, name, data): + """Write a binary blob of data to a global cache on disk in an atomic way, skip if not available""" + if cache == "": + return + + try: + temp_file = tempfile.NamedTemporaryFile(dir=cache, delete=False) + temp_file.write(data) + temp_file.flush() + os.fsync(temp_file.fileno()) # Ensure data is written to disk + temp_file.close() + + # try linking, which is atomic, and will fail if the file exists + try: + os.link(temp_file.name, Path(cache) / name) + except OSError: + pass + + # Remove the temporary file + os.remove(temp_file.name) + except Exception as e: + return + + # See https://stackoverflow.com/questions/16511337/correct-way-to-try-except-using-python-requests-module # for background. # It may be useful to introduce more refined http exception handling in the future. @@ -286,11 +321,18 @@ def required_nets_from_source(): return nets -def download_net(remote, testing_dir, net): - url = remote + "/api/nn/" + net - print("Downloading {}".format(net)) - r = requests_get(url, allow_redirects=True, timeout=HTTP_TIMEOUT) - (testing_dir / net).write_bytes(r.content) +def download_net(remote, testing_dir, net, global_cache): + content = cache_read(global_cache, net) + + if content is None: + url = remote + "/api/nn/" + net + print("Downloading {}".format(net)) + content = requests_get(url, allow_redirects=True, timeout=HTTP_TIMEOUT).content + cache_write(global_cache, net, content) + else: + print("Using {} from global cache".format(net)) + + (testing_dir / net).write_bytes(content) def validate_net(testing_dir, net): @@ -298,13 +340,13 @@ def validate_net(testing_dir, net): return hash[:12] == net[3:15] -def establish_validated_net(remote, testing_dir, net): +def establish_validated_net(remote, testing_dir, net, global_cache): if not (testing_dir / net).exists() or not validate_net(testing_dir, net): attempt = 0 while True: try: attempt += 1 - download_net(remote, testing_dir, net) + download_net(remote, testing_dir, net, global_cache) if not validate_net(testing_dir, net): raise WorkerException( "Failed to validate the network: {}".format(net) @@ -666,22 +708,37 @@ def find_arch(compiler): def setup_engine( - destination, worker_dir, testing_dir, remote, sha, repo_url, concurrency, compiler + destination, + worker_dir, + testing_dir, + remote, + sha, + repo_url, + concurrency, + compiler, + global_cache, ): """Download and build sources in a temporary directory then move exe to destination""" tmp_dir = Path(tempfile.mkdtemp(dir=worker_dir)) try: - item_url = github_api(repo_url) + "/zipball/" + sha - print("Downloading {}".format(item_url)) - blob = requests_get(item_url).content + blob = cache_read(global_cache, sha + ".zip") + + if blob is None: + item_url = github_api(repo_url) + "/zipball/" + sha + print("Downloading {}".format(item_url)) + blob = requests_get(item_url).content + cache_write(global_cache, sha + ".zip", blob) + else: + print("Using {} from global cache".format(sha + ".zip")) + file_list = unzip(blob, tmp_dir) prefix = os.path.commonprefix([n.filename for n in file_list]) os.chdir(tmp_dir / prefix / "src") for net in required_nets_from_source(): print("Build uses default net:", net) - establish_validated_net(remote, testing_dir, net) + establish_validated_net(remote, testing_dir, net, global_cache) shutil.copyfile(testing_dir / net, net) arch = find_arch(compiler) @@ -1182,7 +1239,15 @@ def launch_cutechess( def run_games( - worker_info, current_state, password, remote, run, task_id, pgn_file, clear_binaries + worker_info, + current_state, + password, + remote, + run, + task_id, + pgn_file, + clear_binaries, + global_cache, ): # This is the main cutechess-cli driver. # It is ok, and even expected, for this function to @@ -1316,6 +1381,7 @@ def parse_options(s): repo_url, worker_info["concurrency"], worker_info["compiler"], + global_cache, ) if not base_engine.exists(): setup_engine( @@ -1327,6 +1393,7 @@ def parse_options(s): repo_url, worker_info["concurrency"], worker_info["compiler"], + global_cache, ) os.chdir(testing_dir) @@ -1360,11 +1427,11 @@ def parse_options(s): # Add EvalFile* with full path to cutechess options, and download the networks if missing. for option, net in required_nets(base_engine).items(): base_options.append("option.{}={}".format(option, net)) - establish_validated_net(remote, testing_dir, net) + establish_validated_net(remote, testing_dir, net, global_cache) for option, net in required_nets(new_engine).items(): new_options.append("option.{}={}".format(option, net)) - establish_validated_net(remote, testing_dir, net) + establish_validated_net(remote, testing_dir, net, global_cache) # PGN files output setup. pgn_name = "results-" + worker_info["unique_key"] + ".pgn" diff --git a/worker/sri.txt b/worker/sri.txt index b16ced5c6..2c6685836 100644 --- a/worker/sri.txt +++ b/worker/sri.txt @@ -1 +1 @@ -{"__version": 239, "updater.py": "Mg+pWOgGA0gSo2TuXuuLCWLzwGwH91rsW1W3ixg3jYauHQpRMtNdGnCfuD1GqOhV", "worker.py": "+ubGHk3rIV0ILVhg1dxpsOkRF8GUaO5otPVnAyw8kKKq9Rqzksv02xj6wjYpSTmA", "games.py": "6vKH51UtL56oNvA539hLXRzgE1ADXy3QZNJohoK94RntM72+iMancSJZHaNjEb5+"} +{"__version": 240, "updater.py": "Mg+pWOgGA0gSo2TuXuuLCWLzwGwH91rsW1W3ixg3jYauHQpRMtNdGnCfuD1GqOhV", "worker.py": "lHqjGBQl9RVBcssjCUV912/PL6Bi2zGe44XuUxt8HLbGtb2nb+voyNQnW8Gwk+Rd", "games.py": "9dFaa914vpqT7q4LLx2LlDdYwK6QFVX3h7+XRt18ATX0lt737rvFeBIiqakkttNC"} diff --git a/worker/worker.py b/worker/worker.py index cb78506e7..91e59e7f2 100644 --- a/worker/worker.py +++ b/worker/worker.py @@ -62,7 +62,7 @@ MIN_CLANG_MAJOR = 8 MIN_CLANG_MINOR = 0 -WORKER_VERSION = 239 +WORKER_VERSION = 240 FILE_LIST = ["updater.py", "worker.py", "games.py"] HTTP_TIMEOUT = 30.0 INITIAL_RETRY_TIME = 15.0 @@ -661,6 +661,7 @@ def setup_parameters(worker_dir): ("parameters", "uuid_prefix", "_hw", _alpha_numeric, None), ("parameters", "min_threads", "1", int, None), ("parameters", "fleet", "False", _bool, None), + ("parameters", "global_cache", "", str, None), ("parameters", "compiler", default_compiler, compiler_names, None), ("private", "hw_seed", str(random.randint(0, 0xFFFFFFFF)), int, None), ] @@ -748,6 +749,17 @@ def _get_help_string(self, action): choices=[False, True], # useful for usage message help="if 'True', quit in case of errors or if no task is available", ) + parser.add_argument( + "-g", + "--global_cache", + dest="global_cache", + default=config.get("parameters", "global_cache"), + type=str, + help="""Useful only when running multiple workers concurrently: + an existing absolute path to be used to globally cache on disk + certain downloads, reducing load on github or net server. + The default empty string ("") disables using a cache.""", + ) parser.add_argument( "-C", "--compiler", @@ -869,6 +881,7 @@ def my_error(e): ) config.set("parameters", "min_threads", str(options.min_threads)) config.set("parameters", "fleet", str(options.fleet)) + config.set("parameters", "global_cache", str(options.global_cache)) config.set("parameters", "compiler", options.compiler_) with open(config_file, "w") as f: @@ -1345,7 +1358,13 @@ def verify_worker_version(remote, username, password): def fetch_and_handle_task( - worker_info, password, remote, lock_file, current_state, clear_binaries + worker_info, + password, + remote, + lock_file, + current_state, + clear_binaries, + global_cache, ): # This function should normally not raise exceptions. # Unusual conditions are handled by returning False. @@ -1443,6 +1462,7 @@ def fetch_and_handle_task( task_id, pgn_file, clear_binaries, + global_cache, ) success = True except FatalException as e: @@ -1675,6 +1695,7 @@ def worker(): lock_file, current_state, clear_binaries, + options.global_cache, ) if not current_state["alive"]: # the user may have pressed Ctrl-C... break