@ellmers/job-queue

A simple job queue system for managing and processing asynchronous tasks, with rate limiting and cross-platform persistence.

Features

  • Multiple storage backends (In-Memory, IndexedDB, SQLite, PostgreSQL)
  • Rate limiting strategies (Concurrency, Delay, Composite)
  • Job prioritization and retry logic
  • Progress tracking and event listeners
  • Cross-platform support (Browser, Node.js, Bun)
  • TypeScript-first implementation

Installation

bun add @ellmers/job-queue

Basic Usage

Creating a Queue

import { Job, InMemoryJobQueue, ConcurrencyLimiter } from "@ellmers/job-queue";

// Define your job type
interface MyJobInput {
  data: string;
}
interface MyJobOutput {
  result: number;
}

class MyJob extends Job<MyJobInput, MyJobOutput> {
  async execute(input: MyJobInput): Promise<MyJobOutput> {
    return { result: input.data.length };
  }
}

// Create queue with in-memory storage
const queue = new InMemoryJobQueue<MyJobInput, MyJobOutput, MyJob>("my-queue", MyJob, {
  limiter: new ConcurrencyLimiter(5), // 5 concurrent jobs
  deleteAfterCompletionMs: 60_000, // clean up completed jobs after 1 minute
  deleteAfterFailureMs: 86_400_000, // clean up failed jobs after 1 day
});

await queue.start();
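The `deleteAfterCompletionMs` and `deleteAfterFailureMs` options describe time-based cleanup of finished jobs. As a rough sketch of how such a retention check could work, assuming each stored job records a status and a finish timestamp (the `StoredJob` shape and the `isExpired` and `sweep` helpers are hypothetical and not part of this package):

```typescript
// Hypothetical shapes for illustration only; not the library's types.
type FinishedStatus = "completed" | "failed";

interface StoredJob {
  id: string;
  status: FinishedStatus;
  finishedAt: number; // epoch milliseconds
}

interface RetentionOptions {
  deleteAfterCompletionMs: number;
  deleteAfterFailureMs: number;
}

// Returns true once a finished job has outlived its retention period.
function isExpired(job: StoredJob, now: number, opts: RetentionOptions): boolean {
  const ttl =
    job.status === "completed" ? opts.deleteAfterCompletionMs : opts.deleteAfterFailureMs;
  return now - job.finishedAt >= ttl;
}

// Keeps only the jobs that are still inside their retention period.
function sweep(jobs: StoredJob[], now: number, opts: RetentionOptions): StoredJob[] {
  return jobs.filter((job) => !isExpired(job, now, opts));
}
```

With the values from the example above, a completed job would be dropped one minute after it finishes, while a failed job would stay available for inspection for a day.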

Queue Lifecycle

await queue.start();
await queue.stop();
await queue.reset();

Adding Jobs

// Instantiate the job subclass the queue was created with
const job = new MyJob({
  input: { data: "process-me" },
  maxRetries: 3,
});

const jobId = await queue.add(job);
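The `maxRetries` option suggests that a failed job is re-run a bounded number of times. A minimal sketch of that idea, independent of this package (the `runWithRetries` helper is hypothetical, and it assumes `maxRetries` counts the attempts made after the first one; the library's actual retry semantics may differ):

```typescript
// Hypothetical helper, not part of @ellmers/job-queue.
// Runs `task` once, then retries up to `maxRetries` more times on failure.
async function runWithRetries<T>(
  task: (attempt: number) => Promise<T>,
  maxRetries: number
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      return await task(attempt);
    } catch (err) {
      lastError = err; // remember the failure and try again
    }
  }
  // Every attempt failed: surface the most recent error.
  throw lastError;
}
```

A production queue would usually also wait between attempts, which this sketch leaves out for brevity.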

Job Events

queue.on("job_start", (queueName, jobId) => {
  console.log(`Job ${jobId} started in ${queueName}`);
});

queue.on("job_progress", (queueName, jobId, progress) => {
  console.log(`Job ${jobId} progress: ${progress}%`);
});

Rate Limiting Strategies

Composite Limiter

import { CompositeLimiter, ConcurrencyLimiter, DelayLimiter } from "@ellmers/job-queue";

const limiter = new CompositeLimiter([
  new ConcurrencyLimiter(5), // Max 5 concurrent jobs
  new DelayLimiter(100), // Minimum 100ms between job starts
]);
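Conceptually, a composite limiter lets a job start only when every wrapped limiter agrees. A minimal sketch of that rule, assuming a one-method limiter interface (the `Limiter` interface and `AllOfLimiter` class are hypothetical; the library's own interface may expose more methods):

```typescript
// Hypothetical minimal interface; the library's limiter interface may differ.
interface Limiter {
  canProceed(): boolean;
}

// A job may start only when every wrapped limiter allows it.
class AllOfLimiter implements Limiter {
  private readonly limiters: Limiter[];

  constructor(limiters: Limiter[]) {
    this.limiters = limiters;
  }

  canProceed(): boolean {
    return this.limiters.every((limiter) => limiter.canProceed());
  }
}

// Two stand-in limiters: one always allows, one always blocks.
const alwaysAllow: Limiter = { canProceed: () => true };
const alwaysBlock: Limiter = { canProceed: () => false };

const combined = new AllOfLimiter([alwaysAllow, alwaysBlock]);
```

Here `combined.canProceed()` is false because one of the two limiters blocks, which is the behaviour you would want when combining a concurrency cap with a minimum delay.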

Rate Limiter

  • InMemoryRateLimiter - Rate limiter using in-memory storage
  • IndexedDbRateLimiter - Rate limiter using IndexedDB storage
  • SqliteRateLimiter - Rate limiter using SQLite storage
  • PostgresRateLimiter - Rate limiter using PostgreSQL storage
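The storage backend examples in this README construct these limiters with two numbers, such as `(10, 1)` annotated as 10 jobs/second. A sliding-window sketch of that kind of limit, assuming the arguments mean "maximum executions" and "window length in seconds" (an assumption; `SlidingWindowLimiter` is a hypothetical class, and the clock is passed in explicitly to keep the example deterministic):

```typescript
// Hypothetical sliding-window limiter; not the library's implementation.
class SlidingWindowLimiter {
  private readonly maxExecutions: number;
  private readonly windowMs: number;
  private startTimes: number[] = []; // timestamps (ms) of recent job starts

  constructor(maxExecutions: number, windowSeconds: number) {
    this.maxExecutions = maxExecutions;
    this.windowMs = windowSeconds * 1000;
  }

  // True if fewer than maxExecutions jobs started within the last window.
  canProceed(now: number): boolean {
    // Forget starts that have fallen out of the window.
    this.startTimes = this.startTimes.filter((t) => now - t < this.windowMs);
    return this.startTimes.length < this.maxExecutions;
  }

  // Call when a job actually starts.
  recordJobStart(now: number): void {
    this.startTimes.push(now);
  }
}
```

The persistent variants listed above presumably keep the equivalent of `startTimes` in their backing store so the limit survives restarts, but that is an inference from their names rather than something this README states.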

Concurrency Limiter

import { ConcurrencyLimiter } from "@ellmers/job-queue";

const limiter = new ConcurrencyLimiter(15); // Max 15 jobs at a time
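To illustrate what a concurrency cap does, here is a minimal counter-based sketch. `SimpleConcurrencyLimiter` and its method names are hypothetical and only approximate the idea; they are not taken from the package's source:

```typescript
// Hypothetical counter-based limiter; not the library's implementation.
class SimpleConcurrencyLimiter {
  private readonly maxConcurrent: number;
  private running = 0; // number of jobs currently executing

  constructor(maxConcurrent: number) {
    this.maxConcurrent = maxConcurrent;
  }

  // A new job may start only while a slot is free.
  canProceed(): boolean {
    return this.running < this.maxConcurrent;
  }

  // Call when a job starts: takes a slot.
  recordJobStart(): void {
    this.running++;
  }

  // Call when a job finishes (success or failure): frees a slot.
  recordJobCompletion(): void {
    this.running = Math.max(0, this.running - 1);
  }
}
```

The important detail is that a slot must be released on failure as well as success, otherwise failed jobs would permanently shrink the queue's capacity.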

Delay Limiter

import { DelayLimiter } from "@ellmers/job-queue";

const limiter = new DelayLimiter(100); // Minimum 100ms between job starts
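A delay limit can be pictured as a single "next allowed start" timestamp that moves forward each time a job starts. A minimal sketch under that assumption (`SimpleDelayLimiter` is hypothetical, and the current time is passed in explicitly so the behaviour is easy to follow):

```typescript
// Hypothetical delay limiter; not the library's implementation.
class SimpleDelayLimiter {
  private readonly delayMs: number;
  private nextAvailableAt = 0; // earliest time (ms) the next job may start

  constructor(delayMs: number) {
    this.delayMs = delayMs;
  }

  // A job may start once the minimum gap since the last start has elapsed.
  canProceed(now: number): boolean {
    return now >= this.nextAvailableAt;
  }

  // Call when a job starts: pushes the next allowed start into the future.
  recordJobStart(now: number): void {
    this.nextAvailableAt = now + this.delayMs;
  }
}
```

Unlike a concurrency cap, this limiter does not care when jobs finish; it only spaces out their start times.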

Storage Backends

InMemory (Anywhere)

import { Job, InMemoryJobQueue, InMemoryRateLimiter } from "@ellmers/job-queue";

const queue = new InMemoryJobQueue<MyJobInput, MyJobOutput>("browser-queue", Job, {
  limiter: new InMemoryRateLimiter(10, 1), // 10 jobs/second
});

// Equivalent example using the storage class directly

import { Job, JobQueue, InMemoryRateLimiter } from "@ellmers/job-queue";
import { InMemoryQueueStorage } from "@ellmers/storage";

const queue = new JobQueue<MyJobInput, MyJobOutput>("browser-queue", Job, {
  storage: new InMemoryQueueStorage("browser-queue"),
  limiter: new InMemoryRateLimiter(10, 1), // 10 jobs/second
});

IndexedDB (Browser)

import { Job, IndexedDbJobQueue, InMemoryRateLimiter } from "@ellmers/job-queue";

const queue = new IndexedDbJobQueue<MyJobInput, MyJobOutput>("browser-queue", Job, {
  limiter: new InMemoryRateLimiter(10, 1), // 10 jobs/second
});

SQLite (Node.js/Bun)

import { Job, SqliteJobQueue, SqliteRateLimiter } from "@ellmers/job-queue";

// `db` is assumed to be an already-open SQLite database handle created elsewhere
const queue = new SqliteJobQueue(db, "sqlite-queue", Job, {
  limiter: new SqliteRateLimiter(10, 1), // 10 jobs/second
});

PostgreSQL (Node.js/Bun)

import { Job, PostgresJobQueue, PostgresRateLimiter } from "@ellmers/job-queue";

// `postgresPool` is assumed to be an existing PostgreSQL connection pool created elsewhere
const queue = new PostgresJobQueue(postgresPool, "pg-queue", Job, {
  limiter: new PostgresRateLimiter(10, 1), // 10 jobs/second
});

API Overview

Core Classes

  • Job: Base job class with progress tracking and retry logic
  • JobQueue: Main queue management class
  • IJobQueue: Interface for queue implementations

Rate Limiters

  • ConcurrencyLimiter: Limits concurrent job executions
  • DelayLimiter: Enforces minimum delay between jobs
  • CompositeLimiter: Combines multiple limiters
  • NullLimiter: No-op limiter for development
  • InMemoryRateLimiter: Rate limiter using in-memory storage
  • IndexedDbRateLimiter: Rate limiter using IndexedDB storage
  • SqliteRateLimiter: Rate limiter using SQLite storage
  • PostgresRateLimiter: Rate limiter using PostgreSQL storage

Storage Implementations

  • InMemoryJobQueue - Volatile memory storage using InMemoryQueueStorage
  • IndexedDbJobQueue - Browser persistent storage using IndexedDbQueueStorage
  • SqliteJobQueue - Local SQLite storage using SqliteQueueStorage
  • PostgresJobQueue - PostgreSQL persistent storage using PostgresQueueStorage

Testing

Run all tests:

bun test

License

Apache 2.0 - See LICENSE for details