diff --git a/comps/dataprep/src/integrations/neo4j_llamaindex.py b/comps/dataprep/src/integrations/neo4j_llamaindex.py
index a12c930597..599432dac4 100644
--- a/comps/dataprep/src/integrations/neo4j_llamaindex.py
+++ b/comps/dataprep/src/integrations/neo4j_llamaindex.py
@@ -33,7 +33,6 @@
 from llama_index.llms.openai import OpenAI
 from llama_index.llms.openai_like import OpenAILike
 from neo4j import GraphDatabase
-from transformers import AutoTokenizer
 
 from comps import CustomLogger, DocPath, OpeaComponent, OpeaComponentRegistry, ServiceType
 from comps.cores.proto.api_protocol import DataprepRequest, Neo4jDataprepRequest
@@ -68,7 +67,9 @@
 NEO4J_PASSWORD = os.getenv("NEO4J_PASSWORD", "neo4jtest")
 
 # LLM/Embedding endpoints
+# TGI_LLM_ENDPOINT accepts any OpenAI-compatible endpoint (not just TGI) for indexing graph nodes
 TGI_LLM_ENDPOINT = os.getenv("TGI_LLM_ENDPOINT", f"http://{host_ip}:6005")
+TGI_LLM_ENDPOINT_KEY = os.getenv("TGI_LLM_ENDPOINT_KEY", "fake")
 TEI_EMBEDDING_ENDPOINT = os.getenv("TEI_EMBEDDING_ENDPOINT", f"http://{host_ip}:6006")
 OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
@@ -80,6 +81,23 @@
 MAX_OUTPUT_TOKENS = os.getenv("MAX_OUTPUT_TOKENS", "1024")
 
 
+def get_hf_tokenizer(model_name):
+    """Attempt to load a Hugging Face tokenizer from a model name string.
+
+    Args:
+        model_name (str): Name of the model to load the tokenizer for
+
+    Returns:
+        AutoTokenizer or None: The loaded tokenizer if successful, None otherwise
+    """
+    try:
+        from transformers import AutoTokenizer
+
+        return AutoTokenizer.from_pretrained(model_name)
+    except Exception:
+        return None
+
+
 class GraphRAGStore(Neo4jPropertyGraphStore):
     # https://github.com/run-llama/llama_index/blob/main/docs/docs/examples/cookbooks/GraphRAG_v2.ipynb
     community_summary = {}
@@ -92,39 +110,192 @@ def __init__(self, username: str, password: str, url: str, llm: LLM):
         self.driver = GraphDatabase.driver(NEO4J_URL, auth=(NEO4J_USERNAME, NEO4J_PASSWORD))
 
     async def generate_community_summary(self, text):
-        """Generate summary for a given text using an LLM."""
+        """Generate a summary of extracted entity relationships using an LLM.
+
+        This method may trim the relationships when the token count exceeds the LLM context length, using the following logic:
+        1. For Hugging Face models: uses AutoTokenizer to count tokens exactly
+        2. For other models: uses character-based token estimation with a conservative ratio. If
+           the text is still longer than the max input tokens, the first 5 relationships are used
+           as a fallback summary.
+        """
         model_name = LLM_MODEL_ID
-        max_input_length = int(MAX_INPUT_TOKENS)
-        if not model_name or not max_input_length:
-            raise ValueError(f"Could not retrieve model information from TGI endpoint: {TGI_LLM_ENDPOINT}")
-
-        tokenizer = AutoTokenizer.from_pretrained(model_name)
-
-        messages = [
-            ChatMessage(
-                role="system",
-                content=(
-                    "You are provided with a set of relationships from a knowledge graph, each represented as "
-                    "entity1->entity2->relation->relationship_description. Your task is to create a summary of these "
-                    "relationships. The summary should include the names of the entities involved and a concise synthesis "
-                    "of the relationship descriptions. The goal is to capture the most critical and relevant details that "
-                    "highlight the nature and significance of each relationship. Ensure that the summary is coherent and "
-                    "integrates the information in a way that emphasizes the key aspects of the relationships."
-                ),
-            ),
-            ChatMessage(role="user", content=text),
-        ]
-        # Trim the messages to fit within the token limit
-        # Microsoft does more sophisticated content optimization
-        trimmed_messages = trim_messages_to_token_limit(tokenizer, messages, max_input_length)
+        max_input_tokens = int(MAX_INPUT_TOKENS)
+
+        # Define the system prompt
+        system_prompt = (
+            "You are provided with a set of relationships from a knowledge graph, each represented as "
+            "entity1->entity2->relation->relationship_description. Your task is to create a summary of these "
+            "relationships. The summary should include the names of the entities involved and a concise synthesis "
+            "of the relationship descriptions. The goal is to capture the most critical and relevant details that "
+            "highlight the nature and significance of each relationship. Ensure that the summary is coherent and "
+            "integrates the information in a way that emphasizes the key aspects of the relationships."
+        )
 
-        if OPENAI_API_KEY:
-            response = OpenAI().achat(messages)
-        else:
-            response = await self.llm.achat(trimmed_messages)
+        messages = None
+        was_trimmed = False
+
+        try:
+            tokenizer = get_hf_tokenizer(model_name)
+            if tokenizer:
+                logger.info(f"Using Hugging Face tokenizer to check context limit for model: {model_name}")
+                original_messages = [
+                    ChatMessage(role="system", content=system_prompt),
+                    ChatMessage(role="user", content=text),
+                ]
+
+                messages = self._trim_messages_to_token_limit(tokenizer, original_messages, max_input_tokens)
+                was_trimmed = (
+                    # _trim_messages_to_token_limit mutates the message objects in place,
+                    # so compare the surviving user content against the original text
+                    len(messages) < len(original_messages) or messages[-1].content != text
+                )
+                if was_trimmed:
+                    logger.info(
+                        f"Content trimmed using {model_name} tokenizer to fit within max {max_input_tokens} tokens."
+                    )
+                    messages[
+                        0
+                    ].content += " Note: Due to token limits, only a subset of the relationships are summarized."
+            else:
+                raise ValueError(
+                    f"Could not load Hugging Face tokenizer for model {model_name} for trimming - using character-based token estimation"
+                )
+
+        except Exception as e:
+            logger.info(f"Using character-based token estimation and will trim if needed: {str(e)}")
+            trimmed_text, was_trimmed = self._trim_messages_with_estimated_tokens(text, system_prompt, max_input_tokens)
+
+            system_content = system_prompt
+            if was_trimmed:
+                system_content += " Note: Due to token limits, only a subset of the relationships are provided."
+
+            messages = [
+                ChatMessage(role="system", content=system_content),
+                ChatMessage(role="user", content=trimmed_text),
+            ]
 
-        clean_response = re.sub(r"^assistant:\s*", "", str(response)).strip()
-        return clean_response
+        try:
+            # Create the relationship summaries.
+            if OPENAI_API_KEY:
+                response = await OpenAI().achat(messages)
+            else:
+                response = await self.llm.achat(messages)
+
+            clean_response = re.sub(r"^assistant:\s*", "", str(response.message.content)).strip()
+            return clean_response
+
+        except Exception as e:
+            logger.error(f"Error generating community summary: {str(e)}")
+            # Use a fallback summary listing the first 5 relationships
+            fallback_relationships = 5
+            relationships = text.split("\n")
+            important_rels = relationships[: min(fallback_relationships, len(relationships))]
+            fallback_summary = "Summary could not be generated. Key relationships include:\n\n"
+            for rel in important_rels:
+                fallback_summary += f"- {rel}\n"
+
+            logger.warning("Returning fallback summary with important relationships due to LLM error")
+            return fallback_summary
+
+    def _trim_messages_with_estimated_tokens(self, text, system_prompt, max_input_tokens):
+        """Trim the input text to fit within token limits (if needed) using character-based
+        token count estimation.
+
+        If an LLM error still occurs (i.e. this method should have trimmed more), the caller
+        falls back to a short summary built from the first few relationships.
+
+        This method:
+        1. Uses character count to approximate token count, with a conservative buffer.
+        2. Splits the text by relationships (newlines).
+        3. Keeps leading relationship lines until the estimated token count would exceed the limit.
+
+        Args:
+            text: The input text, typically containing relationship descriptions
+            system_prompt: The system prompt text
+            max_input_tokens: Maximum allowed input tokens
+
+        Returns:
+            Tuple of (trimmed_text, was_trimmed)
+        """
+        # Character-based token estimation (conservative 4 chars per token)
+        chars_per_token = 4
+        estimated_system_tokens = len(system_prompt) // chars_per_token
+
+        # Reserve tokens for the system prompt and a safety buffer
+        available_tokens = max_input_tokens - estimated_system_tokens - 100  # 100-token buffer
+        available_chars = available_tokens * chars_per_token
+
+        # If the text fits within the limit, return it as-is
+        if len(text) <= available_chars:
+            return text, False
+
+        # Otherwise the text exceeds the limit and needs trimming.
+        # Trim by relationships (typically one per line)
+        relationships = text.split("\n")
+
+        # Start with empty trimmed text and add relationships until we approach the limit
+        trimmed_text = []
+        current_chars = 0
+
+        for rel in relationships:
+            # Add 1 for the newline character
+            rel_chars = len(rel) + 1
+            if current_chars + rel_chars > available_chars:
+                # We've reached the limit
+                break
+
+            trimmed_text.append(rel)
+            current_chars += rel_chars
+
+        result = "\n".join(trimmed_text)
+        warning = "\n\n[NOTE: Some relationships were omitted due to estimated max token limits.]"
+
+        # Make sure adding the warning doesn't exceed the limit
+        if len(result) + len(warning) <= available_chars:
+            result += warning
+
+        percentage_kept = (len(result) / len(text)) * 100
+        logger.info(
+            f"Text trimmed to fit token limits. Kept {percentage_kept:.1f}% of original content ({len(result)} chars of {len(text)})."
+        )
+
+        return result, True
+
+    def _trim_messages_to_token_limit(self, tokenizer, messages, max_tokens):
+        """Trim the messages to fit within the token limit using a Hugging Face tokenizer.
+
+        This method is used when a Hugging Face tokenizer is available, allowing for
+        precise token counting and trimming based on the model's specific tokenizer.
+
+        Args:
+            tokenizer: A Hugging Face tokenizer instance
+            messages: List of ChatMessage objects
+            max_tokens: Maximum allowed input tokens
+
+        Returns:
+            List of trimmed ChatMessage objects
+        """
+        total_tokens = 0
+        trimmed_messages = []
+        buffer = 100
+        effective_max_tokens = max_tokens - buffer
+
+        for message in messages:
+            tokens = tokenizer.tokenize(message.content)
+            message_token_count = len(tokens)
+            if total_tokens + message_token_count > effective_max_tokens:
+                # Trim the message to fit within the remaining token limit
+                logger.info(f"Trimming messages: {total_tokens + message_token_count} > {effective_max_tokens}")
+                logger.info(f"message_token_count: {message_token_count}")
+                remaining_tokens = effective_max_tokens - total_tokens
+                logger.info(f"remaining_tokens: {remaining_tokens}")
+                tokens = tokens[:remaining_tokens]
+                message.content = tokenizer.convert_tokens_to_string(tokens)
+                trimmed_messages.append(message)
+                break
+            else:
+                total_tokens += message_token_count
+                trimmed_messages.append(message)
+
+        return trimmed_messages
 
     async def build_communities(self):
         """Builds communities from the graph and summarizes them."""
@@ -490,33 +661,6 @@ def get_attribute_from_tgi_endpoint(url, attribute_name):
     return None
 
 
-def trim_messages_to_token_limit(tokenizer, messages, max_tokens):
-    """Trim the messages to fit within the token limit."""
-    total_tokens = 0
-    trimmed_messages = []
-    buffer = 100
-    effective_max_tokens = max_tokens - buffer
-
-    for message in messages:
-        tokens = tokenizer.tokenize(message.content)
-        message_token_count = len(tokens)
-        if total_tokens + message_token_count > effective_max_tokens:
-            # Trim the message to fit within the remaining token limit
-            logger.info(f"Trimming messages: {total_tokens + message_token_count} > {effective_max_tokens}")
-            logger.info(f"message_token_count: {message_token_count}")
-            remaining_tokens = effective_max_tokens - total_tokens
-            logger.info(f"remaining_tokens: {remaining_tokens}")
-            tokens = tokens[:remaining_tokens]
-            message.content = tokenizer.convert_tokens_to_string(tokens)
-            trimmed_messages.append(message)
-            break
-        else:
-            total_tokens += message_token_count
-            trimmed_messages.append(message)
-
-    return trimmed_messages
-
-
 logger = CustomLogger("opea_dataprep_neo4j_llamaindex")
 logflag = os.getenv("LOGFLAG", False)
@@ -549,12 +693,12 @@ def initialize_graph_store_and_models(self):
         except Exception as e:
             logger.info(f"An error occurred while verifying the API Key: {e}")
         else:
-            logger.info("NO OpenAI API Key. TGI/VLLM/TEI endpoints will be used.")
+            logger.info("OpenAI-like endpoint will be used.")
             # works with TGI and VLLM endpoints
             self.llm = OpenAILike(
                 model=LLM_MODEL_ID,
                 api_base=TGI_LLM_ENDPOINT + "/v1",
-                api_key="fake",
+                api_key=TGI_LLM_ENDPOINT_KEY,
                 temperature=0.7,
                 max_tokens=int(MAX_OUTPUT_TOKENS),  # 1512
                 timeout=1200,  # timeout in seconds
             )
diff --git a/comps/dataprep/src/requirements.txt b/comps/dataprep/src/requirements.txt
index 03a3c4b382..d09fd3266c 100644
--- a/comps/dataprep/src/requirements.txt
+++ b/comps/dataprep/src/requirements.txt
@@ -39,6 +39,7 @@
 markdown
 moviepy
 neo4j
 numpy
+# https://github.com/run-llama/llama_index/issues/18823
 openai==1.81.0
 openai-whisper
 opencv-python
diff --git a/comps/retrievers/src/integrations/config.py b/comps/retrievers/src/integrations/config.py
index 85e16d42fb..a70dff3761 100644
--- a/comps/retrievers/src/integrations/config.py
+++ b/comps/retrievers/src/integrations/config.py
@@ -72,6 +72,8 @@ def get_boolean_env_var(var_name, default_value=False):
 NEO4J_PASSWORD = os.getenv("NEO4J_PASSWORD", "test")
 host_ip = os.getenv("host_ip")
 TGI_LLM_ENDPOINT = os.getenv("TGI_LLM_ENDPOINT", f"http://{host_ip}:6005")
+TGI_LLM_ENDPOINT_KEY = os.getenv("TGI_LLM_ENDPOINT_KEY", "fake")
+OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
 OPENAI_EMBEDDING_MODEL = os.getenv("OPENAI_EMBEDDING_MODEL", "text-embedding-3-small")
 OPENAI_LLM_MODEL = os.getenv("OPENAI_LLM_MODEL", "gpt-4o")
 LLM_MODEL_ID = os.getenv("LLM_MODEL_ID", "meta-llama/Meta-Llama-3.1-8B-Instruct")
diff --git a/comps/retrievers/src/integrations/neo4j.py b/comps/retrievers/src/integrations/neo4j.py
index 7cde18cea5..ab6e249c3e 100644
--- a/comps/retrievers/src/integrations/neo4j.py
+++ b/comps/retrievers/src/integrations/neo4j.py
@@ -34,6 +34,7 @@
     OPENAI_LLM_MODEL,
     TEI_EMBEDDING_ENDPOINT,
     TGI_LLM_ENDPOINT,
+    TGI_LLM_ENDPOINT_KEY,
 )
 
 logger = CustomLogger("neo4j_retrievers")
@@ -239,7 +240,7 @@ def _initialize_client(self):
         llm = OpenAILike(
             model=LLM_MODEL_ID,
             api_base=TGI_LLM_ENDPOINT + "/v1",
-            api_key="fake",
+            api_key=TGI_LLM_ENDPOINT_KEY,
             timeout=600,
             temperature=0.7,
             max_tokens=int(MAX_OUTPUT_TOKENS),
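
For reference, a minimal standalone sketch of the character-based estimation heuristic that the new _trim_messages_with_estimated_tokens method applies. The function name estimate_trim and the demo values are illustrative only, not part of the patch; the 4-chars-per-token ratio and the 100-token safety buffer mirror the diff.

# Self-contained sketch of the character-based trimming heuristic (assumed names).
def estimate_trim(text, system_prompt, max_input_tokens):
    chars_per_token = 4  # conservative estimate: roughly 4 characters per token
    system_tokens = len(system_prompt) // chars_per_token
    # Reserve room for the system prompt plus a 100-token safety buffer
    available_chars = (max_input_tokens - system_tokens - 100) * chars_per_token

    if len(text) <= available_chars:
        return text, False  # already fits, nothing trimmed

    kept, used = [], 0
    for rel in text.split("\n"):  # one relationship per line
        if used + len(rel) + 1 > available_chars:  # +1 for the newline
            break
        kept.append(rel)
        used += len(rel) + 1
    return "\n".join(kept), True


if __name__ == "__main__":
    rels = "\n".join(f"e{i}->e{i + 1}->related_to->example description {i}" for i in range(2000))
    trimmed, was_trimmed = estimate_trim(rels, "Summarize these relationships.", 1024)
    print(was_trimmed, len(rels), "->", len(trimmed))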
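
The tokenizer-based path can likewise be exercised in isolation. The sketch below uses the same Hugging Face tokenize and convert_tokens_to_string calls as _trim_messages_to_token_limit; the "gpt2" model name and the token budget are illustrative assumptions, and any model AutoTokenizer.from_pretrained can load (with network access) would work.

# Sketch of exact-token trimming with a Hugging Face tokenizer (model name is an example).
from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("gpt2")
text = "entity1->entity2->relation->description " * 200
tokens = tokenizer.tokenize(text)  # exact token count for this model
max_tokens, buffer = 128, 100      # same 100-token safety buffer as the patch
keep = max(max_tokens - buffer, 0)
trimmed = tokenizer.convert_tokens_to_string(tokens[:keep])
print(len(tokens), "->", len(tokenizer.tokenize(trimmed)))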