Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,12 @@ Options:
# Memory Management
--cb-threshold TEXT k-NN Memory circuit breaker threshold

--on-disk On-disk vector storage mode with binary quantization (32x compression)
--oversample-factor Oversample factor applied during k-NN search rescoring: retrieves oversample_factor * k candidates before rescoring to improve recall when vectors are quantized or stored on disk. (default 1.0)


# Quantization Type
--quantization-type TEXT which type of quantization to use valid values [fp32, fp16]
--quantization-type TEXT which type of quantization to use valid values [fp32, fp16, bq]
--help Show this message and exit.
```
### Run OceanBase from command line
Expand Down
97 changes: 56 additions & 41 deletions vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,87 +65,104 @@ def __init__(
self._load_graphs_to_memory(client)

def _create_index(self, client: OpenSearch) -> None:
    """Create the OpenSearch k-NN index.

    Orchestrates the full creation flow: logs the effective configuration,
    applies persistent cluster-level k-NN settings, builds the index
    settings and vector-field mapping, then issues the create-index call.

    Args:
        client: connected OpenSearch client used for all API calls.
    """
    # The stale pre-refactor lines that computed and logged ef_search here
    # were removed: _log_index_creation_info() already logs ef_search.
    self._log_index_creation_info()
    self._configure_cluster_settings(client)
    settings = self._build_index_settings()
    vector_field_config = self._build_vector_field_config()
    mappings = self._build_mappings(vector_field_config)
    self._create_opensearch_index(client, settings, mappings)

def _log_index_creation_info(self) -> None:
    """Log the case-config parameters relevant to index creation (debug aid)."""
    log.info(f"Creating index with ef_search: {self.case_config.ef_search}")
    log.info(f"Creating index with number_of_replicas: {self.case_config.number_of_replicas}")

    log.info(f"Creating index with engine: {self.case_config.engine}")
    log.info(f"Creating index with metric type: {self.case_config.metric_type_name}")
    # Dump every config field so a run's exact parameters can be reconstructed from the log.
    log.info(f"All case_config parameters: {self.case_config.__dict__}")

def _configure_cluster_settings(self, client: OpenSearch) -> None:
    """Apply persistent cluster-level k-NN settings before index creation.

    Sets the k-NN index thread count and the k-NN memory circuit-breaker
    limit from the case config.

    Args:
        client: connected OpenSearch client.
    """
    cluster_settings_body = {
        "persistent": {
            "knn.algo_param.index_thread_qty": self.case_config.index_thread_qty,
            "knn.memory.circuit_breaker.limit": self.case_config.cb_threshold,
        }
    }
    client.cluster.put_settings(body=cluster_settings_body)
settings = {

def _build_index_settings(self) -> dict:
    """Build the settings body for index creation.

    Returns:
        Settings dict enabling k-NN on the index, with shard/replica
        counts, translog flush threshold size, ef_search, and refresh
        interval taken from the case config.
    """
    return {
        "index": {
            "knn": True,
            "number_of_shards": self.case_config.number_of_shards,
            "number_of_replicas": self.case_config.number_of_replicas,
            "translog.flush_threshold_size": self.case_config.flush_threshold_size,
            # NOTE(review): "-1" presumably disables the approximate-build
            # threshold so graphs are always built — confirm against docs.
            "knn.advanced.approximate_threshold": "-1",
            "knn.algo_param.ef_search": self.case_config.ef_search,
        },
        # NOTE(review): refresh_interval sits beside "index" rather than
        # inside it — verify this is the intended settings shape.
        "refresh_interval": self.case_config.refresh_interval,
    }
settings["index"]["knn.algo_param.ef_search"] = ef_search_value

# Get method configuration and log it for debugging
def _build_vector_field_config(self) -> dict:
method_config = self.case_config.index_param()
log.info(f"Raw method config from index_param(): {method_config}")

# For s3vector engine, ensure method only contains engine field
if self.case_config.engine == AWSOS_Engine.s3vector:
method_config = {"engine": "s3vector"}
log.info(f"Cleaned method config for s3vector: {method_config}")

# Prepare vector field configuration
vector_field_config = {
"type": "knn_vector",
"dimension": self.dim,
"method": method_config,
}
if self.case_config.on_disk:
space_type = self.case_config.parse_metric()
vector_field_config = {
"type": "knn_vector",
"dimension": self.dim,
"space_type": space_type,
"data_type": "float",
"mode": "on_disk",
"compression_level": "32x",
}
log.info("Using on-disk vector configuration with compression_level: 32x")
else:
vector_field_config = {
"type": "knn_vector",
"dimension": self.dim,
"method": method_config,
}

# For s3vector engine, space_type should be set at the vector field level
if self.case_config.engine == AWSOS_Engine.s3vector:
if self.case_config.on_disk:
log.info(f"Final on-disk vector field config: {vector_field_config}")
elif self.case_config.engine == AWSOS_Engine.s3vector:
space_type = self.case_config.parse_metric()
vector_field_config["space_type"] = space_type

# Ensure method config is absolutely clean for s3vector - remove any potential extra fields
vector_field_config["method"] = {"engine": "s3vector"}

log.info(f"Setting space_type '{space_type}' at vector field level for s3vector engine")
log.info(f"Final vector field config for s3vector: {vector_field_config}")
else:
log.info(f"Standard vector field config: {vector_field_config}")

# Configure mappings based on engine type
return vector_field_config

def _build_mappings(self, vector_field_config: dict) -> dict:
    """Build the index mappings for the label and vector fields.

    Args:
        vector_field_config: mapping body for the knn_vector field, as
            produced by _build_vector_field_config().

    Returns:
        Mappings dict. For the s3vector engine the mappings carry no
        _source configuration; for other engines the vector column is
        excluded from _source (and recovery source).
    """
    if self.case_config.engine == AWSOS_Engine.s3vector:
        # For s3vector engine, use simplified mappings without _source configuration
        mappings = {
            "properties": {
                # self.id_col_name: {"type": "integer", "store": True},
                self.label_col_name: {"type": "keyword"},
                self.vector_col_name: vector_field_config,
            },
        }
        log.info("Using simplified mappings for s3vector engine (no _source configuration)")
    else:
        # For other engines (faiss, lucene), use standard mappings with _source configuration
        mappings = {
            "_source": {"excludes": [self.vector_col_name], "recovery_source_excludes": [self.vector_col_name]},
            "properties": {
                # self.id_col_name: {"type": "integer", "store": True},
                self.label_col_name: {"type": "keyword"},
                self.vector_col_name: vector_field_config,
            },
        }
        log.info("Using standard mappings with _source configuration for non-s3vector engines")
    return mappings

def _create_opensearch_index(self, client: OpenSearch, settings: dict, mappings: dict) -> None:
try:
log.info(f"Creating index with settings: {settings}")
log.info(f"Creating index with mappings: {mappings}")

# Additional logging for s3vector to confirm method config before sending
if self.case_config.engine == AWSOS_Engine.s3vector:
method_in_mappings = mappings["properties"][self.vector_col_name]["method"]
log.info(f"Final method config being sent to OpenSearch: {method_in_mappings}")
Expand All @@ -155,22 +172,21 @@ def _create_index(self, client: OpenSearch) -> None:
body={"settings": settings, "mappings": mappings},
)

# For s3vector, verify the actual index configuration after creation
if self.case_config.engine == AWSOS_Engine.s3vector:
try:
actual_mapping = client.indices.get_mapping(index=self.index_name)
actual_method = actual_mapping[self.index_name]["mappings"]["properties"][self.vector_col_name][
"method"
]
log.info(f"Actual method config in created index: {actual_method}")

except Exception as e:
log.warning(f"Failed to verify index configuration: {e}")
self._verify_s3vector_index_config(client)

except Exception as e:
log.warning(f"Failed to create index: {self.index_name} error: {e!s}")
raise e from None

def _verify_s3vector_index_config(self, client: OpenSearch) -> None:
    """Read back the created index mapping and log the vector field's method config.

    Best-effort verification used for the s3vector engine: any failure is
    logged as a warning and never raised to the caller.

    Args:
        client: connected OpenSearch client.
    """
    try:
        actual_mapping = client.indices.get_mapping(index=self.index_name)
        actual_method = actual_mapping[self.index_name]["mappings"]["properties"][self.vector_col_name]["method"]
        log.info(f"Actual method config in created index: {actual_method}")
    except Exception as e:
        log.warning(f"Failed to verify index configuration: {e}")

@contextmanager
def init(self) -> None:
"""connect to opensearch"""
Expand Down Expand Up @@ -366,11 +382,10 @@ def search_embedding(
"k": k,
"method_parameters": self.case_config.search_param(),
**({"filter": self.filter} if self.filter else {}),
**(
{"rescore": {"oversample_factor": self.case_config.oversample_factor}}
if self.case_config.use_quant
else {}
),
"rescore": {"oversample_factor": self.case_config.oversample_factor}
# if self.case_config.use_quant
# else {}
,
}
log.debug("Using standard knn query with method_parameters for non-s3vector engines")

Expand Down
24 changes: 23 additions & 1 deletion vectordb_bench/backend/clients/aws_opensearch/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,33 @@ class AWSOpenSearchTypedDict(TypedDict):
str | None,
click.option(
"--quantization-type",
type=click.Choice(["fp32", "fp16"]),
type=click.Choice(["fp32", "fp16", "bq"]),
help="quantization type for vectors (in index)",
default="fp32",
required=False,
),
]

oversample_factor: Annotated[
float,
click.option(
"--oversample-factor",
type=float,
help="Oversample factor for vector search",
default=1.0,
),
]

on_disk: Annotated[
bool,
click.option(
"--on-disk",
is_flag=True,
help="Enable on-disk vector storage mode",
default=False,
),
]


class AWSOpenSearchHNSWTypedDict(CommonTypedDict, AWSOpenSearchTypedDict, HNSWFlavor1): ...

Expand Down Expand Up @@ -187,6 +207,8 @@ def AWSOpenSearch(**parameters: Unpack[AWSOpenSearchHNSWTypedDict]):
engine=engine,
quantization_type=AWSOSQuantization(parameters["quantization_type"]),
metric_type_name=parameters["metric_type"],
on_disk=parameters["on_disk"],
oversample_factor=parameters["oversample_factor"],
),
**parameters,
)
28 changes: 16 additions & 12 deletions vectordb_bench/backend/clients/aws_opensearch/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class AWSOS_Engine(Enum):
class AWSOSQuantization(Enum):
    """Vector quantization types accepted by the --quantization-type option."""

    fp32 = "fp32"  # full-precision floats (no quantization)
    fp16 = "fp16"  # scalar quantization ("sq" encoder with type fp16)
    bq = "bq"  # binary quantization ("binary" encoder, 1 bit)


class AWSOpenSearchIndexConfig(BaseModel, DBCaseConfig):
Expand All @@ -63,6 +64,7 @@ class AWSOpenSearchIndexConfig(BaseModel, DBCaseConfig):
use_routing: bool = False # for label-filter cases
oversample_factor: float = 1.0
quantization_type: AWSOSQuantization = AWSOSQuantization.fp32
on_disk: bool = False

def __eq__(self, obj: any):
return (
Expand All @@ -74,6 +76,7 @@ def __eq__(self, obj: any):
and self.number_of_segments == obj.number_of_segments
and self.use_routing == obj.use_routing
and self.quantization_type == obj.quantization_type
and self.on_disk == obj.on_disk
)

def __hash__(self) -> int:
Expand All @@ -87,6 +90,7 @@ def __hash__(self) -> int:
self.number_of_segments,
self.use_routing,
self.quantization_type,
self.on_disk,
)
)

Expand Down Expand Up @@ -116,6 +120,7 @@ def use_quant(self) -> bool:
def index_param(self) -> dict:
log.info(f"Using engine: {self.engine} for index creation")
log.info(f"Using metric_type: {self.metric_type_name} for index creation")
log.info(f"Using on_disk mode: {self.on_disk} for index creation")
space_type = self.parse_metric()
log.info(f"Resulting space_type: {space_type} for index creation")

Expand All @@ -124,26 +129,25 @@ def index_param(self) -> dict:
if self.engine == AWSOS_Engine.s3vector:
return {"engine": "s3vector"}

# For on-disk mode, return empty dict as no method config is needed
if self.on_disk:
return {}

parameters = {"ef_construction": self.efConstruction, "m": self.M}

if self.engine == AWSOS_Engine.faiss and self.quantization_type == AWSOSQuantization.fp16:
parameters["encoder"] = {"name": "sq", "parameters": {"type": "fp16"}}
# Add encoder configuration based on quantization type
if self.engine == AWSOS_Engine.faiss and self.use_quant:
if self.quantization_type == AWSOSQuantization.fp16:
parameters["encoder"] = {"name": "sq", "parameters": {"type": "fp16"}}
elif self.quantization_type == AWSOSQuantization.bq:
parameters["encoder"] = {"name": "binary", "parameters": {"bits": 1}}

# For other engines (faiss, lucene), space_type is set at method level
return {
"name": "hnsw",
"engine": self.engine.value,
"space_type": space_type,
"parameters": {
"ef_construction": self.efConstruction,
"m": self.M,
"ef_search": self.ef_search,
**(
{"encoder": {"name": "sq", "parameters": {"type": self.quantization_type.fp16.value}}}
if self.use_quant
else {}
),
},
"parameters": parameters,
}

def search_param(self) -> dict:
Expand Down