Event bus batching: consolidate high-frequency events instead of per-item spray #465

@joelteply

Description

Problem

The event bus fires an individual event for every data operation, every index entry, and every telemetry data point. This floods the event loop, blocks health checks, and prevents the 5090 tower from ever becoming healthy on first start.

Root Cause Analysis

Hot paths found:

  • data.rs:138 — publish_event() per ORM create/update/delete
  • code.rs:68 — per-file codebase index event
  • 283 TS→Rust IPC request sites
  • 38 Rust bus.publish sites
  • 196 logger calls in Rust modules

The MessageBus (message_bus.rs:190) sends immediately via sender.send(event). No batching, no coalescing, no rate awareness.

Design: BatchingBus Wrapper

/// Wraps MessageBus with transparent event batching.
/// Callers emit normally. Bus accumulates events by prefix,
/// flushes on interval or count threshold.
pub struct BatchingBus {
    inner: Arc<MessageBus>,
    // Per-prefix accumulator: "data:users" → Vec<BusEvent>
    accumulators: DashMap<String, Vec<BusEvent>>,
    // Config per prefix
    config: BatchConfig,
}

pub struct BatchConfig {
    // Default: batch for 50ms OR 100 events, whichever first
    default_interval_ms: u64,  // 50
    default_max_count: usize,  // 100
    // Per-prefix overrides
    overrides: HashMap<String, (u64, usize)>,
    // Passthrough prefixes (no batching — real-time events)
    passthrough: HashSet<String>,
}

impl BatchingBus {
    pub fn publish(&self, event_name: &str, payload: Value) {
        let prefix = extract_prefix(event_name); // "data:users:created" → "data:users"
        
        // Check passthrough
        if self.config.passthrough.contains(&prefix) {
            self.inner.publish_async_only(event_name, payload);
            return;
        }
        
        // Accumulate. The DashMap entry guard must be dropped before
        // flush_prefix() calls remove(), or we deadlock on the shard lock.
        let should_flush = {
            let mut acc = self.accumulators.entry(prefix.clone()).or_default();
            acc.push(BusEvent { name: event_name.to_string(), payload });
            let (_, max_count) = self.config.for_prefix(&prefix);
            acc.len() >= max_count
        };
        
        // Flush if count exceeded
        if should_flush {
            self.flush_prefix(&prefix);
        }
    }
    
    /// Timer-driven flush (runs on interval)
    pub fn tick(&self) {
        // Collect keys first: removing entries while iterating the
        // DashMap would deadlock on its shard locks.
        let pending: Vec<String> = self
            .accumulators
            .iter()
            .filter(|e| !e.value().is_empty())
            .map(|e| e.key().clone())
            .collect();
        for prefix in pending {
            self.flush_prefix(&prefix);
        }
    }
    
    fn flush_prefix(&self, prefix: &str) {
        if let Some((_, events)) = self.accumulators.remove(prefix) {
            // Emit ONE consolidated event
            self.inner.publish_async_only(
                &format!("{}:batch", prefix),
                json!({ "count": events.len(), "events": events }),
            );
        }
    }
}
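The sketch above leaves `extract_prefix` and `BatchConfig::for_prefix` unspecified. A minimal std-only version (the two-segment prefix rule and the fallback to the top-level segment — so a `gpu:*` override also covers `gpu:memory` — are assumptions, not confirmed implementation details):

```rust
use std::collections::{HashMap, HashSet};

/// "data:users:created" → "data:users"; names with fewer than
/// two segments are returned unchanged.
fn extract_prefix(event_name: &str) -> String {
    let parts: Vec<&str> = event_name.splitn(3, ':').collect();
    if parts.len() >= 2 {
        format!("{}:{}", parts[0], parts[1])
    } else {
        event_name.to_string()
    }
}

struct BatchConfig {
    default_interval_ms: u64,
    default_max_count: usize,
    overrides: HashMap<String, (u64, usize)>,
    passthrough: HashSet<String>,
}

impl BatchConfig {
    /// Exact prefix match first, then the top-level segment
    /// ("data:users" → "data"), then the defaults.
    fn for_prefix(&self, prefix: &str) -> (u64, usize) {
        self.overrides
            .get(prefix)
            .copied()
            .or_else(|| {
                let top = prefix.split(':').next().unwrap_or(prefix);
                self.overrides.get(top).copied()
            })
            .unwrap_or((self.default_interval_ms, self.default_max_count))
    }
}
```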

Default Configuration

chat:*           → passthrough (real-time, user-facing)
presence:*       → passthrough (typing indicators)
sentinel:*:status → passthrough (progress updates)

data:*           → batch 50ms / 100 events
code_index:*     → batch 200ms / 500 events
gpu:*            → batch 1000ms / 50 events
training:*       → batch 500ms / 20 events
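The table above could be expressed as a constructor. A sketch (`defaults()` is a hypothetical name; the wildcard in `sentinel:*:status` is approximated by passing the whole `sentinel` prefix through, since the plain prefix scheme has no pattern matcher):

```rust
use std::collections::{HashMap, HashSet};

// BatchConfig redeclared from the design sketch so this compiles standalone.
struct BatchConfig {
    default_interval_ms: u64,
    default_max_count: usize,
    overrides: HashMap<String, (u64, usize)>, // prefix → (interval_ms, max_count)
    passthrough: HashSet<String>,
}

impl BatchConfig {
    fn defaults() -> Self {
        let mut overrides = HashMap::new();
        overrides.insert("code_index".to_string(), (200, 500));
        overrides.insert("gpu".to_string(), (1000, 50));
        overrides.insert("training".to_string(), (500, 20));
        // data:* needs no entry: it falls back to the 50ms / 100 defaults.

        // Real-time prefixes skip batching entirely. Note: "sentinel" here
        // passes ALL sentinel events through, not just *:status.
        let passthrough = ["chat", "presence", "sentinel"]
            .iter()
            .map(|s| s.to_string())
            .collect();

        BatchConfig {
            default_interval_ms: 50,
            default_max_count: 100,
            overrides,
            passthrough,
        }
    }
}
```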

Integration Points

  1. MessageBus: Wrap with BatchingBus, transparent to all callers
  2. ORM: data.rs publish_event already goes through bus — gets batched automatically
  3. CodebaseIndexer: index events get batched automatically
  4. Telemetry: GPU/memory readings get consolidated
  5. IPC: TS→Rust requests could batch too (same pattern on the IPC transport)
  6. Subscribers: need to handle both single events AND batch events (adapter trait)
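tick() also needs a driver. In the real codebase this would more likely be a tokio interval task; the std-only sketch below (spawn_tick_driver and the closure-based shape are assumptions made to keep it self-contained) shows the idea:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Calls `tick` every `interval_ms` until `stop` is set.
fn spawn_tick_driver<F>(
    interval_ms: u64,
    stop: Arc<AtomicBool>,
    mut tick: F,
) -> thread::JoinHandle<()>
where
    F: FnMut() + Send + 'static,
{
    thread::spawn(move || {
        while !stop.load(Ordering::Relaxed) {
            thread::sleep(Duration::from_millis(interval_ms));
            tick();
        }
    })
}
```

Whatever drives it, the tick period doubles as the worst-case added latency for batched events, so it should stay at or below the smallest configured interval (50ms here).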

The Trait

/// Subscribers implement this to handle both modes
trait BatchAwareSubscriber {
    fn on_event(&self, event: &BusEvent);
    fn on_batch(&self, events: &[BusEvent]) {
        // Default: iterate and call on_event
        for e in events { self.on_event(e); }
    }
}

Subscribers that can handle batches efficiently override on_batch. Others get the default iterate-and-dispatch.
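For example, a subscriber that persists index updates can override on_batch to do one write per flush instead of one per event (IndexWriter is hypothetical; BusEvent is reduced to a name field so the sketch compiles standalone):

```rust
use std::cell::Cell;

struct BusEvent {
    name: String,
}

trait BatchAwareSubscriber {
    fn on_event(&self, event: &BusEvent);
    fn on_batch(&self, events: &[BusEvent]) {
        // Default: iterate and dispatch one at a time.
        for e in events {
            self.on_event(e);
        }
    }
}

/// Hypothetical batch-aware subscriber: one write per batch, not per event.
struct IndexWriter {
    writes: Cell<usize>,
}

impl BatchAwareSubscriber for IndexWriter {
    fn on_event(&self, _event: &BusEvent) {
        // Single-event path: one write each.
        self.writes.set(self.writes.get() + 1);
    }

    fn on_batch(&self, events: &[BusEvent]) {
        // Batched path: one consolidated write for the whole batch.
        if !events.is_empty() {
            self.writes.set(self.writes.get() + 1);
        }
    }
}
```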

Impact

  • Tower startup: indexer events batched → bus not flooded → health check succeeds
  • Latency: fewer event dispatches → less overhead → faster response
  • IPC: batched Rust→TS events → fewer WebSocket frames
  • ORM: bulk operations emit ONE event, not N
  • Logger: batch log writes to disk
