diff --git a/esrally/track/loader.py b/esrally/track/loader.py index e256970a6..67b928c2f 100644 --- a/esrally/track/loader.py +++ b/esrally/track/loader.py @@ -621,9 +621,9 @@ def is_locally_available(self, file_name): def has_expected_size(self, file_name, expected_size): return expected_size is None or os.path.getsize(file_name) == expected_size - def create_file_offset_table(self, document_file_path, expected_number_of_lines): + def create_file_offset_table(self, document_file_path, expected_number_of_lines, corpus_base_url=None): # just rebuild the file every time for the time being. Later on, we might check the data file fingerprint to avoid it - lines_read = io.prepare_file_offset_table(document_file_path) + lines_read = io.prepare_file_offset_table(document_file_path, corpus_base_url) if lines_read and lines_read != expected_number_of_lines: io.remove_file_offset_table(document_file_path) raise exceptions.DataError( @@ -680,7 +680,7 @@ def prepare_document_set(self, document_set, data_root): ) from None raise - self.create_file_offset_table(doc_path, document_set.number_of_lines) + self.create_file_offset_table(doc_path, document_set.number_of_lines, document_set.base_url) def prepare_bundled_document_set(self, document_set, data_root): """ @@ -707,7 +707,7 @@ def prepare_bundled_document_set(self, document_set, data_root): while True: if self.is_locally_available(doc_path): if self.has_expected_size(doc_path, document_set.uncompressed_size_in_bytes): - self.create_file_offset_table(doc_path, document_set.number_of_lines) + self.create_file_offset_table(doc_path, document_set.number_of_lines, document_set.base_url) return True else: raise exceptions.DataError( diff --git a/esrally/utils/io.py b/esrally/utils/io.py index be6bd5296..a58981ba4 100644 --- a/esrally/utils/io.py +++ b/esrally/utils/io.py @@ -36,7 +36,7 @@ # but they are treated the same by mypy, so I'm not going to use conditional imports here from typing_extensions import Self -from esrally.utils import console +from esrally.utils import console, net SUPPORTED_ARCHIVE_FORMATS = [".zip", ".bz2", ".gz", ".tar", ".tar.gz", ".tgz", ".tar.bz2", ".zst"] @@ -514,6 +514,41 @@ def is_valid(self) -> bool: """ return self.exists() and os.path.getmtime(self.offset_table_path) >= os.path.getmtime(self.data_file_path) + def try_download_from_corpus_location(self, corpus_base_url: str | None) -> bool: + """ + Attempts to download a pre-computed offset file from the corpus location. + + :param corpus_base_url: The base URL where the corpus data is hosted. If None, tries to infer from data file path. + :return: True if download was successful, False otherwise. + """ + if not corpus_base_url: + # Try to infer corpus URL from common Rally corpus locations + # This is a best-effort approach for common scenarios + return False + + logger = logging.getLogger(__name__) + data_file_name = os.path.basename(self.data_file_path) + offset_file_name = f"{data_file_name}.offset" + + # Construct the remote offset file URL + if corpus_base_url.endswith("/"): + separator = "" + else: + separator = "/" + remote_offset_url = f"{corpus_base_url}{separator}{offset_file_name}" + + logger.info("Attempting to download offset file from [%s]", remote_offset_url) + + # Ensure the directory exists + os.makedirs(os.path.dirname(self.offset_table_path), exist_ok=True) + try: + net.download(remote_offset_url, self.offset_table_path) + logger.info("Successfully downloaded offset file from [%s]", remote_offset_url) + return True + except Exception as e: + logger.debug("Download failed: %s", str(e)) + return False + def __enter__(self) -> Self: self.offset_file = open(self.offset_table_path, self.mode) return self @@ -590,16 +625,32 @@ def remove(data_file_path: str) -> None: os.remove(f"{data_file_path}.offset") -def prepare_file_offset_table(data_file_path: str) -> Optional[int]: +def prepare_file_offset_table(data_file_path: str, corpus_base_url: str | None) -> int | None: """ Creates a file that contains a mapping from line numbers to file offsets for the provided path. This file is used internally by #skip_lines(data_file_path, data_file) to speed up line skipping. :param data_file_path: The path to a text file that is readable by this process. + :param corpus_base_url: Optional base URL where the corpus data (and potentially offset files) are hosted. + If provided, Rally will attempt to download a pre-computed .offset file before creating one locally. :return The number of lines read or ``None`` if it did not have to build the file offset table. """ file_offset_table = FileOffsetTable.create_for_data_file(data_file_path) if not file_offset_table.is_valid(): + # First, try to download the offset file from the corpus location + if corpus_base_url: + console.info("Attempting to download offset file for [%s] from corpus location ... " % data_file_path, end="", flush=True) + if file_offset_table.try_download_from_corpus_location(corpus_base_url): + console.println("[DOWNLOADED]") + # Verify the downloaded file is valid + if file_offset_table.is_valid(): + return None + else: + console.println("[INVALID - will create locally]") + else: + console.println("[NOT FOUND - will create locally]") + + # Fall back to creating the offset file locally console.info("Preparing file offset table for [%s] ... " % data_file_path, end="", flush=True) line_number = 0 with file_offset_table: diff --git a/tests/track/loader_test.py b/tests/track/loader_test.py index e60271528..743a80d68 100644 --- a/tests/track/loader_test.py +++ b/tests/track/loader_test.py @@ -188,7 +188,7 @@ def test_does_nothing_if_document_file_available(self, is_file, get_size, prepar data_root="/tmp", ) - prepare_file_offset_table.assert_called_with("/tmp/docs.json") + prepare_file_offset_table.assert_called_with("/tmp/docs.json", None) @mock.patch("esrally.utils.io.prepare_file_offset_table") @mock.patch("os.path.getsize") @@ -214,7 +214,7 @@ def test_decompresses_if_archive_available(self, is_file, get_size, prepare_file data_root="/tmp", ) - prepare_file_offset_table.assert_called_with("/tmp/docs.json") + prepare_file_offset_table.assert_called_with("/tmp/docs.json", None) @mock.patch("esrally.utils.io.decompress") @mock.patch("os.path.getsize") @@ -329,7 +329,7 @@ def test_download_document_archive_if_no_file_available( download.assert_called_with( "http://benchmarks.elasticsearch.org/corpora/unit-test/docs.json.bz2", "/tmp/docs.json.bz2", 200, progress_indicator=mock.ANY ) - prepare_file_offset_table.assert_called_with("/tmp/docs.json") + prepare_file_offset_table.assert_called_with("/tmp/docs.json", "http://benchmarks.elasticsearch.org/corpora/unit-test") @mock.patch("esrally.utils.io.prepare_file_offset_table") @mock.patch("esrally.utils.io.decompress") @@ -346,18 +346,18 @@ def test_download_document_with_trailing_baseurl_slash( is_file.side_effect = [False, True, True] # uncompressed file size is 2000 get_size.return_value = 2000 - scheme = random.choice(["http", "https", "s3", "gs"]) + scheme = str(random.choice(["http", "https", "s3", "gs"])) prepare_file_offset_table.return_value = 5 p = loader.DocumentSetPreparator( track_name="unit-test", downloader=loader.Downloader(offline=False, test_mode=False), decompressor=loader.Decompressor() ) - + url = f"{scheme}://benchmarks.elasticsearch.org/corpora/unit-test/" p.prepare_document_set( document_set=track.Documents( source_format=track.Documents.SOURCE_FORMAT_BULK, - base_url=f"{scheme}://benchmarks.elasticsearch.org/corpora/unit-test/", + base_url=url, document_file="docs.json", # --> We don't provide a document archive here <-- document_archive=None, @@ -372,7 +372,7 @@ def test_download_document_with_trailing_baseurl_slash( download.assert_called_with( f"{scheme}://benchmarks.elasticsearch.org/corpora/unit-test/docs.json", "/tmp/docs.json", 2000, progress_indicator=mock.ANY ) - prepare_file_offset_table.assert_called_with("/tmp/docs.json") + prepare_file_offset_table.assert_called_with("/tmp/docs.json", url) @mock.patch("esrally.utils.io.prepare_file_offset_table") @mock.patch("esrally.utils.net.download") @@ -411,7 +411,7 @@ def test_download_document_file_if_no_file_available(self, is_file, get_size, en download.assert_called_with( "http://benchmarks.elasticsearch.org/corpora/unit-test/docs.json", "/tmp/docs.json", 2000, progress_indicator=mock.ANY ) - prepare_file_offset_table.assert_called_with("/tmp/docs.json") + prepare_file_offset_table.assert_called_with("/tmp/docs.json", "http://benchmarks.elasticsearch.org/corpora/unit-test") @mock.patch("esrally.utils.net.download") @mock.patch("esrally.utils.io.ensure_dir") @@ -608,7 +608,7 @@ def test_prepare_bundled_document_set_if_document_file_available(self, is_file, data_root=".", ) - prepare_file_offset_table.assert_called_with("./docs.json") + prepare_file_offset_table.assert_called_with("./docs.json", None) @mock.patch("esrally.utils.io.prepare_file_offset_table") @mock.patch("esrally.utils.io.decompress") @@ -795,7 +795,7 @@ def test_prepare_bundled_document_set_decompresses_compressed_docs(self, is_file data_root=".", ) - prepare_file_offset_table.assert_called_with("./docs.json") + prepare_file_offset_table.assert_called_with("./docs.json", None) @mock.patch("os.path.getsize") @mock.patch("os.path.isfile")