modified context_padding with multi-threading
star-nox committed Nov 9, 2023
1 parent 8788533 commit 0a17cd1
Showing 2 changed files with 258 additions and 97 deletions.
45 changes: 45 additions & 0 deletions ai_ta_backend/parallel_context_processing.py
@@ -0,0 +1,45 @@
import os
import supabase
import pandas as pd
import time
from concurrent.futures import ProcessPoolExecutor
from functools import partial
from multiprocessing import Manager

def qdrant_context_processing(doc, course_name, result_contexts):
    """
    Re-factor QDRANT objects into Supabase objects and append to result_contexts
    """
    context_dict = {
        'text': doc.page_content,
        'embedding': '',
        'pagenumber': doc.metadata['pagenumber'],
        'readable_filename': doc.metadata['readable_filename'],
        'course_name': course_name,
        's3_path': doc.metadata['s3_path'],
        'base_url': doc.metadata['base_url']
    }
    if 'url' in doc.metadata.keys():
        context_dict['url'] = doc.metadata['url']

    result_contexts.append(context_dict)
    return result_contexts


def context_padding(found_docs, search_query, course_name):
    """
    Takes top N contexts acquired from QDRANT similarity search and pads them
    """
    print("inside main context padding")
    start_time = time.monotonic()

    with Manager() as manager:
        result_contexts = manager.list()
        partial_func = partial(qdrant_context_processing, course_name=course_name, result_contexts=result_contexts)

        with ProcessPoolExecutor() as executor:
            executor.map(partial_func, found_docs[5:])

        print(f"⏰ QDRANT processing runtime: {(time.monotonic() - start_time):.2f} seconds")
        return list(result_contexts)
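
A minimal usage sketch of the new helper, assuming the module above is importable and that the search hits are langchain-style objects exposing page_content and metadata. The FakeDoc stand-in and every field value below are illustrative, not part of the commit:

from dataclasses import dataclass, field

from ai_ta_backend.parallel_context_processing import qdrant_context_processing

@dataclass
class FakeDoc:
    # stand-in for the langchain Document objects returned by the Qdrant search
    page_content: str
    metadata: dict = field(default_factory=dict)

doc = FakeDoc(
    page_content="Example passage text from a course document...",
    metadata={
        'pagenumber': 3,
        'readable_filename': 'lecture_02.pdf',
        's3_path': 'courses/demo/lecture_02.pdf',
        'base_url': '',
        'url': 'https://example.com/lecture_02.pdf',
    },
)

# Re-shape a single hit into the Supabase-style dict; context_padding() applies
# the same refactoring to found_docs[5:] across worker processes.
contexts = qdrant_context_processing(doc, course_name="demo-course", result_contexts=[])
print(contexts[0]['readable_filename'])  # -> lecture_02.pdf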

310 changes: 213 additions & 97 deletions ai_ta_backend/vector_database.py
@@ -13,6 +13,7 @@
from tempfile import NamedTemporaryFile
from typing import Any, Callable, Dict, List, Optional, Tuple, Union


import boto3
import fitz
import openai
@@ -41,6 +42,8 @@
from ai_ta_backend.aws import upload_data_files_to_s3
from ai_ta_backend.extreme_context_stuffing import OpenAIAPIProcessor
from ai_ta_backend.utils_tokenization import count_tokens_and_cost
from ai_ta_backend.parallel_context_processing import context_padding


MULTI_QUERY_PROMPT = hub.pull("langchain-ai/rag-fusion-query-generation")

@@ -1034,101 +1037,177 @@ def batch_vector_search(self, search_queries: List[str], course_name: str, top_n

return found_docs

def context_padding(self, found_docs, search_query, course_name):
# def context_padding(self, found_docs, search_query, course_name):
# """
# Takes top N contexts acquired from QDRANT similarity search and pads them
# with context from the original document from Supabase.
# 1. Use s3_path OR url as unique doc identifier
# 2. Use s3_path + chunk_index to locate chunk in the document.
# 3. Pad it with 3 contexts before and after it.
# 4. If chunk_index is not present, use page number to locate the page in the document.
# 5. Ensure no duplication takes place - top N will often have contexts belonging to the same doc.
# """
# print("inside context padding")
# documents_table = os.environ['NEW_NEW_NEWNEW_MATERIALS_SUPABASE_TABLE']
# retrieved_contexts_identifiers = {}
# result_contexts = []

# # only pad the first 5 contexts, append the rest as it is.
# for i, doc in enumerate(found_docs): # top N from QDRANT
# if i < 5:
# # if url present, query through that
# if 'url' in doc.metadata.keys() and doc.metadata['url']:
# parent_doc_id = doc.metadata['url']
# response = self.supabase_client.table(documents_table).select('*').eq('course_name', course_name).eq('url', parent_doc_id).execute()
# retrieved_contexts_identifiers[parent_doc_id] = []
# # else use s3_path
# else:
# parent_doc_id = doc.metadata['s3_path']
# response = self.supabase_client.table(documents_table).select('*').eq('course_name', course_name).eq('s3_path', parent_doc_id).execute()
# retrieved_contexts_identifiers[parent_doc_id] = []

# data = response.data # at this point, we have the origin parent document from Supabase
# if len(data) > 0:
# filename = data[0]['readable_filename']
# contexts = data[0]['contexts']
# print("no of contexts within the og doc: ", len(contexts))

# if 'chunk_index' in doc.metadata:
# # retrieve by chunk index --> pad contexts
# target_chunk_index = doc.metadata['chunk_index']
# print("target chunk_index: ", target_chunk_index)
# print("len of result contexts before chunk_index padding: ", len(result_contexts))

# for context in contexts:
# curr_chunk_index = context['chunk_index']
# # collect between range of target index - 3 and target index + 3
# if (target_chunk_index - 3 <= curr_chunk_index <= target_chunk_index + 3) and curr_chunk_index not in retrieved_contexts_identifiers[parent_doc_id]:
# context['readable_filename'] = filename
# context['course_name'] = course_name
# context['s3_path'] = data[0]['s3_path']
# context['url'] = data[0]['url']
# context['base_url'] = data[0]['base_url']

# result_contexts.append(context)
# # add current index to retrieved_contexts_identifiers after each context is retrieved to avoid duplicates
# retrieved_contexts_identifiers[parent_doc_id].append(curr_chunk_index)
# print("len of result contexts after chunk_index padding: ", len(result_contexts))

# elif doc.metadata['pagenumber'] != '':
# # retrieve by page number --> retrieve the single whole page?
# pagenumber = doc.metadata['pagenumber']
# print("target pagenumber: ", pagenumber)
# print("len of result contexts before pagenumber padding: ", len(result_contexts))
# for context in contexts:
# if int(context['pagenumber']) == pagenumber:
# context['readable_filename'] = filename
# context['course_name'] = course_name
# context['s3_path'] = data[0]['s3_path']
# context['url'] = data[0]['url']
# context['base_url'] = data[0]['base_url']
# result_contexts.append(context)

# print("len of result contexts after pagenumber padding: ", len(result_contexts))

# # add page number to retrieved_contexts_identifiers after all contexts belonging to that page number have been retrieved
# retrieved_contexts_identifiers[parent_doc_id].append(pagenumber)
# else:
# # dont pad, re-factor it to be like Supabase object
# print("no chunk index or page number found, just appending the QDRANT context")
# print("len of result contexts before qdrant append: ", len(result_contexts))
# context_dict = {'text': doc.page_content,
# 'embedding': '',
# 'pagenumber': doc.metadata['pagenumber'],
# 'readable_filename': doc.metadata['readable_filename'],
# 'course_name': course_name,
# 's3_path': doc.metadata['s3_path'],
# 'base_url':doc.metadata['base_url']
# }
# if 'url' in doc.metadata.keys():
# context_dict['url'] = doc.metadata['url']

# result_contexts.append(context_dict)
# print("len of result contexts after qdrant append: ", len(result_contexts))
# else:
# # append the rest of the docs as it is.
# print("reached > 5 docs, just appending the QDRANT context")
# context_dict = {'text': doc.page_content,
# 'embedding': '',
# 'pagenumber': doc.metadata['pagenumber'],
# 'readable_filename': doc.metadata['readable_filename'],
# 'course_name': course_name,
# 's3_path': doc.metadata['s3_path'],
# 'base_url':doc.metadata['base_url']
# }
# if 'url' in doc.metadata.keys():
# context_dict['url'] = doc.metadata['url']

# result_contexts.append(context_dict)


# print("length of final contexts: ", len(result_contexts))
# return result_contexts

def context_data_processing(self, doc, course_name, retrieved_contexts_identifiers, result_docs):
"""
Takes top N contexts acquired from QDRANT similarity search and pads them
with context from the original document from Supabase.
1. Use s3_path OR url as unique doc identifier
2. Use s3_path + chunk_index to locate chunk in the document.
3. Pad it with 3 contexts before and after it.
4. If chunk_index is not present, use page number to locate the page in the document.
5. Ensure no duplication takes place - top N will often have contexts belonging to the same doc.
Does context padding for a given doc. Used with context_padding().
"""
print("inside context padding")
print("in context data processing")
documents_table = os.environ['NEW_NEW_NEWNEW_MATERIALS_SUPABASE_TABLE']
retrieved_contexts_identifiers = {}
result_contexts = []

# only pad the first 5 contexts, append the rest as it is.
for i, doc in enumerate(found_docs): # top N from QDRANT
if i < 5:
# if url present, query through that
if 'url' in doc.metadata.keys() and doc.metadata['url']:
parent_doc_id = doc.metadata['url']
response = self.supabase_client.table(documents_table).select('*').eq('course_name', course_name).eq('url', parent_doc_id).execute()
retrieved_contexts_identifiers[parent_doc_id] = []
# else use s3_path
else:
parent_doc_id = doc.metadata['s3_path']
response = self.supabase_client.table(documents_table).select('*').eq('course_name', course_name).eq('s3_path', parent_doc_id).execute()
retrieved_contexts_identifiers[parent_doc_id] = []

data = response.data # at this point, we have the origin parent document from Supabase
if len(data) > 0:
filename = data[0]['readable_filename']
contexts = data[0]['contexts']
print("no of contexts within the og doc: ", len(contexts))

if 'chunk_index' in doc.metadata:
# retrieve by chunk index --> pad contexts
target_chunk_index = doc.metadata['chunk_index']
print("target chunk_index: ", target_chunk_index)
print("len of result contexts before chunk_index padding: ", len(result_contexts))

for context in contexts:
curr_chunk_index = context['chunk_index']
# collect between range of target index - 3 and target index + 3
if (target_chunk_index - 3 <= curr_chunk_index <= target_chunk_index + 3) and curr_chunk_index not in retrieved_contexts_identifiers[parent_doc_id]:
context['readable_filename'] = filename
context['course_name'] = course_name
context['s3_path'] = data[0]['s3_path']
context['url'] = data[0]['url']
context['base_url'] = data[0]['base_url']

result_contexts.append(context)
# add current index to retrieved_contexts_identifiers after each context is retrieved to avoid duplicates
retrieved_contexts_identifiers[parent_doc_id].append(curr_chunk_index)
print("len of result contexts after chunk_index padding: ", len(result_contexts))

elif doc.metadata['pagenumber'] != '':
# retrieve by page number --> retrieve the single whole page?
pagenumber = doc.metadata['pagenumber']
print("target pagenumber: ", pagenumber)
print("len of result contexts before pagenumber padding: ", len(result_contexts))
for context in contexts:
if int(context['pagenumber']) == pagenumber:
context['readable_filename'] = filename
context['course_name'] = course_name
context['s3_path'] = data[0]['s3_path']
context['url'] = data[0]['url']
context['base_url'] = data[0]['base_url']
result_contexts.append(context)

print("len of result contexts after pagenumber padding: ", len(result_contexts))

# add page number to retrieved_contexts_identifiers after all contexts belonging to that page number have been retrieved
retrieved_contexts_identifiers[parent_doc_id].append(pagenumber)
else:
# don't pad, re-factor it to be like a Supabase object
print("no chunk index or page number found, just appending the QDRANT context")
print("len of result contexts before qdrant append: ", len(result_contexts))
context_dict = {'text': doc.page_content,
'embedding': '',
'pagenumber': doc.metadata['pagenumber'],
'readable_filename': doc.metadata['readable_filename'],
'course_name': course_name,
's3_path': doc.metadata['s3_path'],
'base_url':doc.metadata['base_url']
}
if 'url' in doc.metadata.keys():
context_dict['url'] = doc.metadata['url']

result_contexts.append(context_dict)
print("len of result contexts after qdrant append: ", len(result_contexts))

# query by url or s3_path
if 'url' in doc.metadata.keys() and doc.metadata['url']:
parent_doc_id = doc.metadata['url']
response = self.supabase_client.table(documents_table).select('*').eq('course_name', course_name).eq('url', parent_doc_id).execute()
retrieved_contexts_identifiers[parent_doc_id] = []

else:
parent_doc_id = doc.metadata['s3_path']
response = self.supabase_client.table(documents_table).select('*').eq('course_name', course_name).eq('s3_path', parent_doc_id).execute()
retrieved_contexts_identifiers[parent_doc_id] = []

data = response.data

if len(data) > 0:
# do the padding
filename = data[0]['readable_filename']
contexts = data[0]['contexts']
print("no of contexts within the og doc: ", len(contexts))

if 'chunk_index' in doc.metadata and 'chunk_index' in contexts[0].keys():
# pad contexts by chunk index + 3 and - 3
target_chunk_index = doc.metadata['chunk_index']
for context in contexts:
curr_chunk_index = context['chunk_index']
if (target_chunk_index - 3 <= curr_chunk_index <= target_chunk_index + 3) and curr_chunk_index not in retrieved_contexts_identifiers[parent_doc_id]:
context['readable_filename'] = filename
context['course_name'] = course_name
context['s3_path'] = data[0]['s3_path']
context['url'] = data[0]['url']
context['base_url'] = data[0]['base_url']
result_docs.append(context)
# add current index to retrieved_contexts_identifiers after each context is retrieved to avoid duplicates
retrieved_contexts_identifiers[parent_doc_id].append(curr_chunk_index)

elif doc.metadata['pagenumber'] != '':
# pad contexts belonging to same page number
pagenumber = doc.metadata['pagenumber']

for context in contexts:
if int(context['pagenumber']) == pagenumber and pagenumber not in retrieved_contexts_identifiers[parent_doc_id]:
context['readable_filename'] = filename
context['course_name'] = course_name
context['s3_path'] = data[0]['s3_path']
context['url'] = data[0]['url']
context['base_url'] = data[0]['base_url']
result_docs.append(context)
# add page number to retrieved_contexts_identifiers after all contexts belonging to that page number have been retrieved
retrieved_contexts_identifiers[parent_doc_id].append(pagenumber)

else:
# append the rest of the docs as it is.
print("reached > 5 docs, just appending the QDRANT context")
context_dict = {'text': doc.page_content,
# refactor as a Supabase object and append
context_dict = {
'text': doc.page_content,
'embedding': '',
'pagenumber': doc.metadata['pagenumber'],
'readable_filename': doc.metadata['readable_filename'],
@@ -1138,13 +1217,50 @@ def context_padding(self, found_docs, search_query, course_name):
}
if 'url' in doc.metadata.keys():
context_dict['url'] = doc.metadata['url']

result_contexts.append(context_dict)
result_docs.append(context_dict)

return result_docs

# def qdrant_context_processing(doc, course_name, result_contexts):
# """
# Re-factor QDRANT objects into Supabase objects and append to result_docs
# """
# context_dict = {
# 'text': doc.page_content,
# 'embedding': '',
# 'pagenumber': doc.metadata['pagenumber'],
# 'readable_filename': doc.metadata['readable_filename'],
# 'course_name': course_name,
# 's3_path': doc.metadata['s3_path'],
# 'base_url': doc.metadata['base_url']
# }
# if 'url' in doc.metadata.keys():
# context_dict['url'] = doc.metadata['url']

# result_contexts.append(context_dict)
# return result_contexts

# def context_padding(self, found_docs, search_query, course_name):
# """
# Takes top N contexts acquired from QDRANT similarity search and pads them
# """
# print("inside main context padding")
# context_ids = {}
# # for doc in found_docs[:5]:
# # self.context_data_processing(doc, course_name, context_ids, result_contexts)

# with Manager() as manager:
# result_contexts = manager.list()
# with ProcessPoolExecutor() as executor:
# partial_func = partial(qdrant_context_processing, course_name=course_name, result_contexts=result_contexts)
# results = executor.map(partial_func, found_docs[5:])

# results = list(results)
# print("RESULTS: ", results)

# return results


print("length of final contexts: ", len(result_contexts))
return result_contexts

def reciprocal_rank_fusion(self, results: list[list], k=60):
"""
Since we have multiple queries, and n documents returned per query, we need to go through all the results
@@ -1208,7 +1324,7 @@ def getTopContexts(self, search_query: str, course_name: str, token_limit: int =

# 'context padding' // 'parent document retriever'
# TODO maybe only do context padding for top 5 docs? Otherwise it's wasteful imo.
final_docs = self.context_padding(found_docs, search_query, course_name)
final_docs = context_padding(found_docs, search_query, course_name)
print(f"Number of final docs after context padding: {len(final_docs)}")

pre_prompt = "Please answer the following question. Use the context below, called your documents, only if it's helpful and don't use parts that are very irrelevant. It's good to quote from your documents directly, when you do always use Markdown footnotes for citations. Use react-markdown superscript to number the sources at the end of sentences (1, 2, 3...) and use react-markdown Footnotes to list the full document names for each number. Use ReactMarkdown aka 'react-markdown' formatting for super script citations, use semi-formal style. Feel free to say you don't know. \nHere's a few passages of the high quality documents:\n"
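
For readers following the chunk-index branch of context_data_processing earlier in this diff, here is a small self-contained sketch of the padding rule it implements: collect the neighbouring contexts within ±3 of the matched chunk, skipping indices already gathered for that parent document. The function and variable names below are illustrative, not part of the commit:

def pad_by_chunk_index(contexts, target_chunk_index, already_seen, window=3):
    """Return contexts whose chunk_index lies within +/-window of the target,
    skipping indices already collected for this parent document (de-duplication)."""
    padded = []
    for context in contexts:
        idx = context['chunk_index']
        if (target_chunk_index - window <= idx <= target_chunk_index + window) and idx not in already_seen:
            padded.append(context)
            already_seen.add(idx)
    return padded

# Example: the top Qdrant hit is chunk 7 of a 20-chunk parent document,
# so chunks 4 through 10 come back as padded context.
parent_contexts = [{'chunk_index': i, 'text': f'chunk {i}'} for i in range(20)]
print([c['chunk_index'] for c in pad_by_chunk_index(parent_contexts, 7, set())])
# -> [4, 5, 6, 7, 8, 9, 10]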
