Remove dead code from dataset.py #114

Merged
merged 1 commit on Jun 18, 2024
15 changes: 0 additions & 15 deletions tests/unit/test_dataset.py
@@ -8,21 +8,6 @@


class TestDataset:

def test_limit(self):
limit = 123
name = "mnist"
dataset = Dataset(name, limit=limit)
# Sanity check that the complete dataset size is greater than what
# we are going to limit to.
dataset_info = [d for d in dataset.list() if d["name"] == name][0]
assert (
dataset_info["documents"] > limit
), "Too few documents in dataset to be able to limit"

dataset.load_documents()
assert len(dataset.documents) == limit

def test_get_batch_iter_all(self):
# Test a batch iter for a single chunk yields the entire dataset.
dataset = Dataset("mnist")
121 changes: 4 additions & 117 deletions vsb/workloads/dataset.py
@@ -61,7 +61,6 @@ def __init__(self, name: str = "", cache_dir: str = "", limit: int = 0):
self.name = name
self.cache = pathlib.Path(cache_dir)
self.limit = limit
self.documents = pandas.DataFrame()
self.queries = pandas.DataFrame()

@staticmethod
@@ -78,17 +77,6 @@ def list():
datasets.append(json.loads(m.download_as_bytes()))
return datasets

def load_documents(self, skip_download: bool = False):
"""
Load the dataset, populating the 'documents' and 'queries' DataFrames.
"""
if not skip_download:
self._download_dataset_files()

# Load the parquet dataset (made up of one or more parquet files),
# to use for documents into a pandas dataframe.
self.documents = self._load_parquet_dataset("passages", limit=self.limit)

def get_batch_iterator(
self, num_chunks: int, chunk_id: int, batch_size: int
) -> Iterator[pyarrow.RecordBatch]:
@@ -159,79 +147,13 @@ def files_to_batches(files: list):

return files_to_batches(my_chunks)
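The body of `get_batch_iterator` is collapsed in this hunk and is untouched by the PR. For orientation, a minimal sketch of the general technique — assigning the dataset's parquet files to chunks and streaming `pyarrow.RecordBatch`es — could look like the following; the file-partitioning scheme and the helper name are assumptions, not the actual vsb implementation.

```python
# Sketch only: a chunked RecordBatch iterator over parquet files using pyarrow.
# File discovery and chunk assignment here are assumed, not copied from vsb.
import pathlib
from typing import Iterator

import pyarrow
import pyarrow.parquet as pq


def batch_iterator(
    files: list[pathlib.Path], num_chunks: int, chunk_id: int, batch_size: int
) -> Iterator[pyarrow.RecordBatch]:
    # Round-robin the parquet files across chunks so each worker reads a
    # disjoint subset of the dataset.
    my_files = [f for i, f in enumerate(sorted(files)) if i % num_chunks == chunk_id]
    for path in my_files:
        parquet = pq.ParquetFile(path)
        # iter_batches() streams RecordBatches without loading the whole
        # file into memory.
        yield from parquet.iter_batches(batch_size=batch_size)
```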

def setup_queries(
self, load_queries: bool = True, doc_sample_fraction: float = 1.0, query_limit=0
):
def setup_queries(self, query_limit=0):
# If there is an explicit 'queries' dataset, then load that and use
# for querying, otherwise use documents directly.
if load_queries:
self._download_dataset_files()
self.queries = self._load_parquet_dataset("queries", limit=query_limit)
if not self.queries.empty:
logger.debug(
f"Using {len(self.queries)} query vectors loaded from dataset 'queries' table"
)
else:
# Queries expect a different schema than documents.
# Documents looks like:
# ["id", "values", "sparse_values", "metadata"]
# Queries looks like:
# ["vector", "sparse_vector", "filter", "top_k"]
#
# Extract 'values' and rename to query schema (only
# 'vector' field of queries is currently used).
if self.documents.empty:
self.load_documents()
assert (
not self.documents.empty
), "Cannot sample 'documents' to use for queries as it is empty"
self.queries = self.documents[["values"]].copy()
self.queries.rename(columns={"values": "vector"}, inplace=True)

# Use a sampling of documents for queries (to avoid
# keeping a large complete dataset in memory for each
# worker process).
self.queries = self.queries.sample(frac=doc_sample_fraction, random_state=1)
logger.info(
f"Using {doc_sample_fraction * 100}% of documents' dataset "
f"for query data ({len(self.queries)} sampled)"
)

def upsert_into_index(
self, index_host, api_key, skip_if_count_identical: bool = False
):
"""
Upsert the datasets' documents into the specified index.
:param index_host: Pinecone index to upsert into (must already exist)
:param skip_if_count_identical: If true then skip upsert if the index already contains the same number of
vectors as the dataset.
"""
pinecone = PineconeGRPC(api_key)
index = pinecone.Index(host=index_host)
if skip_if_count_identical:
if index.describe_index_stats()["total_vector_count"] == len(
self.documents
):
logger.info(
f"Skipping upsert as index already has same number of documents as dataset ({len(self.documents)}"
)
return

upserted_count = self._upsert_from_dataframe(index)
if upserted_count != len(self.documents):
logger.warning(
f"Not all records upserted successfully. Dataset count:{len(self.documents)},"
f" upserted count:{upserted_count}"
)

def prune_documents(self):
"""
Discard the contents of self.documents once it is no longer required
(it can consume a significant amount of memory).
"""
del self.documents
self._download_dataset_files()
self.queries = self._load_parquet_dataset("queries", limit=query_limit)
logger.debug(
f"After pruning, 'queries' memory usage:{self.queries.memory_usage()}"
f"Using {len(self.queries)} query vectors loaded from dataset 'queries' table"
)

def _download_dataset_files(self):
@@ -384,41 +306,6 @@ def prepare_metadata(metadata):
logger.debug(f"Loaded {len(df)} vectors of kind '{kind}'")
return df

def _upsert_from_dataframe(self, index):
"""
Note: using PineconeGRPC.Index.upsert_from_dataframe() directly
results in intermittent failures against serverless indexes as
we can hit the request limit:
grpc._channel._MultiThreadedRendezvous: < _MultiThreadedRendezvous of RPC that terminated with:
status = StatusCode.RESOURCE_EXHAUSTED
details = "Too many requests. Please retry shortly"
I haven't observed this with the HTTP Pinecone.Index, however the
gRPC one is so much faster for bulk loads we really want to keep using
gRPC. As such, we have our own version of upsert from dataframe which
handles this error with backoff and retry.
"""

# Solution is somewhat naive - simply chunk the dataframe into
# chunks of a smaller size, and pass each chunk to upsert_from_dataframe.
# We still end up with multiple vectors in progress at once, but we
# limit it to a finite amount and not the entire dataset.
upserted_count = 0
for sub_frame in Dataset.split_dataframe(self.documents, 10000):
# The 'values' column in the DataFrame is a pyarrow type (list<item: double>[pyarrow])
# as it was read using the pandas.ArrowDtype types_mapper (see _load_parquet_dataset).
# This _can_ be automatically converted to a Python list object inside upsert_from_dataframe,
# but it is slow, as at that level the DataFrame is iterated row-by-row and the conversion
# happens one element at a time.
# However, converting the entire sub-frame's column back to a Python object before calling
# upsert_from_dataframe() is significantly faster, such that the overall upsert throughput
# (including the actual server-side work) is around 2x greater if we pre-convert.
converted = sub_frame.astype(dtype={"values": object})
resp = index.upsert_from_dataframe(
converted, batch_size=200, show_progress=False
)
upserted_count += resp.upserted_count
return upserted_count

def _get_set_of_passages_columns_to_read(self, dset: ds.Dataset):
# 'passages' format used by benchmarking datasets (e.g. mnist,
# nq-769-tasb, yfcc, ...) always have 'id' and 'values' fields;
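For reference, the chunk-and-convert pattern used by the removed `_upsert_from_dataframe` can be sketched as below. `Dataset.split_dataframe` is referenced but not shown in this diff, so the `iloc`-based helper here is an assumption; the conversion of the pyarrow-backed `values` column and the `upsert_from_dataframe` call mirror the deleted code above.

```python
# Sketch of the removed chunk-and-convert upsert path. The split helper is an
# assumption (Dataset.split_dataframe is not shown in this diff).
from typing import Iterator

import pandas


def split_dataframe(df: pandas.DataFrame, chunk_size: int) -> Iterator[pandas.DataFrame]:
    # Yield successive row-wise slices of at most chunk_size rows.
    for start in range(0, len(df), chunk_size):
        yield df.iloc[start : start + chunk_size]


def upsert_documents(index, documents: pandas.DataFrame) -> int:
    upserted = 0
    for sub_frame in split_dataframe(documents, 10_000):
        # Convert the pyarrow-backed 'values' column to plain Python objects
        # up front; per the deleted comment this roughly doubles throughput
        # compared with letting upsert_from_dataframe convert row by row.
        converted = sub_frame.astype(dtype={"values": object})
        resp = index.upsert_from_dataframe(
            converted, batch_size=200, show_progress=False
        )
        upserted += resp.upserted_count
    return upserted
```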
2 changes: 1 addition & 1 deletion vsb/workloads/parquet_workload/parquet_workload.py
@@ -30,7 +30,7 @@ def __init__(
super().__init__(name)
self.dataset = Dataset(dataset_name, cache_dir=cache_dir, limit=limit)

self.dataset.setup_queries(load_queries=True, query_limit=query_limit)
self.dataset.setup_queries(query_limit=query_limit)
self.queries = self.dataset.queries.itertuples(index=False)

def get_sample_record(self) -> Record:
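Taken together, a minimal caller of the simplified `Dataset` API after this PR might look like the sketch below, mirroring the `ParquetWorkload` change above. The dataset name, cache directory, and limits are placeholders.

```python
# Usage sketch of the simplified Dataset API after this PR; the dataset name,
# cache directory, and limits are placeholders.
from vsb.workloads.dataset import Dataset

dataset = Dataset("mnist", cache_dir="/tmp/vsb-cache", limit=0)
dataset.setup_queries(query_limit=100)

# Queries are consumed as an iterator of row tuples, as in ParquetWorkload.
queries = dataset.queries.itertuples(index=False)
first = next(queries)
print(first.vector)  # the 'vector' field holds the query embedding
```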