Skip to content

Commit

Permalink
Merge pull request #160 from mtutty/add-observable-reader
Browse files Browse the repository at this point in the history
Add observer/callback feature to SimpleDirectoryReader
  • Loading branch information
yisding authored Oct 30, 2023
2 parents fe9056f + e938a4d commit 72f6271
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 7 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# dependencies
node_modules
.pnp
.pnpm-store
.pnp.js

# testing
Expand Down
24 changes: 24 additions & 0 deletions apps/simple/directory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { SimpleDirectoryReader } from "llamaindex";

/**
 * Example observer passed to SimpleDirectoryReader: logs every event it
 * receives and rejects any entry whose name ends in ".pdf".
 *
 * @param category - Kind of entry the event is about; only logged here.
 * @param name - Path of the directory or file the event is about.
 * @param status - Progress status reported by the reader. It is only logged,
 *   so it is typed `unknown` rather than `any` to keep type checking on.
 * @param message - Optional detail text supplied with the event.
 * @returns `false` when `name` ends with ".pdf", otherwise `true`.
 */
function callback(
  category: string,
  name: string,
  status: unknown,
  message?: string,
): boolean {
  console.log(category, name, status, message);
  if (name.endsWith(".pdf")) {
    console.log("I DON'T WANT PDF FILES!");
    return false;
  }
  return true;
}

/**
 * Entry point of the example: builds a SimpleDirectoryReader that reports to
 * `callback` and loads the contents of ./data. The loaded documents are not
 * used; the example only demonstrates the observer output.
 */
async function main() {
  const directoryReader = new SimpleDirectoryReader(callback);
  await directoryReader.loadData({ directoryPath: "./data" });
}

// Run the example; a rejection from main() is passed to console.error.
main().catch(console.error);
79 changes: 72 additions & 7 deletions packages/core/src/readers/SimpleDirectoryReader.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,25 @@
import _ from "lodash";
import { Document } from "../Node";
import { DEFAULT_FS } from "../storage/constants";
import { CompleteFileSystem, walk } from "../storage/FileSystem";
import { BaseReader } from "./base";
import { DEFAULT_FS } from "../storage/constants";
import { PapaCSVReader } from "./CSVReader";
import { DocxReader } from "./DocxReader";
import { HTMLReader } from "./HTMLReader";
import { MarkdownReader } from "./MarkdownReader";
import { PDFReader } from "./PDFReader";
import { BaseReader } from "./base";

/**
 * Progress states reported to a ReaderCallback.
 *
 * Exported so that callers can type and compare the `status` argument of
 * their observer instead of falling back to `any`.
 */
export enum ReaderStatus {
  /** Sent before a directory or an individual file is processed. */
  STARTED = 0,
  /** Sent after a file has been read, and once after the whole directory is done. */
  COMPLETE = 1,
  /** Sent when no reader is available for a file or reading it threw. */
  ERROR = 2,
}

/**
 * Observer invoked by SimpleDirectoryReader as it works through a directory.
 *
 * @param category - Whether the event concerns the directory or a single file.
 * @param name - Path of the directory or file.
 * @param status - Which stage of processing the event reports.
 * @param message - Detail text; supplied with ERROR events.
 * @returns `false` to veto the step being reported, `true` to continue.
 */
export type ReaderCallback = (
  category: "file" | "directory",
  name: string,
  status: ReaderStatus,
  message?: string,
) => boolean;

/**
* Read a .txt file
Expand All @@ -22,7 +34,7 @@ export class TextFileReader implements BaseReader {
}
}

const FILE_EXT_TO_READER: Record<string, BaseReader> = {
export const FILE_EXT_TO_READER: Record<string, BaseReader> = {
txt: new TextFileReader(),
pdf: new PDFReader(),
csv: new PapaCSVReader(),
Expand All @@ -40,37 +52,90 @@ export type SimpleDirectoryReaderLoadDataProps = {
};

/**
* Read all of the documents in a directory. Currently supports PDF and TXT files.
* Read all of the documents in a directory.
* By default, supports the list of file types
* in the FILE_EXT_TO_READER map.
*/
export class SimpleDirectoryReader implements BaseReader {
  /**
   * @param observer - Optional callback notified as the directory and each
   *   file are processed; its boolean result can skip or cancel work (see
   *   loadData).
   */
  constructor(private observer?: ReaderCallback) {}

  /**
   * Walks `directoryPath` and loads every file with the reader registered for
   * its extension, falling back to `defaultReader`.
   *
   * Observer protocol (a missing observer behaves as if it always returned true):
   * - directory STARTED: false returns an empty list without walking.
   * - file STARTED: false skips that file.
   * - file COMPLETE: false discards the documents read from that file.
   * - file ERROR (no reader, or reading threw): false aborts the whole load
   *   and returns an empty list; true moves on to the next file.
   * - directory COMPLETE: notification only, the result is ignored.
   *
   * @returns The documents collected from all files that were not skipped.
   */
  async loadData({
    directoryPath,
    fs = DEFAULT_FS as CompleteFileSystem,
    defaultReader = new TextFileReader(),
    fileExtToReader = FILE_EXT_TO_READER,
  }: SimpleDirectoryReaderLoadDataProps): Promise<Document[]> {
    // Observer can decide to skip the directory
    if (
      !this.doObserverCheck("directory", directoryPath, ReaderStatus.STARTED)
    ) {
      return [];
    }

    const docs: Document[] = [];
    for await (const filePath of walk(fs, directoryPath)) {
      try {
        const fileExt = _.last(filePath.split(".")) || "";

        // Observer can decide to skip each file
        if (!this.doObserverCheck("file", filePath, ReaderStatus.STARTED)) {
          continue;
        }

        let reader = null;

        // Own-property check: a plain `in` would also match names inherited
        // from Object.prototype (e.g. a file ending in ".constructor").
        if (Object.prototype.hasOwnProperty.call(fileExtToReader, fileExt)) {
          reader = fileExtToReader[fileExt];
        } else if (!_.isNil(defaultReader)) {
          reader = defaultReader;
        } else {
          const msg = `No reader for file extension of ${filePath}`;
          console.warn(msg);

          // In an error condition, observer's false cancels the whole process.
          if (
            !this.doObserverCheck("file", filePath, ReaderStatus.ERROR, msg)
          ) {
            return [];
          }

          continue;
        }

        const fileDocs = await reader.loadData(filePath, fs);

        // Documents are added exactly once, and only if the observer accepts
        // them; this is what lets a COMPLETE veto drop a file's output.
        if (this.doObserverCheck("file", filePath, ReaderStatus.COMPLETE)) {
          docs.push(...fileDocs);
        }
      } catch (e) {
        const msg = `Error reading file ${filePath}: ${e}`;
        console.error(msg);

        // In an error condition, observer's false cancels the whole process.
        if (!this.doObserverCheck("file", filePath, ReaderStatus.ERROR, msg)) {
          return [];
        }
      }
    }

    // After successful import of all files, directory completion
    // is only a notification for observer, cannot be cancelled.
    this.doObserverCheck("directory", directoryPath, ReaderStatus.COMPLETE);

    return docs;
  }

  /**
   * Forwards an event to the observer, if one was supplied.
   *
   * @returns The observer's answer, or `true` when there is no observer.
   */
  private doObserverCheck(
    category: "file" | "directory",
    name: string,
    status: ReaderStatus,
    message?: string,
  ): boolean {
    return this.observer
      ? this.observer(category, name, status, message)
      : true;
  }
}

0 comments on commit 72f6271

Please sign in to comment.