Skip to content

Commit

Permalink
docs: minor improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
sroussey committed Jan 18, 2025
1 parent f0de635 commit 47710a9
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 8 deletions.
10 changes: 7 additions & 3 deletions docs/developers/01_getting_started.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,16 @@ builder
})
.rename("text", "message")
.DebugLog();
builder.run();
await builder.run();

// Export the graph
const graphJson = builder.toJSON();
console.log(graphJson);
```

## Using Task and TaskGraph directly (& a config helper)

This is equivalent to creating the graph directly:
This is equivalent to creating the graph directly, with additional features like caching and reactive execution:

```ts
import {
Expand Down Expand Up @@ -118,7 +122,7 @@ graph.addDataFlow(
);

const runner = new TaskGraphRunner(graph);
runner.run();
await runner.run();
```

## Using Task and TaskGraph directly (no config helper)
Expand Down
38 changes: 33 additions & 5 deletions docs/developers/02_architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -265,12 +265,25 @@ style Task type:abstract,stroke-dasharray: 5 5
classDiagram
class TaskGraphRunner{
TaskGraph _graph
run() TaskOutput
Map<number, Task[]> layers
Map<unknown, TaskInput> provenanceInput
TaskGraph dag
TaskOutputRepository repository
assignLayers(Task[] sortedNodes)
runGraph(TaskInput parentProvenance) TaskOutput
runGraphReactive() TaskOutput
}
```

The TaskGraphRunner is responsible for executing tasks in a task graph. Key features include:

- **Layer-based Execution**: Tasks are organized into layers based on dependencies, allowing parallel execution of independent tasks
- **Provenance Tracking**: Tracks the lineage and input data that led to each task's output
- **Caching Support**: Can use a TaskOutputRepository to cache task outputs and avoid re-running tasks
- **Reactive Mode**: Supports reactive execution where tasks can respond to input changes without full re-execution
- **Smart Task Scheduling**: Automatically determines task execution order based on dependencies

## TaskGraphBuilder

```mermaid
Expand All @@ -279,9 +292,16 @@ classDiagram
class TaskGraphBuilder{
-TaskGraphRunner _runner
-TaskGraph _graph
-TaskOutputRepository _repository
-DataFlow[] _dataFlows
+EventEmitter events
run() TaskOutput
+rename(string outputName, string inputName)
+pop()
pop()
parallel(...builders) TaskGraphBuilder
rename(string source, string target, number index) TaskGraphBuilder
reset() TaskGraphBuilder
toJSON() TaskGraphJson
toDependencyJSON() JsonTaskItem[]
+DownloadModel(model)
+TextEmbedding(model text)
+TextGeneration(model prompt)
Expand All @@ -294,9 +314,17 @@ classDiagram
+JavaScript(code input)
}
```

The TaskGraphBuilder provides a fluent interface for constructing task graphs. Key features include:

- **Event System**: Emits events for graph changes and execution status
- **Parallel Execution**: Can run multiple task graphs in parallel
- **Repository Integration**: Optional integration with TaskOutputRepository for caching
- **JSON Support**: Can import/export graphs as JSON
- **Smart Task Connection**: Automatically connects task outputs to inputs based on naming
- **Task Management**: Methods for adding, removing, and modifying tasks in the graph

# Warnings / ToDo

-**Items marked with \* are not yet implemented.** These are good items for a first time contributor to work on. ;)
Expand Down
36 changes: 36 additions & 0 deletions packages/storage/src/browser/inmemory/InMemoryJobQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,18 @@ import { nanoid } from "nanoid";
import { Job, JobStatus, JobQueue, ILimiter } from "ellmers-core";
import { makeFingerprint } from "../../util/Misc";

/**
* In-memory implementation of a job queue that manages asynchronous tasks.
* Supports job scheduling, status tracking, and result caching.
*/
export class InMemoryJobQueue<Input, Output> extends JobQueue<Input, Output> {
/**
* Creates a new in-memory job queue
* @param queue - Name of the queue
* @param limiter - Rate limiter to control job execution
* @param waitDurationInMilliseconds - Polling interval for checking new jobs
* @param jobClass - Optional custom Job class implementation
*/
constructor(
queue: string,
limiter: ILimiter,
Expand All @@ -20,15 +31,24 @@ export class InMemoryJobQueue<Input, Output> extends JobQueue<Input, Output> {
this.jobQueue = [];
}

/** Internal array storing all jobs */
private jobQueue: Job<Input, Output>[];

/**
* Returns a filtered and sorted list of pending jobs that are ready to run
* Sorts by creation time to maintain FIFO order
*/
private reorderedQueue() {
return this.jobQueue
.filter((job) => job.status === JobStatus.PENDING)
.filter((job) => job.runAfter.getTime() <= Date.now())
.sort((a, b) => a.createdAt.getTime() - b.createdAt.getTime());
}

/**
* Adds a new job to the queue
* Generates an ID and fingerprint if not provided
*/
public async add(job: Job<Input, Output>) {
job.id = job.id ?? nanoid();
job.queueName = this.queue;
Expand All @@ -50,6 +70,10 @@ export class InMemoryJobQueue<Input, Output> extends JobQueue<Input, Output> {
return this.jobQueue.filter((job) => job.status === JobStatus.PROCESSING);
}

/**
* Retrieves the next available job that is ready to be processed
* Updates the job status to PROCESSING before returning
*/
public async next() {
const top = this.reorderedQueue();

Expand All @@ -64,6 +88,13 @@ export class InMemoryJobQueue<Input, Output> extends JobQueue<Input, Output> {
return this.jobQueue.filter((j) => j.status === status).length;
}

/**
* Marks a job as complete with its output or error
* Handles retries for failed jobs and triggers completion callbacks
* @param id - ID of the job to complete
* @param output - Result of the job execution
* @param error - Optional error message if job failed
*/
public async complete(id: unknown, output: any, error?: string) {
const job = this.jobQueue.find((j) => j.id === id);
if (!job) {
Expand Down Expand Up @@ -91,6 +122,11 @@ export class InMemoryJobQueue<Input, Output> extends JobQueue<Input, Output> {
this.jobQueue = [];
}

/**
* Looks up cached output for a given task type and input
* Uses input fingerprinting for efficient matching
* @returns The cached output or null if not found
*/
public async outputForInput(taskType: string, input: Input) {
const fingerprint = await makeFingerprint(input);
return (
Expand Down

0 comments on commit 47710a9

Please sign in to comment.