Skip to content

Commit

Permalink
Kubernetes Driver: switch to 'StatefulSet' from 'Deployment'.
Browse files Browse the repository at this point in the history
Signed-off-by: ArielLahiany <[email protected]>
  • Loading branch information
ArielLahiany committed Jan 21, 2025
1 parent b4a0dee commit 504eff2
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 96 deletions.
46 changes: 23 additions & 23 deletions driver/kubernetes/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,16 @@ type Driver struct {

// if you add fields, remember to update docs:
// https://github.com/docker/docs/blob/main/content/build/drivers/kubernetes.md
minReplicas int
deployment *appsv1.Deployment
configMaps []*corev1.ConfigMap
clientset *kubernetes.Clientset
deploymentClient clientappsv1.DeploymentInterface
podClient clientcorev1.PodInterface
configMapClient clientcorev1.ConfigMapInterface
podChooser podchooser.PodChooser
defaultLoad bool
timeout time.Duration
minReplicas int
statefulSet *appsv1.StatefulSet
configMaps []*corev1.ConfigMap
clientset *kubernetes.Clientset
statefulSetClient clientappsv1.StatefulSetInterface
podClient clientcorev1.PodInterface
configMapClient clientcorev1.ConfigMapInterface
podChooser podchooser.PodChooser
defaultLoad bool
timeout time.Duration
}

func (d *Driver) IsMobyDriver() bool {
Expand All @@ -65,10 +65,10 @@ func (d *Driver) Config() driver.InitConfig {

func (d *Driver) Bootstrap(ctx context.Context, l progress.Logger) error {
return progress.Wrap("[internal] booting buildkit", l, func(sub progress.SubLogger) error {
_, err := d.deploymentClient.Get(ctx, d.deployment.Name, metav1.GetOptions{})
_, err := d.statefulSetClient.Get(ctx, d.statefulSet.Name, metav1.GetOptions{})
if err != nil {
if !apierrors.IsNotFound(err) {
return errors.Wrapf(err, "error for bootstrap %q", d.deployment.Name)
return errors.Wrapf(err, "error for bootstrap %q", d.statefulSet.Name)
}

for _, cfg := range d.configMaps {
Expand All @@ -85,9 +85,9 @@ func (d *Driver) Bootstrap(ctx context.Context, l progress.Logger) error {
}
}

_, err = d.deploymentClient.Create(ctx, d.deployment, metav1.CreateOptions{})
_, err = d.statefulSetClient.Create(ctx, d.statefulSet, metav1.CreateOptions{})
if err != nil {
return errors.Wrapf(err, "error while calling deploymentClient.Create for %q", d.deployment.Name)
return errors.Wrapf(err, "error while calling statefulSetClient.Create for %q", d.statefulSet.Name)
}
}
return sub.Wrap(
Expand All @@ -102,7 +102,7 @@ func (d *Driver) wait(ctx context.Context) error {
// TODO: use watch API
var (
err error
depl *appsv1.Deployment
stat *appsv1.StatefulSet
)

timeoutChan := time.After(d.timeout)
Expand All @@ -116,31 +116,31 @@ func (d *Driver) wait(ctx context.Context) error {
case <-timeoutChan:
return err
case <-ticker.C:
depl, err = d.deploymentClient.Get(ctx, d.deployment.Name, metav1.GetOptions{})
stat, err = d.statefulSetClient.Get(ctx, d.statefulSet.Name, metav1.GetOptions{})
if err == nil {
if depl.Status.ReadyReplicas >= int32(d.minReplicas) {
if stat.Status.ReadyReplicas >= int32(d.minReplicas) {
return nil
}
err = errors.Errorf("expected %d replicas to be ready, got %d", d.minReplicas, depl.Status.ReadyReplicas)
err = errors.Errorf("expected %d replicas to be ready, got %d", d.minReplicas, stat.Status.ReadyReplicas)
}
}
}
}

func (d *Driver) Info(ctx context.Context) (*driver.Info, error) {
depl, err := d.deploymentClient.Get(ctx, d.deployment.Name, metav1.GetOptions{})
stat, err := d.statefulSetClient.Get(ctx, d.statefulSet.Name, metav1.GetOptions{})
if err != nil {
// TODO: return err if err != ErrNotFound
return &driver.Info{
Status: driver.Inactive,
}, nil
}
if depl.Status.ReadyReplicas <= 0 {
if stat.Status.ReadyReplicas <= 0 {
return &driver.Info{
Status: driver.Stopped,
}, nil
}
pods, err := podchooser.ListRunningPods(ctx, d.podClient, depl)
pods, err := podchooser.ListRunningPods(ctx, d.podClient, stat)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -182,9 +182,9 @@ func (d *Driver) Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error {
return nil
}

if err := d.deploymentClient.Delete(ctx, d.deployment.Name, metav1.DeleteOptions{}); err != nil {
if err := d.statefulSetClient.Delete(ctx, d.statefulSet.Name, metav1.DeleteOptions{}); err != nil {
if !apierrors.IsNotFound(err) {
return errors.Wrapf(err, "error while calling deploymentClient.Delete for %q", d.deployment.Name)
return errors.Wrapf(err, "error while calling statefulSetClient.Delete for %q", d.statefulSet.Name)
}
}
for _, cfg := range d.configMaps {
Expand Down
70 changes: 35 additions & 35 deletions driver/kubernetes/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (f *factory) New(ctx context.Context, cfg driver.InitConfig) (driver.Driver
}
}

deploymentName, err := buildxNameToDeploymentName(cfg.Name)
statefulSetName, err := buildxNameToStatefulSetName(cfg.Name)
if err != nil {
return nil, err
}
Expand All @@ -128,44 +128,44 @@ func (f *factory) New(ctx context.Context, cfg driver.InitConfig) (driver.Driver
clientset: clientset,
}

deploymentOpt, loadbalance, namespace, defaultLoad, timeout, err := f.processDriverOpts(deploymentName, namespace, cfg)
statefulSetOpt, loadbalance, namespace, defaultLoad, timeout, err := f.processDriverOpts(statefulSetName, namespace, cfg)
if nil != err {
return nil, err
}

d.defaultLoad = defaultLoad
d.timeout = timeout

d.deployment, d.configMaps, err = manifest.NewDeployment(deploymentOpt)
d.statefulSet, d.configMaps, err = manifest.NewStatefulSet(statefulSetOpt)
if err != nil {
return nil, err
}

d.minReplicas = deploymentOpt.Replicas
d.minReplicas = statefulSetOpt.Replicas

d.deploymentClient = clientset.AppsV1().Deployments(namespace)
d.statefulSetClient = clientset.AppsV1().StatefulSets(namespace)
d.podClient = clientset.CoreV1().Pods(namespace)
d.configMapClient = clientset.CoreV1().ConfigMaps(namespace)

switch loadbalance {
case LoadbalanceSticky:
d.podChooser = &podchooser.StickyPodChooser{
Key: cfg.ContextPathHash,
PodClient: d.podClient,
Deployment: d.deployment,
Key: cfg.ContextPathHash,
PodClient: d.podClient,
StatefulSet: d.statefulSet,
}
case LoadbalanceRandom:
d.podChooser = &podchooser.RandomPodChooser{
PodClient: d.podClient,
Deployment: d.deployment,
PodClient: d.podClient,
StatefulSet: d.statefulSet,
}
}
return d, nil
}

func (f *factory) processDriverOpts(deploymentName string, namespace string, cfg driver.InitConfig) (*manifest.DeploymentOpt, string, string, bool, time.Duration, error) {
deploymentOpt := &manifest.DeploymentOpt{
Name: deploymentName,
func (f *factory) processDriverOpts(statefulSetName string, namespace string, cfg driver.InitConfig) (*manifest.StatefulSetOpt, string, string, bool, time.Duration, error) {
statefulSetOpt := &manifest.StatefulSetOpt{
Name: statefulSetName,
Image: bkimage.DefaultImage,
Replicas: 1,
BuildkitFlags: cfg.BuildkitdFlags,
Expand All @@ -177,7 +177,7 @@ func (f *factory) processDriverOpts(deploymentName string, namespace string, cfg
defaultLoad := false
timeout := defaultTimeout

deploymentOpt.Qemu.Image = bkimage.QemuImage
statefulSetOpt.Qemu.Image = bkimage.QemuImage

loadbalance := LoadbalanceSticky
var err error
Expand All @@ -186,57 +186,57 @@ func (f *factory) processDriverOpts(deploymentName string, namespace string, cfg
switch k {
case "image":
if v != "" {
deploymentOpt.Image = v
statefulSetOpt.Image = v
}
case "namespace":
namespace = v
case "replicas":
deploymentOpt.Replicas, err = strconv.Atoi(v)
statefulSetOpt.Replicas, err = strconv.Atoi(v)
if err != nil {
return nil, "", "", false, 0, err
}
case "requests.cpu":
deploymentOpt.RequestsCPU = v
statefulSetOpt.RequestsCPU = v
case "requests.memory":
deploymentOpt.RequestsMemory = v
statefulSetOpt.RequestsMemory = v
case "requests.ephemeral-storage":
deploymentOpt.RequestsEphemeralStorage = v
statefulSetOpt.RequestsEphemeralStorage = v
case "limits.cpu":
deploymentOpt.LimitsCPU = v
statefulSetOpt.LimitsCPU = v
case "limits.memory":
deploymentOpt.LimitsMemory = v
statefulSetOpt.LimitsMemory = v
case "limits.ephemeral-storage":
deploymentOpt.LimitsEphemeralStorage = v
statefulSetOpt.LimitsEphemeralStorage = v
case "rootless":
deploymentOpt.Rootless, err = strconv.ParseBool(v)
statefulSetOpt.Rootless, err = strconv.ParseBool(v)
if err != nil {
return nil, "", "", false, 0, err
}
if _, isImage := cfg.DriverOpts["image"]; !isImage {
deploymentOpt.Image = bkimage.DefaultRootlessImage
statefulSetOpt.Image = bkimage.DefaultRootlessImage
}
case "schedulername":
deploymentOpt.SchedulerName = v
statefulSetOpt.SchedulerName = v
case "serviceaccount":
deploymentOpt.ServiceAccountName = v
statefulSetOpt.ServiceAccountName = v
case "nodeselector":
deploymentOpt.NodeSelector, err = splitMultiValues(v, ",", "=")
statefulSetOpt.NodeSelector, err = splitMultiValues(v, ",", "=")
if err != nil {
return nil, "", "", false, 0, errors.Wrap(err, "cannot parse node selector")
}
case "annotations":
deploymentOpt.CustomAnnotations, err = splitMultiValues(v, ",", "=")
statefulSetOpt.CustomAnnotations, err = splitMultiValues(v, ",", "=")
if err != nil {
return nil, "", "", false, 0, errors.Wrap(err, "cannot parse annotations")
}
case "labels":
deploymentOpt.CustomLabels, err = splitMultiValues(v, ",", "=")
statefulSetOpt.CustomLabels, err = splitMultiValues(v, ",", "=")
if err != nil {
return nil, "", "", false, 0, errors.Wrap(err, "cannot parse labels")
}
case "tolerations":
ts := strings.Split(v, ";")
deploymentOpt.Tolerations = []corev1.Toleration{}
statefulSetOpt.Tolerations = []corev1.Toleration{}
for i := range ts {
kvs := strings.Split(ts[i], ",")

Expand Down Expand Up @@ -267,7 +267,7 @@ func (f *factory) processDriverOpts(deploymentName string, namespace string, cfg
}
}

deploymentOpt.Tolerations = append(deploymentOpt.Tolerations, t)
statefulSetOpt.Tolerations = append(statefulSetOpt.Tolerations, t)
}
case "loadbalance":
switch v {
Expand All @@ -278,13 +278,13 @@ func (f *factory) processDriverOpts(deploymentName string, namespace string, cfg
}
loadbalance = v
case "qemu.install":
deploymentOpt.Qemu.Install, err = strconv.ParseBool(v)
statefulSetOpt.Qemu.Install, err = strconv.ParseBool(v)
if err != nil {
return nil, "", "", false, 0, err
}
case "qemu.image":
if v != "" {
deploymentOpt.Qemu.Image = v
statefulSetOpt.Qemu.Image = v
}
case "default-load":
defaultLoad, err = strconv.ParseBool(v)
Expand All @@ -301,7 +301,7 @@ func (f *factory) processDriverOpts(deploymentName string, namespace string, cfg
}
}

return deploymentOpt, loadbalance, namespace, defaultLoad, timeout, nil
return statefulSetOpt, loadbalance, namespace, defaultLoad, timeout, nil
}

func splitMultiValues(in string, itemsep string, kvsep string) (map[string]string, error) {
Expand All @@ -324,7 +324,7 @@ func (f *factory) AllowsInstances() bool {
// buildxNameToDeploymentName converts buildx name to Kubernetes Deployment name.
//
// eg. "buildx_buildkit_loving_mendeleev0" -> "loving-mendeleev0"
func buildxNameToDeploymentName(bx string) (string, error) {
func buildxNameToStatefulSetName(bx string) (string, error) {
// TODO: commands.util.go should not pass "buildx_buildkit_" prefix to drivers
s, err := driver.ParseBuilderName(bx)
if err != nil {
Expand Down
Loading

0 comments on commit 504eff2

Please sign in to comment.