
Add Incremental Indexing v1 #1318

Merged: 32 commits from incremental_indexing/main into main (Oct 30, 2024)
Commits
82d1c4a  Create entypoint for cli and api (#1067) · AlonsoGuevara · Aug 30, 2024
41ea554  Merge from main · AlonsoGuevara · Sep 3, 2024
3399e6e  Merge branch 'main' into incremental_indexing/main · AlonsoGuevara · Sep 4, 2024
3295e2b  Merge from main · AlonsoGuevara · Sep 5, 2024
67f4b02  Merge branch 'main' into incremental_indexing/main · AlonsoGuevara · Sep 10, 2024
87fb93f  Merge branch 'main' into incremental_indexing/main · AlonsoGuevara · Sep 11, 2024
8f71a02  Incremental indexing/file delta (#1123) · AlonsoGuevara · Sep 11, 2024
b440e83  Merge branch 'main' into incremental_indexing/main · AlonsoGuevara · Sep 12, 2024
6cee670  Merge branch 'main' into incremental_indexing/main · AlonsoGuevara · Sep 18, 2024
00048b3  Merge from main · AlonsoGuevara · Sep 23, 2024
bf45f42  Merge branch 'main' into incremental_indexing/main · AlonsoGuevara · Sep 25, 2024
4d713f6  Merge branch 'main' into incremental_indexing/main · AlonsoGuevara · Sep 27, 2024
336e6f9  Update relationships after inc index (#1236) · AlonsoGuevara · Oct 1, 2024
a44788b  Collapse create final community reports (#1227) · natoverse · Sep 30, 2024
f259d0c  Collapse create base entity graph (#1233) · natoverse · Sep 30, 2024
3103ae3  Collapse create summarized entities (#1237) · natoverse · Oct 1, 2024
d501813  Collapse create base extracted entities (#1235) · natoverse · Oct 1, 2024
6d23d6a  Incremental indexing/update final text units (#1241) · AlonsoGuevara · Oct 2, 2024
43ec92e  merge from main · AlonsoGuevara · Oct 2, 2024
2704eaf  Merge from main · AlonsoGuevara · Oct 15, 2024
cb7d5ed  Add v1 community merge using time period (#1257) · AlonsoGuevara · Oct 21, 2024
08d2cec  Merge from main · AlonsoGuevara · Oct 23, 2024
3eb5171  Add config for incremental index + Bug fixes (#1317) · AlonsoGuevara · Oct 24, 2024
9832f53  Merge from main · AlonsoGuevara · Oct 24, 2024
c762736  Semversioner · AlonsoGuevara · Oct 24, 2024
e518d43  Small refactor · AlonsoGuevara · Oct 24, 2024
3f55117  Remove unused file · AlonsoGuevara · Oct 24, 2024
41be5d8  Ruff · AlonsoGuevara · Oct 24, 2024
c7b3846  Merge branch 'main' into incremental_indexing/main · AlonsoGuevara · Oct 28, 2024
c8a35f1  Merge branch 'main' into incremental_indexing/main · AlonsoGuevara · Oct 29, 2024
470b35c  Update verb tests inputs · AlonsoGuevara · Oct 30, 2024
ac47578  Update verb tests inputs · AlonsoGuevara · Oct 30, 2024
Files changed
4 changes: 4 additions & 0 deletions .semversioner/next-release/minor-20241024220555036046.json
@@ -0,0 +1,4 @@
+{
+    "type": "minor",
+    "description": "Add Incremental Indexing"
+}

4 changes: 4 additions & 0 deletions .semversioner/next-release/patch-20240930234415130922.json
@@ -0,0 +1,4 @@
+{
+    "type": "patch",
+    "description": "Add relationship merge"
+}

4 changes: 4 additions & 0 deletions .semversioner/next-release/patch-20241002002557586548.json
@@ -0,0 +1,4 @@
+{
+    "type": "patch",
+    "description": "Add text units update"
+}

4 changes: 4 additions & 0 deletions .semversioner/next-release/patch-20241008011651057484.json
@@ -0,0 +1,4 @@
+{
+    "type": "patch",
+    "description": "Add naive community merge using time period"
+}

4 changes: 4 additions & 0 deletions .semversioner/next-release/patch-20241024201857589092.json
@@ -0,0 +1,4 @@
+{
+    "type": "patch",
+    "description": "Add config for incremental updates"
+}
3 changes: 2 additions & 1 deletion graphrag/api/index.py
@@ -24,7 +24,6 @@ async def build_index(
     config: GraphRagConfig,
     run_id: str = "",
     is_resume_run: bool = False,
-    is_update_run: bool = False,
     memory_profile: bool = False,
     progress_reporter: ProgressReporter | None = None,
     emit: list[TableEmitterType] = [TableEmitterType.Parquet],  # noqa: B006
@@ -54,6 +53,8 @@ async def build_index(
     list[PipelineRunResult]
        The list of pipeline run results
     """
+    is_update_run = bool(config.update_index_storage)
+
     if is_resume_run and is_update_run:
         msg = "Cannot resume and update a run at the same time."
         raise ValueError(msg)
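With this hunk, whether a run is an update run is derived from the config rather than passed as a flag: setting `update_index_storage` is what opts a caller in. A minimal sketch of the resulting API usage; the import paths are assumed from the file layout in this diff, and all paths and values are illustrative:

```python
import asyncio

from graphrag.api import build_index  # module shown in this diff: graphrag/api/index.py
from graphrag.config import create_graphrag_config

# Illustrative settings; load_config in graphrag.cli is the usual entry point.
config = create_graphrag_config(
    {
        "storage": {"type": "file", "base_dir": "output"},
        # The presence of update_index_storage is what now flips build_index
        # into an incremental update run; omit it for a full index run.
        "update_index_storage": {"type": "file", "base_dir": "update_output"},
    },
    root_dir=".",
)

results = asyncio.run(build_index(config=config))
```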
4 changes: 1 addition & 3 deletions graphrag/cli/index.py
@@ -69,7 +69,6 @@ def index_cli(
     root_dir: Path,
     verbose: bool,
     resume: str | None,
-    update_index_id: str | None,
     memprofile: bool,
     cache: bool,
     reporter: ReporterType,
@@ -82,7 +81,7 @@ def index_cli(
     """Run the pipeline with the given config."""
     progress_reporter = create_progress_reporter(reporter)
     info, error, success = _logger(progress_reporter)
-    run_id = resume or update_index_id or time.strftime("%Y%m%d-%H%M%S")
+    run_id = resume or time.strftime("%Y%m%d-%H%M%S")
 
     config = load_config(root_dir, config_filepath)
     config.storage.base_dir = str(output_dir) if output_dir else config.storage.base_dir
@@ -123,7 +122,6 @@ def index_cli(
         config=config,
         run_id=run_id,
         is_resume_run=bool(resume),
-        is_update_run=bool(update_index_id),
         memory_profile=memprofile,
         progress_reporter=progress_reporter,
         emit=emit,
11 changes: 0 additions & 11 deletions graphrag/cli/main.py
@@ -102,12 +102,6 @@ def _index_cli(
             help="Skip any preflight validation. Useful when running no LLM steps."
         ),
     ] = False,
-    update_index: Annotated[
-        str | None,
-        typer.Option(
-            help="Update an index run id, leveraging previous outputs and applying new indexes."
-        ),
-    ] = None,
     output: Annotated[
         Path | None,
         typer.Option(
@@ -119,15 +113,10 @@ def _index_cli(
     ] = None,
 ):
     """Build a knowledge graph index."""
-    if resume and update_index:
-        msg = "Cannot resume and update a run at the same time"
-        raise ValueError(msg)
-
     index_cli(
         root_dir=root,
         verbose=verbose,
         resume=resume,
-        update_index_id=update_index,
         memprofile=memprofile,
         cache=cache,
         reporter=ReporterType(reporter),
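Taken together with the cli/index.py hunk above, the `--update-index` flag is removed entirely: after this PR, `graphrag index --root <project>` performs an incremental update whenever the `update_index_storage` section is enabled in the project's settings, and a full index run otherwise. The resume-vs-update conflict check moves from the CLI layer into `build_index` itself.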
21 changes: 21 additions & 0 deletions graphrag/config/create_graphrag_config.py
@@ -375,6 +375,25 @@ def hydrate_parallelization_params(
             container_name=reader.str(Fragment.container_name),
             base_dir=reader.str(Fragment.base_dir) or defs.STORAGE_BASE_DIR,
         )
+
+        with (
+            reader.envvar_prefix(Section.update_index_storage),
+            reader.use(values.get("update_index_storage")),
+        ):
+            s_type = reader.str(Fragment.type)
+            if s_type:
+                update_index_storage_model = StorageConfig(
+                    type=StorageType(s_type) if s_type else defs.STORAGE_TYPE,
+                    connection_string=reader.str(Fragment.conn_string),
+                    storage_account_blob_url=reader.str(
+                        Fragment.storage_account_blob_url
+                    ),
+                    container_name=reader.str(Fragment.container_name),
+                    base_dir=reader.str(Fragment.base_dir)
+                    or defs.UPDATE_STORAGE_BASE_DIR,
+                )
+            else:
+                update_index_storage_model = None
         with reader.envvar_prefix(Section.chunk), reader.use(values.get("chunks")):
             group_by_columns = reader.list("group_by_columns", "BY_COLUMNS")
             if group_by_columns is None:
@@ -547,6 +566,7 @@ def hydrate_parallelization_params(
         embed_graph=embed_graph_model,
         reporting=reporting_model,
         storage=storage_model,
+        update_index_storage=update_index_storage_model,
         cache=cache_model,
         input=input_model,
         chunks=chunks_model,
@@ -624,6 +644,7 @@ class Section(str, Enum):
     storage = "STORAGE"
     summarize_descriptions = "SUMMARIZE_DESCRIPTIONS"
    umap = "UMAP"
+    update_index_storage = "UPDATE_INDEX_STORAGE"
     local_search = "LOCAL_SEARCH"
     global_search = "GLOBAL_SEARCH"
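The new `Section` entry also wires the block into the environment-variable reader. Assuming the `GRAPHRAG_<SECTION>_<FIELD>` naming convention that the other sections follow (the exact variable names are not shown in this diff), a sketch of configuring the update storage purely through the environment:

```python
import os

from graphrag.config import create_graphrag_config

# Hypothetical variable names derived from Section.update_index_storage;
# the field suffixes follow the same Fragment enum used in the hunk above.
os.environ["GRAPHRAG_UPDATE_INDEX_STORAGE_TYPE"] = "file"
os.environ["GRAPHRAG_UPDATE_INDEX_STORAGE_BASE_DIR"] = "update_output"

config = create_graphrag_config(root_dir=".")
assert config.update_index_storage is not None  # update run now enabled
```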
1 change: 1 addition & 0 deletions graphrag/config/defaults.py
@@ -86,6 +86,7 @@
 STORAGE_TYPE = StorageType.file
 SUMMARIZE_DESCRIPTIONS_MAX_LENGTH = 500
 UMAP_ENABLED = False
+UPDATE_STORAGE_BASE_DIR = "update_output"
 
 VECTOR_STORE = f"""
 type: {VectorStoreType.LanceDB.value}
6 changes: 6 additions & 0 deletions graphrag/config/models/graph_rag_config.py
@@ -54,6 +54,12 @@ def __str__(self):
     )
     """The storage configuration."""
 
+    update_index_storage: StorageConfig | None = Field(
+        description="The storage configuration for the updated index.",
+        default=None,
+    )
+    """The storage configuration for the updated index."""
+
     cache: CacheConfig = Field(
         description="The cache configuration.", default=CacheConfig()
     )
12 changes: 12 additions & 0 deletions graphrag/config/resolve_path.py
@@ -183,6 +183,18 @@ def resolve_paths(
         )
     )
 
+    if (
+        config.update_index_storage
+        and config.update_index_storage.type == StorageType.file
+    ):
+        config.update_index_storage.base_dir = str(
+            resolve_path(
+                config.update_index_storage.base_dir,
+                config.root_dir,
+                pattern_or_timestamp_value,
+            )
+        )
+
     if config.reporting.type == ReportingType.file:
         config.reporting.base_dir = str(
             resolve_path(
5 changes: 5 additions & 0 deletions graphrag/index/config/pipeline.py
@@ -47,6 +47,11 @@ def __str__(self):
     )
     """The storage configuration for the pipeline."""
 
+    update_index_storage: PipelineStorageConfigTypes | None = pydantic_Field(
+        default=None, discriminator="type"
+    )
+    """The storage configuration for the updated index."""
+
     cache: PipelineCacheConfigTypes | None = pydantic_Field(
         default=None, discriminator="type"
     )
29 changes: 16 additions & 13 deletions graphrag/index/create_pipeline_config.py
@@ -14,10 +14,7 @@
     StorageType,
     TextEmbeddingTarget,
 )
-from graphrag.config.models import (
-    GraphRagConfig,
-    TextEmbeddingConfig,
-)
+from graphrag.config.models import GraphRagConfig, StorageConfig, TextEmbeddingConfig
 from graphrag.index.config.cache import (
     PipelineBlobCacheConfig,
     PipelineCacheConfigTypes,
@@ -118,7 +115,10 @@ def create_pipeline_config(settings: GraphRagConfig, verbose=False) -> PipelineC
         root_dir=settings.root_dir,
         input=_get_pipeline_input_config(settings),
         reporting=_get_reporting_config(settings),
-        storage=_get_storage_config(settings),
+        storage=_get_storage_config(settings, settings.storage),
+        update_index_storage=_get_storage_config(
+            settings, settings.update_index_storage
+        ),
         cache=_get_cache_config(settings),
         workflows=[
             *_document_workflows(settings, embedded_fields),
@@ -469,23 +469,26 @@ def _get_reporting_config(
 
 def _get_storage_config(
     settings: GraphRagConfig,
-) -> PipelineStorageConfigTypes:
+    storage_settings: StorageConfig | None,
+) -> PipelineStorageConfigTypes | None:
     """Get the storage type from the settings."""
+    if not storage_settings:
+        return None
     root_dir = settings.root_dir
-    match settings.storage.type:
+    match storage_settings.type:
         case StorageType.memory:
             return PipelineMemoryStorageConfig()
         case StorageType.file:
             # relative to the root_dir
-            base_dir = settings.storage.base_dir
+            base_dir = storage_settings.base_dir
             if base_dir is None:
                 msg = "Base directory must be provided for file storage."
                 raise ValueError(msg)
             return PipelineFileStorageConfig(base_dir=str(Path(root_dir) / base_dir))
         case StorageType.blob:
-            connection_string = settings.storage.connection_string
-            storage_account_blob_url = settings.storage.storage_account_blob_url
-            container_name = settings.storage.container_name
+            connection_string = storage_settings.connection_string
+            storage_account_blob_url = storage_settings.storage_account_blob_url
+            container_name = storage_settings.container_name
             if container_name is None:
                 msg = "Container name must be provided for blob storage."
                 raise ValueError(msg)
@@ -495,12 +498,12 @@ def _get_storage_config(
             return PipelineBlobStorageConfig(
                 connection_string=connection_string,
                 container_name=container_name,
-                base_dir=settings.storage.base_dir,
+                base_dir=storage_settings.base_dir,
                 storage_account_blob_url=storage_account_blob_url,
             )
         case _:
             # relative to the root_dir
-            base_dir = settings.storage.base_dir
+            base_dir = storage_settings.base_dir
             if base_dir is None:
                 msg = "Base directory must be provided for file storage."
                 raise ValueError(msg)
10 changes: 10 additions & 0 deletions graphrag/index/flows/create_final_communities.py
@@ -3,6 +3,8 @@
 
 """All the steps to transform final communities."""
 
+from datetime import datetime, timezone
+
 import pandas as pd
 from datashaper import (
     VerbCallbacks,
@@ -61,6 +63,12 @@ def create_final_communities(
 
     filtered["title"] = "Community " + filtered["id"].astype(str)
 
+    # Add period timestamp to the community reports
+    filtered["period"] = datetime.now(timezone.utc).date().isoformat()
+
+    # Add size of the community
+    filtered["size"] = filtered.loc[:, "text_unit_ids"].apply(lambda x: len(x))
+
     return filtered.loc[
         :,
         [
@@ -69,5 +77,7 @@ def create_final_communities(
             "level",
             "relationship_ids",
             "text_unit_ids",
+            "period",
+            "size",
         ],
     ]
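The two new columns feed the time-based community merge added elsewhere in this PR (#1257): `period` stamps each community with the UTC date of the indexing run, and `size` counts its backing text units. A minimal reproduction of the derivation on toy data:

```python
from datetime import datetime, timezone

import pandas as pd

communities = pd.DataFrame({
    "id": [0, 1],
    "text_unit_ids": [["tu-1", "tu-2"], ["tu-3"]],
})

# UTC date of the run, e.g. "2024-10-30"
communities["period"] = datetime.now(timezone.utc).date().isoformat()
# Number of text units backing each community
communities["size"] = communities["text_unit_ids"].apply(len)
print(communities)
```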
11 changes: 10 additions & 1 deletion graphrag/index/flows/create_final_community_reports.py
@@ -42,6 +42,7 @@
 async def create_final_community_reports(
     nodes_input: pd.DataFrame,
     edges_input: pd.DataFrame,
+    communities_input: pd.DataFrame,
     claims_input: pd.DataFrame | None,
     callbacks: VerbCallbacks,
     cache: PipelineCache,
@@ -118,7 +119,15 @@ async def create_final_community_reports(
         embedding_name="community_report_title",
     )
 
-    return community_reports
+    # Merge by community id with communities to add size and period
+    return community_reports.merge(
+        communities_input.loc[:, ["id", "size", "period"]],
+        left_on="community",
+        right_on="id",
+        how="left",
+        copy=False,
+        suffixes=("", "_y"),
+    ).drop(columns=["id_y"])
 
 
 def _prep_nodes(input: pd.DataFrame) -> pd.DataFrame:
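Both frames carry an `id` column (the report id on the left, the community id on the right), so the merge uses `suffixes=("", "_y")` to keep the left names intact and then drops the duplicated right-hand key. A toy illustration of that behavior:

```python
import pandas as pd

reports = pd.DataFrame({"id": ["r1", "r2"], "community": [0, 1]})
communities = pd.DataFrame(
    {"id": [0, 1], "size": [2, 1], "period": ["2024-10-30", "2024-10-30"]}
)

merged = reports.merge(
    communities.loc[:, ["id", "size", "period"]],
    left_on="community",
    right_on="id",
    how="left",
    suffixes=("", "_y"),  # left columns keep their names; right "id" becomes "id_y"
).drop(columns=["id_y"])
print(merged)  # columns: id, community, size, period
```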
2 changes: 1 addition & 1 deletion graphrag/index/graph/extractors/community_reports/prep_community_report_context.py
@@ -58,7 +58,7 @@ def prep_community_report_context(
             invalid_context_df, max_tokens
         )
         set_context_size(invalid_context_df)
-        invalid_context_df[schemas.CONTEXT_EXCEED_FLAG] = 0
+        invalid_context_df.loc[:, schemas.CONTEXT_EXCEED_FLAG] = 0
         return union(valid_context_df, invalid_context_df)
 
     level_context_df = _antijoin_reports(level_context_df, report_df)
6 changes: 4 additions & 2 deletions graphrag/index/graph/extractors/community_reports/utils.py
@@ -13,12 +13,14 @@
 
 def set_context_size(df: pd.DataFrame) -> None:
     """Measure the number of tokens in the context."""
-    df[schemas.CONTEXT_SIZE] = df[schemas.CONTEXT_STRING].apply(lambda x: num_tokens(x))
+    df.loc[:, schemas.CONTEXT_SIZE] = df.loc[:, schemas.CONTEXT_STRING].apply(
+        lambda x: num_tokens(x)
+    )
 
 
 def set_context_exceeds_flag(df: pd.DataFrame, max_tokens: int) -> None:
     """Set a flag to indicate if the context exceeds the limit."""
-    df[schemas.CONTEXT_EXCEED_FLAG] = df[schemas.CONTEXT_SIZE].apply(
+    df.loc[:, schemas.CONTEXT_EXCEED_FLAG] = df.loc[:, schemas.CONTEXT_SIZE].apply(
         lambda x: x > max_tokens
     )
 
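These `.loc` rewrites (here and in prep_community_report_context above) switch to the explicit pandas assignment idiom, which makes the write target unambiguous when the frame may be a slice of another frame and is the usual way to sidestep SettingWithCopyWarning. A self-contained toy version, with a whitespace tokenizer standing in for `num_tokens`:

```python
import pandas as pd

df = pd.DataFrame({"context_string": ["a b c", "d e"]})

# Equivalent to df["context_size"] = ... in the simple case, but .loc names
# the rows and column being written explicitly:
df.loc[:, "context_size"] = df.loc[:, "context_string"].apply(
    lambda s: len(s.split())  # toy stand-in for num_tokens
)
print(df)
```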
6 changes: 6 additions & 0 deletions graphrag/index/init_content.py
@@ -90,6 +90,12 @@
 #     connection_string: <azure_blob_storage_connection_string>
 #     container_name: <azure_blob_storage_container_name>
 
+update_index_storage: # Storage to save an updated index (for incremental indexing). Enabling this performs an incremental index run
+  # type: {defs.STORAGE_TYPE.value} # or blob
+  # base_dir: "{defs.UPDATE_STORAGE_BASE_DIR}"
+  # connection_string: <azure_blob_storage_connection_string>
+  # container_name: <azure_blob_storage_container_name>
+
 reporting:
   type: {defs.REPORTING_TYPE.value} # or console, blob
   base_dir: "{defs.REPORTING_BASE_DIR}"
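Uncommented and filled in, the new section of the generated settings.yaml might look like the sketch below; `update_output` is `UPDATE_STORAGE_BASE_DIR` from defaults.py above, and the blob lines are placeholders:

```yaml
update_index_storage: # enabling this section makes the next index run an incremental update
  type: file # or blob
  base_dir: "update_output"
  # connection_string: <azure_blob_storage_connection_string>  # blob only
  # container_name: <azure_blob_storage_container_name>  # blob only
```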