diff --git a/README.md b/README.md index 8e358371..6819475a 100644 --- a/README.md +++ b/README.md @@ -46,26 +46,6 @@ The ChatGPT Telegram Bot is a powerful Telegram bot that utilizes the latest GPT | GOOGLE_AI_API_KEY | Google AI offical API key. | No | | GROQ_API_KEY | Groq AI offical API key. | No | -## 🔌 Plugins - -Our plugin system has been successfully developed and is now fully operational. We welcome everyone to contribute their code to enrich our plugin library. All plugins can be activated or deactivated using the `/info` command. The following plugins are currently supported: - -- **Web Search**: By default, DuckDuckGo search is provided. Google search is automatically activated when the `GOOGLE_CSE_ID` and `GOOGLE_API_KEY` environment variables are set. -- **Time Retrieval**: Retrieves the current time, date, and day of the week in the GMT+8 time zone. -- **URL Summary**: Automatically extracts URLs from queries and responds based on the content of the URLs. -- **Version Information**: Displays the current version of the bot, commit hash, update time, and developer name. - -To develop plugins, please follow the steps outlined below: - -- Initially, you need to add the environment variable for the plugin in the `config.PLUGINS` dictionary located in the `config.py` file. The value can be customized to be either enabled or disabled by default. It is advisable to use uppercase letters for the entire environment variable. -- Subsequently, append the function's name and description in the `utils/function_call.py` file. -- Then, enhance the `ask_stream` function in the `utils/chatgpt2api.py` file with the function's processing logic. You can refer to the existing examples within the `ask_stream` method for guidance on how to write it. -- Following that, write the function, as mentioned in the `utils/function_call.py` file, in the `utils/plugins.py` file. 
-- Next, in the `bot.py` file, augment the `update_first_buttons_message` function with buttons, enabling users to freely toggle plugins using the `info` command. -- Lastly, don't forget to add the plugin's description in the plugins section of the README. - -Please note that the above steps are a general guide and may need to be adjusted based on the specific requirements of your plugin. - ## Zeabur Remote Deployment (Recommended) One-click deployment: @@ -272,7 +252,7 @@ In a group chat scenario, if the environment variable `NICK` is not set, the bot - How many messages will the history keep? -Apart from the latest `gpt-4-turbo-preview` model, the official context supports 128k tokens, but this project limits it to 16k tokens. All other models use the official context length settings, for example, the `gpt-3.5-turbo-16k` context is 16k, the `gpt-4-32k` context is 32k, and the `Claude2` context is 200k. This limitation is implemented to save user costs, as most scenarios do not require a high context. If you have specific needs, you can modify the context limits for each model in the `utils/chatgpt2api.py` file. +The latest `gpt-4-turbo-preview` model officially supports a 128k-token context, but this project limits it to 16k tokens. All other models use their official context lengths; for example, the `gpt-3.5-turbo-16k` context is 16k, the `gpt-4-32k` context is 32k, and the `Claude2` context is 200k. This limit is in place to save user costs, as most scenarios do not require a large context. 
## References diff --git a/bot.py b/bot.py index 7fc06132..d474ea4f 100644 --- a/bot.py +++ b/bot.py @@ -1,20 +1,22 @@ import re -import os import sys sys.dont_write_bytecode = True -import config import logging import traceback import utils.decorators as decorators from md2tgmd import escape -from utils.chatgpt2api import Chatbot as GPT -from utils.chatgpt2api import claudebot, groqbot, claude3bot, gemini_bot -from utils.prompt import translator_en2zh_prompt, translator_prompt, claude3_doc_assistant_prompt + +from ModelMerge.models import chatgpt, claude, groq, claude3, gemini +from ModelMerge.models.config import PLUGINS +from ModelMerge.utils.prompt import translator_en2zh_prompt, translator_prompt, claude3_doc_assistant_prompt +from ModelMerge.utils.scripts import Document_extract, get_encode_image, claude_replace + +import config +from config import WEB_HOOK, PORT, BOT_TOKEN, update_first_buttons_message, buttons + from telegram.constants import ChatAction -from utils.plugins import Document_extract, get_encode_image, claude_replace from telegram import BotCommand, InlineKeyboardMarkup, InlineQueryResultArticle, InputTextMessageContent from telegram.ext import CommandHandler, MessageHandler, ApplicationBuilder, filters, CallbackQueryHandler, Application, AIORateLimiter, InlineQueryHandler -from config import WEB_HOOK, PORT, BOT_TOKEN, update_first_buttons_message, buttons logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s") @@ -269,7 +271,7 @@ async def image(update, context): start_messageid = message.message_id try: - for data in robot.dall_e_3(text): + for data in robot.generate(text): result = data await context.bot.delete_message(chat_id=chatid, message_id=start_messageid) await context.bot.send_photo(chat_id=chatid, photo=result, reply_to_message_id=messageid) @@ -329,16 +331,16 @@ async def button_press(update, context): config.GPT_ENGINE = data # print("config.GPT_ENGINE", config.GPT_ENGINE) if 
(config.API and "gpt-" in data) or (config.API and not config.ClaudeAPI) or (config.API and config.CUSTOM_MODELS and data in config.CUSTOM_MODELS): - config.ChatGPTbot = GPT(api_key=f"{config.API}", engine=config.GPT_ENGINE, system_prompt=config.systemprompt, temperature=config.temperature) + config.ChatGPTbot = chatgpt(api_key=f"{config.API}", engine=config.GPT_ENGINE, system_prompt=config.systemprompt, temperature=config.temperature) config.ChatGPTbot.reset(convo_id=str(update.effective_chat.id), system_prompt=config.systemprompt) if config.ClaudeAPI and "claude-2.1" in data: - config.claudeBot = claudebot(api_key=f"{config.ClaudeAPI}", engine=config.GPT_ENGINE, system_prompt=config.claude_systemprompt, temperature=config.temperature) + config.claudeBot = claude(api_key=f"{config.ClaudeAPI}", engine=config.GPT_ENGINE, system_prompt=config.claude_systemprompt, temperature=config.temperature) if config.ClaudeAPI and "claude-3" in data: - config.claude3Bot = claude3bot(api_key=f"{config.ClaudeAPI}", engine=config.GPT_ENGINE, system_prompt=config.claude_systemprompt, temperature=config.temperature) + config.claude3Bot = claude3(api_key=f"{config.ClaudeAPI}", engine=config.GPT_ENGINE, system_prompt=config.claude_systemprompt, temperature=config.temperature) if config.GROQ_API_KEY and ("mixtral" in data or "llama" in data): - config.groqBot = groqbot(api_key=f"{config.GROQ_API_KEY}", engine=config.GPT_ENGINE, system_prompt=config.systemprompt, temperature=config.temperature) + config.groqBot = groq(api_key=f"{config.GROQ_API_KEY}", engine=config.GPT_ENGINE, system_prompt=config.systemprompt, temperature=config.temperature) if config.GOOGLE_AI_API_KEY and "gemini" in data: - config.gemini_Bot = gemini_bot(api_key=f"{config.GOOGLE_AI_API_KEY}", engine=config.GPT_ENGINE, system_prompt=config.systemprompt, temperature=config.temperature) + config.gemini_Bot = gemini(api_key=f"{config.GOOGLE_AI_API_KEY}", engine=config.GPT_ENGINE, system_prompt=config.systemprompt, 
temperature=config.temperature) try: info_message = update_info_message(update) if info_message + banner != callback_query.message.text: @@ -373,15 +375,15 @@ async def button_press(update, context): config.claude_systemprompt = config.claude_systemprompt.replace("English", "Simplified Chinese") # config.systemprompt = f"You are ChatGPT, a large language model trained by OpenAI. Respond conversationally in {config.LANGUAGE}. Knowledge cutoff: 2021-09. Current date: [ {config.Current_Date} ]" if config.API: - config.ChatGPTbot = GPT(api_key=f"{config.API}", engine=config.GPT_ENGINE, system_prompt=config.systemprompt, temperature=config.temperature) + config.ChatGPTbot = chatgpt(api_key=f"{config.API}", engine=config.GPT_ENGINE, system_prompt=config.systemprompt, temperature=config.temperature) config.ChatGPTbot.reset(convo_id=str(update.effective_chat.id), system_prompt=config.systemprompt) if config.ClaudeAPI: - config.claudeBot = claudebot(api_key=f"{config.ClaudeAPI}", engine=config.GPT_ENGINE, system_prompt=config.claude_systemprompt, temperature=config.temperature) - config.claude3Bot = claude3bot(api_key=f"{config.ClaudeAPI}", engine=config.GPT_ENGINE, system_prompt=config.claude_systemprompt, temperature=config.temperature) + config.claudeBot = claude(api_key=f"{config.ClaudeAPI}", engine=config.GPT_ENGINE, system_prompt=config.claude_systemprompt, temperature=config.temperature) + config.claude3Bot = claude3(api_key=f"{config.ClaudeAPI}", engine=config.GPT_ENGINE, system_prompt=config.claude_systemprompt, temperature=config.temperature) if config.GROQ_API_KEY: - config.groqBot = groqbot(api_key=f"{config.GROQ_API_KEY}", engine=config.GPT_ENGINE, system_prompt=config.systemprompt, temperature=config.temperature) + config.groqBot = groq(api_key=f"{config.GROQ_API_KEY}", engine=config.GPT_ENGINE, system_prompt=config.systemprompt, temperature=config.temperature) if config.GOOGLE_AI_API_KEY: - config.gemini_Bot = gemini_bot(api_key=f"{config.GOOGLE_AI_API_KEY}", 
engine=config.GPT_ENGINE, system_prompt=config.systemprompt, temperature=config.temperature) + config.gemini_Bot = gemini(api_key=f"{config.GOOGLE_AI_API_KEY}", engine=config.GPT_ENGINE, system_prompt=config.systemprompt, temperature=config.temperature) info_message = update_info_message(update) message = await callback_query.edit_message_text( @@ -391,7 +393,7 @@ async def button_press(update, context): ) else: try: - config.PLUGINS[data] = not config.PLUGINS[data] + PLUGINS[data] = not PLUGINS[data] except: setattr(config, data, not getattr(config, data)) info_message = update_info_message(update) diff --git a/config.py b/config.py index d1cb8e6b..f2a932a3 100644 --- a/config.py +++ b/config.py @@ -1,7 +1,7 @@ import os from dotenv import load_dotenv load_dotenv() -import utils.prompt as prompt + from telegram import InlineKeyboardButton WEB_HOOK = os.environ.get('WEB_HOOK', None) @@ -28,32 +28,32 @@ CUSTOM_MODELS_LIST = None +from ModelMerge.utils import prompt from datetime import datetime current_date = datetime.now() Current_Date = current_date.strftime("%Y-%m-%d") systemprompt = os.environ.get('SYSTEMPROMPT', prompt.system_prompt.format(LANGUAGE, Current_Date)) claude_systemprompt = os.environ.get('SYSTEMPROMPT', prompt.claude_system_prompt) -from utils.chatgpt2api import Chatbot as GPT -from utils.chatgpt2api import Imagebot, claudebot, groqbot, claude3bot, gemini_bot +from ModelMerge.models import chatgpt, claude, groq, claude3, gemini, dalle3 if API: - ChatGPTbot = GPT(api_key=f"{API}", engine=GPT_ENGINE, system_prompt=systemprompt, temperature=temperature) + ChatGPTbot = chatgpt(api_key=f"{API}", engine=GPT_ENGINE, system_prompt=systemprompt, temperature=temperature) - translate_bot = GPT(api_key=f"{API}", engine=GPT_ENGINE, system_prompt=systemprompt, temperature=temperature) - copilot_bot = GPT(api_key=f"{API}", engine=GPT_ENGINE, system_prompt=prompt.search_system_prompt.format(LANGUAGE), temperature=temperature) - dallbot = Imagebot(api_key=f"{API}") 
+ translate_bot = chatgpt(api_key=f"{API}", engine=GPT_ENGINE, system_prompt=systemprompt, temperature=temperature) + copilot_bot = chatgpt(api_key=f"{API}", engine=GPT_ENGINE, system_prompt=prompt.search_system_prompt.format(LANGUAGE), temperature=temperature) + dallbot = dalle3(api_key=f"{API}") else: ChatGPTbot = None ClaudeAPI = os.environ.get('claude_api_key', None) if ClaudeAPI: - claudeBot = claudebot(api_key=f"{ClaudeAPI}", system_prompt=claude_systemprompt) - claude3Bot = claude3bot(api_key=f"{ClaudeAPI}", system_prompt=claude_systemprompt) + claudeBot = claude(api_key=f"{ClaudeAPI}", system_prompt=claude_systemprompt) + claude3Bot = claude3(api_key=f"{ClaudeAPI}", system_prompt=claude_systemprompt) if GROQ_API_KEY: - groqBot = groqbot(api_key=f"{GROQ_API_KEY}") + groqBot = groq(api_key=f"{GROQ_API_KEY}") if GOOGLE_AI_API_KEY: - gemini_Bot = gemini_bot(api_key=f"{GOOGLE_AI_API_KEY}") + gemini_Bot = gemini(api_key=f"{GOOGLE_AI_API_KEY}") whitelist = os.environ.get('whitelist', None) if whitelist: @@ -65,14 +65,6 @@ if GROUP_LIST: GROUP_LIST = [int(id) for id in GROUP_LIST.split(",")] -PLUGINS = { - "SEARCH_USE_GPT": (os.environ.get('SEARCH_USE_GPT', "True") == "False") == False, - # "USE_G4F": (os.environ.get('USE_G4F', "False") == "False") == False, - "DATE": True, - "URL": True, - "VERSION": True, -} - class userConfig: def __init__(self, user_id: int): self.user_id = user_id @@ -83,21 +75,7 @@ def __init__(self, user_id: int): self.search_system_prompt = prompt.search_system_prompt.format(self.language) self.search_model = "gpt-3.5-turbo-1106" -class openaiAPI: - def __init__( - self, - api_url: str = (os.environ.get("API_URL") or "https://api.openai.com/v1/chat/completions"), - ): - from urllib.parse import urlparse, urlunparse - self.source_api_url: str = api_url - parsed_url = urlparse(self.source_api_url) - self.base_url: str = urlunparse(parsed_url[:2] + ("",) * 4) - self.v1_url: str = urlunparse(parsed_url[:2] + ("/v1",) + ("",) * 3) - 
self.chat_url: str = urlunparse(parsed_url[:2] + ("/v1/chat/completions",) + ("",) * 3) - self.image_url: str = urlunparse(parsed_url[:2] + ("/v1/images/generations",) + ("",) * 3) - -bot_api_url = openaiAPI() - +from ModelMerge.models.config import PLUGINS def get_plugins_status(item): return "✅" if PLUGINS[item] else "☑️" @@ -181,13 +159,12 @@ def update_first_buttons_message(): InlineKeyboardButton(f"历史记录 {history}", callback_data="PASS_HISTORY"), ], [ - InlineKeyboardButton(f"搜索 {get_plugins_status('SEARCH_USE_GPT')}", callback_data='SEARCH_USE_GPT'), + InlineKeyboardButton(f"搜索 {get_plugins_status('SEARCH')}", callback_data='SEARCH'), InlineKeyboardButton(f"当前时间 {get_plugins_status('DATE')}", callback_data='DATE'), ], [ InlineKeyboardButton(f"URL 总结 {get_plugins_status('URL')}", callback_data='URL'), InlineKeyboardButton(f"版本信息 {get_plugins_status('VERSION')}", callback_data='VERSION'), - # InlineKeyboardButton(f"gpt4free {get_plugins_status('USE_G4F')}", callback_data='USE_G4F'), ], ] return first_buttons \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 7c0f529e..ad088a3a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,25 +1,8 @@ --index-url https://pypi.python.org/simple/ -requests -tiktoken==0.6.0 -md2tgmd==0.1.9 -# jieba +pytz python-dotenv -beautifulsoup4 -lxml -python-telegram-bot[webhooks,rate-limiter]==21.0.1 -# python-telegram-bot[webhooks,rate-limiter]==20.6 - -# langchain -# chromadb -# unstructured[md,pdf] +md2tgmd==0.1.9 fake_useragent -openai==0.28.1 -google-api-python-client -duckduckgo-search==5.3.0 -langchain==0.0.271 +ModelMerge==0.2.9 oauth2client==3.0.0 -pdfminer.six -# g4f==0.1.9.6 - -# plugin -pytz \ No newline at end of file +python-telegram-bot[webhooks,rate-limiter]==21.0.1 \ No newline at end of file diff --git a/utils/chatgpt2api.py b/utils/chatgpt2api.py deleted file mode 100644 index 460eb600..00000000 --- a/utils/chatgpt2api.py +++ /dev/null @@ -1,1463 +0,0 @@ -import os -import re 
-import json -import copy -from pathlib import Path -from typing import AsyncGenerator - -import httpx -import requests -import tiktoken - -from . import typings as t -from typing import Set - -import config -from utils.plugins import * -from utils.function_call import function_call_list, claude_tools_list - -def get_filtered_keys_from_object(obj: object, *keys: str) -> Set[str]: - """ - Get filtered list of object variable names. - :param keys: List of keys to include. If the first key is "not", the remaining keys will be removed from the class keys. - :return: List of class keys. - """ - class_keys = obj.__dict__.keys() - if not keys: - return set(class_keys) - - # Remove the passed keys from the class keys. - if keys[0] == "not": - return {key for key in class_keys if key not in keys[1:]} - # Check if all passed keys are valid - if invalid_keys := set(keys) - class_keys: - raise ValueError( - f"Invalid keys: {invalid_keys}", - ) - # Only return specified keys that are in class_keys - return {key for key in keys if key in class_keys} - -ENGINES = [ - "gpt-3.5-turbo", - "gpt-3.5-turbo-16k", - "gpt-3.5-turbo-0301", - "gpt-3.5-turbo-0613", - "gpt-3.5-turbo-1106", - "gpt-3.5-turbo-16k-0613", - "gpt-4", - "gpt-4-0314", - "gpt-4-32k", - "gpt-4-32k-0314", - "gpt-4-0613", - "gpt-4-32k-0613", - "gpt-4-1106-preview", - "gpt-4-0125-preview", - "gpt-4-turbo-preview", - "gpt-4-turbo-2024-04-09", - "mixtral-8x7b-32768", - "llama2-70b-4096", - "llama3-70b-8192", - "claude-2.1", - "claude-3-sonnet-20240229", - "claude-3-haiku-20240307", - "claude-3-opus-20240229", - "gemini-1.5-pro-latest", -] -if config.CUSTOM_MODELS_LIST: - ENGINES.extend(config.CUSTOM_MODELS_LIST) - -class claudeConversation(dict): - def Conversation(self, index): - conversation_list = super().__getitem__(index) - return "\n\n" + "\n\n".join([f"{item['role']}:{item['content']}" for item in conversation_list]) + "\n\nAssistant:" - - -class claudebot: - def __init__( - self, - api_key: str, - engine: str = 
os.environ.get("GPT_ENGINE") or "claude-2.1", - temperature: float = 0.5, - top_p: float = 0.7, - chat_url: str = "https://api.anthropic.com/v1/complete", - timeout: float = 20, - system_prompt: str = "You are ChatGPT, a large language model trained by OpenAI. Respond conversationally", - **kwargs, - ): - self.api_key: str = api_key - self.engine: str = engine - self.temperature = temperature - self.top_p = top_p - self.chat_url = chat_url - self.timeout = timeout - self.session = requests.Session() - self.conversation = claudeConversation() - self.system_prompt = system_prompt - - def add_to_conversation( - self, - message: str, - role: str, - convo_id: str = "default", - pass_history: bool = True, - ) -> None: - """ - Add a message to the conversation - """ - - if convo_id not in self.conversation or pass_history == False: - self.reset(convo_id=convo_id) - self.conversation[convo_id].append({"role": role, "content": message}) - - def reset(self, convo_id: str = "default", system_prompt: str = None) -> None: - """ - Reset the conversation - """ - self.conversation[convo_id] = list() - - def __truncate_conversation(self, convo_id: str = "default") -> None: - """ - Truncate the conversation - """ - while True: - if ( - self.get_token_count(convo_id) > self.truncate_limit - and len(self.conversation[convo_id]) > 1 - ): - # Don't remove the first message - self.conversation[convo_id].pop(1) - else: - break - - def get_token_count(self, convo_id: str = "default") -> int: - """ - Get token count - """ - if self.engine not in ENGINES: - raise NotImplementedError( - f"Engine {self.engine} is not supported. 
Select from {ENGINES}", - ) - tiktoken.model.MODEL_TO_ENCODING["claude-2.1"] = "cl100k_base" - encoding = tiktoken.encoding_for_model(self.engine) - - num_tokens = 0 - for message in self.conversation[convo_id]: - # every message follows {role/name}\n{content}\n - num_tokens += 5 - for key, value in message.items(): - if value: - num_tokens += len(encoding.encode(value)) - if key == "name": # if there's a name, the role is omitted - num_tokens += 5 # role is always required and always 1 token - num_tokens += 5 # every reply is primed with assistant - return num_tokens - - def ask_stream( - self, - prompt: str, - role: str = "Human", - convo_id: str = "default", - model: str = None, - pass_history: bool = True, - model_max_tokens: int = 4096, - **kwargs, - ): - pass_history = True - if convo_id not in self.conversation or pass_history == False: - self.reset(convo_id=convo_id) - self.add_to_conversation(prompt, role, convo_id=convo_id) - # self.__truncate_conversation(convo_id=convo_id) - # print(self.conversation[convo_id]) - - url = self.chat_url - headers = { - "accept": "application/json", - "anthropic-version": "2023-06-01", - "content-type": "application/json", - "x-api-key": f"{kwargs.get('api_key', self.api_key)}", - } - - json_post = { - "model": model or self.engine, - "prompt": self.conversation.Conversation(convo_id) if pass_history else f"\n\nHuman:{prompt}\n\nAssistant:", - "stream": True, - "temperature": kwargs.get("temperature", self.temperature), - "top_p": kwargs.get("top_p", self.top_p), - "max_tokens_to_sample": model_max_tokens, - } - - try: - response = self.session.post( - url, - headers=headers, - json=json_post, - timeout=kwargs.get("timeout", self.timeout), - stream=True, - ) - except ConnectionError: - print("连接错误,请检查服务器状态或网络连接。") - return - except requests.exceptions.ReadTimeout: - print("请求超时,请检查网络连接或增加超时时间。{e}") - return - except Exception as e: - print(f"发生了未预料的错误: {e}") - return - - if response.status_code != 200: - 
print(response.text) - raise BaseException(f"{response.status_code} {response.reason} {response.text}") - response_role: str = "Assistant" - full_response: str = "" - for line in response.iter_lines(): - if not line or line.decode("utf-8") == "event: completion" or line.decode("utf-8") == "event: ping" or line.decode("utf-8") == "data: {}": - continue - line = line.decode("utf-8")[6:] - # print(line) - resp: dict = json.loads(line) - content = resp.get("completion") - if content: - full_response += content - yield content - self.add_to_conversation(full_response, response_role, convo_id=convo_id) - # print(repr(self.conversation.Conversation(convo_id))) - # print("total tokens:", self.get_token_count(convo_id)) - -class claude3bot: - def __init__( - self, - api_key: str, - engine: str = os.environ.get("GPT_ENGINE") or "claude-3-opus-20240229", - temperature: float = 0.5, - top_p: float = 0.7, - chat_url: str = "https://api.anthropic.com/v1/messages", - timeout: float = 20, - system_prompt: str = "You are ChatGPT, a large language model trained by OpenAI. 
Respond conversationally", - **kwargs, - ): - self.api_key: str = api_key - self.engine: str = engine - self.temperature = temperature - self.top_p = top_p - self.chat_url = chat_url - self.timeout = timeout - self.session = requests.Session() - self.conversation: dict[str, list[dict]] = { - "default": [], - } - self.system_prompt = system_prompt - - def add_to_conversation( - self, - message: str, - role: str, - convo_id: str = "default", - pass_history: bool = True, - ) -> None: - """ - Add a message to the conversation - """ - - if convo_id not in self.conversation or pass_history == False: - self.reset(convo_id=convo_id) - # print("message", message) - self.conversation[convo_id].append({"role": role, "content": message}) - index = len(self.conversation[convo_id]) - 2 - if index >= 0 and self.conversation[convo_id][index]["role"] == self.conversation[convo_id][index + 1]["role"]: - self.conversation[convo_id][index]["content"] += self.conversation[convo_id][index + 1]["content"] - self.conversation[convo_id].pop(index + 1) - - def reset(self, convo_id: str = "default", system_prompt: str = None) -> None: - """ - Reset the conversation - """ - self.conversation[convo_id] = list() - - def __truncate_conversation(self, convo_id: str = "default") -> None: - """ - Truncate the conversation - """ - while True: - if ( - self.get_token_count(convo_id) > self.truncate_limit - and len(self.conversation[convo_id]) > 1 - ): - # Don't remove the first message - self.conversation[convo_id].pop(1) - else: - break - - def get_token_count(self, convo_id: str = "default") -> int: - """ - Get token count - """ - if self.engine not in ENGINES: - raise NotImplementedError( - f"Engine {self.engine} is not supported. 
Select from {ENGINES}", - ) - tiktoken.model.MODEL_TO_ENCODING["claude-2.1"] = "cl100k_base" - encoding = tiktoken.encoding_for_model(self.engine) - - num_tokens = 0 - for message in self.conversation[convo_id]: - # every message follows {role/name}\n{content}\n - num_tokens += 5 - for key, value in message.items(): - if value: - num_tokens += len(encoding.encode(value)) - if key == "name": # if there's a name, the role is omitted - num_tokens += 5 # role is always required and always 1 token - num_tokens += 5 # every reply is primed with assistant - return num_tokens - - def ask_stream( - self, - prompt: str, - role: str = "user", - convo_id: str = "default", - model: str = None, - pass_history: bool = True, - model_max_tokens: int = 4096, - **kwargs, - ): - pass_history = True - if convo_id not in self.conversation or pass_history == False: - self.reset(convo_id=convo_id) - self.add_to_conversation(prompt, role, convo_id=convo_id) - # self.__truncate_conversation(convo_id=convo_id) - # print(self.conversation[convo_id]) - - url = self.chat_url - headers = { - "x-api-key": f"{kwargs.get('api_key', self.api_key)}", - "anthropic-version": "2023-06-01", - "content-type": "application/json", - # "anthropic-beta": "tools-2024-04-04" - } - - json_post = { - "model": model or self.engine, - "messages": self.conversation[convo_id] if pass_history else [{ - "role": "user", - "content": prompt - }], - "temperature": kwargs.get("temperature", self.temperature), - "top_p": kwargs.get("top_p", self.top_p), - "max_tokens": model_max_tokens, - "stream": True, - } - if self.system_prompt: - json_post["system"] = self.system_prompt - - print(json.dumps(json_post, indent=4, ensure_ascii=False)) - - try: - response = self.session.post( - url, - headers=headers, - json=json_post, - timeout=kwargs.get("timeout", self.timeout), - stream=True, - ) - except ConnectionError: - print("连接错误,请检查服务器状态或网络连接。") - return - except requests.exceptions.ReadTimeout: - 
print("请求超时,请检查网络连接或增加超时时间。{e}") - return - except Exception as e: - print(f"发生了未预料的错误: {e}") - return - - if response.status_code != 200: - print(response.text) - raise BaseException(f"{response.status_code} {response.reason} {response.text}") - response_role: str = "assistant" - full_response: str = "" - for line in response.iter_lines(): - if not line or line.decode("utf-8")[:6] == "event:" or line.decode("utf-8") == "data: {}": - continue - # print(line.decode("utf-8")) - # if "tool_use" in line.decode("utf-8"): - # tool_input = json.loads(line.decode("utf-8")["content"][1]["input"]) - # else: - # line = line.decode("utf-8")[6:] - line = line.decode("utf-8")[6:] - # print(line) - resp: dict = json.loads(line) - delta = resp.get("delta") - if not delta: - continue - if "text" in delta: - content = delta["text"] - full_response += content - yield content - self.add_to_conversation(full_response, response_role, convo_id=convo_id) - # print(repr(self.conversation.Conversation(convo_id))) - # print("total tokens:", self.get_token_count(convo_id)) - -class Imagebot: - def __init__( - self, - api_key: str, - timeout: float = 20, - ): - self.api_key: str = api_key - self.engine: str = "dall-e-3" - self.session = requests.Session() - self.timeout: float = timeout - - def dall_e_3( - self, - prompt: str, - model: str = None, - **kwargs, - ): - url = config.bot_api_url.image_url - headers = {"Authorization": f"Bearer {kwargs.get('api_key', self.api_key)}"} - - json_post = { - "model": os.environ.get("IMAGE_MODEL_NAME") or model or self.engine, - "prompt": prompt, - "n": 1, - "size": "1024x1024", - } - try: - response = self.session.post( - url, - headers=headers, - json=json_post, - timeout=kwargs.get("timeout", self.timeout), - stream=True, - ) - except ConnectionError: - print("连接错误,请检查服务器状态或网络连接。") - return - except requests.exceptions.ReadTimeout: - print("请求超时,请检查网络连接或增加超时时间。{e}") - return - except Exception as e: - print(f"发生了未预料的错误: {e}") - return - - if 
response.status_code != 200: - raise t.APIConnectionError( - f"{response.status_code} {response.reason} {response.text}", - ) - json_data = json.loads(response.text) - url = json_data["data"][0]["url"] - yield url - -class Chatbot: - """ - Official ChatGPT API - """ - - def __init__( - self, - api_key: str, - engine: str = os.environ.get("GPT_ENGINE") or "gpt-3.5-turbo", - proxy: str = None, - timeout: float = 600, - max_tokens: int = None, - temperature: float = 0.5, - top_p: float = 1.0, - presence_penalty: float = 0.0, - frequency_penalty: float = 0.0, - reply_count: int = 1, - truncate_limit: int = None, - system_prompt: str = "You are ChatGPT, a large language model trained by OpenAI. Respond conversationally", - ) -> None: - """ - Initialize Chatbot with API key (from https://platform.openai.com/account/api-keys) - """ - self.engine: str = engine - self.api_key: str = api_key - self.system_prompt: str = system_prompt - self.max_tokens: int = max_tokens or ( - 4096 - if "gpt-4-1106-preview" in engine or "gpt-4-0125-preview" in engine or "gpt-4-turbo" in engine or "gpt-3.5-turbo-1106" in engine or "claude" in engine - else 31000 - if "gpt-4-32k" in engine - else 7000 - if "gpt-4" in engine - else 16385 - if "gpt-3.5-turbo-16k" in engine - # else 99000 - # if "claude-2.1" in engine - else 4000 - ) - # context max tokens - self.truncate_limit: int = truncate_limit or ( - 32000 - # 126500 Control the number of search characters to prevent excessive spending - if "gpt-4-1106-preview" in engine or "gpt-4-0125-preview" in engine or "gpt-4-turbo" in engine - else 30500 - if "gpt-4-32k" in engine - else 6500 - if "gpt-4" in engine - else 14500 - if "gpt-3.5-turbo-16k" in engine or "gpt-3.5-turbo-1106" in engine - else 98500 - if "claude-2.1" in engine - else 3500 - ) - self.temperature: float = temperature - self.top_p: float = top_p - self.presence_penalty: float = presence_penalty - self.frequency_penalty: float = frequency_penalty - self.reply_count: int = 
reply_count - self.timeout: float = timeout - self.proxy = proxy - self.session = requests.Session() - self.session.proxies.update( - { - "http": proxy, - "https": proxy, - }, - ) - if proxy := ( - proxy or os.environ.get("all_proxy") or os.environ.get("ALL_PROXY") or None - ): - if "socks5h" not in proxy: - self.aclient = httpx.AsyncClient( - follow_redirects=True, - proxies=proxy, - timeout=timeout, - ) - else: - self.aclient = httpx.AsyncClient( - follow_redirects=True, - proxies=proxy, - timeout=timeout, - ) - - self.conversation: dict[str, list[dict]] = { - "default": [ - { - "role": "system", - "content": system_prompt, - }, - ], - } - self.function_calls_counter = {} - self.function_call_max_loop = 10 - # self.encode_web_text_list = [] - - if self.get_token_count("default") > self.max_tokens: - raise t.ActionRefuseError("System prompt is too long") - - def add_to_conversation( - self, - message: list, - role: str, - convo_id: str = "default", - function_name: str = "", - ) -> None: - """ - Add a message to the conversation - """ - if convo_id not in self.conversation: - self.reset(convo_id=convo_id) - if function_name == "" and message and message != None: - self.conversation[convo_id].append({"role": role, "content": message}) - elif function_name != "" and message and message != None: - self.conversation[convo_id].append({"role": role, "name": function_name, "content": message}) - else: - print('\033[31m') - print("error: add_to_conversation message is None or empty") - print("role", role, "function_name", function_name, "message", message) - print('\033[0m') - - def __truncate_conversation(self, convo_id: str = "default") -> None: - """ - Truncate the conversation - """ - while True: - if ( - self.get_token_count(convo_id) > self.truncate_limit - and len(self.conversation[convo_id]) > 1 - ): - # Don't remove the first message - mess = self.conversation[convo_id].pop(1) - print("Truncate message:", mess) - else: - break - - def truncate_conversation( - 
self, - prompt: str, - role: str = "user", - convo_id: str = "default", - model: str = None, - pass_history: bool = True, - **kwargs, - ) -> None: - """ - Truncate the conversation - """ - while True: - json_post = self.get_post_body(prompt, role, convo_id, model, pass_history, **kwargs) - url = config.bot_api_url.chat_url - if "gpt-4" in self.engine or "claude" in self.engine or (config.CUSTOM_MODELS and self.engine in config.CUSTOM_MODELS): - message_token = { - "total": self.get_token_count(convo_id), - } - else: - message_token = self.get_message_token(url, json_post) - print("message_token", message_token, "truncate_limit", self.truncate_limit) - if ( - message_token["total"] > self.truncate_limit - and len(self.conversation[convo_id]) > 1 - ): - # Don't remove the first message - mess = self.conversation[convo_id].pop(1) - print("Truncate message:", mess) - else: - break - return json_post, message_token - - def extract_values(self, obj): - if isinstance(obj, dict): - for value in obj.values(): - yield from self.extract_values(value) - elif isinstance(obj, list): - for item in obj: - yield from self.extract_values(item) - else: - yield obj - # def clear_function_call(self, convo_id: str = "default"): - # self.conversation[convo_id] = [item for item in self.conversation[convo_id] if '@Trash@' not in item['content']] - # function_call_items = [item for item in self.conversation[convo_id] if 'function' in item['role']] - # function_call_num = len(function_call_items) - # if function_call_num > 50: - # for i in range(function_call_num - 25): - # self.conversation[convo_id].remove(function_call_items[i]) - - # https://github.com/openai/openai-cookbook/blob/main/examples/How_to_count_tokens_with_tiktoken.ipynb - def get_token_count(self, convo_id: str = "default") -> int: - """ - Get token count - """ - if self.engine not in ENGINES: - raise NotImplementedError( - f"Engine {self.engine} is not supported. 
Select from {ENGINES}", - ) - encoding = tiktoken.get_encoding("cl100k_base") - - num_tokens = 0 - for message in self.conversation[convo_id]: - # every message follows {role/name}\n{content}\n - num_tokens += 5 - for key, value in message.items(): - values = list(self.extract_values(value)) - if "image_url" in values: - continue - if values: - for value in values: - num_tokens += len(encoding.encode(value)) - if key == "name": # if there's a name, the role is omitted - num_tokens += 5 # role is always required and always 1 token - num_tokens += 5 # every reply is primed with assistant - return num_tokens - - def get_message_token(self, url, json_post): - json_post["max_tokens"] = 17000 - headers = {"Authorization": f"Bearer {os.environ.get('API', None)}"} - response = requests.Session().post( - url, - headers=headers, - json=json_post, - timeout=None, - ) - if response.status_code != 200: - print(response.text) - json_response = json.loads(response.text) - string = json_response["error"]["message"] - # print(json_response) - try: - string = re.findall(r"\((.*?)\)", string)[0] - except: - if "You exceeded your current quota" in json_response: - raise Exception("当前账号余额不足!") - numbers = re.findall(r"\d+\.?\d*", string) - numbers = [int(i) for i in numbers] - if len(numbers) == 2: - return { - "messages": numbers[0], - "total": numbers[0], - } - elif len(numbers) == 3: - return { - "messages": numbers[0], - "functions": numbers[1], - "total": numbers[0] + numbers[1], - } - else: - raise Exception(json_post, json_response) - # print("response.text", response.text) - return { - "messages": 0, - "total": 0, - } - - - def get_post_body( - self, - prompt: str, - role: str = "user", - convo_id: str = "default", - model: str = None, - pass_history: bool = True, - **kwargs, - ): - json_post_body = { - "model": model or self.engine, - "messages": self.conversation[convo_id] if pass_history else [{"role": "system","content": self.system_prompt},{"role": role, "content": 
prompt}], - "max_tokens": 5000, - "stream": True, - } - body = { - # kwargs - "temperature": kwargs.get("temperature", self.temperature), - "top_p": kwargs.get("top_p", self.top_p), - "presence_penalty": kwargs.get( - "presence_penalty", - self.presence_penalty, - ), - "frequency_penalty": kwargs.get( - "frequency_penalty", - self.frequency_penalty, - ), - "n": kwargs.get("n", self.reply_count), - "user": role, - } - if config.CUSTOM_MODELS and self.engine in config.CUSTOM_MODELS and "gpt-" not in self.engine and "claude-3" not in self.engine: - return json_post_body - json_post_body.update(copy.deepcopy(body)) - json_post_body.update(copy.deepcopy(function_call_list["base"])) - for item in config.PLUGINS.keys(): - try: - if config.PLUGINS[item]: - json_post_body["functions"].append(function_call_list[item]) - except: - pass - - return json_post_body - - def get_max_tokens(self, convo_id: str) -> int: - """ - Get max tokens - """ - # print("self.max_tokens, self.get_token_count(convo_id)", self.max_tokens, self.get_token_count(convo_id)) - return self.max_tokens - self.get_token_count(convo_id) - - def ask_stream( - self, - prompt: list, - role: str = "user", - convo_id: str = "default", - model: str = None, - pass_history: bool = True, - function_name: str = "", - **kwargs, - ): - """ - Ask a question - """ - # Make conversation if it doesn't exist - if convo_id not in self.conversation or pass_history == False: - self.reset(convo_id=convo_id, system_prompt=self.system_prompt) - self.add_to_conversation(prompt, role, convo_id=convo_id, function_name=function_name) - json_post, message_token = self.truncate_conversation(prompt, role, convo_id, model, pass_history, **kwargs) - print(json.dumps(json_post, indent=4, ensure_ascii=False)) - # print(self.conversation[convo_id]) - - if self.engine == "gpt-4-1106-preview" or "gpt-4-0125-preview" in self.engine or "gpt-4-turbo" in self.engine or "claude" in self.engine: - model_max_tokens = kwargs.get("max_tokens", 
self.max_tokens) - elif self.engine == "gpt-3.5-turbo-1106": - model_max_tokens = min(kwargs.get("max_tokens", self.max_tokens), 16385 - message_token["total"]) - else: - model_max_tokens = min(kwargs.get("max_tokens", self.max_tokens), self.max_tokens - message_token["total"]) - print("model_max_tokens", model_max_tokens) - json_post["max_tokens"] = model_max_tokens - - url = config.bot_api_url.chat_url - headers = {"Authorization": f"Bearer {kwargs.get('api_key', self.api_key)}"} - try: - response = self.session.post( - url, - headers=headers, - json=json_post, - timeout=kwargs.get("timeout", self.timeout), - stream=True, - ) - except ConnectionError: - print("连接错误,请检查服务器状态或网络连接。") - return - except requests.exceptions.ReadTimeout: - print("请求超时,请检查网络连接或增加超时时间。{e}") - return - except Exception as e: - print(f"发生了未预料的错误: {e}") - return - if response.status_code != 200: - raise t.APIConnectionError( - f"{response.status_code} {response.reason} {response.text}", - ) - response_role: str = None - full_response: str = "" - function_full_response: str = "" - function_call_name: str = "" - need_function_call: bool = False - for line in response.iter_lines(): - if not line: - continue - # Remove "data: " - if line.decode("utf-8")[:6] == "data: ": - line = line.decode("utf-8")[6:] - else: - print(line.decode("utf-8")) - full_response = json.loads(line.decode("utf-8"))["choices"][0]["message"]["content"] - yield full_response - break - if line == "[DONE]": - break - resp: dict = json.loads(line) - # print("resp", resp) - choices = resp.get("choices") - if not choices: - continue - delta = choices[0].get("delta") - if not delta: - continue - if "role" in delta and response_role == None: - response_role = delta["role"] - if "content" in delta and delta["content"]: - need_function_call = False - content = delta["content"] - full_response += content - yield content - if "function_call" in delta: - need_function_call = True - function_call_content = 
delta["function_call"]["arguments"] - if "name" in delta["function_call"]: - function_call_name = delta["function_call"]["name"] - function_full_response += function_call_content - if function_full_response.count("\\n") > 2 or "}" in function_full_response: - break - if need_function_call: - function_full_response = check_json(function_full_response) - print("function_full_response", function_full_response) - function_response = "" - if not self.function_calls_counter.get(function_call_name): - self.function_calls_counter[function_call_name] = 1 - else: - self.function_calls_counter[function_call_name] += 1 - if self.function_calls_counter[function_call_name] <= self.function_call_max_loop: - function_call_max_tokens = self.truncate_limit - message_token["total"] - 1000 - if function_call_max_tokens <= 0: - function_call_max_tokens = int(self.truncate_limit / 2) - print("\033[32m function_call", function_call_name, "max token:", function_call_max_tokens, "\033[0m") - if function_call_name == "get_search_results": - prompt = json.loads(function_full_response)["prompt"] - function_response = yield from eval(function_call_name)(prompt) - function_response, text_len = cut_message(function_response, function_call_max_tokens) - function_response = ( - f"You need to response the following question: {prompt}. Search results is provided inside XML tags. Your task is to think about the question step by step and then answer the above question in {config.LANGUAGE} based on the Search results provided. Please response in {config.LANGUAGE} and adopt a style that is logical, in-depth, and detailed. Note: In order to make the answer appear highly professional, you should be an expert in textual analysis, aiming to make the answer precise and comprehensive. 
Directly response markdown format, without using markdown code blocks" - "Here is the Search results, inside XML tags:" - "" - "{}" - "" - ).format(function_response) - # user_prompt = f"You need to response the following question: {prompt}. Search results is provided inside XML tags. Your task is to think about the question step by step and then answer the above question in {config.LANGUAGE} based on the Search results provided. Please response in {config.LANGUAGE} and adopt a style that is logical, in-depth, and detailed. Note: In order to make the answer appear highly professional, you should be an expert in textual analysis, aiming to make the answer precise and comprehensive. Directly response markdown format, without using markdown code blocks" - # self.add_to_conversation(user_prompt, "user", convo_id=convo_id) - if function_call_name == "get_url_content": - url = json.loads(function_full_response)["url"] - print("\n\nurl", url) - # function_response = jina_ai_Web_crawler(url) - function_response = Web_crawler(url) - function_response, text_len = cut_message(function_response, function_call_max_tokens) - function_response = ( - "Here is the documentation, inside XML tags:" - "" - "{}" - "" - ).format(function_response) - if function_call_name == "get_date_time_weekday": - function_response = eval(function_call_name)() - function_response, text_len = cut_message(function_response, function_call_max_tokens) - if function_call_name == "get_version_info": - function_response = eval(function_call_name)() - function_response, text_len = cut_message(function_response, function_call_max_tokens) - else: - function_response = "抱歉,直接告诉用户,无法找到相关信息" - response_role = "function" - # print(self.conversation[convo_id][-1]) - if self.conversation[convo_id][-1]["role"] == "function" and self.conversation[convo_id][-1]["name"] == "get_search_results": - mess = self.conversation[convo_id].pop(-1) - # print("Truncate message:", mess) - yield from 
self.ask_stream(function_response, response_role, convo_id=convo_id, function_name=function_call_name) - else: - if self.conversation[convo_id][-1]["role"] == "function" and self.conversation[convo_id][-1]["name"] == "get_search_results": - mess = self.conversation[convo_id].pop(-1) - self.add_to_conversation(full_response, response_role, convo_id=convo_id) - self.function_calls_counter = {} - # self.clear_function_call(convo_id=convo_id) - # self.encode_web_text_list = [] - # total_tokens = self.get_token_count(convo_id) - - async def ask_stream_async( - self, - prompt: str, - role: str = "user", - convo_id: str = "default", - model: str = None, - pass_history: bool = True, - **kwargs, - ) -> AsyncGenerator[str, None]: - """ - Ask a question - """ - # Make conversation if it doesn't exist - if convo_id not in self.conversation or pass_history == False: - self.reset(convo_id=convo_id, system_prompt=self.system_prompt) - self.add_to_conversation(prompt, "user", convo_id=convo_id) - self.__truncate_conversation(convo_id=convo_id) - if self.engine == "gpt-4-1106-preview" or "gpt-4-0125-preview" in self.engine or "gpt-4-turbo" in self.engine: - model_max_tokens = kwargs.get("max_tokens", self.max_tokens) - else: - model_max_tokens = min(self.get_max_tokens(convo_id=convo_id) - 500, kwargs.get("max_tokens", self.max_tokens)) - # Get response - async with self.aclient.stream( - "post", - config.bot_api_url.chat_url, - headers={"Authorization": f"Bearer {kwargs.get('api_key', self.api_key)}"}, - json={ - "model": model or self.engine, - "messages": self.conversation[convo_id] if pass_history else [{"role": "system","content": self.system_prompt},{"role": role, "content": prompt}], - "stream": True, - # kwargs - "temperature": kwargs.get("temperature", self.temperature), - "top_p": kwargs.get("top_p", self.top_p), - "presence_penalty": kwargs.get( - "presence_penalty", - self.presence_penalty, - ), - "frequency_penalty": kwargs.get( - "frequency_penalty", - 
self.frequency_penalty, - ), - "n": kwargs.get("n", self.reply_count), - "user": role, - "max_tokens": model_max_tokens, - # "max_tokens": min( - # self.get_max_tokens(convo_id=convo_id), - # kwargs.get("max_tokens", self.max_tokens), - # ), - }, - timeout=kwargs.get("timeout", self.timeout), - ) as response: - if response.status_code != 200: - await response.aread() - raise t.APIConnectionError( - f"{response.status_code} {response.reason_phrase} {response.text}", - ) - - response_role: str = "" - full_response: str = "" - async for line in response.aiter_lines(): - line = line.strip() - if not line: - continue - # Remove "data: " - line = line[6:] - if line == "[DONE]": - break - resp: dict = json.loads(line) - if "error" in resp: - raise t.ResponseError(f"{resp['error']}") - choices = resp.get("choices") - if not choices: - continue - delta: dict[str, str] = choices[0].get("delta") - if not delta: - continue - if "role" in delta: - response_role = delta["role"] - if "content" in delta: - content: str = delta["content"] - full_response += content - yield content - self.add_to_conversation(full_response, response_role, convo_id=convo_id) - print("total tokens:", self.get_token_count(convo_id)) - - async def ask_async( - self, - prompt: str, - role: str = "user", - convo_id: str = "default", - model: str = None, - pass_history: bool = True, - **kwargs, - ) -> str: - """ - Non-streaming ask - """ - response = self.ask_stream_async( - prompt=prompt, - role=role, - convo_id=convo_id, - **kwargs, - ) - full_response: str = "".join([r async for r in response]) - return full_response - - def ask( - self, - prompt: str, - role: str = "user", - convo_id: str = "default", - model: str = None, - pass_history: bool = True, - **kwargs, - ) -> str: - """ - Non-streaming ask - """ - response = self.ask_stream( - prompt=prompt, - role=role, - convo_id=convo_id, - model=model, - pass_history=pass_history, - **kwargs, - ) - full_response: str = "".join(response) - return 
full_response - - def rollback(self, n: int = 1, convo_id: str = "default") -> None: - """ - Rollback the conversation - """ - for _ in range(n): - self.conversation[convo_id].pop() - - def reset(self, convo_id: str = "default", system_prompt: str = None) -> None: - """ - Reset the conversation - """ - self.conversation[convo_id] = [ - {"role": "system", "content": system_prompt or self.system_prompt}, - ] - - def save(self, file: str, *keys: str) -> None: - """ - Save the Chatbot configuration to a JSON file - """ - with open(file, "w", encoding="utf-8") as f: - data = { - key: self.__dict__[key] - for key in get_filtered_keys_from_object(self, *keys) - } - # saves session.proxies dict as session - # leave this here for compatibility - data["session"] = data["proxy"] - del data["aclient"] - json.dump( - data, - f, - indent=2, - ) - - def load(self, file: Path, *keys_: str) -> None: - """ - Load the Chatbot configuration from a JSON file - """ - with open(file, encoding="utf-8") as f: - # load json, if session is in keys, load proxies - loaded_config = json.load(f) - keys = get_filtered_keys_from_object(self, *keys_) - - if ( - "session" in keys - and loaded_config["session"] - or "proxy" in keys - and loaded_config["proxy"] - ): - self.proxy = loaded_config.get("session", loaded_config["proxy"]) - self.session = httpx.Client( - follow_redirects=True, - proxies=self.proxy, - timeout=self.timeout, - cookies=self.session.cookies, - headers=self.session.headers, - ) - self.aclient = httpx.AsyncClient( - follow_redirects=True, - proxies=self.proxy, - timeout=self.timeout, - cookies=self.session.cookies, - headers=self.session.headers, - ) - if "session" in keys: - keys.remove("session") - if "aclient" in keys: - keys.remove("aclient") - self.__dict__.update({key: loaded_config[key] for key in keys}) - - -class groqbot: - def __init__( - self, - api_key: str, - engine: str = os.environ.get("GPT_ENGINE") or "llama3-70b-8192", - temperature: float = 0.5, - top_p: float = 
1, - chat_url: str = "https://api.groq.com/openai/v1/chat/completions", - timeout: float = 20, - system_prompt: str = "You are ChatGPT, a large language model trained by OpenAI. Respond conversationally", - **kwargs, - ): - self.api_key: str = api_key - self.engine: str = engine - self.temperature = temperature - self.top_p = top_p - self.chat_url = chat_url - self.timeout = timeout - self.session = requests.Session() - self.conversation: dict[str, list[dict]] = { - "default": [ - { - "role": "system", - "content": system_prompt, - }, - ], - } - self.system_prompt = system_prompt - - def add_to_conversation( - self, - message: str, - role: str, - convo_id: str = "default", - pass_history: bool = True, - ) -> None: - """ - Add a message to the conversation - """ - - if convo_id not in self.conversation or pass_history == False: - self.reset(convo_id=convo_id) - self.conversation[convo_id].append({"role": role, "content": message}) - - def reset(self, convo_id: str = "default", system_prompt: str = None) -> None: - """ - Reset the conversation - """ - self.conversation[convo_id] = list() - - def __truncate_conversation(self, convo_id: str = "default") -> None: - """ - Truncate the conversation - """ - while True: - if ( - self.get_token_count(convo_id) > self.truncate_limit - and len(self.conversation[convo_id]) > 1 - ): - # Don't remove the first message - self.conversation[convo_id].pop(1) - else: - break - - def get_token_count(self, convo_id: str = "default") -> int: - """ - Get token count - """ - if self.engine not in ENGINES: - raise NotImplementedError( - f"Engine {self.engine} is not supported. 
Select from {ENGINES}", - ) - # tiktoken.model.MODEL_TO_ENCODING["mixtral-8x7b-32768"] = "cl100k_base" - encoding = tiktoken.get_encoding("cl100k_base") - - num_tokens = 0 - for message in self.conversation[convo_id]: - # every message follows {role/name}\n{content}\n - num_tokens += 5 - for key, value in message.items(): - if value: - num_tokens += len(encoding.encode(value)) - if key == "name": # if there's a name, the role is omitted - num_tokens += 5 # role is always required and always 1 token - num_tokens += 5 # every reply is primed with assistant - return num_tokens - - def ask_stream( - self, - prompt: str, - role: str = "user", - convo_id: str = "default", - model: str = None, - pass_history: bool = True, - model_max_tokens: int = 1024, - **kwargs, - ): - pass_history = True - if convo_id not in self.conversation or pass_history == False: - self.reset(convo_id=convo_id) - self.add_to_conversation(prompt, role, convo_id=convo_id) - # self.__truncate_conversation(convo_id=convo_id) - # print(self.conversation[convo_id]) - - url = self.chat_url - headers = { - "Authorization": f"Bearer {kwargs.get('GROQ_API_KEY', self.api_key)}", - "Content-Type": "application/json", - } - - json_post = { - "messages": self.conversation[convo_id] if pass_history else [{ - "role": "user", - "content": prompt - }], - "model": model or self.engine, - "temperature": kwargs.get("temperature", self.temperature), - "max_tokens": model_max_tokens, - "top_p": kwargs.get("top_p", self.top_p), - "stop": None, - "stream": True, - } - # print("json_post", json_post) - # print(os.environ.get("GPT_ENGINE"), model, self.engine) - - try: - response = self.session.post( - url, - headers=headers, - json=json_post, - timeout=kwargs.get("timeout", self.timeout), - stream=True, - ) - except ConnectionError: - print("连接错误,请检查服务器状态或网络连接。") - return - except requests.exceptions.ReadTimeout: - print("请求超时,请检查网络连接或增加超时时间。{e}") - return - except Exception as e: - print(f"发生了未预料的错误: {e}") - return - - 
if response.status_code != 200: - print(response.text) - raise BaseException(f"{response.status_code} {response.reason} {response.text}") - response_role: str = "assistant" - full_response: str = "" - for line in response.iter_lines(): - if not line: - continue - # Remove "data: " - # print(line.decode("utf-8")) - if line.decode("utf-8")[:6] == "data: ": - line = line.decode("utf-8")[6:] - else: - print(line.decode("utf-8")) - full_response = json.loads(line.decode("utf-8"))["choices"][0]["message"]["content"] - yield full_response - break - if line == "[DONE]": - break - resp: dict = json.loads(line) - # print("resp", resp) - choices = resp.get("choices") - if not choices: - continue - delta = choices[0].get("delta") - if not delta: - continue - if "role" in delta: - response_role = delta["role"] - if "content" in delta and delta["content"]: - content = delta["content"] - full_response += content - yield content - self.add_to_conversation(full_response, response_role, convo_id=convo_id) - # print(repr(self.conversation.Conversation(convo_id))) - # print("total tokens:", self.get_token_count(convo_id)) - -class gemini_bot: - def __init__( - self, - api_key: str, - engine: str = os.environ.get("GPT_ENGINE") or "gemini-1.5-pro-latest", - temperature: float = 0.5, - top_p: float = 0.7, - chat_url: str = "https://generativelanguage.googleapis.com/v1beta/models/{model}:{stream}?key={api_key}", - timeout: float = 20, - system_prompt: str = "You are ChatGPT, a large language model trained by OpenAI. 
Respond conversationally", - **kwargs, - ): - self.api_key: str = api_key - self.engine: str = engine - self.temperature = temperature - self.top_p = top_p - self.chat_url = chat_url - self.timeout = timeout - self.session = requests.Session() - self.conversation: dict[str, list[dict]] = { - "default": [], - } - self.system_prompt = system_prompt - - def add_to_conversation( - self, - message: str, - role: str, - convo_id: str = "default", - pass_history: bool = True, - ) -> None: - """ - Add a message to the conversation - """ - - if convo_id not in self.conversation or pass_history == False: - self.reset(convo_id=convo_id) - # print("message", message) - self.conversation[convo_id].append({"role": role, "parts": [{"text": message}]}) - # index = len(self.conversation[convo_id]) - 2 - # if index >= 0 and self.conversation[convo_id][index]["role"] == self.conversation[convo_id][index + 1]["role"]: - # self.conversation[convo_id][index]["content"] += self.conversation[convo_id][index + 1]["content"] - # self.conversation[convo_id].pop(index + 1) - - def reset(self, convo_id: str = "default", system_prompt: str = None) -> None: - """ - Reset the conversation - """ - self.conversation[convo_id] = list() - - def __truncate_conversation(self, convo_id: str = "default") -> None: - """ - Truncate the conversation - """ - while True: - if ( - self.get_token_count(convo_id) > self.truncate_limit - and len(self.conversation[convo_id]) > 1 - ): - # Don't remove the first message - self.conversation[convo_id].pop(1) - else: - break - - def get_token_count(self, convo_id: str = "default") -> int: - """ - Get token count - """ - if self.engine not in ENGINES: - raise NotImplementedError( - f"Engine {self.engine} is not supported. 
Select from {ENGINES}", - ) - encoding = tiktoken.get_encoding("cl100k_base") - - num_tokens = 0 - for message in self.conversation[convo_id]: - # every message follows {role/name}\n{content}\n - num_tokens += 5 - for key, value in message.items(): - if value: - num_tokens += len(encoding.encode(value)) - if key == "name": # if there's a name, the role is omitted - num_tokens += 5 # role is always required and always 1 token - num_tokens += 5 # every reply is primed with assistant - return num_tokens - - def ask_stream( - self, - prompt: str, - role: str = "user", - convo_id: str = "default", - model: str = None, - pass_history: bool = True, - model_max_tokens: int = 4096, - **kwargs, - ): - pass_history = True - if convo_id not in self.conversation or pass_history == False: - self.reset(convo_id=convo_id) - self.add_to_conversation(prompt, role, convo_id=convo_id) - # self.__truncate_conversation(convo_id=convo_id) - # print(self.conversation[convo_id]) - - headers = { - "Content-Type": "application/json", - } - - json_post = { - "contents": self.conversation[convo_id] if pass_history else [{ - "role": "user", - "content": prompt - }], - "safetySettings": [ - { - "category": "HARM_CATEGORY_HARASSMENT", - "threshold": "BLOCK_NONE" - }, - { - "category": "HARM_CATEGORY_HATE_SPEECH", - "threshold": "BLOCK_NONE" - }, - { - "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", - "threshold": "BLOCK_NONE" - }, - { - "category": "HARM_CATEGORY_DANGEROUS_CONTENT", - "threshold": "BLOCK_NONE" - } - ], - } - print(json.dumps(json_post, indent=4, ensure_ascii=False)) - - url = self.chat_url.format(model=model or self.engine, stream="streamGenerateContent", api_key=self.api_key) - - try: - response = self.session.post( - url, - headers=headers, - json=json_post, - timeout=kwargs.get("timeout", self.timeout), - stream=True, - ) - except ConnectionError: - print("连接错误,请检查服务器状态或网络连接。") - return - except requests.exceptions.ReadTimeout: - print("请求超时,请检查网络连接或增加超时时间。{e}") - return - 
except Exception as e: - print(f"发生了未预料的错误: {e}") - return - - if response.status_code != 200: - print(response.text) - raise BaseException(f"{response.status_code} {response.reason} {response.text}") - response_role: str = "model" - full_response: str = "" - try: - for line in response.iter_lines(): - if not line: - continue - line = line.decode("utf-8") - if line and '\"text\": \"' in line: - content = line.split('\"text\": \"')[1][:-1] - content = "\n".join(content.split("\\n")) - full_response += content - yield content - except requests.exceptions.ChunkedEncodingError as e: - print("Chunked Encoding Error occurred:", e) - except Exception as e: - print("An error occurred:", e) - - self.add_to_conversation(full_response, response_role, convo_id=convo_id) \ No newline at end of file diff --git a/utils/function_call.py b/utils/function_call.py deleted file mode 100644 index 0ac2bee0..00000000 --- a/utils/function_call.py +++ /dev/null @@ -1,95 +0,0 @@ -function_call_list = \ -{ - "base": { - "functions": [], - "function_call": "auto" - }, - "current_weather": { - "name": "get_current_weather", - "description": "Get the current weather in a given location", - "parameters": { - "type": "object", - "properties": { - "location": { - "type": "string", - "description": "The city and state, e.g. San Francisco, CA" - }, - "unit": { - "type": "string", - "enum": [ - "celsius", - "fahrenheit" - ] - } - }, - "required": [ - "location" - ] - } - }, - "SEARCH_USE_GPT": { - "name": "get_search_results", - "description": "Search Google to enhance knowledge.", - "parameters": { - "type": "object", - "properties": { - "prompt": { - "type": "string", - "description": "The prompt to search." 
- } - }, - "required": [ - "prompt" - ] - } - }, - "URL": { - "name": "get_url_content", - "description": "Get the webpage content of a URL", - "parameters": { - "type": "object", - "properties": { - "url": { - "type": "string", - "description": "the URL to request" - } - }, - "required": [ - "url" - ] - } - }, - "DATE": { - "name": "get_date_time_weekday", - "description": "Get the current time, date, and day of the week", - "parameters": { - "type": "object", - "properties": {} - } - }, - "VERSION": { - "name": "get_version_info", - "description": "Get version information", - "parameters": { - "type": "object", - "properties": {} - } - }, -} -def gpt2claude_tools_json(json_dict): - import copy - json_dict = copy.deepcopy(json_dict) - keys_to_change = { - "parameters": "input_schema", - "functions": "tools", - "function_call": None # 如果没有新的键名,则设置为None或留空 - } - for old_key, new_key in keys_to_change.items(): - if old_key in json_dict: - if new_key: - json_dict[new_key] = json_dict.pop(old_key) - else: - json_dict.pop(old_key) - return json_dict - -claude_tools_list = {f"{key}": gpt2claude_tools_json(function_call_list[key]) for key in function_call_list.keys()} diff --git a/utils/googlesearch.py b/utils/googlesearch.py deleted file mode 100644 index b87471db..00000000 --- a/utils/googlesearch.py +++ /dev/null @@ -1,136 +0,0 @@ -"""Util that calls Google Search.""" -from typing import Any, Dict, List, Optional - -from langchain.pydantic_v1 import BaseModel, Extra, root_validator -from langchain.utils import get_from_dict_or_env - - -class GoogleSearchAPIWrapper(BaseModel): - """Wrapper for Google Search API. - - Adapted from: Instructions adapted from https://stackoverflow.com/questions/ - 37083058/ - programmatically-searching-google-in-python-using-custom-search - - TODO: DOCS for using it - 1. Install google-api-python-client - - If you don't already have a Google account, sign up. 
- - If you have never created a Google APIs Console project, - read the Managing Projects page and create a project in the Google API Console. - - Install the library using pip install google-api-python-client - The current version of the library is 2.70.0 at this time - - 2. To create an API key: - - Navigate to the APIs & Services→Credentials panel in Cloud Console. - - Select Create credentials, then select API key from the drop-down menu. - - The API key created dialog box displays your newly created key. - - You now have an API_KEY - - 3. Setup Custom Search Engine so you can search the entire web - - Create a custom search engine in this link. - - In Sites to search, add any valid URL (i.e. www.stackoverflow.com). - - That’s all you have to fill up, the rest doesn’t matter. - In the left-side menu, click Edit search engine → {your search engine name} - → Setup Set Search the entire web to ON. Remove the URL you added from - the list of Sites to search. - - Under Search engine ID you’ll find the search-engine-ID. - - 4. Enable the Custom Search API - - Navigate to the APIs & Services→Dashboard panel in Cloud Console. - - Click Enable APIs and Services. - - Search for Custom Search API and click on it. - - Click Enable. 
- URL for it: https://console.cloud.google.com/apis/library/customsearch.googleapis - .com - """ - - search_engine: Any #: :meta private: - google_api_key: Optional[str] = None - google_cse_id: Optional[str] = None - k: int = 10 - siterestrict: bool = False - - class Config: - """Configuration for this pydantic object.""" - - extra = Extra.forbid - - def _google_search_results(self, search_term: str, **kwargs: Any) -> List[dict]: - cse = self.search_engine.cse() - if self.siterestrict: - cse = cse.siterestrict() - res = cse.list(q=search_term, cx=self.google_cse_id, **kwargs).execute() - return res.get("items", []) - - @root_validator() - def validate_environment(cls, values: Dict) -> Dict: - """Validate that api key and python package exists in environment.""" - google_api_key = get_from_dict_or_env( - values, "google_api_key", "GOOGLE_API_KEY" - ) - values["google_api_key"] = google_api_key - - google_cse_id = get_from_dict_or_env(values, "google_cse_id", "GOOGLE_CSE_ID") - values["google_cse_id"] = google_cse_id - - try: - from googleapiclient.discovery import build - - except ImportError: - raise ImportError( - "google-api-python-client is not installed. " - "Please install it with `pip install google-api-python-client`" - ) - - service = build("customsearch", "v1", developerKey=google_api_key, static_discovery=False) - values["search_engine"] = service - - return values - - def run(self, query: str) -> str: - """Run query through GoogleSearch and parse result.""" - snippets = [] - results = self._google_search_results(query, num=self.k) - if len(results) == 0: - return "No good Google Search Result was found" - for result in results: - if "snippet" in result: - snippets.append(result["snippet"]) - - return " ".join(snippets) - - def results( - self, - query: str, - num_results: int, - search_params: Optional[Dict[str, str]] = None, - ) -> List[Dict]: - """Run query through GoogleSearch and return metadata. - - Args: - query: The query to search for. 
- num_results: The number of results to return. - search_params: Parameters to be passed on search - - Returns: - A list of dictionaries with the following keys: - snippet - The description of the result. - title - The title of the result. - link - The link to the result. - """ - metadata_results = [] - results = self._google_search_results( - query, num=num_results, **(search_params or {}) - ) - if len(results) == 0: - return [{"Result": "No good Google Search Result was found"}] - for result in results: - metadata_result = { - "title": result["title"], - "link": result["link"], - } - if "snippet" in result: - metadata_result["snippet"] = result["snippet"] - metadata_results.append(metadata_result) - - return metadata_results diff --git a/utils/gpt4free.py b/utils/gpt4free.py deleted file mode 100644 index 2bc78a0b..00000000 --- a/utils/gpt4free.py +++ /dev/null @@ -1,52 +0,0 @@ -import re -import g4f -import os -import sys -sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) -import config - -GPT_ENGINE_map = { - "gpt-3.5-turbo": "gpt-3.5-turbo", - "gpt-3.5-turbo-16k": "gpt-3.5-turbo-16k", - "gpt-3.5-turbo-0301": "gpt-3.5-turbo", - "gpt-3.5-turbo-0613": "gpt-3.5-turbo-0613", - "gpt-3.5-turbo-1106": "gpt-3.5-turbo", - "gpt-3.5-turbo-16k-0613": "gpt-3.5-turbo-0613", - "gpt-4": "gpt-4", - "gpt-4-0314": "gpt-4", - "gpt-4-32k": "gpt-4-32k", - "gpt-4-32k-0314": "gpt-4", - "gpt-4-0613": "gpt-4-0613", - "gpt-4-32k-0613": "gpt-4-32k-0613", - "gpt-4-1106-preview": "gpt-4-turbo", - "gpt-4-turbo-preview": "gpt-4-turbo", - "gpt-4-0125-preview": "gpt-4-turbo", - "claude-2-web": "gpt-4", - "claude-2.1": "gpt-4", -} - -def ask_stream(message, **kwargs): - response = g4f.ChatCompletion.create( - model=GPT_ENGINE_map[config.GPT_ENGINE], - messages=[{"role": "user", "content": message}], - stream=True, - ) - for message in response: - yield message - -def bing(response): - response = re.sub(r"\[\^\d+\^\]", "", response) - if len(response.split("\n\n")) >= 
2: - result = "\n\n".join(response.split("\n\n")[1:]) - return result - else: - return response - -if __name__ == "__main__": - - message = rf""" -鲁迅和周树人为什么打架 - """ - answer = "" - for result in ask_stream(message, model="gpt-4"): - print(result, end="") \ No newline at end of file diff --git a/utils/plugins.py b/utils/plugins.py deleted file mode 100644 index 05784603..00000000 --- a/utils/plugins.py +++ /dev/null @@ -1,478 +0,0 @@ -import os -import re -import json -import base64 -import datetime - -import sys -sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) -import config -from utils.prompt import search_key_word_prompt -# import jieba - -import asyncio -import tiktoken -import requests -import threading - -import urllib.parse -import time as record_time -from bs4 import BeautifulSoup - - -from langchain.prompts import PromptTemplate -from langchain.chat_models import ChatOpenAI -from langchain.tools import DuckDuckGoSearchResults -from langchain.chains import LLMChain - -# from typing import Optional, List -# from langchain.llms.base import LLM -# import g4f -# class EducationalLLM(LLM): - -# @property -# def _llm_type(self) -> str: -# return "custom" - -# def _call(self, prompt: str, stop: Optional[List[str]] = None) -> str: -# out = g4f.ChatCompletion.create( -# model=config.GPT_ENGINE, -# messages=[{"role": "user", "content": prompt}], -# ) # -# if stop: -# stop_indexes = (out.find(s) for s in stop if s in out) -# min_stop = min(stop_indexes, default=-1) -# if min_stop > -1: -# out = out[:min_stop] -# return out - -class ThreadWithReturnValue(threading.Thread): - def run(self): - if self._target is not None: - self._return = self._target(*self._args, **self._kwargs) - - def join(self): - super().join() - return self._return - -def Web_crawler(url: str, isSearch=False) -> str: - """返回链接网址url正文内容,必须是合法的网址""" - headers = { - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) 
Chrome/58.0.3029.110 Safari/537.3" - } - result = '' - try: - requests.packages.urllib3.disable_warnings() - response = requests.get(url, headers=headers, verify=False, timeout=3, stream=True) - if response.status_code == 404: - print("Page not found:", url) - return "" - # return "抱歉,网页不存在,目前无法访问该网页。@Trash@" - content_length = int(response.headers.get('Content-Length', 0)) - if content_length > 5000000: - print("Skipping large file:", url) - return result - soup = BeautifulSoup(response.text.encode(response.encoding), 'lxml', from_encoding='utf-8') - - table_contents = "" - tables = soup.find_all('table') - for table in tables: - table_contents += table.get_text() - table.decompose() - body = "".join(soup.find('body').get_text().split('\n')) - result = table_contents + body - if result == '' and not isSearch: - result = "" - # result = "抱歉,可能反爬虫策略,目前无法访问该网页。@Trash@" - if result.count("\"") > 1000: - result = "" - except Exception as e: - print('\033[31m') - print("error url", url) - print("error", e) - print('\033[0m') - # print("url content", result + "\n\n") - return result - -def jina_ai_Web_crawler(url: str, isSearch=False) -> str: - headers = { - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3" - } - result = '' - try: - requests.packages.urllib3.disable_warnings() - url = "https://r.jina.ai/" + url - response = requests.get(url, headers=headers, verify=False, timeout=5, stream=True) - if response.status_code == 404: - print("Page not found:", url) - return "抱歉,网页不存在,目前无法访问该网页。@Trash@" - content_length = int(response.headers.get('Content-Length', 0)) - if content_length > 5000000: - print("Skipping large file:", url) - return result - soup = BeautifulSoup(response.text.encode(response.encoding), 'lxml', from_encoding='utf-8') - - table_contents = "" - tables = soup.find_all('table') - for table in tables: - table_contents += table.get_text() - table.decompose() - body = 
"".join(soup.find('body').get_text().split('\n')) - result = table_contents + body - if result == '' and not isSearch: - result = "抱歉,可能反爬虫策略,目前无法访问该网页。@Trash@" - if result.count("\"") > 1000: - result = "" - except Exception as e: - print('\033[31m') - print("error url", url) - print("error", e) - print('\033[0m') - # print(result + "\n\n") - return result - -def getddgsearchurl(result, numresults=4): - try: - search = DuckDuckGoSearchResults(num_results=numresults) - webresult = search.run(result) - # print("ddgwebresult", webresult) - if webresult == None: - return [] - urls = re.findall(r"(https?://\S+)\]", webresult, re.MULTILINE) - except Exception as e: - print('\033[31m') - print("duckduckgo error", e) - print('\033[0m') - urls = [] - # print("ddg urls", urls) - return urls - -from utils.googlesearch import GoogleSearchAPIWrapper -def getgooglesearchurl(result, numresults=3): - google_search = GoogleSearchAPIWrapper() - urls = [] - try: - googleresult = google_search.results(result, numresults) - # print("googleresult", googleresult) - for i in googleresult: - if "No good Google Search Result was found" in i or "google.com" in i["link"]: - continue - urls.append(i["link"]) - except Exception as e: - print('\033[31m') - print("error", e) - print('\033[0m') - if "rateLimitExceeded" in str(e): - print("Google API 每日调用频率已达上限,请明日再试!") - config.USE_GOOGLE = False - # print("google urls", urls) - return urls - -def sort_by_time(urls): - def extract_date(url): - match = re.search(r'[12]\d{3}.\d{1,2}.\d{1,2}', url) - if match is not None: - match = re.sub(r'([12]\d{3}).(\d{1,2}).(\d{1,2})', "\\1/\\2/\\3", match.group()) - print(match) - if int(match[:4]) > datetime.datetime.now().year: - match = "1000/01/01" - else: - match = "1000/01/01" - try: - return datetime.datetime.strptime(match, '%Y/%m/%d') - except: - match = "1000/01/01" - return datetime.datetime.strptime(match, '%Y/%m/%d') - - # 提取日期并创建一个包含日期和URL的元组列表 - date_url_pairs = [(extract_date(url), url) for url 
in urls] - - # 按日期排序 - date_url_pairs.sort(key=lambda x: x[0], reverse=True) - - # 获取排序后的URL列表 - sorted_urls = [url for _, url in date_url_pairs] - - return sorted_urls - -def get_search_url(prompt, chainllm): - urls_set = [] - - keyword_prompt = PromptTemplate( - input_variables=["source"], - template=search_key_word_prompt, - ) - key_chain = LLMChain(llm=chainllm, prompt=keyword_prompt) - keyword_google_search_thread = ThreadWithReturnValue(target=key_chain.run, args=({"source": prompt},)) - keyword_google_search_thread.start() - keywords = keyword_google_search_thread.join().split('\n')[-3:] - print("keywords", keywords) - keywords = [item.replace("三行关键词是:", "") for item in keywords if "\\x" not in item if item != ""] - - keywords = [prompt] + keywords - keywords = keywords[:3] - # if len(keywords) == 1: - # keywords = keywords * 3 - print("select keywords", keywords) - - # # seg_list = jieba.cut_for_search(prompt) # 搜索引擎模式 - # seg_list = jieba.cut(prompt, cut_all=True) - # result = " ".join(seg_list) - # keywords = [result] * 3 - # print("keywords", keywords) - - search_threads = [] - urls_set = [] - if len(keywords) == 3: - search_url_num = 4 - if len(keywords) == 2: - search_url_num = 6 - if len(keywords) == 1: - search_url_num = 12 - # print(keywords) - yield "🌐 正在网上挑选最相关的信息源,请稍候..." 
- if config.USE_GOOGLE: - search_thread = ThreadWithReturnValue(target=getgooglesearchurl, args=(keywords[0],search_url_num,)) - search_thread.start() - search_threads.append(search_thread) - keywords.pop(0) - # print(keywords) - for keyword in keywords: - search_thread = ThreadWithReturnValue(target=getddgsearchurl, args=(keyword,search_url_num,)) - search_thread.start() - search_threads.append(search_thread) - # exit(0) - - for t in search_threads: - tmp = t.join() - urls_set += tmp - url_set_list = sorted(set(urls_set), key=lambda x: urls_set.index(x)) - url_set_list = sort_by_time(url_set_list) - - url_pdf_set_list = [item for item in url_set_list if item.endswith(".pdf")] - url_set_list = [item for item in url_set_list if not item.endswith(".pdf")] - # cut_num = int(len(url_set_list) * 1 / 3) - return url_set_list[:6], url_pdf_set_list - # return url_set_list, url_pdf_set_list - -def concat_url(threads): - url_result = [] - for t in threads: - tmp = t.join() - if tmp: - url_result.append(tmp) - return url_result - -def cut_message(message: str, max_tokens: int): - tiktoken.get_encoding("cl100k_base") - encoding = tiktoken.encoding_for_model(config.GPT_ENGINE) - encode_text = encoding.encode(message) - if len(encode_text) > max_tokens: - encode_text = encode_text[:max_tokens] - message = encoding.decode(encode_text) - encode_text = encoding.encode(message) - return message, len(encode_text) - -def get_url_text_list(prompt): - start_time = record_time.time() - yield "🌐 正在搜索您的问题,提取关键词..." 
- - # if config.PLUGINS["USE_G4F"]: - # chainllm = EducationalLLM() - # else: - # chainllm = ChatOpenAI(temperature=config.temperature, openai_api_base=config.bot_api_url.v1_url, model_name=config.GPT_ENGINE, openai_api_key=config.API) - chainllm = ChatOpenAI(temperature=config.temperature, openai_api_base=config.bot_api_url.v1_url, model_name=config.GPT_ENGINE, openai_api_key=config.API) - - # url_set_list, url_pdf_set_list = get_search_url(prompt, chainllm) - url_set_list, url_pdf_set_list = yield from get_search_url(prompt, chainllm) - - yield "🌐 已找到一些有用的链接,正在获取详细内容..." - threads = [] - for url in url_set_list: - # url_search_thread = ThreadWithReturnValue(target=jina_ai_Web_crawler, args=(url,True,)) - url_search_thread = ThreadWithReturnValue(target=Web_crawler, args=(url,True,)) - url_search_thread.start() - threads.append(url_search_thread) - - url_text_list = concat_url(threads) - # print("url_text_list", url_text_list) - - - yield "🌐 快完成了✅,正在为您整理搜索结果..." - end_time = record_time.time() - run_time = end_time - start_time - print("urls", url_set_list) - print(f"搜索用时:{run_time}秒") - - return url_text_list - -# Plugins 搜索 -def get_search_results(prompt: str): - - url_text_list = yield from get_url_text_list(prompt) - useful_source_text = "\n\n".join(url_text_list) - - # useful_source_text, search_tokens_len = cut_message(useful_source_text, context_max_tokens) - # print("search tokens len", search_tokens_len, "\n\n") - - return useful_source_text - -# Plugins 获取日期时间 -def get_date_time_weekday(): - import datetime - import pytz - tz = pytz.timezone('Asia/Shanghai') # 为东八区设置时区 - now = datetime.datetime.now(tz) # 获取东八区当前时间 - weekday = now.weekday() - weekday_str = ['星期一', '星期二', '星期三', '星期四', '星期五', '星期六', '星期日'][weekday] - return "今天是:" + str(now.date()) + ",现在的时间是:" + str(now.time())[:-7] + "," + weekday_str - -# Plugins 使用函数 -def get_version_info(): - import subprocess - current_directory = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) - result = 
subprocess.run(['git', '-C', current_directory, 'log', '-1'], stdout=subprocess.PIPE) - output = result.stdout.decode() - return output - - - -# 公用函数 -def encode_image(image_path): - with open(image_path, "rb") as image_file: - return base64.b64encode(image_file.read()).decode('utf-8') - -def get_doc_from_url(url): - filename = urllib.parse.unquote(url.split("/")[-1]) - response = requests.get(url, stream=True) - with open(filename, 'wb') as f: - for chunk in response.iter_content(chunk_size=1024): - f.write(chunk) - return filename - -def get_encode_image(image_url): - filename = get_doc_from_url(image_url) - image_path = os.getcwd() + "/" + filename - base64_image = encode_image(image_path) - prompt = f"data:image/jpeg;base64,{base64_image}" - os.remove(image_path) - return prompt - -def get_text_token_len(text): - tiktoken.get_encoding("cl100k_base") - encoding = tiktoken.encoding_for_model(config.GPT_ENGINE) - encode_text = encoding.encode(text) - return len(encode_text) - -def Document_extract(docurl): - filename = get_doc_from_url(docurl) - docpath = os.getcwd() + "/" + filename - if filename[-3:] == "pdf": - from pdfminer.high_level import extract_text - text = extract_text(docpath) - if filename[-3:] == "txt": - with open(docpath, 'r') as f: - text = f.read() - prompt = ( - "Here is the document, inside XML tags:" - "" - "{}" - "" - ).format(text) - os.remove(docpath) - return prompt - -def check_json(json_data): - while True: - try: - json.loads(json_data) - break - except json.decoder.JSONDecodeError as e: - print("JSON error:", e) - print("JSON body", repr(json_data)) - if "Invalid control character" in str(e): - json_data = json_data.replace("\n", "\\n") - if "Unterminated string starting" in str(e): - json_data += '"}' - if "Expecting ',' delimiter" in str(e): - json_data += '}' - if "Expecting value: line 1 column 1" in str(e): - json_data = '{"prompt": ' + json.dumps(json_data) + '}' - return json_data - -def is_surrounded_by_chinese(text, index): - 
left_char = text[index - 1] - if 0 < index < len(text) - 1: - right_char = text[index + 1] - return '\u4e00' <= left_char <= '\u9fff' or '\u4e00' <= right_char <= '\u9fff' - if index == len(text) - 1: - return '\u4e00' <= left_char <= '\u9fff' - return False - -def replace_char(string, index, new_char): - return string[:index] + new_char + string[index+1:] - -def claude_replace(text): - Punctuation_mapping = {",": ",", ":": ":", "!": "!", "?": "?", ";": ";"} - key_list = list(Punctuation_mapping.keys()) - for i in range(len(text)): - if is_surrounded_by_chinese(text, i) and (text[i] in key_list): - text = replace_char(text, i, Punctuation_mapping[text[i]]) - return text - -if __name__ == "__main__": - os.system("clear") - print(get_date_time_weekday()) - # print(get_version_info()) - print(get_search_results("今天的微博热搜有哪些?", 1000)) - - # from langchain.agents import get_all_tool_names - # print(get_all_tool_names()) - - # # 搜索 - - # for i in search_web_and_summary("今天的微博热搜有哪些?"): - # for i in search_web_and_summary("给出清华铊中毒案时间线,并作出你的评论。"): - # for i in search_web_and_summary("红警hbk08是谁"): - # for i in search_web_and_summary("国务院 2024 放假安排"): - # for i in search_web_and_summary("中国最新公布的游戏政策,对游戏行业和其他相关行业有什么样的影响?"): - # for i in search_web_and_summary("今天上海的天气怎么样?"): - # for i in search_web_and_summary("阿里云24核96G的云主机价格是多少"): - # for i in search_web_and_summary("话说葬送的芙莉莲动漫是半年番还是季番?完结没?"): - # for i in search_web_and_summary("周海媚事件进展"): - # for i in search_web_and_summary("macos 13.6 有什么新功能"): - # for i in search_web_and_summary("用python写个网络爬虫给我"): - # for i in search_web_and_summary("消失的她主要讲了什么?"): - # for i in search_web_and_summary("奥巴马的全名是什么?"): - # for i in search_web_and_summary("华为mate60怎么样?"): - # for i in search_web_and_summary("慈禧养的猫叫什么名字?"): - # for i in search_web_and_summary("民进党当初为什么支持柯文哲选台北市长?"): - # for i in search_web_and_summary("Has the United States won the china US trade war?"): - # for i in search_web_and_summary("What does 'n+2' mean in Huawei's 
'Mate 60 Pro' chipset? Please conduct in-depth analysis."): - # for i in search_web_and_summary("AUTOMATIC1111 是什么?"): - # for i in search_web_and_summary("python telegram bot 怎么接收pdf文件"): - # for i in search_web_and_summary("中国利用外资指标下降了 87% ?真的假的。"): - # for i in search_web_and_summary("How much does the 'zeabur' software service cost per month? Is it free to use? Any limitations?"): - # for i in search_web_and_summary("英国脱欧没有好处,为什么英国人还是要脱欧?"): - # for i in search_web_and_summary("2022年俄乌战争为什么发生?"): - # for i in search_web_and_summary("卡罗尔与星期二讲的啥?"): - # for i in search_web_and_summary("金砖国家会议有哪些决定?"): - # for i in search_web_and_summary("iphone15有哪些新功能?"): - # for i in search_web_and_summary("python函数开头:def time(text: str) -> str:每个部分有什么用?"): - # print(i, end="") - - # 问答 - # result = asyncio.run(docQA("/Users/yanyuming/Downloads/GitHub/wiki/docs", "ubuntu 版本号怎么看?")) - # result = asyncio.run(docQA("https://yym68686.top", "说一下HSTL pipeline")) - # result = asyncio.run(docQA("https://wiki.yym68686.top", "PyTorch to MindSpore翻译思路是什么?")) - # print(result['answer']) - # result = asyncio.run(pdfQA("https://api.telegram.org/file/bot5569497961:AAHobhUuydAwD8SPkXZiVFybvZJOmGrST_w/documents/file_1.pdf", "HSTL的pipeline详细讲一下")) - # print(result) - # source_url = set([i.metadata['source'] for i in result["source_documents"]]) - # source_url = "\n".join(source_url) - # message = ( - # f"{result['result']}\n\n" - # f"参考链接:\n" - # f"{source_url}" - # ) - # print(message) \ No newline at end of file diff --git a/utils/prompt.py b/utils/prompt.py deleted file mode 100644 index d2c7a5e1..00000000 --- a/utils/prompt.py +++ /dev/null @@ -1,116 +0,0 @@ -translator_prompt = ( - "You are a translation engine, you can only translate text and cannot interpret it, and do not explain." - "Translate the text to {}, please do not explain any sentences, just translate or leave them as they are." 
- "This is the content you need to translate: " -) - -translator_en2zh_prompt = ( - "你是一位精通简体中文的专业翻译,尤其擅长将专业学术论文翻译成浅显易懂的科普文章。请你帮我将以下英文段落翻译成中文,风格与中文科普读物相似。" - "规则:" - "- 翻译时要准确传达原文的事实和背景。" - "- 即使上意译也要保留原始段落格式,以及保留术语,例如 FLAC,JPEG 等。保留公司缩写,例如 Microsoft, Amazon, OpenAI 等。" - "- 人名不翻译" - "- 同时要保留引用的论文,例如 [20] 这样的引用。" - "- 对于 Figure 和 Table,翻译的同时保留原有格式,例如:“Figure 1: ”翻译为“图 1: ”,“Table 1: ”翻译为:“表 1: ”。" - "- 全角括号换成半角括号,并在左括号前面加半角空格,右括号后面加半角空格。" - "- 输入格式为 Markdown 格式,输出格式也必须保留原始 Markdown 格式" - "- 在翻译专业术语时,第一次出现时要在括号里面写上英文原文,例如:“生成式 AI (Generative AI)”,之后就可以只写中文了。" - "- 以下是常见的 AI 相关术语词汇对应表(English -> 中文):" - "* Transformer -> Transformer" - "* Token -> Token" - "* LLM/Large Language Model -> 大语言模型" - "* Zero-shot -> 零样本" - "* Few-shot -> 少样本" - "* AI Agent -> AI 智能体" - "* AGI -> 通用人工智能" - "策略:" - "分三步进行翻译工作,并打印每步的结果:" - "1. 根据英文内容直译,保持原有格式,不要遗漏任何信息" - "2. 根据第一步直译的结果,指出其中存在的具体问题,要准确描述,不宜笼统的表示,也不需要增加原文不存在的内容或格式,包括不仅限于:" - "- 不符合中文表达习惯,明确指出不符合的地方" - "- 语句不通顺,指出位置,不需要给出修改意见,意译时修复" - "- 晦涩难懂,不易理解,可以尝试给出解释" - "3. 根据第一步直译的结果和第二步指出的问题,重新进行意译,保证内容的原意的基础上,使其更易于理解,更符合中文的表达习惯,同时保持原有的格式不变" - "返回格式如下,'{xxx}'表示占位符:" - "直译\n\n" - "{直译结果}\n\n" - "问题\n\n" - "{直译的具体问题列表}\n\n" - "意译\n\n" - "{意译结果}" - "现在请按照上面的要求翻译以下内容为简体中文:" -) - -search_key_word_prompt = ( - "根据我的问题,总结关键词概括问题,输出要求如下:" - "1. 给出三行不同的关键词组合,每行的关键词用空格连接。每行关键词可以是一个或者多个。" - "2. 至少有一行关键词里面有中文,至少有一行关键词里面有英文。" - "3. 只要直接给出这三行关键词,不需要其他任何解释,不要出现其他符号和内容。" - "4. 如果问题有关于日漫,至少有一行关键词里面有日文。" - "下面是一些根据问题提取关键词的示例:" - "问题 1:How much does the 'zeabur' software service cost per month? Is it free to use? Any limitations?" - "三行关键词是:" - "zeabur price" - "zeabur documentation" - "zeabur 价格" - "问题 2:pplx API 怎么使用?" - "三行关键词是:" - "pplx API demo" - "pplx API" - "pplx API 使用方法" - "问题 3:以色列哈马斯的最新情况" - "三行关键词是:" - "以色列 哈马斯 最新情况" - "Israel Hamas situation" - "哈马斯 以色列 冲突" - "问题 4:话说葬送的芙莉莲动漫是半年番还是季番?完结没?" 
- "三行关键词是:" - "葬送的芙莉莲" - "葬送のフリーレン" - "Frieren: Beyond Journey's End" - "问题 5:周海媚最近发生了什么" - "三行关键词是:" - "周海媚" - "周海媚 事件" - "Kathy Chau Hoi Mei news" - "这是我的问题:{source}" -) - -system_prompt = ( - "You are ChatGPT, a large language model trained by OpenAI. Respond conversationally in {}. Knowledge cutoff: 2023-12. Current date: [ {} ]" - # "Search results is provided inside XML tags. Your task is to think about my question step by step and then answer my question based on the Search results provided. Please response with a style that is logical, in-depth, and detailed. Note: In order to make the answer appear highly professional, you should be an expert in textual analysis, aiming to make the answer precise and comprehensive. Directly response markdown format, without using markdown code blocks." -) - -claude_system_prompt = None - -search_system_prompt = ( - "You are ChatGPT, a large language model trained by OpenAI. Respond conversationally in {}." - "You can break down the task into multiple steps and search the web to answer my questions one by one." - "you needs to follow the following strategies:" - "- First, you need to analyze how many steps are required to answer my question.\n" - "- Then output the specific content of each step.\n" - "- Then start using web search and other tools to answer my question from the first step. Each step search only once.\n" - "- After each search is completed, it is necessary to summarize and then proceed to the next search until all parts of the step are completed.\n" - "- Continue until all tasks are completed, and finally summarize my question.\n" - # "Each search summary needs to follow the following strategies:" - # "- think about the user question step by step and then answer the user question based on the Search results provided." - "- Please response with a style that is logical, in-depth, and detailed." - # "- please enclose the thought process and the next steps in action using the XML tags ." 
- "Output format:" - "- Add the label 'thought:' before your thought process steps to indicate that it is your thinking process.\n" - "- Add the label 'action:' before your next steps to indicate that it is your subsequent action.\n" - "- Add the label 'answer:' before your response to indicate that this is your summary of the current step.\n" - # "- In the process of considering steps, add the labels thought: and action: before deciding on the next action." - # "- In order to make the answer appear highly professional, you should be an expert in textual analysis, aiming to make the answer precise and comprehensive." - # "- Directly response markdown format, without using markdown code blocks." -) - -claude3_doc_assistant_prompt = ( - "我将按下列要求回答用户的问题:" - "1. 仔细阅读文章,仔细地检查论文内容,反复检查全文,根据问题提取最相关的文档内容,只对原文有明确依据的信息作出回答。如果无法找到相关证据,直接说明论文没有提供相应信息,而不是给我假设。" - "2. 你所有回答都要有依据,给出出处,指出在论文的第几章的第几小节的第几段。" - "3. 除了上面的页数小节信息,还要给出每一点回答的原文依据,把所有关于这个细节的原文列出来。如果原文没有提到相关内容,直接告诉我没有,请不要杜撰、臆断、假设或者给出不准确的回答。" - "4. 使用简体中文分点作答,给出清晰、结构化、详尽的回答,语言严谨且学术化,逻辑清晰,行文流畅。" - "5. 
每个学术词汇或者缩写都要标注英文全称。注意术语翻译正确。" - "我已经准备好,请提出你的问题。" -) \ No newline at end of file diff --git a/utils/sitemap.py b/utils/sitemap.py deleted file mode 100644 index 5917768a..00000000 --- a/utils/sitemap.py +++ /dev/null @@ -1,456 +0,0 @@ -import itertools -import re -from typing import Any, Callable, Generator, Iterable, List, Optional - -# from langchain.document_loaders.web_base import WebBaseLoader -from langchain.schema import Document - - -def _default_parsing_function(content: Any) -> str: - return str(content.get_text()) - - -def _default_meta_function(meta: dict, _content: Any) -> dict: - return {"source": meta["loc"], **meta} - - -def _batch_block(iterable: Iterable, size: int) -> Generator[List[dict], None, None]: - it = iter(iterable) - while item := list(itertools.islice(it, size)): - yield item - -"""Web base loader class.""" -import asyncio -import logging -import warnings -from typing import Any, Dict, Iterator, List, Optional, Union - -import aiohttp -import requests - -from langchain.docstore.document import Document - - -logger = logging.getLogger(__name__) - -default_header_template = { - "User-Agent": "", - "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*" - ";q=0.8", - "Accept-Language": "en-US,en;q=0.5", - "Referer": "https://www.google.com/", - "DNT": "1", - "Connection": "keep-alive", - "Upgrade-Insecure-Requests": "1", -} - - -def _build_metadata(soup: Any, url: str) -> dict: - """Build metadata from BeautifulSoup output.""" - metadata = {"source": url} - if title := soup.find("title"): - metadata["title"] = title.get_text() - if description := soup.find("meta", attrs={"name": "description"}): - metadata["description"] = description.get("content", "No description found.") - if html := soup.find("html"): - metadata["language"] = html.get("lang", "No language found.") - return metadata - -# from langchain.document_loaders.base import BaseLoader - -"""Abstract interface for document loader implementations.""" -from 
abc import ABC, abstractmethod -from typing import Iterator, List, Optional - -# from langchain.document_loaders.blob_loaders import Blob -from langchain.schema import Document -from langchain.text_splitter import RecursiveCharacterTextSplitter, TextSplitter - - -class BaseLoader(ABC): - """Interface for Document Loader. - - Implementations should implement the lazy-loading method using generators - to avoid loading all Documents into memory at once. - - The `load` method will remain as is for backwards compatibility, but its - implementation should be just `list(self.lazy_load())`. - """ - - # Sub-classes should implement this method - # as return list(self.lazy_load()). - # This method returns a List which is materialized in memory. - @abstractmethod - def load(self) -> List[Document]: - """Load data into Document objects.""" - - def load_and_split( - self, text_splitter: Optional[TextSplitter] = None - ) -> List[Document]: - """Load Documents and split into chunks. Chunks are returned as Documents. - - Args: - text_splitter: TextSplitter instance to use for splitting documents. - Defaults to RecursiveCharacterTextSplitter. - - Returns: - List of Documents. - """ - if text_splitter is None: - _text_splitter: TextSplitter = RecursiveCharacterTextSplitter() - else: - _text_splitter = text_splitter - docs = self.load() - return _text_splitter.split_documents(docs) - - # Attention: This method will be upgraded into an abstractmethod once it's - # implemented in all the existing subclasses. 
- def lazy_load( - self, - ) -> Iterator[Document]: - """A lazy loader for Documents.""" - raise NotImplementedError( - f"{self.__class__.__name__} does not implement lazy_load()" - ) - -class WebBaseLoader(BaseLoader): - """Load HTML pages using `urllib` and parse them with `BeautifulSoup'.""" - - web_paths: List[str] - - requests_per_second: int = 2 - """Max number of concurrent requests to make.""" - - default_parser: str = "html.parser" - """Default parser to use for BeautifulSoup.""" - - requests_kwargs: Dict[str, Any] = {} - """kwargs for requests""" - - raise_for_status: bool = False - """Raise an exception if http status code denotes an error.""" - - bs_get_text_kwargs: Dict[str, Any] = {} - """kwargs for beatifulsoup4 get_text""" - - def __init__( - self, - web_path: Union[str, List[str]], - header_template: Optional[dict] = None, - verify_ssl: Optional[bool] = True, - proxies: Optional[dict] = None, - continue_on_failure: Optional[bool] = False, - ): - """Initialize with webpage path.""" - - # TODO: Deprecate web_path in favor of web_paths, and remove this - # left like this because there are a number of loaders that expect single - # urls - if isinstance(web_path, str): - self.web_paths = [web_path] - elif isinstance(web_path, List): - self.web_paths = web_path - - try: - import bs4 # noqa:F401 - except ImportError: - raise ImportError( - "bs4 package not found, please install it with " "`pip install bs4`" - ) - - headers = header_template or default_header_template - if not headers.get("User-Agent"): - try: - from fake_useragent import UserAgent - - headers["User-Agent"] = UserAgent().random - except ImportError: - logger.info( - "fake_useragent not found, using default user agent." - "To get a realistic header for requests, " - "`pip install fake_useragent`." 
- ) - - self.session = requests.Session() - self.session.headers = dict(headers) - self.session.verify = verify_ssl - self.continue_on_failure = continue_on_failure - - if proxies: - self.session.proxies.update(proxies) - - @property - def web_path(self) -> str: - if len(self.web_paths) > 1: - raise ValueError("Multiple webpaths found.") - return self.web_paths[0] - - async def _fetch( - self, url: str, retries: int = 3, cooldown: int = 2, backoff: float = 1.5 - ) -> str: - async with aiohttp.ClientSession() as session: - for i in range(retries): - try: - async with session.get( - url, - headers=self.session.headers, - ssl=None if self.session.verify else False, - ) as response: - return await response.text() - except aiohttp.ClientConnectionError as e: - if i == retries - 1: - raise - else: - logger.warning( - f"Error fetching {url} with attempt " - f"{i + 1}/{retries}: {e}. Retrying..." - ) - await asyncio.sleep(cooldown * backoff**i) - raise ValueError("retry count exceeded") - - async def _fetch_with_rate_limit( - self, url: str, semaphore: asyncio.Semaphore - ) -> str: - async with semaphore: - try: - return await self._fetch(url) - except Exception as e: - if self.continue_on_failure: - logger.warning( - f"Error fetching {url}, skipping due to" - f" continue_on_failure=True" - ) - return "" - logger.exception( - f"Error fetching {url} and aborting, use continue_on_failure=True " - "to continue loading urls after encountering an error." 
- ) - raise e - - async def fetch_all(self, urls: List[str]) -> Any: - """Fetch all urls concurrently with rate limiting.""" - semaphore = asyncio.Semaphore(self.requests_per_second) - tasks = [] - for url in urls: - task = asyncio.ensure_future(self._fetch_with_rate_limit(url, semaphore)) - tasks.append(task) - try: - from tqdm.asyncio import tqdm_asyncio - - return await tqdm_asyncio.gather( - *tasks, desc="Fetching pages", ascii=True, mininterval=1 - ) - except ImportError: - warnings.warn("For better logging of progress, `pip install tqdm`") - return await asyncio.gather(*tasks) - - @staticmethod - def _check_parser(parser: str) -> None: - """Check that parser is valid for bs4.""" - valid_parsers = ["html.parser", "lxml", "xml", "lxml-xml", "html5lib"] - if parser not in valid_parsers: - raise ValueError( - "`parser` must be one of " + ", ".join(valid_parsers) + "." - ) - - async def scrape_all(self, urls: List[str], parser: Union[str, None] = None) -> List[Any]: - """Fetch all urls, then return soups for all results.""" - from bs4 import BeautifulSoup - - results = await self.fetch_all(urls) - final_results = [] - for i, result in enumerate(results): - url = urls[i] - if parser is None: - if url.endswith(".xml"): - parser = "xml" - else: - parser = self.default_parser - self._check_parser(parser) - final_results.append(BeautifulSoup(result, parser)) - - return final_results - - def _scrape(self, url: str, parser: Union[str, None] = None) -> Any: - from bs4 import BeautifulSoup - - if parser is None: - if url.endswith(".xml"): - parser = "xml" - else: - parser = self.default_parser - - self._check_parser(parser) - - html_doc = self.session.get(url, **self.requests_kwargs) - if self.raise_for_status: - html_doc.raise_for_status() - html_doc.encoding = html_doc.apparent_encoding - return BeautifulSoup(html_doc.text, parser) - - def scrape(self, parser: Union[str, None] = None) -> Any: - """Scrape data from webpage and return it in BeautifulSoup format.""" - - if 
parser is None: - parser = self.default_parser - - return self._scrape(self.web_path, parser) - - def lazy_load(self) -> Iterator[Document]: - """Lazy load text from the url(s) in web_path.""" - for path in self.web_paths: - soup = self._scrape(path) - text = soup.get_text(**self.bs_get_text_kwargs) - metadata = _build_metadata(soup, path) - yield Document(page_content=text, metadata=metadata) - - def load(self) -> List[Document]: - """Load text from the url(s) in web_path.""" - return list(self.lazy_load()) - - def aload(self) -> List[Document]: - """Load text from the urls in web_path async into Documents.""" - - results = self.scrape_all(self.web_paths) - docs = [] - for i in range(len(results)): - soup = results[i] - text = soup.get_text(**self.bs_get_text_kwargs) - metadata = _build_metadata(soup, self.web_paths[i]) - docs.append(Document(page_content=text, metadata=metadata)) - - return docs - -class SitemapLoader(WebBaseLoader): - """Load a sitemap and its URLs.""" - - def __init__( - self, - web_path: str, - filter_urls: Optional[List[str]] = None, - parsing_function: Optional[Callable] = None, - blocksize: Optional[int] = None, - blocknum: int = 0, - meta_function: Optional[Callable] = None, - is_local: bool = False, - continue_on_failure: bool = False, - ): - """Initialize with webpage path and optional filter URLs. - - Args: - web_path: url of the sitemap. can also be a local path - filter_urls: list of strings or regexes that will be applied to filter the - urls that are parsed and loaded - parsing_function: Function to parse bs4.Soup output - blocksize: number of sitemap locations per block - blocknum: the number of the block that should be loaded - zero indexed. - Default: 0 - meta_function: Function to parse bs4.Soup output for metadata - remember when setting this method to also copy metadata["loc"] - to metadata["source"] if you are using this field - is_local: whether the sitemap is a local file. 
Default: False - continue_on_failure: whether to continue loading the sitemap if an error - occurs loading a url, emitting a warning instead of raising an - exception. Setting this to True makes the loader more robust, but also - may result in missing data. Default: False - """ - - if blocksize is not None and blocksize < 1: - raise ValueError("Sitemap blocksize should be at least 1") - - if blocknum < 0: - raise ValueError("Sitemap blocknum can not be lower then 0") - - try: - import lxml # noqa:F401 - except ImportError: - raise ImportError( - "lxml package not found, please install it with " "`pip install lxml`" - ) - - super().__init__(web_path) - - self.filter_urls = filter_urls - self.parsing_function = parsing_function or _default_parsing_function - self.meta_function = meta_function or _default_meta_function - self.blocksize = blocksize - self.blocknum = blocknum - self.is_local = is_local - self.continue_on_failure = continue_on_failure - - async def parse_sitemap(self, soup: Any) -> List[dict]: - """Parse sitemap xml and load into a list of dicts. - - Args: - soup: BeautifulSoup object. - - Returns: - List of dicts. 
- """ - els = [] - for url in soup.find_all("url"): - loc = url.find("loc") - if not loc: - continue - - # Strip leading and trailing whitespace and newlines - loc_text = loc.text.strip() - - if self.filter_urls and not any( - re.match(r, loc_text) for r in self.filter_urls - ): - continue - - els.append( - { - tag: prop.text - for tag in ["loc", "lastmod", "changefreq", "priority"] - if (prop := url.find(tag)) - } - ) - - for sitemap in soup.find_all("sitemap"): - loc = sitemap.find("loc") - if not loc: - continue - soup_child = await self.scrape_all([loc.text], "xml")[0] - - els.extend(self.parse_sitemap(soup_child)) - return els - - async def load(self) -> List[Document]: - """Load sitemap.""" - if self.is_local: - try: - import bs4 - except ImportError: - raise ImportError( - "beautifulsoup4 package not found, please install it" - " with `pip install beautifulsoup4`" - ) - fp = open(self.web_path) - soup = bs4.BeautifulSoup(fp, "xml") - else: - soup = self.scrape("xml") - - els = await self.parse_sitemap(soup) - - if self.blocksize is not None: - elblocks = list(_batch_block(els, self.blocksize)) - blockcount = len(elblocks) - if blockcount - 1 < self.blocknum: - raise ValueError( - "Selected sitemap does not contain enough blocks for given blocknum" - ) - else: - els = elblocks[self.blocknum] - - results = await self.scrape_all([el["loc"].strip() for el in els if "loc" in el]) - - return [ - Document( - page_content=self.parsing_function(results[i]), - metadata=self.meta_function(els[i], results[i]), - ) - for i in range(len(results)) - ] diff --git a/utils/typings.py b/utils/typings.py deleted file mode 100644 index faa902b5..00000000 --- a/utils/typings.py +++ /dev/null @@ -1,198 +0,0 @@ -""" -A module that contains all the types used in this project -""" - -import os -import platform -from enum import Enum -from typing import Union - - -python_version = list(platform.python_version_tuple()) -SUPPORT_ADD_NOTES = int(python_version[0]) >= 3 and 
int(python_version[1]) >= 11 - - -class ChatbotError(Exception): - """ - Base class for all Chatbot errors in this Project - """ - - def __init__(self, *args: object) -> None: - if SUPPORT_ADD_NOTES: - super().add_note( - "Please check that the input is correct, or you can resolve this issue by filing an issue", - ) - super().add_note("Project URL: https://github.com/acheong08/ChatGPT") - super().__init__(*args) - - -class ActionError(ChatbotError): - """ - Subclass of ChatbotError - - An object that throws an error because the execution of an operation is blocked - """ - - def __init__(self, *args: object) -> None: - if SUPPORT_ADD_NOTES: - super().add_note( - "The current operation is not allowed, which may be intentional", - ) - super().__init__(*args) - - -class ActionNotAllowedError(ActionError): - """ - Subclass of ActionError - - An object that throws an error because the execution of an unalloyed operation is blocked - """ - - -class ActionRefuseError(ActionError): - """ - Subclass of ActionError - - An object that throws an error because the execution of a refused operation is blocked. - """ - - -class CLIError(ChatbotError): - """ - Subclass of ChatbotError - - The error caused by a CLI program error - """ - - -class ErrorType(Enum): - """ - Enumeration class for different types of errors. - """ - - USER_ERROR = -1 - UNKNOWN_ERROR = 0 - SERVER_ERROR = 1 - RATE_LIMIT_ERROR = 2 - INVALID_REQUEST_ERROR = 3 - EXPIRED_ACCESS_TOKEN_ERROR = 4 - INVALID_ACCESS_TOKEN_ERROR = 5 - PROHIBITED_CONCURRENT_QUERY_ERROR = 6 - AUTHENTICATION_ERROR = 7 - CLOUDFLARE_ERROR = 8 - - -class Error(ChatbotError): - """ - Base class for exceptions in V1 module. 
- """ - - def __init__( - self, - source: str, - message: str, - *args: object, - code: Union[ErrorType, int] = ErrorType.UNKNOWN_ERROR, - ) -> None: - self.source: str = source - self.message: str = message - self.code: ErrorType | int = code - super().__init__(*args) - - def __str__(self) -> str: - return f"{self.source}: {self.message} (code: {self.code})" - - def __repr__(self) -> str: - return f"{self.source}: {self.message} (code: {self.code})" - - -class AuthenticationError(ChatbotError): - """ - Subclass of ChatbotError - - The object of the error thrown by a validation failure or exception - """ - - def __init__(self, *args: object) -> None: - if SUPPORT_ADD_NOTES: - super().add_note( - "Please check if your key is correct, maybe it may not be valid", - ) - super().__init__(*args) - - -class APIConnectionError(ChatbotError): - """ - Subclass of ChatbotError - - An exception object thrown when an API connection fails or fails to connect due to network or - other miscellaneous reasons - """ - - def __init__(self, *args: object) -> None: - if SUPPORT_ADD_NOTES: - super().add_note( - "Please check if there is a problem with your network connection", - ) - super().__init__(*args) - - -class NotAllowRunning(ActionNotAllowedError): - """ - Subclass of ActionNotAllowedError - - Direct startup is not allowed for some reason - """ - - -class ResponseError(APIConnectionError): - """ - Subclass of APIConnectionError - - Error objects caused by API request errors due to network or other miscellaneous reasons - """ - - -class OpenAIError(APIConnectionError): - """ - Subclass of APIConnectionError - - Error objects caused by OpenAI's own server errors - """ - - -class RequestError(APIConnectionError): - """ - Subclass of APIConnectionError - - There is a problem with the API response due to network or other miscellaneous reasons, or there - is no reply to the object that caused the error at all - """ - - -class Colors: - """ - Colors for printing - """ - - HEADER = 
"\033[95m" - OKBLUE = "\033[94m" - OKCYAN = "\033[96m" - OKGREEN = "\033[92m" - WARNING = "\033[93m" - FAIL = "\033[91m" - ENDC = "\033[0m" - BOLD = "\033[1m" - UNDERLINE = "\033[4m" - - def __init__(self) -> None: - if os.getenv("NO_COLOR"): - Colors.HEADER = "" - Colors.OKBLUE = "" - Colors.OKCYAN = "" - Colors.OKGREEN = "" - Colors.WARNING = "" - Colors.FAIL = "" - Colors.ENDC = "" - Colors.BOLD = "" - Colors.UNDERLINE = ""