Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: LlamaParseReader: update supported file types, add support for array of file Paths, add support for directory Paths + example #808

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file removed examples/data/manga.pdf
Binary file not shown.
31 changes: 18 additions & 13 deletions examples/readers/src/llamaparse_2.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,31 @@
import fs from "fs/promises";
import { LlamaParseReader } from "llamaindex";
import { LlamaParseReader, VectorStoreIndex } from "llamaindex";

async function main() {
// Load PDF using LlamaParse. set apiKey here or in environment variable LLAMA_CLOUD_API_KEY
const reader = new LlamaParseReader({
resultType: "markdown",
language: "en",
numWorkers: 3, //Load files in batches of 2
parsingInstruction:
"The provided document is a manga comic book. Most pages do NOT have title. It does not contain tables. Try to reconstruct the dialogue happening in a cohesive way. Output any math equation in LATEX markdown (between $$)",
"The provided documents are datasheets and Quick-Installation-Guides for Solplanet's Ai-LB series of batteries. They contain tables and graphics. There is also a lot of technical information. The goal is to extract and structure the knowledge in a coherent way",
});
const documents = await reader.loadData("../data/manga.pdf"); // The manga.pdf in the data folder is just a copy of the TOS, due to copyright laws. You have to place your own. I used "The Manga Guide to Calculus" by Hiroyuki Kojima
// Can either accept a single file path an array[] of file paths or a directory path
const documents = await reader.loadData("../data/LlamaParseData");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we just use all the files in the existing data folder?

Suggested change
const documents = await reader.loadData("../data/LlamaParseData");
const documents = await reader.loadData("../data");

we can add new files, but then we need to take care about the license

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah sure, could also just split the brk-2022.pdf file into 6 parts and put them into a separate folder to showcase parallel processing if that makes things easier.


// Assuming documents contain an array of pages or sections
const parsedManga = documents.map((page) => page.text).join("\n---\n");
// Flatten the array of arrays of files
const flatdocuments = documents.flat();

// Output the parsed manga to .md file. Will be placed in ../example/readers/
try {
await fs.writeFile("./parsedManga.md", parsedManga);
console.log("Output successfully written to parsedManga.md");
} catch (err) {
console.error("Error writing to file:", err);
}
// Split text and create embeddings. Store them in a VectorStoreIndex
const index = await VectorStoreIndex.fromDocuments(flatdocuments);

// Query the index
const queryEngine = index.asQueryEngine();
const response = await queryEngine.query({
query: "Which Batteries can be used in parallel connection?",
});

// Output response
console.log(response.toString());
}

main().catch(console.error);
11 changes: 10 additions & 1 deletion packages/core/src/ingestion/IngestionPipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,16 @@ export class IngestionPipeline {
inputNodes.push(this.documents);
}
if (this.reader) {
inputNodes.push(await this.reader.loadData());
try {
const loadedData = await this.reader.loadData();
if (Array.isArray(loadedData)) {
inputNodes.push(loadedData.flat()); // Ensure flat structure
} else {
inputNodes.push([loadedData]);
}
} catch (error) {
console.error(`Error loading data: ${error}`);
}
}
return inputNodes.flat();
}
Expand Down
139 changes: 119 additions & 20 deletions packages/core/src/readers/LlamaParseReader.ts
Original file line number Diff line number Diff line change
@@ -1,29 +1,37 @@
import { defaultFS, getEnv, type GenericFileSystem } from "@llamaindex/env";
import { defaultFS, getEnv, type CompleteFileSystem } from "@llamaindex/env";
import { filetypemime } from "magic-bytes.js";
import { Document } from "../Node.js";
import type { FileReader, Language, ResultType } from "./type.js";
import { walk } from "../storage/FileSystem.js";
import type { Language, MultiReader, ResultType } from "./type.js";
import { SupportedFileTypes, SupportedMimeTypes } from "./utils.js";

/**
* Represents a reader for parsing files using the LlamaParse API.
* See https://github.com/run-llama/llama_parse
*/
export class LlamaParseReader implements FileReader {
export class LlamaParseReader implements MultiReader {
// The API key for the LlamaParse API.
apiKey: string;
// The base URL of the Llama Parsing API.
baseUrl: string = "https://api.cloud.llamaindex.ai/api/parsing";
// The maximum timeout in seconds to wait for the parsing to finish.
maxTimeout = 2000;
// The interval in seconds to check if the parsing is done.
checkInterval = 1;
// Whether to print the progress of the parsing.
verbose = true;
// The result type for the parser.
resultType: ResultType = "text";
// The number of "workers" to use sending API requests when parsing multiple files. Must be greater than 0 and less than 10.
numWorkers: number = 4;
// The interval in seconds to check if the parsing is done.
checkInterval: number = 1;
// The maximum timeout in seconds to wait for the parsing to finish.
maxTimeout: number = 2000;
// Whether to print the progress of the parsing.
verbose: boolean = true;
// Show progress when parsing multiple files.
showProgress: boolean = true;
// The language of the text to parse.
language: Language = "en";
// The parsing instruction for the parser.
parsingInstruction: string = "";
// Whether or not to ignore and skip errors raised during parsing.
ignoreErrors: boolean = true;

constructor(params: Partial<LlamaParseReader> = {}) {
Object.assign(this, params);
Expand All @@ -33,22 +41,62 @@ export class LlamaParseReader implements FileReader {
"API Key is required for LlamaParseReader. Please pass the apiKey parameter or set the LLAMA_CLOUD_API_KEY environment variable.",
);
}
if (this.numWorkers < 1 || this.numWorkers > 9) {
throw new Error(
"The number of workers must be greater than 0 and less than 10.",
);
}

this.apiKey = params.apiKey;
}

async loadData(file: string, fs?: CompleteFileSystem): Promise<Document[]>;
async loadData(
files: string[],
fs?: CompleteFileSystem,
): Promise<Document[][]>;
async loadData(
directoryPath: string,
fs?: CompleteFileSystem,
): Promise<Document[][]>;

async loadData(
input: string | string[],
fs: CompleteFileSystem = defaultFS,
): Promise<Document[] | Document[][]> {
if (typeof input === "string") {
const stats = await fs.stat(input);
if (stats.isDirectory()) {
return await this.loadDirectory(input, fs);
} else {
return [await this.loadFile(input, fs)];
}
} else {
return await this.loadFiles(input, fs);
}
}

// Load a single file
private async loadFile(
file: string,
fs: GenericFileSystem = defaultFS,
fs: CompleteFileSystem = defaultFS,
): Promise<Document[]> {
if (!file.endsWith(".pdf")) {
throw new Error("Currently, only PDF files are supported.");
// Load data, set the mime type
const data = await fs.readRawFile(file);
const mimeType = await this.getMimeType(data);
if (!SupportedMimeTypes.includes(mimeType)) {
throw new Error(
`File has type ${mimeType} which does not match supported MIME Types. Supported Types include these formats: ${SupportedFileTypes.join(", ")}`,
);
}

const metadata = { file_path: file };

// Load data, set the mime type
const data = await fs.readRawFile(file);
const mimeType = await this.getMimeType(data);
if (this.verbose) {
console.log(`Starting load for file: ${file}`);
}

// Prepare the request body
const body = new FormData();
body.set("file", new Blob([data], { type: mimeType }), file);
body.append("language", this.language);
Expand All @@ -67,7 +115,7 @@ export class LlamaParseReader implements FileReader {
headers,
});
if (!response.ok) {
throw new Error(`Failed to parse the PDF file: ${await response.text()}`);
throw new Error(`Failed to parse the file: ${await response.text()}`);
}
const jsonResponse = await response.json();

Expand All @@ -94,7 +142,7 @@ export class LlamaParseReader implements FileReader {
const end = Date.now();
if (end - start > this.maxTimeout * 1000) {
throw new Error(
`Timeout while parsing the PDF file: ${await response.text()}`,
`Timeout while parsing the file: ${await response.text()}`,
);
}
if (this.verbose && tries % 10 === 0) {
Expand All @@ -114,11 +162,62 @@ export class LlamaParseReader implements FileReader {
}
}

// Load multiple files in parallel
private async loadFiles(
filePaths: string[],
fs: CompleteFileSystem,
): Promise<Document[][]> {
const results: Document[][] = [];
for (let i = 0; i < filePaths.length; i += this.numWorkers) {
const batch = filePaths.slice(i, i + this.numWorkers);
const batchResults = await Promise.all(
batch.map(async (file) => {
const stats = await fs.stat(file);
if (!stats.isDirectory()) {
return await this.loadFile(file, fs);
} else {
console.warn(`Skipping directory: ${file}`);
return [];
}
}),
);

results.push(...batchResults);
if (this.verbose && this.showProgress) {
console.log(
`Processed batch ${i / this.numWorkers + 1}/${Math.ceil(filePaths.length / this.numWorkers)}.`,
);
}
}
if (this.verbose) {
console.log("All files loaded successfully.");
}
return results;
}

// Load all files in a directory
private async loadDirectory(
directoryPath: string,
fs: CompleteFileSystem,
): Promise<Document[][]> {
const filePaths = [];
for await (const filePath of walk(fs, directoryPath)) {
filePaths.push(filePath);
}

return await this.loadFiles(filePaths, fs);
}

// Get the MIME type of a file
private async getMimeType(data: Buffer): Promise<string> {
const mimes = filetypemime(data);
if (!mimes.includes("application/pdf")) {
throw new Error("Currently, only PDF files are supported.");
const validMimes = mimes.find((mime) => SupportedMimeTypes.includes(mime));
if (!validMimes) {
throw new Error(
`File has type ${mimes} which does not match supported MIME Types. Supported Types include these formats: ${SupportedFileTypes.join(", ")}`,
);
}
return "application/pdf";

return validMimes;
}
}
14 changes: 13 additions & 1 deletion packages/core/src/readers/SimpleDirectoryReader.edge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,19 @@ export class SimpleDirectoryReader implements BaseReader {
continue;
}

const fileDocs = await reader.loadData(filePath, fs);
let fileDocs: Document<Metadata>[] = [];

try {
const loadedData = await reader.loadData(filePath, fs);
if (Array.isArray(loadedData)) {
fileDocs = loadedData.flat(); // Ensure flat structure
} else {
fileDocs = [loadedData];
}
} catch (error) {
console.error(`Error loading data for file ${filePath}: ${error}`);
}

fileDocs.forEach(addMetaData(filePath));

// Observer can still cancel addition of the resulting docs from this file
Expand Down
16 changes: 14 additions & 2 deletions packages/core/src/readers/type.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import type { Document } from "../Node.js";
* A reader takes imports data into Document objects.
*/
export interface BaseReader {
loadData(...args: unknown[]): Promise<Document[]>;
loadData(...args: unknown[]): Promise<Document[] | Document[][]>;
}

/**
Expand All @@ -15,7 +15,19 @@ export interface FileReader extends BaseReader {
loadData(filePath: string, fs?: CompleteFileSystem): Promise<Document[]>;
}

// For LlamaParseReader.ts
/**
* A reader takes single and multiple file paths as well as a directory Path and imports data into an array of Document objects.
*/
export interface MultiReader extends BaseReader {
loadData(filePath: string, fs?: CompleteFileSystem): Promise<Document[]>;
loadData(filePaths: string[], fs?: CompleteFileSystem): Promise<Document[][]>;
loadData(
directoryPath: string,
fs?: CompleteFileSystem,
): Promise<Document[][]>;
Comment on lines +22 to +27
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we support directory input?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think directory input is essential in order to take full advantage of the ability to load and send multiple files at once.

But yeah the smarter way would be to integrate it in Simple Directory Reader but I didn't figure out how to not overload the Parser when adding parallel processing to the Directory Reader

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm gonna check it out again and see. I'll probably have some time in 2-3 days.

The Connect Timeout Error seems to be kind of random. Sometimes it works as expected, sometimes it times out, so I'll recheck everything anyway.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, the SimpleDirectoryReader already supports loading multiple files from a directory.

I think we have three concerns:

  1. Walking the filesystem (already done by the SimpleDirectoryReader)
  2. Reading a single file (done by the reader)
  3. Loading files in parallel (new concern in this PR)

If we handle this new concern by a single reader (here LlamaParseReader), then we can only use it with this reader.

How about adding numWorkers to SimpleDirectoryReader instead?

@InsightByAI if you like you can split your PR in two, the first one just adding the missing filetypes

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@marcusschiesser The issue for me with adding numWorkers to SimpleDirectoryReader was that it would load in batches just fine, but it would load the next batch right after finishing the 1st without waiting for the parse results.

I think the options would be to either:

  1. have SimpleDirectoryReader wait for a finish signal from the Parser
  2. have SimpleDirectoryReader pass on a list of all filePaths and let the parallel loading be handled by the Parser

I concentrated only on LlamaParse because I think the requirements for numWorkers in most other readers is quite different, since in LlamaParse the "workers" kind of refers to the max amount of files the LlamaParse API can accept at once, which is limited to 10.

If I have powerful enough hardware, I could probably use a 100 or more "workers" to load a 100 files parallel with PDFReader.
Not sure if a numWorkers that limits one reader to 10 workers and has to wait for results from an API endpoint, while allowing another one to use unlimited workers and only be limited by hardware speed would be easy to implement/work that well?

I'm going to check how python implements this. The Llama-Parse standalone can accept arrays of files but not a directory afaik, so I'll check how that works with an equivalent llama-index python directory Reader.

I'll split the PR later. Thanks for the elaborate review!

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you re-add numWorkers to SimpleDirectorReader and add a test case for the problem? Then we can have a look at it

}

// For LlamaParseReader

export type ResultType = "text" | "markdown" | "json";
export type Language =
Expand Down
Loading