-
Notifications
You must be signed in to change notification settings - Fork 0
/
tools.py
143 lines (123 loc) · 5 KB
/
tools.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import json
import logging
import asyncio
import requests
from typing import List
from bs4 import BeautifulSoup
from langchain.chat_models import ChatOpenAI
from langchain.chains import LLMChain
from langchain import PromptTemplate
from langchain.chains.summarize import load_summarize_chain
from langchain.text_splitter import RecursiveCharacterTextSplitter
import constants
from prompts import EXTRACT_PROMPT, MAP_PROMPT, REDUCE_PROMPT, SEARCH_QUERY_PROMPT, SEARCH_RESULT_RANK_PROMPT
def scrape(
        url: str,
        query: str,
        extraction_chain: LLMChain,
        map_reduce_chain,
        map_reduce: bool = False):
    """Render *url* via the browserless.io service, strip boilerplate tags,
    and extract content relevant to *query* with an LLM.

    Args:
        url: page to scrape.
        query: user query passed to the extraction prompt.
        extraction_chain: LLMChain used for single-shot extraction.
        map_reduce_chain: summarize chain used when the page is long.
        map_reduce: when True and the page text exceeds 20k chars,
            chunk the text and run the map-reduce chain instead of truncating.

    Returns:
        A dict with 'description', 'url' and 'reduced' keys, or None when
        the rendering request fails.
    """
    logging.info(f'scraping {url}')
    post_url = f"https://chrome.browserless.io/content?token={constants.BROWSERLESS_API_KEY}"
    headers = {
        'Cache-Control': 'no-cache',
        'Content-Type': 'application/json',
    }
    # timeout guards against the headless-render service hanging this worker thread forever
    response = requests.post(post_url, headers=headers,
                             data=json.dumps({"url": url}), timeout=120)
    if response.status_code != 200:
        # fix: was print(); use the module's logging and return None explicitly
        # so callers can detect and skip failed pages
        logging.error(f"Status: {response.status_code}, content: {response.content}")
        return None
    soup = BeautifulSoup(response.content, "html.parser")
    # drop non-content elements before extracting visible text
    for tag in soup(["script", "style", "header", "footer", "nav"]):
        tag.decompose()
    text = soup.get_text()
    logging.debug(f'raw website: {text}')
    if len(text) > 20000 and map_reduce:
        text_splitter = RecursiveCharacterTextSplitter(chunk_size=10000, chunk_overlap=50)
        chunks = text_splitter.create_documents([text])
        # running this map_reduce concurrently to call gpt api would be nice to have
        output = map_reduce_chain.run(chunks)
        logging.debug(f'extraction with mapreduce: {output}')
        return {'description': output, "url": url, "reduced": True}
    # short page (or map_reduce disabled): truncate and extract in one call
    output = extraction_chain.run(text=text[:20000], query=query)
    logging.debug(f'extraction with cutoff: {output}')
    return {'description': output, "url": url, "reduced": False}
async def ascrape_multiple_websites(urls: List[str], query: str, map_reduce: bool = False):
    """Scrape every URL concurrently and stream results back as they finish.

    For each URL this renders the page via browserless, parses it with
    BeautifulSoup and extracts query-relevant content with OpenAI (see
    ``scrape``). The blocking ``scrape`` calls run in the default thread-pool
    executor; results are yielded (as JSON strings) in completion order, so
    the client starts receiving data before all pages are done.

    Args:
        urls: pages to scrape.
        query: user query forwarded to the extraction prompts.
        map_reduce: forwarded to ``scrape`` to enable chunked map-reduce
            extraction for long pages.

    Yields:
        A JSON-encoded result dict per successful URL, or '' for failures.
    """
    llm = ChatOpenAI(temperature=0, model="gpt-3.5-turbo-16k", api_key=constants.OPENAI_API_KEY)
    extract_prompt_template = PromptTemplate(
        template=EXTRACT_PROMPT, input_variables=["query", "text"])
    extraction_chain = LLMChain(
        llm=llm,
        prompt=extract_prompt_template,
        verbose=False,
    )
    # map prompt needs the query; bind it once so the chain only takes 'text'
    map_reduce_chain = load_summarize_chain(
        llm=llm,
        chain_type='map_reduce',
        verbose=False,
        map_prompt=PromptTemplate(
            template=MAP_PROMPT,
            input_variables=['text', 'query']
        ).partial(query=query),
        combine_prompt=PromptTemplate(
            template=REDUCE_PROMPT,
            input_variables=['text'])
    )

    def request(url):
        # thread-pool entry point: one blocking scrape per URL
        return scrape(
            url=url,
            query=query,
            extraction_chain=extraction_chain,
            map_reduce_chain=map_reduce_chain,
            map_reduce=map_reduce
        )

    # fix: get_event_loop() is deprecated inside coroutines; use the running loop
    loop = asyncio.get_running_loop()
    for f in asyncio.as_completed([loop.run_in_executor(None, request, url) for url in urls]):
        result = await f
        if result is not None:
            logging.info('sending result for {url}'.format(url=result.get('url')))
            yield json.dumps(result)
        else:
            # scrape() returns None on failure; yield an empty chunk to keep the stream alive
            yield ''
async def amulti_search(queries: List[str]):
    """Run a Serper.dev Google search for each query, concurrently.

    Each blocking HTTP call executes in the default thread-pool executor so
    the searches overlap.

    Args:
        queries: search query strings.

    Returns:
        A list with one entry per query, each the top-K 'organic' results
        (empty list when the API response carries no 'organic' key).
    """
    def single_search(query: str):
        url = "https://google.serper.dev/search"
        payload = json.dumps({"q": query})
        headers = {
            'X-API-KEY': constants.SERP_API_KEY,
            'Content-Type': 'application/json'
        }
        # fix: add a timeout so a stalled API call can't hang the worker thread
        response = requests.post(url, headers=headers, data=payload, timeout=30)
        # fix: .get guards against error payloads that lack an 'organic' key
        # (the bare ['organic'] raised KeyError on API errors/quota responses)
        return response.json().get('organic', [])[:constants.TOP_K]

    # fix: get_event_loop() is deprecated inside coroutines; use the running loop
    loop = asyncio.get_running_loop()
    return await asyncio.gather(*[loop.run_in_executor(None, single_search, query) for query in queries])
async def allm_rank_chain(query: str):
    """Expand *query* into search queries, search, and LLM-rank the results.

    Pipeline: one LLM call generates a JSON list of search queries, those
    queries are searched concurrently via ``amulti_search``, and a second
    LLM call ranks the combined results and returns the chosen URLs.
    """
    chat_model = ChatOpenAI(temperature=0, model="gpt-3.5-turbo-16k", api_key=constants.OPENAI_API_KEY)

    # Step 1: expand the user query into multiple search queries (JSON list).
    query_generator = LLMChain(
        llm=chat_model,
        prompt=PromptTemplate(template=SEARCH_QUERY_PROMPT, input_variables=['query']),
        verbose=True
    )
    generated = await query_generator.arun(query=query)
    search_result = await amulti_search(json.loads(generated))

    # Step 2: rank the pooled results; query and top_k are bound up front so
    # the ranking chain only needs the search results at call time.
    ranking_prompt = PromptTemplate(
        template=SEARCH_RESULT_RANK_PROMPT,
        input_variables=['query', 'top_k', 'result']
    ).partial(query=query, top_k=constants.TOP_K)
    ranker = LLMChain(llm=chat_model, prompt=ranking_prompt, verbose=True)
    return await ranker.arun(result=search_result)