Conversation

@rjpower rjpower commented Jan 7, 2026

This implements a distributed "job" backend for Fray, built on Connect RPC and our own implementation rather than Ray.

This is a complete implementation, including actor & task support. There is no CLI or web support yet, and it is not yet enabled by default.

In preparation for this, I've migrated Zephyr to use GCS spilling instead of returning generators. This simplifies our RPC implementation requirements. In the future, we may consider having Zephyr use its own RPC service to avoid spilling if performance is critical.

@rjpower's comment was marked as outdated.

* Remove stale code.
* Split context code in prep for introducing new Fray RPC context.
@rjpower rjpower force-pushed the rjpower/20260106-fray-rpc-v0 branch from c3eded6 to d62c72a on January 8, 2026 01:40
Enables stateful distributed computation with automatic placement, lifecycle management, and failure recovery. Actors are distributed using least-loaded placement and automatically restart on worker failures.
- Update exception handling test to expect original exception type
- Add worker server fixtures for proper actor instantiation testing
- Fix actor task routing to only assign to hosting worker
- Use unique actor names to avoid test isolation issues
- Convert ConnectError to ValueError for actor name conflicts
Use a specific git commit (5342eace) from the connect-python main branch that fixes
the duplicate google.rpc.Status symbol conflict with googleapis-common-protos.

The fix renames the package from google.rpc to grpc.status.v1 to avoid conflicts.
This resolves the itest (3.12) failure caused by importing both google-cloud-storage
and connect-python.

Ref: connectrpc/connect-python#76
@rjpower rjpower changed the title from "Rjpower/20260106 fray rpc v0" to "Initial implementation of the Fray/RPC controller" on January 8, 2026
serialized_call=serialized_call,
)

for attempt in range(self._max_retries):
@rjpower (author) commented:

This can be handled better.

This should have _worker and _controller client stubs.

First, ask the controller for the worker owning the actor, retrying in an exponential backoff loop until the worker is ready or the user timeout hits. If the controller isn't available, handle that with the same backoff.

Then create a stub to the resolved worker and call that stub with exponential backoff. This is an inner loop:

while not successful:
    stub = controller.resolve_worker(actor)
    call_method(stub)
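
A rough sketch of the combined loop, with the resolve and call steps injected as callables (`resolve_worker` and `call_method` here are illustrative stand-ins, not the actual Fray API):

```python
import time

def call_with_backoff(resolve_worker, call_method, timeout=60.0, max_delay=5.0):
    """Re-resolve the actor's worker and call it, retrying with
    exponential backoff until the user-supplied timeout expires."""
    deadline = time.monotonic() + timeout
    delay = 0.1
    while True:
        try:
            # Re-resolve on every attempt in case the actor restarted
            # on a different worker.
            stub = resolve_worker()
            return call_method(stub)
        except ConnectionError:
            if time.monotonic() + delay > deadline:
                raise
            time.sleep(delay)
            delay = min(delay * 2, max_delay)
```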



@pytest.mark.asyncio
async def test_actor_restart_on_worker_failure_integration(controller_url):
@rjpower (author) commented:

Add a new worker at the end of this test, and validate that the actor recovers on the new worker when retries are enabled.



@pytest.mark.asyncio
async def test_report_complete_nonexistent_task(client_url):
@rjpower (author) commented:

Remove all of these; the controller now probes workers instead.

assert future._task_id == "task-123"


def test_actor_retry_exponential_backoff_timing(rpc_controller):
@rjpower (author) commented:

Remove this test.

assert future._task_id == "task-123"


def test_actor_retry_max_delay_cap(rpc_controller):
@rjpower (author) commented:

Remove this test.

@rjpower (author) commented:

This file can now be removed; the functionality is tested elsewhere.

@rjpower rjpower requested review from dlwh and ravwojdyla January 8, 2026 23:42
rjpower commented Jan 8, 2026

Still cleaning up a bunch of Claude stuff, but probably worth taking a peek at.

@ravwojdyla might be interested in the Zephyr changes.

I'm still debating how to integrate this into our cluster & gang-scheduling system, but we'll want some variation on this in any case. Ray will still be the default RPC for now, but this will let me test independently of Ray as we roll out the cluster changes.

@rjpower rjpower force-pushed the rjpower/20260106-fray-rpc-v0 branch from 9dcaac7 to c47b4a0 on January 9, 2026 01:13
ravwojdyla commented Jan 10, 2026

@rjpower somewhat high-level question: I've been thinking about the minimal set of abstractions/components required to express Marin workloads (i.e. zephyr, RL, training), and specifically whether RPC (and thus service discovery, back-pressure, etc.) is required.

Could we live with 3 components:

  • controller
  • cluster manager (à la Fray, an abstraction over resources; can launch docker containers)
    • containers are stateless; most of the time they just write to GCS/pubsub
  • (potentially durable) pubsub

If low latency RPC is required then GCS/pubsub may not work, but afaiu that's not the case.

Is pubsub a dumb idea?

rjpower commented Jan 10, 2026


I agree, I've been thinking something similar. I totally agree on the containers: it feels like we can have a really simple, clear system there.

I don't think we can get by with just pubsub, but I think we can do something pretty close. For example, for some of our jobs we want to explicitly tell all of the workers to start training, so we need to be able to send that signal somehow. I think a basic registration system will handle all of our needs, though, and our messaging system doesn't need to be particularly high performance to start with. We kind of "cheat" today and use Arrow Flight RPC explicitly to send weights around for RL (because Ray would OOM); if we did make our RPC system reasonably fast, we could use that instead, but that can come later down the line.

rjpower and others added 11 commits January 10, 2026 16:47
Introduces the fluster library, a new actor and cluster management system built on connect-python and protobuf. Includes:
- Actor, controller, and worker proto definitions
- Connect-python RPC integration
- Cluster backend abstractions
- Type definitions and basic tests
- Buf configuration for protobuf generation
Implements Stage 4 of fluster worker: Docker image builder with content-addressed
tags, two-layer caching (dependencies + source), and LRU eviction.

- Content-addressed image tags: {registry}/fluster-job-{job_id}:{deps_hash[:8]}
- Dockerfile with UV + BuildKit cache mounts for dependency caching
- Cache hit detection via docker image inspect
- LRU eviction for old images based on creation time
- Only supports workspace-based builds (EnvironmentConfig.workspace required)

Also improves VenvCache with proper error handling and shell escaping.
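
As a rough illustration of the tagging scheme (a sketch; the hash input and helper name are assumptions, only the tag format comes from the commit above):

```python
# Hypothetical sketch of content-addressed image tagging; only the tag
# format is taken from the commit message above.
import hashlib

def image_tag(registry: str, job_id: str, lockfile_bytes: bytes) -> str:
    # Hashing the dependency lockfile means identical dependency sets
    # map to the same tag and reuse the cached image.
    deps_hash = hashlib.sha256(lockfile_bytes).hexdigest()
    return f"{registry}/fluster-job-{job_id}:{deps_hash[:8]}"
```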
Implements Stage 5 of the worker implementation plan:
- ContainerConfig and ContainerResult dataclasses
- DockerRuntime class for container lifecycle management
- Cgroups v2 resource limits (CPU/memory)
- Security hardening (no-new-privileges, cap-drop ALL)
- Log streaming via callback
- Timeout handling with forced kill
- Port mapping and volume mount support

All methods use async subprocess execution and proper error handling.
21 comprehensive tests verify all functionality including resource limits,
timeouts, log streaming, and security hardening.
Changes:
- Remove log_callback parameter and _stream_logs method from DockerRuntime
  Log fetching should be done via the FetchLogs RPC, not during execution
- Replace cpu_millicores and memory_mb fields with ResourceSpec proto
  Eliminates duplication between ContainerConfig and cluster.proto
- Add helper methods get_cpu_millicores() and get_memory_mb() to parse
  ResourceSpec into Docker-compatible values
- Update all tests to use ResourceSpec instead of raw fields
- Remove 2 log streaming-specific tests (19 tests remain, all passing)

DockerRuntime now properly aligns with the proto interface design where
logs are fetched separately rather than streamed during execution.
Implements Stage 6 of worker implementation with thread-safe port allocation
that tracks allocated ports to avoid conflicts and releases them on job
termination. Allocates from configurable range (default 30000-40000) and
verifies ports are available before allocation.

Co-Authored-By: Claude Sonnet 4.5 <[email protected]>
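
A minimal sketch of the allocation logic described above (illustrative, not the actual PortAllocator code):

```python
import socket
import threading

class PortAllocator:
    def __init__(self, start: int = 30000, end: int = 40000):
        self._ports = range(start, end)
        self._allocated: set[int] = set()
        self._lock = threading.Lock()

    def allocate(self) -> int:
        with self._lock:
            for port in self._ports:
                if port in self._allocated:
                    continue
                # Verify the port is actually free before handing it out.
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                    s.settimeout(0.2)
                    if s.connect_ex(("127.0.0.1", port)) != 0:
                        self._allocated.add(port)
                        return port
            raise RuntimeError("no free ports in range")

    def release(self, port: int) -> None:
        with self._lock:
            self._allocated.discard(port)
```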
Remove unused allocator fixture parameter from tests that create their own allocator instances with specific port ranges.
Implements Stage 7 of worker implementation: JobManager class that
coordinates job execution through all phases with proper resource
management and cleanup.

Key features:
- Job lifecycle phases: PENDING → BUILDING → RUNNING → terminal states
- Integrates BundleCache, VenvCache, ImageBuilder, DockerRuntime, PortAllocator
- Semaphore-based concurrency limiting (configurable max_concurrent_jobs)
- Port allocation and release for named ports
- Graceful job killing with SIGTERM → SIGKILL timeout
- Log collection via asyncio.Queue with tail support (negative start_line)
- Automatic cleanup: removes containers and releases ports on completion
- Background task execution via asyncio

Implementation files:
- src/fluster/cluster/worker/types.py: Job dataclass for internal state tracking
- src/fluster/cluster/worker/manager.py: JobManager class with lifecycle orchestration
- tests/cluster/worker/test_manager.py: Comprehensive test coverage
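
The semaphore-based lifecycle might look roughly like this (a sketch with placeholder phases, not the actual JobManager code):

```python
import asyncio

class JobManager:
    def __init__(self, max_concurrent_jobs: int = 4):
        self._semaphore = asyncio.Semaphore(max_concurrent_jobs)

    def submit_job(self, job) -> None:
        # PENDING: the job is queued immediately, executed in the background.
        asyncio.create_task(self._execute_job(job))

    async def _execute_job(self, job) -> None:
        # The semaphore caps how many jobs build and run concurrently.
        async with self._semaphore:
            try:
                await self._build(job)    # BUILDING: image + venv setup
                await self._run(job)      # RUNNING -> terminal state
            finally:
                await self._cleanup(job)  # remove container, release ports

    async def _build(self, job): ...      # placeholders for the real phases
    async def _run(self, job): ...
    async def _cleanup(self, job): ...
```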
- Add type annotation for optional memory_str parameter
- Fix Entrypoint kwargs type to allow None
- Remove unused ContainerResult import
- Remove unused test fixture parameters
Replace incorrect in-memory queue-based logs with disk-based approach:
- Logs written to {job_workdir}/STDOUT and {job_workdir}/STDERR files
- Logs read from disk on demand
- Container output redirected to files via shell redirection

Changes:
- Remove log_queue from Job dataclass, add workdir (Path)
- Remove stream_logs() method from DockerRuntime
- Add stdout_file/stderr_file to ContainerConfig for log redirection
- Update DockerRuntime to mount log files and redirect container output
- Create job working directory in JobManager.submit_job()
- Pass log file paths to runtime in _execute_job()
- Rewrite get_logs() to read from disk files
- Add workdir cleanup to _execute_job() finally block
- Update tests to write to disk files instead of log queue

All 27 tests pass.
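
The tail-with-negative-start_line convention falls out of Python slicing; a minimal sketch (file layout per the commit, function shape assumed):

```python
from pathlib import Path

def get_logs(workdir: Path, start_line: int = 0) -> list[str]:
    lines = (workdir / "STDOUT").read_text().splitlines()
    # Slicing handles both cases: a non-negative start_line skips that
    # many lines; a negative one tails the last |start_line| lines.
    return lines[start_line:]
```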
rjpower and others added 28 commits January 12, 2026 20:35
- Add WorkerConfig dataclass for static worker configuration
- Add load_workers_from_config() to register workers at startup
- Add find_worker_for_job() for simple first-fit worker selection
- Add tests for worker loading and selection

Part of controller-v0 implementation.
- Add Scheduler class with background daemon thread for job dispatch
- Wake-on-event pattern using threading.Event for timer + signal
- Dispatches pending jobs to available workers via provided dispatch_fn
- Updates job state to RUNNING on successful dispatch
- Marks workers unhealthy and re-queues jobs on dispatch failure
- Clean shutdown with 5-second timeout
- Add 9 comprehensive tests for scheduler behavior

Part of controller-v0 implementation.
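
A sketch of the wake-on-event pattern (illustrative, not the actual Scheduler code):

```python
import threading

class Scheduler:
    def __init__(self, dispatch_fn, poll_interval: float = 1.0):
        self._dispatch_fn = dispatch_fn
        self._poll_interval = poll_interval
        self._wake = threading.Event()
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._loop, daemon=True)

    def start(self) -> None:
        self._thread.start()

    def wake(self) -> None:
        # Called on job submission or worker registration.
        self._wake.set()

    def shutdown(self) -> None:
        self._stop.set()
        self._wake.set()
        self._thread.join(timeout=5.0)   # clean shutdown with timeout

    def _loop(self) -> None:
        while not self._stop.is_set():
            # Sleep until explicitly woken or until the poll timer fires.
            self._wake.wait(timeout=self._poll_interval)
            self._wake.clear()
            self._dispatch_fn()          # try to place pending jobs
```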
- Add HeartbeatMonitor class with background thread for periodic worker polling
- Track consecutive heartbeat failures per worker (threshold: 3)
- Mark worker unhealthy and jobs as WORKER_FAILED when threshold exceeded
- Sync job states from heartbeat responses (terminal states remove from running_jobs)
- Call on_worker_failed callback to trigger retry logic
- Add 10 comprehensive tests covering failure detection and job state sync

Part of controller-v0 implementation.
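
The consecutive-failure bookkeeping is simple; a sketch (threshold of 3 per the commit, names assumed):

```python
FAILURE_THRESHOLD = 3

class HeartbeatFailureTracker:
    def __init__(self, on_worker_failed):
        self._failures: dict[str, int] = {}
        self._on_worker_failed = on_worker_failed

    def record(self, worker_id: str, ok: bool) -> None:
        if ok:
            self._failures[worker_id] = 0   # any success resets the count
            return
        self._failures[worker_id] = self._failures.get(worker_id, 0) + 1
        if self._failures[worker_id] >= FAILURE_THRESHOLD:
            # Mark the worker unhealthy; the controller retries its jobs.
            self._on_worker_failed(worker_id)
```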
- Add handle_job_failure() for individual job retry with failure type tracking
- Add handle_gang_failure() with all-or-nothing retry semantics
- Separate tracking for worker failures (preemption_count) vs job failures (failure_count)
- Gang retry only proceeds if ALL jobs have retries remaining
- Jobs marked KILLED only when retry not possible (avoids inconsistent state)
- Add 14 comprehensive tests covering retry limits, state reset, and gang behavior

Part of controller-v0 implementation.
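
A sketch of the all-or-nothing gang semantics (illustrative; field names are assumptions):

```python
from dataclasses import dataclass

@dataclass
class Job:
    state: str = "RUNNING"
    preemption_count: int = 0

def handle_gang_failure(jobs: list[Job], max_retries: int) -> bool:
    # Gang retry only proceeds if every member has retries remaining.
    if any(j.preemption_count >= max_retries for j in jobs):
        for j in jobs:
            j.state = "KILLED"       # retry not possible for the gang
        return False
    for j in jobs:
        j.preemption_count += 1
        j.state = "PENDING"          # re-queue the whole gang together
    return True
```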
- Add ControllerServiceImpl with RPC handlers for job management
- launch_job: submits job, wakes scheduler
- get_job_status: returns job status, raises NOT_FOUND if missing
- terminate_job: marks job KILLED (idempotent for terminal states)
- list_jobs: returns all jobs (uses thread-safe list_all_jobs)
- Add list_all_jobs() to ControllerState for thread-safe job listing
- Add 11 comprehensive tests for RPC handlers

Part of controller-v0 implementation.
- Add comprehensive integration tests exercising full controller system
- test_full_job_lifecycle: submission through completion
- test_job_failure_and_retry: failure with retry logic
- test_worker_failure_triggers_retry: worker death triggers job retry
- test_multiple_jobs_scheduled_sequentially: multiple jobs complete
- test_job_terminated_during_execution: RPC termination works
- test_scheduler_wakes_on_worker_registration: delayed dispatch
- test_list_jobs_shows_all_states: mixed state listing
- test_concurrent_job_execution_on_multiple_workers: multi-worker support

Part of controller-v0 implementation.
Add aiohttp-based HTTP dashboard for controller visibility:
- Index route (/) showing workers and jobs in HTML tables
- Health endpoint (/health) returning JSON status
- XSS protection via html.escape() for all user data
- Thread-safe state access via list_all_workers()

Includes comprehensive test coverage including XSS verification.
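
In outline (aiohttp per the commit; the handler bodies and the `app["state"]` key are illustrative):

```python
import html
from aiohttp import web

async def index(request: web.Request) -> web.Response:
    rows = "".join(
        f"<tr><td>{html.escape(w.worker_id)}</td></tr>"  # XSS protection
        for w in request.app["state"].list_all_workers()
    )
    return web.Response(text=f"<table>{rows}</table>",
                        content_type="text/html")

async def health(request: web.Request) -> web.Response:
    return web.json_response({"status": "ok"})

app = web.Application()
app.add_routes([web.get("/", index), web.get("/health", health)])
```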
- Refactor dashboard HTML for better formatting
- Fix parameter naming in service to match protocol
- Update cluster_example.py with improved usage patterns
- Remove redundant fluster/uv.lock file
- Add action tracking tests for state module
- Add cloudpickle to minimal workspace dependencies for container jobs
- Serialize raw callable/args/kwargs tuple instead of Entrypoint class
  to avoid requiring fluster.cluster.types in containers
- Use host Python version for Docker base image to prevent bytecode
  incompatibility (cloudpickle serializes version-specific bytecode)
- Add click CLI with --wait/--no-wait flag for dashboard exploration
- Add exit code column to worker dashboard jobs table
Implement resource tracking and capacity-based job scheduling:

- Add JOB_STATE_UNSCHEDULABLE and scheduling_timeout_seconds to proto
- Add resources.py with memory parsing (via humanfriendly) and device helpers
- Add peek_pending_jobs() and remove_from_queue() to state for non-blocking scheduling
- Add get_committed_resources(), worker_can_fit_job() to workers.py
- Update scheduler to skip jobs that don't fit (prevents head-of-line blocking)
- Jobs that can't be scheduled within timeout are marked UNSCHEDULABLE
- Resource matching: CPU, memory, device type, device variant, GPU count

Tests: 119 controller tests passing
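
The fit check reduces to comparing requested resources against a worker's uncommitted capacity; a sketch (humanfriendly.parse_size is real, the dict shapes are assumptions):

```python
import humanfriendly

def worker_can_fit_job(worker_free: dict, job_req: dict) -> bool:
    if job_req.get("cpu", 0) > worker_free.get("cpu", 0):
        return False
    # Memory requests arrive as strings like "4GiB"; parse to bytes.
    need = humanfriendly.parse_size(job_req["memory"]) if "memory" in job_req else 0
    if need > worker_free.get("memory_bytes", 0):
        return False
    # Device type/variant must match exactly; GPU count must fit.
    if job_req.get("device") and job_req["device"] != worker_free.get("device"):
        return False
    return job_req.get("gpu_count", 0) <= worker_free.get("gpu_count", 0)
```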
…oint registry

- Stage 1: ActorServer and ActorClient with cloudpickle serialization
- Stage 2: Resolver protocol with FixedResolver for static endpoints
- Stage 3: Controller endpoint registry with job-state filtering
- Stage 4: ClusterResolver for controller-based discovery (WIP - needs Connect RPC)
- ActorServer now implements ActorService protocol and uses ActorServiceASGIApplication
- ActorClient uses ActorServiceClientSync instead of raw httpx
- ClusterResolver uses ControllerServiceClientSync instead of raw httpx
- Added AsyncControllerServiceWrapper for testing with ASGI apps
- Fixed generated connect imports to use `from fluster import` pattern
- ActorPool for managing multiple actor endpoints
- Round-robin load balancing via call() proxy
- Parallel broadcast via broadcast() proxy with BroadcastFuture
- CallResult dataclass for individual endpoint results
- ThreadPoolExecutor for concurrent broadcast calls
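
A sketch of the two dispatch modes (illustrative; assumes clients expose a `call(method, *args)`):

```python
import itertools
from concurrent.futures import Future, ThreadPoolExecutor

class ActorPool:
    def __init__(self, clients):
        self._clients = list(clients)
        self._rr = itertools.cycle(self._clients)
        self._executor = ThreadPoolExecutor()

    def call(self, method: str, *args):
        # Round-robin: each call goes to the next endpoint in turn.
        return next(self._rr).call(method, *args)

    def broadcast(self, method: str, *args) -> list[Future]:
        # Fan the call out to every endpoint in parallel.
        return [self._executor.submit(c.call, method, *args)
                for c in self._clients]
```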
- Add GcsResolver for service discovery via GCS VM instance metadata
- Add GcsApi protocol for dependency injection
- Add RealGcsApi for production use with google-cloud-compute
- Add MockGcsApi for testing without GCP credentials
- Export GcsResolver, GcsApi, and MockGcsApi from actor module
- Add comprehensive tests for namespace isolation, RUNNING status filtering, and multi-instance resolution

Part of fray-zero Actor and Resolver implementation.

Co-Authored-By: Claude Opus 4.5 <[email protected]>
- GcsApi protocol for dependency injection
- RealGcsApi using google-cloud-compute
- MockGcsApi for testing without GCP credentials
- GcsResolver queries VM instance metadata tags
- Metadata convention: fluster_actor_<name> -> port, fluster_namespace -> ns
- Added ListMethodsRequest/Response and ListActorsRequest/Response proto messages
- ActorServer implements list_methods() and list_actors() handlers
- Uses inspect.signature for method signatures and inspect.getdoc for docstrings
- Added 4 new tests for introspection functionality
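
The introspection handlers can be built almost entirely on the stdlib; a sketch (inspect.signature/inspect.getdoc per the commit, response shape assumed):

```python
import inspect

def list_methods(actor) -> dict[str, dict]:
    methods = {}
    for name, fn in inspect.getmembers(actor, callable):
        if name.startswith("_"):
            continue  # only expose public methods
        methods[name] = {
            "signature": str(inspect.signature(fn)),
            "doc": inspect.getdoc(fn) or "",
        }
    return methods
```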
Add three comprehensive examples demonstrating actor patterns:

1. Basic Actor Pattern (example_actor_basic):
   - Creating and registering actor servers
   - Using ActorClient with ClusterResolver for service discovery
   - Calling actor methods with arguments and return values
   - Example: Calculator actor with add/multiply/get_history methods

2. Coordinator Pattern (example_actor_coordinator):
   - Coordinator actor managing a task queue
   - Worker actors fetching and processing tasks
   - Context injection via current_ctx() for actor-to-actor communication
   - Pull-based task distribution pattern

3. Actor Pool Pattern (example_actor_pool):
   - ActorPool for load-balanced round-robin calls
   - Broadcast operations to all actor instances
   - Collecting results from broadcast with wait_all()
   - Example: Multiple inference servers with load balancing and broadcast

CLI enhancements:
- Added --mode option: all (default), actors, jobs
- Actor examples don't require Docker
- Updated help text and error messages

Part of fray-zero actor and resolver implementation.
- Update remaining references to use Controller.* and Worker.* prefixes
- Remove dead Heartbeat RPC tests (RPC was unused - controller polls workers)
- Remove heartbeat method from AsyncControllerServiceWrapper test mock
Following the Controller pattern, move all job management logic directly
into Worker class:

- Move PortAllocator and job execution logic from manager.py to worker.py
- Delete manager.py entirely
- Add JobProvider Protocol to service.py for cleaner interface
- Simplify main.py CLI to use Worker + WorkerConfig
- Merge test_manager.py and test_integration.py into test_worker.py
- Update cluster_example.py to set worker_id via WorkerConfig
- Update docs/worker.md

Worker now directly owns job state (_jobs, _lock, _semaphore) and handles
the full job lifecycle (submit, build, run, monitor, cleanup).
@rjpower rjpower closed this Jan 14, 2026
@rjpower rjpower deleted the rjpower/20260106-fray-rpc-v0 branch January 14, 2026 19:53