-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathserverless.py
235 lines (182 loc) · 7.65 KB
/
serverless.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
from __future__ import annotations
from flotta import __version__
from flotta.config import DataSourceStorage, Configuration, config_manager
from flotta.const import TYPE_CLIENT, TYPE_USER
from flotta.core.artifacts import Artifact, ArtifactStatus
from flotta.database.repositories import (
ArtifactRepository,
ComponentRepository,
DataSourceRepository,
JobRepository,
ProjectRepository,
)
from flotta.node.services import (
JobManagementService,
WorkbenchService,
TaskManagementService,
ResourceManagementService,
)
from flotta.node.startup import NodeStartup
from flotta.schemas.components import Component
from flotta.schemas.database import Resource
from flotta.schemas.metadata import Metadata
from flotta.schemas.project import Project
from flotta.schemas.resources import ResourceIdentifier
from flotta.schemas.updates import UpdateData
from flotta.tasks.services.execution import load_environment
from flotta.tasks.tasks import Task, TaskError
from tests.utils import create_project
from sqlalchemy.ext.asyncio import AsyncSession
class ServerlessWorker:
def __init__(
self,
index: int,
node: ServerlessExecution,
data: DataSourceStorage | Metadata | None = None,
) -> None:
self.index: int = index
self.node: ServerlessExecution = node
self.config: Configuration = config_manager.get()
self.md: Metadata | None = None
self.data: DataSourceStorage | None = None
if isinstance(data, DataSourceStorage):
self.data = data
self.md = data.metadata()
if isinstance(data, Metadata):
self.md = data
self.component: Component
async def setup(self, cr: ComponentRepository, component: Component | None = None):
if component is None:
self.component = await cr.create_component(
f"client-{self.index}",
TYPE_CLIENT,
f"key-{self.index}",
__version__,
f"client-{self.index}",
f"ip-{self.index}",
"",
)
else:
self.component = component
def has_metadata(self) -> bool:
return self.md is not None
def metadata(self) -> Metadata:
if self.md is None:
raise ValueError("This client ha been created without metadata")
return self.md
async def next_action(self) -> UpdateData:
return await self.node.get_update(self.component)
async def get_task(self, next_action: UpdateData) -> Task:
return await self.node.get_task(next_action.job_id)
async def complete_task(self, task: Task) -> None:
await self.node.task_completed(task)
async def execute(self, task: Task) -> Resource:
env = load_environment(
self.data,
task,
self.config.storage_job(
task.artifact_id,
task.job_id,
task.iteration,
),
)
for resource in task.required_resources:
env.add_resource(
resource.resource_id,
self.config.storage_job(
resource.artifact_id,
resource.job_id,
resource.iteration,
)
/ f"{resource.resource_id}.pkl",
)
env = task.run(env)
env.store()
assert env.products is not None
return Resource(
id=env.product_id,
component_id=self.component.id,
creation_time=None,
path=env.product_path(),
is_external=False,
is_error=False,
is_ready=True,
)
async def next_get_execute_post(self) -> Resource:
next_action = await self.next_action()
if next_action is None:
raise ValueError("next_action is not an execution action!")
task = await self.get_task(next_action)
res = await self.execute(task)
await self.complete_task(task)
return res
class ServerlessExecution:
def __init__(self, session: AsyncSession) -> None:
self.session = session
self.cr: ComponentRepository = ComponentRepository(session)
self.ar: ArtifactRepository = ArtifactRepository(session)
self.jr: JobRepository = JobRepository(session)
self.dsr: DataSourceRepository = DataSourceRepository(self.session)
self.pr: ProjectRepository = ProjectRepository(self.session)
self.workers: dict[str, ServerlessWorker] = dict()
self.self_component: Component
self.user_component: Component
self.jobs_service: JobManagementService
self.task_service: TaskManagementService
self.workbench_service: WorkbenchService
self.resource_service: ResourceManagementService
async def setup(self):
await NodeStartup(self.session).startup()
self.self_component = await self.cr.get_self_component()
self.user_component = await self.cr.create_component(
"user-1",
TYPE_USER,
"user-1-public_key",
__version__,
"user-1",
"ip-user-1",
"",
)
self.self_worker = await self.add_worker(component=self.self_component)
self.jobs_service = JobManagementService(self.session, self.self_component)
self.task_service = TaskManagementService(self.session, self.self_component, "", "")
self.workbench_service = WorkbenchService(self.session, self.user_component, self.self_component)
self.resource_service = ResourceManagementService(self.session)
async def add_worker(
self,
data: DataSourceStorage | Metadata | None = None,
component: Component | None = None,
) -> ServerlessWorker:
sw = ServerlessWorker(len(self.workers), self, data)
await sw.setup(self.cr, component)
if sw.has_metadata():
metadata = sw.metadata()
await self.cr.create_event(sw.component.id, "update metadata")
# this will also update existing metadata
await self.dsr.create_or_update_from_metadata(sw.component.id, metadata)
await self.pr.add_datasources_from_metadata(metadata)
self.workers[sw.component.id] = sw
return sw
async def create_project(self, project_token: str) -> None:
await create_project(self.session, project_token)
async def get_project(self, project_token: str) -> Project:
return await self.workbench_service.project(project_token)
async def submit(self, artifact: Artifact) -> str:
status = await self.workbench_service.submit_artifact(artifact)
assert status.id is not None
return status.id
async def get_status(self, artifact_id: str) -> ArtifactStatus:
return await self.workbench_service.get_status_artifact(artifact_id)
async def get_update(self, component: Component):
return await self.jobs_service.update(component)
async def next(self, component: Component) -> str | None:
return await self.jobs_service.next_task_for_component(component.id)
async def get_task(self, job_id: str) -> Task:
return await self.jobs_service.get_task_by_job_id(job_id)
async def task_completed(self, task: Task) -> None:
await self.jobs_service.task_completed(task.job_id)
await self.jobs_service.check(task.artifact_id)
async def task_failed(self, job_id: str) -> None:
await self.jobs_service.task_failed(TaskError(job_id=job_id))
async def get_resource(self, resource_id: str) -> Resource:
return await self.resource_service.load_resource(ResourceIdentifier(resource_id=resource_id))