Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions docs/plans/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

## Current

| Plan | Title | Status |
|------|-------|--------|
| [0007](0007-cesql-pattern-matching.md) | CESQL Pattern Matching | Proposed |
| Plan | Status | Description |
|------|--------|-------------|
| [0007](0007-cesql-pattern-matching.md) | Proposed | CESQL Pattern Matching |
| [pipe-autoscaling](pipe-autoscaling.md) | In Progress | Dynamic worker pool |
| [pipe-ordering](pipe-ordering.md) | Proposed | Preserved message ordering |

## Agent Guidance

Expand Down
244 changes: 244 additions & 0 deletions docs/plans/pipe-autoscaling.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
# Dynamic Worker Pool for gopipe

**Status:** In Progress

## Overview

Provide a unified worker pool abstraction for the `pipe` package that supports:
1. Static worker configuration (fixed concurrency)
2. Dynamic autoscaling based on load
3. Future: Preserved message ordering (see [pipe-ordering](pipe-ordering.md))

The API aligns with the `message` package's `PoolConfig` pattern for consistency.

## Design Approach

### Naming Convention (aligned with message package)

| message package | pipe package |
|-----------------|--------------|
| `PoolConfig.Workers` | `PoolConfig.Workers` |
| `PoolConfig.BufferSize` | `PoolConfig.BufferSize` |
| Named pools via `AddPoolWithConfig` | `Pool` struct for internal management |

### Unified Config: Workers + MaxWorkers

The key insight is that `Workers` serves dual purpose:
- **Static mode**: The fixed worker count
- **Autoscale mode**: The minimum worker count (floor)

Autoscaling is enabled when `MaxWorkers > Workers`. This eliminates the need for a separate `MinWorkers` field or nested `AutoscaleConfig` struct.

| Workers | MaxWorkers | Mode | Result |
|---------|------------|------|--------|
| 0 | 0 | static | 1 worker (default) |
| 4 | 0 | static | 4 workers |
| 4 | 4 | static | 4 workers |
| 2 | 16 | autoscale | 2-16 workers |

### Backpressure-based scaling (validated against industry patterns)

| Library | Scale-Up Method | Scale-Down Method |
|---------|-----------------|-------------------|
| **Pond** | All workers busy + queue depth | Immediate on idle or IdleTimeout |
| **Ants** | Fixed capacity (no autoscale) | Periodic scavenger (ExpiryDuration, default 1s) |
| **workerpool-go** | Load avg > threshold (EWMA) | Load avg < threshold |
| **Watermill** | N/A (partition-based, implicit) | N/A |

**Our approach** (aligns with Pond's simpler model):
- Scale up: when all workers are busy (activeWorkers == totalWorkers) AND workers < MaxWorkers
- Scale down: when a worker has been idle for `ScaleDownAfter` AND workers > Workers
- Cooldown periods prevent thrashing

**Why not Watermill's approach?** Watermill relies on message broker partitions for parallelism. gopipe is a general-purpose pipeline library, so explicit worker management is more appropriate.

## Implementation

### File Structure

```
pipe/
├── pool.go # PoolConfig type with parse() and isAutoscale()
├── processing.go # Config + Dispatcher + startStaticProcessing() + startAutoscaledProcessing()
└── internal/autoscale/
├── config.go # Default constants
├── pool.go # Pool struct, worker management, scaler loop
└── pool_test.go # Unit tests
```

### Configuration API

```go
// In pipe/pool.go
type PoolConfig struct {
// Workers sets worker count (static mode) or minimum workers (autoscale mode).
// Default: 1
Workers int

// MaxWorkers enables autoscaling when > Workers.
// Workers scale between Workers and MaxWorkers based on backpressure.
// If <= Workers (including 0), uses static mode with Workers count.
// Default: Workers (static mode)
MaxWorkers int

// Autoscale timing (only used when MaxWorkers > Workers)
ScaleDownAfter time.Duration // Default: 30s
ScaleUpCooldown time.Duration // Default: 5s
ScaleDownCooldown time.Duration // Default: 10s
CheckInterval time.Duration // Default: 1s

// BufferSize sets the output channel buffer size.
// Default: 0 (unbuffered)
BufferSize int
}

func (c PoolConfig) isAutoscale() bool {
return c.MaxWorkers > c.Workers
}

// In pipe/processing.go
type Config struct {
// Pool configures the worker pool.
Pool PoolConfig

// ErrorHandler is called when processing fails.
// Default logs via slog.Error.
ErrorHandler func(in any, err error)

// CleanupHandler is called when processing is complete.
CleanupHandler func(ctx context.Context)

// CleanupTimeout sets the timeout for cleanup operations.
CleanupTimeout time.Duration

// ShutdownTimeout controls shutdown behavior on context cancellation.
// If <= 0, waits indefinitely for input to close naturally.
// If > 0, waits up to this duration then forces shutdown.
ShutdownTimeout time.Duration
}
```

### Usage Examples

```go
// Static workers (simple case)
p := pipe.NewProcessPipe(fn, pipe.Config{
Pool: pipe.PoolConfig{Workers: 4},
})

// Autoscaling workers (MaxWorkers > Workers enables it)
p := pipe.NewProcessPipe(fn, pipe.Config{
Pool: pipe.PoolConfig{Workers: 2, MaxWorkers: 16},
})

// Default (1 static worker)
p := pipe.NewProcessPipe(fn, pipe.Config{})

// Future: Ordered processing (phase 2)
p := pipe.NewProcessPipe(fn, pipe.Config{
Pool: pipe.PoolConfig{Workers: 4, PreserveOrder: true},
})
```

### Processing Dispatcher

```go
func startProcessing[In, Out any](...) <-chan Out {
cfg = cfg.parse()

if cfg.Pool.isAutoscale() {
return startAutoscaledProcessing(ctx, in, fn, cfg)
}
return startStaticProcessing(ctx, in, fn, cfg)
}
```

### Internal Pool (pipe/internal/autoscale/pool.go)

```go
type Config struct {
MinWorkers, MaxWorkers int
ScaleDownAfter time.Duration
ScaleUpCooldown, ScaleDownCooldown time.Duration
CheckInterval time.Duration
}

type Pool[In, Out any] struct {
cfg Config
fn func(context.Context, In) ([]Out, error)
workers map[int]*worker
totalWorkers atomic.Int64
activeWorkers atomic.Int64
// ...
}

func NewPool[In, Out any](cfg Config, fn func(...), ...) *Pool[In, Out]
func (p *Pool) Start(ctx context.Context, in <-chan In, bufferSize int) <-chan Out
func (p *Pool) Stop()
func (p *Pool) TotalWorkers() int64
func (p *Pool) ActiveWorkers() int64
```

Note: Internal pool uses `MinWorkers` which maps from `PoolConfig.Workers`.

## Default Values (0 = use default)

| Field | 0 means | Default value |
|-------|---------|---------------|
| Workers | use default | 1 |
| MaxWorkers | use Workers | Workers (static mode) |
| ScaleUpCooldown | use default | 5s |
| ScaleDownCooldown | use default | 10s |
| ScaleDownAfter | use default | 30s |
| CheckInterval | use default | 1s |

## Convention Alignment

The implementation follows repository conventions:

| Pattern | Implementation |
|---------|----------------|
| Config naming | `PoolConfig` aligns with message package |
| Field naming | `Workers` aligns with message package |
| Separation of concerns | `PoolConfig` for workers, `Config` for pipe behavior |
| Config defaults | `parse()` method applies defaults |
| Internal packages | `pipe/internal/autoscale/` for pool implementation |
| Dispatcher pattern | `startProcessing()` dispatches based on `isAutoscale()` |

## Verification

- **Unit tests**: 15+ tests covering min/max enforcement, scale-up triggers, scale-down on idle, cooldowns, goroutine leak detection
- **Benchmarks**: Comparison between static and autoscale processing under various loads
- **All tests pass**: `go test ./...` succeeds

## Future Enhancements

### Phase 2: Preserved Message Ordering

See [pipe-ordering.md](pipe-ordering.md) for the detailed plan.

The `PoolConfig` will be extended with:
```go
type PoolConfig struct {
// ... existing fields ...

// PreserveOrder enables in-order message delivery.
// When true, outputs are reordered to match input sequence
// despite parallel processing. Has memory/latency overhead.
// Default: false
PreserveOrder bool

// OrderBufferSize is the max items to buffer while waiting
// for in-sequence items. Only used when PreserveOrder is true.
// Default: MaxWorkers * 2 (or Workers * 2 if static)
OrderBufferSize int
}
```

### Other Future Enhancements (out of scope)

Based on research, these could be added later:
- **Strategies** (like Pond): Eager/Balanced/Lazy scaling aggressiveness
- **EWMA load averaging** (like workerpool-go): Smoother scaling decisions
- **Metrics callbacks**: OnScaleUp/OnScaleDown hooks for observability
- **Custom ScaleStrategy interface**: User-defined scaling logic (like gopool)
Loading