Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add feedback API #119

Merged
merged 3 commits into from
Dec 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion prompttools/logger/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
# LICENSE file in the root directory of this source tree.


from .logger import Logger
from .logger import Logger, add_feedback


__all__ = [
"Logger",
"add_feedback",
]
43 changes: 40 additions & 3 deletions prompttools/logger/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# This source code's license can be found in the
# LICENSE file in the root directory of this source tree.
import json
import uuid

import requests
import threading
Expand All @@ -27,13 +28,21 @@ class Logger:
def __init__(self):
self.backend_url = f"{HEGEL_BACKEND_URL}/sdk/logger"
self.data_queue = queue.Queue()
self.feedback_queue = queue.Queue()
self.worker_thread = threading.Thread(target=self.worker)

# When the main thread is joining, put `None` into queue to signal worker thread to end
threading.Thread(target=lambda: threading.main_thread().join() or self.data_queue.put(None)).start()

self.worker_thread.start()

def add_feedback(self, log_id, metric_name, value):
self.feedback_queue.put({
"log_id": log_id,
"key": metric_name,
"value": value
})

def execute_and_add_to_queue(self, callable_func, **kwargs):
if "hegel_model" in kwargs:
hegel_model = kwargs["hegel_model"]
Expand All @@ -43,28 +52,40 @@ def execute_and_add_to_queue(self, callable_func, **kwargs):
start = perf_counter()
result = callable_func(**kwargs)
latency = perf_counter() - start
log_id = str(uuid.uuid4())
self.data_queue.put(
{
"hegel_model": hegel_model,
"result": result.model_dump_json(),
"input_parameters": json.dumps(kwargs),
"latency": latency,
"log_id": log_id,
}
)
result.log_id = log_id
return result

def wrap(self, callable_func):
return partial(self.execute_and_add_to_queue, callable_func)

def worker(self):
while True:
# Process logging data
if not self.data_queue.empty():
result = self.data_queue.get()
if result is None:
data = self.data_queue.get()
if data is None: # Shutdown signal
return
self.log_data_to_remote(result)
self.log_data_to_remote(data)
self.data_queue.task_done()

# Process feedback data
if not self.feedback_queue.empty():
feedback_data = self.feedback_queue.get()
if feedback_data is None: # Shutdown signal
return
self.send_feedback_to_remote(feedback_data)
self.feedback_queue.task_done()

def log_data_to_remote(self, data):
try:
headers = {
Expand All @@ -78,6 +99,19 @@ def log_data_to_remote(self, data):
except requests.exceptions.RequestException as e:
print(f"Error sending data to Flask API: {e}")

def send_feedback_to_remote(self, feedback_data):
feedback_url = f"{HEGEL_BACKEND_URL}/sdk/add_feedback/"
try:
headers = {
"Content-Type": "application/json",
"Authorization": os.environ["HEGELAI_API_KEY"],
}

response = requests.post(feedback_url, json=feedback_data, headers=headers)
if response.status_code != 200:
print(f"Failed to send feedback to Flask API. Status code: {response.status_code}")
except requests.exceptions.RequestException as e:
print(f"Error sending feedback to Flask API: {e}")

sender = Logger()
# Monkey-patching
Expand All @@ -86,3 +120,6 @@ def log_data_to_remote(self, data):
except Exception:
print("You may need to add `OPENAI_API_KEY=''` to your `.env` file.")
raise

def add_feedback(*args):
sender.add_feedback(*args)