def cache_read(cache, name):
    """Read a binary blob from the global on-disk cache.

    Args:
        cache: Path of the cache directory; an empty value disables the cache.
        name: File name of the entry inside the cache directory.

    Returns:
        The entry's content as bytes, or None if the cache is disabled or the
        entry cannot be read for any reason.
    """
    if not cache:
        return None

    try:
        return (Path(cache) / name).read_bytes()
    except Exception:
        # The cache is strictly best effort: a miss and an unreadable entry
        # are treated the same way, the caller falls back to downloading.
        return None


def cache_write(cache, name, data):
    """Atomically store a binary blob in the global on-disk cache.

    The data is first written to a temporary file inside the cache directory
    and then hard-linked to its final name, so other processes never observe
    a partially written entry. An already existing entry is left untouched.
    All failures are silently ignored: the cache is an optimization only.

    Args:
        cache: Path of the cache directory; an empty value disables the cache.
        name: File name of the entry inside the cache directory.
        data: Bytes to store.
    """
    if not cache:
        return

    temp_name = None
    try:
        with tempfile.NamedTemporaryFile(dir=cache, delete=False) as temp_file:
            temp_name = temp_file.name
            temp_file.write(data)
            temp_file.flush()
            os.fsync(temp_file.fileno())  # Ensure data is written to disk

        # try linking, which is atomic, and will fail if the file exists
        try:
            os.link(temp_name, Path(cache) / name)
        except OSError:
            pass
    except Exception:
        # Best effort: an unusable cache must never break the worker.
        return
    finally:
        # Always remove the temporary file, also when writing failed half-way,
        # so that failed attempts do not accumulate garbage in the cache.
        if temp_name is not None:
            try:
                os.remove(temp_name)
            except OSError:
                pass
@@ -286,11 +321,18 @@ def required_nets_from_source(): return nets -def download_net(remote, testing_dir, net): - url = remote + "/api/nn/" + net - print("Downloading {}".format(net)) - r = requests_get(url, allow_redirects=True, timeout=HTTP_TIMEOUT) - (testing_dir / net).write_bytes(r.content) +def download_net(remote, testing_dir, net, global_cache): + content = cache_read(global_cache, net) + + if content is None: + url = remote + "/api/nn/" + net + print("Downloading {}".format(net)) + content = requests_get(url, allow_redirects=True, timeout=HTTP_TIMEOUT).content + cache_write(global_cache, net, content) + else: + print("Using {} from global cache".format(net)) + + (testing_dir / net).write_bytes(content) def validate_net(testing_dir, net): @@ -298,13 +340,13 @@ def validate_net(testing_dir, net): return hash[:12] == net[3:15] -def establish_validated_net(remote, testing_dir, net): +def establish_validated_net(remote, testing_dir, net, global_cache): if not (testing_dir / net).exists() or not validate_net(testing_dir, net): attempt = 0 while True: try: attempt += 1 - download_net(remote, testing_dir, net) + download_net(remote, testing_dir, net, global_cache) if not validate_net(testing_dir, net): raise WorkerException( "Failed to validate the network: {}".format(net) @@ -666,22 +708,37 @@ def find_arch(compiler): def setup_engine( - destination, worker_dir, testing_dir, remote, sha, repo_url, concurrency, compiler + destination, + worker_dir, + testing_dir, + remote, + sha, + repo_url, + concurrency, + compiler, + global_cache, ): """Download and build sources in a temporary directory then move exe to destination""" tmp_dir = Path(tempfile.mkdtemp(dir=worker_dir)) try: - item_url = github_api(repo_url) + "/zipball/" + sha - print("Downloading {}".format(item_url)) - blob = requests_get(item_url).content + blob = cache_read(global_cache, sha + ".zip") + + if blob is None: + item_url = github_api(repo_url) + "/zipball/" + sha + print("Downloading 
{}".format(item_url)) + blob = requests_get(item_url).content + cache_write(global_cache, sha + ".zip", blob) + else: + print("Using {} from global cache".format(sha + ".zip")) + file_list = unzip(blob, tmp_dir) prefix = os.path.commonprefix([n.filename for n in file_list]) os.chdir(tmp_dir / prefix / "src") for net in required_nets_from_source(): print("Build uses default net:", net) - establish_validated_net(remote, testing_dir, net) + establish_validated_net(remote, testing_dir, net, global_cache) shutil.copyfile(testing_dir / net, net) arch = find_arch(compiler) @@ -1182,7 +1239,15 @@ def launch_cutechess( def run_games( - worker_info, current_state, password, remote, run, task_id, pgn_file, clear_binaries + worker_info, + current_state, + password, + remote, + run, + task_id, + pgn_file, + clear_binaries, + global_cache, ): # This is the main cutechess-cli driver. # It is ok, and even expected, for this function to @@ -1316,6 +1381,7 @@ def parse_options(s): repo_url, worker_info["concurrency"], worker_info["compiler"], + global_cache, ) if not base_engine.exists(): setup_engine( @@ -1327,6 +1393,7 @@ def parse_options(s): repo_url, worker_info["concurrency"], worker_info["compiler"], + global_cache, ) os.chdir(testing_dir) @@ -1360,11 +1427,11 @@ def parse_options(s): # Add EvalFile* with full path to cutechess options, and download the networks if missing. for option, net in required_nets(base_engine).items(): base_options.append("option.{}={}".format(option, net)) - establish_validated_net(remote, testing_dir, net) + establish_validated_net(remote, testing_dir, net, global_cache) for option, net in required_nets(new_engine).items(): new_options.append("option.{}={}".format(option, net)) - establish_validated_net(remote, testing_dir, net) + establish_validated_net(remote, testing_dir, net, global_cache) # PGN files output setup. 
pgn_name = "results-" + worker_info["unique_key"] + ".pgn" diff --git a/worker/sri.txt b/worker/sri.txt index b16ced5c6..2c6685836 100644 --- a/worker/sri.txt +++ b/worker/sri.txt @@ -1 +1 @@ -{"__version": 239, "updater.py": "Mg+pWOgGA0gSo2TuXuuLCWLzwGwH91rsW1W3ixg3jYauHQpRMtNdGnCfuD1GqOhV", "worker.py": "+ubGHk3rIV0ILVhg1dxpsOkRF8GUaO5otPVnAyw8kKKq9Rqzksv02xj6wjYpSTmA", "games.py": "6vKH51UtL56oNvA539hLXRzgE1ADXy3QZNJohoK94RntM72+iMancSJZHaNjEb5+"} +{"__version": 240, "updater.py": "Mg+pWOgGA0gSo2TuXuuLCWLzwGwH91rsW1W3ixg3jYauHQpRMtNdGnCfuD1GqOhV", "worker.py": "lHqjGBQl9RVBcssjCUV912/PL6Bi2zGe44XuUxt8HLbGtb2nb+voyNQnW8Gwk+Rd", "games.py": "9dFaa914vpqT7q4LLx2LlDdYwK6QFVX3h7+XRt18ATX0lt737rvFeBIiqakkttNC"} diff --git a/worker/worker.py b/worker/worker.py index cb78506e7..91e59e7f2 100644 --- a/worker/worker.py +++ b/worker/worker.py @@ -62,7 +62,7 @@ MIN_CLANG_MAJOR = 8 MIN_CLANG_MINOR = 0 -WORKER_VERSION = 239 +WORKER_VERSION = 240 FILE_LIST = ["updater.py", "worker.py", "games.py"] HTTP_TIMEOUT = 30.0 INITIAL_RETRY_TIME = 15.0 @@ -661,6 +661,7 @@ def setup_parameters(worker_dir): ("parameters", "uuid_prefix", "_hw", _alpha_numeric, None), ("parameters", "min_threads", "1", int, None), ("parameters", "fleet", "False", _bool, None), + ("parameters", "global_cache", "", str, None), ("parameters", "compiler", default_compiler, compiler_names, None), ("private", "hw_seed", str(random.randint(0, 0xFFFFFFFF)), int, None), ] @@ -748,6 +749,17 @@ def _get_help_string(self, action): choices=[False, True], # useful for usage message help="if 'True', quit in case of errors or if no task is available", ) + parser.add_argument( + "-g", + "--global_cache", + dest="global_cache", + default=config.get("parameters", "global_cache"), + type=str, + help="""Useful only when running multiple workers concurrently: + an existing absolute path to be used to globally cache on disk + certain downloads, reducing load on github or net server. 
+ The default empty string ("") disables using a cache.""", + ) parser.add_argument( "-C", "--compiler", @@ -869,6 +881,7 @@ def my_error(e): ) config.set("parameters", "min_threads", str(options.min_threads)) config.set("parameters", "fleet", str(options.fleet)) + config.set("parameters", "global_cache", str(options.global_cache)) config.set("parameters", "compiler", options.compiler_) with open(config_file, "w") as f: @@ -1345,7 +1358,13 @@ def verify_worker_version(remote, username, password): def fetch_and_handle_task( - worker_info, password, remote, lock_file, current_state, clear_binaries + worker_info, + password, + remote, + lock_file, + current_state, + clear_binaries, + global_cache, ): # This function should normally not raise exceptions. # Unusual conditions are handled by returning False. @@ -1443,6 +1462,7 @@ def fetch_and_handle_task( task_id, pgn_file, clear_binaries, + global_cache, ) success = True except FatalException as e: @@ -1675,6 +1695,7 @@ def worker(): lock_file, current_state, clear_binaries, + options.global_cache, ) if not current_state["alive"]: # the user may have pressed Ctrl-C... break