
feat(pipe): add dynamic concurrency autoscaling #119

Open
fxsml wants to merge 6 commits into develop from feature/worker-pool

Conversation

fxsml (Owner) commented Feb 10, 2026

Summary

  • Add AutoscaleConfig to pipe.Config for automatic worker scaling based on load
  • Scale up when workers are saturated, scale down after idle timeout
  • Configurable min/max workers, cooldowns, and check intervals
  • Sensible defaults (1 to NumCPU workers, 30s idle timeout)

Changes

  • pipe/autoscale.go: public AutoscaleConfig type
  • pipe/internal/autoscale/pool.go: pool implementation with scaling logic
  • pipe/processing.go: integration with ProcessPipe
  • Design docs in docs/plans/pipe-autoscaling.md and pipe-ordering.md

Test Plan

  • Unit tests cover scaling behavior and edge cases (646 lines)
  • Benchmarks included (293 lines)
  • make check passes

claude and others added 6 commits January 22, 2026 06:01
Add AutoscaleConfig to the Config struct, enabling automatic worker
scaling based on backpressure. When all workers are busy, new workers
spawn up to MaxWorkers; when workers sit idle beyond ScaleDownAfter,
they are stopped until the pool shrinks back to MinWorkers.

Features:
- AutoscaleConfig with MinWorkers, MaxWorkers, ScaleDownAfter
- ScaleUpCooldown and ScaleDownCooldown to prevent thrashing
- CheckInterval for scaler evaluation frequency
- Backward compatible: existing Concurrency config still works

Implementation:
- pipe/autoscale.go: AutoscaleConfig type
- pipe/internal/autoscale/: Pool with worker lifecycle management
- Integrates with existing startProcessing via delegation

Add comprehensive edge case tests for autoscale pool:
- Goroutine leak tests (input close, context cancel, Stop())
- Empty input handling
- Context cancellation behavior
- Scale-up cooldown enforcement
- Multiple outputs per input

Add benchmark tests comparing static concurrency vs autoscale:
- Various worker counts and item counts
- Direct comparison benchmarks
- Burst load pattern benchmarks

Results show autoscale performs better for burst workloads
(~76ms vs ~146ms) by scaling up during load spikes.

- Extract startStaticProcessing() for consistent abstraction level
- Make startProcessing() a simple dispatcher between static/autoscale
- Add AutoscaleConfig.parse() method with defaultAutoscaleConfig
- Remove duplicate Config type from internal/autoscale package
- Remove ProcessFunc duplication, use generic function parameter
- Rename internal autoscale.Config to PoolConfig for clarity
- Update tests to use PoolConfig directly

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

- Mark status as Implemented
- Update file structure to match actual implementation
- Add Convention Alignment section documenting patterns used
- Simplify to focus on what was built vs original design
- Remove implementation steps (already done)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

- Update pipe-autoscaling.md to use PoolConfig/Workers naming convention
  (aligned with message package)
- Add reference to future ordering extension
- Create pipe-ordering.md with sequence-based reordering proposal
- Update plans README with new ordering plan

- Remove nested AutoscaleConfig, unify into single PoolConfig
- Workers serves dual purpose: static count or autoscale minimum
- MaxWorkers > Workers enables autoscaling (implicit mode selection)
- Separate Config (pipe behavior) from PoolConfig (worker config)
- Update ordering plan to use new config structure
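The implicit mode selection described in this commit (MaxWorkers > Workers enables autoscaling) might look roughly like the predicate below; `PoolConfig` and `autoscaling` are illustrative names, not the actual API:

```go
package main

import "fmt"

// PoolConfig sketches the unified config from this commit: Workers is
// either the static count or the autoscale minimum.
type PoolConfig struct {
	Workers    int
	MaxWorkers int
}

// autoscaling reports whether the config implicitly selects autoscale
// mode: MaxWorkers > Workers.
func (c PoolConfig) autoscaling() bool {
	return c.MaxWorkers > c.Workers
}

func main() {
	fmt.Println(PoolConfig{Workers: 4}.autoscaling())                // false: static pool of 4
	fmt.Println(PoolConfig{Workers: 1, MaxWorkers: 8}.autoscaling()) // true: scale 1..8
}
```

One consequence of this design is that leaving MaxWorkers at zero always yields a static pool, which keeps existing Concurrency-style configs backward compatible.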