29 changes: 22 additions & 7 deletions src/agent/library/skill_library.js
@@ -1,6 +1,7 @@
import { cosineSimilarity } from '../../utils/math.js';
import { getSkillDocs } from './index.js';
import { wordOverlapScore } from '../../utils/text.js';
import { embedWithProgress } from '../../utils/rate_limiter.js';

export class SkillLibrary {
constructor(agent,embedding_model) {
@@ -15,13 +16,27 @@ export class SkillLibrary {
this.skill_docs = skillDocs;
if (this.embedding_model) {
try {
const embeddingPromises = skillDocs.map((doc) => {
return (async () => {
let func_name_desc = doc.split('\n').slice(0, 2).join('');
this.skill_docs_embeddings[doc] = await this.embedding_model.embed(func_name_desc);
})();
});
await Promise.all(embeddingPromises);
const docsToEmbed = skillDocs.map(doc => ({
doc,
text: doc.split('\n').slice(0, 2).join('')
}));

const modelName = this.embedding_model.model_name || this.embedding_model.constructor?.name || 'unknown';

const embeddings = await embedWithProgress(
docsToEmbed,
async (text) => await this.embedding_model.embed(text),
'skills',
{
cacheKey: 'skills',
modelName: modelName,
getTextFn: (item) => item.text
}
);

for (const [item, embedding] of embeddings) {
this.skill_docs_embeddings[item.doc] = embedding;
}
} catch (error) {
console.warn('Error with embedding model, using word-overlap instead.');
this.embedding_model = null;
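For reference, a minimal sketch of how the new call path is exercised; the stub embedding model and the sample skill doc below are illustrative placeholders, not part of this PR:

import { embedWithProgress } from '../../utils/rate_limiter.js';

// Hypothetical stand-in for the real embedding model; only .embed() and .model_name are assumed.
const embedding_model = { model_name: 'stub-model', embed: async (text) => [text.length, 1, 0] };

const skillDocs = ['skills.collectBlock\nCollect the nearest block of the given type.\n...full docs...'];
const docsToEmbed = skillDocs.map(doc => ({ doc, text: doc.split('\n').slice(0, 2).join('') }));

const embeddings = await embedWithProgress(
    docsToEmbed,
    async (text) => await embedding_model.embed(text),
    'skills',
    { cacheKey: 'skills', modelName: embedding_model.model_name, getTextFn: (item) => item.text }
);
// embedWithProgress resolves to a Map keyed by the items in docsToEmbed.
for (const [item, embedding] of embeddings) console.log(item.doc.split('\n')[0], embedding);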
125 changes: 125 additions & 0 deletions src/utils/embedding_cache.js
@@ -0,0 +1,125 @@
/**
* Persistent cache for embeddings to avoid re-computing on restart
*/

import { existsSync, readFileSync, writeFileSync, mkdirSync } from 'fs';
import { createHash } from 'crypto';
import path from 'path';

const CACHE_DIR = './bots/.cache';
const CACHE_VERSION = 1; // Bump this if cache format changes

/**
* Get a hash of the content for cache keying
*/
function hashContent(content) {
return createHash('md5').update(content).digest('hex');
}

/**
* Load embeddings from cache
* @param {string} cacheKey - Unique key for this cache (e.g., 'examples', 'skills')
* @param {string} modelName - Model name to invalidate cache if model changes
* @returns {Object|null} Cached embeddings or null if not found/invalid
*/
export function loadEmbeddingCache(cacheKey, modelName) {
const cachePath = path.join(CACHE_DIR, `${cacheKey}_embeddings.json`);

try {
if (!existsSync(cachePath)) {
return null;
}

const cache = JSON.parse(readFileSync(cachePath, 'utf8'));

// Validate cache version and model
if (cache.version !== CACHE_VERSION || cache.model !== modelName) {
console.log(`Embedding cache for ${cacheKey} invalidated (model or version changed)`);
return null;
}

console.log(`Loaded ${Object.keys(cache.embeddings).length} cached embeddings for ${cacheKey}`);
return cache.embeddings;
} catch (err) {
console.warn(`Failed to load embedding cache for ${cacheKey}:`, err.message);
return null;
}
}

/**
* Save embeddings to cache
* @param {string} cacheKey - Unique key for this cache
* @param {string} modelName - Model name for cache invalidation
* @param {Object} embeddings - Map of text -> embedding
*/
export function saveEmbeddingCache(cacheKey, modelName, embeddings) {
const cachePath = path.join(CACHE_DIR, `${cacheKey}_embeddings.json`);

try {
mkdirSync(CACHE_DIR, { recursive: true });

const cache = {
version: CACHE_VERSION,
model: modelName,
timestamp: new Date().toISOString(),
embeddings: embeddings
};

writeFileSync(cachePath, JSON.stringify(cache), 'utf8');
console.log(`Saved ${Object.keys(embeddings).length} embeddings to cache for ${cacheKey}`);
} catch (err) {
console.warn(`Failed to save embedding cache for ${cacheKey}:`, err.message);
}
}
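
// For reference, the file written to ./bots/.cache/<cacheKey>_embeddings.json has roughly this shape
// (field values here are illustrative):
//   { "version": 1, "model": "<modelName>", "timestamp": "2025-01-01T00:00:00.000Z",
//     "embeddings": { "<md5 of embedded text>": [0.012, -0.034, ...] } }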

/**
* Get embeddings with caching support
* @param {Array} items - Items to embed
* @param {Function} getTextFn - Function to extract text from item: (item) => string
* @param {Function} embedFn - Async function to embed text: (text) => embedding
* @param {string} cacheKey - Cache key for this set of embeddings
* @param {string} modelName - Model name for cache invalidation
* @param {Function} progressFn - Optional progress callback: (current, total, item) => void
* @returns {Promise<Map>} Map of item -> embedding
*/
export async function getEmbeddingsWithCache(items, getTextFn, embedFn, cacheKey, modelName, progressFn = null) {
const results = new Map();
const cachedEmbeddings = loadEmbeddingCache(cacheKey, modelName) || {};
const toEmbed = [];

// Check what's already cached
for (const item of items) {
const text = getTextFn(item);
const hash = hashContent(text);

if (cachedEmbeddings[hash]) {
results.set(item, cachedEmbeddings[hash]);
} else {
toEmbed.push({ item, text, hash });
}
}

if (toEmbed.length === 0) {
console.log(`${cacheKey}: All ${items.length} embeddings loaded from cache`);
return results;
}

console.log(`${cacheKey}: Embedding ${toEmbed.length} items (${items.length - toEmbed.length} cached)...`);

// Embed missing items
const newEmbeddings = {};
for (let i = 0; i < toEmbed.length; i++) {
const { item, text, hash } = toEmbed[i];

const embedding = await embedFn(text);
results.set(item, embedding);
newEmbeddings[hash] = embedding;
cachedEmbeddings[hash] = embedding;
}

// Save updated cache
saveEmbeddingCache(cacheKey, modelName, cachedEmbeddings);
console.log(`${cacheKey}: Done (${toEmbed.length} embedded, ${results.size} total)`);

return results;
}
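A hedged usage sketch of the cache helper on its own; the stub embed function and texts are placeholders (real callers go through embedWithProgress in rate_limiter.js):

import { getEmbeddingsWithCache } from './embedding_cache.js';

const items = ['hello world', 'goodbye world'];
const embed = async (text) => Array.from(text).map(c => c.charCodeAt(0) / 255); // stub embedder

// The first call embeds both items and writes ./bots/.cache/demo_embeddings.json;
// a repeat call with the same texts and model name is served entirely from that file.
const first = await getEmbeddingsWithCache(items, (text) => text, embed, 'demo', 'stub-model');
const again = await getEmbeddingsWithCache(items, (text) => text, embed, 'demo', 'stub-model');
console.log(first.get('hello world').length, again.size);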
30 changes: 19 additions & 11 deletions src/utils/examples.js
@@ -1,12 +1,14 @@
import { cosineSimilarity } from './math.js';
import { stringifyTurns, wordOverlapScore } from './text.js';
import { embedWithProgress } from './rate_limiter.js';

export class Examples {
constructor(model, select_num=2) {
constructor(model, select_num=2, cacheKey='examples') {
this.examples = [];
this.model = model;
this.select_num = select_num;
this.embeddings = {};
this.cacheKey = cacheKey;
}

turnsToText(turns) {
@@ -26,17 +28,23 @@ export class Examples {
return;

try {
// Create array of promises first
const embeddingPromises = examples.map(example => {
const turn_text = this.turnsToText(example);
return this.model.embed(turn_text)
.then(embedding => {
this.embeddings[turn_text] = embedding;
});
});
const textsToEmbed = examples.map(example => this.turnsToText(example));
const modelName = this.model.model_name || this.model.constructor?.name || 'unknown';

// Wait for all embeddings to complete
await Promise.all(embeddingPromises);
const embeddings = await embedWithProgress(
textsToEmbed,
async (text) => await this.model.embed(text),
this.cacheKey,
{
cacheKey: this.cacheKey,
modelName: modelName,
getTextFn: (text) => text
}
);

for (const [text, embedding] of embeddings) {
this.embeddings[text] = embedding;
}
} catch (err) {
console.warn('Error with embedding model, using word-overlap instead.');
this.model = null;
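The new optional cacheKey constructor argument lets callers keep separate cache files per example set; the default remains 'examples' for existing callers. A short sketch (embedding_model is a placeholder):

import { Examples } from './examples.js';

// Distinct keys map to distinct files under ./bots/.cache/
// (examples_conversation_embeddings.json vs examples_coding_embeddings.json),
// so re-embedding one set never invalidates the other.
const convo_examples = new Examples(embedding_model, 2, 'examples_conversation');
const coding_examples = new Examples(embedding_model, 2, 'examples_coding');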
105 changes: 105 additions & 0 deletions src/utils/rate_limiter.js
@@ -0,0 +1,105 @@
/**
* Utility for rate-limited operations with exponential backoff retry
*/

import { getEmbeddingsWithCache } from './embedding_cache.js';

/**
* Execute an async function with exponential backoff retry on rate limit errors
* @param {Function} fn - Async function to execute
* @param {Object} options - Options
* @param {number} options.maxRetries - Maximum number of retries (default: 5)
* @param {number} options.initialDelay - Initial delay in ms (default: 1000)
* @param {number} options.maxDelay - Maximum delay in ms (default: 60000)
* @returns {Promise} Result of the function
*/
export async function withRetry(fn, options = {}) {
const { maxRetries = 5, initialDelay = 1000, maxDelay = 60000 } = options;
let lastError;

for (let attempt = 0; attempt <= maxRetries; attempt++) {
try {
return await fn();
} catch (err) {
lastError = err;
const errMsg = err.message || String(err);

// Check if it's a rate limit error
const isRateLimit = errMsg.includes('429') ||
errMsg.includes('rate limit') ||
errMsg.includes('Too Many Requests') ||
errMsg.includes('throttled');

if (!isRateLimit || attempt === maxRetries) {
throw err;
}

// Parse retry_after from error if available, otherwise use exponential backoff
let delay = initialDelay * Math.pow(2, attempt);
const retryAfterMatch = errMsg.match(/retry.after[^\d]*(\d+)/i);
if (retryAfterMatch) {
delay = parseInt(retryAfterMatch[1]) * 1000 + 1000; // Add 1s buffer
}
delay = Math.min(delay, maxDelay);

console.log(`Rate limited, retrying in ${(delay/1000).toFixed(1)}s (attempt ${attempt + 1}/${maxRetries})...`);
await new Promise(resolve => setTimeout(resolve, delay));
}
}
throw lastError;
}

/**
* Process items with embedding, showing progress, handling rate limits, and caching
* @param {Array} items - Items to process
* @param {Function} embedFn - Async function to embed; called with the text from getTextFn when caching options are provided, otherwise with (item, index)
* @param {string} label - Label for progress display (e.g., "examples", "skills")
* @param {Object} options - Options including retry options and cache settings
* @param {string} options.cacheKey - Cache key for persistent storage
* @param {string} options.modelName - Model name for cache invalidation
* @param {Function} options.getTextFn - Function to extract text from item for caching
* @returns {Promise<Map>} Map of item -> embedding
*/
export async function embedWithProgress(items, embedFn, label = 'items', options = {}) {
const { cacheKey, modelName, getTextFn } = options;
const total = items.length;

if (total === 0) return new Map();

// If caching is enabled, use the cache system
if (cacheKey && modelName && getTextFn) {
const embedWithRetry = async (text) => {
return await withRetry(() => embedFn(text), options);
};

const results = await getEmbeddingsWithCache(
items,
getTextFn,
embedWithRetry,
cacheKey,
modelName,
null // No per-item progress to avoid spam
);

return results;
}

// Fallback to non-cached embedding
const results = new Map();
console.log(`${label}: Embedding ${total} items...`);

for (let i = 0; i < total; i++) {
const item = items[i];

try {
const embedding = await withRetry(() => embedFn(item, i), options);
results.set(item, embedding);
} catch (err) {
console.error(`${label}: Failed to embed item ${i + 1}: ${err.message}`);
throw err;
}
}

console.log(`${label}: Done (${total} embedded)`);
return results;
}
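Finally, a sketch of the two helpers used directly; model here is a placeholder embedding client:

import { withRetry, embedWithProgress } from './rate_limiter.js';

// Retries only on rate-limit errors (429 / "rate limit" / "Too Many Requests" / "throttled"),
// waiting ~0.5s, 1s, 2s, ... up to maxDelay, or using a "retry after N" hint from the error message.
const vector = await withRetry(() => model.embed('some text'), { maxRetries: 3, initialDelay: 500 });

// Without cacheKey/modelName/getTextFn, embedWithProgress skips the cache and embeds
// each item sequentially with retry, returning a Map of item -> embedding.
const vectors = await embedWithProgress(['a', 'b', 'c'], (text) => model.embed(text), 'demo');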