Commit
Merge branch 'dev' into move-queue-constr
shubham3121 authored Feb 8, 2024
2 parents 39a0350 + dc0a249 commit 66e9544
Showing 8 changed files with 231 additions and 20 deletions.
105 changes: 104 additions & 1 deletion notebooks/api/0.8/11-container-images-k8s.ipynb
@@ -74,6 +74,14 @@
"domain_client"
]
},
{
"cell_type": "markdown",
"id": "fe3d0aa7",
"metadata": {},
"source": [
"### Scaling Default Worker Pool"
]
},
{
"cell_type": "markdown",
"id": "55439eb5-1e92-46a6-a45a-471917a86265",
@@ -92,6 +100,101 @@
"domain_client.worker_pools"
]
},
{
"cell_type": "markdown",
"id": "0ff8e268",
"metadata": {},
"source": [
"Scale up to 3 workers"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "de9872be",
"metadata": {},
"outputs": [],
"source": [
"result = domain_client.api.services.worker_pool.scale(\n",
" number=3, pool_name=\"default-pool\"\n",
")\n",
"assert not isinstance(result, sy.SyftError), str(result)\n",
"result"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "da6a499b",
"metadata": {},
"outputs": [],
"source": [
"result = domain_client.api.services.worker_pool.get_by_name(pool_name=\"default-pool\")\n",
"assert len(result.workers) == 3, str(result.to_dict())\n",
"result"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "27761f0c",
"metadata": {},
"outputs": [],
"source": [
"# stdlib\n",
"# wait for some time for scale up to be ready\n",
"from time import sleep\n",
"\n",
"sleep(5)"
]
},
{
"cell_type": "markdown",
"id": "c1276b5c",
"metadata": {},
"source": [
"Scale down to 1 worker"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "7f0aa94c",
"metadata": {},
"outputs": [],
"source": [
"default_worker_pool = domain_client.api.services.worker_pool.scale(\n",
" number=1, pool_name=\"default-pool\"\n",
")\n",
"assert not isinstance(result, sy.SyftError), str(result)\n",
"default_worker_pool"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "52acc6f6",
"metadata": {},
"outputs": [],
"source": [
"result = domain_client.api.services.worker_pool.get_by_name(pool_name=\"default-pool\")\n",
"assert len(result.workers) == 1, str(result.to_dict())\n",
"result"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "9a7b40a3",
"metadata": {},
"outputs": [],
"source": [
"default_worker_pool = domain_client.api.services.worker_pool.get_by_name(\n",
" pool_name=\"default-pool\"\n",
")\n",
"default_worker_pool"
]
},
{
"cell_type": "markdown",
"id": "3c7a124a",
@@ -1153,7 +1256,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.2"
"version": "3.11.7"
}
},
"nbformat": 4,
5 changes: 2 additions & 3 deletions packages/syft/src/syft/custom_worker/builder_docker.py
@@ -9,6 +9,7 @@
import docker

# relative
from .builder_types import BUILD_IMAGE_TIMEOUT_SEC
from .builder_types import BuilderBase
from .builder_types import ImageBuildResult
from .builder_types import ImagePushResult
@@ -18,8 +19,6 @@


class DockerBuilder(BuilderBase):
BUILD_MAX_WAIT = 30 * 60

def build_image(
self,
tag: str,
@@ -40,7 +39,7 @@ def build_image(
with contextlib.closing(docker.from_env()) as client:
image_result, logs = client.images.build(
tag=tag,
timeout=self.BUILD_MAX_WAIT,
timeout=BUILD_IMAGE_TIMEOUT_SEC,
buildargs=buildargs,
**kwargs,
)
12 changes: 10 additions & 2 deletions packages/syft/src/syft/custom_worker/builder_k8s.py
@@ -11,9 +11,11 @@
from kr8s.objects import Secret

# relative
from .builder_types import BUILD_IMAGE_TIMEOUT_SEC
from .builder_types import BuilderBase
from .builder_types import ImageBuildResult
from .builder_types import ImagePushResult
from .builder_types import PUSH_IMAGE_TIMEOUT_SEC
from .k8s import INTERNAL_REGISTRY_HOST
from .k8s import JOB_COMPLETION_TTL
from .k8s import KUBERNETES_NAMESPACE
@@ -66,7 +68,10 @@ def build_image(
)

# wait for job to complete/fail
job.wait(["condition=Complete", "condition=Failed"])
job.wait(
["condition=Complete", "condition=Failed"],
timeout=BUILD_IMAGE_TIMEOUT_SEC,
)

# get logs
logs = self._get_logs(job)
@@ -119,7 +124,10 @@ def push_image(
push_secret=push_secret,
)

job.wait(["condition=Complete", "condition=Failed"])
job.wait(
["condition=Complete", "condition=Failed"],
timeout=PUSH_IMAGE_TIMEOUT_SEC,
)
exit_code = self._get_exit_code(job)[0]
logs = self._get_logs(job)
except Exception:
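
With these changes, both job.wait calls are bounded by the shared timeout constants instead of blocking indefinitely. A minimal sketch of guarding the wait; the TimeoutError handling is an assumption about kr8s's behavior on expiry, not something this diff shows:

# Hedged sketch: bounding a kr8s Job wait with the new timeout constant.
# Assumes kr8s raises TimeoutError when the wait expires (not shown in this diff).
from kr8s.objects import Job

from syft.custom_worker.builder_types import BUILD_IMAGE_TIMEOUT_SEC


def wait_for_build(job: Job) -> None:
    try:
        # returns once the job is Complete or Failed, or raises on timeout
        job.wait(
            ["condition=Complete", "condition=Failed"],
            timeout=BUILD_IMAGE_TIMEOUT_SEC,
        )
    except TimeoutError as e:
        raise RuntimeError(
            f"image build job did not finish within {BUILD_IMAGE_TIMEOUT_SEC}s"
        ) from e
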
12 changes: 11 additions & 1 deletion packages/syft/src/syft/custom_worker/builder_types.py
@@ -7,7 +7,17 @@
# third party
from pydantic import BaseModel

__all__ = ["BuilderBase", "ImageBuildResult", "ImagePushResult"]
__all__ = [
"BuilderBase",
"ImageBuildResult",
"ImagePushResult",
"BUILD_IMAGE_TIMEOUT_SEC",
"PUSH_IMAGE_TIMEOUT_SEC",
]


BUILD_IMAGE_TIMEOUT_SEC = 30 * 60
PUSH_IMAGE_TIMEOUT_SEC = 10 * 60


class ImageBuildResult(BaseModel):
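
The build and push timeouts now live in one place and are re-exported via __all__, so both builders share the same values. A quick sketch of the intended import (values as defined above):

# Sketch: consuming the shared timeout constants defined in builder_types.py.
from syft.custom_worker.builder_types import BUILD_IMAGE_TIMEOUT_SEC
from syft.custom_worker.builder_types import PUSH_IMAGE_TIMEOUT_SEC

assert BUILD_IMAGE_TIMEOUT_SEC == 30 * 60  # 30 minutes for image builds
assert PUSH_IMAGE_TIMEOUT_SEC == 10 * 60  # 10 minutes for registry pushes
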
15 changes: 13 additions & 2 deletions packages/syft/src/syft/custom_worker/runner_k8s.py
@@ -16,6 +16,8 @@
from .k8s import get_kr8s_client

JSONPATH_AVAILABLE_REPLICAS = "{.status.availableReplicas}"
CREATE_POOL_TIMEOUT_SEC = 60
SCALE_POOL_TIMEOUT_SEC = 60


class KubernetesRunner:
@@ -57,7 +59,10 @@ def create_pool(
)

# wait for replicas to be available and ready
deployment.wait(f"jsonpath='{JSONPATH_AVAILABLE_REPLICAS}'={replicas}")
deployment.wait(
f"jsonpath='{JSONPATH_AVAILABLE_REPLICAS}'={replicas}",
timeout=CREATE_POOL_TIMEOUT_SEC,
)
except Exception:
raise
finally:
@@ -72,9 +77,15 @@ def scale_pool(self, pool_name: str, replicas: int) -> Optional[StatefulSet]:
if not deployment:
return None
deployment.scale(replicas)
deployment.wait(f"jsonpath='{JSONPATH_AVAILABLE_REPLICAS}'={replicas}")
deployment.wait(
f"jsonpath='{JSONPATH_AVAILABLE_REPLICAS}'={replicas}",
timeout=SCALE_POOL_TIMEOUT_SEC,
)
return deployment

def exists(self, pool_name: str) -> bool:
return bool(self.get_pool(pool_name))

def get_pool(self, pool_name: str) -> Optional[StatefulSet]:
selector = {"app.kubernetes.io/component": pool_name}
for _set in self.client.get("statefulsets", label_selector=selector):
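
create_pool and scale_pool now give up after 60 seconds instead of waiting forever on the availableReplicas jsonpath condition. A usage sketch for scale_pool, assuming only what the diff shows (a None return for a missing pool and a blocking wait on replicas); running it requires a live cluster:

# Hedged usage sketch for KubernetesRunner.scale_pool (requires a live cluster).
from syft.custom_worker.runner_k8s import KubernetesRunner

runner = KubernetesRunner()
statefulset = runner.scale_pool(pool_name="default-pool", replicas=3)
if statefulset is None:
    # get_pool found no StatefulSet labeled with this pool name
    print("pool 'default-pool' does not exist")
else:
    # scale_pool has already waited (up to 60s) for availableReplicas == 3
    print("pool scaled to 3 replicas")
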
2 changes: 1 addition & 1 deletion packages/syft/src/syft/service/worker/utils.py
@@ -384,7 +384,7 @@ def run_workers_in_kubernetes(
spawn_status = []
runner = KubernetesRunner()

if start_idx == 0:
if not runner.exists(pool_name=pool_name):
pool_pods = create_kubernetes_pool(
runner=runner,
tag=worker_image.image_identifier.full_name_with_tag,
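
The guard change makes pool creation idempotent: instead of inferring a first run from start_idx == 0, the runner is asked directly whether the pool's StatefulSet exists, so repeated calls scale an existing pool rather than trying to recreate it. A tiny sketch of the check itself, using the exists helper shown in runner_k8s.py above:

# Sketch: the existence check that now gates pool creation (assumes a live cluster).
from syft.custom_worker.runner_k8s import KubernetesRunner

runner = KubernetesRunner()
if not runner.exists(pool_name="default-pool"):
    print("pool not found -> create it")  # create_kubernetes_pool(...) path
else:
    print("pool exists -> scale it")  # scale_kubernetes_pool(...) path
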
90 changes: 90 additions & 0 deletions packages/syft/src/syft/service/worker/worker_pool_service.py
@@ -11,6 +11,7 @@
from ...custom_worker.config import CustomWorkerConfig
from ...custom_worker.config import WorkerConfig
from ...custom_worker.k8s import IN_KUBERNETES
from ...custom_worker.runner_k8s import KubernetesRunner
from ...serde.serializable import serializable
from ...store.document_store import DocumentStore
from ...store.linked_obj import LinkedObject
@@ -36,6 +37,7 @@
from .utils import get_orchestration_type
from .utils import run_containers
from .utils import run_workers_in_threads
from .utils import scale_kubernetes_pool
from .worker_image import SyftWorkerImage
from .worker_image_stash import SyftWorkerImageStash
from .worker_pool import ContainerSpawnStatus
@@ -430,6 +432,94 @@ def add_workers(

return container_statuses

@service_method(
path="worker_pool.scale",
name="scale",
roles=DATA_OWNER_ROLE_LEVEL,
)
def scale(
self,
context: AuthedServiceContext,
number: int,
pool_id: Optional[UID] = None,
pool_name: Optional[str] = None,
):
"""
Scale the worker pool to the given number of workers in Kubernetes.
Supports both scaling the worker pool up and down.
"""

if not IN_KUBERNETES:
return SyftError(message="Scaling is only supported in Kubernetes mode")
elif number < 0:
# zero is a valid scale down
return SyftError(message=f"Invalid number of workers: {number}")

result = self._get_worker_pool(context, pool_id, pool_name)
if isinstance(result, SyftError):
return result

worker_pool = result
current_worker_count = len(worker_pool.worker_list)

if current_worker_count == number:
return SyftSuccess(message=f"Worker pool already has {number} workers")
elif number > current_worker_count:
workers_to_add = number - current_worker_count
return self.add_workers(
context=context,
number=workers_to_add,
pool_id=pool_id,
pool_name=pool_name,
# kube scaling doesn't require password as it replicates an existing deployment
reg_username=None,
reg_password=None,
)
else:
# scale down at kubernetes control plane
runner = KubernetesRunner()
result = scale_kubernetes_pool(
runner,
pool_name=worker_pool.name,
replicas=number,
)
if isinstance(result, SyftError):
return result

# scale down removes the last "n" workers
# workers to delete = len(workers) - number
workers_to_delete = worker_pool.worker_list[
-(current_worker_count - number) :
]

worker_stash = context.node.get_service("WorkerService").stash
# delete linkedobj workers
for worker in workers_to_delete:
delete_result = worker_stash.delete_by_uid(
credentials=context.credentials,
uid=worker.object_uid,
)
if delete_result.is_err():
print(f"Failed to delete worker: {worker.object_uid}")

# update worker_pool
worker_pool.max_count = number
worker_pool.worker_list = worker_pool.worker_list[:number]
update_result = self.stash.update(
credentials=context.credentials,
obj=worker_pool,
)

if update_result.is_err():
return SyftError(
message=(
f"Pool {worker_pool.name} was scaled down, "
f"but failed update the stash with err: {result.err()}"
)
)

return SyftSuccess(message=f"Worker pool scaled to {number} workers")

@service_method(
path="worker_pool.filter_by_image_id",
name="filter_by_image_id",
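
From the client, the new endpoint is reached as domain_client.api.services.worker_pool.scale, which the notebook above exercises. A small sketch of the validation edges, grounded in the checks at the top of scale; domain_client is assumed to be a logged-in client as in the notebook:

# Hedged sketch of the scale endpoint's edge cases (per the validation above).
# domain_client: a logged-in domain client, as set up in the notebook (assumed).
import syft as sy

# zero is a valid scale-down target
result = domain_client.api.services.worker_pool.scale(
    number=0, pool_name="default-pool"
)
assert not isinstance(result, sy.SyftError), str(result)

# negative values are rejected
result = domain_client.api.services.worker_pool.scale(
    number=-1, pool_name="default-pool"
)
assert isinstance(result, sy.SyftError)  # "Invalid number of workers: -1"

# scaling to the current size is a no-op success
result = domain_client.api.services.worker_pool.scale(
    number=0, pool_name="default-pool"
)
assert not isinstance(result, sy.SyftError), str(result)
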
10 changes: 0 additions & 10 deletions tox.ini
@@ -783,19 +783,9 @@ commands =
# ignore 06 because of opendp on arm64
# Run 0.8 notebooks

bash -c 'echo Gateway Cluster Info; kubectl describe all -A --context k3d-testgateway1 --namespace testgateway1'
bash -c 'echo Gateway Logs; kubectl logs -l app.kubernetes.io/name!=random --prefix=true --context k3d-testgateway1 --namespace testgateway1'
bash -c 'echo Domain Cluster Info; kubectl describe all -A --context k3d-testdomain1 --namespace testdomain1'
bash -c 'echo Domain Logs; kubectl logs -l app.kubernetes.io/name!=random --prefix=true --context k3d-testdomain1 --namespace testdomain1'

bash -c " source ./scripts/get_k8s_secret_ci.sh; \
pytest --nbmake notebooks/api/0.8 -p no:randomly -k 'not 10-container-images.ipynb' -vvvv --nbmake-timeout=1000"

bash -c 'echo Gateway Cluster Info; kubectl describe all -A --context k3d-testgateway1 --namespace testgateway1'
bash -c 'echo Gateway Logs; kubectl logs -l app.kubernetes.io/name!=random --prefix=true --context k3d-testgateway1 --namespace testgateway1'
bash -c 'echo Domain Cluster Info; kubectl describe all -A --context k3d-testdomain1 --namespace testdomain1'
bash -c 'echo Domain Logs; kubectl logs -l app.kubernetes.io/name!=random --prefix=true --context k3d-testdomain1 --namespace testdomain1'

#Integration + Gateway Connection Tests
# Gateway tests are not run in kubernetes, as it currently does not have a way to configure
# the high/low side warning flag.
