Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for submitting a prebuilt image #8802

Merged
merged 38 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
0aa797d
Add support for submitting a prebuilt image
kiendang May 13, 2024
870f591
Remove the nbqa-black pre-commit hook
kiendang May 13, 2024
7f86a1e
Merge branch 'dev' into prebuilt-image-worker
shubham3121 May 14, 2024
6c48218
Rename get_by_docker_config to get_by_worker_config
kiendang May 14, 2024
85935b3
Add unit test for services.worker_image.get_by_config
kiendang May 14, 2024
81ae3ea
Rename
kiendang May 14, 2024
56a8ad6
Use get_by_image to retrieve the image in test
kiendang May 14, 2024
ce21987
Rename arg docker_config to worker_config
kiendang May 14, 2024
df646ea
Add integration tests for prebuilt images
kiendang May 16, 2024
6c13ba2
Merge branch 'dev' into prebuilt-image-worker
kiendang May 16, 2024
16bcf91
Fix worker pool integration tests
kiendang May 16, 2024
87861a3
Fix worker pool integration tests
kiendang May 16, 2024
749bc1e
Use a different prebuilt image for each tag
kiendang May 16, 2024
f2c4d5c
Add tag for images from PrebuiltWorkerConfig
kiendang May 16, 2024
9f24bab
No longer try to push prebuilt image to local registry
kiendang May 16, 2024
b0b658a
Use a different worker pool name for each test
kiendang May 16, 2024
74038fe
Get error while launching worker pool
kiendang May 16, 2024
b9554e5
debug
kiendang May 16, 2024
6d6a6a6
Increase k8s pool creation timeout
kiendang May 16, 2024
5636bf1
Test using a smaller image
kiendang May 16, 2024
ece69c1
Remove unused variables
kiendang May 21, 2024
9e84ea9
Remove unnecessary check
kiendang May 21, 2024
f9a35e1
Use different prebuilt images for tests
kiendang May 21, 2024
c67932f
Merge branch 'dev' into prebuilt-image-worker
kiendang May 22, 2024
dd138f5
Avoid username collision in tests
kiendang May 22, 2024
1286940
Get the correct job by id instead of the last job
kiendang May 22, 2024
7c535c7
Only require tag for DockerWorkerConfig pool request
kiendang May 22, 2024
5ecf05b
Update CreateCustomImageChange protocol version
kiendang May 22, 2024
135fbb2
Update tests
kiendang May 22, 2024
54c3aed
Merge branch 'dev' into prebuilt-image-worker
shubham3121 May 24, 2024
3618ad4
Return an error in case of invalid container name
kiendang May 24, 2024
39ac64a
Merge branch 'dev' into prebuilt-image-worker
shubham3121 May 27, 2024
3cb2a0a
Rename submit_container_image to just submit
kiendang May 27, 2024
6a7022a
Merge branch 'dev' into prebuilt-image-worker
kiendang May 27, 2024
a8d86ed
Use correct account client
kiendang May 27, 2024
c453ad5
Merge branch 'dev' into prebuilt-image-worker
kiendang May 27, 2024
9865e20
Merge branch 'dev' into prebuilt-image-worker
kiendang May 27, 2024
19d627f
Fix user permission in test
kiendang May 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ repos:
# files: "^notebooks/(api|tutorials|admin)"
hooks:
- id: nbqa-isort
- id: nbqa-black
shubham3121 marked this conversation as resolved.
Show resolved Hide resolved

- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
Expand Down
4 changes: 2 additions & 2 deletions notebooks/admin/Custom API + Custom Worker.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@
"metadata": {},
"outputs": [],
"source": [
"submit_result = domain_client.api.services.worker_image.submit_dockerfile(\n",
" docker_config=docker_config\n",
"submit_result = domain_client.api.services.worker_image.submit_container_image(\n",
" worker_config=docker_config\n",
")\n",
"submit_result"
]
Expand Down
8 changes: 4 additions & 4 deletions notebooks/api/0.8/10-container-images.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,8 @@
"metadata": {},
"outputs": [],
"source": [
"submit_result = domain_client.api.services.worker_image.submit_dockerfile(\n",
" docker_config=docker_config\n",
"submit_result = domain_client.api.services.worker_image.submit_container_image(\n",
" worker_config=docker_config\n",
")"
]
},
Expand Down Expand Up @@ -1095,8 +1095,8 @@
"metadata": {},
"outputs": [],
"source": [
"submit_result = domain_client.api.services.worker_image.submit_dockerfile(\n",
" docker_config=docker_config_2\n",
"submit_result = domain_client.api.services.worker_image.submit_container_image(\n",
" worker_config=docker_config_2\n",
")\n",
"submit_result"
]
Expand Down
8 changes: 4 additions & 4 deletions notebooks/api/0.8/11-container-images-k8s.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,8 @@
"metadata": {},
"outputs": [],
"source": [
"submit_result = domain_client.api.services.worker_image.submit_dockerfile(\n",
" docker_config=docker_config\n",
"submit_result = domain_client.api.services.worker_image.submit_container_image(\n",
" worker_config=docker_config\n",
")\n",
"submit_result"
]
Expand Down Expand Up @@ -935,8 +935,8 @@
"outputs": [],
"source": [
"submit_result = None\n",
"submit_result = domain_client.api.services.worker_image.submit_dockerfile(\n",
" docker_config=docker_config_opendp\n",
"submit_result = domain_client.api.services.worker_image.submit_container_image(\n",
" worker_config=docker_config_opendp\n",
")\n",
"submit_result"
]
Expand Down
1 change: 1 addition & 0 deletions packages/syft/src/syft/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from .client.user_settings import UserSettings # noqa: F401
from .client.user_settings import settings # noqa: F401
from .custom_worker.config import DockerWorkerConfig # noqa: F401
from .custom_worker.config import PrebuiltWorkerConfig # noqa: F401
from .node.credentials import SyftSigningKey # noqa: F401
from .node.domain import Domain # noqa: F401
from .node.enclave import Enclave # noqa: F401
Expand Down
4 changes: 1 addition & 3 deletions packages/syft/src/syft/custom_worker/runner_k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from .k8s import get_kr8s_client

JSONPATH_AVAILABLE_REPLICAS = "{.status.availableReplicas}"
CREATE_POOL_TIMEOUT_SEC = 60
CREATE_POOL_TIMEOUT_SEC = 180
SCALE_POOL_TIMEOUT_SEC = 60


Expand Down Expand Up @@ -60,8 +60,6 @@ def create_pool(
f"jsonpath='{JSONPATH_AVAILABLE_REPLICAS}'={replicas}",
timeout=CREATE_POOL_TIMEOUT_SEC,
)
except Exception:
raise
finally:
if pull_secret:
pull_secret.delete(propagation_policy="Foreground")
Expand Down
67 changes: 49 additions & 18 deletions packages/syft/src/syft/service/request/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import Any

# third party
from pydantic import model_validator
from result import Err
from result import Ok
from result import Result
Expand All @@ -15,6 +16,7 @@
from ...abstract_node import NodeSideType
from ...client.api import APIRegistry
from ...client.client import SyftClient
from ...custom_worker.config import DockerWorkerConfig
from ...custom_worker.config import WorkerConfig
from ...custom_worker.k8s import IN_KUBERNETES
from ...node.credentials import SyftVerifyKey
Expand Down Expand Up @@ -187,7 +189,7 @@ def __repr_syft_nested__(self) -> str:


@serializable()
class CreateCustomImageChange(Change):
class CreateCustomImageChangeV2(Change):
__canonical_name__ = "CreateCustomImageChange"
__version__ = SYFT_OBJECT_VERSION_2

Expand All @@ -198,41 +200,70 @@ class CreateCustomImageChange(Change):

__repr_attrs__ = ["config", "tag"]


@serializable()
class CreateCustomImageChange(Change):
__canonical_name__ = "CreateCustomImageChange"
__version__ = SYFT_OBJECT_VERSION_3

config: WorkerConfig
tag: str | None = None
registry_uid: UID | None = None
pull_image: bool = True

__repr_attrs__ = ["config", "tag"]

@model_validator(mode="after")
def _tag_required_for_dockerworkerconfig(self) -> Self:
if isinstance(self.config, DockerWorkerConfig) and self.tag is None:
raise ValueError("`tag` is required for `DockerWorkerConfig`.")
return self

def _run(
self, context: ChangeContext, apply: bool
) -> Result[SyftSuccess, SyftError]:
try:
worker_image_service = context.node.get_service("SyftWorkerImageService")

service_context = context.to_service_ctx()
result = worker_image_service.submit_dockerfile(
service_context, docker_config=self.config
result = worker_image_service.submit_container_image(
service_context, worker_config=self.config
)

if isinstance(result, SyftError):
return Err(result)

result = worker_image_service.stash.get_by_docker_config(
result = worker_image_service.stash.get_by_worker_config(
service_context.credentials, config=self.config
)

if result.is_err():
return Err(SyftError(message=f"{result.err()}"))

worker_image = result.ok()
if (worker_image := result.ok()) is None:
return Err(SyftError(message="The worker image does not exist."))

build_result = worker_image_service.build(
service_context,
image_uid=worker_image.id,
tag=self.tag,
registry_uid=self.registry_uid,
pull=self.pull_image,
)
build_success_message = "Image was pre-built."

if not worker_image.is_prebuilt:
build_result = worker_image_service.build(
service_context,
image_uid=worker_image.id,
tag=self.tag,
registry_uid=self.registry_uid,
pull=self.pull_image,
)

if isinstance(build_result, SyftError):
return Err(build_result)

if isinstance(build_result, SyftError):
return Err(build_result)
build_success_message = build_result.message

build_success = SyftSuccess(
message=f"Build result: {build_success_message}"
)

if IN_KUBERNETES:
if IN_KUBERNETES and not worker_image.is_prebuilt:
push_result = worker_image_service.push(
service_context,
image=worker_image.id,
Expand All @@ -245,11 +276,11 @@ def _run(

return Ok(
SyftSuccess(
message=f"Build Result: {build_result.message} \n Push Result: {push_result.message}"
message=f"{build_success}\nPush result: {push_result.message}"
)
)

return Ok(build_result)
return Ok(build_success)

except Exception as e:
return Err(SyftError(message=f"Failed to create/build image: {e}"))
Expand Down Expand Up @@ -291,7 +322,7 @@ def _run(
service_context: AuthedServiceContext = context.to_service_ctx()

if self.config is not None:
result = worker_pool_service.image_stash.get_by_docker_config(
result = worker_pool_service.image_stash.get_by_worker_config(
service_context.credentials, self.config
)
if result.is_err():
Expand Down
14 changes: 8 additions & 6 deletions packages/syft/src/syft/service/worker/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,6 @@ def create_kubernetes_pool(
**kwargs: Any,
) -> list[Pod] | SyftError:
pool = None
error = False

try:
print(
Expand Down Expand Up @@ -366,11 +365,14 @@ def create_kubernetes_pool(
reg_url=reg_url,
)
except Exception as e:
error = True
return SyftError(message=f"Failed to start workers {e}")
finally:
if error and pool:
if pool:
pool.delete()
# stdlib
import traceback

return SyftError(
message=f"Failed to start workers {e} {e.__class__} {e.args} {traceback.format_exc()}."
)

return runner.get_pool_pods(pool_name=pool_name)

Expand Down Expand Up @@ -564,7 +566,7 @@ def create_default_image(
image_identifier=SyftWorkerImageIdentifier.from_str(tag),
)

result = image_stash.get_by_docker_config(
result = image_stash.get_by_worker_config(
credentials=credentials,
config=worker_config,
)
Expand Down
26 changes: 16 additions & 10 deletions packages/syft/src/syft/service/worker/worker_image_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
import pydantic

# relative
from ...custom_worker.config import DockerWorkerConfig
from ...custom_worker.config import PrebuiltWorkerConfig
from ...custom_worker.config import WorkerConfig
from ...custom_worker.k8s import IN_KUBERNETES
from ...serde.serializable import serializable
from ...store.document_store import DocumentStore
Expand Down Expand Up @@ -39,16 +40,21 @@ def __init__(self, store: DocumentStore) -> None:
self.stash = SyftWorkerImageStash(store=store)

@service_method(
path="worker_image.submit_dockerfile",
name="submit_dockerfile",
path="worker_image.submit_container_image",
name="submit_container_image",
roles=DATA_OWNER_ROLE_LEVEL,
shubham3121 marked this conversation as resolved.
Show resolved Hide resolved
)
def submit_dockerfile(
self, context: AuthedServiceContext, docker_config: DockerWorkerConfig
def submit_container_image(
self, context: AuthedServiceContext, worker_config: WorkerConfig
) -> SyftSuccess | SyftError:
worker_image = SyftWorkerImage(
config=docker_config,
config=worker_config,
created_by=context.credentials,
image_identifier=(
SyftWorkerImageIdentifier.from_str(worker_config.tag)
if isinstance(worker_config, PrebuiltWorkerConfig)
else None
),
)
res = self.stash.set(context.credentials, worker_image)

Expand Down Expand Up @@ -278,14 +284,14 @@ def get_by_uid(
roles=DATA_SCIENTIST_ROLE_LEVEL,
)
def get_by_config(
self, context: AuthedServiceContext, docker_config: DockerWorkerConfig
self, context: AuthedServiceContext, worker_config: WorkerConfig
) -> SyftWorkerImage | SyftError:
res = self.stash.get_by_docker_config(
credentials=context.credentials, config=docker_config
res = self.stash.get_by_worker_config(
credentials=context.credentials, config=worker_config
)
if res.is_err():
return SyftError(
message=f"Failed to get image with docker config {docker_config}. Error: {res.err()}"
message=f"Failed to get image with docker config {worker_config}. Error: {res.err()}"
)
image: SyftWorkerImage = res.ok()
return image
6 changes: 3 additions & 3 deletions packages/syft/src/syft/service/worker/worker_image_stash.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def set(
)

if isinstance(obj.config, DockerWorkerConfig):
result = self.get_by_docker_config(
result = self.get_by_worker_config(
credentials=credentials, config=obj.config
)
if result.is_ok() and result.ok() is not None:
Expand All @@ -62,8 +62,8 @@ def set(
ignore_duplicates=ignore_duplicates,
)

def get_by_docker_config(
self, credentials: SyftVerifyKey, config: DockerWorkerConfig
def get_by_worker_config(
self, credentials: SyftVerifyKey, config: WorkerConfig
) -> Result[SyftWorkerImage | None, str]:
qks = QueryKeys(qks=[WorkerConfigPK.with_obj(config)])
return self.query_one(credentials=credentials, qks=qks)
Loading
Loading