diff --git a/.github/workflows/dev-build.yaml b/.github/workflows/dev-build.yaml
index bf9a1e67fc..bcd509b5c8 100644
--- a/.github/workflows/dev-build.yaml
+++ b/.github/workflows/dev-build.yaml
@@ -6,7 +6,7 @@ concurrency:
on:
push:
- branches: ['agent-ui-animations'] # put your current branch to create a build. Core team only.
+ branches: ['3069-tokenizer-collector-improvements'] # put your current branch to create a build. Core team only.
paths-ignore:
- '**.md'
- 'cloud-deployments/*'
diff --git a/collector/processLink/convert/generic.js b/collector/processLink/convert/generic.js
index cd970d6362..89c83e3122 100644
--- a/collector/processLink/convert/generic.js
+++ b/collector/processLink/convert/generic.js
@@ -41,7 +41,7 @@ async function scrapeGenericUrl(link, textOnly = false) {
published: new Date().toLocaleString(),
wordCount: content.split(" ").length,
pageContent: content,
- token_count_estimate: tokenizeString(content).length,
+ token_count_estimate: tokenizeString(content),
};
const document = writeToServerDocuments(
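Note on the call-site change repeated across the collector converters in this patch: tokenizeString now returns a token count (a number) rather than a token array, so every `.length` access on its result is dropped. A minimal before/after sketch, assuming `content` holds the extracted document text:

  // Before this patch: tokenizeString returned number[] and callers took .length
  token_count_estimate: tokenizeString(content).length,
  // After this patch: the count is returned directly (see collector/utils/tokenizer/index.js below)
  token_count_estimate: tokenizeString(content),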
diff --git a/collector/processRawText/index.js b/collector/processRawText/index.js
index d435c9e7e0..a29eb63c37 100644
--- a/collector/processRawText/index.js
+++ b/collector/processRawText/index.js
@@ -55,7 +55,7 @@ async function processRawText(textContent, metadata) {
published: METADATA_KEYS.possible.published(metadata),
wordCount: textContent.split(" ").length,
pageContent: textContent,
- token_count_estimate: tokenizeString(textContent).length,
+ token_count_estimate: tokenizeString(textContent),
};
const document = writeToServerDocuments(
diff --git a/collector/processSingleFile/convert/asAudio.js b/collector/processSingleFile/convert/asAudio.js
index 170426e406..5f033af74a 100644
--- a/collector/processSingleFile/convert/asAudio.js
+++ b/collector/processSingleFile/convert/asAudio.js
@@ -56,7 +56,7 @@ async function asAudio({ fullFilePath = "", filename = "", options = {} }) {
published: createdDate(fullFilePath),
wordCount: content.split(" ").length,
pageContent: content,
- token_count_estimate: tokenizeString(content).length,
+ token_count_estimate: tokenizeString(content),
};
const document = writeToServerDocuments(
diff --git a/collector/processSingleFile/convert/asDocx.js b/collector/processSingleFile/convert/asDocx.js
index b0fbd8843e..d33a46b943 100644
--- a/collector/processSingleFile/convert/asDocx.js
+++ b/collector/processSingleFile/convert/asDocx.js
@@ -42,7 +42,7 @@ async function asDocX({ fullFilePath = "", filename = "" }) {
published: createdDate(fullFilePath),
wordCount: content.split(" ").length,
pageContent: content,
- token_count_estimate: tokenizeString(content).length,
+ token_count_estimate: tokenizeString(content),
};
const document = writeToServerDocuments(
diff --git a/collector/processSingleFile/convert/asEPub.js b/collector/processSingleFile/convert/asEPub.js
index 827e3c3af4..51bb20c809 100644
--- a/collector/processSingleFile/convert/asEPub.js
+++ b/collector/processSingleFile/convert/asEPub.js
@@ -40,7 +40,7 @@ async function asEPub({ fullFilePath = "", filename = "" }) {
published: createdDate(fullFilePath),
wordCount: content.split(" ").length,
pageContent: content,
- token_count_estimate: tokenizeString(content).length,
+ token_count_estimate: tokenizeString(content),
};
const document = writeToServerDocuments(
diff --git a/collector/processSingleFile/convert/asMbox.js b/collector/processSingleFile/convert/asMbox.js
index 4adde23ec9..48de60fa37 100644
--- a/collector/processSingleFile/convert/asMbox.js
+++ b/collector/processSingleFile/convert/asMbox.js
@@ -53,7 +53,7 @@ async function asMbox({ fullFilePath = "", filename = "" }) {
published: createdDate(fullFilePath),
wordCount: content.split(" ").length,
pageContent: content,
- token_count_estimate: tokenizeString(content).length,
+ token_count_estimate: tokenizeString(content),
};
item++;
diff --git a/collector/processSingleFile/convert/asOfficeMime.js b/collector/processSingleFile/convert/asOfficeMime.js
index b6c3c0601f..09e320d168 100644
--- a/collector/processSingleFile/convert/asOfficeMime.js
+++ b/collector/processSingleFile/convert/asOfficeMime.js
@@ -38,7 +38,7 @@ async function asOfficeMime({ fullFilePath = "", filename = "" }) {
published: createdDate(fullFilePath),
wordCount: content.split(" ").length,
pageContent: content,
- token_count_estimate: tokenizeString(content).length,
+ token_count_estimate: tokenizeString(content),
};
const document = writeToServerDocuments(
diff --git a/collector/processSingleFile/convert/asPDF/index.js b/collector/processSingleFile/convert/asPDF/index.js
index bf14516419..e3e42d3bd7 100644
--- a/collector/processSingleFile/convert/asPDF/index.js
+++ b/collector/processSingleFile/convert/asPDF/index.js
@@ -49,7 +49,7 @@ async function asPdf({ fullFilePath = "", filename = "" }) {
published: createdDate(fullFilePath),
wordCount: content.split(" ").length,
pageContent: content,
- token_count_estimate: tokenizeString(content).length,
+ token_count_estimate: tokenizeString(content),
};
const document = writeToServerDocuments(
diff --git a/collector/processSingleFile/convert/asTxt.js b/collector/processSingleFile/convert/asTxt.js
index 53987f247d..bc95969e14 100644
--- a/collector/processSingleFile/convert/asTxt.js
+++ b/collector/processSingleFile/convert/asTxt.js
@@ -38,7 +38,7 @@ async function asTxt({ fullFilePath = "", filename = "" }) {
published: createdDate(fullFilePath),
wordCount: content.split(" ").length,
pageContent: content,
- token_count_estimate: tokenizeString(content).length,
+ token_count_estimate: tokenizeString(content),
};
const document = writeToServerDocuments(
diff --git a/collector/processSingleFile/convert/asXlsx.js b/collector/processSingleFile/convert/asXlsx.js
index f21c6f1d9b..ca9b8ebac9 100644
--- a/collector/processSingleFile/convert/asXlsx.js
+++ b/collector/processSingleFile/convert/asXlsx.js
@@ -67,7 +67,7 @@ async function asXlsx({ fullFilePath = "", filename = "" }) {
published: createdDate(fullFilePath),
wordCount: content.split(/\s+/).length,
pageContent: content,
- token_count_estimate: tokenizeString(content).length,
+ token_count_estimate: tokenizeString(content),
};
const document = writeToServerDocuments(
diff --git a/collector/utils/extensions/Confluence/index.js b/collector/utils/extensions/Confluence/index.js
index 0bce6bf781..d3e94d98c5 100644
--- a/collector/utils/extensions/Confluence/index.js
+++ b/collector/utils/extensions/Confluence/index.js
@@ -96,7 +96,7 @@ async function loadConfluence(
published: new Date().toLocaleString(),
wordCount: doc.pageContent.split(" ").length,
pageContent: doc.pageContent,
- token_count_estimate: tokenizeString(doc.pageContent).length,
+ token_count_estimate: tokenizeString(doc.pageContent),
};
console.log(
diff --git a/collector/utils/extensions/RepoLoader/GithubRepo/index.js b/collector/utils/extensions/RepoLoader/GithubRepo/index.js
index 10b408584d..b30fe3e456 100644
--- a/collector/utils/extensions/RepoLoader/GithubRepo/index.js
+++ b/collector/utils/extensions/RepoLoader/GithubRepo/index.js
@@ -59,7 +59,7 @@ async function loadGithubRepo(args, response) {
published: new Date().toLocaleString(),
wordCount: doc.pageContent.split(" ").length,
pageContent: doc.pageContent,
- token_count_estimate: tokenizeString(doc.pageContent).length,
+ token_count_estimate: tokenizeString(doc.pageContent),
};
console.log(
`[Github Loader]: Saving ${doc.metadata.source} to ${outFolder}`
diff --git a/collector/utils/extensions/RepoLoader/GitlabRepo/index.js b/collector/utils/extensions/RepoLoader/GitlabRepo/index.js
index c3ef513436..7a519b9cbf 100644
--- a/collector/utils/extensions/RepoLoader/GitlabRepo/index.js
+++ b/collector/utils/extensions/RepoLoader/GitlabRepo/index.js
@@ -75,7 +75,7 @@ async function loadGitlabRepo(args, response) {
}
data.wordCount = pageContent.split(" ").length;
- data.token_count_estimate = tokenizeString(pageContent).length;
+ data.token_count_estimate = tokenizeString(pageContent);
data.pageContent = pageContent;
console.log(
diff --git a/collector/utils/extensions/WebsiteDepth/index.js b/collector/utils/extensions/WebsiteDepth/index.js
index ea42176a7c..930d2feaa3 100644
--- a/collector/utils/extensions/WebsiteDepth/index.js
+++ b/collector/utils/extensions/WebsiteDepth/index.js
@@ -122,7 +122,7 @@ async function bulkScrapePages(links, outFolderPath) {
published: new Date().toLocaleString(),
wordCount: content.split(" ").length,
pageContent: content,
- token_count_estimate: tokenizeString(content).length,
+ token_count_estimate: tokenizeString(content),
};
writeToServerDocuments(data, data.title, outFolderPath);
diff --git a/collector/utils/extensions/YoutubeTranscript/index.js b/collector/utils/extensions/YoutubeTranscript/index.js
index 8e5815e7ea..4234b06bbb 100644
--- a/collector/utils/extensions/YoutubeTranscript/index.js
+++ b/collector/utils/extensions/YoutubeTranscript/index.js
@@ -107,7 +107,7 @@ async function loadYouTubeTranscript({ url }) {
published: new Date().toLocaleString(),
wordCount: content.split(" ").length,
pageContent: content,
- token_count_estimate: tokenizeString(content).length,
+ token_count_estimate: tokenizeString(content),
};
console.log(`[YouTube Loader]: Saving ${metadata.title} to ${outFolder}`);
diff --git a/collector/utils/files/index.js b/collector/utils/files/index.js
index 0e2f5061ae..87fd3016e7 100644
--- a/collector/utils/files/index.js
+++ b/collector/utils/files/index.js
@@ -6,16 +6,62 @@ const documentsFolder =
? path.resolve("/storage/documents") // hardcoded to Render storage mount.
: path.resolve(__dirname, "../../../server/storage/documents");
+/**
+ * Checks if a file is text by first checking the mime type and then falling back to buffer inspection.
+ * This way we can capture files whose mime type is unknown but that are still parseable as text,
+ * without having to constantly add new mime type overrides.
+ * @param {string} filepath - The path to the file.
+ * @returns {boolean} - Returns true if the file is text, false otherwise.
+ */
function isTextType(filepath) {
+ if (!fs.existsSync(filepath)) return false;
+ const result = isKnownTextMime(filepath);
+ if (result.valid) return true; // Known text type - return true.
+ if (result.reason !== "generic") return false; // If any other reason than generic - return false.
+ return parseableAsText(filepath); // Fallback to parsing as text via buffer inspection.
+}
+
+/**
+ * Checks if a file is known to be text by checking the mime type.
+ * @param {string} filepath - The path to the file.
+ * @returns {{valid: boolean, reason: string}} - Whether the mime type is a known text type; when not, reason is "bad_mime", "non_text_mime", or "generic" (detection failed).
+ */
+function isKnownTextMime(filepath) {
try {
- if (!fs.existsSync(filepath)) return false;
const mimeLib = new MimeDetector();
const mime = mimeLib.getType(filepath);
- if (mimeLib.badMimes.includes(mime)) return false;
+ if (mimeLib.badMimes.includes(mime))
+ return { valid: false, reason: "bad_mime" };
const type = mime.split("/")[0];
- if (mimeLib.nonTextTypes.includes(type)) return false;
- return true;
+ if (mimeLib.nonTextTypes.includes(type))
+ return { valid: false, reason: "non_text_mime" };
+ return { valid: true, reason: "valid_mime" };
+ } catch (e) {
+ return { valid: false, reason: "generic" };
+ }
+}
+
+/**
+ * Checks if a file is parseable as text by reading its first 1KB as utf8 and inspecting the bytes.
+ * If the sample looks too much like binary data, it returns false.
+ * @param {string} filepath - The path to the file.
+ * @returns {boolean} - Returns true if the file is parseable as text, false otherwise.
+ */
+function parseableAsText(filepath) {
+ try {
+ const fd = fs.openSync(filepath, "r");
+ const buffer = Buffer.alloc(1024); // Read first 1KB of the file synchronously
+ const bytesRead = fs.readSync(fd, buffer, 0, 1024, 0);
+ fs.closeSync(fd);
+
+ const content = buffer.subarray(0, bytesRead).toString("utf8");
+ const nullCount = (content.match(/\0/g) || []).length;
+ const controlCount = (content.match(/[\x00-\x08\x0B\x0C\x0E-\x1F]/g) || [])
+ .length;
+
+ const threshold = bytesRead * 0.1;
+ return nullCount + controlCount < threshold;
} catch {
return false;
}
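A rough worked example of the parseableAsText threshold above, using hypothetical counts: the first 1KB of the file is decoded as utf8, and the file is treated as binary once null plus control characters reach 10% of the bytes read.

  // Illustrative numbers only - not taken from a real file.
  const bytesRead = 1024;
  const threshold = bytesRead * 0.1;    // 102.4
  const nullCount = 90;                 // counted from the 1KB sample
  const controlCount = 25;
  nullCount + controlCount < threshold; // 115 < 102.4 -> false, so the file is not text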
diff --git a/collector/utils/files/mime.js b/collector/utils/files/mime.js
index e20ebe65fd..9bf22c2227 100644
--- a/collector/utils/files/mime.js
+++ b/collector/utils/files/mime.js
@@ -1,7 +1,6 @@
const MimeLib = require("mime");
-const path = require("path");
class MimeDetector {
- nonTextTypes = ["multipart", "image", "model", "audio", "video"];
+ nonTextTypes = ["multipart", "image", "model", "audio", "video", "font"];
badMimes = [
"application/octet-stream",
"application/zip",
@@ -48,11 +47,6 @@ class MimeDetector {
);
}
- // These are file types that are not detected by the mime library and need to be processed as text files.
- // You should only add file types that are not detected by the mime library, are parsable as text, and are files
- // with no extension. Otherwise, their extension should be added to the overrides array.
- #specialTextFileTypes = ["dockerfile", "jenkinsfile", "dockerignore"];
-
/**
* Returns the MIME type of the file. If the file has no extension found, it will be processed as a text file.
* @param {string} filepath
@@ -61,12 +55,6 @@ class MimeDetector {
getType(filepath) {
const parsedMime = this.lib.getType(filepath);
if (!!parsedMime) return parsedMime;
-
- // If the mime could not be parsed, it could be a special file type like Dockerfile or Jenkinsfile
- // which we can reliably process as text files.
- const baseName = path.basename(filepath)?.toLowerCase();
- if (this.#specialTextFileTypes.includes(baseName)) return "text/plain";
-
return null;
}
}
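With the #specialTextFileTypes list removed, an extensionless file such as a Dockerfile now goes through the generic fallback instead of a hardcoded override: getType returns null, isKnownTextMime throws on null.split("/") and reports reason "generic", and isTextType then falls back to parseableAsText buffer inspection. A minimal sketch of that flow, using the functions from collector/utils/files/index.js above (the file path is hypothetical):

  const result = isKnownTextMime("/repo/Dockerfile"); // lib.getType() -> null, null.split("/") throws
  // result: { valid: false, reason: "generic" }
  // isTextType() sees reason === "generic" and calls parseableAsText("/repo/Dockerfile"),
  // which reads the first 1KB and applies the binary heuristic shown earlier.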
diff --git a/collector/utils/tokenizer/index.js b/collector/utils/tokenizer/index.js
index 618a7cdc7a..2086be2574 100644
--- a/collector/utils/tokenizer/index.js
+++ b/collector/utils/tokenizer/index.js
@@ -1,15 +1,66 @@
const { getEncoding } = require("js-tiktoken");
-function tokenizeString(input = "") {
- try {
- const encoder = getEncoding("cl100k_base");
- return encoder.encode(input);
- } catch (e) {
- console.error("Could not tokenize string!");
- return [];
+class TikTokenTokenizer {
+ static MAX_KB_ESTIMATE = 10;
+ static DIVISOR = 8;
+
+ constructor() {
+ if (TikTokenTokenizer.instance) {
+ this.log(
+ "Singleton instance already exists. Returning existing instance."
+ );
+ return TikTokenTokenizer.instance;
+ }
+
+ this.encoder = getEncoding("cl100k_base");
+ TikTokenTokenizer.instance = this;
+ this.log("Initialized new TikTokenTokenizer instance.");
+ }
+
+ log(text, ...args) {
+ console.log(`\x1b[35m[TikTokenTokenizer]\x1b[0m ${text}`, ...args);
+ }
+
+ /**
+ * Check if the input is too long to encode.
+ * This is a rough estimate and a sanity check to prevent
+ * CPU issues from encoding very large strings.
+ * Assumes 1 character = 2 bytes in JS.
+ * @param {string} input
+ * @returns {boolean}
+ */
+ #isTooLong(input) {
+ const bytesEstimate = input.length * 2;
+ const kbEstimate = Math.floor(bytesEstimate / 1024);
+ return kbEstimate >= TikTokenTokenizer.MAX_KB_ESTIMATE;
+ }
+
+ /**
+ * Tokenize a string and return a rough token count estimate.
+ * @param {string} input
+ * @returns {number}
+ */
+ tokenizeString(input = "") {
+ try {
+ if (this.#isTooLong(input)) {
+ this.log("Input will take too long to encode - estimating");
+ return Math.ceil(input.length / TikTokenTokenizer.DIVISOR);
+ }
+
+ return this.encoder.encode(input).length;
+ } catch (e) {
+ this.log("Could not tokenize string! Estimating...", e.message, e.stack);
+ return Math.ceil(input?.length / TikTokenTokenizer.DIVISOR) || 0;
+ }
}
}
+const tokenizer = new TikTokenTokenizer();
module.exports = {
- tokenizeString,
+ /**
+ * Tokenize a string and return a rough token count estimate.
+ * @param {string} input
+ * @returns {number}
+ */
+ tokenizeString: (input) => tokenizer.tokenizeString(input),
};
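A brief usage sketch of the singleton tokenizer above, with hypothetical inputs: short strings are encoded with cl100k_base and the exact count is returned, while any input whose rough size estimate (about 2 bytes per character) reaches MAX_KB_ESTIMATE skips encoding and returns Math.ceil(length / 8) instead.

  // The require path is illustrative; import the collector tokenizer from wherever the caller lives.
  const { tokenizeString } = require("./utils/tokenizer");

  tokenizeString("hello world");     // small input -> exact encoder.encode(...).length
  tokenizeString("a".repeat(20000)); // ~40KB estimate -> Math.ceil(20000 / 8) = 2500
  tokenizeString();                  // defaults to "" -> 0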
diff --git a/frontend/src/components/Modals/ManageWorkspace/Documents/UploadFile/index.jsx b/frontend/src/components/Modals/ManageWorkspace/Documents/UploadFile/index.jsx
index 01d79dd125..4dd04b023a 100644
--- a/frontend/src/components/Modals/ManageWorkspace/Documents/UploadFile/index.jsx
+++ b/frontend/src/components/Modals/ManageWorkspace/Documents/UploadFile/index.jsx
@@ -40,9 +40,11 @@ export default function UploadFile({
setFetchingUrl(false);
};
- // Don't spam fetchKeys, wait 1s between calls at least.
- const handleUploadSuccess = debounce(() => fetchKeys(true), 1000);
- const handleUploadError = (_msg) => null; // stubbed.
+ // Queue all fetchKeys calls through the same debouncer to prevent spamming the server.
+ // Either a success or an error will trigger a fetchKeys call so the UI is not stuck loading.
+ const debouncedFetchKeys = debounce(() => fetchKeys(true), 1000);
+ const handleUploadSuccess = () => debouncedFetchKeys();
+ const handleUploadError = () => debouncedFetchKeys();
const onDrop = async (acceptedFiles, rejections) => {
const newAccepted = acceptedFiles.map((file) => {
diff --git a/server/utils/AiProviders/deepseek/index.js b/server/utils/AiProviders/deepseek/index.js
index 7bc804bbb6..b91332a84a 100644
--- a/server/utils/AiProviders/deepseek/index.js
+++ b/server/utils/AiProviders/deepseek/index.js
@@ -2,10 +2,12 @@ const { NativeEmbedder } = require("../../EmbeddingEngines/native");
const {
LLMPerformanceMonitor,
} = require("../../helpers/chat/LLMPerformanceMonitor");
+const { v4: uuidv4 } = require("uuid");
+const { MODEL_MAP } = require("../modelMap");
const {
- handleDefaultStreamResponseV2,
+ writeResponseChunk,
+ clientAbortedHandler,
} = require("../../helpers/chat/responses");
-const { MODEL_MAP } = require("../modelMap");
class DeepSeekLLM {
constructor(embedder = null, modelPreference = null) {
@@ -27,6 +29,11 @@ class DeepSeekLLM {
this.embedder = embedder ?? new NativeEmbedder();
this.defaultTemp = 0.7;
+ this.log("Initialized with model:", this.model);
+ }
+
+ log(text, ...args) {
+ console.log(`\x1b[36m[${this.constructor.name}]\x1b[0m ${text}`, ...args);
}
#appendContext(contextTexts = []) {
@@ -71,6 +78,21 @@ class DeepSeekLLM {
return [prompt, ...chatHistory, { role: "user", content: userPrompt }];
}
+ /**
+ * Parses reasoning from the response, prepends it to the message content, and returns the full text response.
+ * @param {Object} response
+ * @returns {string}
+ */
+ #parseReasoningFromResponse({ message }) {
+ let textResponse = message?.content;
+ if (
+ !!message?.reasoning_content &&
+ message.reasoning_content.trim().length > 0
+ )
+ textResponse = `${message.reasoning_content}${textResponse}`;
+ return textResponse;
+ }
+
async getChatCompletion(messages = null, { temperature = 0.7 }) {
if (!(await this.isValidChatCompletionModel(this.model)))
throw new Error(
@@ -90,13 +112,15 @@ class DeepSeekLLM {
);
if (
- !result.output.hasOwnProperty("choices") ||
- result.output.choices.length === 0
+ !result?.output?.hasOwnProperty("choices") ||
+ result?.output?.choices?.length === 0
)
- return null;
+ throw new Error(
+ `Invalid response body returned from DeepSeek: ${JSON.stringify(result.output)}`
+ );
return {
- textResponse: result.output.choices[0].message.content,
+ textResponse: this.#parseReasoningFromResponse(result.output.choices[0]),
metrics: {
prompt_tokens: result.output.usage.prompt_tokens || 0,
completion_tokens: result.output.usage.completion_tokens || 0,
@@ -127,8 +151,143 @@ class DeepSeekLLM {
return measuredStreamRequest;
}
+ // TODO: This is a copy of the generic handleStream function in responses.js
+ // to specifically handle the DeepSeek reasoning model `reasoning_content` field.
+ // When possible, we should refactor this into the generic function.
handleStream(response, stream, responseProps) {
- return handleDefaultStreamResponseV2(response, stream, responseProps);
+ const { uuid = uuidv4(), sources = [] } = responseProps;
+ let hasUsageMetrics = false;
+ let usage = {
+ completion_tokens: 0,
+ };
+
+ return new Promise(async (resolve) => {
+ let fullText = "";
+ let reasoningText = "";
+
+ // Establish listener to early-abort a streaming response
+ // in case things go sideways or the user does not like the response.
+ // We preserve the generated text but continue as if chat was completed
+ // to preserve previously generated content.
+ const handleAbort = () => {
+ stream?.endMeasurement(usage);
+ clientAbortedHandler(resolve, fullText);
+ };
+ response.on("close", handleAbort);
+
+ try {
+ for await (const chunk of stream) {
+ const message = chunk?.choices?.[0];
+ const token = message?.delta?.content;
+ const reasoningToken = message?.delta?.reasoning_content;
+
+ if (
+ chunk.hasOwnProperty("usage") && // exists
+ !!chunk.usage && // is not null
+ Object.values(chunk.usage).length > 0 // has values
+ ) {
+ if (chunk.usage.hasOwnProperty("prompt_tokens")) {
+ usage.prompt_tokens = Number(chunk.usage.prompt_tokens);
+ }
+
+ if (chunk.usage.hasOwnProperty("completion_tokens")) {
+ hasUsageMetrics = true; // to stop estimating counter
+ usage.completion_tokens = Number(chunk.usage.completion_tokens);
+ }
+ }
+
+ // Reasoning models will always return the reasoning text before the token text.
+ if (reasoningToken) {
+ // If the reasoning text is empty (''), we need to initialize it
+ // and send the first chunk of reasoning text.
+ if (reasoningText.length === 0) {
+ writeResponseChunk(response, {
+ uuid,
+ sources: [],
+ type: "textResponseChunk",
+ textResponse: `${reasoningToken}`,
+ close: false,
+ error: false,
+ });
+ reasoningText += `${reasoningToken}`;
+ continue;
+ } else {
+ writeResponseChunk(response, {
+ uuid,
+ sources: [],
+ type: "textResponseChunk",
+ textResponse: reasoningToken,
+ close: false,
+ error: false,
+ });
+ reasoningText += reasoningToken;
+ }
+ }
+
+ // If the reasoning text is not empty, but the reasoning token is empty
+ // and the token text is not empty we need to close the reasoning text and begin sending the token text.
+ if (!!reasoningText && !reasoningToken && token) {
+ writeResponseChunk(response, {
+ uuid,
+ sources: [],
+ type: "textResponseChunk",
+ textResponse: ``,
+ close: false,
+ error: false,
+ });
+ fullText += `${reasoningText}`;
+ reasoningText = "";
+ }
+
+ if (token) {
+ fullText += token;
+ // If we never saw a usage metric, we can estimate them by number of completion chunks
+ if (!hasUsageMetrics) usage.completion_tokens++;
+ writeResponseChunk(response, {
+ uuid,
+ sources: [],
+ type: "textResponseChunk",
+ textResponse: token,
+ close: false,
+ error: false,
+ });
+ }
+
+ // LocalAI returns finish_reason: '' and others return null on intermediate chunks - only the last chunk carries a real value.
+ // Either way, the key `finish_reason` must be present to determine the ending chunk.
+ if (
+ message?.hasOwnProperty("finish_reason") && // Got valid message and it is an object with finish_reason
+ message.finish_reason !== "" &&
+ message.finish_reason !== null
+ ) {
+ writeResponseChunk(response, {
+ uuid,
+ sources,
+ type: "textResponseChunk",
+ textResponse: "",
+ close: true,
+ error: false,
+ });
+ response.removeListener("close", handleAbort);
+ stream?.endMeasurement(usage);
+ resolve(fullText);
+ break; // Break streaming when a valid finish_reason is first encountered
+ }
+ }
+ } catch (e) {
+ console.log(`\x1b[43m\x1b[34m[STREAMING ERROR]\x1b[0m ${e.message}`);
+ writeResponseChunk(response, {
+ uuid,
+ type: "abort",
+ textResponse: null,
+ sources: [],
+ close: true,
+ error: e.message,
+ });
+ stream?.endMeasurement(usage);
+ resolve(fullText); // Return what we currently have - if anything.
+ }
+ });
}
async embedTextInput(textInput) {
diff --git a/server/utils/AiProviders/openRouter/index.js b/server/utils/AiProviders/openRouter/index.js
index 08f040150f..88fbcfb633 100644
--- a/server/utils/AiProviders/openRouter/index.js
+++ b/server/utils/AiProviders/openRouter/index.js
@@ -167,6 +167,18 @@ class OpenRouterLLM {
return content.flat();
}
+ /**
+ * Parses reasoning from the response, prepends it to the message content, and returns the full text response.
+ * @param {Object} response
+ * @returns {string}
+ */
+ #parseReasoningFromResponse({ message }) {
+ let textResponse = message?.content;
+ if (!!message?.reasoning && message.reasoning.trim().length > 0)
+ textResponse = `${message.reasoning}${textResponse}`;
+ return textResponse;
+ }
+
constructPrompt({
systemPrompt = "",
contextTexts = [],
@@ -200,6 +212,9 @@ class OpenRouterLLM {
model: this.model,
messages,
temperature,
+ // This is an OpenRouter specific option that allows us to get the reasoning text
+ // before the token text.
+ include_reasoning: true,
})
.catch((e) => {
throw new Error(e.message);
@@ -207,13 +222,15 @@ class OpenRouterLLM {
);
if (
- !result.output.hasOwnProperty("choices") ||
- result.output.choices.length === 0
+ !result?.output?.hasOwnProperty("choices") ||
+ result?.output?.choices?.length === 0
)
- return null;
+ throw new Error(
+ `Invalid response body returned from OpenRouter: ${result.output?.error?.message || "Unknown error"} ${result.output?.error?.code || "Unknown code"}`
+ );
return {
- textResponse: result.output.choices[0].message.content,
+ textResponse: this.#parseReasoningFromResponse(result.output.choices[0]),
metrics: {
prompt_tokens: result.output.usage.prompt_tokens || 0,
completion_tokens: result.output.usage.completion_tokens || 0,
@@ -236,6 +253,9 @@ class OpenRouterLLM {
stream: true,
messages,
temperature,
+ // This is an OpenRouter specific option that allows us to get the reasoning text
+ // before the token text.
+ include_reasoning: true,
}),
messages
// We have to manually count the tokens
@@ -262,6 +282,7 @@ class OpenRouterLLM {
return new Promise(async (resolve) => {
let fullText = "";
+ let reasoningText = "";
let lastChunkTime = null; // null when first token is still not received.
// Establish listener to early-abort a streaming response
@@ -313,8 +334,55 @@ class OpenRouterLLM {
for await (const chunk of stream) {
const message = chunk?.choices?.[0];
const token = message?.delta?.content;
+ const reasoningToken = message?.delta?.reasoning;
lastChunkTime = Number(new Date());
+ // Reasoning models will always return the reasoning text before the token text.
+ // The reasoning token can be null or ''.
+ if (reasoningToken) {
+ // If the reasoning text is empty (''), we need to initialize it
+ // and send the first chunk of reasoning text.
+ if (reasoningText.length === 0) {
+ writeResponseChunk(response, {
+ uuid,
+ sources: [],
+ type: "textResponseChunk",
+ textResponse: `${reasoningToken}`,
+ close: false,
+ error: false,
+ });
+ reasoningText += `${reasoningToken}`;
+ continue;
+ } else {
+ // If the reasoning text is not empty, we need to append the reasoning text
+ // to the existing reasoning text.
+ writeResponseChunk(response, {
+ uuid,
+ sources: [],
+ type: "textResponseChunk",
+ textResponse: reasoningToken,
+ close: false,
+ error: false,
+ });
+ reasoningText += reasoningToken;
+ }
+ }
+
+ // If the reasoning text is not empty, but the reasoning token is empty
+ // and the token text is not empty we need to close the reasoning text and begin sending the token text.
+ if (!!reasoningText && !reasoningToken && token) {
+ writeResponseChunk(response, {
+ uuid,
+ sources: [],
+ type: "textResponseChunk",
+ textResponse: ``,
+ close: false,
+ error: false,
+ });
+ fullText += `${reasoningText}`;
+ reasoningText = "";
+ }
+
if (token) {
fullText += token;
writeResponseChunk(response, {
diff --git a/server/utils/helpers/tiktoken.js b/server/utils/helpers/tiktoken.js
index a3fa3b6396..394f261874 100644
--- a/server/utils/helpers/tiktoken.js
+++ b/server/utils/helpers/tiktoken.js
@@ -1,10 +1,36 @@
const { getEncodingNameForModel, getEncoding } = require("js-tiktoken");
+/**
+ * @class TokenManager
+ *
+ * @notice
+ * We cannot do estimation of tokens here like we do in the collector
+ * because we need to know the model to do it.
+ * We also do reverse tokenization here for the chat history during cannonballing,
+ * so we are stuck doing the actual tokenization and encoding until we figure out what to do with prompt overflows.
+ */
class TokenManager {
+ static instance = null;
+ static currentModel = null;
+
constructor(model = "gpt-3.5-turbo") {
+ if (TokenManager.instance && TokenManager.currentModel === model) {
+ this.log("Returning existing instance for model:", model);
+ return TokenManager.instance;
+ }
+
this.model = model;
this.encoderName = this.#getEncodingFromModel(model);
this.encoder = getEncoding(this.encoderName);
+
+ TokenManager.instance = this;
+ TokenManager.currentModel = model;
+ this.log("Initialized new TokenManager instance for model:", model);
+ return this;
+ }
+
+ log(text, ...args) {
+ console.log(`\x1b[35m[TokenManager]\x1b[0m ${text}`, ...args);
}
#getEncodingFromModel(model) {
@@ -15,9 +41,11 @@ class TokenManager {
}
}
- // Pass in an empty array of disallowedSpecials to handle all tokens as text and to be tokenized.
- // https://github.com/openai/tiktoken/blob/9e79899bc248d5313c7dd73562b5e211d728723d/tiktoken/core.py#L91C20-L91C38
- // Returns number[]
+ /**
+ * Pass in an empty array of disallowedSpecials so all special tokens are treated as plain text and tokenized.
+ * @param {string} input
+ * @returns {number[]}
+ */
tokensFromString(input = "") {
try {
const tokens = this.encoder.encode(String(input), undefined, []);
@@ -28,17 +56,31 @@ class TokenManager {
}
}
+ /**
+ * Converts an array of tokens back to a string.
+ * @param {number[]} tokens
+ * @returns {string}
+ */
bytesFromTokens(tokens = []) {
const bytes = this.encoder.decode(tokens);
return bytes;
}
- // Returns number
+ /**
+ * Counts the number of tokens in a string.
+ * @param {string} input
+ * @returns {number}
+ */
countFromString(input = "") {
const tokens = this.tokensFromString(input);
return tokens.length;
}
+ /**
+ * Estimates the number of tokens in a string or array of strings.
+ * @param {string | string[]} input
+ * @returns {number}
+ */
statsFrom(input) {
if (typeof input === "string") return this.countFromString(input);