Skip to content

Commit

Permalink
Wrap errors in operand execution to protect scheduling service (#2964)
Browse files Browse the repository at this point in the history
  • Loading branch information
wjsi authored Apr 26, 2022
1 parent b93c02b commit 3278699
Show file tree
Hide file tree
Showing 18 changed files with 392 additions and 57 deletions.
1 change: 1 addition & 0 deletions mars/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

# noinspection PyUnresolvedReferences
from ..typing import ChunkType, TileableType, EntityType, OperandType
from .base import ExecutionError
from .entity import (
Entity,
EntityData,
Expand Down
6 changes: 6 additions & 0 deletions mars/core/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,9 @@ def serial(self, obj: Base, context: Dict):

class MarsError(Exception):
    """Root exception type; Mars-specific errors such as ExecutionError derive from it."""


class ExecutionError(MarsError):
    """
    Wrapper around an error raised while executing an operand.

    Parameters
    ----------
    nested_error : BaseException
        The original exception. It is kept on ``nested_error`` and is also
        passed to the base class, so it is the only element of ``args``.
    """

    def __init__(self, nested_error: BaseException):
        # keep a named handle on the wrapped exception for callers to unwrap
        self.nested_error = nested_error
        super().__init__(nested_error)
7 changes: 3 additions & 4 deletions mars/dataframe/groupby/aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@
import functools
import itertools
import logging
import typing
import uuid
from typing import List
from typing import Callable, Dict, List

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -129,8 +128,8 @@ def _group_kurt(x, *args, **kwargs):

def build_mock_agg_result(
groupby: GROUPBY_TYPE,
groupby_params: typing.Dict,
raw_func: typing.Callable,
groupby_params: Dict,
raw_func: Callable,
**raw_func_kw,
):
try:
Expand Down
13 changes: 13 additions & 0 deletions mars/lib/cython/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 1999-2022 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
30 changes: 30 additions & 0 deletions mars/lib/cython/libcpp.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Copyright 1999-2022 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# complementary header for C++ STL libs not included in Cython

from libc.stdint cimport uint_fast64_t


# Declaration of the C++ standard library's 64-bit Mersenne Twister engine
# (std::mt19937_64 from <random>). The block is declared ``nogil`` and every
# member carries ``except +`` so C++ exceptions are translated by Cython.
cdef extern from "<random>" namespace "std" nogil:
cdef cppclass mt19937_64:
# integer type produced by the engine and accepted as a seed
ctypedef uint_fast64_t result_type

# default-seeded and explicitly-seeded constructors
mt19937_64() except +
mt19937_64(result_type seed) except +
# calling the engine yields the next pseudo-random value
result_type operator()() except +
# bounds of the values operator() can return
result_type min() except +
result_type max() except +
# advance the engine state by z steps without returning the values
void discard(size_t z) except +
# re-initialise the engine state from a single seed value
void seed(result_type seed) except +
3 changes: 2 additions & 1 deletion mars/oscar/backends/mars/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

from ....utils import get_next_port, dataslots, ensure_coverage
from ..config import ActorPoolConfig
from ..message import CreateActorMessage
from ..message import CreateActorMessage, reset_random_seed as reset_message_seed
from ..pool import MainActorPoolBase, SubActorPoolBase, _register_message_handler


Expand Down Expand Up @@ -168,6 +168,7 @@ def _start_sub_pool(

# make sure enough randomness for every sub pool
random.seed(uuid.uuid1().bytes)
reset_message_seed()

conf = actor_config.get_pool_config(process_index)
suspend_sigint = conf["suspend_sigint"]
Expand Down
214 changes: 214 additions & 0 deletions mars/oscar/backends/message.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
# Copyright 1999-2022 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from enum import Enum
from types import TracebackType
from typing import Any, Type

from ..core import ActorRef

DEFAULT_PROTOCOL: int = 0

# Enumeration of message kinds; every message class declared in this stub sets
# its ``message_type`` attribute to one of these members.
class MessageType(Enum):
control = 0
result = 1
error = 2
create_actor = 3
destroy_actor = 4
has_actor = 5
actor_ref = 6
send = 7
tell = 8
cancel = 9

# Enumeration of the sub-types stored in ControlMessage.control_message_type.
class ControlMessageType(Enum):
stop = 0
restart = 1
sync_config = 2
get_config = 3
wait_pool_recovered = 4
add_sub_pool_actor = 5

# Common base of all message stubs: declares the fields shared by every
# message (type tag, protocol, id, trace list and profiling context).
class _MessageBase:
# type tag; subclasses assign a fixed MessageType member
message_type: MessageType
protocol: int
message_id: bytes
message_trace: list
profiling_context: Any

# every argument is optional; protocol falls back to DEFAULT_PROTOCOL
def __init__(
self,
message_id: bytes = None,
protocol: int = DEFAULT_PROTOCOL,
message_trace: list = None,
profiling_context: Any = None,
): ...
def __repr__(self): ...

# Message tagged MessageType.control, carrying an address, a
# ControlMessageType and an arbitrary content payload.
class ControlMessage(_MessageBase):
message_type = MessageType.control

address: str
control_message_type: ControlMessageType
content: Any

# unlike most other messages, the constructor declared here accepts neither
# message_trace nor profiling_context
def __init__(
self,
message_id: bytes = None,
address: str = None,
control_message_type: ControlMessageType = None,
content: Any = None,
protocol: int = DEFAULT_PROTOCOL,
): ...

# Message tagged MessageType.result, carrying an arbitrary ``result`` value.
class ResultMessage(_MessageBase):
message_type = MessageType.result

result: Any

# accepts the full set of base-class arguments plus ``result``
def __init__(
self,
message_id: bytes = None,
result: Any = None,
protocol: int = DEFAULT_PROTOCOL,
message_trace: list = None,
profiling_context: Any = None,
): ...

# Message tagged MessageType.error, carrying an exception together with its
# type, traceback and the address/pid associated with it.
class ErrorMessage(_MessageBase):
message_type = MessageType.error

address: str
pid: int
# NOTE(review): declared as bare ``Type`` here while __init__ takes
# ``Type[BaseException]`` - consider aligning the two annotations
error_type: Type
error: BaseException
traceback: TracebackType

# pid defaults to -1; no profiling_context argument is declared
def __init__(
self,
message_id: bytes = None,
address: str = None,
pid: int = -1,
error_type: Type[BaseException] = None,
error: BaseException = None,
traceback: TracebackType = None,
protocol: int = DEFAULT_PROTOCOL,
message_trace: list = None,
): ...
# returns an exception object; presumably one derived from the carried error
# so it can be re-raised locally - confirm against message.pyx
def as_instanceof_cause(self) -> BaseException: ...

# Message tagged MessageType.create_actor, carrying the actor class, actor id,
# constructor args/kwargs, an allocate strategy and a ``from_main`` flag.
class CreateActorMessage(_MessageBase):
message_type = MessageType.create_actor

actor_cls: Type
actor_id: bytes
args: tuple
kwargs: dict
allocate_strategy: Any
from_main: bool

# from_main defaults to False; no profiling_context argument is declared
def __init__(
self,
message_id: bytes = None,
actor_cls: Type = None,
actor_id: bytes = None,
args: tuple = None,
kwargs: dict = None,
allocate_strategy: Any = None,
from_main: bool = False,
protocol: int = DEFAULT_PROTOCOL,
message_trace: list = None,
): ...

# Message tagged MessageType.destroy_actor, carrying the target ActorRef and a
# ``from_main`` flag.
class DestroyActorMessage(_MessageBase):
message_type = MessageType.destroy_actor

actor_ref: ActorRef
from_main: bool

# from_main defaults to False; no profiling_context argument is declared
def __init__(
self,
message_id: bytes = None,
actor_ref: ActorRef = None,
from_main: bool = False,
protocol: int = DEFAULT_PROTOCOL,
message_trace: list = None,
): ...

# Message tagged MessageType.has_actor, carrying a single ActorRef.
class HasActorMessage(_MessageBase):
message_type = MessageType.has_actor

actor_ref: ActorRef

# no profiling_context argument is declared
def __init__(
self,
message_id: bytes = None,
actor_ref: ActorRef = None,
protocol: int = DEFAULT_PROTOCOL,
message_trace: list = None,
): ...

# Message tagged MessageType.actor_ref, carrying a single ActorRef; its
# declared fields and constructor match HasActorMessage apart from the tag.
class ActorRefMessage(_MessageBase):
message_type = MessageType.actor_ref

actor_ref: ActorRef

# no profiling_context argument is declared
def __init__(
self,
message_id: bytes = None,
actor_ref: ActorRef = None,
protocol: int = DEFAULT_PROTOCOL,
message_trace: list = None,
): ...

# Message tagged MessageType.send, carrying the target ActorRef and an
# arbitrary content payload.
class SendMessage(_MessageBase):
message_type = MessageType.send

actor_ref: ActorRef
content: Any

# NOTE(review): ``content`` is annotated ``object`` here but ``Any`` on the
# attribute above - consider using one spelling for both
def __init__(
self,
message_id: bytes = None,
actor_ref: ActorRef = None,
content: object = None,
protocol: int = DEFAULT_PROTOCOL,
message_trace: list = None,
profiling_context: Any = None,
): ...

# Inherits fields and constructor from SendMessage; only the type tag differs
# (MessageType.tell instead of MessageType.send).
class TellMessage(SendMessage):
message_type = MessageType.tell

# Message tagged MessageType.cancel, carrying an address and the id of the
# message to cancel (``cancel_message_id``).
class CancelMessage(_MessageBase):
message_type = MessageType.cancel

address: str
cancel_message_id: bytes

# no profiling_context argument is declared
def __init__(
self,
message_id: bytes = None,
address: str = None,
cancel_message_id: bytes = None,
protocol: int = DEFAULT_PROTOCOL,
message_trace: list = None,
): ...

# RuntimeError subclass constructed from a message id; presumably raised when
# the message with that id cannot be deserialized - confirm against message.pyx.
class DeserializeMessageFailed(RuntimeError):
def __init__(self, message_id: bytes): ...
def __str__(self): ...

# Re-seeds the generator behind new_message_id(); takes no arguments and
# returns nothing.
def reset_random_seed() -> None: ...
# Produces a new message id as a bytes object.
def new_message_id() -> bytes: ...
39 changes: 28 additions & 11 deletions mars/oscar/backends/message.pyx
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Copyright 1999-2021 Alibaba Group Holding Ltd.
# distutils: language = c++
# Copyright 1999-2022 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -13,29 +14,27 @@
# limitations under the License.

from enum import Enum
from random import getrandbits
from types import TracebackType
from typing import Any, Type

from libc.stdint cimport uint_fast64_t

from ...lib.cython.libcpp cimport mt19937_64
from ...lib.tblib import pickling_support
from ...serialization.core cimport Serializer
from ...utils import wrap_exception
from ..core cimport ActorRef

try:
from random import randbytes
except ImportError: # pragma: no cover
from random import getrandbits

def randbytes(long n) -> bytes:
return getrandbits(n * 8).to_bytes(n, "little")


# make sure traceback can be pickled
pickling_support.install()

# default protocol value, kept as a C int and re-exported to Python code
# under the public name DEFAULT_PROTOCOL
cdef int _DEFAULT_PROTOCOL = 0
DEFAULT_PROTOCOL = _DEFAULT_PROTOCOL

# module-level C++ Mersenne Twister engine used by new_message_id(), and a
# flag recording whether reset_random_seed() has seeded it yet
cdef mt19937_64 _rnd_gen
cdef bint _rnd_is_seed_set = False


class MessageType(Enum):
control = 0
Expand Down Expand Up @@ -552,5 +551,23 @@ cdef class MessageSerializer(Serializer):
MessageSerializer.register(_MessageBase)


cpdef reset_random_seed():
    """
    Re-seed the module-level generator used for message ids.

    Draws 64 random bits from Python's ``random`` module, feeds them to the
    C++ ``mt19937_64`` engine and records that seeding has happened, so that
    ``new_message_id`` does not need to seed lazily afterwards.
    """
    cdef uint_fast64_t seed
    global _rnd_is_seed_set

    # Convert the Python int straight into the engine's seed type instead of
    # reinterpreting the buffer of a temporary bytes object through a pointer
    # cast: no aliasing or alignment assumptions, no dependence on byte order,
    # and getrandbits(64) always fits into 64 unsigned bits.
    seed = getrandbits(64)
    _rnd_gen.seed(seed)
    _rnd_is_seed_set = True


cpdef bytes new_message_id():
    """
    Generate a new 32-byte message id.

    The id is the raw memory of four consecutive outputs of the module-level
    ``mt19937_64`` engine. The engine is seeded on first use if
    ``reset_random_seed`` has not been called yet.

    Returns
    -------
    bytes
        A bytes object of length 32.
    """
    cdef uint_fast64_t res_array[4]
    cdef int i

    # lazily seed the generator on the first call in this process
    if not _rnd_is_seed_set:
        reset_random_seed()

    for i in range(4):
        res_array[i] = _rnd_gen()
    # slicing the char pointer with an explicit length copies exactly 32
    # bytes, embedded NUL bytes included
    return <bytes>((<char *>&(res_array[0]))[:32])
Loading

0 comments on commit 3278699

Please sign in to comment.