diff --git a/README.md b/README.md index 6e402cc6342..735eee063d3 100644 --- a/README.md +++ b/README.md @@ -81,14 +81,27 @@ SYFT_VERSION="" #### 4. Provisioning Helm Charts ```sh -helm install my-domain openmined/syft --version $SYFT_VERSION --namespace syft --create-namespace --set ingress.ingressClass=traefik +helm install my-domain openmined/syft --version $SYFT_VERSION --namespace syft --create-namespace --set ingress.className="traefik" ``` -### Azure or GCP Ingress +### Ingress Controllers +For Azure AKS + +```sh +helm install ... --set ingress.className="azure-application-gateway" ``` -helm install ... --set ingress.ingressClass="azure/application-gateway" -helm install ... --set ingress.ingressClass="gce" + +For AWS EKS + +```sh +helm install ... --set ingress.className="alb" +``` + +For Google GKE we need the [`gce` annotation](https://cloud.google.com/kubernetes-engine/docs/how-to/load-balance-ingress#create-ingress). + +```sh +helm install ... --set ingress.class="gce" ``` ## Deploy to a Container Engine or Cloud diff --git a/notebooks/tutorials/data-engineer/11-installing-and-upgrading-via-helm.ipynb b/notebooks/tutorials/data-engineer/11-installing-and-upgrading-via-helm.ipynb index ed1537ef73a..729b5751c2f 100644 --- a/notebooks/tutorials/data-engineer/11-installing-and-upgrading-via-helm.ipynb +++ b/notebooks/tutorials/data-engineer/11-installing-and-upgrading-via-helm.ipynb @@ -142,7 +142,7 @@ "metadata": {}, "source": [ "```bash\n", - "helm install my-domain openmined/syft --version $SYFT_VERSION --namespace syft --create-namespace --set ingress.ingressClass=traefik\n", + "helm install my-domain openmined/syft --version $SYFT_VERSION --namespace syft --create-namespace --set ingress.className=traefik\n", "```" ] }, diff --git a/packages/grid/devspace.yaml b/packages/grid/devspace.yaml index 9accc527d5d..126064c486c 100644 --- a/packages/grid/devspace.yaml +++ b/packages/grid/devspace.yaml @@ -66,8 +66,8 @@ deployments: syft: registry: 
${CONTAINER_REGISTRY} version: dev-${DEVSPACE_TIMESTAMP} - workerBuilds: - mountInBackend: true + registry: + maxStorage: "5Gi" node: settings: nodeName: ${NODE_NAME} diff --git a/packages/grid/helm/syft/templates/backend-service-account.yaml b/packages/grid/helm/syft/templates/backend-service-account.yaml index 97608b4fd4e..56e552b2b7f 100644 --- a/packages/grid/helm/syft/templates/backend-service-account.yaml +++ b/packages/grid/helm/syft/templates/backend-service-account.yaml @@ -2,7 +2,6 @@ apiVersion: v1 kind: ServiceAccount metadata: name: backend-service-account - namespace: {{ .Release.Namespace }} labels: app.kubernetes.io/name: {{ .Chart.Name }} app.kubernetes.io/version: {{ .Chart.AppVersion }} @@ -14,7 +13,6 @@ apiVersion: v1 kind: Secret metadata: name: backend-service-secret - namespace: {{ .Release.Namespace }} annotations: kubernetes.io/service-account.name: "backend-service-account" labels: @@ -29,7 +27,6 @@ apiVersion: rbac.authorization.k8s.io/v1 kind: Role metadata: name: backend-service-role - namespace: {{ .Release.Namespace }} labels: app.kubernetes.io/name: {{ .Chart.Name }} app.kubernetes.io/version: {{ .Chart.AppVersion }} @@ -53,7 +50,6 @@ apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding metadata: name: backend-service-role-binding - namespace: {{ .Release.Namespace }} labels: app.kubernetes.io/name: {{ .Chart.Name }} app.kubernetes.io/version: {{ .Chart.AppVersion }} @@ -61,7 +57,6 @@ metadata: subjects: - kind: ServiceAccount name: backend-service-account - namespace: {{ .Release.Namespace }} roleRef: kind: Role name: backend-service-role diff --git a/packages/grid/helm/syft/templates/backend-statefulset.yaml b/packages/grid/helm/syft/templates/backend-statefulset.yaml index 1bd3ab2e898..ebebae80cc2 100644 --- a/packages/grid/helm/syft/templates/backend-statefulset.yaml +++ b/packages/grid/helm/syft/templates/backend-statefulset.yaml @@ -136,12 +136,6 @@ spec: name: credentials-data readOnly: false subPath: credentials-data - 
{{- if .Values.workerBuilds.mountInBackend }} - # mount for debugging and inspection of worker-build volume - - mountPath: /root/data/images/ - name: worker-builds - readOnly: true - {{- end }} dnsConfig: null ephemeralContainers: null hostAliases: null @@ -155,12 +149,6 @@ spec: terminationGracePeriodSeconds: 5 tolerations: null topologySpreadConstraints: null - {{- if .Values.workerBuilds.mountInBackend }} - volumes: - - name: worker-builds - persistentVolumeClaim: - claimName: worker-builds - {{- end }} volumeClaimTemplates: - metadata: labels: diff --git a/packages/grid/helm/syft/templates/frontend-deployment.yaml b/packages/grid/helm/syft/templates/frontend-deployment.yaml index dfc5d39549a..f43fd0018dc 100644 --- a/packages/grid/helm/syft/templates/frontend-deployment.yaml +++ b/packages/grid/helm/syft/templates/frontend-deployment.yaml @@ -29,7 +29,7 @@ spec: command: null env: - name: VERSION - value: {{ .Values.syft.version }} + value: "{{ .Values.syft.version }}" - name: VERSION_HASH value: {{ .Values.node.settings.versionHash }} - name: NODE_TYPE diff --git a/packages/grid/helm/syft/templates/grid-stack-ingress-ingress.yaml b/packages/grid/helm/syft/templates/grid-stack-ingress-ingress.yaml index 2eef50b54c6..6aed72bd414 100644 --- a/packages/grid/helm/syft/templates/grid-stack-ingress-ingress.yaml +++ b/packages/grid/helm/syft/templates/grid-stack-ingress-ingress.yaml @@ -8,8 +8,14 @@ metadata: app.kubernetes.io/component: ingress app.kubernetes.io/managed-by: Helm name: grid-stack-ingress + {{- if .Values.ingress.class }} + annotations: + kubernetes.io/ingress.class: {{ .Values.ingress.class }} + {{- end }} spec: - ingressClassName: {{ .Values.ingress.ingressClass }} + {{- if .Values.ingress.className }} + ingressClassName: {{ .Values.ingress.className }} + {{- end }} defaultBackend: service: name: proxy diff --git a/packages/grid/helm/syft/templates/grid-stack-ingress-tls-ingress.yaml 
b/packages/grid/helm/syft/templates/grid-stack-ingress-tls-ingress.yaml index a263c910156..58db0a03e29 100644 --- a/packages/grid/helm/syft/templates/grid-stack-ingress-tls-ingress.yaml +++ b/packages/grid/helm/syft/templates/grid-stack-ingress-tls-ingress.yaml @@ -8,8 +8,14 @@ metadata: app.kubernetes.io/component: ingress app.kubernetes.io/managed-by: Helm name: grid-stack-ingress-tls + {{- if .Values.ingress.class }} + annotations: + kubernetes.io/ingress.class: {{ .Values.ingress.class }} + {{- end }} spec: - ingressClassName: {{ .Values.ingress.ingressClass }} + {{- if .Values.ingress.className }} + ingressClassName: {{ .Values.ingress.className }} + {{- end }} defaultBackend: service: name: proxy diff --git a/packages/grid/helm/syft/templates/registry-service.yaml b/packages/grid/helm/syft/templates/registry-service.yaml new file mode 100644 index 00000000000..f96060e3a4d --- /dev/null +++ b/packages/grid/helm/syft/templates/registry-service.yaml @@ -0,0 +1,17 @@ +apiVersion: v1 +kind: Service +metadata: + name: registry + labels: + app.kubernetes.io/name: {{ .Chart.Name }} + app.kubernetes.io/version: {{ .Chart.AppVersion }} + app.kubernetes.io/managed-by: Helm +spec: + type: ClusterIP + ports: + - protocol: TCP + port: 80 + targetPort: 5000 + selector: + app.kubernetes.io/name: {{ .Chart.Name }} + app.kubernetes.io/component: registry diff --git a/packages/grid/helm/syft/templates/registry-statefulset.yaml b/packages/grid/helm/syft/templates/registry-statefulset.yaml new file mode 100644 index 00000000000..c4fb60d474d --- /dev/null +++ b/packages/grid/helm/syft/templates/registry-statefulset.yaml @@ -0,0 +1,47 @@ +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: registry + labels: + app.kubernetes.io/name: {{ .Chart.Name }} + app.kubernetes.io/version: {{ .Chart.AppVersion }} + app.kubernetes.io/component: registry + app.kubernetes.io/managed-by: Helm +spec: + replicas: 1 + selector: + matchLabels: + app.kubernetes.io/name: {{ .Chart.Name }} + 
app.kubernetes.io/component: registry + app.kubernetes.io/managed-by: Helm + template: + metadata: + labels: + app.kubernetes.io/name: {{ .Chart.Name }} + app.kubernetes.io/component: registry + app.kubernetes.io/managed-by: Helm + spec: + containers: + - image: registry:2 + name: registry + env: + - name: REGISTRY_STORAGE_DELETE_ENABLED + value: "true" + ports: + - containerPort: 5000 + volumeMounts: + - mountPath: /var/lib/registry + name: registry-data + volumeClaimTemplates: + - metadata: + name: registry-data + labels: + app.kubernetes.io/name: {{ .Chart.Name }} + app.kubernetes.io/component: registry + app.kubernetes.io/managed-by: Helm + spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: {{ .Values.registry.maxStorage }} diff --git a/packages/grid/helm/syft/templates/worker-builds-pvc.yaml b/packages/grid/helm/syft/templates/worker-builds-pvc.yaml deleted file mode 100644 index 54eb4f7acc6..00000000000 --- a/packages/grid/helm/syft/templates/worker-builds-pvc.yaml +++ /dev/null @@ -1,15 +0,0 @@ -apiVersion: v1 -kind: PersistentVolumeClaim -metadata: - name: worker-builds - labels: - app.kubernetes.io/name: {{ .Chart.Name }} - app.kubernetes.io/version: {{ .Chart.AppVersion }} - app.kubernetes.io/component: worker-builds - app.kubernetes.io/managed-by: Helm -spec: - accessModes: - - ReadWriteOnce - resources: - requests: - storage: {{ .Values.workerBuilds.maxStorage }} diff --git a/packages/grid/helm/syft/values.yaml b/packages/grid/helm/syft/values.yaml index 43514759766..57d8ea8ff03 100644 --- a/packages/grid/helm/syft/values.yaml +++ b/packages/grid/helm/syft/values.yaml @@ -1,7 +1,3 @@ -# Default values for syft. -# This is a YAML-formatted file. -# Declare variables to be passed into your templates. 
- secrets: syft: syft-default-secret mongo: mongo-default-secret @@ -28,9 +24,8 @@ seaweedfs: queue: port: 5556 -workerBuilds: +registry: maxStorage: "10Gi" - mountInBackend: false syft: registry: "docker.io" @@ -49,7 +44,16 @@ node: inMemoryWorkers: false defaultWorkerPoolCount: 1 +# ---------------------------------------- +# For Azure +# className: "azure-application-gateway" +# ---------------------------------------- +# For AWS +# className: "alb" +# ---------------------------------------- +# For GCE, https://cloud.google.com/kubernetes-engine/docs/how-to/load-balance-ingress#create-ingress +# class: "gce" +# ---------------------------------------- ingress: - ingressClass: "" - # ingressClass: "azure/application-gateway" - # ingressClass: "gce" + class: null + className: null diff --git a/packages/syft/PYPI.md b/packages/syft/PYPI.md index 2265c5057ef..bee711a131a 100644 --- a/packages/syft/PYPI.md +++ b/packages/syft/PYPI.md @@ -78,14 +78,27 @@ SYFT_VERSION="" #### 4. Provisioning Helm Charts ```sh -helm install my-domain openmined/syft --version $SYFT_VERSION --namespace syft --create-namespace --set ingress.ingressClass=traefik +helm install my-domain openmined/syft --version $SYFT_VERSION --namespace syft --create-namespace --set ingress.className="traefik" ``` -### Azure or GCP Ingress +### Ingress Controllers +For Azure AKS + +```sh +helm install ... --set ingress.className="azure-application-gateway" ``` -helm install ... --set ingress.ingressClass="azure/application-gateway" -helm install ... --set ingress.ingressClass="gce" + +For AWS EKS + +```sh +helm install ... --set ingress.className="alb" +``` + +For Google GKE we need the [`gce` annotation](https://cloud.google.com/kubernetes-engine/docs/how-to/load-balance-ingress#create-ingress). + +```sh +helm install ... 
--set ingress.class="gce" ``` ## Deploy to a Container Engine or Cloud diff --git a/packages/syft/setup.cfg b/packages/syft/setup.cfg index e098c8c9f4e..a740dc3c79b 100644 --- a/packages/syft/setup.cfg +++ b/packages/syft/setup.cfg @@ -62,7 +62,7 @@ syft = numpy>=1.23.5,<=1.24.4 pandas==1.5.3 docker==6.1.3 - kr8s==0.13.0 + kr8s==0.13.1 PyYAML==6.0.1 azure-storage-blob==12.19 diff --git a/packages/syft/src/syft/custom_worker/builder_k8s.py b/packages/syft/src/syft/custom_worker/builder_k8s.py index a80df6470cd..395028d69b4 100644 --- a/packages/syft/src/syft/custom_worker/builder_k8s.py +++ b/packages/syft/src/syft/custom_worker/builder_k8s.py @@ -1,24 +1,25 @@ # stdlib from hashlib import sha256 -import os from pathlib import Path from typing import Dict from typing import List from typing import Optional # third party -import kr8s -from kr8s.objects import APIObject from kr8s.objects import ConfigMap from kr8s.objects import Job +from kr8s.objects import Secret # relative from .builder_types import BuilderBase from .builder_types import ImageBuildResult from .builder_types import ImagePushResult -from .k8s import BUILD_OUTPUT_PVC +from .k8s import INTERNAL_REGISTRY_HOST from .k8s import JOB_COMPLETION_TTL from .k8s import KUBERNETES_NAMESPACE +from .k8s import KubeUtils +from .k8s import get_kr8s_client +from .utils import ImageUtils __all__ = ["KubernetesBuilder"] @@ -28,8 +29,10 @@ class BuildFailed(Exception): class KubernetesBuilder(BuilderBase): + COMPONENT = "builder" + def __init__(self): - self.client = kr8s.api(namespace=KUBERNETES_NAMESPACE) + self.client = get_kr8s_client() def build_image( self, @@ -39,6 +42,9 @@ def build_image( buildargs: Optional[dict] = None, **kwargs, ) -> ImageBuildResult: + image_digest = None + logs = None + config = None job_id = self._new_job_id(tag) if dockerfile: @@ -46,19 +52,19 @@ def build_image( elif dockerfile_path: dockerfile = dockerfile_path.read_text() - # Create a ConfigMap with the Dockerfile - config = 
self._create_build_config(job_id, dockerfile) - config.refresh() - - # Create and start the job - job = self._create_kaniko_build_job( - job_id=job_id, - tag=tag, - job_config=config, - build_args=buildargs, - ) - try: + # Create a ConfigMap with the Dockerfile + config = self._create_build_config(job_id, dockerfile) + config.refresh() + + # Create and start the job + job = self._create_kaniko_build_job( + job_id=job_id, + tag=tag, + job_config=config, + build_args=buildargs, + ) + # wait for job to complete/fail job.wait(["condition=Complete", "condition=Failed"]) @@ -67,7 +73,7 @@ def build_image( image_digest = self._get_image_digest(job) if not image_digest: - exit_code = self._get_container_exit_code(job) + exit_code = self._get_exit_code(job) raise BuildFailed( "Failed to build the image. " f"Kaniko exit code={exit_code}. " @@ -78,7 +84,7 @@ def build_image( raise finally: # don't delete the job, kubernetes will gracefully do that for us - config.delete() + config and config.delete(propagation_policy="Foreground") return ImageBuildResult( image_hash=image_digest, @@ -93,18 +99,35 @@ def push_image( registry_url: str, **kwargs, ) -> ImagePushResult: - # Create and start the job + exit_code = 1 + logs = None job_id = self._new_job_id(tag) - job = self._create_push_job( - job_id=job_id, - tag=tag, - username=username, - password=password, - registry_url=registry_url, - ) - job.wait(["condition=Complete", "condition=Failed"]) - exit_code = self._get_container_exit_code(job)[0] - return ImagePushResult(logs=self._get_logs(job), exit_code=exit_code) + push_secret = None + + try: + push_secret = self._create_push_secret( + id=job_id, + url=registry_url, + username=username, + password=password, + ) + push_secret.refresh() + + job = self._create_push_job( + job_id=job_id, + tag=tag, + push_secret=push_secret, + ) + + job.wait(["condition=Complete", "condition=Failed"]) + exit_code = self._get_exit_code(job)[0] + logs = self._get_logs(job) + except Exception: + raise 
+ finally: + push_secret and push_secret.delete(propagation_policy="Foreground") + + return ImagePushResult(logs=logs, exit_code=exit_code) def _new_job_id(self, tag: str) -> str: return self._get_tag_hash(tag)[:16] @@ -115,49 +138,35 @@ def _get_tag_hash(self, tag: str) -> str: def _get_image_digest(self, job: Job) -> Optional[str]: selector = {"batch.kubernetes.io/job-name": job.metadata.name} pods = self.client.get("pods", label_selector=selector) - for pod in pods: - for container_status in pod.status.containerStatuses: - if container_status.state.terminated.exitCode != 0: - continue - return container_status.state.terminated.message - return None - - def _get_container_exit_code(self, job: Job) -> List[int]: + return KubeUtils.get_container_exit_message(pods) + + def _get_exit_code(self, job: Job) -> List[int]: selector = {"batch.kubernetes.io/job-name": job.metadata.name} pods = self.client.get("pods", label_selector=selector) - exit_codes = [] - for pod in pods: - for container_status in pod.status.containerStatuses: - exit_codes.append(container_status.state.terminated.exitCode) - return exit_codes + return KubeUtils.get_container_exit_code(pods) def _get_logs(self, job: Job) -> str: selector = {"batch.kubernetes.io/job-name": job.metadata.name} pods = self.client.get("pods", label_selector=selector) - logs = [] - for pod in pods: - logs.append(f"----------Logs for pod={pod.metadata.name}----------") - for log in pod.logs(): - logs.append(log) - - return "\n".join(logs) - - def _check_success(self, job: Job) -> bool: - # TODO - return True + return KubeUtils.get_logs(pods) def _create_build_config(self, job_id: str, dockerfile: str) -> ConfigMap: config_map = ConfigMap( { "metadata": { "name": f"build-{job_id}", + "labels": { + "app.kubernetes.io/name": KUBERNETES_NAMESPACE, + "app.kubernetes.io/component": KubernetesBuilder.COMPONENT, + "app.kubernetes.io/managed-by": "kr8s", + }, }, "data": { "Dockerfile": dockerfile, }, } ) - return 
self._create_or_get(config_map) + return KubeUtils.create_or_get(config_map) def _create_kaniko_build_job( self, @@ -168,14 +177,20 @@ def _create_kaniko_build_job( ) -> Job: # for push build_args = build_args or {} - tag_hash = self._get_tag_hash(tag) + build_args_list = [] + + internal_tag = ImageUtils.change_registry(tag, registry=INTERNAL_REGISTRY_HOST) + + for k, v in build_args.items(): + build_args_list.append(f'--build-arg="{k}={v}"') + job = Job( { "metadata": { "name": f"build-{job_id}", "labels": { "app.kubernetes.io/name": KUBERNETES_NAMESPACE, - "app.kubernetes.io/component": "builder", + "app.kubernetes.io/component": KubernetesBuilder.COMPONENT, "app.kubernetes.io/managed-by": "kr8s", }, }, @@ -190,32 +205,32 @@ def _create_kaniko_build_job( "name": "kaniko", "image": "gcr.io/kaniko-project/executor:latest", "args": [ - # build_args "--dockerfile=Dockerfile", "--context=dir:///workspace", - f"--destination={tag}", + f"--destination={internal_tag}", # Disabling --reproducible because it eats up a lot of CPU+RAM # https://github.com/GoogleContainerTools/kaniko/issues/1960 # https://github.com/GoogleContainerTools/kaniko/pull/2477 # "--reproducible", - # Build outputs - f"--tar-path=/output/{tag_hash}.tar", + # cache args + "--cache=true", + "--cache-copy-layers", + "--cache-run-layers", + f"--cache-repo={INTERNAL_REGISTRY_HOST}/builder-cache", + # outputs args "--digest-file=/dev/termination-log", - "--no-push", # other kaniko conf + f"--insecure-registry={INTERNAL_REGISTRY_HOST}", + f"--skip-tls-verify-registry={INTERNAL_REGISTRY_HOST}", "--log-format=text", "--verbosity=info", - ], + ] + + build_args_list, "volumeMounts": [ { "name": "build-input", "mountPath": "/workspace", }, - { - "name": "build-output", - "mountPath": "/output", - "readOnly": False, - }, ], "resources": { "requests": { @@ -237,12 +252,6 @@ def _create_kaniko_build_job( "name": job_config.metadata.name, }, }, - { - "name": "build-output", - "persistentVolumeClaim": { - 
"claimName": BUILD_OUTPUT_PVC, - }, - }, ], } }, @@ -250,33 +259,25 @@ def _create_kaniko_build_job( } ) - return self._create_or_get(job) + return KubeUtils.create_or_get(job) def _create_push_job( self, job_id: str, tag: str, - username: str, - password: str, - registry_url: Optional[str] = None, + push_secret: Secret, ) -> Job: - tag_hash = self._get_tag_hash(tag) - registry_url = registry_url or tag.split("/")[0] - - extra_flags = "" - if os.getenv("DEV_MODE") == "True": - extra_flags = "--insecure" + internal_tag = ImageUtils.change_registry(tag, registry=INTERNAL_REGISTRY_HOST) + internal_reg, internal_repo, _ = ImageUtils.parse_tag(internal_tag) run_cmds = [ - "echo Logging in to $REG_URL with user $REG_USERNAME...", - # login to registry - "crane auth login $REG_URL -u $REG_USERNAME -p $REG_PASSWORD", # push with credentials - "echo Pushing image....", - f"crane push --image-refs /dev/termination-log {extra_flags} /output/{tag_hash}.tar {tag}", - # cleanup built tarfile - "echo Cleaning up tar....", - f"rm /output/{tag_hash}.tar", + "echo Pushing image...", + f"crane copy {internal_tag} {tag}", + # cleanup image from internal registry + "echo Cleaning up...", + f"IMG_DIGEST=$(crane digest {internal_tag})", + f"crane delete {internal_reg}/{internal_repo}@$IMG_DIGEST; echo Done", ] job = Job( @@ -286,7 +287,7 @@ def _create_push_job( "name": f"push-{job_id}", "labels": { "app.kubernetes.io/name": KUBERNETES_NAMESPACE, - "app.kubernetes.io/component": "builder", + "app.kubernetes.io/component": KubernetesBuilder.COMPONENT, "app.kubernetes.io/managed-by": "kr8s", }, }, @@ -301,27 +302,14 @@ def _create_push_job( "name": "crane", # debug is needed for "sh" to be available "image": "gcr.io/go-containerregistry/crane:debug", - "env": [ - { - "name": "REG_URL", - "value": registry_url, - }, - { - "name": "REG_USERNAME", - "value": username, - }, - { - "name": "REG_PASSWORD", - "value": password, - }, - ], "command": ["sh"], "args": ["-c", " && ".join(run_cmds)], 
"volumeMounts": [ { - "name": "build-output", - "mountPath": "/output", - "readOnly": False, + "name": "push-secret", + "mountPath": "/root/.docker/config.json", + "subPath": "config.json", + "readOnly": True, }, ], "resources": { @@ -339,9 +327,15 @@ def _create_push_job( ], "volumes": [ { - "name": "build-output", - "persistentVolumeClaim": { - "claimName": BUILD_OUTPUT_PVC, + "name": "push-secret", + "secret": { + "secretName": push_secret.metadata.name, + "items": [ + { + "key": ".dockerconfigjson", + "path": "config.json", + }, + ], }, }, ], @@ -350,11 +344,15 @@ def _create_push_job( }, } ) - return self._create_or_get(job) - - def _create_or_get(self, obj: APIObject) -> APIObject: - if not obj.exists(): - obj.create() - else: - obj.refresh() - return obj + return KubeUtils.create_or_get(job) + + def _create_push_secret(self, id: str, url: str, username: str, password: str): + return KubeUtils.create_dockerconfig_secret( + secret_name=f"push-secret-{id}", + component=KubernetesBuilder.COMPONENT, + registries=[ + # TODO: authorize internal registry? 
+ (INTERNAL_REGISTRY_HOST, "username", id), + (url, username, password), + ], + ) diff --git a/packages/syft/src/syft/custom_worker/k8s.py b/packages/syft/src/syft/custom_worker/k8s.py index 491d7333b38..fb777f6ec6c 100644 --- a/packages/syft/src/syft/custom_worker/k8s.py +++ b/packages/syft/src/syft/custom_worker/k8s.py @@ -1,24 +1,36 @@ # stdlib +import base64 from enum import Enum +from functools import cache +import json import os +from typing import Dict +from typing import Iterable +from typing import List from typing import Optional +from typing import Tuple +from typing import Union # third party -from kr8s._data_utils import list_dict_unpack +import kr8s +from kr8s.objects import APIObject +from kr8s.objects import Pod +from kr8s.objects import Secret from pydantic import BaseModel # Time after which Job will be deleted JOB_COMPLETION_TTL = 60 -# Persistent volume claim for storing build output -BUILD_OUTPUT_PVC = "worker-builds" - # Kubernetes namespace KUBERNETES_NAMESPACE = os.getenv("K8S_NAMESPACE", "syft") # Kubernetes runtime flag IN_KUBERNETES = os.getenv("CONTAINER_HOST") == "k8s" +# Internal registry URL +DEFAULT_INTERNAL_REGISTRY = f"registry.{KUBERNETES_NAMESPACE}.svc.cluster.local" +INTERNAL_REGISTRY_HOST = os.getenv("INTERNAL_REGISTRY_HOST", DEFAULT_INTERNAL_REGISTRY) + class PodPhase(Enum): Pending = "Pending" @@ -36,7 +48,7 @@ class PodCondition(BaseModel): @classmethod def from_conditions(cls, conditions: list): - pod_cond = list_dict_unpack(conditions, key="type", value="status") + pod_cond = KubeUtils.list_dict_unpack(conditions, key="type", value="status") pod_cond_flags = {k: v == "True" for k, v in pod_cond.items()} return cls( pod_scheduled=pod_cond_flags.get("PodScheduled", False), @@ -82,3 +94,167 @@ def from_status_dict(cls: "PodStatus", status: dict): status.get("containerStatuses", {})[0] ), ) + + +@cache +def get_kr8s_client() -> kr8s.Api: + if not IN_KUBERNETES: + raise RuntimeError("Not inside a kubernetes environment") + 
return kr8s.api(namespace=KUBERNETES_NAMESPACE) + + +class KubeUtils: + """ + This class contains utility functions for interacting with kubernetes objects. + + DO NOT call `get_kr8s_client()` inside this class, instead pass it as an argument to the functions. + This is to avoid calling these functions on resources across namespaces! + """ + + @staticmethod + def resolve_pod(client: kr8s.Api, pod: Union[str, Pod]) -> Optional[Pod]: + """Return the first pod that matches the given name""" + if isinstance(pod, Pod): + return pod + + for _pod in client.get("pods", pod): + return _pod + + @staticmethod + def get_logs(pods: List[Pod]): + """Combine and return logs for all the pods as string""" + logs = [] + for pod in pods: + logs.append(f"----------Logs for pod={pod.metadata.name}----------") + for log in pod.logs(): + logs.append(log) + + return "\n".join(logs) + + @staticmethod + def get_pod_status(pod: Pod) -> Optional[PodStatus]: + """Map the status of the given pod to PodStatuss.""" + if not pod: + return None + return PodStatus.from_status_dict(pod.status) + + @staticmethod + def get_pod_env(pod: Pod) -> Optional[List[Dict]]: + """Return the environment variables of the first container in the pod.""" + if not pod: + return + + for container in pod.spec.containers: + return container.env.to_list() + + @staticmethod + def get_container_exit_code(pods: List[Pod]) -> List[int]: + """Return the exit codes of all the containers in the given pods.""" + exit_codes = [] + for pod in pods: + for container_status in pod.status.containerStatuses: + exit_codes.append(container_status.state.terminated.exitCode) + return exit_codes + + @staticmethod + def get_container_exit_message(pods: List[Pod]) -> Optional[str]: + """Return the exit message of the first container that exited with non-zero code.""" + for pod in pods: + for container_status in pod.status.containerStatuses: + if container_status.state.terminated.exitCode != 0: + continue + return 
container_status.state.terminated.message + return None + + @staticmethod + def b64encode_secret(data: str) -> str: + """Convert the data to base64 encoded string for Secret.""" + return base64.b64encode(data.encode()).decode() + + @staticmethod + def create_dockerconfig_secret( + secret_name: str, + component: str, + registries: Iterable[Tuple[str, str, str]], + ) -> Secret: + auths = {} + + for url, uname, passwd in registries: + auths[url] = { + "username": uname, + "password": passwd, + "auth": KubeUtils.b64encode_secret(f"{uname}:{passwd}"), + } + + config_str = json.dumps({"auths": auths}) + + return KubeUtils.create_secret( + secret_name=secret_name, + type="kubernetes.io/dockerconfigjson", + component=component, + data={ + ".dockerconfigjson": KubeUtils.b64encode_secret(config_str), + }, + ) + + @staticmethod + def create_secret( + secret_name: str, + type: str, + component: str, + data: str, + encoded=True, + ) -> Secret: + if not encoded: + for k, v in data.items(): + data[k] = KubeUtils.b64encode_secret(v) + + secret = Secret( + { + "metadata": { + "name": secret_name, + "labels": { + "app.kubernetes.io/name": KUBERNETES_NAMESPACE, + "app.kubernetes.io/component": component, + "app.kubernetes.io/managed-by": "kr8s", + }, + }, + "type": type, + "data": data, + } + ) + return KubeUtils.create_or_get(secret) + + @staticmethod + def create_or_get(obj: APIObject) -> APIObject: + if obj.exists(): + obj.refresh() + else: + obj.create() + return obj + + @staticmethod + def patch_env_vars(env_list: List[Dict], env_dict: Dict) -> List[Dict]: + """Patch kubernetes pod environment variables in the list with the provided dictionary.""" + + # update existing + for item in env_list: + k = item["name"] + if k in env_dict: + v = env_dict.pop(k) + item["value"] = v + + # append remaining + for k, v in env_dict.items(): + env_list.append({"name": k, "value": v}) + + return env_list + + @staticmethod + def list_dict_unpack( + input_list: List[Dict], + key: str = "key", + 
value: str = "value", + ) -> Dict: + # Snapshot from kr8s._data_utils + return {i[key]: i[value] for i in input_list} diff --git a/packages/syft/src/syft/custom_worker/runner_k8s.py b/packages/syft/src/syft/custom_worker/runner_k8s.py index 0838836989b..ff2c3120ebb 100644 --- a/packages/syft/src/syft/custom_worker/runner_k8s.py +++ b/packages/syft/src/syft/custom_worker/runner_k8s.py @@ -1,62 +1,68 @@ # stdlib -import base64 -import copy -import json -import os -from time import sleep +from typing import Dict from typing import List from typing import Optional from typing import Union # third party -import kr8s -from kr8s.objects import APIObject from kr8s.objects import Pod from kr8s.objects import Secret from kr8s.objects import StatefulSet # relative from .k8s import KUBERNETES_NAMESPACE +from .k8s import KubeUtils from .k8s import PodStatus +from .k8s import get_kr8s_client + +JSONPATH_AVAILABLE_REPLICAS = "{.status.availableReplicas}" class KubernetesRunner: def __init__(self): - self.client = kr8s.api(namespace=KUBERNETES_NAMESPACE) + self.client = get_kr8s_client() def create_pool( self, pool_name: str, tag: str, replicas: int = 1, - env_vars: Optional[dict] = None, + env_vars: Optional[List[Dict]] = None, + mount_secrets: Optional[Dict] = None, reg_username: Optional[str] = None, reg_password: Optional[str] = None, reg_url: Optional[str] = None, **kwargs, ) -> StatefulSet: - # create pull secret if registry credentials are passed - pull_secret = None - if reg_username and reg_password and reg_url: - pull_secret = self._create_image_pull_secret( - pool_name, - reg_username, - reg_password, - reg_url, + try: + # create pull secret if registry credentials are passed + pull_secret = None + if reg_username and reg_password and reg_url: + pull_secret = self._create_image_pull_secret( + pool_name, + reg_username, + reg_password, + reg_url, + ) + + # create a stateful set deployment + deployment = self._create_stateful_set( + pool_name=pool_name, + tag=tag, + 
replicas=replicas, + env_vars=env_vars, + mount_secrets=mount_secrets, + pull_secret=pull_secret, + **kwargs, ) - # create a stateful set deployment - deployment = self._create_stateful_set( - pool_name, - tag, - replicas, - env_vars, - pull_secret=pull_secret, - **kwargs, - ) - - # wait for replicas to be available and ready - self.wait(deployment, available_replicas=replicas) + # wait for replicas to be available and ready + deployment.wait(f"jsonpath='{JSONPATH_AVAILABLE_REPLICAS}'={replicas}") + except Exception: + raise + finally: + if pull_secret: + pull_secret.delete(propagation_policy="Foreground") # return return deployment @@ -66,7 +72,7 @@ def scale_pool(self, pool_name: str, replicas: int) -> Optional[StatefulSet]: if not deployment: return None deployment.scale(replicas) - self.wait(deployment, available_replicas=replicas) + deployment.wait(f"jsonpath='{JSONPATH_AVAILABLE_REPLICAS}'={replicas}") return deployment def get_pool(self, pool_name: str) -> Optional[StatefulSet]: @@ -78,21 +84,21 @@ def get_pool(self, pool_name: str) -> Optional[StatefulSet]: def delete_pool(self, pool_name: str) -> bool: selector = {"app.kubernetes.io/component": pool_name} for _set in self.client.get("statefulsets", label_selector=selector): - _set.delete() + _set.delete(propagation_policy="Foreground") for _secret in self.client.get("secrets", label_selector=selector): - _secret.delete() + _secret.delete(propagation_policy="Foreground") return True def delete_pod(self, pod_name: str) -> bool: pods = self.client.get("pods", pod_name) for pod in pods: - pod.delete() + pod.delete(propagation_policy="Foreground") return True return False - def get_pods(self, pool_name: str) -> List[Pod]: + def get_pool_pods(self, pool_name: str) -> List[Pod]: selector = {"app.kubernetes.io/component": pool_name} pods = self.client.get("pods", label_selector=selector) if len(pods) > 0: @@ -101,62 +107,15 @@ def get_pods(self, pool_name: str) -> List[Pod]: def get_pod_logs(self, pod_name: str) 
-> str: pods = self.client.get("pods", pod_name) - logs = [] - for pod in pods: - logs.append(f"----------Logs for pod={pod.metadata.name}----------") - for log in pod.logs(): - logs.append(log) - - return "\n".join(logs) + return KubeUtils.get_logs(pods) def get_pod_status(self, pod: Union[str, Pod]) -> Optional[PodStatus]: - if isinstance(pod, str): - pods = self.client.get("pods", pod) - if len(pods) == 0: - return None - pod = pods[0] - else: - pod.refresh() - - return PodStatus.from_status_dict(pod.status) - - def wait( - self, - deployment: StatefulSet, - available_replicas: int, - timeout: int = 300, - ) -> None: - # TODO: Report wait('jsonpath=') bug to kr8s - # Until then this is the substitute implementation - - if available_replicas <= 0: - return - - while True: - if timeout == 0: - raise TimeoutError("Timeout waiting for replicas") - - deployment.refresh() - if deployment.status.availableReplicas == available_replicas: - break + pod = KubeUtils.resolve_pod(self.client, pod) + return KubeUtils.get_pod_status(pod) - timeout -= 1 - sleep(1) - - def _current_pod_name(self) -> str: - env_val = os.getenv("K8S_POD_NAME") - if env_val: - return env_val - - selector = {"app.kubernetes.io/component": "backend"} - for pod in self.client.get("pods", label_selector=selector): - return pod.name - - def _get_obj_from_list(self, objs: List[dict], name: str) -> dict: - """Helper function extract kubernetes object from list by name""" - for obj in objs: - if obj.name == name: - return obj + def get_pod_env_vars(self, pod: Union[str, Pod]) -> Optional[List[Dict]]: + pod = KubeUtils.resolve_pod(self.client, pod) + return KubeUtils.get_pod_env(pod) def _create_image_pull_secret( self, @@ -166,67 +125,49 @@ def _create_image_pull_secret( reg_url: str, **kwargs, ): - _secret = Secret( - { - "metadata": { - "name": f"pull-secret-{pool_name}", - "labels": { - "app.kubernetes.io/name": KUBERNETES_NAMESPACE, - "app.kubernetes.io/component": pool_name, - 
"app.kubernetes.io/managed-by": "kr8s", - }, - }, - "type": "kubernetes.io/dockerconfigjson", - "data": { - ".dockerconfigjson": self._create_dockerconfig_json( - reg_username, - reg_password, - reg_url, - ) - }, - } + return KubeUtils.create_dockerconfig_secret( + secret_name=f"pull-secret-{pool_name}", + component=pool_name, + registries=[ + (reg_url, reg_username, reg_password), + ], ) - return self._create_or_get(_secret) - def _create_stateful_set( self, pool_name: str, tag: str, replicas=1, - env_vars: Optional[dict] = None, + env_vars: Optional[List[Dict]] = None, + mount_secrets: Optional[Dict] = None, pull_secret: Optional[Secret] = None, **kwargs, ) -> StatefulSet: """Create a stateful set for a pool""" - env_vars = env_vars or {} + volumes = [] + volume_mounts = [] pull_secret_obj = None - - _pod = Pod.get(self._current_pod_name()) - - creds_volume = self._get_obj_from_list( - objs=_pod.spec.volumes, - name="credentials-data", - ) - creds_volume_mount = self._get_obj_from_list( - objs=_pod.spec.containers[0].volumeMounts, - name="credentials-data", - ) - - env = _pod.spec.containers[0].env.to_list() - env_clone = copy.deepcopy(env) - - # update existing - for item in env_clone: - k = item["name"] - if k in env_vars: - v = env_vars.pop(k) - item["value"] = v - - # append remaining - for k, v in env_vars.items(): - env_clone.append({"name": k, "value": v}) + env_vars = env_vars or [] + + if mount_secrets: + for secret_name, mount_opts in mount_secrets.items(): + volumes.append( + { + "name": secret_name, + "secret": { + "secretName": secret_name, + }, + } + ) + volume_mounts.append( + { + "name": secret_name, + "mountPath": mount_opts.get("mountPath"), + "subPath": mount_opts.get("subPath"), + "readOnly": True, + } + ) if pull_secret: pull_secret_obj = [ @@ -265,41 +206,15 @@ def _create_stateful_set( "name": pool_name, "imagePullPolicy": "IfNotPresent", "image": tag, - "env": env_clone, - "volumeMounts": [creds_volume_mount], + "env": env_vars, + 
"volumeMounts": volume_mounts, } ], - "volumes": [creds_volume], + "volumes": volumes, "imagePullSecrets": pull_secret_obj, }, }, }, } ) - return self._create_or_get(stateful_set) - - def _create_or_get(self, obj: APIObject) -> APIObject: - if not obj.exists(): - obj.create() - else: - obj.refresh() - return obj - - def _create_dockerconfig_json( - self, - reg_username: str, - reg_password: str, - reg_url: str, - ): - config = { - "auths": { - reg_url: { - "username": reg_username, - "password": reg_password, - "auth": base64.b64encode( - f"{reg_username}:{reg_password}".encode() - ).decode(), - } - } - } - return base64.b64encode(json.dumps(config).encode()).decode() + return KubeUtils.create_or_get(stateful_set) diff --git a/packages/syft/src/syft/custom_worker/utils.py b/packages/syft/src/syft/custom_worker/utils.py index be6d3cb5915..597e4bb6aff 100644 --- a/packages/syft/src/syft/custom_worker/utils.py +++ b/packages/syft/src/syft/custom_worker/utils.py @@ -1,6 +1,8 @@ # stdlib import json from typing import Iterable +from typing import Optional +from typing import Tuple def iterator_to_string(iterator: Iterable) -> str: @@ -14,3 +16,24 @@ def iterator_to_string(iterator: Iterable) -> str: else: log += str(item) return log + + +class ImageUtils: + @staticmethod + def parse_tag(tag: str) -> Tuple[Optional[str], str, str]: + url, tag = tag.rsplit(":", 1) + args = url.rsplit("/", 2) + + if len(args) == 3: + registry = args[0] + repo = "/".join(args[1:]) + else: + registry = None + repo = "/".join(args) + + return registry, repo, tag + + @staticmethod + def change_registry(tag: str, registry: str) -> str: + _, repo, tag = ImageUtils.parse_tag(tag) + return f"{registry}/{repo}:{tag}" diff --git a/packages/syft/src/syft/service/code/user_code.py b/packages/syft/src/syft/service/code/user_code.py index 020830d2508..a449845115d 100644 --- a/packages/syft/src/syft/service/code/user_code.py +++ b/packages/syft/src/syft/service/code/user_code.py @@ -462,6 +462,10 @@ def 
input_policy(self) -> Optional[InputPolicy]: print(f"Failed to deserialize custom input policy state. {e}") return None + @property + def output_policy_approved(self): + return self.status.approved + @property def output_policy(self) -> Optional[OutputPolicy]: if not self.status.approved: diff --git a/packages/syft/src/syft/service/code/user_code_service.py b/packages/syft/src/syft/service/code/user_code_service.py index 20c8630c231..da409b1dac0 100644 --- a/packages/syft/src/syft/service/code/user_code_service.py +++ b/packages/syft/src/syft/service/code/user_code_service.py @@ -311,7 +311,7 @@ def is_execution_allowed(self, code, context, output_policy): # Check if the user has permission to execute the code. elif not (has_code_permission := self.has_code_permission(code, context)): return has_code_permission - elif code.output_policy is None: + elif not code.output_policy_approved: return SyftError("Output policy not approved", code) elif not output_policy.valid: return output_policy.valid @@ -399,9 +399,9 @@ def _call( code=code, context=context, output_policy=output_policy ) if not can_execute: - if output_policy is None: + if not code.output_policy_approved: return Err( - "UserCodeStatus.DENIED: Function has no output policy" + "Execution denied: Your code is waiting for approval" ) if not (is_valid := output_policy.valid): if ( diff --git a/packages/syft/src/syft/service/worker/image_identifier.py b/packages/syft/src/syft/service/worker/image_identifier.py index 531b5d3b726..43623f44d36 100644 --- a/packages/syft/src/syft/service/worker/image_identifier.py +++ b/packages/syft/src/syft/service/worker/image_identifier.py @@ -1,12 +1,12 @@ # stdlib from typing import Optional -from typing import Tuple from typing import Union # third party from typing_extensions import Self # relative +from ...custom_worker.utils import ImageUtils from ...serde.serializable import serializable from ...types.base import SyftBaseModel from .image_registry import SyftImageRegistry 
@@ -38,7 +38,7 @@ class SyftWorkerImageIdentifier(SyftBaseModel): @classmethod def with_registry(cls, tag: str, registry: SyftImageRegistry) -> Self: """Build a SyftWorkerImageTag from Docker tag & a previously created SyftImageRegistry object.""" - registry_str, repo, tag = SyftWorkerImageIdentifier.parse_str(tag) + registry_str, repo, tag = ImageUtils.parse_tag(tag) # if we parsed a registry string, make sure it matches the registry object if registry_str and registry_str != registry.url: @@ -49,23 +49,9 @@ def with_registry(cls, tag: str, registry: SyftImageRegistry) -> Self: @classmethod def from_str(cls, tag: str) -> Self: """Build a SyftWorkerImageTag from a pure-string standard Docker tag.""" - registry, repo, tag = SyftWorkerImageIdentifier.parse_str(tag) + registry, repo, tag = ImageUtils.parse_tag(tag) return cls(repo=repo, registry=registry, tag=tag) - @staticmethod - def parse_str(tag: str) -> Tuple[Optional[str], str, str]: - url, tag = tag.rsplit(":", 1) - args = url.rsplit("/", 2) - - if len(args) == 3: - registry = args[0] - repo = "/".join(args[1:]) - else: - registry = None - repo = "/".join(args) - - return registry, repo, tag - @property def repo_with_tag(self) -> str: if self.repo or self.tag: diff --git a/packages/syft/src/syft/service/worker/image_registry.py b/packages/syft/src/syft/service/worker/image_registry.py index cf3c36c0e0d..806a0946d2b 100644 --- a/packages/syft/src/syft/service/worker/image_registry.py +++ b/packages/syft/src/syft/service/worker/image_registry.py @@ -1,4 +1,5 @@ # stdlib +import re from urllib.parse import urlparse # third party @@ -10,6 +11,8 @@ from ...types.syft_object import SyftObject from ...types.uid import UID +REGX_DOMAIN = re.compile(r"^(localhost|([a-zA-Z0-9-]+(\.[a-zA-Z0-9-]+)*))(\:\d{1,5})?$") + @serializable() class SyftImageRegistry(SyftObject): @@ -26,15 +29,19 @@ class SyftImageRegistry(SyftObject): @validator("url") def validate_url(cls, val: str): - if val.startswith("http") or "://" in val: - 
raise ValueError("Registry URL must be a valid RFC 3986 URI") + if not val: + raise ValueError("Invalid Registry URL. Must not be empty") + + if not bool(re.match(REGX_DOMAIN, val)): + raise ValueError("Invalid Registry URL. Must be a valid domain.") + return val @classmethod def from_url(cls, full_str: str): + # this is only for urlparse if "://" not in full_str: full_str = f"http://{full_str}" - parsed = urlparse(full_str) # netloc includes the host & port, so local dev should work as expected diff --git a/packages/syft/src/syft/service/worker/utils.py b/packages/syft/src/syft/service/worker/utils.py index 9f677eb7f98..14b5799d825 100644 --- a/packages/syft/src/syft/service/worker/utils.py +++ b/packages/syft/src/syft/service/worker/utils.py @@ -1,6 +1,7 @@ # stdlib import contextlib import os +from pathlib import Path import socket import socketserver import sys @@ -19,6 +20,7 @@ from ...custom_worker.builder_types import ImagePushResult from ...custom_worker.config import DockerWorkerConfig from ...custom_worker.config import PrebuiltWorkerConfig +from ...custom_worker.k8s import KubeUtils from ...custom_worker.k8s import PodStatus from ...custom_worker.runner_k8s import KubernetesRunner from ...node.credentials import SyftVerifyKey @@ -36,6 +38,7 @@ DEFAULT_WORKER_IMAGE_TAG = "openmined/default-worker-image-cpu:0.0.1" DEFAULT_WORKER_POOL_NAME = "default-pool" +K8S_NODE_CREDS_NAME = "node-creds" def backend_container_name() -> str: @@ -254,9 +257,46 @@ def run_workers_in_threads( return results +def prepare_kubernetes_pool_env(runner: KubernetesRunner, env_vars: dict): + # get current backend pod name + backend_pod_name = os.getenv("K8S_POD_NAME") + if not backend_pod_name: + raise ValueError(message="Pod name not provided in environment variable") + + # get current backend's credentials path + creds_path = os.getenv("CREDENTIALS_PATH") + if not creds_path: + raise ValueError(message="Credentials path not provided") + + creds_path = Path(creds_path) + if not 
creds_path.exists(): + raise ValueError("Credentials file does not exist") + + # create a secret for the node credentials owned by the backend, not the pool. + node_secret = KubeUtils.create_secret( + secret_name=K8S_NODE_CREDS_NAME, + type="Opaque", + component=backend_pod_name, + data={creds_path.name: creds_path.read_text()}, + encoded=False, + ) + + # clone and patch backend environment variables + backend_env = runner.get_pod_env_vars(backend_pod_name) or [] + env_vars = KubeUtils.patch_env_vars(backend_env, env_vars) + mount_secrets = { + node_secret.metadata.name: { + "mountPath": str(creds_path), + "subPath": creds_path.name, + }, + } + + return env_vars, mount_secrets + + def create_kubernetes_pool( runner: KubernetesRunner, - worker_image: SyftWorker, + tag: str, pool_name: str, replicas: int, queue_port: int, @@ -273,14 +313,13 @@ def create_kubernetes_pool( print( "Creating new pool " f"name={pool_name} " - f"tag={worker_image.image_identifier.full_name_with_tag} " + f"tag={tag} " f"replicas={replicas}" ) - pool = runner.create_pool( - pool_name=pool_name, - tag=worker_image.image_identifier.full_name_with_tag, - replicas=replicas, - env_vars={ + + env_vars, mount_secrets = prepare_kubernetes_pool_env( + runner, + { "SYFT_WORKER": "True", "DEV_MODE": f"{debug}", "QUEUE_PORT": f"{queue_port}", @@ -289,6 +328,15 @@ def create_kubernetes_pool( "CREATE_PRODUCER": "False", "INMEMORY_WORKERS": "False", }, + ) + + # run the pool with args + secret + pool = runner.create_pool( + pool_name=pool_name, + tag=tag, + replicas=replicas, + env_vars=env_vars, + mount_secrets=mount_secrets, reg_username=reg_username, reg_password=reg_password, reg_url=reg_url, @@ -300,7 +348,7 @@ def create_kubernetes_pool( if error and pool: pool.delete() - return runner.get_pods(pool_name=pool_name) + return runner.get_pool_pods(pool_name=pool_name) def scale_kubernetes_pool( @@ -318,7 +366,7 @@ def scale_kubernetes_pool( except Exception as e: return
SyftError(message=f"Failed to scale workers {e}") - return runner.get_pods(pool_name=pool_name) + return runner.get_pool_pods(pool_name=pool_name) def run_workers_in_kubernetes( @@ -339,7 +387,7 @@ def run_workers_in_kubernetes( if start_idx == 0: pool_pods = create_kubernetes_pool( runner=runner, - worker_image=worker_image, + tag=worker_image.image_identifier.full_name_with_tag, pool_name=pool_name, replicas=worker_count, queue_port=queue_port, diff --git a/packages/syft/tests/syft/request/request_code_accept_deny_test.py b/packages/syft/tests/syft/request/request_code_accept_deny_test.py index 242de50ff85..7451dd26d91 100644 --- a/packages/syft/tests/syft/request/request_code_accept_deny_test.py +++ b/packages/syft/tests/syft/request/request_code_accept_deny_test.py @@ -205,4 +205,4 @@ def simple_function(data): result = ds_client.code.simple_function(data=action_obj) assert isinstance(result, SyftError) - assert "UserCodeStatus.DENIED" in result.message + assert "Execution denied" in result.message diff --git a/scripts/build_images.sh b/scripts/build_images.sh new file mode 100644 index 00000000000..280ca544620 --- /dev/null +++ b/scripts/build_images.sh @@ -0,0 +1,8 @@ +#!/bin/bash + +REGISTRY=${1:-"k3d-registry.localhost:5000"} +TAG=${2:-"latest"} + +docker image build -f ./packages/grid/backend/backend.dockerfile --target backend -t $REGISTRY/openmined/grid-backend:$TAG ./packages +docker image build -f ./packages/grid/frontend/frontend.dockerfile --target grid-ui-development -t $REGISTRY/openmined/grid-frontend:$TAG ./packages/grid/frontend +docker image build -f ./packages/grid/seaweedfs/seaweedfs.dockerfile --build-arg SEAWEEDFS_VERSION=3.59 -t $REGISTRY/openmined/grid-seaweedfs:$TAG ./packages/grid/seaweedfs diff --git a/tox.ini b/tox.ini index 838d8fe35e5..570bad4c0e4 100644 --- a/tox.ini +++ b/tox.ini @@ -1014,10 +1014,12 @@ allowlist_externals = commands = bash -c 'devspace purge --force-purge --kube-context k3d-syft-dev --namespace syft; sleep 3' 
bash -c 'devspace cleanup images --kube-context k3d-syft-dev --namespace syft --var CONTAINER_REGISTRY=k3d-registry.localhost:5000 || true' + bash -c 'kubectl config use-context k3d-syft-dev' bash -c 'kubectl delete all --all --namespace syft || true' bash -c 'kubectl delete pvc --all --namespace syft || true' bash -c 'kubectl delete secret --all --namespace syft || true' bash -c 'kubectl delete configmap --all --namespace syft || true' + bash -c 'kubectl delete serviceaccount --all --namespace syft || true' [testenv:dev.k8s.destroy] description = Destroy local Kubernetes cluster