-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathexecution_engine.py
56 lines (44 loc) · 1.69 KB
/
execution_engine.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
from loguru import logger
from networkx import topological_sort
from reworkd_platform.services.kafka.event_schemas import WorkflowTaskEvent
from reworkd_platform.services.kafka.producers.task_producer import WorkflowTaskProducer
from reworkd_platform.services.sockets import websockets
from reworkd_platform.web.api.workflow.schemas import WorkflowFull
class ExecutionEngine:
def __init__(self, producer: WorkflowTaskProducer, workflow: WorkflowTaskEvent):
self.producer = producer
self.workflow = workflow
async def start(self) -> None:
await self.producer.send(event=self.workflow)
async def loop(self) -> None:
curr = self.workflow.queue.pop(0)
logger.info(f"Running task: {curr.ref}")
websockets.emit(
self.workflow.workflow_id,
"my-event",
{"nodeId": curr.id, "status": "running"},
)
# TODO: do work
# await sleep(0.5)
websockets.emit(
self.workflow.workflow_id,
"my-event",
{"nodeId": curr.id, "status": "success"},
)
if self.workflow.queue:
await self.start()
@classmethod
def create_execution_plan(
cls, producer: WorkflowTaskProducer, workflow: WorkflowFull
) -> "ExecutionEngine":
node_map = {n.id: n for n in workflow.nodes}
graph = workflow.to_graph()
sorted_nodes = [node_map[n] for n in topological_sort(graph)]
return cls(
producer=producer,
workflow=WorkflowTaskEvent.from_workflow(
workflow_id=workflow.id,
user_id=workflow.user_id,
work_queue=sorted_nodes,
),
)