diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py
index e1d5ca281407..3b671113ec69 100644
--- a/python/ray/_private/worker.py
+++ b/python/ray/_private/worker.py
@@ -90,7 +90,13 @@
     raise_sys_exit_with_custom_error_message,
 )
 from ray.actor import ActorClass
-from ray.exceptions import ObjectStoreFullError, RayError, RaySystemError, RayTaskError
+from ray.exceptions import (
+    ActorHandleNotFoundError,
+    ObjectStoreFullError,
+    RayError,
+    RaySystemError,
+    RayTaskError,
+)
 from ray.experimental import tqdm_ray
 from ray.experimental.compiled_dag_ref import CompiledDAGRef
 from ray.experimental.internal_kv import (
@@ -3289,7 +3295,20 @@ def kill(actor: "ray.actor.ActorHandle", *, no_restart: bool = True):
             "ray.kill() only supported for actors. For tasks, try ray.cancel(). "
             "Got: {}.".format(type(actor))
         )
-    worker.core_worker.kill_actor(actor._ray_actor_id, no_restart)
+
+    try:
+        worker.core_worker.kill_actor(actor._ray_actor_id, no_restart)
+    except ActorHandleNotFoundError as e:
+        actor_job_id = actor._ray_actor_id.job_id
+        current_job_id = worker.current_job_id
+        raise ActorHandleNotFoundError(
+            f"ActorHandle objects are not valid across Ray sessions. "
+            f"The actor handle was created in job {actor_job_id.hex()}, "
+            f"but the current job is {current_job_id.hex()}. "
+            f"This typically happens when you try to use an actor handle "
+            f"from a previous session after calling ray.shutdown() and ray.init(). "
+            f"Please create a new actor handle in the current session."
+        ) from e
 
 
 @PublicAPI
diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx
index ae7c689ceb10..ead1f05cd998 100644
--- a/python/ray/_raylet.pyx
+++ b/python/ray/_raylet.pyx
@@ -196,8 +196,9 @@ include "includes/rpc_token_authentication.pxi"
 
 import ray
 from ray.exceptions import (
-    RayActorError,
+    ActorHandleNotFoundError,
     ActorDiedError,
+    RayActorError,
     RayError,
     RaySystemError,
     RayTaskError,
@@ -3796,10 +3797,16 @@ cdef class CoreWorker:
     def kill_actor(self, ActorID actor_id, c_bool no_restart):
         cdef:
             CActorID c_actor_id = actor_id.native()
+            CRayStatus status = CRayStatus.OK()
 
         with nogil:
-            check_status(CCoreWorkerProcess.GetCoreWorker().KillActor(
-                  c_actor_id, True, no_restart))
+            status = CCoreWorkerProcess.GetCoreWorker().KillActor(
+                c_actor_id, True, no_restart)
+
+        if status.IsNotFound():
+            raise ActorHandleNotFoundError(status.message().decode())
+
+        check_status(status)
 
     def cancel_task(self, ObjectRef object_ref, c_bool force_kill,
                     c_bool recursive):
diff --git a/python/ray/exceptions.py b/python/ray/exceptions.py
index 5ad80a2fab7e..a7b82bc4a42f 100644
--- a/python/ray/exceptions.py
+++ b/python/ray/exceptions.py
@@ -1032,6 +1032,28 @@ def __str__(self):
         return self.error_message
 
 
+@DeveloperAPI
+class ActorHandleNotFoundError(ValueError, RayError):
+    """Raised when killing an actor whose handle the core worker cannot find.
+
+    This typically happens when using an actor handle from a previous Ray session
+    after calling ray.shutdown() and ray.init().
+
+    This error subclasses both RayError and ValueError, so code that catches
+    ValueError also catches it; this is done to maintain backward compatibility.
+
+    Args:
+        error_message: Message describing the missing handle; also returned by str().
+    """
+
+    def __init__(self, error_message: str):
+        super().__init__(error_message)
+        self.error_message = error_message
+
+    def __str__(self):
+        return self.error_message
+
+
 RAY_EXCEPTION_TYPES = [
     PlasmaObjectNotAvailable,
     RayError,
@@ -1063,5 +1085,6 @@ def __str__(self):
     RayCgraphCapacityExceeded,
     UnserializableException,
     ActorAlreadyExistsError,
+    ActorHandleNotFoundError,
     AuthenticationError,
 ]
diff --git a/python/ray/tests/test_ray_shutdown.py b/python/ray/tests/test_ray_shutdown.py
index ba95b5570b20..529008242724 100644
--- a/python/ray/tests/test_ray_shutdown.py
+++ b/python/ray/tests/test_ray_shutdown.py
@@ -508,5 +508,33 @@ def verify():
     wait_for_condition(verify)
 
 
+@pytest.mark.skipif(platform.system() == "Windows", reason="Hang on Windows.")
+def test_kill_actor_after_restart(shutdown_only):
+    """Test that killing an actor from a previous session raises a helpful error."""
+    # Set include_dashboard=False to have faster startup.
+    ray.init(num_cpus=1, include_dashboard=False)
+
+    @ray.remote
+    class A:
+        pass
+
+    a = A.remote()
+
+    # Restart Ray; `a` is now a handle that was created in the previous session.
+    ray.shutdown()
+    ray.init(num_cpus=1, include_dashboard=False)
+
+    # Attempting to kill an actor from the previous session should raise
+    # a helpful error message instead of crashing the interpreter.
+    with pytest.raises(
+        ray.exceptions.ActorHandleNotFoundError,
+        match="ActorHandle objects are not valid across Ray sessions",
+    ):
+        ray.kill(a)
+
+    ray.shutdown()
+    wait_for_condition(lambda: len(get_all_ray_worker_processes()) == 0)
+
+
 if __name__ == "__main__":
     sys.exit(pytest.main(["-sv", __file__]))
diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc
index cee44b4cb8da..34b46e3569b9 100644
--- a/src/ray/core_worker/core_worker.cc
+++ b/src/ray/core_worker/core_worker.cc
@@ -2574,12 +2574,16 @@ Status CoreWorker::KillActor(const ActorID &actor_id, bool force_kill, bool no_r
         } else {
           std::stringstream stream;
           stream << "Failed to find a corresponding actor handle for " << actor_id;
-          cb(Status::Invalid(stream.str()));
+          cb(Status::NotFound(stream.str()));
         }
       },
       "CoreWorker.KillActor");
   const auto &status = f.get();
-  actor_manager_->OnActorKilled(actor_id);
+  // Only call OnActorKilled if the kill was successful (status is OK).
+  // If the actor handle doesn't exist, OnActorKilled would crash.
+  if (status.ok()) {
+    actor_manager_->OnActorKilled(actor_id);
+  }
   return status;
 }
 