-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtools.py
More file actions
383 lines (332 loc) · 14.6 KB
/
tools.py
File metadata and controls
383 lines (332 loc) · 14.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
from tavily import TavilyClient
import os
from dotenv import load_dotenv
import yfinance as yf
import pandas as pd
from datetime import datetime, timedelta
import logging
# Set up logging: configure the root logger at INFO level as an import-time
# side effect, then create the logger used by every function in this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Load environment variables via python-dotenv so that the
# os.getenv("TAVILY_API_KEY") lookups further down can find the key
# (presumably from a local .env file -- confirm against deployment setup).
load_dotenv()
"""
This module contains tools for agentic LLM interactions: web search and page-content extraction using the Tavily API, a restricted arithmetic calculator, and stock market data retrieval using yfinance.
It also provides a helper to check whether an environment variable (for example the Tavily API key) is set.
"""
def check_environment_variable(var_name):
    """
    Report whether an environment variable exists with a non-empty value.

    Args:
        var_name (str): Name of the environment variable to look up.

    Returns:
        bool: True when the variable is defined and its value is not the
            empty string; False when it is missing or empty.
    """
    value = os.environ.get(var_name)
    return value is not None and value != ""
def extract_content(url):
    """
    Extract the content of a web page through the Tavily API.

    A Tavily client is created with the key read from the ``TAVILY_API_KEY``
    environment variable and its ``extract`` method is called with ``url``.

    Args:
        url (str): The URL from which to extract content.

    Returns:
        The response returned by ``TavilyClient.extract`` for ``url``, passed
        through unchanged. Its exact schema is defined by the Tavily API and
        is not inspected here.

    Raises:
        ValueError: If the client returns an empty (falsy) response.
    """
    logger.info("Extracting content from URL: %s", url)
    api_key = os.getenv("TAVILY_API_KEY")
    extracted = TavilyClient(api_key=api_key).extract(url)
    if extracted:
        return extracted
    raise ValueError(f"Failed to extract content from URL: {url}")
def web_search(query):
    """
    Run a web search through the Tavily API and return its response.

    A Tavily client is created with the key read from the ``TAVILY_API_KEY``
    environment variable, so that variable must be set before calling this
    function. The response from ``TavilyClient.search`` is returned as-is.

    Args:
        query (str): The search query to perform.

    Returns:
        dict: The Tavily search response. Per the Tavily API it is expected
            to contain:
            - 'query': The original search query.
            - 'follow_up_questions': Suggested follow-up questions (may be None).
            - 'answer': A direct answer to the query if available (may be None).
            - 'images': List of relevant images, if any.
            - 'results': A list of dictionaries, each with 'title', 'url',
              'content' (a snippet), 'score' (relevance) and 'raw_content'
              (may be None).
            - 'response_time': Time taken to process the query, in seconds.

    Example:
        ```
        results = web_search("latest NVIDIA stock price")
        for hit in results["results"]:
            print(f"{hit['title']} ({hit['score']}): {hit['url']}")
        ```
    """
    logger.info("Performing web search with query: %s", query)
    tavily = TavilyClient(api_key=os.getenv("TAVILY_API_KEY"))
    return tavily.search(query=query)
def is_safe_math_expression(expression):
    """
    Check that an expression contains only whitelisted arithmetic characters.

    The whitelist is: the digits 0-9, the operators ``+ - * /``, parentheses,
    the decimal point and the space character. This is a character-level
    filter only; it does not verify that the expression is syntactically
    valid (an empty string passes).

    Args:
        expression (str): The mathematical expression to check.

    Returns:
        bool: True if every character is whitelisted, False otherwise.
    """
    allowed_chars = frozenset("0123456789+-*/(). ")
    return all(char in allowed_chars for char in expression)


def calculator(expression):
    """
    Evaluate an arithmetic expression and return the result.

    The expression is first screened with ``is_safe_math_expression`` and is
    then evaluated with ``eval()`` in an empty namespace (no builtins, no
    module globals).

    Args:
        expression (str): The mathematical expression to evaluate.

    Returns:
        int or float: The value of the expression, as produced by Python
            arithmetic (``/`` yields a float, integer-only expressions yield
            an int).

    Raises:
        ValueError: If the expression contains a non-whitelisted character or
            if evaluation fails (syntax error, division by zero, ...). The
            underlying exception is available as ``__cause__``.

    Example:
        >>> calculator("3 + 5 * (2 - 8)")
        -27
        >>> calculator("10 / 4")
        2.5
    """
    try:
        if not is_safe_math_expression(expression):
            raise ValueError(
                "Unsafe mathematical expression detected. Please use only digits, operators, and parentheses."
            )
        # SECURITY: eval() is only reached after the character whitelist has
        # passed, and it runs with an empty namespace as defense in depth.
        # NOTE(review): the whitelist still admits `**`, so an input such as
        # "9**9**9**9" can exhaust CPU/memory. If this tool is exposed to
        # untrusted input, replace eval() with a bounded AST-based evaluator.
        return eval(expression, {"__builtins__": {}}, {})
    except Exception as e:
        # Chain the original exception so callers and logs keep the root cause.
        raise ValueError(f"Error evaluating expression '{expression}': {e}") from e
def _dict_or_empty(value):
    """Return ``value`` when it is a dict, otherwise a new empty dict."""
    return value if isinstance(value, dict) else {}


def _build_news_entry(item):
    """Flatten one raw ``ticker.news`` item into the documented news dict.

    Nested fields that are missing, ``None`` or not dicts are treated as
    empty, so one absent thumbnail or link no longer discards the item.
    """
    content = _dict_or_empty(item.get("content"))
    provider = _dict_or_empty(content.get("provider"))
    click_through = _dict_or_empty(content.get("clickThroughUrl"))
    canonical = _dict_or_empty(content.get("canonicalUrl"))
    thumbnail = _dict_or_empty(content.get("thumbnail"))
    metadata = _dict_or_empty(content.get("metadata"))
    # `resolutions` may be missing, None or an empty list; fall back to a
    # single empty entry so indexing [0] is always safe.
    resolutions = thumbnail.get("resolutions") or [{}]
    return {
        # Basic info
        "title": content.get("title", "No Title"),
        "summary": content.get("summary", ""),
        "content_type": content.get("contentType", "STORY"),
        # Publication info
        "publisher": provider.get("displayName", "Unknown Publisher"),
        "published": content.get("pubDate", ""),
        # Prefer the click-through URL; fall back to the canonical URL.
        "link": click_through.get("url") or canonical.get("url", ""),
        # Image (if available)
        "thumbnail_url": _dict_or_empty(resolutions[0]).get("url", ""),
        # Metadata
        "editors_pick": metadata.get("editorsPick", False),
        "id": item.get("id", ""),
    }


def market_data(symbol, data_type="price", period="1d", interval="30m"):
    """
    Fetch market data for one ticker symbol through yfinance.

    Depending on ``data_type`` this returns the latest quote, historical
    OHLCV bars, company information or recent news.

    Args:
        symbol (str): One stock ticker symbol (e.g., "AAPL", "MSFT", "NVDA").
        data_type (str): Type of data to retrieve:
            - "price": Latest quote and daily trading info (default). Uses a
              fixed 3-day history window; ``period`` and ``interval`` are
              ignored.
            - "historical": Historical OHLCV (Open, High, Low, Close, Volume)
              data for ``period`` / ``interval``.
            - "info": Company information and key financial statistics.
            - "news": Up to 5 recent news articles about the company.
        period (str): Time period for historical data, e.g.
            "1d", "5d", "1mo", "3mo", "6mo", "1y", "2y", "5y", "ytd", "max".
        interval (str): Bar interval for historical data, e.g.
            "1m", "2m", "5m", "15m", "30m", "60m", "90m", "1h", "1d", "5d",
            "1wk", "1mo", "3mo". The data provider limits how far back
            intraday intervals can reach; see the yfinance documentation.

    Returns:
        dict: A dictionary with the following keys:
            - 'symbol': The ticker symbol requested.
            - 'timestamp': Local time the request was made
              (YYYY-MM-DD HH:MM:SS).
            - 'data_type': The type of data requested.
            - 'data': The market data; structure depends on data_type:
                - For "price": {
                    "current_price": float,  # close of the last row
                    "open": float,
                    "high": float,
                    "low": float,
                    "volume": int,
                    "change_percent": float  # vs. previous row's close; 0 if only one row
                  }
                - For "historical": list of dicts, one per bar:
                  {
                    "date": "YYYY-MM-DD HH:MM:SS",
                    "open": float,
                    "high": float,
                    "low": float,
                    "close": float,
                    "volume": int
                  }
                - For "info" (missing fields default to "" or 0): {
                    "name": str,
                    "sector": str,
                    "industry": str,
                    "market_cap": int,
                    "pe_ratio": float,
                    "dividend_yield": float,
                    "52w_high": float,
                    "52w_low": float,
                    "avg_volume": int,
                    "description": str
                  }
                - For "news": list of dicts, each containing:
                  {
                    "title": str,
                    "summary": str,
                    "content_type": str,
                    "publisher": str,
                    "published": str,
                    "link": str,
                    "thumbnail_url": str,
                    "editors_pick": bool,
                    "id": str
                  }

    Raises:
        ValueError: If ``data_type`` is not one of the supported values, if no
            data is returned for the symbol, or if any other error occurs
            while fetching or processing. The original exception is chained
            as ``__cause__``.

    Example:
        ```
        # Get current price
        price_data = market_data("AAPL")
        print(f"Current price of {price_data['symbol']}: ${price_data['data']['current_price']}")
        # Get historical data
        hist_data = market_data("MSFT", data_type="historical", period="5d", interval="1d")
        print(f"Last 5 closing prices: {[bar['close'] for bar in hist_data['data'][-5:]]}")
        # Get news about a company
        news_data = market_data("NVDA", data_type="news")
        for article in news_data["data"]:
            print(f"{article['title']} - {article['publisher']}")
        ```

    Note:
        This function requires the yfinance package to be installed.
        Some data types and frequencies may be limited by the provider's API
        restrictions.
    """
    logger.info(
        "Fetching market data for %s with data_type=%s, period=%s, interval=%s",
        symbol,
        data_type,
        period,
        interval,
    )
    try:
        ticker = yf.Ticker(symbol)
        result = {
            "symbol": symbol,
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "data_type": data_type,
            "data": {},
        }
        if data_type == "price":
            # A few days of history so a previous close is normally available
            # for the percentage-change calculation.
            hist = ticker.history(period="3d")
            if hist.empty:
                raise ValueError(f"No data found for symbol: {symbol}")
            # Get the last available price data
            last_quote = hist.iloc[-1]
            result["data"] = {
                "current_price": round(last_quote["Close"], 2),
                "open": round(last_quote["Open"], 2),
                "high": round(last_quote["High"], 2),
                "low": round(last_quote["Low"], 2),
                "volume": int(last_quote["Volume"]),
                "change_percent": (
                    round((last_quote["Close"] / hist.iloc[-2]["Close"] - 1) * 100, 2)
                    if len(hist) > 1
                    else 0
                ),
            }
        elif data_type == "historical":
            # Get historical OHLCV data
            hist = ticker.history(period=period, interval=interval)
            if hist.empty:
                raise ValueError(
                    f"No historical data found for {symbol} with period={period}, interval={interval}"
                )
            # Convert historical data to a list of dictionaries
            result["data"] = [
                {
                    "date": date.strftime("%Y-%m-%d %H:%M:%S"),
                    "open": round(row["Open"], 2),
                    "high": round(row["High"], 2),
                    "low": round(row["Low"], 2),
                    "close": round(row["Close"], 2),
                    "volume": int(row["Volume"]),
                }
                for date, row in hist.iterrows()
            ]
        elif data_type == "info":
            # Get company information
            info = ticker.info
            # NOTE(review): assumes yfinance reports dividendYield as a
            # fraction (hence the * 100) -- confirm for the installed version.
            dividend_yield = info.get("dividendYield")
            # Select the most relevant information
            result["data"] = {
                "name": info.get("shortName", ""),
                "sector": info.get("sector", ""),
                "industry": info.get("industry", ""),
                "market_cap": info.get("marketCap", 0),
                "pe_ratio": info.get("trailingPE", 0),
                "dividend_yield": dividend_yield * 100 if dividend_yield else 0,
                "52w_high": info.get("fiftyTwoWeekHigh", 0),
                "52w_low": info.get("fiftyTwoWeekLow", 0),
                "avg_volume": info.get("averageVolume", 0),
                "description": info.get("longBusinessSummary", ""),
            }
        elif data_type == "news":
            # Get news about the company; tolerate a missing/None news list.
            news_items = ticker.news or []
            result["data"] = []
            for item in news_items[:5]:  # Limit to 5 news items
                try:
                    result["data"].append(_build_news_entry(item))
                except Exception as e:
                    # Best effort: one malformed item must not fail the call.
                    logger.warning("Error processing news item: %s", e)
            if not result["data"]:
                logger.warning("No news items could be processed for %s", symbol)
        else:
            raise ValueError(
                f"Invalid data_type: {data_type}. Must be 'price', 'historical', 'info', or 'news'"
            )
        logger.debug("Market data result: %s", result)
        return result
    except Exception as e:
        # Normalize every failure to ValueError, keeping the root cause chained.
        raise ValueError(f"Error fetching market data for {symbol}: {str(e)}") from e
if __name__ == "__main__":
    # Manual smoke test: fetch the current NVDA quote through market_data
    # and print it; any failure is reported instead of raising.
    try:
        print(market_data("NVDA"))
    except Exception as exc:
        print(f"Error: {exc}")