Skip to content

Commit 28a066f

Browse files
committed
[StopAutomation] convert to DSL
1 parent 1acb91a commit 28a066f

17 files changed

Lines changed: 282 additions & 147 deletions

File tree

packages/commons/octobot_commons/dsl_interpreter/operators/re_callable_operator_mixin.py

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ def is_re_calling_operator_result(result: typing.Any) -> bool:
3838
Check if the result is a re-calling operator result.
3939
"""
4040
return isinstance(result, dict) and (
41-
"reset_to_id" in result or "last_execution_result" in result
41+
ReCallingOperatorResult.__name__ in result
4242
)
4343

4444
def get_next_call_time(self) -> typing.Optional[float]:
@@ -88,7 +88,9 @@ def get_last_execution_result(
8888
(result_dict := param_by_name.get(self.LAST_EXECUTION_RESULT_KEY, None))
8989
and ReCallingOperatorResult.is_re_calling_operator_result(result_dict)
9090
):
91-
return ReCallingOperatorResult.from_dict(result_dict).last_execution_result
91+
return ReCallingOperatorResult.from_dict(result_dict[
92+
ReCallingOperatorResult.__name__
93+
]).last_execution_result
9294
return None
9395

9496
def build_re_callable_result(
@@ -101,11 +103,13 @@ def build_re_callable_result(
101103
"""
102104
Builds a dict formatted re-callable result from the given parameters.
103105
"""
104-
return ReCallingOperatorResult(
105-
reset_to_id=reset_to_id,
106-
last_execution_result={
107-
ReCallingOperatorResultKeys.WAITING_TIME.value: waiting_time,
108-
ReCallingOperatorResultKeys.LAST_EXECUTION_TIME.value: last_execution_time,
109-
**kwargs,
110-
},
111-
).to_dict(include_default_values=False)
106+
return {
107+
ReCallingOperatorResult.__name__: ReCallingOperatorResult(
108+
reset_to_id=reset_to_id,
109+
last_execution_result={
110+
ReCallingOperatorResultKeys.WAITING_TIME.value: waiting_time,
111+
ReCallingOperatorResultKeys.LAST_EXECUTION_TIME.value: last_execution_time,
112+
**kwargs,
113+
},
114+
).to_dict(include_default_values=False)
115+
}

packages/commons/tests/dsl_interpreter/operators/test_re_callable_operator_mixin.py

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,16 @@
2323
class TestReCallingOperatorResult:
2424
def test_is_re_calling_operator_result_with_reset_to_id(self):
2525
assert re_callable_operator_mixin.ReCallingOperatorResult.is_re_calling_operator_result(
26-
{"reset_to_id": "some_id"}
26+
{re_callable_operator_mixin.ReCallingOperatorResult.__name__: {"reset_to_id": "some_id"}}
2727
) is True
2828

2929
def test_is_re_calling_operator_result_with_last_execution_result(self):
3030
assert re_callable_operator_mixin.ReCallingOperatorResult.is_re_calling_operator_result(
31-
{"last_execution_result": {"waiting_time": 5, "last_execution_time": 1000.0}}
31+
{
32+
re_callable_operator_mixin.ReCallingOperatorResult.__name__: {
33+
"last_execution_result": {"waiting_time": 5, "last_execution_time": 1000.0},
34+
}
35+
}
3236
) is True
3337

3438
def test_is_re_calling_operator_result_false_for_non_dict(self):
@@ -124,7 +128,9 @@ def test_get_last_execution_result_returns_inner_dict_for_valid_format(self):
124128
}
125129
result = operator.get_last_execution_result({
126130
dsl_interpreter.ReCallableOperatorMixin.LAST_EXECUTION_RESULT_KEY: {
127-
"last_execution_result": inner,
131+
re_callable_operator_mixin.ReCallingOperatorResult.__name__: {
132+
"last_execution_result": inner,
133+
},
128134
},
129135
})
130136
assert result == inner
@@ -134,8 +140,10 @@ def test_get_last_execution_result_with_reset_to_id_format(self):
134140
inner = {"waiting_time": 3.0, "last_execution_time": 500.0}
135141
result = operator.get_last_execution_result({
136142
dsl_interpreter.ReCallableOperatorMixin.LAST_EXECUTION_RESULT_KEY: {
137-
"reset_to_id": "abc",
138-
"last_execution_result": inner,
143+
re_callable_operator_mixin.ReCallingOperatorResult.__name__: {
144+
"reset_to_id": "abc",
145+
"last_execution_result": inner,
146+
},
139147
},
140148
})
141149
assert result == inner
@@ -146,11 +154,12 @@ def test_build_re_callable_result(self):
146154
last_execution_time=1000.0,
147155
waiting_time=5.0,
148156
)
149-
assert "last_execution_result" in result
150-
assert result["last_execution_result"][
157+
inner = result[re_callable_operator_mixin.ReCallingOperatorResult.__name__]
158+
assert "last_execution_result" in inner
159+
assert inner["last_execution_result"][
151160
re_callable_operator_mixin.ReCallingOperatorResultKeys.LAST_EXECUTION_TIME.value
152161
] == 1000.0
153-
assert result["last_execution_result"][
162+
assert inner["last_execution_result"][
154163
re_callable_operator_mixin.ReCallingOperatorResultKeys.WAITING_TIME.value
155164
] == 5.0
156165

@@ -161,8 +170,9 @@ def test_build_re_callable_result_with_reset_to_id(self):
161170
last_execution_time=1000.0,
162171
waiting_time=5.0,
163172
)
164-
assert result["reset_to_id"] == "target_123"
165-
assert "last_execution_result" in result
173+
inner = result[re_callable_operator_mixin.ReCallingOperatorResult.__name__]
174+
assert inner["reset_to_id"] == "target_123"
175+
assert "last_execution_result" in inner
166176

167177
def test_build_re_callable_result_with_extra_kwargs(self):
168178
operator = _TestReCallableOperator()
@@ -171,4 +181,5 @@ def test_build_re_callable_result_with_extra_kwargs(self):
171181
waiting_time=5.0,
172182
extra_field=42,
173183
)
174-
assert result["last_execution_result"]["extra_field"] == 42
184+
inner = result[re_callable_operator_mixin.ReCallingOperatorResult.__name__]
185+
assert inner["last_execution_result"]["extra_field"] == 42

packages/flow/octobot_flow/entities/actions/action_details.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ class AbstractActionDetails(octobot_commons.dataclasses.FlexibleDataclass):
2020
# unique id of the action
2121
id: str = dataclasses.field(repr=True)
2222
# result of the action. Set after the action is executed
23-
result: typing.Optional[dict] = dataclasses.field(default=None, repr=True)
23+
result: typing.Optional[
24+
octobot_commons.dsl_interpreter.ComputedOperatorParameterType
25+
] = dataclasses.field(default=None, repr=True)
2426
# error status of the action. Set after the action is executed, in case an error occured
2527
error_status: typing.Optional[str] = dataclasses.field(default=None, repr=True) # ActionErrorStatus
2628
# time at which the action was executed

packages/flow/octobot_flow/entities/automations/post_iteration_actions_details.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import dataclasses
2-
import octobot_commons.dataclasses
32
import typing
43

4+
import octobot_commons.dataclasses
5+
56

67
@dataclasses.dataclass
78
class RefreshExchangeBotsAuthenticatedDataDetails:
@@ -21,7 +22,7 @@ class NextIterationDetails(octobot_commons.dataclasses.FlexibleDataclass):
2122

2223

2324
@dataclasses.dataclass
24-
class PostIterationActionsDetails(octobot_commons.dataclasses.FlexibleDataclass):
25+
class PostIterationActionsDetails(octobot_commons.dataclasses.MinimizableDataclass):
2526
stop_automation: bool = False
2627
postpone_execution: bool = False
2728
postpone_reason: typing.Optional[str] = None

packages/flow/octobot_flow/enums.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ class ChangedElements(enum.Enum):
2626

2727
class ActionType(enum.Enum):
2828
APPLY_CONFIGURATION = "apply_configuration"
29-
STOP_AUTOMATION = "stop_automation"
3029
UNKNOWN = "unknown"
3130

3231

packages/flow/octobot_flow/jobs/automation_job.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,14 @@ async def run(self) -> list[octobot_flow.entities.AbstractActionDetails]:
5959
to_execute_actions, are_priority_actions = self._get_actions_to_execute()
6060
if are_priority_actions:
6161
self._logger.info(f"Running {len(to_execute_actions)} priority actions: {to_execute_actions}")
62+
self._resolve_dsl_scripts(to_execute_actions, True)
6263
else:
6364
# fetch the actions and signals if any
6465
await self._fetch_actions(maybe_authenticator)
6566
# resolve the DSL scripts in case it has dependencies on other actions
6667
self._resolve_dsl_scripts(
67-
self.automation_state.automation.actions_dag.get_executable_actions()
68+
self.automation_state.automation.actions_dag.get_executable_actions(),
69+
True
6870
)
6971
# fetch the dependencies of the automation environment
7072
fetched_dependencies = await self._fetch_dependencies(maybe_community_repository, to_execute_actions)
@@ -285,10 +287,17 @@ def _get_pending_priority_actions(self) -> list[octobot_flow.entities.AbstractAc
285287
action for action in self.automation_state.priority_actions if not action.is_completed()
286288
]
287289

288-
def _resolve_dsl_scripts(self, actions: list[octobot_flow.entities.AbstractActionDetails]):
289-
self.automation_state.automation.actions_dag.resolve_dsl_scripts(
290-
actions
291-
)
290+
def _resolve_dsl_scripts(
291+
self, actions: list[octobot_flow.entities.AbstractActionDetails],
292+
from_actions_dag: bool
293+
):
294+
if from_actions_dag:
295+
self.automation_state.automation.actions_dag.resolve_dsl_scripts(
296+
actions
297+
)
298+
else:
299+
local_dag = octobot_flow.entities.ActionsDAG(actions=actions)
300+
local_dag.resolve_dsl_scripts(actions)
292301

293302
def _clear_resolved_dsl_scripts(self, actions: list[octobot_flow.entities.AbstractActionDetails]):
294303
for action in actions:

packages/flow/octobot_flow/logic/actions/actions_executor.py

Lines changed: 41 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import octobot_flow.entities
1111
import octobot_flow.repositories.community
1212
import octobot_flow.logic.dsl
13-
import octobot_flow.logic.configured_actions
1413
import octobot_flow.enums
1514
import octobot_flow.errors
1615

@@ -48,30 +47,9 @@ async def execute(self):
4847
async with dsl_executor.dependencies_context(self._actions):
4948
for index, action in enumerate(self._actions):
5049
await self._execute_action(dsl_executor, action)
51-
if octobot_commons.dsl_interpreter.ReCallingOperatorResult.is_re_calling_operator_result(
52-
action.result
53-
):
54-
recall_dag_details = octobot_commons.dsl_interpreter.ReCallingOperatorResult.from_dict(
55-
action.result
56-
)
57-
if not recall_dag_details.reset_to_id:
58-
# reset to the current action if no specific id is provided (loop on this action)
59-
recall_dag_details.reset_to_id = action.id
60-
if recall_dag_details.reset_to_id == action.id:
61-
# Keep executing other selected actions if any: those are not affected by the reset
62-
# as they don't depend on the reset action
63-
continue
64-
# Reset to a past action: interrupt execution of the following actions
65-
# as they might depend on the reset action
66-
if index < len(self._actions) - 1:
67-
interrupted_action = self._actions[index + 1: ]
68-
self._get_logger().info(
69-
f"DAG reset required. Interrupting execution of "
70-
f"{len(interrupted_action)} actions: "
71-
f"{', '.join([action.id for action in interrupted_action])}"
72-
)
73-
break
74-
50+
recall_dag_details, should_stop_processing = self._handle_execution_result(action, index)
51+
if should_stop_processing:
52+
break
7553
self._sync_after_execution()
7654
await self._update_actions_history()
7755
await self._insert_execution_bot_logs(dsl_executor.pending_bot_logs)
@@ -85,18 +63,51 @@ async def execute(self):
8563
# no reset: schedule immediately
8664
self.next_execution_scheduled_to = 0
8765

66+
def _handle_execution_result(
67+
self, action: octobot_flow.entities.AbstractActionDetails, index: int
68+
) -> tuple[typing.Optional[octobot_commons.dsl_interpreter.ReCallingOperatorResult], bool]:
69+
if not isinstance(action.result, dict):
70+
return None, False
71+
if octobot_flow.entities.PostIterationActionsDetails.__name__ in action.result:
72+
post_iteration_actions_details = octobot_flow.entities.PostIterationActionsDetails.from_dict(
73+
action.result[octobot_flow.entities.PostIterationActionsDetails.__name__]
74+
)
75+
if post_iteration_actions_details.stop_automation:
76+
self._get_logger().info(f"Stopping automation: {self._automation.metadata.automation_id}")
77+
self._automation.post_actions.stop_automation = True
78+
# todo cancel open orders and sell assets if required in action config
79+
return None, True
80+
return None, False
81+
if octobot_commons.dsl_interpreter.ReCallingOperatorResult.is_re_calling_operator_result(action.result):
82+
recall_dag_details = octobot_commons.dsl_interpreter.ReCallingOperatorResult.from_dict(
83+
action.result[octobot_commons.dsl_interpreter.ReCallingOperatorResult.__name__]
84+
)
85+
if not recall_dag_details.reset_to_id:
86+
# reset to the current action if no specific id is provided (loop on this action)
87+
recall_dag_details.reset_to_id = action.id
88+
if recall_dag_details.reset_to_id == action.id:
89+
# Keep executing other selected actions if any: those are not affected by the reset
90+
# as they don't depend on the reset action
91+
return recall_dag_details, False
92+
# Reset to a past action: interrupt execution of the following actions
93+
# as they might depend on the reset action
94+
if index < len(self._actions) - 1:
95+
interrupted_action = self._actions[index + 1: ]
96+
self._get_logger().info(
97+
f"DAG reset required. Interrupting execution of "
98+
f"{len(interrupted_action)} actions: "
99+
f"{', '.join([action.id for action in interrupted_action])}"
100+
)
101+
return recall_dag_details, True
102+
return None, False
103+
88104
async def _execute_action(
89105
self,
90106
dsl_executor: "octobot_flow.logic.dsl.DSLExecutor",
91107
action: octobot_flow.entities.AbstractActionDetails
92108
):
93109
if isinstance(action, octobot_flow.entities.DSLScriptActionDetails):
94110
return await dsl_executor.execute_action(action)
95-
elif isinstance(action, octobot_flow.entities.ConfiguredActionDetails):
96-
configured_actions_executor = octobot_flow.logic.configured_actions.ConfiguredActionsExecutor(
97-
self._exchange_manager, self._automation
98-
)
99-
return await configured_actions_executor.execute_action(action)
100111
raise octobot_flow.errors.UnsupportedActionTypeError(
101112
f"{self.__class__.__name__} does not support action type: {type(action)}"
102113
) from None

packages/flow/octobot_flow/logic/configured_actions/__init__.py

Lines changed: 0 additions & 5 deletions
This file was deleted.

packages/flow/octobot_flow/logic/configured_actions/configured_actions_executor.py

Lines changed: 0 additions & 50 deletions
This file was deleted.

packages/flow/tests/functionnal_tests/actions_reset/test_exchange_actions_split_by_wait.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,9 @@ async def test_exchange_actions_creating_and_waiting_and_cancelling_limit(
103103
rescheduled_parameters = wait_action.get_rescheduled_parameters()
104104
assert dsl_interpreter.ReCallableOperatorMixin.LAST_EXECUTION_RESULT_KEY in rescheduled_parameters
105105
last_execution_result = dsl_interpreter.ReCallingOperatorResult.from_dict(
106-
rescheduled_parameters[dsl_interpreter.ReCallableOperatorMixin.LAST_EXECUTION_RESULT_KEY]
106+
rescheduled_parameters[dsl_interpreter.ReCallableOperatorMixin.LAST_EXECUTION_RESULT_KEY][
107+
dsl_interpreter.ReCallingOperatorResult.__name__
108+
]
107109
)
108110
assert isinstance(last_execution_result.last_execution_result, dict)
109111
waiting_time_1 = last_execution_result.last_execution_result[dsl_interpreter.ReCallingOperatorResultKeys.WAITING_TIME.value]
@@ -135,7 +137,9 @@ async def test_exchange_actions_creating_and_waiting_and_cancelling_limit(
135137
rescheduled_parameters = wait_action.get_rescheduled_parameters()
136138
assert dsl_interpreter.ReCallableOperatorMixin.LAST_EXECUTION_RESULT_KEY in rescheduled_parameters
137139
last_execution_result = dsl_interpreter.ReCallingOperatorResult.from_dict(
138-
rescheduled_parameters[dsl_interpreter.ReCallableOperatorMixin.LAST_EXECUTION_RESULT_KEY]
140+
rescheduled_parameters[dsl_interpreter.ReCallableOperatorMixin.LAST_EXECUTION_RESULT_KEY][
141+
dsl_interpreter.ReCallingOperatorResult.__name__
142+
]
139143
)
140144
assert isinstance(last_execution_result.last_execution_result, dict)
141145
waiting_time_2 = last_execution_result.last_execution_result[dsl_interpreter.ReCallingOperatorResultKeys.WAITING_TIME.value]

0 commit comments

Comments
 (0)