scripts/ingest.ts (102 changes: 70 additions & 32 deletions)
@@ -18,18 +18,30 @@ export type EnrichedChunk = {
}
export type EmbeddedChunk = EnrichedChunk & { embedding: number[] }

/**
* Orchestrates the full ingestion pipeline: loads vault files, splits them
* into chunks, enriches with metadata, embeds with Cohere, and stores in Supabase.
*/
const runIngestionPipeline = async () => {
console.log("Starting ingestion pipeline ...")

const loadSpinner = ora("Loading vault...").start()
const docs = loadVaultFiles("./vault/")
const chunks = splitMarkdownDocs(docs)
const enrichedChunks = enrichChunksWithMetadata(chunks)
loadSpinner.succeed(
`Loaded ${docs.length} files → ${enrichedChunks.length} chunks`
)
let enrichedChunks: EnrichedChunk[]
try {
const docs = loadVaultFiles("./vault/")
const chunks = splitMarkdownDocs(docs)
enrichedChunks = enrichChunksWithMetadata(chunks)
loadSpinner.succeed(
`Loaded ${docs.length} files → ${enrichedChunks.length} chunks`
)
console.log("🧂 Enriched chunks with metadata")
} catch (err) {
loadSpinner.fail(err instanceof Error ? err.message : String(err))
throw err
}

const embeddedChunks = await embedChunks(enrichedChunks)
storeChunks(embeddedChunks)
await storeChunks(embeddedChunks)
}

/**
@@ -43,7 +55,9 @@ const runIngestionPipeline = async () => {
export const loadVaultFiles = (dir: string) => {
const results: Document[] = []

for (const entry of fs.readdirSync(dir, { withFileTypes: true })) {
const entries = fs.readdirSync(dir, { withFileTypes: true })

for (const entry of entries) {
const fullPath = path.join(dir, entry.name)

if (entry.isDirectory()) {
@@ -57,6 +71,14 @@ export const loadVaultFiles = (dir: string) => {
return results
}

/**
* Splits an array of documents into smaller chunks by markdown heading boundaries.
* Each heading (H1–H3) starts a new chunk; oversized chunks are further split
* by {@link splitLargeChunks}. Empty chunks are filtered out.
*
* @param documents - Raw documents loaded from the vault.
* @returns A flat array of `{ source, content }` chunks ready for enrichment.
*/
export const splitMarkdownDocs = (documents: Document[]) => {
return documents.flatMap((doc) => {
const { content, source } = doc
@@ -68,6 +90,14 @@ export const splitMarkdownDocs = (documents: Document[]) => {
})
}

/**
* Splits a single chunk into fixed-size slices if it exceeds {@link MAX_CHUNK_SIZE}.
* Slices are cut on character boundaries without regard for word boundaries.
*
* @param source - Original file path, propagated to each slice.
* @param content - Text content to split.
* @returns A single-element array if the content fits, otherwise an array of slices.
*/
const splitLargeChunks = (source: string, content: string) => {
if (content.length <= MAX_CHUNK_SIZE) return [{ source, content }]

@@ -118,7 +148,6 @@ const extractTitleFromPath = (filepath: string) => {
export const enrichChunksWithMetadata = (
chunks: Document[]
): EnrichedChunk[] => {
console.log("🧂 Enriching chunks with metadata")
return chunks.map((chunk) => {
const title = extractTitleFromPath(chunk.source)
return {
@@ -155,31 +184,40 @@ export const embedChunks = async (
)
bar.start(batches.length, 0)

const allEmbeddings = await batches.reduce(
async (accPromise, batch, batchIndex) => {
const acc = await accPromise
const { embeddings } = await embedMany({
model,
values: batch.map((c) => c.content),
})
bar.increment()
// Cohere trial tier allows ~100k tokens/minute; pause between batches to avoid rate limit.
// Skip the delay after the last batch so ingestion finishes immediately.
const isLastBatch = batchIndex === batches.length - 1
if (!isLastBatch) {
await new Promise((r) => setTimeout(r, 15_000))
}
return [...acc, ...embeddings]
},
Promise.resolve([] as number[][])
)

bar.stop()

console.log(`✅ Embedded ${allEmbeddings.length} chunks`)
return chunks.map((chunk, i) => ({ ...chunk, embedding: allEmbeddings[i] }))
try {
const allEmbeddings = await batches.reduce(
async (accPromise, batch, batchIndex) => {
const acc = await accPromise
const { embeddings } = await embedMany({
model,
values: batch.map((c) => c.content),
})
bar.increment()
// Cohere trial tier allows ~100k tokens/minute; pause between batches to avoid rate limit.
// Skip the delay after the last batch so ingestion finishes immediately.
const isLastBatch = batchIndex === batches.length - 1
if (!isLastBatch) {
await new Promise((r) => setTimeout(r, 15_000))
}
return [...acc, ...embeddings]
},
Promise.resolve([] as number[][])
)

console.log(`✅ Embedded ${allEmbeddings.length} chunks`)
return chunks.map((chunk, i) => ({ ...chunk, embedding: allEmbeddings[i] }))
} finally {
bar.stop()
}
}

/**
* Persists embedded chunks to Supabase, replacing any previously stored data.
* Clears the table first to prevent duplicate entries, then bulk-inserts all chunks.
*
* @param chunks - Embedded chunks to store.
* @throws If the delete or insert Supabase operation fails.
*/
const storeChunks = async (chunks: EmbeddedChunk[]) => {
// Delete existing documents to prevent duplicate entries
// PostgREST requires a filter clause on DELETE; `.neq("id", 0)` is the conventional workaround to delete all rows.
(diff truncated here)
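The rest of `storeChunks` is not shown in the diff. As a hedged sketch only, not the PR's actual code, the delete-then-insert pattern described by the comment above could look roughly like this with the Supabase JS client; the `documents` table name and the row shape are assumptions:

// Illustrative sketch, not part of this PR. The "documents" table name and the
// row shape are assumptions; only the supabase-js calls themselves are standard API.
import { createClient } from "@supabase/supabase-js"

const supabase = createClient(process.env.SUPABASE_URL!, process.env.SUPABASE_SERVICE_KEY!)

const replaceAllRows = async (rows: { content: string; embedding: number[] }[]) => {
  // PostgREST rejects an unfiltered DELETE, so use a filter that matches every row.
  const { error: deleteError } = await supabase.from("documents").delete().neq("id", 0)
  if (deleteError) throw deleteError

  // Bulk-insert the freshly embedded chunks.
  const { error: insertError } = await supabase.from("documents").insert(rows)
  if (insertError) throw insertError
}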