updated create_doc_map()
star-nox committed Mar 21, 2024
1 parent a450b12 commit b4792f7
ai_ta_backend/beam/nomic_logging.py: 150 changes (75 additions & 75 deletions)
@@ -433,9 +433,9 @@ def create_document_map(course_name: str):

try:
# check if map exists
response = supabase_client.table("projects").select("doc_map_id").eq("course_name", course_name).execute()
if response.data:
return "Map already exists for this course."
# response = supabase_client.table("projects").select("doc_map_id").eq("course_name", course_name).execute()
# if response.data:
# return "Map already exists for this course."

# fetch relevant document data from Supabase
response = supabase_client.table("documents").select("id",
@@ -449,92 +449,91 @@ def create_document_map(course_name: str):
print("Total number of documents in Supabase: ", total_doc_count)

# minimum 20 docs needed to create map
if total_doc_count < 20:
return "Cannot create a map because there are less than 20 documents in the course."

if total_doc_count > 19:
first_id = response.data[0]['id']
combined_dfs = []
curr_total_doc_count = 0
doc_count = 0
first_batch = True
first_id = response.data[0]['id']
combined_dfs = []
curr_total_doc_count = 0
doc_count = 0
first_batch = True

# iteratively query in batches of 25
while curr_total_doc_count < total_doc_count:
# iteratively query in batches of 25
while curr_total_doc_count < total_doc_count:

response = supabase_client.table("documents").select(
"id, created_at, s3_path, url, readable_filename, contexts").eq("course_name", course_name).gte(
response = supabase_client.table("documents").select(
"id, created_at, s3_path, url, base_url, readable_filename, contexts").eq("course_name", course_name).gte(
'id', first_id).order('id', desc=False).limit(25).execute()
df = pd.DataFrame(response.data)
combined_dfs.append(df) # list of dfs

curr_total_doc_count += len(response.data)
doc_count += len(response.data)

df = pd.DataFrame(response.data)
combined_dfs.append(df) # list of dfs

if doc_count >= 1000: # upload to Nomic every 1000 docs
curr_total_doc_count += len(response.data)
doc_count += len(response.data)

# concat all dfs from the combined_dfs list
final_df = pd.concat(combined_dfs, ignore_index=True)

# prep data for nomic upload
embeddings, metadata = data_prep_for_doc_map(final_df)

if first_batch:
# create a new map
print("Creating new map...")
project_name = NOMIC_MAP_NAME_PREFIX + course_name
index_name = course_name + "_doc_index"
topic_label_field = "text"
colorable_fields = ["readable_filename", "text"]
result = create_map(embeddings, metadata, project_name, index_name, topic_label_field, colorable_fields)
# update flag
first_batch = False

else:
# append to existing map
print("Appending data to existing map...")
project_name = NOMIC_MAP_NAME_PREFIX + course_name
# add project lock logic here
result = append_to_map(embeddings, metadata, project_name)
if doc_count >= 100: # upload to Nomic in batches of 1000

# concat all dfs from the combined_dfs list
final_df = pd.concat(combined_dfs, ignore_index=True)

# reset variables
combined_dfs = []
doc_count = 0
# prep data for nomic upload
embeddings, metadata = data_prep_for_doc_map(final_df)

# set first_id for next iteration
first_id = response.data[-1]['id'] + 1
if first_batch:
# create a new map
print("Creating new map...")
project_name = NOMIC_MAP_NAME_PREFIX + course_name
index_name = course_name + "_doc_index"
topic_label_field = "text"
colorable_fields = ["readable_filename", "text", "base_url", "created_at"]
result = create_map(embeddings, metadata, project_name, index_name, topic_label_field, colorable_fields)
# update flag
first_batch = False


# upload last set of docs
final_df = pd.concat(combined_dfs, ignore_index=True)
embeddings, metadata = data_prep_for_doc_map(final_df)
project_name = NOMIC_MAP_NAME_PREFIX + course_name
if first_batch:
index_name = course_name + "_doc_index"
topic_label_field = "text"
colorable_fields = ["readable_filename", "text"]
result = create_map(embeddings, metadata, project_name, index_name, topic_label_field, colorable_fields)
else:
result = append_to_map(embeddings, metadata, project_name)
print("Atlas upload status: ", result)

# log info to supabase
project = AtlasProject(name=project_name, add_datums_if_exists=True)
project_id = project.id
project.rebuild_maps()
project_info = {'course_name': course_name, 'doc_map_id': project_id}
response = supabase_client.table("projects").insert(project_info).execute()
print("Response from supabase: ", response)
return "success"
else:
# append to existing map
print("Appending data to existing map...")
project_name = NOMIC_MAP_NAME_PREFIX + course_name
# add project lock logic here
result = append_to_map(embeddings, metadata, project_name)

# reset variables
combined_dfs = []
doc_count = 0

# set first_id for next iteration
first_id = response.data[-1]['id'] + 1

# upload last set of docs
final_df = pd.concat(combined_dfs, ignore_index=True)
embeddings, metadata = data_prep_for_doc_map(final_df)
project_name = NOMIC_MAP_NAME_PREFIX + course_name
if first_batch:
index_name = course_name + "_doc_index"
topic_label_field = "text"
colorable_fields = ["readable_filename", "text", "base_url", "created_at"]
result = create_map(embeddings, metadata, project_name, index_name, topic_label_field, colorable_fields)
else:
return "Cannot create a map because there are less than 20 documents in the course."
result = append_to_map(embeddings, metadata, project_name)
print("Atlas upload status: ", result)

# log info to supabase
project = AtlasProject(name=project_name, add_datums_if_exists=True)
project_id = project.id
project.rebuild_maps()
project_info = {'course_name': course_name, 'doc_map_id': project_id}
response = supabase_client.table("projects").insert(project_info).execute()
print("Response from supabase: ", response)
return "success"

except Exception as e:
print(e)
sentry_sdk.capture_exception(e)

return "failed"



def delete_from_document_map(course_name: str, ids: list):
"""
This function is used to delete datapoints from a document map.
@@ -644,11 +643,11 @@ def create_map(embeddings, metadata, map_name, index_name, topic_label_field, co
data=metadata,
id_field="id",
build_topic_model=True,
name=map_name,
topic_label_field=topic_label_field,
name=map_name,
colorable_fields=colorable_fields,
add_datums_if_exists=True)
project.create_index(index_name, build_topic_model=True)
project.create_index(name=index_name, build_topic_model=True)
return "success"
except Exception as e:
print(e)
@@ -673,7 +672,6 @@ def append_to_map(embeddings, metadata, map_name):
print(e)
return "Error in appending to map: {e}"


def data_prep_for_doc_map(df: pd.DataFrame):
"""
This function prepares embeddings and metadata for nomic upload in document map creation.
@@ -692,6 +690,7 @@ def data_prep_for_doc_map(df: pd.DataFrame):

for index, row in df.iterrows():
current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
created_at = datetime.datetime.strptime(row['created_at'], "%Y-%m-%dT%H:%M:%S.%f%z").strftime("%Y-%m-%d %H:%M:%S")
if row['url'] == None:
row['url'] = ""
# iterate through all contexts and create separate entries for each
@@ -703,11 +702,12 @@

meta_row = {
"id": str(row['id']) + "_" + str(context_count),
"doc_ingested_at": row['created_at'],
"created_at": created_at,
"s3_path": row['s3_path'],
"url": row['url'],
"base_url": row['base_url'],
"readable_filename": row['readable_filename'],
"created_at": current_time,
"modified_at": current_time,
"text": text_row
}
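For reference, here is a minimal standalone sketch of the timestamp handling this hunk introduces. The format string added to data_prep_for_doc_map expects ISO-8601 strings with microseconds and a UTC offset, and re-emits them as plain "YYYY-MM-DD HH:MM:SS" values for the Nomic metadata. The sample value below is illustrative, not taken from the repository.

import datetime

# Illustrative timestamp in the shape the new format string expects
# (ISO-8601 with microseconds and offset); not a value from the repository.
raw_created_at = "2024-03-21T14:05:09.123456+00:00"

# Same two format strings used in the diff above.
# Parsing the "+00:00" offset with %z requires Python 3.7+.
parsed = datetime.datetime.strptime(raw_created_at, "%Y-%m-%dT%H:%M:%S.%f%z")
created_at = parsed.strftime("%Y-%m-%d %H:%M:%S")

print(created_at)  # -> 2024-03-21 14:05:09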

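Taken together, the hunks above leave create_document_map with an early return for courses under 20 documents, a 25-row paginated fetch keyed on ascending id, and periodic uploads to Nomic. The condensed sketch below restates that control flow as a reading aid; it is not the committed code. It assumes the module-level supabase_client and NOMIC_MAP_NAME_PREFIX plus the create_map, append_to_map, and data_prep_for_doc_map helpers defined in this file; the function name create_document_map_sketch and the count="exact" parameter (the count query is truncated in this diff view) are assumptions for illustration.

import pandas as pd

def create_document_map_sketch(course_name: str):
  # Count query is elided in the diff view; count="exact" is an assumption here.
  response = supabase_client.table("documents").select(
      "id", count="exact").eq("course_name", course_name).order('id', desc=False).execute()
  total_doc_count = response.count

  # Early return instead of wrapping the whole body in `if total_doc_count > 19:`.
  if total_doc_count < 20:
    return "Cannot create a map because there are less than 20 documents in the course."

  project_name = NOMIC_MAP_NAME_PREFIX + course_name
  index_name = course_name + "_doc_index"
  colorable_fields = ["readable_filename", "text", "base_url", "created_at"]

  first_id = response.data[0]['id']
  combined_dfs, fetched, doc_count = [], 0, 0
  first_batch = True

  while fetched < total_doc_count:
    # Page through documents 25 at a time; base_url is newly included in the select.
    batch = supabase_client.table("documents").select(
        "id, created_at, s3_path, url, base_url, readable_filename, contexts").eq(
            "course_name", course_name).gte('id', first_id).order('id', desc=False).limit(25).execute()
    combined_dfs.append(pd.DataFrame(batch.data))
    fetched += len(batch.data)
    doc_count += len(batch.data)

    if doc_count >= 100:  # flush accumulated rows to Nomic
      embeddings, metadata = data_prep_for_doc_map(pd.concat(combined_dfs, ignore_index=True))
      if first_batch:
        result = create_map(embeddings, metadata, project_name, index_name, "text", colorable_fields)
        first_batch = False
      else:
        result = append_to_map(embeddings, metadata, project_name)
      combined_dfs, doc_count = [], 0

    # Next page starts just past the last id seen.
    first_id = batch.data[-1]['id'] + 1

  # Upload whatever is left after the loop.
  if combined_dfs:
    embeddings, metadata = data_prep_for_doc_map(pd.concat(combined_dfs, ignore_index=True))
    if first_batch:
      result = create_map(embeddings, metadata, project_name, index_name, "text", colorable_fields)
    else:
      result = append_to_map(embeddings, metadata, project_name)
  print("Atlas upload status: ", result)

  # The committed code then rebuilds the Atlas maps and records the map id, roughly:
  #   project = AtlasProject(name=project_name, add_datums_if_exists=True)
  #   project.rebuild_maps()
  #   supabase_client.table("projects").insert(
  #       {'course_name': course_name, 'doc_map_id': project.id}).execute()
  return "success"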
