190 changes: 161 additions & 29 deletions app-backend/megaservice.py
@@ -5,6 +5,9 @@
import json
import importlib
import re
import uuid
import aiofiles
from copy import deepcopy

# library import
from typing import List
@@ -18,6 +21,7 @@
from comps.cores.proto.api_protocol import (
ChatCompletionRequest,
ChatCompletionResponse,
DocSumChatCompletionRequest,
ChatCompletionResponseChoice,
ChatMessage,
UsageInfo,
@@ -36,6 +40,44 @@
HOST_IP = os.getenv("HOST_IP", "0.0.0.0")
USE_NODE_ID_AS_IP = os.getenv("USE_NODE_ID_AS_IP","").lower() == 'true'

def encode_file_to_base64(file_path):
"""Encode the content of a file to a base64 string.

Args:
file_path (str): The path to the file to be encoded.

Returns:
str: The base64 encoded string of the file content.
"""
with open(file_path, "rb") as f:
base64_str = base64.b64encode(f.read()).decode("utf-8")
return base64_str
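As a quick sanity check, the helper round-trips with base64.b64decode; the path and payload below are placeholders, not files the service actually uses:

```python
# Illustrative round trip for encode_file_to_base64 (placeholder path/bytes).
import base64

payload = b"RIFF....WAVE"  # stand-in audio bytes
with open("/tmp/sample.wav", "wb") as f:
    f.write(payload)

b64 = encode_file_to_base64("/tmp/sample.wav")
assert base64.b64decode(b64) == payload
```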

def read_text_from_file(file, save_file_name):
import docx2txt
from langchain.text_splitter import CharacterTextSplitter

# read text file
if file.headers["content-type"] == "text/plain":
file.file.seek(0)
content = file.file.read().decode("utf-8")
# Split text
text_splitter = CharacterTextSplitter()
texts = text_splitter.split_text(content)
# Create multiple documents
file_content = texts
# read pdf file
elif file.headers["content-type"] == "application/pdf":
documents = read_pdf(save_file_name)
file_content = [doc.page_content for doc in documents]
# read docx file
elif (
file.headers["content-type"] == "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
or file.headers["content-type"] == "application/octet-stream"
):
file_content = docx2txt.process(save_file_name)

return file_content
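For orientation, a minimal sketch of driving this helper with an in-memory upload; it assumes a recent Starlette (keyword-argument UploadFile constructor), and the temp path is illustrative since the text/plain branch never touches it:

```python
# Sketch: feed read_text_from_file a synthetic text/plain upload.
import io
from starlette.datastructures import Headers, UploadFile

raw = b"First paragraph.\n\nSecond paragraph."
upload = UploadFile(
    file=io.BytesIO(raw),
    filename="notes.txt",
    headers=Headers({"content-type": "text/plain"}),
)
chunks = read_text_from_file(upload, "/tmp/notes.txt")
print(chunks)  # list of chunks produced by CharacterTextSplitter
```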

class ChatTemplate:
@staticmethod
@@ -91,9 +133,16 @@ def add_remote_service(self):
for key, value in os.environ.items():
print(f"{key}: {value}")
templates = self.import_all_microservices_from_template()
if 'chat_input_ids' not in self.workflow_info:
raise Exception('chat_input_ids not found in workflow_info')
nodes = self.workflow_info['chat_input_ids'].copy()
# Get nodes from chat_input_ids or file_input_ids
input_node_ids = []
if 'chat_input_ids' in self.workflow_info:
input_node_ids.extend(self.workflow_info['chat_input_ids'])
if 'file_input_ids' in self.workflow_info:
input_node_ids.extend(self.workflow_info['file_input_ids'])
if not input_node_ids:
raise Exception('No chat_input_ids or file_input_ids found in workflow_info')
nodes = input_node_ids.copy()
print('nodes', nodes)
self.processed_node_infos = {}
self.services = {}
self.megaservices = {}
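For orientation, the workflow_info this traversal expects looks roughly like the sketch below; only the keys read here are shown, and all ids are invented:

```python
# Hypothetical workflow_info fragment (only the keys this loop reads).
workflow_info = {
    "chat_input_ids": ["chat_input@0"],
    "file_input_ids": ["file_input@0"],
    "nodes": {
        "chat_input@0": {},
        "file_input@0": {},
        "opea_service@llm_docsum": {"megaserviceClient": True},
    },
}
```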
@@ -103,13 +152,13 @@ def add_remote_service(self):
node = self.workflow_info['nodes'][node_id]
print('node', node)
print('chat_input_ids', self.workflow_info.get('chat_input_ids', []))
if node.get('megaserviceClient') or node_id in self.workflow_info['chat_input_ids']:
if node.get('megaserviceClient') or node_id in self.workflow_info.get('chat_input_ids', []) or node_id in self.workflow_info.get('file_input_ids', []):
print('new Megaservice', node_id)
if node_id in self.processed_node_infos:
# Prevent infinite loop
continue
print('start new Megaservice', node_id)
key = 'default' if node_id in self.workflow_info['chat_input_ids'] else node_id.split('@')[1]
key = 'default' if node_id in self.workflow_info.get('chat_input_ids', []) or node_id in self.workflow_info.get('file_input_ids', []) else node_id.split('@')[1]
self.megaservices[key] = ServiceOrchestrator()
self.megaservices[key].align_inputs = self.align_inputs
self.megaservices[key].align_outputs = self.align_outputs
@@ -144,7 +193,6 @@ def add_remote_service(self):
print('\n\n\n', '-'*20, 'self.services', self.services)
print('\n\n\n', '-'*20, 'self.megaservices', self.megaservices)
print('\n\n\n', '-'*20, 'self.processed_node_infos', self.processed_node_infos)


def align_inputs(self, inputs, *args, **kwargs):
"""Override this method in megaservice definition."""
@@ -293,9 +341,9 @@ def align_generator(self, gen, **kwargs):

buffer = ""
for line in gen:
print('line before decode', line)
# print('line before decode', line)
line = line.decode("utf-8")
print('line', line)
# print('line', line)
start = line.find("{")
end = line.rfind("}") + 1

@@ -304,20 +352,32 @@
json_data = json.loads(json_str)
if json_data["choices"][0]["finish_reason"] != "eos_token":
choice = json_data["choices"][0]
# without word buffer
if "delta" in choice and "content" in choice["delta"]:
buffer += choice["delta"]["content"]
data = choice["delta"]["content"]
print("json data", data)
yield f"data: {json_str}\n\n"
elif "text" in choice:
buffer += choice["text"]
buffer = buffer.replace("\\n", "\n")
print("buffer", buffer)
data = choice["text"]
yield f"data: {data}\n\n"



words = buffer.split()
if len(words) > 1:
output_word = words[0] + ' '
yield f"data: {output_word}\n\n"
buffer = " ".join(words[1:])
else:
buffer = words[0] if words else ""
# with word buffer
# if "delta" in choice and "content" in choice["delta"]:
# buffer += choice["delta"]["content"]
# elif "text" in choice:
# buffer += choice["text"]
# buffer = buffer.replace("\\n", "\n")
# print("buffer", buffer)

# words = buffer.split()
# if len(words) > 1:
# output_word = words[0] + ' '
# yield f"data: {output_word}\n\n"
# buffer = " ".join(words[1:])
# else:
# buffer = words[0] if words else ""
except Exception as e:
yield f"data: {json_str}\n\n"
if buffer:
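The commented-out variant above implements word-level buffering: fragments accumulate until at least one whole word is complete, that word is emitted, and the trailing partial word stays buffered. A standalone sketch of the same strategy:

```python
# Word-buffer streaming: emit only completed words, keep the partial tail.
def word_buffered(fragments):
    buffer = ""
    for fragment in fragments:
        buffer += fragment
        words = buffer.split()
        if len(words) > 1:
            yield words[0] + " "            # a whole word is ready
            buffer = " ".join(words[1:])    # keep the (possibly partial) rest
        else:
            buffer = words[0] if words else ""
    if buffer:                              # flush whatever remains at the end
        yield buffer

print(list(word_buffered(["Hel", "lo wor", "ld"])))  # ['Hello ', 'world']
```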
@@ -326,7 +386,7 @@ def align_generator(self, gen, **kwargs):



async def handle_request(self, request: Request, megaservice):
async def handle_request(self, request: Request, megaservice=None):
data = await request.json()
print('\n'*5, '====== handle_request ======\n', data)
if 'chat_completion_ids' in self.workflow_info:
@@ -381,7 +441,7 @@ async def handle_request(self, request: Request, megaservice):
# handle the non-llm response
return result_dict[last_node]

async def handle_request_docsum(self, request: Request, files: List[UploadFile] = File(default=None)):
async def handle_request_docsum(self, request: Request, files: List[UploadFile] = File(default=None), megaservice=None):
"""Accept pure text, or files .txt/.pdf.docx, audio/video base64 string."""
if "application/json" in request.headers.get("content-type"):
data = await request.json()
@@ -407,17 +467,20 @@ async def handle_request_docsum(self, request: Request, files: List[UploadFile]
file_summaries = []
if files:
for file in files:
if not isinstance(file, UploadFile):
# Debug aid: inspect the unexpected object instead of failing hard.
print(dir(file))
print(type(file))
# raise TypeError("Expected an UploadFile instance")

# Fix concurrency issue with the same file name
# https://github.com/opea-project/GenAIExamples/issues/1279
uid = str(uuid.uuid4())
file_path = f"/tmp/{uid}"

async with aiofiles.open(file_path, "wb") as f:
await f.write(await file.read())

if data_type == "text":
print(f"Reading text from file: {file.filename}")
docs = read_text_from_file(file, file_path)
elif data_type in ["audio", "video"]:
docs = encode_file_to_base64(file_path)
@@ -445,13 +508,82 @@ async def handle_request_docsum(self, request: Request, files: List[UploadFile]

else:
raise ValueError(f"Unknown request type: {request.headers.get('content-type')}")

docsum_parameters = DocSumChatCompletionRequest(
messages="",
max_tokens=chat_request.max_tokens if chat_request.max_tokens else 1024,
top_k=chat_request.top_k if chat_request.top_k else 10,
top_p=chat_request.top_p if chat_request.top_p else 0.95,
temperature=chat_request.temperature if chat_request.temperature else 0.01,
frequency_penalty=chat_request.frequency_penalty if chat_request.frequency_penalty else 0.0,
presence_penalty=chat_request.presence_penalty if chat_request.presence_penalty else 0.0,
repetition_penalty=chat_request.repetition_penalty if chat_request.repetition_penalty else 1.03,
stream=stream_opt,
model=chat_request.model if chat_request.model else None,
language=chat_request.language if chat_request.language else "auto",
summary_type=summary_type,
chunk_overlap=chunk_overlap,
chunk_size=chunk_size,
)
text_only = "text" in initial_inputs_data
if not text_only:
result_dict, runtime_graph = await megaservice.schedule(
initial_inputs=initial_inputs_data, docsum_parameters=docsum_parameters
)

for node, response in result_dict.items():
# This assumes the last microservice in the megaservice is the LLM.
if (
isinstance(response, StreamingResponse)
and node == list(megaservice.services.keys())[-1]
and megaservice.services[node].service_type == ServiceType.LLM
):
return response
else:
megaservice_text_only = ServiceOrchestrator()
megaservice_text_only.align_inputs = self.align_inputs
megaservice_text_only.align_outputs = self.align_outputs
megaservice_text_only.align_generator = self.align_generator
megaservice_text_only.services = deepcopy(megaservice.services)
megaservice_text_only.graph = deepcopy(megaservice.graph)
asr_node = [node for node in megaservice_text_only.services if megaservice_text_only.services[node].service_type == ServiceType.ASR][0]
llm_node = [node for node in megaservice_text_only.services if megaservice_text_only.services[node].service_type == ServiceType.LLM][0]
# remove ASR node and its edges
megaservice_text_only.delete_node_if_exists(asr_node)
result_dict, runtime_graph = await megaservice_text_only.schedule(
initial_inputs=initial_inputs_data, docsum_parameters=docsum_parameters
)

for node, response in result_dict.items():
# This assumes the last microservice in the megaservice is the LLM.
if (
isinstance(response, StreamingResponse)
and node == list(megaservice_text_only.services.keys())[-1]
and megaservice_text_only.services[node].service_type == ServiceType.LLM
):
print("StreamingResponse detected in text only mode")
return response

last_node = runtime_graph.all_leaves()[-1]
response = result_dict[last_node]["text"]
choices = []
usage = UsageInfo()
choices.append(
ChatCompletionResponseChoice(
index=0,
message=ChatMessage(role="assistant", content=response),
finish_reason="stop",
)
)
return ChatCompletionResponse(model="docsum", choices=choices, usage=usage)
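The text-only branch above clones the orchestrator and calls delete_node_if_exists to drop the ASR stage. The underlying idea, sketched without ServiceOrchestrator (the dict-of-lists graph shape is an assumption, not its real internals):

```python
# Sketch: copy a service graph and remove one node plus its edges.
from copy import deepcopy

graph = {"asr": ["llm"], "llm": []}   # node -> downstream nodes
text_only = deepcopy(graph)
text_only.pop("asr", None)            # drop the node itself
for downstream in text_only.values():
    if "asr" in downstream:           # drop edges that point at it
        downstream.remove("asr")
print(text_only)                      # {'llm': []}
```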

def create_handle_request(self, megaservice):
async def handle_request_wrapper(request: Request):
if self.is_docsum:
return await self.handle_request_docsum(request)
else:
return await self.handle_request(request, megaservice)
if self.is_docsum:
async def handle_request_wrapper(request: Request, files: List[UploadFile] = File(default=None)):
return await self.handle_request_docsum(request, files, megaservice=megaservice)
else:
async def handle_request_wrapper(request: Request):
return await self.handle_request(request, megaservice=megaservice)
return handle_request_wrapper
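The wrapper's signature is what makes FastAPI accept multipart uploads on the docsum path. A self-contained sketch of the same dispatch pattern (build_handler and the route path are illustrative, not the project's API):

```python
# Sketch: choose a handler signature at registration time, as above.
from typing import List
from fastapi import FastAPI, File, Request, UploadFile

app = FastAPI()

def build_handler(is_docsum: bool):
    if is_docsum:
        async def handler(request: Request, files: List[UploadFile] = File(default=None)):
            # Multipart-aware path; files is None for pure-JSON requests.
            return {"mode": "docsum", "n_files": len(files) if files else 0}
    else:
        async def handler(request: Request):
            return {"mode": "chat"}
    return handler

app.add_api_route("/v1/app-backend", build_handler(is_docsum=True), methods=["POST"])
```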

def start(self):
11 changes: 11 additions & 0 deletions app-backend/templates/microservices/asr.py
@@ -0,0 +1,11 @@
from comps import MicroService, ServiceType

def get_service(host_ip="0.0.0.0", **kwargs):
return MicroService(
name="asr",
host=host_ip,
port=kwargs.get("port", 7066),
endpoint="/v1/asr",
use_remote_service=True,
service_type=ServiceType.ASR
)
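Template modules like this one are loaded by import_all_microservices_from_template; roughly, each module is imported and its get_service factory called, along the lines of this hedged sketch (module path and arguments are assumptions):

```python
# Sketch: how a template like asr.py might be consumed (paths assumed).
import importlib

module = importlib.import_module("templates.microservices.asr")
asr = module.get_service(host_ip="10.0.0.5", port=7066)  # a MicroService handle
```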
11 changes: 11 additions & 0 deletions app-backend/templates/microservices/llm_codegen.py
@@ -0,0 +1,11 @@
from comps import MicroService, ServiceType

def get_service(host_ip="0.0.0.0", **kwargs):
return MicroService(
name="llm",
host=host_ip,
port=kwargs.get("port", 9000),
endpoint="/v1/chat/completions",
use_remote_service=True,
service_type=ServiceType.LLM
)
11 changes: 11 additions & 0 deletions app-backend/templates/microservices/llm_docsum.py
@@ -0,0 +1,11 @@
from comps import MicroService, ServiceType

def get_service(host_ip="0.0.0.0", **kwargs):
return MicroService(
name="llm",
host=host_ip,
port=kwargs.get("port", 9000),
endpoint="/v1/docsum",
use_remote_service=True,
service_type=ServiceType.LLM
)
19 changes: 13 additions & 6 deletions app-frontend/react/src/redux/Conversation/ConversationSlice.ts
@@ -668,12 +668,12 @@ const eventStream = (type: string, body: any, conversationId: string = "") => {
if (msg?.data != "[DONE]") {
// console.log("msg", msg.data);
try {
if (type === "code") {
const parsedData = JSON.parse(msg.data);
result += parsedData.choices[0].delta.content;
store.dispatch(setOnGoingResult(result));
}
if (type === "chat") {
// if (type === "code") {
// const parsedData = JSON.parse(msg.data);
// result += parsedData.choices[0].delta.content;
// store.dispatch(setOnGoingResult(result));
// }
if (type === "chat" || type === "code") {
let parsed = false;

try {
@@ -690,9 +690,11 @@
// We will extract the text between b' and '</s>'
// Note: This is a workaround and should be used with caution, as it assumes a specific format
// and may not work for all cases.
console.log('onmessage >> msg.data:', JSON.stringify(msg.data))
const match = msg.data.match(/data:\s*b'([^']*)'/);
if (match && match[1] !== "</s>") {
const extractedText = match[1];
console.log('onmessage >> extractedText:', JSON.stringify(extractedText));
result += extractedText;
store.dispatch(setOnGoingResult(result));
} else {
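The regex above works around SSE payloads of the form data: b'...', which can appear when the backend formats a raw bytes object into its f-string. A tiny backend-side illustration (hypothetical, not taken from this PR):

```python
# How "data: b'...'" can reach the client: f-strings embed bytes reprs.
token = b"Hello"
line = f"data: {token}\n\n"
print(repr(line))  # "data: b'Hello'\n\n"
```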
@@ -711,6 +713,10 @@ const eventStream = (type: string, body: any, conversationId: string = "") => {
}
}
} else {
if (msg.data === "") {
console.log("Empty message received");
return;
}
//text summary/faq for data: "ops string"
const res = JSON.parse(msg.data); // Parse valid JSON
const logs = res.ops;
@@ -729,6 +735,7 @@ const eventStream = (type: string, body: any, conversationId: string = "") => {
}
} catch (e) {
console.log("something wrong in msg", e);
console.log("msg", msg)
notify("Error in message response", NotificationSeverity.ERROR);
throw e;
}
@@ -33,6 +33,9 @@
- { name: 'llm-textgen', dockerfile: 'comps/llms/src/text-generation/Dockerfile' }
- { name: 'dataprep', dockerfile: 'comps/dataprep/src/Dockerfile' }
- { name: 'agent', dockerfile: 'comps/agent/src/Dockerfile' }
- { name: 'whisper', dockerfile: 'comps/third_parties/whisper/src/Dockerfile' }
- { name: 'llm-docsum', dockerfile: 'comps/llms/src/doc-summarization/Dockerfile' }
- { name: 'asr', dockerfile: 'comps/asr/src/Dockerfile' }
block:
- name: Build image
command: docker build -t {{ container_registry }}/{{ item.name }}:{{ container_tag }} -f {{ item.dockerfile }} .