diff --git a/src/agent/library/skill_library.js b/src/agent/library/skill_library.js index 4470586f1..b803e3e24 100644 --- a/src/agent/library/skill_library.js +++ b/src/agent/library/skill_library.js @@ -1,6 +1,7 @@ import { cosineSimilarity } from '../../utils/math.js'; import { getSkillDocs } from './index.js'; import { wordOverlapScore } from '../../utils/text.js'; +import { embedWithProgress } from '../../utils/rate_limiter.js'; export class SkillLibrary { constructor(agent,embedding_model) { @@ -15,13 +16,27 @@ export class SkillLibrary { this.skill_docs = skillDocs; if (this.embedding_model) { try { - const embeddingPromises = skillDocs.map((doc) => { - return (async () => { - let func_name_desc = doc.split('\n').slice(0, 2).join(''); - this.skill_docs_embeddings[doc] = await this.embedding_model.embed(func_name_desc); - })(); - }); - await Promise.all(embeddingPromises); + const docsToEmbed = skillDocs.map(doc => ({ + doc, + text: doc.split('\n').slice(0, 2).join('') + })); + + const modelName = this.embedding_model.model_name || this.embedding_model.constructor?.name || 'unknown'; + + const embeddings = await embedWithProgress( + docsToEmbed, + async (text) => await this.embedding_model.embed(text), + 'skills', + { + cacheKey: 'skills', + modelName: modelName, + getTextFn: (item) => item.text + } + ); + + for (const [item, embedding] of embeddings) { + this.skill_docs_embeddings[item.doc] = embedding; + } } catch (error) { console.warn('Error with embedding model, using word-overlap instead.'); this.embedding_model = null; diff --git a/src/utils/embedding_cache.js b/src/utils/embedding_cache.js new file mode 100644 index 000000000..832bfaa9f --- /dev/null +++ b/src/utils/embedding_cache.js @@ -0,0 +1,125 @@ +/** + * Persistent cache for embeddings to avoid re-computing on restart + */ + +import { existsSync, readFileSync, writeFileSync, mkdirSync } from 'fs'; +import { createHash } from 'crypto'; +import path from 'path'; + +const CACHE_DIR = 
const CACHE_DIR = './bots/.cache';
const CACHE_VERSION = 1; // Bump this if cache format changes

/**
 * Hash text content for cache keying.
 * MD5 is acceptable here: this is a cache key, not a security boundary.
 * @param {string} content - Text to hash
 * @returns {string} Hex digest used as the cache entry key
 */
function hashContent(content) {
    return createHash('md5').update(content).digest('hex');
}

/**
 * Load embeddings from the on-disk cache.
 * @param {string} cacheKey - Unique key for this cache (e.g., 'examples', 'skills')
 * @param {string} modelName - Model name; a mismatch invalidates the cache
 * @returns {Object|null} Map of content-hash -> embedding, or null if missing/invalid
 */
export function loadEmbeddingCache(cacheKey, modelName) {
    const cachePath = path.join(CACHE_DIR, `${cacheKey}_embeddings.json`);

    try {
        if (!existsSync(cachePath)) {
            return null;
        }

        const cache = JSON.parse(readFileSync(cachePath, 'utf8'));

        // Invalidate when the cache format or the embedding model changed:
        // embeddings from a different model are not comparable.
        if (cache.version !== CACHE_VERSION || cache.model !== modelName) {
            console.log(`Embedding cache for ${cacheKey} invalidated (model or version changed)`);
            return null;
        }

        console.log(`Loaded ${Object.keys(cache.embeddings).length} cached embeddings for ${cacheKey}`);
        return cache.embeddings;
    } catch (err) {
        // A corrupt cache file is treated as a miss, never a fatal error.
        console.warn(`Failed to load embedding cache for ${cacheKey}:`, err.message);
        return null;
    }
}

/**
 * Save embeddings to the on-disk cache. Failures are logged and swallowed:
 * caching is best-effort and must never break embedding itself.
 * @param {string} cacheKey - Unique key for this cache
 * @param {string} modelName - Model name stored for cache invalidation
 * @param {Object} embeddings - Map of content-hash -> embedding
 */
export function saveEmbeddingCache(cacheKey, modelName, embeddings) {
    const cachePath = path.join(CACHE_DIR, `${cacheKey}_embeddings.json`);

    try {
        mkdirSync(CACHE_DIR, { recursive: true });

        const cache = {
            version: CACHE_VERSION,
            model: modelName,
            timestamp: new Date().toISOString(),
            embeddings: embeddings
        };

        writeFileSync(cachePath, JSON.stringify(cache), 'utf8');
        console.log(`Saved ${Object.keys(embeddings).length} embeddings to cache for ${cacheKey}`);
    } catch (err) {
        console.warn(`Failed to save embedding cache for ${cacheKey}:`, err.message);
    }
}

/**
 * Get embeddings for a set of items, serving from the persistent cache where
 * possible and embedding (then caching) the rest.
 * @param {Array} items - Items to embed
 * @param {Function} getTextFn - Extracts the text to embed from an item: (item) => string
 * @param {Function} embedFn - Async embedder: (text) => embedding
 * @param {string} cacheKey - Cache key for this set of embeddings
 * @param {string} modelName - Model name for cache invalidation
 * @param {Function} [progressFn] - Optional progress callback: (current, total, item) => void
 * @returns {Promise<Map>} Map of item -> embedding
 * @throws Rethrows the first embedding failure (after persisting partial progress)
 */
export async function getEmbeddingsWithCache(items, getTextFn, embedFn, cacheKey, modelName, progressFn = null) {
    const results = new Map();
    const cachedEmbeddings = loadEmbeddingCache(cacheKey, modelName) || {};
    const toEmbed = [];

    // Partition items into cache hits and misses, keyed by content hash so
    // unchanged texts survive restarts even if item identity changes.
    for (const item of items) {
        const text = getTextFn(item);
        const hash = hashContent(text);

        if (cachedEmbeddings[hash]) {
            results.set(item, cachedEmbeddings[hash]);
        } else {
            toEmbed.push({ item, text, hash });
        }
    }

    if (toEmbed.length === 0) {
        console.log(`${cacheKey}: All ${items.length} embeddings loaded from cache`);
        return results;
    }

    console.log(`${cacheKey}: Embedding ${toEmbed.length} items (${items.length - toEmbed.length} cached)...`);

    // Embed the misses. If one fails partway through, persist what we have so
    // completed work survives the failure, then rethrow for the caller.
    try {
        for (let i = 0; i < toEmbed.length; i++) {
            const { item, text, hash } = toEmbed[i];

            const embedding = await embedFn(text);
            results.set(item, embedding);
            cachedEmbeddings[hash] = embedding;

            // Fix: progressFn was documented but never invoked.
            progressFn?.(i + 1, toEmbed.length, item);
        }
    } catch (err) {
        saveEmbeddingCache(cacheKey, modelName, cachedEmbeddings);
        throw err;
    }

    // Save the merged cache (old hits + new embeddings).
    saveEmbeddingCache(cacheKey, modelName, cachedEmbeddings);
    console.log(`${cacheKey}: Done (${toEmbed.length} embedded, ${results.size} total)`);

    return results;
}
+1,14 @@ import { cosineSimilarity } from './math.js'; import { stringifyTurns, wordOverlapScore } from './text.js'; +import { embedWithProgress } from './rate_limiter.js'; export class Examples { - constructor(model, select_num=2) { + constructor(model, select_num=2, cacheKey='examples') { this.examples = []; this.model = model; this.select_num = select_num; this.embeddings = {}; + this.cacheKey = cacheKey; } turnsToText(turns) { @@ -26,17 +28,23 @@ export class Examples { return; try { - // Create array of promises first - const embeddingPromises = examples.map(example => { - const turn_text = this.turnsToText(example); - return this.model.embed(turn_text) - .then(embedding => { - this.embeddings[turn_text] = embedding; - }); - }); + const textsToEmbed = examples.map(example => this.turnsToText(example)); + const modelName = this.model.model_name || this.model.constructor?.name || 'unknown'; - // Wait for all embeddings to complete - await Promise.all(embeddingPromises); + const embeddings = await embedWithProgress( + textsToEmbed, + async (text) => await this.model.embed(text), + this.cacheKey, + { + cacheKey: this.cacheKey, + modelName: modelName, + getTextFn: (text) => text + } + ); + + for (const [text, embedding] of embeddings) { + this.embeddings[text] = embedding; + } } catch (err) { console.warn('Error with embedding model, using word-overlap instead.'); this.model = null; diff --git a/src/utils/rate_limiter.js b/src/utils/rate_limiter.js new file mode 100644 index 000000000..f9bf5a3c0 --- /dev/null +++ b/src/utils/rate_limiter.js @@ -0,0 +1,105 @@ +/** + * Utility for rate-limited operations with exponential backoff retry + */ + +import { getEmbeddingsWithCache } from './embedding_cache.js'; + +/** + * Execute an async function with exponential backoff retry on rate limit errors + * @param {Function} fn - Async function to execute + * @param {Object} options - Options + * @param {number} options.maxRetries - Maximum number of retries (default: 5) + * 
/**
 * Execute an async function with exponential backoff retry on rate limit errors.
 * @param {Function} fn - Async function to execute
 * @param {Object} options - Options
 * @param {number} [options.maxRetries=5] - Maximum number of retries
 * @param {number} [options.initialDelay=1000] - Initial delay in ms
 * @param {number} [options.maxDelay=60000] - Maximum delay in ms
 * @returns {Promise} Result of the function
 * @throws The original error when it is not a rate limit, or when retries are exhausted
 */
export async function withRetry(fn, options = {}) {
    const { maxRetries = 5, initialDelay = 1000, maxDelay = 60000 } = options;
    let lastError;

    for (let attempt = 0; attempt <= maxRetries; attempt++) {
        try {
            return await fn();
        } catch (err) {
            lastError = err;
            const errMsg = err.message || String(err);

            // Only retry errors that look like rate limiting; anything else
            // propagates immediately so real bugs are not hidden behind retries.
            const isRateLimit = errMsg.includes('429') ||
                errMsg.includes('rate limit') ||
                errMsg.includes('Too Many Requests') ||
                errMsg.includes('throttled');

            if (!isRateLimit || attempt === maxRetries) {
                throw err;
            }

            // Prefer a server-provided retry-after value embedded in the error
            // message; otherwise fall back to exponential backoff.
            let delay = initialDelay * Math.pow(2, attempt);
            const retryAfterMatch = errMsg.match(/retry.after[^\d]*(\d+)/i);
            if (retryAfterMatch) {
                // Fix: parseInt with an explicit radix. +1s buffer on top of
                // the server's suggestion.
                delay = parseInt(retryAfterMatch[1], 10) * 1000 + 1000;
            }
            delay = Math.min(delay, maxDelay);

            console.log(`Rate limited, retrying in ${(delay/1000).toFixed(1)}s (attempt ${attempt + 1}/${maxRetries})...`);
            await new Promise(resolve => setTimeout(resolve, delay));
        }
    }
    throw lastError;
}

/**
 * Embed a list of items with progress logging, rate-limit retry, and optional
 * persistent caching.
 * @param {Array} items - Items to process
 * @param {Function} embedFn - Async embedder; receives the extracted text when
 *                             options.getTextFn is provided, else the raw item
 * @param {string} [label='items'] - Label for progress display (e.g., "examples", "skills")
 * @param {Object} [options] - Retry options (see withRetry) plus cache settings
 * @param {string} [options.cacheKey] - Cache key for persistent storage
 * @param {string} [options.modelName] - Model name for cache invalidation
 * @param {Function} [options.getTextFn] - Extracts text from an item for embedding/caching
 * @returns {Promise<Map>} Map of item -> embedding
 * @throws Rethrows the first embedding failure
 */
export async function embedWithProgress(items, embedFn, label = 'items', options = {}) {
    const { cacheKey, modelName, getTextFn } = options;
    const total = items.length;

    if (total === 0) return new Map();

    // Cached path: delegate to the persistent embedding cache, wrapping the
    // embedder with retry. Per-item progress is suppressed to avoid log spam.
    if (cacheKey && modelName && getTextFn) {
        const embedWithRetry = async (text) => withRetry(() => embedFn(text), options);
        return await getEmbeddingsWithCache(
            items,
            getTextFn,
            embedWithRetry,
            cacheKey,
            modelName,
            null
        );
    }

    // Non-cached fallback: embed sequentially with retry.
    const results = new Map();
    console.log(`${label}: Embedding ${total} items...`);

    for (let i = 0; i < total; i++) {
        const item = items[i];
        // Fix: match the cached path's contract — when getTextFn is supplied,
        // embedFn receives the extracted text, never the raw wrapper item.
        const payload = getTextFn ? getTextFn(item) : item;

        try {
            const embedding = await withRetry(() => embedFn(payload, i), options);
            results.set(item, embedding);
        } catch (err) {
            console.error(`${label}: Failed to embed item ${i + 1}: ${err.message}`);
            throw err;
        }
    }

    console.log(`${label}: Done (${total} embedded)`);
    return results;
}