Merge pull request #231 from UIUC-Chatbot/refactor-nomic-fix
Refactor nomic log fix
star-nox authored Mar 15, 2024
2 parents cac7315 + 259e4ea commit 93078ea
Showing 3 changed files with 54 additions and 20 deletions.
7 changes: 7 additions & 0 deletions .beamignore
@@ -0,0 +1,7 @@
.venv
venv
.idea
.vscode
.git
*.pyc
__pycache__
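
Note: .beamignore is an ignore list for Beam app uploads, in the spirit of .gitignore; the patterns above keep virtual environments, editor settings, Git metadata, and compiled Python bytecode out of the deployed bundle.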
11 changes: 9 additions & 2 deletions ai_ta_backend/beam/ingest.py
@@ -41,7 +41,8 @@
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Qdrant
from nomic_logging import delete_from_document_map, log_to_document_map

from nomic_logging import delete_from_document_map, log_to_document_map, rebuild_map
from OpenaiEmbeddings import OpenAIAPIProcessor
from PIL import Image
from posthog import Posthog
@@ -156,6 +157,7 @@ def loader():
loader=loader,
autoscaler=autoscaler)
def ingest(**inputs: Dict[str, Any]):

qdrant_client, vectorstore, s3_client, supabase_client, posthog = inputs["context"]

course_name: List[str] | str = inputs.get('course_name', '')
@@ -171,6 +173,8 @@

ingester = Ingest(qdrant_client, vectorstore, s3_client, supabase_client, posthog)



def run_ingest(course_name, s3_paths, base_url, url, readable_filename, content):
if content:
return ingester.ingest_single_web_text(course_name, base_url, url, content, readable_filename)
@@ -194,7 +198,10 @@ def run_ingest(course_name, s3_paths, base_url, url, readable_filename, content)
# s3_paths = success_fail_dict['failure_ingest'] # retry only failed paths.... what if this is a URL instead?
success_fail_dict = run_ingest(course_name, s3_paths, base_url, url, readable_filename, content)
time.sleep(13 * retry_num) # max is 65


# rebuild nomic document map after all ingests are done
rebuild_status = rebuild_map(course_name, map_type='document')

print(f"Final success_fail_dict: {success_fail_dict}")
return json.dumps(success_fail_dict)
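
To make the new control flow easier to scan, here is a condensed, runnable sketch of what ingest() now does. run_ingest and rebuild_map are stubbed, and the retry cap is an assumption; the diff shows only the backoff line and the final rebuild call.

import json
import time

def run_ingest(course_name):  # stub standing in for the real ingest call
  return {'failure_ingest': []}

def rebuild_map(course_name, map_type):  # stub for nomic_logging.rebuild_map
  return "Successfully rebuilt map"

def ingest_flow(course_name):
  success_fail_dict = run_ingest(course_name)
  num_retries = 5  # assumed cap, not shown in the diff
  for retry_num in range(1, num_retries):
    if not success_fail_dict.get('failure_ingest'):
      break
    success_fail_dict = run_ingest(course_name)  # retry only the failures
    time.sleep(13 * retry_num)  # 13s, 26s, ...; max is 65
  # New in this commit: rebuild the Nomic document map once, after all
  # ingests and retries finish, instead of on every logged document.
  rebuild_status = rebuild_map(course_name, map_type='document')
  print(f"Final success_fail_dict: {success_fail_dict}")
  return json.dumps(success_fail_dict)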

56 changes: 38 additions & 18 deletions ai_ta_backend/beam/nomic_logging.py
@@ -215,6 +215,7 @@ def log_convo_to_nomic(course_name: str, conversation) -> str:
raise Exception({"exception": str(e)})



def get_nomic_map(course_name: str, type: str):
"""
Returns the variables necessary to construct an iframe of the Nomic map given a course name.
@@ -241,8 +242,7 @@ def get_nomic_map(course_name: str, type: str):
except Exception as e:
# Error: ValueError: You must specify a unique_id_field when creating a new project.
if str(e) == 'You must specify a unique_id_field when creating a new project.': # type: ignore
print("Nomic map does not exist yet, probably because you have less than 20 queries/documents on your project: ",
e)
print("Nomic map does not exist yet, probably because you have less than 20 queries/documents on your project: ", e)
else:
print("ERROR in get_nomic_map():", e)
sentry_sdk.capture_exception(e)
@@ -405,8 +405,8 @@ def create_nomic_map(course_name: str, log_data: list):
return "failed"


## -------------------------------- DOCUMENT MAP FUNCTIONS --------------------------------- ##

def create_document_map(course_name: str):
"""
@@ -444,13 +444,13 @@ def create_document_map(course_name: str):
desc=False).execute()
if not response.count:
return "No documents found for this course."

total_doc_count = response.count
print("Total number of documents in Supabase: ", total_doc_count)

# minimum 20 docs needed to create map
if total_doc_count > 19:
first_id = response.data[0]['id']
combined_dfs = []
curr_total_doc_count = 0
@@ -469,6 +469,7 @@ def create_document_map(course_name: str):
curr_total_doc_count += len(response.data)
doc_count += len(response.data)


if doc_count >= 1000: # upload to Nomic every 1000 docs

# concat all dfs from the combined_dfs list
@@ -495,13 +496,15 @@ def create_document_map(course_name: str):
# add project lock logic here
result = append_to_map(embeddings, metadata, project_name)


# reset variables
combined_dfs = []
doc_count = 0

# set first_id for next iteration
first_id = response.data[-1]['id'] + 1


# upload last set of docs
final_df = pd.concat(combined_dfs, ignore_index=True)
embeddings, metadata = data_prep_for_doc_map(final_df)
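
The batching logic above is split across several hunks, so as a reading aid, this is the overall pattern in one place; fetch_page is an illustrative stub for the Supabase id-range query, and the Nomic upload step is left as comments.

import pandas as pd

def fetch_page(first_id):  # illustrative stub; the real code queries Supabase
  return pd.DataFrame()  # an empty page ends the loop

combined_dfs, doc_count, first_id = [], 0, 1
while True:
  page = fetch_page(first_id)
  if page.empty:
    break
  combined_dfs.append(page)
  doc_count += len(page)
  if doc_count >= 1000:  # upload to Nomic every 1000 docs
    batch = pd.concat(combined_dfs, ignore_index=True)
    # embeddings, metadata = data_prep_for_doc_map(batch)
    # append_to_map(embeddings, metadata, project_name)
    combined_dfs, doc_count = [], 0
  first_id = int(page['id'].iloc[-1]) + 1
# whatever remains in combined_dfs is uploaded as the final batch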
@@ -528,6 +531,7 @@ def create_document_map(course_name: str):
except Exception as e:
print(e)
sentry_sdk.capture_exception(e)

return "failed"


@@ -615,13 +619,6 @@ def log_to_document_map(data: dict):
project_name = "Document Map for " + course_name
result = append_to_map(embeddings, metadata, project_name)

# check if project is accepting new datums
if project.is_accepting_data:
with project.wait_for_project_lock():
project.rebuild_maps()

# with project.wait_for_project_lock():
# project.rebuild_maps()
return result

except Exception as e:
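
Note that the lock-and-rebuild block deleted here is not gone: it moves into the new rebuild_map() helper at the bottom of this file, so logging a single document no longer triggers a full map rebuild.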
@@ -642,7 +639,6 @@ def create_map(embeddings, metadata, map_name, index_name, topic_label_field, colorable_fields):
colorable_fields: list of str
"""
nomic.login(os.getenv('NOMIC_API_KEY'))

try:
project = atlas.map_embeddings(embeddings=embeddings,
data=metadata,
@@ -658,7 +654,6 @@ def append_to_map(embeddings, metadata, map_name):
print(e)
return "Error in creating map: {e}"


def append_to_map(embeddings, metadata, map_name):
"""
Generic function to append new data to an existing Nomic map.
@@ -667,6 +662,7 @@ def append_to_map(embeddings, metadata, map_name):
metadata: pd.DataFrame of Nomic upload metadata
map_name: str
"""

nomic.login(os.getenv('NOMIC_API_KEY'))
try:
project = atlas.AtlasProject(name=map_name, add_datums_if_exists=True)
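
The rest of append_to_map is collapsed in this view. As a hedged reconstruction, the usual Atlas append pattern from this era looks roughly like the sketch below; the add_embeddings call is an assumption and is not shown in the diff.

import os

import nomic
from nomic import atlas

def append_to_map_sketch(embeddings, metadata, map_name):
  # Hedged sketch only; the real body is collapsed in the diff above.
  nomic.login(os.getenv('NOMIC_API_KEY'))
  try:
    project = atlas.AtlasProject(name=map_name, add_datums_if_exists=True)
    with project.wait_for_project_lock():
      project.add_embeddings(embeddings=embeddings, data=metadata)  # assumed call
    return "Successfully appended to map"
  except Exception as e:
    return f"Error in appending to map: {e}"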
@@ -691,10 +687,10 @@ def data_prep_for_doc_map(df: pd.DataFrame):

metadata = []
embeddings = []

texts = []

for index, row in df.iterrows():

current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if row['url'] is None:
row['url'] = ""
@@ -725,9 +721,7 @@ def data_prep_for_doc_map(df: pd.DataFrame):
# check dimension if embeddings_np is (n, 1536)
if len(embeddings_np.shape) < 2:
print("Creating new embeddings...")
# embeddings_model = OpenAIEmbeddings(openai_api_type=OPENAI_API_TYPE,
# openai_api_base=os.getenv('AZURE_OPENAI_BASE'),
# openai_api_key=os.getenv('AZURE_OPENAI_KEY')) # type: ignore

embeddings_model = OpenAIEmbeddings(openai_api_type="openai",
openai_api_base="https://api.openai.com/v1/",
openai_api_key=os.getenv('VLADS_OPENAI_KEY')) # type: ignore
@@ -738,6 +732,32 @@ def data_prep_for_doc_map(df: pd.DataFrame):

return embeddings, metadata
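
For context on the shape check above: if the embeddings pulled from Supabase fail to stack into an (n, 1536) matrix, the code re-embeds the raw texts. A small self-contained illustration, with a fake embedder standing in for langchain's OpenAIEmbeddings:

import numpy as np

class FakeEmbedder:  # stands in for OpenAIEmbeddings in this illustration
  def embed_documents(self, texts):
    return [[0.0] * 1536 for _ in texts]

texts = ["first chunk", "second chunk"]
embeddings_np = np.array([None, None], dtype=object)  # failed parse: shape (2,)

if len(embeddings_np.shape) < 2:  # not a clean (n, 1536) matrix
  embeddings_np = np.array(FakeEmbedder().embed_documents(texts))

assert embeddings_np.shape == (2, 1536)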

def rebuild_map(course_name: str, map_type: str):
"""
This function rebuilds a given map in Nomic.
"""
print("in rebuild_map()")
nomic.login(os.getenv('NOMIC_API_KEY'))
if map_type.lower() == 'document':
NOMIC_MAP_NAME_PREFIX = 'Document Map for '
else:
NOMIC_MAP_NAME_PREFIX = 'Conversation Map for '

try:
# fetch project from Nomic
project_name = NOMIC_MAP_NAME_PREFIX + course_name
project = AtlasProject(name=project_name, add_datums_if_exists=True)

with project.wait_for_project_lock():
project.rebuild_maps()
return "Successfully rebuilt map"
except Exception as e:
print(e)
sentry_sdk.capture_exception(e)
return "Error in rebuilding map: {e}"



if __name__ == '__main__':
pass
