Use greedy approach to finding batch sizes in SortedSeqLoader #14

Open · wants to merge 6 commits into base: main
3 changes: 1 addition & 2 deletions .github/workflows/base.yml
@@ -28,5 +28,4 @@ jobs:
         pip install -e '.[dev]'
     - name: Run tests
       run: |
-        pytest --cov="crossfit"
-
+        pytest -m "not (singlegpu or multigpu)" --cov="crossfit"

4 changes: 2 additions & 2 deletions .github/workflows/cf_backends.yml
@@ -37,7 +37,7 @@ jobs:
         python setup.py develop
     - name: Run Pytorch tests
       run: |
-        pytest --cov="crossfit/array" -m "pytorch"
+        pytest --cov="crossfit/array" -m "pytorch and not (singlegpu or multigpu)"

     # jax:
     #   runs-on: ${{ matrix.os }}
@@ -66,4 +66,4 @@ jobs:
     #       python setup.py develop
     #     - name: Run Jax tests
     #       run: |
-    #         pytest --cov="crossfit/array" -m "jax"
\ No newline at end of file
+    #         pytest --cov="crossfit/array" -m "jax"

31 changes: 28 additions & 3 deletions .github/workflows/gpu-ci.yml
@@ -25,8 +25,33 @@ jobs:
       - uses: actions/checkout@v3
         with:
           fetch-depth: 0
+      - name: Install dependencies
+        run: |
+          python -m pip install "torch${{ matrix.torch-version }}"
+          python -m pip install .[pytorch-dev]
       - name: Run tests
         run: |
-          python -m pip install -r requirements/base.txt
-          python -m pip install -r requirements/pytorch.txt
-          pytest tests/
+          pytest -m singlegpu tests/
+
+  # multi-gpu-ci:
+  #   runs-on: linux-amd64-gpu-p100-latest-2
+  #   container:
+  #     image: nvcr.io/nvidia/pytorch:23.09-py3
+  #     env:
+  #       NVIDIA_VISIBLE_DEVICES: ${{ env.NVIDIA_VISIBLE_DEVICES }}
+  #     options: --shm-size=1G
+  #     credentials:
+  #       username: $oauthtoken
+  #       password: ${{ secrets.NGC_TOKEN }}
+  #
+  #   steps:
+  #     - uses: actions/checkout@v3
+  #       with:
+  #         fetch-depth: 0
+  #     - name: Install dependencies
+  #       run: |
+  #         python -m pip install "torch${{ matrix.torch-version }}"
+  #         python -m pip install .[pytorch-dev]
+  #     - name: Run tests
+  #       run: |
+  #         pytest -m multigpu tests/

3 changes: 2 additions & 1 deletion crossfit/backend/torch/hf/model.py
@@ -104,5 +104,6 @@ def load_model(self, device="cuda"):

         return SentenceTransformer(self.path_or_name, device="cuda").to(device)

+    @lru_cache(maxsize=1)
     def load_cfg(self):
-        return AutoConfig.from_pretrained("sentence-transformers/" + self.path_or_name)
+        return AutoConfig.from_pretrained(self.path_or_name)

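With `@lru_cache(maxsize=1)`, repeated `load_cfg()` calls return the same `AutoConfig` object instead of re-reading it each time. A self-contained sketch of the caching behavior, with a stand-in dict in place of `AutoConfig.from_pretrained`:

```python
from functools import lru_cache


class Loader:
    def __init__(self, path_or_name: str):
        self.path_or_name = path_or_name

    @lru_cache(maxsize=1)
    def load_cfg(self):
        print("loading config...")  # executes only on the first call
        return {"path_or_name": self.path_or_name}  # stand-in for AutoConfig


loader = Loader("sentence-transformers/all-MiniLM-L6-v2")
cfg_a = loader.load_cfg()  # prints "loading config..."
cfg_b = loader.load_cfg()  # served from the cache, no print
assert cfg_a is cfg_b
```

One caveat of caching on a method: `lru_cache` keys on `self`, so the cache keeps the instance alive for as long as the entry exists.
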
100 changes: 27 additions & 73 deletions crossfit/backend/torch/loader.py
@@ -89,9 +89,7 @@ def __init__(
         self.sorted_indices = seq_length.argsort()
         frame = frame.apply(lambda x: x[self.sorted_indices])
         frame = frame.assign(seq_length=seq_length[self.sorted_indices])
-
         super().__init__(frame, initial_batch_size, progress_bar=progress_bar)
-        self.splits = self._find_optimal_splits()

     def sort_column(self, col):
         indices = convert_array(self.sorted_indices, type(col))
@@ -106,89 +104,45 @@ def sort_df(self, df):
         return output

     def __next__(self):
-        if self.current_idx >= len(self.splits):
+        if self.current_idx >= self.num_rows:
             self.current_idx = 0
             raise StopIteration

-        if self.current_idx == 0:
-            start = 0
-        else:
-            start = self.splits[self.current_idx - 1]
-        end = min(self.splits[self.current_idx], self.num_rows)
-        _tokens = self.tensor_dict["seq_length"]
+        batch_size = 1
+
+        def batch_seq_len(batch_size):
+            end = self.current_idx + batch_size
+            return int(
+                min(
+                    self.tensor_dict["seq_length"][end - 1], self.model.max_seq_length()
+                )
+            )
+
+        while (
+            self.current_idx + batch_size
+        ) < self.num_rows and self.model.estimate_memory(
+            batch_seq_len(batch_size), batch_size
+        ) < (
+            (self.model.max_mem_gb * 1024)
+        ):
+            batch_size += 1
+
+        end = batch_size + self.current_idx

         batch = {
-            key: val[start:end]
+            key: val[self.current_idx : end]
             for key, val in self.tensor_dict.items()
             if key not in self.to_ignore
         }
-        clip_len = min(_tokens[end - 1], self.model.max_seq_length())
-        batch = {key: val[:, :clip_len] for key, val in batch.items()}
+        max_seq_len = batch_seq_len(batch_size)
+        batch = {key: val[:, :max_seq_len] for key, val in batch.items()}

-        self.current_idx += 1
+        self.current_idx += batch_size

         for fn in self._to_map:
             batch = fn(batch)

         if self.progress_bar is not None:
-            self.progress_bar.update(end - start)
+            self.progress_bar.update(batch_size)

         return batch
-
-    def _find_optimal_splits(self):
-        splits = []
-        i = 0
-        doubling_factor = 2
-        max_doubling_attempts, max_steps = 8, 8
-        dynamic_step_size = self.batch_size
-        decreasing_attempts = 0
-
-        num_tokens = self.tensor_dict["seq_length"]
-        max_seq_length = self.model.max_seq_length()
-
-        while i < len(num_tokens):
-            best_fit_e_ind = (
-                i + self.batch_size
-            )  # Initialize to at least initial_batch_size
-
-            # Try aggressive doubling first
-            for doubling_i in range(max_doubling_attempts):
-                tentative_e_ind = (
-                    i + best_fit_e_ind * doubling_factor
-                )  # Double the last best fit
-                tentative_e_ind = min(tentative_e_ind, len(num_tokens))
-                max_token = int(num_tokens[tentative_e_ind - 1])
-                est_memory = self.model.estimate_memory(
-                    max_token, int(tentative_e_ind - i)
-                )
-
-                if est_memory <= self.model.max_mem_gb:
-                    best_fit_e_ind = tentative_e_ind
-                else:
-                    max_doubling_attempts = doubling_i  # Reduce max doubling attempts
-                    break  # Exit loop if we exceed memory limit
-
-            for _ in range(max_steps):
-                tentative_e_ind = (
-                    best_fit_e_ind + dynamic_step_size
-                )  # Add dynamic step size
-                tentative_e_ind = min(tentative_e_ind, len(num_tokens))
-                max_token = int(num_tokens[tentative_e_ind - 1])
-
-                est_memory = self.model.estimate_memory(
-                    max_token, int(tentative_e_ind - i)
-                )
-                # The closer we are to the end, the more we penalize the batch size
-                penalty_factor = 1 + 5.0 * ((max_token / max_seq_length) ** 2)
-                est_memory *= penalty_factor
-
-                if est_memory <= self.model.max_mem_gb:
-                    best_fit_e_ind = tentative_e_ind
-                    break
-                else:
-                    dynamic_step_size //= 2  # halve the step size
-                    decreasing_attempts += 1
-
-            splits.append(best_fit_e_ind)
-            i = best_fit_e_ind  # Move to the next batch
-
-        return splits

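The removed `_find_optimal_splits` precomputed every batch boundary up front (aggressive doubling, then stepped refinement with a penalty factor); the new `__next__` instead grows each batch greedily, one row at a time, until the model's memory estimate would exceed the budget. Because rows are sorted by sequence length, the last row of a candidate batch bounds its padded width. A condensed sketch of the greedy rule, assuming `estimate_memory(seq_len, batch_size)` and a budget in the same units as `max_mem_gb * 1024` in the comparison above:

```python
def greedy_batch_end(seq_lengths, start, num_rows, max_seq_length,
                     estimate_memory, budget_mb):
    """Return the exclusive end index of the next batch starting at `start`.

    `seq_lengths` is sorted ascending, so seq_lengths[end - 1] bounds the
    padded width of the batch covering rows [start, end).
    """
    batch_size = 1

    def padded_width(bs):
        return int(min(seq_lengths[start + bs - 1], max_seq_length))

    # Grow while the current candidate batch still fits the memory budget.
    while (start + batch_size < num_rows
           and estimate_memory(padded_width(batch_size), batch_size) < budget_mb):
        batch_size += 1

    return start + batch_size
```

The trade-off versus precomputed splits: a few more `estimate_memory` calls per epoch, in exchange for much simpler logic and no tuning knobs (doubling attempts, step sizes, penalty factors).
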
4 changes: 2 additions & 2 deletions crossfit/backend/torch/op/embed.py
@@ -17,13 +17,13 @@ def __init__(
         pre=None,
         cols=False,
         keep_cols=None,
-        default_batch_size=1024,
+        batch_size=1024,
         max_mem: str = "16GB",
         sorted_data_loader: bool = True,
     ):
         super().__init__(pre=pre, cols=cols, keep_cols=keep_cols)
         self.model = model
-        self.batch_size = default_batch_size
+        self.batch_size = batch_size
         self.max_mem = max_mem
         self.max_mem_gb = int(self.max_mem.split("GB")[0]) / 2.5
         self.sorted_data_loader = sorted_data_loader

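The renamed `batch_size` argument now matches the attribute it feeds, and `max_mem` is still parsed by splitting on "GB" with a fixed divisor. A small illustration of that parsing; reading the 2.5 divisor as a headroom factor for activations and fragmentation is an assumption, not something stated in the code:

```python
def effective_mem_gb(max_mem: str, headroom: float = 2.5) -> float:
    # "16GB" -> 16 / 2.5 = 6.4 GB usable for batch-size estimation
    return int(max_mem.split("GB")[0]) / headroom


assert effective_mem_gb("16GB") == 6.4
```

Note the parse only accepts whole-gigabyte strings ending in "GB"; inputs like "16gb" or "512MB" would raise `ValueError`.
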
7 changes: 6 additions & 1 deletion crossfit/dataset/beir/raw.py
@@ -236,7 +236,12 @@ def sample_raw(name, out_dir=None, overwrite=False, sample_size=100, blocksize=2
     import cudf
     import dask_cudf

-    full_path = download_raw(name, overwrite=overwrite)
+    # if we are running tests with `pytest tests/`, use testdata.
+    testdata_dir = os.path.join(os.getcwd(), "tests", "testdata", "beir", name)
+    if os.path.exists(testdata_dir):
+        full_path = testdata_dir
+    else:
+        full_path = download_raw(name, overwrite=overwrite)

     out_dir = out_dir or CF_HOME
     sampled_dir = os.path.join(out_dir, "sampled")

19 changes: 14 additions & 5 deletions crossfit/metric/ranking/precision.py
@@ -1,7 +1,12 @@
 import numpy as np

-from crossfit.metric.ranking.base import BinaryRankingMetric, SparseBinaryLabels
+from crossfit.data.array.conversion import convert_array
 from crossfit.data.array.masked import MaskedArray
+from crossfit.metric.ranking.base import (
+    BinaryRankingMetric,
+    SparseBinaryLabels,
+    SparseLabels,
+)


 class Precision(BinaryRankingMetric):
@@ -12,7 +17,9 @@ def __init__(self, k, truncated=False):
     def _precision(self, y_true: SparseBinaryLabels, y_pred_labels: MaskedArray):
         n_pos = y_true.get_n_positives(y_pred_labels.shape[0])
         n_relevant = np.sum(
-            (y_pred_labels.data[:, : self._k] == 1) & (~y_pred_labels.mask[:, : self._k]), axis=-1
+            (y_pred_labels.data[:, : self._k] == 1)
+            & (~y_pred_labels.mask[:, : self._k]),
+            axis=-1,
         )

         if self._truncated:
@@ -38,16 +45,18 @@ def _score(self, y_true, y_pred_labels):


 class AP(BinaryRankingMetric):
-    def _score(self, y_true: SparseBinaryLabels, y_pred_labels: MaskedArray):
+    def _score(self, y_true: SparseLabels, y_pred_labels: MaskedArray):
         n_pos = y_true.get_n_positives(y_pred_labels.shape[0])
         labels = y_pred_labels[:, : self._k].filled(0)

         ranks = np.arange(1, labels.shape[1] + 1, dtype=float).reshape(1, -1)
+        ranks = convert_array(ranks, type(y_pred_labels.data))

-        precision = np.cumsum(labels, axis=-1) / ranks
+        relevant = labels >= 1
+        precision = np.cumsum(relevant, axis=-1) / ranks

         scores = np.zeros_like(n_pos, dtype=float)
-        scores[n_pos > 0] = np.sum(precision * labels, axis=-1)[n_pos > 0] / np.clip(
+        scores[n_pos > 0] = np.sum(precision * relevant, axis=-1)[n_pos > 0] / np.clip(
             n_pos[n_pos > 0], None, self._k
         )
         scores[n_pos == 0] = np.NaN

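With `relevant = labels >= 1`, the cumulative-precision numerator counts a graded label (e.g. 2 for highly relevant) as a single hit instead of inflating the sum, and converting `ranks` to the backend of the predictions keeps the division working on GPU arrays. A NumPy-only sketch of the AP@k computation with a tiny worked example:

```python
import numpy as np


def average_precision_at_k(labels: np.ndarray, n_pos: np.ndarray, k: int) -> np.ndarray:
    """AP@k per query; `labels` holds relevance grades in predicted order."""
    topk = labels[:, :k]
    ranks = np.arange(1, topk.shape[1] + 1, dtype=float)  # 1, 2, ..., k
    relevant = topk >= 1                                  # any grade >= 1 counts once
    precision = np.cumsum(relevant, axis=-1) / ranks      # precision at each rank
    scores = np.full(n_pos.shape, np.nan, dtype=float)    # NaN when no positives exist
    pos = n_pos > 0
    scores[pos] = (precision * relevant).sum(axis=-1)[pos] / np.clip(n_pos[pos], None, k)
    return scores


# Query 1: relevant items at ranks 1 and 3 -> (1/1 + 2/3) / 2 = 0.833...
# Query 2: no positives at all -> NaN
print(average_precision_at_k(np.array([[2, 0, 1], [0, 0, 0]]), np.array([2, 0]), k=3))
```
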
4 changes: 2 additions & 2 deletions crossfit/report/beir/report.py
@@ -13,7 +13,7 @@
 from crossfit.report.beir.embed import embed
 from crossfit.calculate.aggregate import Aggregator
 from crossfit.metric.continuous.mean import Mean
-from crossfit.metric.ranking import NDCG, Precision, Recall, SparseBinaryLabels, SparseRankings
+from crossfit.metric.ranking import AP, NDCG, Precision, Recall, SparseBinaryLabels, SparseRankings
 from crossfit.report.base import Report
 from crossfit.op.vector_search import VectorSearchOp
 from crossfit.backend.torch.model import Model
@@ -27,7 +27,7 @@ def __init__(
         post_group=None,
         post=None,
         groupby=None,
-        metrics=[NDCG, Precision, Recall],
+        metrics=[NDCG, AP, Precision, Recall],
     ):
         super().__init__(None, pre=pre, post_group=post_group, post=post, groupby=groupby)
         self.ks = ks

3 changes: 1 addition & 2 deletions tests/backend/dask_backend/test_aggregate.py
@@ -2,9 +2,8 @@

 from crossfit.calculate.aggregate import Aggregator, metric_key
 from crossfit.data.dataframe.dispatch import CrossFrame
-from crossfit.metric.continuous.range import Range
 from crossfit.metric import Mean
-
+from crossfit.metric.continuous.range import Range
 from tests.utils import is_leaf_node_instance_of, sample_df

3 changes: 1 addition & 2 deletions tests/backend/pytorch_backend/test_torch_convert.py
@@ -1,6 +1,5 @@
-import pytest
-
 import numpy as np
+import pytest
 import torch

 from crossfit.data import convert_array

6 changes: 2 additions & 4 deletions tests/backend/test_sklearn.py
@@ -1,13 +1,11 @@
-import pytest
-
 import numpy as np
+import pytest
 from sklearn import metrics
-from sklearn.utils.multiclass import type_of_target
 from sklearn.utils._array_api import get_namespace
+from sklearn.utils.multiclass import type_of_target

 from crossfit.data import crossarray, np_backend_dispatch

-
 arr1 = [1, 2, 3]
 arr2 = [4, 5, 6]

3 changes: 1 addition & 2 deletions tests/data/array/test_conversion.py
@@ -1,6 +1,5 @@
-import pytest
-
 import numpy as np
+import pytest

 from crossfit.data import convert_array

4 changes: 1 addition & 3 deletions tests/data/array/test_decorator.py
@@ -1,7 +1,5 @@
-import pytest
-
 import numpy as np
-
+import pytest

 from crossfit.data import crossarray
 from crossfit.utils import test_utils

4 changes: 1 addition & 3 deletions tests/data/dataframe/test_dispatch.py
@@ -1,14 +1,12 @@
-import pytest
-
 import numpy as np
+import pytest

 from crossfit.backend.pandas.dataframe import PandasDataFrame
 from crossfit.data.dataframe.core import ArrayBundle
 from crossfit.data.dataframe.dispatch import CrossFrame


 def test_pandas_frame():
-
     arr1 = np.arange(10)
     arr2 = np.ones(10)
     arr3 = np.array(["cat", "dog"] * 5)

30 changes: 30 additions & 0 deletions tests/dataset/test_load.py
@@ -0,0 +1,30 @@
+import pytest
+
+beir = pytest.importorskip("beir")
+
+import os
+import random
+
+import crossfit as cf
+from crossfit.dataset.beir.raw import BEIR_DATASETS
+
+DATASETS = set(BEIR_DATASETS.keys())
+DATASETS.discard("cqadupstack")
+DATASETS.discard("germanquad")
+
+
+@pytest.mark.singlegpu
+@pytest.mark.parametrize("dataset", DATASETS)
+def test_load_beir(dataset):
+    data = cf.load_dataset(f"beir/{dataset}", overwrite=True, tiny_sample=True)
+
+    for split in ["train", "val", "test"]:
+        split_data = getattr(data, split)
+
+        if split_data is None:
+            continue
+
+        split = split_data.ddf().compute()
+
+        assert split["query-index"].nunique() == split["query-id"].nunique()
+        assert split["query-id"].nunique() <= 100