-
Notifications
You must be signed in to change notification settings - Fork 16
Expand file tree
/
Copy pathexceptions.py
More file actions
249 lines (174 loc) · 7.71 KB
/
exceptions.py
File metadata and controls
249 lines (174 loc) · 7.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
"""Exceptions for the Durable Executions SDK.
Avoid any non-stdlib references in this module; it is at the bottom of the dependency chain.
"""
from __future__ import annotations
import time
from dataclasses import dataclass
from enum import Enum
from typing import TYPE_CHECKING
if TYPE_CHECKING:
import datetime
class TerminationReason(Enum):
    """Enumeration of the reasons a durable execution can terminate.

    Every member's value is the same string as the member's name.
    """

    UNHANDLED_ERROR = "UNHANDLED_ERROR"
    INVOCATION_ERROR = "INVOCATION_ERROR"
    EXECUTION_ERROR = "EXECUTION_ERROR"
    CHECKPOINT_FAILED = "CHECKPOINT_FAILED"
    NON_DETERMINISTIC_EXECUTION = "NON_DETERMINISTIC_EXECUTION"
    STEP_INTERRUPTED = "STEP_INTERRUPTED"
    CALLBACK_ERROR = "CALLBACK_ERROR"
    SERIALIZATION_ERROR = "SERIALIZATION_ERROR"
class DurableExecutionsError(Exception):
    """Common base class for Durable Executions exceptions."""
class UnrecoverableError(DurableExecutionsError):
    """Base class for errors that terminate execution.

    Attributes:
        termination_reason (TerminationReason): The reason recorded for the
            termination, exactly as passed to the constructor.
    """

    def __init__(self, message: str, termination_reason: TerminationReason):
        self.termination_reason = termination_reason
        super().__init__(message)
class ExecutionError(UnrecoverableError):
    """Error that returns FAILED status without retry.

    Args:
        message: Human-readable description of the failure.
        termination_reason: Reason recorded for the termination; defaults to
            TerminationReason.EXECUTION_ERROR.
    """

    def __init__(
        self,
        message: str,
        termination_reason: TerminationReason = TerminationReason.EXECUTION_ERROR,
    ):
        super().__init__(message, termination_reason)
class InvocationError(UnrecoverableError):
    """Error that should cause Lambda retry by throwing from handler.

    Args:
        message: Human-readable description of the failure.
        termination_reason: Reason recorded for the termination; defaults to
            TerminationReason.INVOCATION_ERROR.
    """

    def __init__(
        self,
        message: str,
        termination_reason: TerminationReason = TerminationReason.INVOCATION_ERROR,
    ):
        super().__init__(message, termination_reason)
class CallbackError(ExecutionError):
    """Error in callback handling.

    Always carries TerminationReason.CALLBACK_ERROR.

    Attributes:
        callback_id (str | None): Identifier passed by the raiser, or None
            when none was given.
    """

    def __init__(self, message: str, callback_id: str | None = None):
        self.callback_id = callback_id
        super().__init__(message, TerminationReason.CALLBACK_ERROR)
class CheckpointFailedError(InvocationError):
    """Error when checkpoint operation fails.

    Always carries TerminationReason.CHECKPOINT_FAILED.

    Attributes:
        step_id (str | None): Identifier passed by the raiser, or None when
            none was given.
    """

    def __init__(self, message: str, step_id: str | None = None):
        self.step_id = step_id
        super().__init__(message, TerminationReason.CHECKPOINT_FAILED)
class NonDeterministicExecutionError(ExecutionError):
    """Error when execution is non-deterministic.

    Always carries TerminationReason.NON_DETERMINISTIC_EXECUTION.

    Attributes:
        step_id (str | None): Identifier passed by the raiser, or None when
            none was given.
    """

    def __init__(self, message: str, step_id: str | None = None):
        self.step_id = step_id
        super().__init__(message, TerminationReason.NON_DETERMINISTIC_EXECUTION)
class CheckpointError(CheckpointFailedError):
    """Failure to checkpoint. Will terminate the lambda.

    The constructor accepts only a message; no step id is forwarded to the
    parent class, so the parent's default applies.
    """

    def __init__(self, message: str):
        super().__init__(message)

    @classmethod
    def from_exception(cls, exception: Exception) -> CheckpointError:
        """Build an instance whose message is ``str(exception)``.

        Args:
            exception: The exception to take the message from.

        Returns:
            A new instance of ``cls`` carrying the exception's string form.
        """
        text = str(exception)
        return cls(message=text)
class ValidationError(DurableExecutionsError):
    """Raised for incorrect arguments to a Durable Function operation."""
class InvalidStateError(DurableExecutionsError):
    """Raised when an operation is attempted on an object whose state is invalid."""
class UserlandError(DurableExecutionsError):
    """Failure in user-land, i.e. in code the caller passed into durable executions."""
class CallableRuntimeError(UserlandError):
    """Wraps any failure from inside the callable code passed to a Durable Function operation.

    Attributes:
        message (str | None): The error message, stored as given.
        error_type (str | None): The error type, stored as given.
        data (str | None): Additional error data, stored as given.
        stack_trace (list[str] | None): Stack trace lines, stored as given.
    """

    def __init__(
        self,
        message: str | None,
        error_type: str | None,
        data: str | None,
        stack_trace: list[str] | None,
    ) -> None:
        # Keep every argument on the instance unchanged; only the message is
        # handed on to the base exception.
        self.message = message
        self.error_type = error_type
        self.data = data
        self.stack_trace = stack_trace
        super().__init__(message)
class StepInterruptedError(InvocationError):
    """Raised when a step is interrupted before it checkpointed at the end.

    Always carries TerminationReason.STEP_INTERRUPTED.

    Attributes:
        step_id (str | None): Identifier passed by the raiser, or None when
            none was given.
    """

    def __init__(self, message: str, step_id: str | None = None):
        self.step_id = step_id
        super().__init__(message, TerminationReason.STEP_INTERRUPTED)
class SuspendExecution(BaseException):
    """Raise this exception to suspend the current execution by returning PENDING to DAR.

    This deliberately derives from BaseException rather than Exception, in
    keeping with system-exiting exceptions like KeyboardInterrupt or
    SystemExit.
    """

    def __init__(self, message: str):
        super().__init__(message)
class TimedSuspendExecution(SuspendExecution):
    """Suspend execution until a specific timestamp.

    This is a specialized form of SuspendExecution that includes a scheduled
    resume time.

    Attributes:
        scheduled_timestamp (float): Unix timestamp in seconds at which to resume.
    """

    def __init__(self, message: str, scheduled_timestamp: float):
        super().__init__(message)
        self.scheduled_timestamp = scheduled_timestamp

    @classmethod
    def from_delay(cls, message: str, delay_seconds: float) -> TimedSuspendExecution:
        """Create a timed suspension with the delay calculated from now.

        Args:
            message: Descriptive message for the suspension
            delay_seconds: Duration to suspend in seconds from current time.
                Whole or fractional seconds are accepted.

        Returns:
            TimedSuspendExecution: Instance with calculated resume time

        Example:
            >>> exception = TimedSuspendExecution.from_delay("Waiting for callback", 30)
            >>> # Will suspend for 30 seconds from now
        """
        # time.time() is a float Unix timestamp, so the sum is the absolute
        # resume time in seconds.
        resume_time = time.time() + delay_seconds
        return cls(message, scheduled_timestamp=resume_time)

    @classmethod
    def from_datetime(
        cls, message: str, datetime_timestamp: datetime.datetime
    ) -> TimedSuspendExecution:
        """Create a timed suspension that resumes at the given datetime.

        Args:
            message: Descriptive message for the suspension
            datetime_timestamp: The point in time at which to resume. It is
                converted with ``datetime.timestamp()``, which treats a naive
                datetime as local time; pass a timezone-aware datetime to
                avoid ambiguity.

        Returns:
            TimedSuspendExecution: Instance whose scheduled_timestamp is the
            Unix timestamp of ``datetime_timestamp``
        """
        return cls(message, scheduled_timestamp=datetime_timestamp.timestamp())
class OrderedLockError(DurableExecutionsError):
    """An error from OrderedLock.

    Typically raised when a previous lock in the sequentially ordered chain of
    lock acquire requests failed. Because of the order guarantee of
    OrderedLock, subsequent queued up lock acquire requests cannot proceed,
    and will get this error instead.

    Attributes:
        source_exception (Exception | None): The exception that caused the
            lock to break, or None when no source exception was supplied.
    """

    def __init__(self, message: str, source_exception: Exception | None = None) -> None:
        """Initialize with the message and the exception source.

        Args:
            message: Description of the lock failure.
            source_exception: The exception that broke the lock, if any. When
                given, its type name and string form are appended to the
                message.
        """
        # Test against None explicitly rather than by truthiness: an exception
        # type may define __bool__ or __len__ and evaluate as falsy, and it
        # must still be included in the message.
        if source_exception is not None:
            msg = f"{message} {type(source_exception).__name__}: {source_exception}"
        else:
            msg = message
        super().__init__(msg)
        self.source_exception: Exception | None = source_exception
@dataclass(frozen=True)
class CallableRuntimeErrorSerializableDetails:
    """Immutable, serializable error details: a type name and a message."""

    type: str
    message: str

    @classmethod
    def from_exception(
        cls, exception: Exception
    ) -> CallableRuntimeErrorSerializableDetails:
        """Create an instance from an Exception, using its type and message.

        Args:
            exception: An Exception instance

        Returns:
            A CallableRuntimeErrorSerializableDetails instance holding the
            exception's class name and its string form
        """
        type_name = exception.__class__.__name__
        text = str(exception)
        return cls(type=type_name, message=text)

    def __str__(self) -> str:
        """Return a string representation of the object.

        Returns:
            A string in the format "type: message"
        """
        return f"{self.type}: {self.message}"
class SerDesError(DurableExecutionsError):
    """Signals that serialization failed."""