Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AI.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ const sendWelcomeEmail = r
- Use `.on(onAnyOf(...))` to listen to several events while keeping inference.
- Hooks can set `.order(priority)`; lower numbers run first. Call `event.stopPropagation()` inside `run` to cancel downstream hooks.
- Wildcard hooks use `.on("*")` and receive every emission except events tagged with `globals.tags.excludeFromGlobalHooks`.
- Use `.parallel(true)` on event definitions to enable batched parallel execution. Listeners with the same order run concurrently; batches run sequentially. Failures in one batch prevent subsequent batches from running; propagation is checked between batches (if stopped up front, nothing runs).

### Middleware

Expand Down
34 changes: 33 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,39 @@ const emergencyHandler = r
.build();
```

> **runtime:** "'A really good office messenger.' That’s me in rollerblades. You launch a 'userRegistered' flare and I sprint across the building, high‑fiving hooks and dodging middleware. `stopPropagation` is you sweeping my legs mid‑stride. Rude. Effective. Slightly thrilling."
> **runtime:** "'A really good office messenger.' That's me in rollerblades. You launch a 'userRegistered' flare and I sprint across the building, high-fiving hooks and dodging middleware. `stopPropagation` is you sweeping my legs mid-stride. Rude. Effective. Slightly thrilling."

#### Parallel Event Execution

When an event fan-out needs more throughput, mark it as parallel to run same-priority listeners concurrently while preserving priority boundaries:

```typescript
const parallelEvent = r.event("app.events.parallel").parallel(true).build();

r.hook("app.hooks.first")
.on(parallelEvent)
.order(0)
.run(async (event) => {
await doWork(event.data);
})
.build();

r.hook("app.hooks.second")
.on(parallelEvent)
.order(0)
.run(async () => log.info("Runs alongside first"))
.build();

r.hook("app.hooks.after")
.on(parallelEvent)
.order(1) // Waits for order 0 batch to complete
.run(async () => followUp())
.build();
```

- Listeners sharing the same `order` run together; the next `order` starts after the batch settles.
- If any listener in a batch throws, the emission rejects and later batches are skipped.
- `stopPropagation()` is evaluated between batches. If it is set before the first batch, nothing runs; setting it inside a batch does not cancel peers already executing in that batch.

### Middleware

Expand Down
22 changes: 13 additions & 9 deletions examples/fastify-mikroorm/readmes/runner-AI.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ const internal = event({
});

// Performance: runtime event emission cycle detection
// run(app, { runtimeCycleDetection: true }) // To prevent deadlocks from happening.
// run(app, { runtimeCycleDetection: true }) // To prevent event-driven deadlocks from happening.
```

### Multiple Events per Hook
Expand Down Expand Up @@ -332,7 +332,7 @@ import { createContext } from "@bluelibs/runner";

const UserCtx = createContext<{ userId: string }>("app.userContext");

// In middleware or entry-point
// In middleware-entry-point
UserCtx.provide({ userId: "u1" }, async () => {
await someTask(); // has access to the context
});
Expand Down Expand Up @@ -641,21 +641,21 @@ export const cResource = resource({
}) as IResource<void, string>; // void config, returns string
```

## Validation (optional and libraryagnostic)
## Validation (optional and library-agnostic)

## Event Cycle Safety

To prevent eventdriven deadlocks, the runner detects cycles during emission:
To prevent event-driven deadlocks, the runner detects cycles during emission:

- A cycle occurs when an event emits another event that eventually reemits the original event within the same emission chain (for example: `e1 -> e2 -> e1`).
- A cycle occurs when an event emits another event that eventually re-emits the original event within the same emission chain (for example: `e1 -> e2 -> e1`).
- When a cycle is detected, an `EventCycleError` is thrown with a readable chain to help debugging.
- A hook reemitting the same event it currently handles is allowed only when the emission originates from the same hook instance (useful for idempotent/noop retries); other cases are blocked.
- A hook re-emitting the same event it currently handles is allowed only when the emission originates from the same hook instance (useful for idempotent/no-op retries); other cases are blocked.

Guidance:

- Prefer oneway flows; avoid mutual crossemits between hooks.
- Use `event.stopPropagation()` to shortcircuit handlers when appropriate.
- Use tags (for example, `globals.tags.excludeFromGlobalHooks`) to scope listeners and avoid unintended reentry via global hooks.
- Prefer one-way flows; avoid mutual cross-emits between hooks.
- Use `event.stopPropagation()` to short-circuit handlers when appropriate.
- Use tags (for example, `globals.tags.excludeFromGlobalHooks`) to scope listeners and avoid unintended re-entry via global hooks.

Interface any library can implement:

Expand Down Expand Up @@ -695,3 +695,7 @@ middleware({
configSchema, // runs on .with()
});
```




59 changes: 59 additions & 0 deletions src/__tests__/models/EventManagerFailure.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@

import { EventManager } from "../../models/EventManager";
import { defineEvent } from "../../define";
import { IEvent } from "../../defs";

describe("EventManager Parallel Failure Behavior", () => {
let eventManager: EventManager;
let parallelEvent: IEvent<string>;

beforeEach(() => {
eventManager = new EventManager({ runtimeCycleDetection: true });
parallelEvent = defineEvent<string>({ id: "parallelEvent", parallel: true });
});

it("should execute all listeners in a batch even if one fails, but stop before next batch", async () => {
const results: string[] = [];
const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

// Batch 0: Listener 1 (Throws immediately)
eventManager.addListener(
parallelEvent,
async () => {
throw new Error("Fail immediately");
},
{ order: 0 }
);

// Batch 0: Listener 2 (Succeeds after delay)
eventManager.addListener(
parallelEvent,
async () => {
await delay(50);
results.push("batch0-slow-success");
},
{ order: 0 }
);

// Batch 1: Listener 3 (Should not run)
eventManager.addListener(
parallelEvent,
async () => {
results.push("batch1-should-not-run");
},
{ order: 1 }
);

// Expect the emit to throw
await expect(eventManager.emit(parallelEvent, "data", "test")).rejects.toThrow("Fail immediately");

// Wait a bit to ensure the slow listener had time to finish (if it was running)
await delay(100);

// Verify behavior
// 1. "batch0-slow-success" SHOULD be in results (it started running)
// 2. "batch1-should-not-run" SHOULD NOT be in results (stopped at group level)
expect(results).toContain("batch0-slow-success");
expect(results).not.toContain("batch1-should-not-run");
});
});
Loading