Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,11 @@ To enable chunk window expansion (context retrieval around matched chunks), conf

| Field | Description | Required | OKP RAG Proto Example |
| ------- | ------------- | ---------- | --------- |
| `chunk_parent_id_field` | Chunk's parent document ID | Yes | `"parent_id"` |
| `chunk_index_field` | Chunk's sequential position position | Yes | `"chunk_index"` |
| `chunk_token_count_field` | Token count for the chunk | Yes | `"num_tokens"` |
| `chunk_filter_query` | Filter to identify chunk documents | Yes | `"is_chunk:true"` |
| `chunk_parent_id_field` | Chunk's parent document ID | Yes | `"parent_id"` |
| `chunk_index_field` | Chunk's sequential position | Yes | `"chunk_index"` |
| `chunk_token_count_field` | Token count for the chunk | Yes | `"num_tokens"` |
| `chunk_filter_query` | Filter to identify chunk documents | Yes | `"is_chunk:true"` |
| `chunk_expansion_boundary_fields` | Fields that constrain window expansion | No | `["parent_id", "heading"]` |

**Parent Document Fields:**

Expand Down Expand Up @@ -115,7 +116,8 @@ config = SolrVectorIOConfig(
parent_content_id_field="doc_id",
parent_content_title_field="title",
parent_content_url_field="reference_url",
chunk_filter_query="is_chunk:true"
chunk_filter_query="is_chunk:true",
chunk_expansion_boundary_fields=["parent_id"],
)
)
```
Expand Down Expand Up @@ -148,6 +150,12 @@ config = SolrVectorIOConfig(
token_budget=2048, # Max tokens per context window
min_chunk_gap=4, # Min spacing between chunks from same doc
min_chunk_window=4, # Min chunks before windowing applies

# Optional: boundary fields restrict expansion to chunks whose values
# for these fields match those of the matched (anchor) chunk.
# chunk_parent_id_field is always applied as a boundary regardless of
# this setting. Default is None (no additional boundaries).
chunk_expansion_boundary_fields=["parent_id", "heading"],
)
)

Expand All @@ -168,6 +176,7 @@ This feature:
- Expands bidirectionally within token budget limits
- Prevents duplicate context from the same document
- Returns concatenated chunk content as a single result
- Optionally constrains expansion to chunks that share the matched chunk's values for the configured boundary fields (e.g. heading or section)

## Hybrid Search

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,16 @@ class ChunkWindowConfig(BaseModel):
default=4, description="Min number of chunks before windowing applies"
)

# ---- Expansion boundary fields ----
chunk_expansion_boundary_fields: list[str] | None = Field(
default=None,
description=(
"Additional chunk field names that constrain window expansion (e.g. ['heading']). "
"chunk_parent_id_field is always used as a boundary regardless of this setting. "
"When set, only chunks sharing the anchor chunk's values for these fields are included."
),
)


@json_schema_type
class SolrVectorIOConfig(BaseModel):
Expand Down Expand Up @@ -173,5 +183,6 @@ def sample_run_config(
# "token_budget": 2048,
# "min_chunk_gap": 4,
# "min_chunk_window": 4,
# "chunk_expansion_boundary_fields": ["parent_id", "heading"],
# }
}
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ async def query_keyword(

# Apply score threshold
if score < score_threshold:
log.info(
log.debug(
f"Filtering out document with score {score} < threshold {
score_threshold
}"
Expand Down Expand Up @@ -358,9 +358,6 @@ async def query_hybrid(
}
# Note: keyword_boost can be incorporated in future by discovering schema fields

print("========== HYBRID SEARCH PARAMS ==========")
print(data_params)

# Add filter query for chunk documents if schema is configured
if self.chunk_window_config and self.chunk_window_config.chunk_filter_query:
data_params["fq"] = self.chunk_window_config.chunk_filter_query
Expand Down Expand Up @@ -392,7 +389,7 @@ async def query_hybrid(

# Apply score threshold
if score < score_threshold:
log.info(
log.debug(
f"Filtering out document with score {score} < threshold {
score_threshold
}"
Expand Down Expand Up @@ -503,6 +500,7 @@ async def _fetch_context_chunks(
parent_id: str,
window_start: int,
window_end: int,
boundary_values: dict[str, Any] | None = None,
) -> list[dict[str, Any]]:
"""
Fetch chunks within a specified index range for a parent document.
Expand All @@ -512,6 +510,8 @@ async def _fetch_context_chunks(
parent_id: ID of the parent document
window_start: Start index (inclusive)
window_end: End index (inclusive)
boundary_values: Optional dict of field name -> value pairs that
chunks must match (e.g. {'parent_id': 'doc1', 'heading': 'Intro'})

Returns:
List of chunk documents sorted by chunk_index
Expand All @@ -526,12 +526,26 @@ async def _fetch_context_chunks(
schema.chunk_parent_id_field,
]

# Add boundary fields to the field list so they're returned
if schema.chunk_expansion_boundary_fields:
for field in schema.chunk_expansion_boundary_fields:
if field not in fields:
fields.append(field)

# Build query
query_parts = [
f'{schema.chunk_parent_id_field}:"{parent_id}"',
f"{schema.chunk_index_field}:[{window_start} TO {window_end}]",
]

# Add boundary field filters
if boundary_values:
for field_name, field_value in boundary_values.items():
# Skip parent_id since it's already in the query
if field_name == schema.chunk_parent_id_field:
continue
query_parts.append(f'{field_name}:"{field_value}"')

# Add filter query if configured
if schema.chunk_filter_query:
query_parts.append(schema.chunk_filter_query)
Expand All @@ -541,7 +555,8 @@ async def _fetch_context_chunks(
try:
log.info(
f"Fetching context chunks: parent_id={parent_id}, "
f"range=[{window_start}, {window_end}]"
f"range=[{window_start}, {window_end}], "
f"boundary_values={boundary_values}"
)
response = await client.get(
f"{self.base_url}/select",
Expand Down Expand Up @@ -646,14 +661,24 @@ async def _apply_chunk_window_expansion(
total_chunks = parent_doc.get(schema.parent_total_chunks_field, 0)
total_tokens = parent_doc.get(schema.parent_total_tokens_field, 0)

# Build boundary values from matched chunk metadata
boundary_values = None
if schema.chunk_expansion_boundary_fields:
boundary_values = {}
for field in schema.chunk_expansion_boundary_fields:
value = chunk.metadata.get(field)
if value is not None:
boundary_values[field] = value

# If short doc, return all chunks
if total_chunks < min_chunk_window or total_tokens <= token_budget:
log.info(
f"Document is short (total_chunks={total_chunks}, "
f"total_tokens={total_tokens}), fetching all chunks"
)
context_chunks = await self._fetch_context_chunks(
client, parent_id, 0, max(0, total_chunks - 1)
client, parent_id, 0, max(0, total_chunks - 1),
boundary_values=boundary_values,
)
selected_chunks = context_chunks
else:
Expand All @@ -670,7 +695,8 @@ async def _apply_chunk_window_expansion(
f"around match at index {matched_chunk_index}"
)
context_chunks = await self._fetch_context_chunks(
client, parent_id, window_start, window_end
client, parent_id, window_start, window_end,
boundary_values=boundary_values,
)

if not context_chunks:
Expand Down Expand Up @@ -795,7 +821,7 @@ def _expand_chunk_window(
total_tokens += next_tokens
left -= 1
added = True
log.info(
log.debug(
f"Added left chunk at index {left}, total_tokens={total_tokens}"
)

Expand All @@ -808,7 +834,7 @@ def _expand_chunk_window(
total_tokens += next_tokens
right += 1
added = True
log.info(f"Added right chunk at index {right - 1}, total_tokens={
log.debug(f"Added right chunk at index {right - 1}, total_tokens={
total_tokens
}")

Expand All @@ -829,7 +855,7 @@ def _expand_chunk_window(
def _doc_to_chunk(self, doc: dict[str, Any]) -> Chunk | None:
try:
if not doc.get("is_chunk", True):
log.info("Skipping non-chunk document")
log.debug("Skipping non-chunk document")
return None

content = doc.get(self.content_field)
Expand Down Expand Up @@ -1066,7 +1092,7 @@ async def _get_and_cache_vector_store_index(
self, vector_store_id: str
) -> VectorStoreWithIndex:
if vector_store_id in self.cache:
log.info(f"Retrieved vector store from cache: {vector_store_id}")
log.debug(f"Retrieved vector store from cache: {vector_store_id}")
return self.cache[vector_store_id]

log.info(f"Vector store not in cache, loading from table: {vector_store_id}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ def config_with_chunk_window():
parent_content_title_field="title",
parent_content_url_field="reference_url",
chunk_filter_query="is_chunk:true",
chunk_expansion_boundary_fields=["parent_id"],
),
)

Expand Down
Loading