
Refactor nomic log fix #231

Merged · 5 commits · Mar 15, 2024
7 changes: 7 additions & 0 deletions .beamignore
@@ -0,0 +1,7 @@
.venv
venv
.idea
.vscode
.git
*.pyc
__pycache__
11 changes: 9 additions & 2 deletions ai_ta_backend/beam/ingest.py
@@ -41,7 +41,8 @@
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Qdrant
from nomic_logging import delete_from_document_map, log_to_document_map

from nomic_logging import delete_from_document_map, log_to_document_map, rebuild_map
from OpenaiEmbeddings import OpenAIAPIProcessor
from PIL import Image
from posthog import Posthog
@@ -156,6 +157,7 @@ def loader():
loader=loader,
autoscaler=autoscaler)
def ingest(**inputs: Dict[str, Any]):

qdrant_client, vectorstore, s3_client, supabase_client, posthog = inputs["context"]

course_name: List[str] | str = inputs.get('course_name', '')
@@ -171,6 +173,8 @@ def ingest(**inputs: Dict[str, Any]):

ingester = Ingest(qdrant_client, vectorstore, s3_client, supabase_client, posthog)



def run_ingest(course_name, s3_paths, base_url, url, readable_filename, content):
if content:
return ingester.ingest_single_web_text(course_name, base_url, url, content, readable_filename)
@@ -194,7 +198,10 @@ def run_ingest(course_name, s3_paths, base_url, url, readable_filename, content)
# s3_paths = success_fail_dict['failure_ingest'] # retry only failed paths.... what if this is a URL instead?
success_fail_dict = run_ingest(course_name, s3_paths, base_url, url, readable_filename, content)
time.sleep(13 * retry_num) # max is 65


# rebuild nomic document map after all ingests are done
rebuild_status = rebuild_map(course_name, map_type='document')

print(f"Final success_fail_dict: {success_fail_dict}")
return json.dumps(success_fail_dict)

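Note: the ingest.py change above wires map rebuilding into the end of the ingest flow. A minimal sketch of that retry-then-rebuild pattern, with the helpers passed in as parameters and the retry count purely illustrative:

import time

def ingest_then_rebuild(course_name, ingest_args, run_ingest, rebuild_map, max_retries=5):
    """Retry ingestion with a linear backoff, then rebuild the Nomic document map once."""
    success_fail_dict = run_ingest(course_name, **ingest_args)

    for retry_num in range(1, max_retries):
        if not success_fail_dict.get('failure_ingest'):
            break  # nothing left to retry
        print(f"Retrying failed ingests, attempt {retry_num}")
        success_fail_dict = run_ingest(course_name, **ingest_args)
        time.sleep(13 * retry_num)  # same backoff as the diff; max wait is 65 seconds

    # rebuild the Nomic document map only after all ingest attempts have finished
    rebuild_status = rebuild_map(course_name, map_type='document')
    print("Rebuild status:", rebuild_status)
    return success_fail_dict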
56 changes: 38 additions & 18 deletions ai_ta_backend/beam/nomic_logging.py
@@ -215,6 +215,7 @@ def log_convo_to_nomic(course_name: str, conversation) -> str:
raise Exception({"exception": str(e)})



def get_nomic_map(course_name: str, type: str):
"""
Returns the variables necessary to construct an iframe of the Nomic map given a course name.
@@ -241,8 +242,7 @@ def get_nomic_map(course_name: str, type: str):
except Exception as e:
# Error: ValueError: You must specify a unique_id_field when creating a new project.
if str(e) == 'You must specify a unique_id_field when creating a new project.': # type: ignore
print("Nomic map does not exist yet, probably because you have less than 20 queries/documents on your project: ",
e)
print("Nomic map does not exist yet, probably because you have less than 20 queries/documents on your project: ", e)
else:
print("ERROR in get_nomic_map():", e)
sentry_sdk.capture_exception(e)
@@ -405,8 +405,8 @@ def create_nomic_map(course_name: str, log_data: list):
return "failed"


## -------------------------------- DOCUMENT MAP FUNCTIONS --------------------------------- ##

## -------------------------------- DOCUMENT MAP FUNCTIONS --------------------------------- ##

def create_document_map(course_name: str):
"""
@@ -444,13 +444,13 @@ def create_document_map(course_name: str):
desc=False).execute()
if not response.count:
return "No documents found for this course."

total_doc_count = response.count
print("Total number of documents in Supabase: ", total_doc_count)

# minimum 20 docs needed to create map
if total_doc_count > 19:

if total_doc_count > 19:
first_id = response.data[0]['id']
combined_dfs = []
curr_total_doc_count = 0
@@ -469,6 +469,7 @@
curr_total_doc_count += len(response.data)
doc_count += len(response.data)


if doc_count >= 1000: # upload to Nomic every 1000 docs

# concat all dfs from the combined_dfs list
@@ -495,13 +496,15 @@
# add project lock logic here
result = append_to_map(embeddings, metadata, project_name)


# reset variables
combined_dfs = []
doc_count = 0

# set first_id for next iteration
first_id = response.data[-1]['id'] + 1


# upload last set of docs
final_df = pd.concat(combined_dfs, ignore_index=True)
embeddings, metadata = data_prep_for_doc_map(final_df)
@@ -528,6 +531,7 @@
except Exception as e:
print(e)
sentry_sdk.capture_exception(e)

return "failed"


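Note on the create_document_map changes above: the function pages through Supabase results, buffers them, and pushes to Nomic in fixed-size batches before flushing the remainder. A standalone sketch of that batching pattern, with the fetch/prepare/upload callables and the 1,000-row batch size taken as assumptions from the diff:

import pandas as pd

def batch_upload(fetch_page, prepare_batch, upload_batch, first_id=0, batch_size=1000):
    """Buffer pages of documents and upload them to the map in fixed-size batches."""
    combined_dfs = []
    doc_count = 0

    while True:
        page = fetch_page(first_id)              # one page of rows, e.g. a Supabase query
        if page.empty:
            break
        combined_dfs.append(page)
        doc_count += len(page)

        if doc_count >= batch_size:              # upload to Nomic every `batch_size` docs
            batch_df = pd.concat(combined_dfs, ignore_index=True)
            upload_batch(*prepare_batch(batch_df))
            combined_dfs, doc_count = [], 0      # reset the buffer

        first_id = int(page['id'].iloc[-1]) + 1  # set first_id for the next page

    if combined_dfs:                             # flush whatever is left over
        final_df = pd.concat(combined_dfs, ignore_index=True)
        upload_batch(*prepare_batch(final_df))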
@@ -615,13 +619,6 @@ def log_to_document_map(data: dict):
project_name = "Document Map for " + course_name
result = append_to_map(embeddings, metadata, project_name)

# check if project is accepting new datums
if project.is_accepting_data:
with project.wait_for_project_lock():
project.rebuild_maps()

# with project.wait_for_project_lock():
# project.rebuild_maps()
return result

except Exception as e:
@@ -642,7 +639,6 @@ def create_map(embeddings, metadata, map_name, index_name, topic_label_field, co
colorable_fields: list of str
"""
nomic.login(os.getenv('NOMIC_API_KEY'))

try:
project = atlas.map_embeddings(embeddings=embeddings,
data=metadata,
@@ -658,7 +654,6 @@ def append_to_map(embeddings, metadata, map_name):
print(e)
return f"Error in creating map: {e}"


def append_to_map(embeddings, metadata, map_name):
"""
Generic function to append new data to an existing Nomic map.
Expand All @@ -667,6 +662,7 @@ def append_to_map(embeddings, metadata, map_name):
metadata: pd.DataFrame of Nomic upload metadata
map_name: str
"""

nomic.login(os.getenv('NOMIC_API_KEY'))
try:
project = atlas.AtlasProject(name=map_name, add_datums_if_exists=True)
@@ -691,10 +687,10 @@ def data_prep_for_doc_map(df: pd.DataFrame):

metadata = []
embeddings = []

texts = []

for index, row in df.iterrows():

current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if row['url'] == None:
row['url'] = ""
@@ -725,9 +721,7 @@
# check dimension if embeddings_np is (n, 1536)
if len(embeddings_np.shape) < 2:
print("Creating new embeddings...")
# embeddings_model = OpenAIEmbeddings(openai_api_type=OPENAI_API_TYPE,
# openai_api_base=os.getenv('AZURE_OPENAI_BASE'),
# openai_api_key=os.getenv('AZURE_OPENAI_KEY')) # type: ignore

embeddings_model = OpenAIEmbeddings(openai_api_type="openai",
openai_api_base="https://api.openai.com/v1/",
openai_api_key=os.getenv('VLADS_OPENAI_KEY')) # type: ignore
Expand All @@ -738,6 +732,32 @@ def data_prep_for_doc_map(df: pd.DataFrame):

return embeddings, metadata

def rebuild_map(course_name:str, map_type:str):
"""
This function rebuilds a given map in Nomic.
"""
print("in rebuild_map()")
nomic.login(os.getenv('NOMIC_API_KEY'))
if map_type.lower() == 'document':
NOMIC_MAP_NAME_PREFIX = 'Document Map for '
else:
NOMIC_MAP_NAME_PREFIX = 'Conversation Map for '

try:
# fetch project from Nomic
project_name = NOMIC_MAP_NAME_PREFIX + course_name
project = AtlasProject(name=project_name, add_datums_if_exists=True)

with project.wait_for_project_lock():
project.rebuild_maps()
return "Successfully rebuilt map"
except Exception as e:
print(e)
sentry_sdk.capture_exception(e)
return f"Error in rebuilding map: {e}"



if __name__ == '__main__':
pass
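
A hypothetical usage sketch for the new rebuild_map helper (the course name is made up; NOMIC_API_KEY must be set and the corresponding Atlas project must already exist):

from nomic_logging import rebuild_map

# Smoke test against an existing "Document Map for example-course" project.
status = rebuild_map('example-course', map_type='document')
print(status)  # "Successfully rebuilt map" on success, otherwise an error message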