diff --git a/scripts/ingest.ts b/scripts/ingest.ts
index 053a8f8..a8ef2b3 100644
--- a/scripts/ingest.ts
+++ b/scripts/ingest.ts
@@ -18,18 +18,30 @@ export type EnrichedChunk = {
 }
 
 export type EmbeddedChunk = EnrichedChunk & { embedding: number[] }
 
+/**
+ * Orchestrates the full ingestion pipeline: loads vault files, splits them
+ * into chunks, enriches with metadata, embeds with Cohere, and stores in Supabase.
+ */
 const runIngestionPipeline = async () => {
   console.log("Starting ingestion pipeline ...")
   const loadSpinner = ora("Loading vault...").start()
-  const docs = loadVaultFiles("./vault/")
-  const chunks = splitMarkdownDocs(docs)
-  const enrichedChunks = enrichChunksWithMetadata(chunks)
-  loadSpinner.succeed(
-    `Loaded ${docs.length} files → ${enrichedChunks.length} chunks`
-  )
+  let enrichedChunks: EnrichedChunk[]
+  try {
+    const docs = loadVaultFiles("./vault/")
+    const chunks = splitMarkdownDocs(docs)
+    enrichedChunks = enrichChunksWithMetadata(chunks)
+    loadSpinner.succeed(
+      `Loaded ${docs.length} files → ${enrichedChunks.length} chunks`
+    )
+    console.log("🧂 Enriched chunks with metadata")
+  } catch (err) {
+    loadSpinner.fail(err instanceof Error ? err.message : String(err))
+    throw err
+  }
+
   const embeddedChunks = await embedChunks(enrichedChunks)
-  storeChunks(embeddedChunks)
+  await storeChunks(embeddedChunks)
 }
 
 /**
@@ -43,7 +55,9 @@ const runIngestionPipeline = async () => {
 export const loadVaultFiles = (dir: string) => {
   const results: Document[] = []
 
-  for (const entry of fs.readdirSync(dir, { withFileTypes: true })) {
+  const entries = fs.readdirSync(dir, { withFileTypes: true })
+
+  for (const entry of entries) {
     const fullPath = path.join(dir, entry.name)
 
     if (entry.isDirectory()) {
@@ -57,6 +71,14 @@ export const loadVaultFiles = (dir: string) => {
   return results
 }
 
+/**
+ * Splits an array of documents into smaller chunks by markdown heading boundaries.
+ * Each heading (H1–H3) starts a new chunk; oversized chunks are further split
+ * by {@link splitLargeChunks}. Empty chunks are filtered out.
+ *
+ * @param documents - Raw documents loaded from the vault.
+ * @returns A flat array of `{ source, content }` chunks ready for enrichment.
+ */
 export const splitMarkdownDocs = (documents: Document[]) => {
   return documents.flatMap((doc) => {
     const { content, source } = doc
@@ -76,6 +98,14 @@ export const splitMarkdownDocs = (documents: Document[]) => {
   })
 }
 
+/**
+ * Splits a single chunk into fixed-size slices if it exceeds {@link MAX_CHUNK_SIZE}.
+ * Slices are cut on character boundaries without regard for word boundaries.
+ *
+ * @param source - Original file path, propagated to each slice.
+ * @param content - Text content to split.
+ * @returns A single-element array if the content fits, otherwise an array of slices.
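+ *
+ * @example
+ * // Illustrative sketch, assuming a MAX_CHUNK_SIZE of 1000 (the real value is
+ * // the constant defined in this module):
+ * splitLargeChunks("vault/note.md", "x".repeat(2500))
+ * // → three slices of 1000, 1000, and 500 characters, each keeping `source`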
+ */
 const splitLargeChunks = (source: string, content: string) => {
   if (content.length <= MAX_CHUNK_SIZE) return [{ source, content }]
 
@@ -118,7 +148,6 @@ const extractTitleFromPath = (filepath: string) => {
 export const enrichChunksWithMetadata = (
   chunks: Document[]
 ): EnrichedChunk[] => {
-  console.log("🧂 Enriching chunks with metadata")
   return chunks.map((chunk) => {
     const title = extractTitleFromPath(chunk.source)
     return {
@@ -155,31 +184,40 @@ export const embedChunks = async (
   )
   bar.start(batches.length, 0)
 
-  const allEmbeddings = await batches.reduce(
-    async (accPromise, batch, batchIndex) => {
-      const acc = await accPromise
-      const { embeddings } = await embedMany({
-        model,
-        values: batch.map((c) => c.content),
-      })
-      bar.increment()
-      // Cohere trial tier allows ~100k tokens/minute; pause between batches to avoid rate limit.
-      // Skip the delay after the last batch so ingestion finishes immediately.
-      const isLastBatch = batchIndex === batches.length - 1
-      if (!isLastBatch) {
-        await new Promise((r) => setTimeout(r, 15_000))
-      }
-      return [...acc, ...embeddings]
-    },
-    Promise.resolve([] as number[][])
-  )
-
-  bar.stop()
-
-  console.log(`✅ Embedded ${allEmbeddings.length} chunks`)
-  return chunks.map((chunk, i) => ({ ...chunk, embedding: allEmbeddings[i] }))
+  try {
+    const allEmbeddings = await batches.reduce(
+      async (accPromise, batch, batchIndex) => {
+        const acc = await accPromise
+        const { embeddings } = await embedMany({
+          model,
+          values: batch.map((c) => c.content),
+        })
+        bar.increment()
+        // Cohere trial tier allows ~100k tokens/minute; pause between batches to avoid rate limit.
+        // Skip the delay after the last batch so ingestion finishes immediately.
+        const isLastBatch = batchIndex === batches.length - 1
+        if (!isLastBatch) {
+          await new Promise((r) => setTimeout(r, 15_000))
+        }
+        return [...acc, ...embeddings]
+      },
+      Promise.resolve([] as number[][])
+    )
+
+    console.log(`✅ Embedded ${allEmbeddings.length} chunks`)
+    return chunks.map((chunk, i) => ({ ...chunk, embedding: allEmbeddings[i] }))
+  } finally {
+    bar.stop()
+  }
 }
 
+/**
+ * Persists embedded chunks to Supabase, replacing any previously stored data.
+ * Clears the table first to prevent duplicate entries, then bulk-inserts all chunks.
+ *
+ * @param chunks - Embedded chunks to store.
+ * @throws If the delete or insert Supabase operation fails.
+ */
 const storeChunks = async (chunks: EmbeddedChunk[]) => {
   // Delete existing documents to prevent duplicate entries
   // PostgREST requires a filter clause on DELETE; `.neq("id", 0)` is the conventional workaround to delete all rows.
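+  // As a rough sketch of the call shape (assuming the client instance is
+  // named `supabase` and the table is named "documents"):
+  //   await supabase.from("documents").delete().neq("id", 0)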