diff --git a/.env.template b/.env.template index 5c5520de..ba04c704 100644 --- a/.env.template +++ b/.env.template @@ -1,34 +1,34 @@ -# Supabase SQL -SUPABASE_URL= -SUPABASE_API_KEY= -SUPABASE_READ_ONLY= -SUPABASE_JWT_SECRET= - -MATERIALS_SUPABASE_TABLE=uiuc_chatbot -NEW_NEW_NEWNEW_MATERIALS_SUPABASE_TABLE=documents - -# QDRANT -QDRANT_COLLECTION_NAME=uiuc-chatbot -DEV_QDRANT_COLLECTION_NAME=dev -QDRANT_URL= -QDRANT_API_KEY= - -REFACTORED_MATERIALS_SUPABASE_TABLE= - -# AWS -S3_BUCKET_NAME=uiuc-chatbot -AWS_ACCESS_KEY_ID= -AWS_SECRET_ACCESS_KEY= - -OPENAI_API_KEY= - -NOMIC_API_KEY= -LINTRULE_SECRET= - -# Github Agent -GITHUB_APP_ID= -GITHUB_APP_PRIVATE_KEY="-----BEGIN RSA PRIVATE KEY----- - ------END RSA PRIVATE KEY-----" - -NUMEXPR_MAX_THREADS=2 +# Supabase SQL +SUPABASE_URL= +SUPABASE_API_KEY= +SUPABASE_READ_ONLY= +SUPABASE_JWT_SECRET= + +MATERIALS_SUPABASE_TABLE=uiuc_chatbot +NEW_NEW_NEWNEW_MATERIALS_SUPABASE_TABLE=documents + +# QDRANT +QDRANT_COLLECTION_NAME=uiuc-chatbot +DEV_QDRANT_COLLECTION_NAME=dev +QDRANT_URL= +QDRANT_API_KEY= + +REFACTORED_MATERIALS_SUPABASE_TABLE= + +# AWS +S3_BUCKET_NAME=uiuc-chatbot +AWS_ACCESS_KEY_ID= +AWS_SECRET_ACCESS_KEY= + +OPENAI_API_KEY= + +NOMIC_API_KEY= +LINTRULE_SECRET= + +# Github Agent +GITHUB_APP_ID= +GITHUB_APP_PRIVATE_KEY="-----BEGIN RSA PRIVATE KEY----- + +-----END RSA PRIVATE KEY-----" + +NUMEXPR_MAX_THREADS=2 diff --git a/.github/workflows/yapf-format.yml b/.github/workflows/yapf-format.yml index 3c0c1321..52d2df31 100644 --- a/.github/workflows/yapf-format.yml +++ b/.github/workflows/yapf-format.yml @@ -14,7 +14,7 @@ jobs: - name: pip install yapf run: pip install yapf - name: Format code with yapf - run: yapf --in-place --recursive --parallel --style="{based_on_style: google, column_limit: 140, indent_width: 2}" --exclude '*.env' . + run: yapf --in-place --recursive --parallel --style='{based_on_style: google, column_limit: 140, indent_width: 2}' --exclude '*.env' . - name: Commit changes uses: EndBug/add-and-commit@v4 with: diff --git a/.gitignore b/.gitignore index 3db8ad0c..70babf88 100644 --- a/.gitignore +++ b/.gitignore @@ -1,167 +1,167 @@ -# don't sync coursera docs -coursera-dl/ -*parsed.json -wandb - -# don't expose env files -dummy.ipynb -.env -# Created by https://www.toptal.com/developers/gitignore/api/python -# Edit at https://www.toptal.com/developers/gitignore?templates=python - -### Python ### -# Byte-compiled / optimized / DLL files -__pycache__/ -*.py[cod] -*$py.class - -# C extensions -*.so - -# Distribution / packaging -.Python -build/ -develop-eggs/ -dist/ -downloads/ -eggs/ -.eggs/ -lib/ -lib64/ -parts/ -sdist/ -var/ -wheels/ -pip-wheel-metadata/ -share/python-wheels/ -*.egg-info/ -.installed.cfg -*.egg -MANIFEST - -# PyInstaller -# Usually these files are written by a python script from a template -# before PyInstaller builds the exe, so as to inject date/other infos into it. -*.manifest -*.spec - -# Installer logs -pip-log.txt -pip-delete-this-directory.txt - -# Unit test / coverage reports -htmlcov/ -.tox/ -.nox/ -.coveage -.coverage.* -.cache -nosetests.xml -coverage.xml -*.cover -*.py,cover -.hypothesis/ -.pytest_cache/ -pytestdebug.log - -# Translations -*.mo -*.pot - -# Django stuff: -*.log -local_settings.py -db.sqlite3 -db.sqlite3-journal - -# Flask stuff: -instance/ -.webassets-cache - -# Scrapy stuff: -.scrapy - -# Sphinx documentation -docs/_build/ -doc/_build/ - -# PyBuilder -target/ - -# Jupyter Notebook -.ipynb_checkpoints - -# IPython -profile_default/ -ipython_config.py - -# pyenv -.python-version - -# pipenv -# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. -# However, in case of collaboration, if having platform-specific dependencies or dependencies -# having no cross-platform support, pipenv may install dependencies that don't work, or not -# install all needed dependencies. -#Pipfile.lock - -# PEP 582; used by e.g. github.com/David-OConnor/pyflow -__pypackages__/ - -# Celery stuff -celerybeat-schedule -celerybeat.pid - -# SageMath parsed files -*.sage.py - -# Environments -.env -.venv -env/ -venv/ -ENV/ -env.bak/ -venv.bak/ -pythonenv* - -# Spyder project settings -.spyderproject -.spyproject - -# Rope project settings -.ropeproject - -# mkdocs documentation -/site - -# mypy -.mypy_cache/ -.dmypy.json -dmypy.json - -# Pyre type checker -.pyre/ - -# pytype static type analyzer -.pytype/ - -# profiling data -.prof - -# Virtualenv -# http://iamzed.com/2009/05/07/a-primer-on-virtualenv/ -.Python -[Bb]in -[Ii]nclude -[Ll]ib -[Ll]ib64 -[Ll]ocal -[Ss]cripts -pyvenv.cfg -.venv -pip-selfcheck.json - - -# End of https://www.toptal.com/developers/gitignore/api/python -.aider* +# don't sync coursera docs +coursera-dl/ +*parsed.json +wandb + +# don't expose env files +dummy.ipynb +.env +# Created by https://www.toptal.com/developers/gitignore/api/python +# Edit at https://www.toptal.com/developers/gitignore?templates=python + +### Python ### +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coveage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +pytestdebug.log + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ +doc/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ +pythonenv* + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# profiling data +.prof + +# Virtualenv +# http://iamzed.com/2009/05/07/a-primer-on-virtualenv/ +.Python +[Bb]in +[Ii]nclude +[Ll]ib +[Ll]ib64 +[Ll]ocal +[Ss]cripts +pyvenv.cfg +.venv +pip-selfcheck.json + + +# End of https://www.toptal.com/developers/gitignore/api/python +.aider* diff --git a/ai_ta_backend/extreme_context_stuffing.py b/ai_ta_backend/extreme_context_stuffing.py index 03b56e86..ed133a6a 100644 --- a/ai_ta_backend/extreme_context_stuffing.py +++ b/ai_ta_backend/extreme_context_stuffing.py @@ -1,541 +1,541 @@ -""" -API REQUEST PARALLEL PROCESSOR - -Using the OpenAI API to process lots of text quickly takes some care. -If you trickle in a million API requests one by one, they'll take days to complete. -If you flood a million API requests in parallel, they'll exceed the rate limits and fail with errors. -To maximize throughput, parallel requests need to be throttled to stay under rate limits. - -This script parallelizes requests to the OpenAI API while throttling to stay under rate limits. - -Features: -- Streams requests from file, to avoid running out of memory for giant jobs -- Makes requests concurrently, to maximize throughput -- Throttles request and token usage, to stay under rate limits -- Retries failed requests up to {max_attempts} times, to avoid missing data -- Logs errors, to diagnose problems with requests - -Example command to call script: -``` -python examples/api_request_parallel_processor.py \ - --requests_filepath examples/data/example_requests_to_parallel_process.jsonl \ - --save_filepath examples/data/example_requests_to_parallel_process_results.jsonl \ - --request_url https://api.openai.com/v1/embeddings \ - --max_requests_per_minute 1500 \ - --max_tokens_per_minute 6250000 \ - --token_encoding_name cl100k_base \ - --max_attempts 5 \ - --logging_level 20 -``` - -Inputs: -- requests_filepath : str - - path to the file containing the requests to be processed - - file should be a jsonl file, where each line is a json object with API parameters and an optional metadata field - - e.g., {"model": "text-embedding-ada-002", "input": "embed me", "metadata": {"row_id": 1}} - - as with all jsonl files, take care that newlines in the content are properly escaped (json.dumps does this automatically) - - an example file is provided at examples/data/example_requests_to_parallel_process.jsonl - - the code to generate the example file is appended to the bottom of this script -- save_filepath : str, optional - - path to the file where the results will be saved - - file will be a jsonl file, where each line is an array with the original request plus the API response - - e.g., [{"model": "text-embedding-ada-002", "input": "embed me"}, {...}] - - if omitted, results will be saved to {requests_filename}_results.jsonl -- request_url : str, optional - - URL of the API endpoint to call - - if omitted, will default to "https://api.openai.com/v1/embeddings" -- api_key : str, optional - - API key to use - - if omitted, the script will attempt to read it from an environment variable {os.getenv("OPENAI_API_KEY")} -- max_requests_per_minute : float, optional - - target number of requests to make per minute (will make less if limited by tokens) - - leave headroom by setting this to 50% or 75% of your limit - - if requests are limiting you, try batching multiple embeddings or completions into one request - - if omitted, will default to 1,500 -- max_tokens_per_minute : float, optional - - target number of tokens to use per minute (will use less if limited by requests) - - leave headroom by setting this to 50% or 75% of your limit - - if omitted, will default to 125,000 -- token_encoding_name : str, optional - - name of the token encoding used, as defined in the `tiktoken` package - - if omitted, will default to "cl100k_base" (used by `text-embedding-ada-002`) -- max_attempts : int, optional - - number of times to retry a failed request before giving up - - if omitted, will default to 5 -- logging_level : int, optional - - level of logging to use; higher numbers will log fewer messages - - 40 = ERROR; will log only when requests fail after all retries - - 30 = WARNING; will log when requests his rate limits or other errors - - 20 = INFO; will log when requests start and the status at finish - - 10 = DEBUG; will log various things as the loop runs to see when they occur - - if omitted, will default to 20 (INFO). - -The script is structured as follows: - - Imports - - Define main() - - Initialize things - - In main loop: - - Get next request if one is not already waiting for capacity - - Update available token & request capacity - - If enough capacity available, call API - - The loop pauses if a rate limit error is hit - - The loop breaks when no tasks remain - - Define dataclasses - - StatusTracker (stores script metadata counters; only one instance is created) - - APIRequest (stores API inputs, outputs, metadata; one method to call API) - - Define functions - - api_endpoint_from_url (extracts API endpoint from request URL) - - append_to_jsonl (writes to results file) - - num_tokens_consumed_from_request (bigger function to infer token usage from request) - - task_id_generator_function (yields 1, 2, 3, ...) - - Run main() -""" - -# import argparse -# import subprocess -# import tempfile -# from langchain.llms import OpenAI -import asyncio -import json -import logging -import os -import re -import time -from dataclasses import ( # for storing API inputs, outputs, and metadata - dataclass, field) -from typing import Any, List - -import aiohttp # for making API calls concurrently -import tiktoken # for counting tokens -from langchain.embeddings.openai import OpenAIEmbeddings -from langchain.vectorstores import Qdrant -from qdrant_client import QdrantClient, models - - -class OpenAIAPIProcessor: - - def __init__(self, input_prompts_list, request_url, api_key, max_requests_per_minute, max_tokens_per_minute, token_encoding_name, - max_attempts, logging_level): - self.request_url = request_url - self.api_key = api_key - self.max_requests_per_minute = max_requests_per_minute - self.max_tokens_per_minute = max_tokens_per_minute - self.token_encoding_name = token_encoding_name - self.max_attempts = max_attempts - self.logging_level = logging_level - self.input_prompts_list: List[dict] = input_prompts_list - self.results = [] - self.cleaned_results: List[str] = [] - - async def process_api_requests_from_file(self): - """Processes API requests in parallel, throttling to stay under rate limits.""" - # constants - seconds_to_pause_after_rate_limit_error = 15 - seconds_to_sleep_each_loop = 0.001 # 1 ms limits max throughput to 1,000 requests per second - - # initialize logging - logging.basicConfig(level=self.logging_level) - logging.debug(f"Logging initialized at level {self.logging_level}") - - # infer API endpoint and construct request header - api_endpoint = api_endpoint_from_url(self.request_url) - request_header = {"Authorization": f"Bearer {self.api_key}"} - - # initialize trackers - queue_of_requests_to_retry = asyncio.Queue() - task_id_generator = task_id_generator_function() # generates integer IDs of 1, 2, 3, ... - status_tracker = StatusTracker() # single instance to track a collection of variables - next_request = None # variable to hold the next request to call - - # initialize available capacity counts - available_request_capacity = self.max_requests_per_minute - available_token_capacity = self.max_tokens_per_minute - last_update_time = time.time() - - # initialize flags - file_not_finished = True # after file is empty, we'll skip reading it - logging.debug(f"Initialization complete.") - - requests = self.input_prompts_list.__iter__() - - logging.debug(f"File opened. Entering main loop") - - task_list = [] - - while True: - # get next request (if one is not already waiting for capacity) - if next_request is None: - if not queue_of_requests_to_retry.empty(): - next_request = queue_of_requests_to_retry.get_nowait() - logging.debug(f"Retrying request {next_request.task_id}: {next_request}") - elif file_not_finished: - try: - # get new request - # request_json = json.loads(next(requests)) - request_json = next(requests) - - next_request = APIRequest(task_id=next(task_id_generator), - request_json=request_json, - token_consumption=num_tokens_consumed_from_request(request_json, api_endpoint, - self.token_encoding_name), - attempts_left=self.max_attempts, - metadata=request_json.pop("metadata", None)) - status_tracker.num_tasks_started += 1 - status_tracker.num_tasks_in_progress += 1 - logging.debug(f"Reading request {next_request.task_id}: {next_request}") - except StopIteration: - # if file runs out, set flag to stop reading it - logging.debug("Read file exhausted") - file_not_finished = False - - # update available capacity - current_time = time.time() - seconds_since_update = current_time - last_update_time - available_request_capacity = min( - available_request_capacity + self.max_requests_per_minute * seconds_since_update / 60.0, - self.max_requests_per_minute, - ) - available_token_capacity = min( - available_token_capacity + self.max_tokens_per_minute * seconds_since_update / 60.0, - self.max_tokens_per_minute, - ) - last_update_time = current_time - - # if enough capacity available, call API - if next_request: - next_request_tokens = next_request.token_consumption - if (available_request_capacity >= 1 and available_token_capacity >= next_request_tokens): - # update counters - available_request_capacity -= 1 - available_token_capacity -= next_request_tokens - next_request.attempts_left -= 1 - - # call API - # TODO: NOT SURE RESPONSE WILL WORK HERE - task = asyncio.create_task( - next_request.call_api( - request_url=self.request_url, - request_header=request_header, - retry_queue=queue_of_requests_to_retry, - status_tracker=status_tracker, - )) - task_list.append(task) - next_request = None # reset next_request to empty - - # print("status_tracker.num_tasks_in_progress", status_tracker.num_tasks_in_progress) - # one_task_result = task.result() - # print("one_task_result", one_task_result) - - # if all tasks are finished, break - if status_tracker.num_tasks_in_progress == 0: - break - - # main loop sleeps briefly so concurrent tasks can run - await asyncio.sleep(seconds_to_sleep_each_loop) - - # if a rate limit error was hit recently, pause to cool down - seconds_since_rate_limit_error = (time.time() - status_tracker.time_of_last_rate_limit_error) - if seconds_since_rate_limit_error < seconds_to_pause_after_rate_limit_error: - remaining_seconds_to_pause = (seconds_to_pause_after_rate_limit_error - seconds_since_rate_limit_error) - await asyncio.sleep(remaining_seconds_to_pause) - # ^e.g., if pause is 15 seconds and final limit was hit 5 seconds ago - logging.warn( - f"Pausing to cool down until {time.ctime(status_tracker.time_of_last_rate_limit_error + seconds_to_pause_after_rate_limit_error)}" - ) - - # after finishing, log final status - logging.info(f"""Parallel processing complete. About to return.""") - if status_tracker.num_tasks_failed > 0: - logging.warning(f"{status_tracker.num_tasks_failed} / {status_tracker.num_tasks_started} requests failed.") - if status_tracker.num_rate_limit_errors > 0: - logging.warning(f"{status_tracker.num_rate_limit_errors} rate limit errors received. Consider running at a lower rate.") - - # asyncio wait for task_list - await asyncio.wait(task_list) - - for task in task_list: - openai_completion = task.result() - self.results.append(openai_completion) - - self.cleaned_results: List[str] = extract_context_from_results(self.results) - - -def extract_context_from_results(results: List[Any]) -> List[str]: - assistant_contents = [] - total_prompt_tokens = 0 - total_completion_tokens = 0 - - for element in results: - if element is not None: - for item in element: - if 'choices' in item: - for choice in item['choices']: - if choice['message']['role'] == 'assistant': - assistant_contents.append(choice['message']['content']) - total_prompt_tokens += item['usage']['prompt_tokens'] - total_completion_tokens += item['usage']['completion_tokens'] - # Note: I don't think the prompt_tokens or completion_tokens is working quite right... - - return assistant_contents - - -# dataclasses - - -@dataclass -class StatusTracker: - """Stores metadata about the script's progress. Only one instance is created.""" - - num_tasks_started: int = 0 - num_tasks_in_progress: int = 0 # script ends when this reaches 0 - num_tasks_succeeded: int = 0 - num_tasks_failed: int = 0 - num_rate_limit_errors: int = 0 - num_api_errors: int = 0 # excluding rate limit errors, counted above - num_other_errors: int = 0 - time_of_last_rate_limit_error: float = 0 # used to cool off after hitting rate limits - - -@dataclass -class APIRequest: - """Stores an API request's inputs, outputs, and other metadata. Contains a method to make an API call.""" - - task_id: int - request_json: dict - token_consumption: int - attempts_left: int - metadata: dict - result: list = field(default_factory=list) - - async def call_api( - self, - request_url: str, - request_header: dict, - retry_queue: asyncio.Queue, - status_tracker: StatusTracker, - ): - """Calls the OpenAI API and saves results.""" - # logging.info(f"Starting request #{self.task_id}") - error = None - try: - async with aiohttp.ClientSession() as session: - async with session.post(url=request_url, headers=request_header, json=self.request_json) as response: - response = await response.json() - if "error" in response: - logging.warning(f"Request {self.task_id} failed with error {response['error']}") - status_tracker.num_api_errors += 1 - error = response - if "Rate limit" in response["error"].get("message", ""): - status_tracker.time_of_last_rate_limit_error = time.time() - status_tracker.num_rate_limit_errors += 1 - status_tracker.num_api_errors -= 1 # rate limit errors are counted separately - - except Exception as e: # catching naked exceptions is bad practice, but in this case we'll log & save them - logging.warning(f"Request {self.task_id} failed with Exception {e}") - status_tracker.num_other_errors += 1 - error = e - if error: - self.result.append(error) - if self.attempts_left: - retry_queue.put_nowait(self) - else: - logging.error(f"Request {self.request_json} failed after all attempts. Saving errors: {self.result}") - data = ([self.request_json, [str(e) for e in self.result], self.metadata] - if self.metadata else [self.request_json, [str(e) for e in self.result]]) - #append_to_jsonl(data, save_filepath) - status_tracker.num_tasks_in_progress -= 1 - status_tracker.num_tasks_failed += 1 - return data - else: - data = ([self.request_json, response, self.metadata] if self.metadata else [self.request_json, response]) # type: ignore - #append_to_jsonl(data, save_filepath) - status_tracker.num_tasks_in_progress -= 1 - status_tracker.num_tasks_succeeded += 1 - # logging.debug(f"Request {self.task_id} saved to {save_filepath}") - - return data - - -# functions - - -def api_endpoint_from_url(request_url: str): - """Extract the API endpoint from the request URL.""" - match = re.search('^https://[^/]+/v\\d+/(.+)$', request_url) - return match[1] # type: ignore - - -def append_to_jsonl(data, filename: str) -> None: - """Append a json payload to the end of a jsonl file.""" - json_string = json.dumps(data) - with open(filename, "a") as f: - f.write(json_string + "\n") - - -def num_tokens_consumed_from_request( - request_json: dict, - api_endpoint: str, - token_encoding_name: str, -): - """Count the number of tokens in the request. Only supports completion and embedding requests.""" - encoding = tiktoken.get_encoding(token_encoding_name) - # if completions request, tokens = prompt + n * max_tokens - if api_endpoint.endswith("completions"): - max_tokens = request_json.get("max_tokens", 15) - n = request_json.get("n", 1) - completion_tokens = n * max_tokens - - # chat completions - if api_endpoint.startswith("chat/"): - num_tokens = 0 - for message in request_json["messages"]: - num_tokens += 4 # every message follows {role/name}\n{content}\n - for key, value in message.items(): - num_tokens += len(encoding.encode(value)) - if key == "name": # if there's a name, the role is omitted - num_tokens -= 1 # role is always required and always 1 token - num_tokens += 2 # every reply is primed with assistant - return num_tokens + completion_tokens - # normal completions - else: - prompt = request_json["prompt"] - if isinstance(prompt, str): # single prompt - prompt_tokens = len(encoding.encode(prompt)) - num_tokens = prompt_tokens + completion_tokens - return num_tokens - elif isinstance(prompt, list): # multiple prompts - prompt_tokens = sum([len(encoding.encode(p)) for p in prompt]) - num_tokens = prompt_tokens + completion_tokens * len(prompt) - return num_tokens - else: - raise TypeError('Expecting either string or list of strings for "prompt" field in completion request') - # if embeddings request, tokens = input tokens - elif api_endpoint == "embeddings": - input = request_json["input"] - if isinstance(input, str): # single input - num_tokens = len(encoding.encode(input)) - return num_tokens - elif isinstance(input, list): # multiple inputs - num_tokens = sum([len(encoding.encode(i)) for i in input]) - return num_tokens - else: - raise TypeError('Expecting either string or list of strings for "inputs" field in embedding request') - # more logic needed to support other API calls (e.g., edits, inserts, DALL-E) - else: - raise NotImplementedError(f'API endpoint "{api_endpoint}" not implemented in this script') - - -def task_id_generator_function(): - """Generate integers 0, 1, 2, and so on.""" - task_id = 0 - while True: - yield task_id - task_id += 1 - -if __name__ == '__main__': - pass - -# run script -# if __name__ == "__main__": -# qdrant_client = QdrantClient( -# url=os.getenv('QDRANT_URL'), -# api_key=os.getenv('QDRANT_API_KEY'), -# ) -# vectorstore = Qdrant( -# client=qdrant_client, -# collection_name=os.getenv('QDRANT_COLLECTION_NAME'), # type: ignore -# embeddings=OpenAIEmbeddings()) # type: ignore - -# user_question = "What is the significance of Six Sigma?" -# k = 4 -# fetch_k = 200 -# found_docs = vectorstore.max_marginal_relevance_search(user_question, k=k, fetch_k=200) - -# requests = [] -# for i, doc in enumerate(found_docs): -# dictionary = { -# "model": "gpt-3.5-turbo-0613", # 4k context -# "messages": [{ -# "role": "system", -# "content": "You are a factual summarizer of partial documents. Stick to the facts (including partial info when necessary to avoid making up potentially incorrect details), and say I don't know when necessary." -# }, { -# "role": -# "user", -# "content": -# f"What is a comprehensive summary of the given text, based on the question:\n{doc.page_content}\nQuestion: {user_question}\nThe summary should cover all the key points only relevant to the question, while also condensing the information into a concise and easy-to-understand format. Please ensure that the summary includes relevant details and examples that support the main ideas, while avoiding any unnecessary information or repetition. Feel free to include references, sentence fragments, keywords, or anything that could help someone learn about it, only as it relates to the given question. The length of the summary should be as short as possible, without losing relevant information.\n" -# }], -# "n": 1, -# "max_tokens": 500, -# "metadata": doc.metadata -# } -# requests.append(dictionary) - -# oai = OpenAIAPIProcessor( -# input_prompts_list=requests, -# request_url='https://api.openai.com/v1/chat/completions', -# api_key=os.getenv("OPENAI_API_KEY"), -# max_requests_per_minute=1500, -# max_tokens_per_minute=90000, -# token_encoding_name='cl100k_base', -# max_attempts=5, -# logging_level=20, -# ) -# # run script -# asyncio.run(oai.process_api_requests_from_file()) - -# assistant_contents = [] -# total_prompt_tokens = 0 -# total_completion_tokens = 0 - -# print("Results, end of main: ", oai.results) -# print("-"*50) - -# # jsonObject = json.loads(oai.results) -# for element in oai.results: -# for item in element: -# if 'choices' in item: -# for choice in item['choices']: -# if choice['message']['role'] == 'assistant': -# assistant_contents.append(choice['message']['content']) -# total_prompt_tokens += item['usage']['prompt_tokens'] -# total_completion_tokens += item['usage']['completion_tokens'] - -# print("Assistant Contents:", assistant_contents) -# print("Total Prompt Tokens:", total_prompt_tokens) -# print("Total Completion Tokens:", total_completion_tokens) -# turbo_total_cost = (total_prompt_tokens * 0.0015) + (total_completion_tokens * 0.002) -# print("Total cost (3.5-turbo):", (total_prompt_tokens * 0.0015), " + Completions: ", (total_completion_tokens * 0.002), " = ", turbo_total_cost) - -# gpt4_total_cost = (total_prompt_tokens * 0.03) + (total_completion_tokens * 0.06) -# print("Hypothetical cost for GPT-4:", (total_prompt_tokens * 0.03), " + Completions: ", (total_completion_tokens * 0.06), " = ", gpt4_total_cost) -# print("GPT-4 cost premium: ", (gpt4_total_cost / turbo_total_cost), "x") - ''' - Pricing: - GPT4: - * $0.03 prompt - * $0.06 completions - 3.5-turbo: - * $0.0015 prompt - * $0.002 completions - ''' -""" -APPENDIX - -The example requests file at openai-cookbook/examples/data/example_requests_to_parallel_process.jsonl contains 10,000 requests to text-embedding-ada-002. - -It was generated with the following code: - -```python -import json - -filename = "data/example_requests_to_parallel_process.jsonl" -n_requests = 10_000 -jobs = [{"model": "text-embedding-ada-002", "input": str(x) + "\n"} for x in range(n_requests)] -with open(filename, "w") as f: - for job in jobs: - json_string = json.dumps(job) - f.write(json_string + "\n") -``` - -As with all jsonl files, take care that newlines in the content are properly escaped (json.dumps does this automatically). -""" +""" +API REQUEST PARALLEL PROCESSOR + +Using the OpenAI API to process lots of text quickly takes some care. +If you trickle in a million API requests one by one, they'll take days to complete. +If you flood a million API requests in parallel, they'll exceed the rate limits and fail with errors. +To maximize throughput, parallel requests need to be throttled to stay under rate limits. + +This script parallelizes requests to the OpenAI API while throttling to stay under rate limits. + +Features: +- Streams requests from file, to avoid running out of memory for giant jobs +- Makes requests concurrently, to maximize throughput +- Throttles request and token usage, to stay under rate limits +- Retries failed requests up to {max_attempts} times, to avoid missing data +- Logs errors, to diagnose problems with requests + +Example command to call script: +``` +python examples/api_request_parallel_processor.py \ + --requests_filepath examples/data/example_requests_to_parallel_process.jsonl \ + --save_filepath examples/data/example_requests_to_parallel_process_results.jsonl \ + --request_url https://api.openai.com/v1/embeddings \ + --max_requests_per_minute 1500 \ + --max_tokens_per_minute 6250000 \ + --token_encoding_name cl100k_base \ + --max_attempts 5 \ + --logging_level 20 +``` + +Inputs: +- requests_filepath : str + - path to the file containing the requests to be processed + - file should be a jsonl file, where each line is a json object with API parameters and an optional metadata field + - e.g., {"model": "text-embedding-ada-002", "input": "embed me", "metadata": {"row_id": 1}} + - as with all jsonl files, take care that newlines in the content are properly escaped (json.dumps does this automatically) + - an example file is provided at examples/data/example_requests_to_parallel_process.jsonl + - the code to generate the example file is appended to the bottom of this script +- save_filepath : str, optional + - path to the file where the results will be saved + - file will be a jsonl file, where each line is an array with the original request plus the API response + - e.g., [{"model": "text-embedding-ada-002", "input": "embed me"}, {...}] + - if omitted, results will be saved to {requests_filename}_results.jsonl +- request_url : str, optional + - URL of the API endpoint to call + - if omitted, will default to "https://api.openai.com/v1/embeddings" +- api_key : str, optional + - API key to use + - if omitted, the script will attempt to read it from an environment variable {os.getenv("OPENAI_API_KEY")} +- max_requests_per_minute : float, optional + - target number of requests to make per minute (will make less if limited by tokens) + - leave headroom by setting this to 50% or 75% of your limit + - if requests are limiting you, try batching multiple embeddings or completions into one request + - if omitted, will default to 1,500 +- max_tokens_per_minute : float, optional + - target number of tokens to use per minute (will use less if limited by requests) + - leave headroom by setting this to 50% or 75% of your limit + - if omitted, will default to 125,000 +- token_encoding_name : str, optional + - name of the token encoding used, as defined in the `tiktoken` package + - if omitted, will default to "cl100k_base" (used by `text-embedding-ada-002`) +- max_attempts : int, optional + - number of times to retry a failed request before giving up + - if omitted, will default to 5 +- logging_level : int, optional + - level of logging to use; higher numbers will log fewer messages + - 40 = ERROR; will log only when requests fail after all retries + - 30 = WARNING; will log when requests his rate limits or other errors + - 20 = INFO; will log when requests start and the status at finish + - 10 = DEBUG; will log various things as the loop runs to see when they occur + - if omitted, will default to 20 (INFO). + +The script is structured as follows: + - Imports + - Define main() + - Initialize things + - In main loop: + - Get next request if one is not already waiting for capacity + - Update available token & request capacity + - If enough capacity available, call API + - The loop pauses if a rate limit error is hit + - The loop breaks when no tasks remain + - Define dataclasses + - StatusTracker (stores script metadata counters; only one instance is created) + - APIRequest (stores API inputs, outputs, metadata; one method to call API) + - Define functions + - api_endpoint_from_url (extracts API endpoint from request URL) + - append_to_jsonl (writes to results file) + - num_tokens_consumed_from_request (bigger function to infer token usage from request) + - task_id_generator_function (yields 1, 2, 3, ...) + - Run main() +""" + +# import argparse +# import subprocess +# import tempfile +# from langchain.llms import OpenAI +import asyncio +import json +import logging +import os +import re +import time +from dataclasses import ( # for storing API inputs, outputs, and metadata + dataclass, field) +from typing import Any, List + +import aiohttp # for making API calls concurrently +import tiktoken # for counting tokens +from langchain.embeddings.openai import OpenAIEmbeddings +from langchain.vectorstores import Qdrant +from qdrant_client import QdrantClient, models + + +class OpenAIAPIProcessor: + + def __init__(self, input_prompts_list, request_url, api_key, max_requests_per_minute, max_tokens_per_minute, token_encoding_name, + max_attempts, logging_level): + self.request_url = request_url + self.api_key = api_key + self.max_requests_per_minute = max_requests_per_minute + self.max_tokens_per_minute = max_tokens_per_minute + self.token_encoding_name = token_encoding_name + self.max_attempts = max_attempts + self.logging_level = logging_level + self.input_prompts_list: List[dict] = input_prompts_list + self.results = [] + self.cleaned_results: List[str] = [] + + async def process_api_requests_from_file(self): + """Processes API requests in parallel, throttling to stay under rate limits.""" + # constants + seconds_to_pause_after_rate_limit_error = 15 + seconds_to_sleep_each_loop = 0.001 # 1 ms limits max throughput to 1,000 requests per second + + # initialize logging + logging.basicConfig(level=self.logging_level) + logging.debug(f"Logging initialized at level {self.logging_level}") + + # infer API endpoint and construct request header + api_endpoint = api_endpoint_from_url(self.request_url) + request_header = {"Authorization": f"Bearer {self.api_key}"} + + # initialize trackers + queue_of_requests_to_retry = asyncio.Queue() + task_id_generator = task_id_generator_function() # generates integer IDs of 1, 2, 3, ... + status_tracker = StatusTracker() # single instance to track a collection of variables + next_request = None # variable to hold the next request to call + + # initialize available capacity counts + available_request_capacity = self.max_requests_per_minute + available_token_capacity = self.max_tokens_per_minute + last_update_time = time.time() + + # initialize flags + file_not_finished = True # after file is empty, we'll skip reading it + logging.debug(f"Initialization complete.") + + requests = self.input_prompts_list.__iter__() + + logging.debug(f"File opened. Entering main loop") + + task_list = [] + + while True: + # get next request (if one is not already waiting for capacity) + if next_request is None: + if not queue_of_requests_to_retry.empty(): + next_request = queue_of_requests_to_retry.get_nowait() + logging.debug(f"Retrying request {next_request.task_id}: {next_request}") + elif file_not_finished: + try: + # get new request + # request_json = json.loads(next(requests)) + request_json = next(requests) + + next_request = APIRequest(task_id=next(task_id_generator), + request_json=request_json, + token_consumption=num_tokens_consumed_from_request(request_json, api_endpoint, + self.token_encoding_name), + attempts_left=self.max_attempts, + metadata=request_json.pop("metadata", None)) + status_tracker.num_tasks_started += 1 + status_tracker.num_tasks_in_progress += 1 + logging.debug(f"Reading request {next_request.task_id}: {next_request}") + except StopIteration: + # if file runs out, set flag to stop reading it + logging.debug("Read file exhausted") + file_not_finished = False + + # update available capacity + current_time = time.time() + seconds_since_update = current_time - last_update_time + available_request_capacity = min( + available_request_capacity + self.max_requests_per_minute * seconds_since_update / 60.0, + self.max_requests_per_minute, + ) + available_token_capacity = min( + available_token_capacity + self.max_tokens_per_minute * seconds_since_update / 60.0, + self.max_tokens_per_minute, + ) + last_update_time = current_time + + # if enough capacity available, call API + if next_request: + next_request_tokens = next_request.token_consumption + if (available_request_capacity >= 1 and available_token_capacity >= next_request_tokens): + # update counters + available_request_capacity -= 1 + available_token_capacity -= next_request_tokens + next_request.attempts_left -= 1 + + # call API + # TODO: NOT SURE RESPONSE WILL WORK HERE + task = asyncio.create_task( + next_request.call_api( + request_url=self.request_url, + request_header=request_header, + retry_queue=queue_of_requests_to_retry, + status_tracker=status_tracker, + )) + task_list.append(task) + next_request = None # reset next_request to empty + + # print("status_tracker.num_tasks_in_progress", status_tracker.num_tasks_in_progress) + # one_task_result = task.result() + # print("one_task_result", one_task_result) + + # if all tasks are finished, break + if status_tracker.num_tasks_in_progress == 0: + break + + # main loop sleeps briefly so concurrent tasks can run + await asyncio.sleep(seconds_to_sleep_each_loop) + + # if a rate limit error was hit recently, pause to cool down + seconds_since_rate_limit_error = (time.time() - status_tracker.time_of_last_rate_limit_error) + if seconds_since_rate_limit_error < seconds_to_pause_after_rate_limit_error: + remaining_seconds_to_pause = (seconds_to_pause_after_rate_limit_error - seconds_since_rate_limit_error) + await asyncio.sleep(remaining_seconds_to_pause) + # ^e.g., if pause is 15 seconds and final limit was hit 5 seconds ago + logging.warn( + f"Pausing to cool down until {time.ctime(status_tracker.time_of_last_rate_limit_error + seconds_to_pause_after_rate_limit_error)}" + ) + + # after finishing, log final status + logging.info(f"""Parallel processing complete. About to return.""") + if status_tracker.num_tasks_failed > 0: + logging.warning(f"{status_tracker.num_tasks_failed} / {status_tracker.num_tasks_started} requests failed.") + if status_tracker.num_rate_limit_errors > 0: + logging.warning(f"{status_tracker.num_rate_limit_errors} rate limit errors received. Consider running at a lower rate.") + + # asyncio wait for task_list + await asyncio.wait(task_list) + + for task in task_list: + openai_completion = task.result() + self.results.append(openai_completion) + + self.cleaned_results: List[str] = extract_context_from_results(self.results) + + +def extract_context_from_results(results: List[Any]) -> List[str]: + assistant_contents = [] + total_prompt_tokens = 0 + total_completion_tokens = 0 + + for element in results: + if element is not None: + for item in element: + if 'choices' in item: + for choice in item['choices']: + if choice['message']['role'] == 'assistant': + assistant_contents.append(choice['message']['content']) + total_prompt_tokens += item['usage']['prompt_tokens'] + total_completion_tokens += item['usage']['completion_tokens'] + # Note: I don't think the prompt_tokens or completion_tokens is working quite right... + + return assistant_contents + + +# dataclasses + + +@dataclass +class StatusTracker: + """Stores metadata about the script's progress. Only one instance is created.""" + + num_tasks_started: int = 0 + num_tasks_in_progress: int = 0 # script ends when this reaches 0 + num_tasks_succeeded: int = 0 + num_tasks_failed: int = 0 + num_rate_limit_errors: int = 0 + num_api_errors: int = 0 # excluding rate limit errors, counted above + num_other_errors: int = 0 + time_of_last_rate_limit_error: float = 0 # used to cool off after hitting rate limits + + +@dataclass +class APIRequest: + """Stores an API request's inputs, outputs, and other metadata. Contains a method to make an API call.""" + + task_id: int + request_json: dict + token_consumption: int + attempts_left: int + metadata: dict + result: list = field(default_factory=list) + + async def call_api( + self, + request_url: str, + request_header: dict, + retry_queue: asyncio.Queue, + status_tracker: StatusTracker, + ): + """Calls the OpenAI API and saves results.""" + # logging.info(f"Starting request #{self.task_id}") + error = None + try: + async with aiohttp.ClientSession() as session: + async with session.post(url=request_url, headers=request_header, json=self.request_json) as response: + response = await response.json() + if "error" in response: + logging.warning(f"Request {self.task_id} failed with error {response['error']}") + status_tracker.num_api_errors += 1 + error = response + if "Rate limit" in response["error"].get("message", ""): + status_tracker.time_of_last_rate_limit_error = time.time() + status_tracker.num_rate_limit_errors += 1 + status_tracker.num_api_errors -= 1 # rate limit errors are counted separately + + except Exception as e: # catching naked exceptions is bad practice, but in this case we'll log & save them + logging.warning(f"Request {self.task_id} failed with Exception {e}") + status_tracker.num_other_errors += 1 + error = e + if error: + self.result.append(error) + if self.attempts_left: + retry_queue.put_nowait(self) + else: + logging.error(f"Request {self.request_json} failed after all attempts. Saving errors: {self.result}") + data = ([self.request_json, [str(e) for e in self.result], self.metadata] + if self.metadata else [self.request_json, [str(e) for e in self.result]]) + #append_to_jsonl(data, save_filepath) + status_tracker.num_tasks_in_progress -= 1 + status_tracker.num_tasks_failed += 1 + return data + else: + data = ([self.request_json, response, self.metadata] if self.metadata else [self.request_json, response]) # type: ignore + #append_to_jsonl(data, save_filepath) + status_tracker.num_tasks_in_progress -= 1 + status_tracker.num_tasks_succeeded += 1 + # logging.debug(f"Request {self.task_id} saved to {save_filepath}") + + return data + + +# functions + + +def api_endpoint_from_url(request_url: str): + """Extract the API endpoint from the request URL.""" + match = re.search('^https://[^/]+/v\\d+/(.+)$', request_url) + return match[1] # type: ignore + + +def append_to_jsonl(data, filename: str) -> None: + """Append a json payload to the end of a jsonl file.""" + json_string = json.dumps(data) + with open(filename, "a") as f: + f.write(json_string + "\n") + + +def num_tokens_consumed_from_request( + request_json: dict, + api_endpoint: str, + token_encoding_name: str, +): + """Count the number of tokens in the request. Only supports completion and embedding requests.""" + encoding = tiktoken.get_encoding(token_encoding_name) + # if completions request, tokens = prompt + n * max_tokens + if api_endpoint.endswith("completions"): + max_tokens = request_json.get("max_tokens", 15) + n = request_json.get("n", 1) + completion_tokens = n * max_tokens + + # chat completions + if api_endpoint.startswith("chat/"): + num_tokens = 0 + for message in request_json["messages"]: + num_tokens += 4 # every message follows {role/name}\n{content}\n + for key, value in message.items(): + num_tokens += len(encoding.encode(value)) + if key == "name": # if there's a name, the role is omitted + num_tokens -= 1 # role is always required and always 1 token + num_tokens += 2 # every reply is primed with assistant + return num_tokens + completion_tokens + # normal completions + else: + prompt = request_json["prompt"] + if isinstance(prompt, str): # single prompt + prompt_tokens = len(encoding.encode(prompt)) + num_tokens = prompt_tokens + completion_tokens + return num_tokens + elif isinstance(prompt, list): # multiple prompts + prompt_tokens = sum([len(encoding.encode(p)) for p in prompt]) + num_tokens = prompt_tokens + completion_tokens * len(prompt) + return num_tokens + else: + raise TypeError('Expecting either string or list of strings for "prompt" field in completion request') + # if embeddings request, tokens = input tokens + elif api_endpoint == "embeddings": + input = request_json["input"] + if isinstance(input, str): # single input + num_tokens = len(encoding.encode(input)) + return num_tokens + elif isinstance(input, list): # multiple inputs + num_tokens = sum([len(encoding.encode(i)) for i in input]) + return num_tokens + else: + raise TypeError('Expecting either string or list of strings for "inputs" field in embedding request') + # more logic needed to support other API calls (e.g., edits, inserts, DALL-E) + else: + raise NotImplementedError(f'API endpoint "{api_endpoint}" not implemented in this script') + + +def task_id_generator_function(): + """Generate integers 0, 1, 2, and so on.""" + task_id = 0 + while True: + yield task_id + task_id += 1 + +if __name__ == '__main__': + pass + +# run script +# if __name__ == "__main__": +# qdrant_client = QdrantClient( +# url=os.getenv('QDRANT_URL'), +# api_key=os.getenv('QDRANT_API_KEY'), +# ) +# vectorstore = Qdrant( +# client=qdrant_client, +# collection_name=os.getenv('QDRANT_COLLECTION_NAME'), # type: ignore +# embeddings=OpenAIEmbeddings()) # type: ignore + +# user_question = "What is the significance of Six Sigma?" +# k = 4 +# fetch_k = 200 +# found_docs = vectorstore.max_marginal_relevance_search(user_question, k=k, fetch_k=200) + +# requests = [] +# for i, doc in enumerate(found_docs): +# dictionary = { +# "model": "gpt-3.5-turbo-0613", # 4k context +# "messages": [{ +# "role": "system", +# "content": "You are a factual summarizer of partial documents. Stick to the facts (including partial info when necessary to avoid making up potentially incorrect details), and say I don't know when necessary." +# }, { +# "role": +# "user", +# "content": +# f"What is a comprehensive summary of the given text, based on the question:\n{doc.page_content}\nQuestion: {user_question}\nThe summary should cover all the key points only relevant to the question, while also condensing the information into a concise and easy-to-understand format. Please ensure that the summary includes relevant details and examples that support the main ideas, while avoiding any unnecessary information or repetition. Feel free to include references, sentence fragments, keywords, or anything that could help someone learn about it, only as it relates to the given question. The length of the summary should be as short as possible, without losing relevant information.\n" +# }], +# "n": 1, +# "max_tokens": 500, +# "metadata": doc.metadata +# } +# requests.append(dictionary) + +# oai = OpenAIAPIProcessor( +# input_prompts_list=requests, +# request_url='https://api.openai.com/v1/chat/completions', +# api_key=os.getenv("OPENAI_API_KEY"), +# max_requests_per_minute=1500, +# max_tokens_per_minute=90000, +# token_encoding_name='cl100k_base', +# max_attempts=5, +# logging_level=20, +# ) +# # run script +# asyncio.run(oai.process_api_requests_from_file()) + +# assistant_contents = [] +# total_prompt_tokens = 0 +# total_completion_tokens = 0 + +# print("Results, end of main: ", oai.results) +# print("-"*50) + +# # jsonObject = json.loads(oai.results) +# for element in oai.results: +# for item in element: +# if 'choices' in item: +# for choice in item['choices']: +# if choice['message']['role'] == 'assistant': +# assistant_contents.append(choice['message']['content']) +# total_prompt_tokens += item['usage']['prompt_tokens'] +# total_completion_tokens += item['usage']['completion_tokens'] + +# print("Assistant Contents:", assistant_contents) +# print("Total Prompt Tokens:", total_prompt_tokens) +# print("Total Completion Tokens:", total_completion_tokens) +# turbo_total_cost = (total_prompt_tokens * 0.0015) + (total_completion_tokens * 0.002) +# print("Total cost (3.5-turbo):", (total_prompt_tokens * 0.0015), " + Completions: ", (total_completion_tokens * 0.002), " = ", turbo_total_cost) + +# gpt4_total_cost = (total_prompt_tokens * 0.03) + (total_completion_tokens * 0.06) +# print("Hypothetical cost for GPT-4:", (total_prompt_tokens * 0.03), " + Completions: ", (total_completion_tokens * 0.06), " = ", gpt4_total_cost) +# print("GPT-4 cost premium: ", (gpt4_total_cost / turbo_total_cost), "x") + ''' + Pricing: + GPT4: + * $0.03 prompt + * $0.06 completions + 3.5-turbo: + * $0.0015 prompt + * $0.002 completions + ''' +""" +APPENDIX + +The example requests file at openai-cookbook/examples/data/example_requests_to_parallel_process.jsonl contains 10,000 requests to text-embedding-ada-002. + +It was generated with the following code: + +```python +import json + +filename = "data/example_requests_to_parallel_process.jsonl" +n_requests = 10_000 +jobs = [{"model": "text-embedding-ada-002", "input": str(x) + "\n"} for x in range(n_requests)] +with open(filename, "w") as f: + for job in jobs: + json_string = json.dumps(job) + f.write(json_string + "\n") +``` + +As with all jsonl files, take care that newlines in the content are properly escaped (json.dumps does this automatically). +""" diff --git a/ai_ta_backend/utils_tokenization.py b/ai_ta_backend/utils_tokenization.py index 5b000e5f..096e2bb6 100644 --- a/ai_ta_backend/utils_tokenization.py +++ b/ai_ta_backend/utils_tokenization.py @@ -1,136 +1,136 @@ -import json -import os -from typing import Any, List - -import supabase -import tiktoken - - -def count_tokens_and_cost(prompt: str, completion: str = '', openai_model_name: str = "gpt-3.5-turbo"): # -> tuple[int, float] | tuple[int, float, int, float]: - """ - Returns the number of tokens in a text string. - - Only the first parameter is required, a string of text to measure. The completion and model name are optional. - - num_tokens, prompt_cost = count_tokens_and_cost(prompt="hello there") - num_tokens_prompt, prompt_cost, num_tokens_completion, completion_cost = count_tokens_and_cost(prompt="hello there", completion="how are you?") - - Args: - prompt (str): _description_ - completion (str, optional): _description_. Defaults to ''. - openai_model_name (str, optional): _description_. Defaults to "gpt-3.5-turbo". - - Returns: - tuple[int, float] | tuple[int, float, int, float]: Returns the number of tokens consumed and the cost. The total cost you'll be billed is the sum of each individual cost (prompt_cost + completion_cost) - """ - # encoding = tiktoken.encoding_for_model(openai_model_name) - openai_model_name = openai_model_name.lower() - encoding = tiktoken.encoding_for_model("gpt-3.5-turbo") # I think they all use the same encoding - prompt_cost = 0 - completion_cost = 0 - - prompt_token_cost = 0 - completion_token_cost = 0 - - if openai_model_name.startswith("gpt-3.5-turbo"): - if "16k" in openai_model_name: - prompt_token_cost: float = 0.003 / 1_000 - completion_token_cost: float = 0.004 / 1_000 - else: - # 3.5-turbo regular (4k context) - prompt_token_cost: float = 0.0015 / 1_000 - completion_token_cost: float = 0.002 / 1_000 - - elif openai_model_name.startswith("gpt-4"): - if "32k" in openai_model_name: - prompt_token_cost = 0.06 / 1_000 - completion_token_cost = 0.12 / 1_000 - else: - # gpt-4 regular (8k context) - prompt_token_cost = 0.03 / 1_000 - completion_token_cost = 0.06 / 1_000 - elif openai_model_name.startswith("text-embedding-ada-002"): - prompt_token_cost = 0.0001 / 1_000 - completion_token_cost = 0.0001 / 1_000 - else: - # no idea of cost - print(f"NO IDEA OF COST, pricing not supported for model model: `{openai_model_name}`") - prompt_token_cost = 0 - completion_token_cost = 0 - - if completion == '': - num_tokens_prompt: int = len(encoding.encode(prompt)) - prompt_cost = float(prompt_token_cost * num_tokens_prompt) - return num_tokens_prompt, prompt_cost - elif prompt == '': - num_tokens_completion: int = len(encoding.encode(completion)) - completion_cost = float(completion_token_cost * num_tokens_completion) - return num_tokens_completion, completion_cost - else: - num_tokens_prompt: int = len(encoding.encode(prompt)) - num_tokens_completion: int = len(encoding.encode(completion)) - prompt_cost = float(prompt_token_cost * num_tokens_prompt) - completion_cost = float(completion_token_cost * num_tokens_completion) - return num_tokens_prompt, prompt_cost, num_tokens_completion, completion_cost - -# from dotenv import load_dotenv - -# load_dotenv() - -def analyze_conversations(supabase_client: Any = None): - - if supabase_client is None: - supabase_client = supabase.create_client( # type: ignore - supabase_url=os.getenv('SUPABASE_URL'), # type: ignore - supabase_key=os.getenv('SUPABASE_API_KEY')) # type: ignore - # Get all conversations - response = supabase_client.table('llm-convo-monitor').select('convo').execute() - # print("total entries", response.data.count) - - total_convos = 0 - total_messages = 0 - total_prompt_cost = 0 - total_completion_cost = 0 - - # Iterate through all conversations - # for convo in response['data']: - for convo in response.data: - total_convos += 1 - # print(convo) - # prase json from convo - # parse json into dict - # print(type(convo)) - # convo = json.loads(convo) - convo = convo['convo'] - messages = convo['messages'] - model_name = convo['model']['name'] - - # Iterate through all messages in each conversation - for message in messages: - total_messages += 1 - role = message['role'] - content = message['content'] - - # If the message is from the user, it's a prompt - # TODO: Fix these - # WARNING: Fix these error messages... they are the sign of a logic bug. - if role == 'user': - num_tokens, cost = count_tokens_and_cost(prompt=content, openai_model_name=model_name) - total_prompt_cost += cost - print(f'User Prompt: {content}, Tokens: {num_tokens}, cost: {cost}') - - # If the message is from the assistant, it's a completion - elif role == 'assistant': - num_tokens_completion, cost_completion = count_tokens_and_cost(prompt='', completion=content, openai_model_name=model_name) - total_completion_cost += cost_completion - print(f'Assistant Completion: {content}\nTokens: {num_tokens_completion}, cost: {cost_completion}') - return total_convos, total_messages, total_prompt_cost, total_completion_cost - -if __name__ == '__main__': - pass - -# if __name__ == '__main__': -# print('starting main') -# total_convos, total_messages, total_prompt_cost, total_completion_cost = analyze_conversations() -# print(f'total_convos: {total_convos}, total_messages: {total_messages}') +import json +import os +from typing import Any, List + +import supabase +import tiktoken + + +def count_tokens_and_cost(prompt: str, completion: str = '', openai_model_name: str = "gpt-3.5-turbo"): # -> tuple[int, float] | tuple[int, float, int, float]: + """ + Returns the number of tokens in a text string. + + Only the first parameter is required, a string of text to measure. The completion and model name are optional. + + num_tokens, prompt_cost = count_tokens_and_cost(prompt="hello there") + num_tokens_prompt, prompt_cost, num_tokens_completion, completion_cost = count_tokens_and_cost(prompt="hello there", completion="how are you?") + + Args: + prompt (str): _description_ + completion (str, optional): _description_. Defaults to ''. + openai_model_name (str, optional): _description_. Defaults to "gpt-3.5-turbo". + + Returns: + tuple[int, float] | tuple[int, float, int, float]: Returns the number of tokens consumed and the cost. The total cost you'll be billed is the sum of each individual cost (prompt_cost + completion_cost) + """ + # encoding = tiktoken.encoding_for_model(openai_model_name) + openai_model_name = openai_model_name.lower() + encoding = tiktoken.encoding_for_model("gpt-3.5-turbo") # I think they all use the same encoding + prompt_cost = 0 + completion_cost = 0 + + prompt_token_cost = 0 + completion_token_cost = 0 + + if openai_model_name.startswith("gpt-3.5-turbo"): + if "16k" in openai_model_name: + prompt_token_cost: float = 0.003 / 1_000 + completion_token_cost: float = 0.004 / 1_000 + else: + # 3.5-turbo regular (4k context) + prompt_token_cost: float = 0.0015 / 1_000 + completion_token_cost: float = 0.002 / 1_000 + + elif openai_model_name.startswith("gpt-4"): + if "32k" in openai_model_name: + prompt_token_cost = 0.06 / 1_000 + completion_token_cost = 0.12 / 1_000 + else: + # gpt-4 regular (8k context) + prompt_token_cost = 0.03 / 1_000 + completion_token_cost = 0.06 / 1_000 + elif openai_model_name.startswith("text-embedding-ada-002"): + prompt_token_cost = 0.0001 / 1_000 + completion_token_cost = 0.0001 / 1_000 + else: + # no idea of cost + print(f"NO IDEA OF COST, pricing not supported for model model: `{openai_model_name}`") + prompt_token_cost = 0 + completion_token_cost = 0 + + if completion == '': + num_tokens_prompt: int = len(encoding.encode(prompt)) + prompt_cost = float(prompt_token_cost * num_tokens_prompt) + return num_tokens_prompt, prompt_cost + elif prompt == '': + num_tokens_completion: int = len(encoding.encode(completion)) + completion_cost = float(completion_token_cost * num_tokens_completion) + return num_tokens_completion, completion_cost + else: + num_tokens_prompt: int = len(encoding.encode(prompt)) + num_tokens_completion: int = len(encoding.encode(completion)) + prompt_cost = float(prompt_token_cost * num_tokens_prompt) + completion_cost = float(completion_token_cost * num_tokens_completion) + return num_tokens_prompt, prompt_cost, num_tokens_completion, completion_cost + +# from dotenv import load_dotenv + +# load_dotenv() + +def analyze_conversations(supabase_client: Any = None): + + if supabase_client is None: + supabase_client = supabase.create_client( # type: ignore + supabase_url=os.getenv('SUPABASE_URL'), # type: ignore + supabase_key=os.getenv('SUPABASE_API_KEY')) # type: ignore + # Get all conversations + response = supabase_client.table('llm-convo-monitor').select('convo').execute() + # print("total entries", response.data.count) + + total_convos = 0 + total_messages = 0 + total_prompt_cost = 0 + total_completion_cost = 0 + + # Iterate through all conversations + # for convo in response['data']: + for convo in response.data: + total_convos += 1 + # print(convo) + # prase json from convo + # parse json into dict + # print(type(convo)) + # convo = json.loads(convo) + convo = convo['convo'] + messages = convo['messages'] + model_name = convo['model']['name'] + + # Iterate through all messages in each conversation + for message in messages: + total_messages += 1 + role = message['role'] + content = message['content'] + + # If the message is from the user, it's a prompt + # TODO: Fix these + # WARNING: Fix these error messages... they are the sign of a logic bug. + if role == 'user': + num_tokens, cost = count_tokens_and_cost(prompt=content, openai_model_name=model_name) + total_prompt_cost += cost + print(f'User Prompt: {content}, Tokens: {num_tokens}, cost: {cost}') + + # If the message is from the assistant, it's a completion + elif role == 'assistant': + num_tokens_completion, cost_completion = count_tokens_and_cost(prompt='', completion=content, openai_model_name=model_name) + total_completion_cost += cost_completion + print(f'Assistant Completion: {content}\nTokens: {num_tokens_completion}, cost: {cost_completion}') + return total_convos, total_messages, total_prompt_cost, total_completion_cost + +if __name__ == '__main__': + pass + +# if __name__ == '__main__': +# print('starting main') +# total_convos, total_messages, total_prompt_cost, total_completion_cost = analyze_conversations() +# print(f'total_convos: {total_convos}, total_messages: {total_messages}') # print(f'total_prompt_cost: {total_prompt_cost}, total_completion_cost: {total_completion_cost}') \ No newline at end of file diff --git a/ai_ta_backend/web_scrape.py b/ai_ta_backend/web_scrape.py index f77d695a..36158db9 100644 --- a/ai_ta_backend/web_scrape.py +++ b/ai_ta_backend/web_scrape.py @@ -1,467 +1,467 @@ -import os -import re -import shutil -import time -from tempfile import NamedTemporaryFile -from zipfile import ZipFile - -import boto3 # type: ignore -import requests -from bs4 import BeautifulSoup - -import supabase - -from ai_ta_backend.aws import upload_data_files_to_s3 -from ai_ta_backend.vector_database import Ingest -import mimetypes - -def get_file_extension(filename): - match = re.search(r'\.([a-zA-Z0-9]+)$', filename) - valid_filetypes = list(mimetypes.types_map.keys()) - valid_filetypes = valid_filetypes + ['.html', '.py', '.vtt', '.pdf', '.txt', '.srt', '.docx', '.ppt', '.pptx'] - if match: - filetype = "." + match.group(1) - if filetype in valid_filetypes: - return filetype - else: - return '.html' - else: - return '.html' - -def valid_url(url): - '''Returns the URL and it's content if it's good, otherwise returns false. Prints the status code.''' - try: - response = requests.get(url, allow_redirects=True, timeout=20) - - redirect_loop_counter = 0 - while response.status_code == 301: - # Check for permanent redirect - if redirect_loop_counter > 3: - print("❌ Redirect loop (on 301 error) exceeded redirect limit of:", redirect_loop_counter, "❌") - return False - redirect_url = response.headers['Location'] - response = requests.head(redirect_url) - redirect_loop_counter += 1 - if response.status_code == 200: - filetype = get_file_extension(response.url) - print("file extension:", filetype) - if filetype == '.html': - content = BeautifulSoup(response.content, "html.parser") - if " len(urls): - max_urls = max_urls - len(urls) - elif max_urls < len(urls): - urls = urls[:max_urls] - max_urls = 0 - else: - max_urls = 0 - # We grab content out of these urls - - for url in urls: - if base_url_on: - if url.startswith(site): - url, s, filetype = valid_url(url) - if url: - print("Scraped:", url) - url_contents.append((url, s, filetype)) - else: - _invalid_urls.append(url) - else: - pass - else: - url, s, filetype = valid_url(url) - if url: - print("Scraped:", url) - url_contents.append((url, s, filetype)) - else: - _invalid_urls.append(url) - print("existing urls", _existing_urls) - url_contents = remove_duplicates(url_contents, _existing_urls) - max_urls = max_urls - len(url_contents) - print(max_urls, "urls left") - - # recursively go through crawler until we reach the max amount of urls. - for url in url_contents: - if url[0] not in _invalid_urls: - if max_urls > 0: - if _depth < max_depth: - temp_data = crawler(url[0], max_urls, max_depth, timeout, _invalid_urls, _depth, url[1], url[2]) - print("existing urls", _existing_urls) - temp_data = remove_duplicates(temp_data, _existing_urls) - max_urls = max_urls - len(temp_data) - print(max_urls, "urls left") - url_contents.extend(temp_data) - url_contents = remove_duplicates(url_contents, _existing_urls) - else: - print("Depth exceeded:", _depth+1, "out of", max_depth) - break - else: - break - else: - pass - - if _depth == 0: - if len(url_contents) < amount: - print("Max URLS not reached, returning all urls found:", len(url_contents), "out of", amount) - elif len(url_contents) == amount: - print("Max URLS reached:", len(url_contents), "out of", amount) - else: - print("Exceeded Max URLS, found:", len(url_contents), "out of", amount) - print(len(url_contents), "urls found") - - # Free up memory - # del url_contents[:] - # del urls[:] - # if _invalid_urls is not None: - # del _invalid_urls[:] - # if _existing_urls is not None: - # del _existing_urls[:] - # gc.collect() - - return url_contents - -def main_crawler(url:str, course_name:str, max_urls:int=100, max_depth:int=3, timeout:int=1, stay_on_baseurl:bool=False): - """ - Crawl a site and scrape its content and PDFs, then upload the data to S3 and ingest it. - - Args: - url (str): The URL of the site to crawl. - course_name (str): The name of the course to associate with the crawled data. - max_urls (int, optional): The maximum number of URLs to crawl. Defaults to 100. - max_depth (int, optional): The maximum depth of URLs to crawl. Defaults to 3. - timeout (int, optional): The number of seconds to wait between requests. Defaults to 1. - - Returns: - None - """ - print("\n") - max_urls = int(max_urls) - max_depth = int(max_depth) - timeout = int(timeout) - stay_on_baseurl = bool(stay_on_baseurl) - if stay_on_baseurl: - stay_on_baseurl = base_url(url) - print(stay_on_baseurl) - - ingester = Ingest() - s3_client = boto3.client( - 's3', - aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'), - aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'), - ) - - # Check for GitHub repository coming soon - if url.startswith("https://github.com/"): - print("Begin Ingesting GitHub page") - results = ingester.ingest_github(url, course_name) - print("Finished ingesting GitHub page") - del ingester - return results - else: - try: - print("Gathering existing urls from Supabase") - supabase_client = supabase.create_client( # type: ignore - supabase_url=os.getenv('SUPABASE_URL'), # type: ignore - supabase_key=os.getenv('SUPABASE_API_KEY')) # type: ignore - urls = supabase_client.table(os.getenv('NEW_NEW_NEWNEW_MATERIALS_SUPABASE_TABLE')).select('course_name, url, contexts').eq('course_name', course_name).execute() - del supabase_client - if urls.data == []: - existing_urls = None - else: - existing_urls = [] - for thing in urls.data: - whole = '' - for t in thing['contexts']: - whole += t['text'] - existing_urls.append((thing['url'], whole)) - print("Finished gathering existing urls from Supabase") - except Exception as e: - print("Error:", e) - print("Could not gather existing urls from Supabase") - existing_urls = None - - print("Begin Ingesting Web page") - data = crawler(url=url, max_urls=max_urls, max_depth=max_depth, timeout=timeout, base_url_on=stay_on_baseurl, _existing_urls=existing_urls) - - # Clean some keys for a proper file name - # todo: have a default title - # titles = [value[1][1].title.string for value in data] - - titles = [] - for value in data: - try: - titles.append(value[1].title.string) - except AttributeError as e: - # if no title - try: - placeholder_title = re.findall(pattern=r'[a-zA-Z0-9.]*[a-z]', string=value[0])[1] - except Exception as e: - placeholder_title = "Title Not Found" - titles.append(placeholder_title) - print(f"URL is missing a title, using this title instead: {placeholder_title}") - - try: - clean = [re.match(r"[a-zA-Z0-9\s]*", title).group(0) for title in titles] # type: ignore - except Exception as e: - print("Error:", e) - clean = titles - print("title names after regex before cleaning", clean) - path_name = [] - counter = 0 - for value in clean: - value = value.strip() if value else "" - # value = value.strip() - value = value.replace(" ", "_") - if value == "403_Forbidden": - print("Found Forbidden Key, deleting data") - del data[counter] - counter -= 1 - else: - path_name.append(value) - counter += 1 - print("Cleaned title names", path_name) - - # Upload each html to S3 - print("Uploading files to S3") - paths = [] - counter = 0 - try: - for i, key in enumerate(data): - with NamedTemporaryFile(suffix=key[2]) as temp_file: - if key[1] != "" or key[1] != None: - if key[2] == ".html": - print("Writing", key[2] ,"to temp file") - temp_file.write(key[1].encode('utf-8')) - else: - print("Writing", key[2] ,"to temp file") - temp_file.write(key[1]) - temp_file.seek(0) - s3_upload_path = "courses/"+ course_name + "/" + path_name[i] + key[2] - paths.append(s3_upload_path) - with open(temp_file.name, 'rb') as f: - print("Uploading", key[2] ,"to S3") - s3_client.upload_fileobj(f, os.getenv('S3_BUCKET_NAME'), s3_upload_path) - ingester.bulk_ingest(s3_upload_path, course_name=course_name, url=key[0], base_url=url) - counter += 1 - else: - print("No", key[2] ,"to upload", key[1]) - except Exception as e: - print("Error in upload:", e) - finally: - del ingester - - print(f"Successfully uploaded files to s3: {counter}") - print("Finished /web-scrape") - -# Download an MIT course using its url -def mit_course_download(url:str, course_name:str, local_dir:str): - ingester = Ingest() - base = "https://ocw.mit.edu" - if url.endswith("download"): - pass - else: - url = url + "download" - - r = requests.get(url) - soup = BeautifulSoup(r.text,"html.parser") - - zip = '' - for ref in soup.find_all("a"): - if ref.attrs['href'].endswith("zip"): - zip = ref.attrs['href'] - - site = zip - print('site', site) - r = requests.get(url=site, stream=True) - - zip_file = local_dir + ".zip" - - try: - with open(zip_file, 'wb') as fd: - for chunk in r.iter_content(chunk_size=128): - fd.write(chunk) - print("course downloaded!") - except Exception as e: - print("Error:", e, site) - - with ZipFile(zip_file, 'r') as zObject: - zObject.extractall( - path=local_dir) - - shutil.move(local_dir+"/"+"robots.txt", local_dir+"/static_resources") - s3_paths = upload_data_files_to_s3(course_name, local_dir+"/static_resources") - success_fail = ingester.bulk_ingest(s3_paths, course_name) # type: ignore - - shutil.move(zip_file, local_dir) - shutil.rmtree(local_dir) - del ingester - print("Finished Ingest") - return success_fail - -if __name__ == '__main__': - pass +import os +import re +import shutil +import time +from tempfile import NamedTemporaryFile +from zipfile import ZipFile + +import boto3 # type: ignore +import requests +from bs4 import BeautifulSoup + +import supabase + +from ai_ta_backend.aws import upload_data_files_to_s3 +from ai_ta_backend.vector_database import Ingest +import mimetypes + +def get_file_extension(filename): + match = re.search(r'\.([a-zA-Z0-9]+)$', filename) + valid_filetypes = list(mimetypes.types_map.keys()) + valid_filetypes = valid_filetypes + ['.html', '.py', '.vtt', '.pdf', '.txt', '.srt', '.docx', '.ppt', '.pptx'] + if match: + filetype = "." + match.group(1) + if filetype in valid_filetypes: + return filetype + else: + return '.html' + else: + return '.html' + +def valid_url(url): + '''Returns the URL and it's content if it's good, otherwise returns false. Prints the status code.''' + try: + response = requests.get(url, allow_redirects=True, timeout=20) + + redirect_loop_counter = 0 + while response.status_code == 301: + # Check for permanent redirect + if redirect_loop_counter > 3: + print("❌ Redirect loop (on 301 error) exceeded redirect limit of:", redirect_loop_counter, "❌") + return False + redirect_url = response.headers['Location'] + response = requests.head(redirect_url) + redirect_loop_counter += 1 + if response.status_code == 200: + filetype = get_file_extension(response.url) + print("file extension:", filetype) + if filetype == '.html': + content = BeautifulSoup(response.content, "html.parser") + if " len(urls): + max_urls = max_urls - len(urls) + elif max_urls < len(urls): + urls = urls[:max_urls] + max_urls = 0 + else: + max_urls = 0 + # We grab content out of these urls + + for url in urls: + if base_url_on: + if url.startswith(site): + url, s, filetype = valid_url(url) + if url: + print("Scraped:", url) + url_contents.append((url, s, filetype)) + else: + _invalid_urls.append(url) + else: + pass + else: + url, s, filetype = valid_url(url) + if url: + print("Scraped:", url) + url_contents.append((url, s, filetype)) + else: + _invalid_urls.append(url) + print("existing urls", _existing_urls) + url_contents = remove_duplicates(url_contents, _existing_urls) + max_urls = max_urls - len(url_contents) + print(max_urls, "urls left") + + # recursively go through crawler until we reach the max amount of urls. + for url in url_contents: + if url[0] not in _invalid_urls: + if max_urls > 0: + if _depth < max_depth: + temp_data = crawler(url[0], max_urls, max_depth, timeout, _invalid_urls, _depth, url[1], url[2]) + print("existing urls", _existing_urls) + temp_data = remove_duplicates(temp_data, _existing_urls) + max_urls = max_urls - len(temp_data) + print(max_urls, "urls left") + url_contents.extend(temp_data) + url_contents = remove_duplicates(url_contents, _existing_urls) + else: + print("Depth exceeded:", _depth+1, "out of", max_depth) + break + else: + break + else: + pass + + if _depth == 0: + if len(url_contents) < amount: + print("Max URLS not reached, returning all urls found:", len(url_contents), "out of", amount) + elif len(url_contents) == amount: + print("Max URLS reached:", len(url_contents), "out of", amount) + else: + print("Exceeded Max URLS, found:", len(url_contents), "out of", amount) + print(len(url_contents), "urls found") + + # Free up memory + # del url_contents[:] + # del urls[:] + # if _invalid_urls is not None: + # del _invalid_urls[:] + # if _existing_urls is not None: + # del _existing_urls[:] + # gc.collect() + + return url_contents + +def main_crawler(url:str, course_name:str, max_urls:int=100, max_depth:int=3, timeout:int=1, stay_on_baseurl:bool=False): + """ + Crawl a site and scrape its content and PDFs, then upload the data to S3 and ingest it. + + Args: + url (str): The URL of the site to crawl. + course_name (str): The name of the course to associate with the crawled data. + max_urls (int, optional): The maximum number of URLs to crawl. Defaults to 100. + max_depth (int, optional): The maximum depth of URLs to crawl. Defaults to 3. + timeout (int, optional): The number of seconds to wait between requests. Defaults to 1. + + Returns: + None + """ + print("\n") + max_urls = int(max_urls) + max_depth = int(max_depth) + timeout = int(timeout) + stay_on_baseurl = bool(stay_on_baseurl) + if stay_on_baseurl: + stay_on_baseurl = base_url(url) + print(stay_on_baseurl) + + ingester = Ingest() + s3_client = boto3.client( + 's3', + aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'), + aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'), + ) + + # Check for GitHub repository coming soon + if url.startswith("https://github.com/"): + print("Begin Ingesting GitHub page") + results = ingester.ingest_github(url, course_name) + print("Finished ingesting GitHub page") + del ingester + return results + else: + try: + print("Gathering existing urls from Supabase") + supabase_client = supabase.create_client( # type: ignore + supabase_url=os.getenv('SUPABASE_URL'), # type: ignore + supabase_key=os.getenv('SUPABASE_API_KEY')) # type: ignore + urls = supabase_client.table(os.getenv('NEW_NEW_NEWNEW_MATERIALS_SUPABASE_TABLE')).select('course_name, url, contexts').eq('course_name', course_name).execute() + del supabase_client + if urls.data == []: + existing_urls = None + else: + existing_urls = [] + for thing in urls.data: + whole = '' + for t in thing['contexts']: + whole += t['text'] + existing_urls.append((thing['url'], whole)) + print("Finished gathering existing urls from Supabase") + except Exception as e: + print("Error:", e) + print("Could not gather existing urls from Supabase") + existing_urls = None + + print("Begin Ingesting Web page") + data = crawler(url=url, max_urls=max_urls, max_depth=max_depth, timeout=timeout, base_url_on=stay_on_baseurl, _existing_urls=existing_urls) + + # Clean some keys for a proper file name + # todo: have a default title + # titles = [value[1][1].title.string for value in data] + + titles = [] + for value in data: + try: + titles.append(value[1].title.string) + except AttributeError as e: + # if no title + try: + placeholder_title = re.findall(pattern=r'[a-zA-Z0-9.]*[a-z]', string=value[0])[1] + except Exception as e: + placeholder_title = "Title Not Found" + titles.append(placeholder_title) + print(f"URL is missing a title, using this title instead: {placeholder_title}") + + try: + clean = [re.match(r"[a-zA-Z0-9\s]*", title).group(0) for title in titles] # type: ignore + except Exception as e: + print("Error:", e) + clean = titles + print("title names after regex before cleaning", clean) + path_name = [] + counter = 0 + for value in clean: + value = value.strip() if value else "" + # value = value.strip() + value = value.replace(" ", "_") + if value == "403_Forbidden": + print("Found Forbidden Key, deleting data") + del data[counter] + counter -= 1 + else: + path_name.append(value) + counter += 1 + print("Cleaned title names", path_name) + + # Upload each html to S3 + print("Uploading files to S3") + paths = [] + counter = 0 + try: + for i, key in enumerate(data): + with NamedTemporaryFile(suffix=key[2]) as temp_file: + if key[1] != "" or key[1] != None: + if key[2] == ".html": + print("Writing", key[2] ,"to temp file") + temp_file.write(key[1].encode('utf-8')) + else: + print("Writing", key[2] ,"to temp file") + temp_file.write(key[1]) + temp_file.seek(0) + s3_upload_path = "courses/"+ course_name + "/" + path_name[i] + key[2] + paths.append(s3_upload_path) + with open(temp_file.name, 'rb') as f: + print("Uploading", key[2] ,"to S3") + s3_client.upload_fileobj(f, os.getenv('S3_BUCKET_NAME'), s3_upload_path) + ingester.bulk_ingest(s3_upload_path, course_name=course_name, url=key[0], base_url=url) + counter += 1 + else: + print("No", key[2] ,"to upload", key[1]) + except Exception as e: + print("Error in upload:", e) + finally: + del ingester + + print(f"Successfully uploaded files to s3: {counter}") + print("Finished /web-scrape") + +# Download an MIT course using its url +def mit_course_download(url:str, course_name:str, local_dir:str): + ingester = Ingest() + base = "https://ocw.mit.edu" + if url.endswith("download"): + pass + else: + url = url + "download" + + r = requests.get(url) + soup = BeautifulSoup(r.text,"html.parser") + + zip = '' + for ref in soup.find_all("a"): + if ref.attrs['href'].endswith("zip"): + zip = ref.attrs['href'] + + site = zip + print('site', site) + r = requests.get(url=site, stream=True) + + zip_file = local_dir + ".zip" + + try: + with open(zip_file, 'wb') as fd: + for chunk in r.iter_content(chunk_size=128): + fd.write(chunk) + print("course downloaded!") + except Exception as e: + print("Error:", e, site) + + with ZipFile(zip_file, 'r') as zObject: + zObject.extractall( + path=local_dir) + + shutil.move(local_dir+"/"+"robots.txt", local_dir+"/static_resources") + s3_paths = upload_data_files_to_s3(course_name, local_dir+"/static_resources") + success_fail = ingester.bulk_ingest(s3_paths, course_name) # type: ignore + + shutil.move(zip_file, local_dir) + shutil.rmtree(local_dir) + del ingester + print("Finished Ingest") + return success_fail + +if __name__ == '__main__': + pass