Skip to content

Commit

Permalink
Merge pull request #8347 from PeterChung241/add_worker_details_api
Browse files Browse the repository at this point in the history
Added api for worker details
  • Loading branch information
rasswanth-s authored Dec 18, 2023
2 parents f25d42e + b08519c commit a636bda
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 0 deletions.
45 changes: 45 additions & 0 deletions notebooks/api/0.8/10-container-images.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"import syft as sy\n",
"sy.requires(SYFT_VERSION)\n",
"from syft.service.worker.worker_image import SyftWorkerImage\n",
"from syft.service.worker.worker_pool import WorkerStatus, SyftWorker\n",
"from syft.custom_worker.config import DockerWorkerConfig"
]
},
Expand Down Expand Up @@ -321,6 +322,50 @@
"assert len(worker_pool.workers)==3"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "fd731689-f9d6-4327-b199-d1458bc9a1f5",
"metadata": {},
"outputs": [],
"source": [
"# get first worker from worker pool\n",
"get_worker = domain_client.api.services.worker_pool.get_worker(worker_pool_id=worker_pool.id, worker_id=worker_pool.workers[0].id)\n",
"get_worker"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "b953f060-cff5-483b-9b0a-cdb2dbb44356",
"metadata": {},
"outputs": [],
"source": [
"assert isinstance(get_worker, SyftWorker)\n",
"assert get_worker.name == worker_pool.workers[0].name"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "530dec26-8c9e-43d9-9bb9-ddd7576dd01e",
"metadata": {},
"outputs": [],
"source": [
"worker_status = domain_client.api.services.worker_pool.get_worker_status(worker_pool_id=worker_pool.id, worker_id=get_worker.id)\n",
"worker_status"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4044b45b-d3f0-424e-93c7-79bddc68755c",
"metadata": {},
"outputs": [],
"source": [
"assert isinstance(worker_status, WorkerStatus)"
]
},
{
"cell_type": "code",
"execution_count": null,
Expand Down
72 changes: 72 additions & 0 deletions packages/syft/src/syft/service/worker/worker_pool_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from .worker_pool import SyftWorker
from .worker_pool import WorkerOrchestrationType
from .worker_pool import WorkerPool
from .worker_pool import WorkerStatus
from .worker_pool_stash import SyftWorkerPoolStash


Expand Down Expand Up @@ -178,6 +179,54 @@ def delete_worker(
message=f"Worker with id: {worker_id} deleted successfully from pool: {worker_pool.name}"
)

@service_method(
path="worker_pool.get_worker",
name="get_worker",
roles=DATA_OWNER_ROLE_LEVEL,
)
def get_worker(
self, context: AuthedServiceContext, worker_pool_id: UID, worker_id: UID
) -> Union[SyftWorker, SyftError]:
worker_pool_worker = self._get_worker_pool_and_worker(
context, worker_pool_id, worker_id
)
if isinstance(worker_pool_worker, SyftError):
return worker_pool_worker

worker_pool, worker = worker_pool_worker

worker_status = _get_worker_container_status(worker)
if isinstance(worker_status, SyftError):
return worker_status
elif worker_status != WorkerStatus.PENDING:
worker.status = worker_status

result = self.stash.update(
credentials=context.credentials,
obj=worker_pool,
)

return (
SyftError(
message=f"Failed to update worker status. Error: {result.err()}"
)
if result.is_err()
else worker
)

return worker

@service_method(
path="worker_pool.get_worker_status",
name="get_worker_status",
roles=DATA_OWNER_ROLE_LEVEL,
)
def get_worker_status(
self, context: AuthedServiceContext, worker_pool_id: UID, worker_id: UID
) -> Union[WorkerStatus, SyftError]:
worker = self.get_worker(context, worker_pool_id, worker_id)
return worker if isinstance(worker, SyftError) else worker.status

@service_method(
path="worker_pool.worker_logs",
name="worker_logs",
Expand Down Expand Up @@ -273,3 +322,26 @@ def _get_worker_container(
f"Unable to access worker {worker.id} container. "
+ f"Container server error {e}"
)


def _get_worker_container_status(
worker: SyftWorker, docker_client: Optional[docker.DockerClient] = None
) -> Union[WorkerStatus, SyftError]:
container = _get_worker_container(worker, docker_client)
if isinstance(container, SyftError):
return container

container_status = container.status
syft_container_status = None
if container_status == "running":
syft_container_status = WorkerStatus.RUNNING
elif container_status in ["paused", "removing", "exited", "dead"]:
syft_container_status = WorkerStatus.STOPPED
elif container_status == "restarting":
syft_container_status = WorkerStatus.RESTARTED
elif container_status == "created":
syft_container_status = WorkerStatus.PENDING
else:
return SyftError(message=f"Unknown container status: {container_status}")

return syft_container_status

0 comments on commit a636bda

Please sign in to comment.