
Conversation

@antfin-oss

This Pull Request was created automatically to merge the latest changes from master into main branch.

πŸ“… Created: 2025-11-21
πŸ”€ Merge direction: master β†’ main
πŸ€– Triggered by: Scheduled

Please review and merge if everything looks good.

sampan-s-nayak and others added 30 commits November 3, 2025 12:33
This PR adds support for token-based authentication in the Ray
bi-directional syncer, for both client and server sides. It also
includes tests to verify the functionality.

---------

Signed-off-by: sampan <[email protected]>
Signed-off-by: Edward Oakes <[email protected]>
Co-authored-by: sampan <[email protected]>
Co-authored-by: Edward Oakes <[email protected]>
Support token-based authentication in runtime env (client and server).
Refactor the existing dashboard head code so that the utils and middleware
can be reused by the runtime env agent as well.

---------

Signed-off-by: sampan <[email protected]>
Signed-off-by: Edward Oakes <[email protected]>
Co-authored-by: sampan <[email protected]>
Co-authored-by: Edward Oakes <[email protected]>
…un. (ray-project#58335)

## Description
> Add spark master model validation to let Ray run on Spark-On-YARN
mode.

## Why is this needed?
> Running Ray directly on a YARN cluster requires additional testing,
integration work, and tool/environment setup. If we support
ray-on-spark-on-yarn and Spark environments are already set up, nothing
else is needed: we can use Spark and let the user run pyspark.

Signed-off-by: Cai Zhanqi <[email protected]>
Co-authored-by: Cai Zhanqi <[email protected]>
upgrading reef tests to run on 3.10

Signed-off-by: elliot-barn <[email protected]>
The issue with the current implementation of core worker HandleKillActor
is that it won't send a reply when the RPC completes, because the worker
is dead by then. The application code in the GCS doesn't really care,
since it just logs the response if one is received; a response is only
sent if the actor ID of the actor on the worker and in the RPC don't
match, and the GCS will just log it and move on with its life.

Hence, in the case of a transient network failure, we can't tell whether
there was a network issue or the actor was successfully killed. The most
straightforward approach is, instead of the GCS directly calling core
worker KillActor, to have the GCS talk to the raylet and call a new RPC,
KillLocalActor, which in turn calls KillActor. Since the raylet that
receives KillLocalActor is local to the worker that the actor is on,
we're guaranteed to kill it at that point (either through KillActor, or
by falling back to SIGKILL if it hangs).

Thus the main intuition is that the GCS now talks to the raylet, and
this layer implements retries. Once the raylet receives the
KillLocalActor request, it routes it to KillActor. This layer between
the raylet and the core worker does not have retries enabled because we
can assume that RPCs between the local raylet and a worker won't fail
(same machine). We then check on the status of the worker after a while
(5 seconds, via kill_worker_timeout_milliseconds), and if it still
hasn't been killed, we call DestroyWorker, which sends the SIGKILL.
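
A minimal Python model of this flow (the real implementation is C++ RPC
handlers in the raylet and GCS; all names here are illustrative):

```python
import time

class Worker:
    """Toy stand-in for a core worker process (illustrative only)."""
    def __init__(self) -> None:
        self.alive = True

    def send_kill_actor_rpc(self) -> None:
        # Models the raylet -> worker KillActor RPC. No retries: the
        # raylet and worker share a machine. Here the kill always lands;
        # in reality the worker may hang, which is what the fallback covers.
        self.alive = False

    def sigkill(self) -> None:
        self.alive = False

KILL_WORKER_TIMEOUT_S = 5.0  # mirrors kill_worker_timeout_milliseconds

def kill_local_actor(worker: Worker) -> None:
    # Models the raylet handling a (retried) KillLocalActor from the GCS.
    worker.send_kill_actor_rpc()
    deadline = time.monotonic() + KILL_WORKER_TIMEOUT_S
    while time.monotonic() < deadline:
        if not worker.alive:
            return
        time.sleep(0.1)
    worker.sigkill()  # models the DestroyWorker fallback

kill_local_actor(Worker())
```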

---------

Signed-off-by: joshlee <[email protected]>
upgrading data ci tests to py3.10

postmerge build:
https://buildkite.com/ray-project/postmerge/builds/14192

---------

Signed-off-by: elliot-barn <[email protected]>
Co-authored-by: Lonnie Liu <[email protected]>
upgrading serve tests to run on python 3.10

Post merge run: https://buildkite.com/ray-project/postmerge/builds/14190

---------

Signed-off-by: elliot-barn <[email protected]>
Co-authored-by: Lonnie Liu <[email protected]>
…roject#58307)

A hang was reported in a video object detection Ray Data workload.

An initial investigation by @jjyao and @dayshah observed that it was due
to an actor restart and the actor creation task was being spilled to a
raylet that had an outdated resource view. This was found by looking at
the raylet state dump. This actor creation task required 1 GPU and 1
CPU, and the raylet where this actor creation task was being spilled to
had a cluster view that reported no available GPUs. However there were
many available GPUs, and all the other raylet state dumps correctly
reported this. Furthermore, in the raylet logs for the outdated raylet
there was a "Failed to send a message to node: " error originating from
the ray syncer. Hence an initial hypothesis was formed that the ray
syncer retry policy was not working as intended.

A follow-up investigation by @edoakes and me revealed an incorrect usage
of the grpc streaming callback API.
Currently, retries in the ray syncer on a failed send/write work as follows:
- OnWriteDone/OnReadDone(ok = false) is called after a failed write/read
- Disconnect() (the one in *_bidi_reactor.h!) is called, which flips
disconnected_ to true and calls DoDisconnect()
- DoDisconnect() notifies grpc that we will no longer write to the channel
via StartWritesDone() and removes the hold via RemoveHold()
- grpc will see that the channel is idle and has no hold, so it will call
OnDone()
- we've overridden OnDone() to hold a cleanup_cb that contains the retry
policy, which reinitializes the bidi reactor and reconnects to the same
server at a repeated interval of 2 seconds until it succeeds
- fault tolerance accomplished! :)

However, from the logs we added, we weren't seeing OnDone() called after
DoDisconnect(). The grpc streaming callback best practices,

https://grpc.io/docs/languages/cpp/best_practices/#callback-streaming-api
state that "The best practice is always to read until ok=false on
the client side".
The OnDone grpc documentation,
https://grpc.github.io/grpc/cpp/classgrpc_1_1_client_bidi_reactor.html#a51529f76deeda6416ce346291577ffa9
states that "Notifies the application that all operations associated
with this RPC have completed and all Holds have been removed".

Since we called StartWritesDone() and removed the hold, this should
notify grpc that all operations associated with this bidi reactor are
complete. HOWEVER, reads may not be finished, i.e. we may not have read
all incoming data.
Consider the following scenario:
1.) We receive a bunch of resource view messages from the GCS and have
not processed all of them
2.) OnWriteDone(ok = false) is called => Disconnect() => disconnected_
= true
3.) OnReadDone(ok = true) is called, but because disconnected_ = true
we early return and STOP processing any more reads, as shown below:

https://github.com/ray-project/ray/blob/275a585203bef4e48c04b46b2b7778bd8265cf46/src/ray/ray_syncer/ray_syncer_bidi_reactor_base.h#L178-L180
4.) Pending reads are left in the queue, preventing grpc from calling
OnDone() since not all operations are done
5.) Hang: we're left in a zombie state where we drop all incoming
resource view messages and don't send any resource view updates due to
the disconnected check

Hence the solution is to remove the disconnected check in OnReadDone and
simply allow all incoming data to be read.
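
A toy Python model of the zombie state and the fix (the real code is C++
in ray_syncer_bidi_reactor_base.h; names are illustrative):

```python
class BidiReactorModel:
    """Models the read path of the bidi reactor after a failed write."""
    def __init__(self, pending_reads):
        self.disconnected = False
        self.pending_reads = list(pending_reads)  # reads grpc still owes us
        self.done = False

    def on_write_done(self, ok: bool) -> None:
        if not ok:
            self.disconnected = True  # Disconnect() -> DoDisconnect()

    def on_read_done(self, ok: bool) -> None:
        if not ok:
            self.done = True  # all operations finished; OnDone() may fire
            return
        # Buggy version: `if self.disconnected: return` here stranded the
        # remaining reads, so OnDone() and the retry policy never ran.
        self.pending_reads.pop(0)

reactor = BidiReactorModel(["update1", "update2"])
reactor.on_write_done(ok=False)    # write fails mid-stream
while reactor.pending_reads:
    reactor.on_read_done(ok=True)  # fixed behavior: drain every read
reactor.on_read_done(ok=False)     # now OnDone() can trigger the retry
assert reactor.done
```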

There are a couple of interesting observations/questions remaining:
1.) The raylet with the outdated view is local to the GCS, yet we're
seeing read/write errors despite being on the same node
2.) From the logs, the GCS syncer thinks that the channel to the raylet
syncer is still available. There are no error logs on the GCS side; it's
still sending messages to the raylet. Hence even though the raylet gets
the "Failed to write error: ", we don't see a corresponding error log on
the GCS side.

---------

Signed-off-by: joshlee <[email protected]>
…project#58161)

## Description
kai-scheduler supports gang scheduling at
[v0.9.3](NVIDIA/KAI-Scheduler#500 (comment)).

However, gang scheduling doesn't work at v0.9.4; it works again at
v0.10.0-rc1.

## Related issues

## Additional information
The reason might be as follows.

The `numOfHosts` is taken into consideration at v0.9.3.

https://github.com/NVIDIA/KAI-Scheduler/blob/0a680562b3cdbae7d81688a81ab4d829332abd0a/pkg/podgrouper/podgrouper/plugins/ray/ray_grouper.go#L156-L162

That snippet of code is missing at v0.9.4.

https://github.com/NVIDIA/KAI-Scheduler/blob/281f4269b37ad864cf7213f44c1d64217a31048f/pkg/podgrouper/podgrouper/plugins/ray/ray_grouper.go#L131-L140

Then, it shows up at v0.10.0-rc1.

https://github.com/NVIDIA/KAI-Scheduler/blob/96b4d22c31d5ec2b7375b0de0e78e59a57baded6/pkg/podgrouper/podgrouper/plugins/ray/ray_grouper.go#L156-L162

---------

Signed-off-by: fscnick <[email protected]>
It is sometimes intuitive for users to provide their extensions with a
leading '.'. This PR takes care of that by removing the '.' when it is
provided.

For example, when using `ray.data.read_parquet`, the parameter
`file_extensions` needs to be something like `['parquet']`. However,
some users may intuitively expect this parameter to accept
`['.parquet']`.

This commit allows users to switch from:

```python
train_data = ray.data.read_parquet(
    'example_parquet_folder/',
    file_extensions=['parquet'],
)
```

to

```python
train_data = ray.data.read_parquet(
    'example_parquet_folder/',
    file_extensions=['.parquet'],  # Now will read files, instead of silently not reading anything
)
```
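
A minimal sketch of the normalization this implies; the helper name is
hypothetical, not the actual function added in Ray Data:

```python
from typing import List, Optional

def _normalize_file_extensions(
    extensions: Optional[List[str]],
) -> Optional[List[str]]:
    # Accept both "parquet" and ".parquet" by stripping any leading dots.
    if extensions is None:
        return None
    return [ext.lstrip(".") for ext in extensions]

assert _normalize_file_extensions([".parquet", "csv"]) == ["parquet", "csv"]
```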
…roject#58372)

When starting a Ray cluster in a KubeRay environment, the startup
process may sometimes be slow. In such cases, it is necessary to
increase the timeout duration for proper startup; otherwise, a "ray
client connection timeout" error occurs. Therefore, we need to make the
timeout and retry policies for the Ray worker configurable.

---------

Signed-off-by: OneSizeFitsQuorum <[email protected]>
…#58277)

## Description
Rich progress currently doesn't support reporting progress from
workers. As supporting this is expected to require substantial design
work, default to tqdm progress (which does support progress reporting
from workers).

Furthermore, we don't have an auto-detect to set `use_ray_tqdm`, so the
requirement is for that to be disabled as well.

In summary, the requirements for rich progress as of now:
- rich progress bars enabled
- use_ray_tqdm disabled.
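
A sketch of the gating these requirements imply; the attribute names
here are assumptions, not the exact DataContext fields:

```python
def should_use_rich_progress(ctx) -> bool:
    # Fall back to tqdm unless rich bars are enabled AND use_ray_tqdm is off.
    return getattr(ctx, "rich_progress_enabled", False) and not getattr(
        ctx, "use_ray_tqdm", True
    )
```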

## Related issues
Fixes ray-project#58250

## Additional information
N/A

---------

Signed-off-by: kyuds <[email protected]>
Signed-off-by: Daniel Shin <[email protected]>
…#58381)

and also use the 12.8.1 CUDA base image by default

Signed-off-by: Lonnie Liu <[email protected]>
python 3.9 has reached the end of its life cycle

Signed-off-by: Lonnie Liu <[email protected]>
it uses the same Dockerfile, but was not updated.

Signed-off-by: Lonnie Liu <[email protected]>
Updating ray examples to run on python 3.10 as the min

Release build link:
https://buildkite.com/ray-project/release/builds/66525

Signed-off-by: elliot-barn <[email protected]>
upgrading rllib release tests to run on python 3.10

Release link: https://buildkite.com/ray-project/release/builds/66495#_
All failing tests are disabled

Signed-off-by: elliot-barn <[email protected]>
## Description
Adding missing test for issue detection


---------

Signed-off-by: Matthew Owen <[email protected]>
…#58414)

Sorting requirements and constraints for raydepsets

---------

Signed-off-by: elliot-barn <[email protected]>
PyArrow URL-encodes partition values when writing to cloud storage. To
ensure the values are consistent when you read them back, this PR
updates the partitioning logic to URL-decode them. See
apache/arrow#34905.

Closes ray-project#57564
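
For illustration, the round trip with the standard library looks like
this (a sketch; the PR's actual decoding hook may differ):

```python
from urllib.parse import quote, unquote

# PyArrow percent-encodes partition values on write, so a value like
# "2024/01" becomes "2024%2F01" in the directory name.
encoded = quote("2024/01", safe="")
assert encoded == "2024%2F01"

# Decoding on read restores the original partition value.
assert unquote(encoded) == "2024/01"
```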

---------

Signed-off-by: Lucas Lam <[email protected]>
Signed-off-by: lucaschadwicklam97 <[email protected]>
Signed-off-by: Balaji Veeramani <[email protected]>
Co-authored-by: Lucas Lam <[email protected]>
Co-authored-by: Balaji Veeramani <[email protected]>
…58345)

## Summary
Adds a new method to expose all downstream deployments that a replica
calls into, enabling dependency graph construction.

## Motivation
Deployments call downstream deployments via handles in two ways:
1. **Stored handles**: Passed to `__init__()` and stored as attributes β†’
`self.model.func.remote()`
2. **Dynamic handles**: Obtained at runtime via
`serve.get_deployment_handle()` β†’ `model.func.remote()`

Previously, there was no way to programmatically discover these
dependencies from a running replica.

## Implementation

### Core Changes
- **`ReplicaActor.list_outbound_deployments()`**: Returns
`List[DeploymentID]` of all downstream deployments
- Recursively inspects user callable attributes to find stored handles
(including nested in dicts/lists)
- Tracks dynamic handles created via `get_deployment_handle()` at
runtime using a callback mechanism

- **Runtime tracking**: Modified `get_deployment_handle()` to register
handles when called from within a replica via
`ReplicaContext._handle_registration_callback`
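
A self-contained sketch of the recursive attribute inspection described
above; `ToyHandle` and `find_handles` are illustrative stand-ins, since
the real logic lives inside `ReplicaActor.list_outbound_deployments()`:

```python
from typing import Any, List, Optional, Set

class ToyHandle:
    """Stand-in for a Serve deployment handle (illustrative only)."""
    def __init__(self, deployment_id: str) -> None:
        self.deployment_id = deployment_id

def find_handles(obj: Any, seen: Optional[Set[int]] = None) -> List[str]:
    # Walk attributes, dicts, and lists/tuples, collecting handle IDs.
    seen = set() if seen is None else seen
    if id(obj) in seen:
        return []
    seen.add(id(obj))
    if isinstance(obj, ToyHandle):
        return [obj.deployment_id]
    if isinstance(obj, dict):
        children = list(obj.values())
    elif isinstance(obj, (list, tuple)):
        children = list(obj)
    elif hasattr(obj, "__dict__"):
        children = list(vars(obj).values())
    else:
        children = []
    found: List[str] = []
    for child in children:
        found.extend(find_handles(child, seen))
    return found

class UserCallable:
    def __init__(self) -> None:
        self.model = ToyHandle("Model")        # stored directly on self
        self.ensemble = {"a": ToyHandle("A")}  # nested inside a dict

assert sorted(find_handles(UserCallable())) == ["A", "Model"]
```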


Next PR: ray-project#58350

---------

Signed-off-by: abrar <[email protected]>
This PR: 
- adds a new page to the Ray Train docs called "Monitor your
Application" that lists and describes the Prometheus metrics emitted by
Ray Train
- Updates the Ray Core system metrics docs to include some missing
metrics

Link to example build:
https://anyscale-ray--58235.com.readthedocs.build/en/58235/train/user-guides/monitor-your-application.html

Preview Screenshot:

<img width="1630" height="662" alt="Screenshot 2025-10-29 at 2 46 07β€―PM"
src="https://github.com/user-attachments/assets/9ca7ea6d-522b-4033-909a-2ee626960e8a"
/>

---------

Signed-off-by: JasonLi1909 <[email protected]>
Currently, users that import ray.tune can run into an ImportError if
they do not have pydantic installed. This is because ray.tune imports
ray.train, which requires pydantic. This PR prevents this error by
adding pydantic as a ray tune dependency.

Relevant user issue: ray-project#58280

---------

Signed-off-by: JasonLi1909 <[email protected]>
The timeout is due to `moto-server`, which mocks S3. Remove the
remote storage for now.

---------

Signed-off-by: xgui <[email protected]>
…project#58229)

Ray Train's framework-agnostic collective utilities
(`ray.train.collective.barrier`,
`ray.train.collective.broadcast_from_rank_zero`) currently time out
after 30 minutes if not all ranks join the operation. `ray.train.report`
uses these collective utilities internally, so users who don't call
report on every rank can run into deadlocks. For example, the report
barrier can deadlock with another worker waiting on others to join a
backward pass collective.

This PR changes the default Ray Train collective behavior to never
time out and to only log warning messages about the missing ranks. User
code typically already has timeouts, such as NCCL timeouts (also 30
minutes by default), so the extra timeout here doesn't really help and
increases the user burden of keeping track of environment variables to
set when debugging hanging jobs.

New default: `RAY_TRAIN_COLLECTIVE_TIMEOUT_S=-1`

This PR also generalizes the environment variable name:
`RAY_TRAIN_REPORT_BARRIER` -> `RAY_TRAIN_COLLECTIVE`.
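
As a usage sketch based on the names above, a user who wants a finite
timeout back can set the renamed variable explicitly:

```python
import os

# New default is -1 (never time out). Restore a 30-minute timeout like so,
# before the Ray Train workers start:
os.environ["RAY_TRAIN_COLLECTIVE_TIMEOUT_S"] = "1800"
```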

---------

Signed-off-by: Justin Yu <[email protected]>
which depends on datalbuild

Signed-off-by: Lonnie Liu <[email protected]>
Removing unnecessary --strip-extras flag from raydepsets (only updates
depset lock file headers):

[--strip-extras](https://docs.astral.sh/uv/reference/cli/#uv-pip-compile--no-strip-extras)
Include extras in the output file.

By default, uv strips extras, as any packages pulled in by the extras
are already included as dependencies in the output file directly.
Further, output files generated with --no-strip-extras cannot be used as
constraints files in install and sync invocations.

---------

Signed-off-by: elliot-barn <[email protected]>
nrghosh and others added 20 commits November 19, 2025 21:30
…on enum (ray-project#58509)

PR ray-project#56871 introduced the AggregationFunction enum for autoscaling
metrics aggregation. However, the serve build command's YAML
serialization path was not updated to handle Enum objects, causing
RepresenterError when serializing AggregationFunction enum values in
autoscaling config.

ex. `yaml.representer.RepresenterError: ('cannot represent an object',
<AggregationFunction.MEAN: 'mean'>)`

This fix adds 
1. a helper function that recursively converts Enum objects to their
string values before YAML dump, ensuring proper serialization of all
enum types in the configuration.
2. a test that
(a) Creates a temporary output YAML file
(b) Reads the config from that file
(c) Verifies that AggregationFunction.MEAN is correctly serialized as
"mean" (string)

Fixes ray-project#58485
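
A minimal sketch of such a recursive conversion helper (names are
illustrative, not necessarily those added by this PR):

```python
from enum import Enum
from typing import Any

def convert_enums_to_values(obj: Any) -> Any:
    # Recursively replace Enum members with their underlying values so
    # that yaml.safe_dump only ever sees plain Python types.
    if isinstance(obj, Enum):
        return obj.value
    if isinstance(obj, dict):
        return {k: convert_enums_to_values(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [convert_enums_to_values(v) for v in obj]
    return obj

class AggregationFunction(str, Enum):  # toy stand-in for the Serve enum
    MEAN = "mean"

config = {"autoscaling_config": {"aggregation_function": AggregationFunction.MEAN}}
assert convert_enums_to_values(config) == {
    "autoscaling_config": {"aggregation_function": "mean"}
}
```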

---------

Signed-off-by: Nikhil Ghosh <[email protected]>
Signed-off-by: Nikhil G <[email protected]>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
…th Ray Data (ray-project#58492)

This pull request updates the documentation for reading Hugging Face
datasets, recommending the use of ray.data.read_parquet with
HfFileSystem for better performance and scalability.

---------

Signed-off-by: Robert Nishihara <[email protected]>
- introduce new exception class for authentication related exceptions
(`ray.exceptions.AuthenticationError`)
- add docs link in auth exception
- fix minor papercuts
- use `RAY_LOG(FATAL)` instead of `RAY_CHECK` to avoid including `An
unexpected system state has occurred. You have likely discovered a bug
in Ray ...` in error message
- update dashboard dialogue
- update ray start error message

---------

Signed-off-by: sampan <[email protected]>
Signed-off-by: Edward Oakes <[email protected]>
Co-authored-by: sampan <[email protected]>
Co-authored-by: Edward Oakes <[email protected]>
properly add `wait_cluster.py` in wait for nodes

Signed-off-by: Lonnie Liu <[email protected]>
…ct#56743)

## Why are these changes needed?

Follow up of RLlib PR: ray-project#55291
Torch `device` allows input types of `str | int | torch.device`; this PR
unifies the type in a type variable and allows for the `int` type as
well.

Upstream air PR: ray-project#56745

## Checks

- [x] I've signed off every commit(by using the -s flag, i.e., `git
commit -s`) in this PR.
- [x] I've run `scripts/format.sh` to lint the changes in this PR.
- [x] I've included any doc changes needed for
https://docs.ray.io/en/master/.
- [x] I've added any new APIs to the API Reference. For example, if I
added a
method in Tune, I've added it in `doc/source/tune/api/` under the
           corresponding `.rst` file.
- [x] I've made sure the tests are passing. Note that there might be a
few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [x] This PR is not tested :(

---------

Signed-off-by: Daniel Sperber <[email protected]>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
## Description
Make Test assertion agnostic of Read Logical operator name


Signed-off-by: Goutam <[email protected]>
…y-project#58813)

## Description

### [Data] Tune Concurrency Cap Backpressure objectstore budget ratio

Fix the threshold at which backpressure should kick in to be more intuitive:

`OBJECT_STORE_BUDGET_RATIO = available budget / (available budget + usage)`


---------

Signed-off-by: Srinath Krishnamachari <[email protected]>
## Description

Prevent calling `sort_by` on empty blocks in the `Concat.finalize`
method. Attempting to sort an empty block can cause unnecessary
overhead. This change adds a check to ensure the block has data before
attempting to sort.
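
A sketch of the guard, assuming Arrow-backed blocks (the real check
lives in `Concat.finalize`):

```python
import pyarrow as pa

def finalize_sorted(table: pa.Table, sort_column: str) -> pa.Table:
    # Sorting a zero-row table is pure overhead, so skip it.
    if table.num_rows == 0:
        return table
    return table.sort_by(sort_column)

assert finalize_sorted(pa.table({"x": []}), "x").num_rows == 0
assert finalize_sorted(pa.table({"x": [2, 1]}), "x")["x"].to_pylist() == [1, 2]
```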

Signed-off-by: Balaji Veeramani <[email protected]>
…nd making it private so it's not used in other places. (ray-project#58834)

In an attempt to break up the `//src/ray/common:asio` target, I'm moving
classes that were optimistically added as shared utilities into separate
targets and limiting their use.

---------

Signed-off-by: irabbani <[email protected]>
…ect#58388)

## Description

Fix the `flat_map` type annotation to support generator functions that
yield individual dictionaries. Changes the `fn` parameter type from
`UserDefinedFunction[Dict[str, Any], List[Dict[str, Any]]]` to
`UserDefinedFunction[Dict[str, Any], Dict[str, Any]]`.

## Related issues

Fixes ray-project#57942

## Additional information

This allows generator functions with signature
`(Dict[str, Any]) -> Iterator[Dict[str, Any]]` to work without requiring
`# type: ignore`.
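
A small example of the now-supported generator form (illustrative data;
`split_row` is a made-up UDF):

```python
from typing import Any, Dict, Iterator

import ray

def split_row(row: Dict[str, Any]) -> Iterator[Dict[str, Any]]:
    # Yield one output row per item instead of materializing a list.
    for item in row["items"]:
        yield {"item": item}

ds = ray.data.from_items([{"items": [1, 2]}, {"items": [3]}])
rows = ds.flat_map(split_row).take_all()
assert sorted(r["item"] for r in rows) == [1, 2, 3]
```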

---------

Signed-off-by: meAmitPatil <[email protected]>
Co-authored-by: Balaji Veeramani <[email protected]>
…8495)

## Description
Update DreamerV3 to support `num_env_runners > 0` through pulling the
single observation and action space from `self.spaces` rather than the
local env runner which doesn't have a environment to get its spaces when
`num_env_runners > 0`

## Related issues
Closes ray-project#56749

---------

Signed-off-by: Mark Towers <[email protected]>
Co-authored-by: Mark Towers <[email protected]>
… Message Batching (ray-project#57641)

Ray employs decentralized local resource scheduling on each Raylet to
enhance scheduling efficiency and scalability. To maintain correctness,
each Raylet must hold a consistent cluster-wide resource view. This
consistency is achieved through a periodic synchronization mechanism
implemented by the RaySyncer, which acts as the communication layer
between the GCS and all Raylets. Each Raylet reports its resource
updates to the GCS whenever local resources change (typically within 100
ms). Upon receiving such updates, the GCS immediately broadcasts them to
all other Raylets in the cluster, ensuring that every node has a
synchronized view of the global state.

<img width="616" height="226" alt="Image"
src="https://github.com/user-attachments/assets/f786f3e9-0998-4634-a404-f1d563699a2b"
/>

### Problem

Maintaining a consistent cluster-wide resource view is fundamental to
Ray’s decentralized scheduling model. However, under large-scale and
bursty workloads, the current synchronization model between the GCS and
Raylets becomes a major scalability bottleneck. This mechanism works
well for small clusters or infrequent updates, but it fails to scale
when the update frequency increases or when the cluster size exceeds
thousands of nodes.

A representative example is placement group (PG) creation, which
stresses the synchronization path heavily. Each PG creation triggers
resource view updates across all Raylets, and these updates must be
propagated through the GCS before tasks can be scheduled. In our
1,000-node Ray cluster, creating one PG per node and invoking
`pg.ready()`β€”which schedules tasks with PG constraintsβ€”can take up to
**10 minutes** to complete. This delay originates from excessive
synchronization overhead between the GCS and Raylets.

<img width="624" height="206" alt="Image"
src="https://github.com/user-attachments/assets/b808a8a8-1bb7-4055-8f5a-40805a856f0c"
/>

Key root causes:

- O(NΒ²) synchronization amplification β€” every resource update on one
node leads to broadcasts to all Raylets.
- Cascading updates β€” each PG creation updates node resources, which in
turn triggers more sync messages, amplifying I/O and scheduling load.
- Scheduling blockage β€” Ray tasks wait for the local Raylet to receive
up-to-date PG information, and are otherwise blocked, so the
synchronization delay directly stalls the scheduling pipeline.

As a result, during large-scale PG creation, GCS becomes a bottleneck in
message dispatch, and Raylets cannot react promptly to scheduling
events.

### Analysis

To understand the synchronization bottleneck in depth, we instrumented
the GCS to record the number of resource view synchronization messages
it received and broadcast per second during a large-scale workload. We
used placement group (PG) creation as a representative stress case
because it triggers dense and bursty resource updates across the
cluster.

Our measurements on a 1,000-node Ray cluster show that the synchronization
phase lasts nearly 10 minutes before all Raylets converge to a
consistent cluster-wide resource view. During this prolonged convergence
period, most `pg.ready()` calls remain blocked, leaving the cluster
underutilized and idle for minutes.

<img width="2596" height="846" alt="Image"
src="https://github.com/user-attachments/assets/bc9c1cd1-004a-44e4-ab1d-df5c8fc7aa4f"
/>

A closer inspection of the trace reveals two distinct synchronization
waves:

- **First wave (0 – 10 s):** Each node creates a PG and immediately
reports its updated resource state to the GCS. The GCS, following the
push-based model, broadcasts these updates to all other Raylets,
creating an intense burst of synchronization traffic across the cluster.
- **Second wave (delayed propagation):** After the first wave, the head
node starts scheduling `pg.ready()` tasks. However, each local Raylet
still lacks the latest resource view of the other nodes, so many tasks
can’t be scheduled yet. A few tasks succeed, but each of them generates
new resource updates that go back to the GCS. The GCS queues all these
updates in the same message channel used for the first wave. As a
result, the newer PG-related updates have to wait behind the backlog of
earlier updates. This queue blocking effect slows down the
synchronization even more and keeps the cluster in a long recovery
cycle.

In short, the push-based sync model creates a feedback loop: every
resource update triggers more broadcasts, which increases GCS pressure
and message delay. This positive feedback makes global convergence much
slower and causes scheduling stalls throughout the cluster.

#### Additional analysis for Verification

We also tested the `pg.wait()` API, which checks PG status through the
GCS without scheduling any tasks. It completed in seconds, proving that
GCS tracks PG metadata correctly. However, when we immediately started
actor or task scheduling, the Raylets were still waiting for their
resource views to catch up, and many tasks stayed pending.

<img width="2592" height="848" alt="Image"
src="https://github.com/user-attachments/assets/dfd71d6f-a277-4b87-8ffb-124471d7b26a"
/>

A temporary workaround is to add a manual delay (sleep) before launching
tasks, so that PG creation synchronization can finish first. In our
1,000-node cluster, a 60s delay shortened the total scheduling time to
about 1.5 minutes, as shown below. But this method only hides the
problem: it doesn't fix the underlying synchronization bottleneck of the
Ray cluster, and it only applies to the PG scheduling problem.

### Proposal

We enhance the GCS by introducing batched synchronization for resource
view updates. Instead of immediately broadcasting every single update it
receives, the GCS now waits for a short configurable timeout (e.g., 500
ms) to aggregate multiple sync messages from different nodes. After the
timeout expires, the GCS merges these incremental updates into a single
consolidated message and broadcasts it to all Raylets. This simple yet
effective design breaks the message explosion loop in the original
push-based model. By coalescing redundant updates, batching
significantly reduces both the number of GCS broadcasts and the per-node
processing overhead. As a result, synchronization traffic becomes
smoother and more predictable, enabling the cluster to converge much
faster even under high-frequency update scenarios such as large-scale
placement group creation.

<img width="617" height="235" alt="Image"
src="https://github.com/user-attachments/assets/9aa4c8e8-413f-4ccd-a7a7-5d650edc8bec"
/>

#### Mechanism summary:

GCS accumulates sync messages from Raylets β†’ merges their resource diffs
β†’ broadcasts one unified resource view to all Raylets.
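
A minimal Python sketch of this accumulate-merge-broadcast loop
(illustrative only; the real implementation is in the GCS syncer, and
the constants mirror the defaults described in this PR):

```python
import queue
import time

BATCH_TIMEOUT_S = 0.5   # ~500 ms batch timeout (configurable)
BATCH_MAX_SIZE = 1000   # max per-node updates per batch (configurable)

def batch_and_broadcast(incoming: queue.Queue, broadcast) -> None:
    """Collect per-node resource diffs, then broadcast one merged view."""
    batch = {}
    deadline = time.monotonic() + BATCH_TIMEOUT_S
    while len(batch) < BATCH_MAX_SIZE:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        try:
            node_id, diff = incoming.get(timeout=remaining)
        except queue.Empty:
            break
        # A newer diff from the same node supersedes the older one,
        # which is what coalesces redundant updates.
        batch[node_id] = diff
    if batch:
        broadcast(batch)  # one consolidated message to all raylets
```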

With batching enabled, placement group scheduling that previously took
up to 10 minutes can now finish within one minute on a 1,000-node
cluster, while maintaining correctness and consistency of the
cluster-wide resource view.

However, batching can slightly reduce synchronization freshness in small
or lightly loaded clusters. To address this, we expose two configurable
parametersβ€”batch timeout and batch sizeβ€”that let users control the
trade-off between scalability and timeliness. By tuning these
parameters, users can (1) cap the maximum size of a batch, (2) adjust
how long the GCS waits before broadcasting, or (3) completely disable
batching to revert to the original immediate-broadcast behavior for
small or low-update-frequency workloads.

### Evaluation

To evaluate the effectiveness of the proposed batching mechanism, we
conducted extensive experiments on a 1,000-node Ray cluster. By default,
the batching configuration was set to a batch size of 1,000 and a batch
timeout of 500 ms.

We examined the system under various cluster scales and batching
configurations using a representative workload, i.e., each node creates
a placement group and calls `pg.ready()` to trigger distributed
scheduling. This workload generates frequent resource view updates,
providing a good stress test for the synchronization mechanism.

```Python
#!/usr/bin/env python3
import os
import time
from datetime import datetime

import ray
from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

def log(evt, **kw):
    s = " ".join(f"{k}={v}" for k, v in kw.items())
    print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {evt} {s}", flush=True)

def main():
    ray.init(address=os.getenv("RAY_ADDRESS", "auto"))

    skip_head         = int(os.getenv("SKIP_HEAD", "1")) == 1
    cpus_per_bundle   = float(os.getenv("CPUS_PER_BUNDLE", "1"))
    num_bundle_per_pg = int(os.getenv("NUM_BUNDLE_PER_PG", "8"))
    wait_before_ready = int(os.getenv("WAIT_BEFORE_READY", "0")) # second

    nodes_all  = ray.nodes()
    nodes_live = [n for n in nodes_all if n.get("Alive", False)]
    items = []
    for n in nodes_live:
        ip = n["NodeManagerAddress"]
        res = n.get("Resources", {}) or {}
        is_head = "node:__internal_head__" in res
        items.append((ip, is_head))

    if skip_head:
        items = [(ip, h) for ip, h in items if not h]

    target_ips = [ip for ip, _ in items]
    log("CLUSTER",
        total_nodes=len(nodes_all),
        live_nodes=len(nodes_live),
        target_nodes=len(target_ips),
        head_found=any(h for _, h in items))

    pg_list = []
    for node_ip in target_ips:
        bundles = [
            {"CPU": cpus_per_bundle, f"node:{node_ip}": 0.01}
            for _ in range(num_bundle_per_pg)
        ]
        pg = placement_group(bundles, strategy="STRICT_PACK")
        pg_list.append(pg)

    log("PG_CREATE_ENQUEUED", total=len(pg_list))

    if wait_before_ready > 0:
        log("SLEEP_BEFORE_READY", sec=wait_before_ready)
        time.sleep(wait_before_ready)

    t0 = time.time()
    ray.get([pg.ready() for pg in pg_list])
    log("PG_READY_DONE", count=len(pg_list), time_s=round(time.time() - t0, 2))

if __name__ == "__main__":
    main()
```

Overall, batching maintains comparable performance in small-scale
clusters while delivering up to 25Γ— acceleration in large-scale
deployments.

#### Impact of batching under varying scales of cluster

Batching introduces negligible additional latency when the cluster is
small (≀ 10 nodes) but yields substantial benefits as the cluster size
grows.

At large scale, batching greatly accelerates resource view convergence
and unblocks scheduling earlier.
For instance, in a 1,000-node cluster, the total scheduling time
decreased from over 240 seconds to only 12 seconds with batching
enabled, demonstrating that synchronization bottlenecks were effectively
removed.

Cluster Size | # of PGs per Node | With Batching (s) | Without Batching (s) | Improvement Ratio | Remarks
-- | -- | -- | -- | -- | --
10 nodes | 1 | 1.6 | 2 | 1.25Γ— faster | Single PG latency β‰ˆ 0.8s (both cases)
100 nodes | 1 | 2 | 2.2 | 1.1Γ— faster | Near identical behavior, minimal overhead
200 nodes | 1 | 2 | 3.2 | 1.6Γ— faster | Still low synchronization overhead
500 nodes | 1 | 6 | 26 | 4.3Γ— faster | Sync delay starts to dominate
1000 nodes | 1 | 12 | 242 | 20Γ— faster | Without batching, sync storm causes ~4 min delay


#### Impact of varying batching settings

We further evaluated the batching mechanism on a 1,250-node cluster by
tuning the batch timeout and batch size parameters.

The results show that different configurations can yield slightly
different acceleration effects, and careful tuning can further optimize
performance for specific workloads. However, the default configuration β€”
batch size = 1000 and timeout = 500 ms β€” already achieves comparable
performance across all tested scenarios. This demonstrates that the
batching mechanism is robust and effective out of the box, without
requiring additional parameter tuning.

Batch Size | Timeout | Sync Time (s) | Speedup
-- | -- | -- | --
1 | 500 ms | ~600 | 1.00Γ—
50 | 500 ms | ~148 | 4.05Γ—
500 | 500 ms | ~28 | 21.43Γ—
1000 | 500 ms | ~34 | 17.65Γ—
5000 | 500 ms | ~40 | 15.00Γ—
1000 | 1000 ms | ~24 | 25.00Γ—
1000 | 100 ms | ~144 | 4.17Γ—

Closes ray-project#57640

---------

Signed-off-by: Mao Yancan <[email protected]>
Signed-off-by: Mao Yancan <[email protected]>
Co-authored-by: Mao Yancan <[email protected]>
Co-authored-by: Edward Oakes <[email protected]>
Co-authored-by: Jiajun Yao <[email protected]>
- Adds documentation page for token authentication.
- Updates `security.md` to recommend token authentication and link to
`token-auth.md`.

Largely ghost written by @dstrodtman :)

---------

Signed-off-by: Edward Oakes <[email protected]>
…etDatasource` method (ray-project#58840)

## Description

The `_derive_schema` helper function was previously defined at module
level in `parquet_datasource.py`, but it's exclusively called by the
`ParquetDatasource` class. This refactor moves it to a static method of
the class, improving encapsulation and making the code structure
clearer.

## Related issues

N/A - Code organization improvement

## Additional information

- No functional changes
- All existing logic and behavior remain the same
- The method signature and implementation are unchanged

Signed-off-by: Balaji Veeramani <[email protected]>
## Description

### [Data] Handle prefetches buffering in iter_batches

**Requirements**
- Consumers of `iter_batches` require predictable per-batch latency, even
though the overall data pipeline is optimized for maximizing throughput
and utilization.
- Essentially, prefetching is how the consumer impedance-matches its
latency requirement with the throughput-optimized pipeline.

**Issue**
- Queuing/buffering was not set up in `_iter_batches` to match
prefetching.
- Multiple workers were set up for `_format_in_threadpool` as a function
of the prefetch count, which adds latency variance to `_iter_batches`.

**Fix**
- In `_iter_batches`, set up the queue depth for buffering in
`make_async_gen` to honor prefetching.
- In `_format_in_threadpool`, restrict to a maximum of 4 workers by
default, so as to optimize for `iter_batches` latency.
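
For context, prefetching is surfaced at the consumer API; a usage sketch:

```python
import ray

ds = ray.data.range(10_000)
# prefetch_batches bounds how many formatted batches are buffered ahead
# of the consumer, trading memory for smoother per-batch latency.
for batch in ds.iter_batches(batch_size=1024, prefetch_batches=2):
    pass  # consume the batch
```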


---------

Signed-off-by: Srinath Krishnamachari <[email protected]>
Signed-off-by: Srinath Krishnamachari <[email protected]>
Co-authored-by: Hao Chen <[email protected]>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
…ct#58723)

## Description
When hive partitioned, partition cols don't reside in the physical
schema of the table, so you can't do projection and predicate pushdown
of that subset of columns into the read layer. Basically we filter those
out before pushing down.
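
A sketch of the filtering step under stated assumptions (hypothetical
names; the actual change is inside the Parquet read path):

```python
from typing import List, Sequence, Tuple

def split_pushdown_columns(
    requested: Sequence[str],
    file_schema_names: Sequence[str],
    partition_cols: Sequence[str],
) -> Tuple[List[str], List[str]]:
    # Partition columns live in the directory path, not the file schema,
    # so they must be handled after the read rather than pushed down.
    pushdown = [c for c in requested if c in file_schema_names]
    post_read = [c for c in requested if c in partition_cols]
    return pushdown, post_read

assert split_pushdown_columns(["x", "year"], ["x"], ["year"]) == (["x"], ["year"])
```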

## Related issues
Fixes ray-project#58714 


---------

Signed-off-by: Goutam <[email protected]>
… op args when refreshing metadata (ray-project#58755)

## Description
We export dataset and operator metadata whenever there is a state
change. However, the export file can get very large because the metadata
also includes the DataContext config and operator args, which do not
change over time and would otherwise be written multiple times to the
file. To reduce the file size and remove redundant info, we now export
the DataContext and operator args only when the dataset is
[registered](https://github.com/ray-project/ray/blob/d1cce8c9dc8411fad7cfbd619350bec6f19839a3/python/ray/data/_internal/stats.py#L621),
and omit them from later state updates.

## Related issues
Related previous PRs:
[55355](ray-project#55355),
[53554](ray-project#53554)

Signed-off-by: cong.qian <[email protected]>
- Makes setup_mlflow compatible with Ray Train V2 by avoiding deprecated
API usage
- Adds tests for testing the compatibility of Train V2 with setup_mlflow

---------

Signed-off-by: JasonLi1909 <[email protected]>
Signed-off-by: Jason Li <[email protected]>
Co-authored-by: Justin Yu <[email protected]>

@sourcery-ai sourcery-ai bot left a comment


The pull request #686 has too many files changed.

The GitHub API will only let us fetch up to 300 changed files, and this pull request has 5443.

@gemini-code-assist

Summary of Changes

Hello @antfin-oss, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request integrates the latest changes from the master branch into main, primarily focusing on modernizing and streamlining the project's build and continuous integration infrastructure. It involves a significant overhaul of how Docker images are built and managed, a shift in Python dependency resolution, and updates to code quality checks and documentation generation. These changes aim to improve build reliability, maintainability, and consistency across different development environments and platforms.

Highlights

  • CI/CD Infrastructure Updates: Significant refactoring and updates to Buildkite pipelines, including moving image definitions to dedicated YAML files, adding Python 3.10 support to various matrix builds, and introducing new build steps for Ray Core, Dashboard, and Java components. Several build steps now include a cibase tag for better categorization.
  • Dependency Management Migration: Transitioned from Miniconda to Miniforge3 for Docker builds and wheel verification across Linux and macOS. A new raydepsets tool has been introduced for managing Python dependencies, replacing previous manual compilation scripts.
  • Bazel Configuration Enhancements: The build --incompatible_strict_action_env option is now a default. Added workspace_status_command for Linux builds and /utf-8 CXX option for Windows. C++ test configurations were updated, including new pkg_files and pkg_zip rules for packaging, and RPC-related definitions were moved to a dedicated src/ray/rpc/BUILD.bazel.
  • Docker Image Tagging and Registry Integration: Updated Docker image tagging logic to include RAYCI_BUILD_ID and new *-extra variations. Azure Container Registry integration has been added for Anyscale Docker image pushes, alongside existing AWS ECR and GCP registries.
  • Linting and Code Quality Tools: The pre-commit configuration was updated to include semgrep, vale, cython-lint, and eslint hooks. pydoclint was upgraded and split into local and CI stages, with improved exclude patterns. Codeowners were also refined for better module ownership.
  • Documentation Build Process: The ReadTheDocs Python version was downgraded to 3.10, and documentation dependencies now use a lock file. New Buildkite steps for documentation builds, API annotation checks, and link checks were added, along with a new Ray documentation style guide.
Ignored Files
  • Ignored by pattern: .gemini/** (1)
    • .gemini/config.yaml
  • Ignored by pattern: .github/workflows/** (1)
    • .github/workflows/stale_pull_request.yaml



@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request is an automated merge from master to main, containing a large number of changes. The most significant changes involve a major refactoring of the CI/CD pipelines and the Bazel build system. Key improvements include modularizing the Buildkite pipelines, centralizing dependency management with a new raydepsets system, replacing miniconda with miniforge, and adopting pre-commit for linting. The Bazel build has been simplified and modularized, for example by refactoring the root BUILD.bazel file and using standard packaging rules. There are also numerous smaller fixes and improvements across the codebase, such as updating C++ code to use modern idioms and fixing reference counting in local mode. Overall, these changes represent a significant improvement in the project's build and CI infrastructure, enhancing maintainability, reproducibility, and developer experience. I have one suggestion regarding a build script that appears to be incomplete.

Comment on lines +1 to +13
#!/bin/bash

set -euo pipefail

if [[ "${USER:-}" =~ "@" ]]; then
  echo "ERROR: \$USER ('${USER:-}') contains invalid char '@'" >&2
  exit 1
fi

if [[ "${HOME:-}" =~ "@" ]]; then
  echo "ERROR: \$HOME ('${HOME:-}') contains invalid char '@'" >&2
  exit 1
fi


medium

This script is used as a workspace_status_command, which is expected to print key-value pairs for build stamping. However, this script only performs checks on environment variables and doesn't output any status keys on success. This will result in no build information being stamped into binaries. If this is intentional, could you add a comment explaining why? If not, the script should be updated to output the desired build status keys, for example:

echo "STABLE_GIT_COMMIT $(git rev-parse HEAD)"

@github-actions

github-actions bot commented Dec 6, 2025

This pull request has been automatically marked as stale because it has not had
any activity for 14 days. It will be closed in another 14 days if no further activity occurs.
Thank you for your contributions.

You can always ask for help on our discussion forum or Ray's public slack channel.

If you'd like to keep this open, just leave any comment, and the stale label will be removed.

@github-actions github-actions bot added the stale label Dec 6, 2025