Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions esrally/track/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -582,9 +582,9 @@ def is_locally_available(self, file_name):
def has_expected_size(self, file_name, expected_size):
return expected_size is None or os.path.getsize(file_name) == expected_size

def create_file_offset_table(self, document_file_path, expected_number_of_lines):
def create_file_offset_table(self, document_file_path, expected_number_of_lines, corpus_base_url=None):
# just rebuild the file every time for the time being. Later on, we might check the data file fingerprint to avoid it
lines_read = io.prepare_file_offset_table(document_file_path)
lines_read = io.prepare_file_offset_table(document_file_path, corpus_base_url)
if lines_read and lines_read != expected_number_of_lines:
io.remove_file_offset_table(document_file_path)
raise exceptions.DataError(
Expand Down Expand Up @@ -641,7 +641,7 @@ def prepare_document_set(self, document_set, data_root):
) from None
raise

self.create_file_offset_table(doc_path, document_set.number_of_lines)
self.create_file_offset_table(doc_path, document_set.number_of_lines, document_set.base_url)

def prepare_bundled_document_set(self, document_set, data_root):
"""
Expand All @@ -668,7 +668,7 @@ def prepare_bundled_document_set(self, document_set, data_root):
while True:
if self.is_locally_available(doc_path):
if self.has_expected_size(doc_path, document_set.uncompressed_size_in_bytes):
self.create_file_offset_table(doc_path, document_set.number_of_lines)
self.create_file_offset_table(doc_path, document_set.number_of_lines, document_set.base_url)
return True
else:
raise exceptions.DataError(
Expand Down
73 changes: 71 additions & 2 deletions esrally/utils/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
# but they are treated the same by mypy, so I'm not going to use conditional imports here
from typing_extensions import Self

from esrally.utils import console
from esrally.utils import console, net

SUPPORTED_ARCHIVE_FORMATS = [".zip", ".bz2", ".gz", ".tar", ".tar.gz", ".tgz", ".tar.bz2", ".zst"]

Expand Down Expand Up @@ -520,6 +520,59 @@ def is_valid(self) -> bool:
"""
return self.exists() and os.path.getmtime(self.offset_table_path) >= os.path.getmtime(self.data_file_path)

def try_download_from_corpus_location(self, corpus_base_url: Optional[str] = None) -> bool:
    """
    Attempts to download a pre-computed offset file from the corpus location.

    This is a best-effort operation: any failure is logged at debug level and reported
    via the return value instead of raising, so callers can fall back to building the
    offset table locally.

    :param corpus_base_url: The base URL where the corpus data is hosted. If falsy, no download is attempted.
    :return: True if download was successful, False otherwise.
    """
    if not corpus_base_url:
        return False

    logger = logging.getLogger(__name__)
    # The remote offset file is expected to live next to the data file and to be
    # named "<data file name>.offset" (mirroring the local naming convention).
    offset_file_name = os.path.basename(f"{self.data_file_path}.offset")
    remote_offset_url = f"{corpus_base_url.rstrip('/')}/{offset_file_name}"

    # Reject unsupported schemes before touching the filesystem so we don't create
    # directories for downloads we cannot perform anyway.
    is_http = remote_offset_url.startswith(("http://", "https://"))
    is_bucket = remote_offset_url.startswith(("s3://", "gs://"))
    if not (is_http or is_bucket):
        logger.debug("Unsupported URL scheme for remote offset download: [%s]", remote_offset_url)
        return False

    try:
        logger.info("Attempting to download offset file from [%s]", remote_offset_url)

        # Ensure the target directory exists. Guard against an empty dirname (a plain
        # file name in the current directory), for which os.makedirs would raise.
        offset_dir = os.path.dirname(self.offset_table_path)
        if offset_dir:
            os.makedirs(offset_dir, exist_ok=True)

        if is_http:
            net.download_http(remote_offset_url, self.offset_table_path)
        else:
            # Bucket downloads need the scheme ("s3" or "gs") passed separately.
            scheme = remote_offset_url.split("://", 1)[0]
            net.download_from_bucket(scheme, remote_offset_url, self.offset_table_path)

        logger.info("Successfully downloaded offset file from [%s]", remote_offset_url)
        return True

    except Exception as e:
        logger.debug("Failed to download offset file from [%s]: %s", remote_offset_url, str(e))
        # Clean up any partially downloaded file; removal itself is best effort.
        if os.path.exists(self.offset_table_path):
            try:
                os.remove(self.offset_table_path)
            except OSError:
                pass
        return False

def __enter__(self) -> Self:
    """Open the underlying offset table file in the configured mode and return this instance."""
    handle = open(self.offset_table_path, self.mode)
    self.offset_file = handle
    return self
Expand Down Expand Up @@ -596,16 +649,32 @@ def remove(data_file_path: str) -> None:
os.remove(f"{data_file_path}.offset")


def prepare_file_offset_table(data_file_path: str) -> Optional[int]:
def prepare_file_offset_table(data_file_path: str, corpus_base_url: Optional[str] = None) -> Optional[int]:
"""
Creates a file that contains a mapping from line numbers to file offsets for the provided path. This file is used internally by
#skip_lines(data_file_path, data_file) to speed up line skipping.

:param data_file_path: The path to a text file that is readable by this process.
:param corpus_base_url: Optional base URL where the corpus data (and potentially offset files) are hosted.
If provided, Rally will attempt to download a pre-computed .offset file before creating one locally.
:return The number of lines read or ``None`` if it did not have to build the file offset table.
"""
file_offset_table = FileOffsetTable.create_for_data_file(data_file_path)
if not file_offset_table.is_valid():
# First, try to download the offset file from the corpus location
if corpus_base_url:
console.info("Attempting to download offset file for [%s] from corpus location ... " % data_file_path, end="", flush=True)
if file_offset_table.try_download_from_corpus_location(corpus_base_url):
console.println("[DOWNLOADED]")
# Verify the downloaded file is valid
if file_offset_table.is_valid():
return None
else:
console.println("[INVALID - will create locally]")
else:
console.println("[NOT FOUND - will create locally]")

# Fall back to creating the offset file locally
console.info("Preparing file offset table for [%s] ... " % data_file_path, end="", flush=True)
line_number = 0
with file_offset_table:
Expand Down
20 changes: 10 additions & 10 deletions tests/track/loader_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ def test_does_nothing_if_document_file_available(self, is_file, get_size, prepar
data_root="/tmp",
)

prepare_file_offset_table.assert_called_with("/tmp/docs.json")
prepare_file_offset_table.assert_called_with("/tmp/docs.json", None)

@mock.patch("esrally.utils.io.prepare_file_offset_table")
@mock.patch("os.path.getsize")
Expand All @@ -214,7 +214,7 @@ def test_decompresses_if_archive_available(self, is_file, get_size, prepare_file
data_root="/tmp",
)

prepare_file_offset_table.assert_called_with("/tmp/docs.json")
prepare_file_offset_table.assert_called_with("/tmp/docs.json", None)

@mock.patch("esrally.utils.io.decompress")
@mock.patch("os.path.getsize")
Expand Down Expand Up @@ -329,7 +329,7 @@ def test_download_document_archive_if_no_file_available(
download.assert_called_with(
"http://benchmarks.elasticsearch.org/corpora/unit-test/docs.json.bz2", "/tmp/docs.json.bz2", 200, progress_indicator=mock.ANY
)
prepare_file_offset_table.assert_called_with("/tmp/docs.json")
prepare_file_offset_table.assert_called_with("/tmp/docs.json", "http://benchmarks.elasticsearch.org/corpora/unit-test")

@mock.patch("esrally.utils.io.prepare_file_offset_table")
@mock.patch("esrally.utils.io.decompress")
Expand All @@ -346,18 +346,18 @@ def test_download_document_with_trailing_baseurl_slash(
is_file.side_effect = [False, True, True]
# uncompressed file size is 2000
get_size.return_value = 2000
scheme = random.choice(["http", "https", "s3", "gs"])
scheme = str(random.choice(["http", "https", "s3", "gs"]))

prepare_file_offset_table.return_value = 5

p = loader.DocumentSetPreparator(
track_name="unit-test", downloader=loader.Downloader(offline=False, test_mode=False), decompressor=loader.Decompressor()
)

url = f"{scheme}://benchmarks.elasticsearch.org/corpora/unit-test/"
p.prepare_document_set(
document_set=track.Documents(
source_format=track.Documents.SOURCE_FORMAT_BULK,
base_url=f"{scheme}://benchmarks.elasticsearch.org/corpora/unit-test/",
base_url=url,
document_file="docs.json",
# --> We don't provide a document archive here <--
document_archive=None,
Expand All @@ -372,7 +372,7 @@ def test_download_document_with_trailing_baseurl_slash(
download.assert_called_with(
f"{scheme}://benchmarks.elasticsearch.org/corpora/unit-test/docs.json", "/tmp/docs.json", 2000, progress_indicator=mock.ANY
)
prepare_file_offset_table.assert_called_with("/tmp/docs.json")
prepare_file_offset_table.assert_called_with("/tmp/docs.json", url)

@mock.patch("esrally.utils.io.prepare_file_offset_table")
@mock.patch("esrally.utils.net.download")
Expand Down Expand Up @@ -411,7 +411,7 @@ def test_download_document_file_if_no_file_available(self, is_file, get_size, en
download.assert_called_with(
"http://benchmarks.elasticsearch.org/corpora/unit-test/docs.json", "/tmp/docs.json", 2000, progress_indicator=mock.ANY
)
prepare_file_offset_table.assert_called_with("/tmp/docs.json")
prepare_file_offset_table.assert_called_with("/tmp/docs.json", "http://benchmarks.elasticsearch.org/corpora/unit-test")

@mock.patch("esrally.utils.net.download")
@mock.patch("esrally.utils.io.ensure_dir")
Expand Down Expand Up @@ -608,7 +608,7 @@ def test_prepare_bundled_document_set_if_document_file_available(self, is_file,
data_root=".",
)

prepare_file_offset_table.assert_called_with("./docs.json")
prepare_file_offset_table.assert_called_with("./docs.json", None)

@mock.patch("esrally.utils.io.prepare_file_offset_table")
@mock.patch("esrally.utils.io.decompress")
Expand Down Expand Up @@ -795,7 +795,7 @@ def test_prepare_bundled_document_set_decompresses_compressed_docs(self, is_file
data_root=".",
)

prepare_file_offset_table.assert_called_with("./docs.json")
prepare_file_offset_table.assert_called_with("./docs.json", None)

@mock.patch("os.path.getsize")
@mock.patch("os.path.isfile")
Expand Down