From 4791c0faa625a104c36d95f62a5a1148b1982bad Mon Sep 17 00:00:00 2001 From: while1eq1 Date: Fri, 14 Dec 2018 20:14:29 -0500 Subject: [PATCH] - add nodeSelectors, Tolerations, Affinity, and Balance master/data nodes across AZ's - use node selectors to assign masters / datanodes to specific AZ's - add nodeSelectors, also use nodeselectors to balance out each master/data node to 1 per AZ. Add tolerations - update documentation --- README.md | 21 ++++++++- pkg/apis/elasticsearchoperator/v1/cluster.go | 7 +++ .../v1/zz_generated.deepcopy.go | 43 +++++++++++++++++++ pkg/k8sutil/deployments.go | 3 +- pkg/k8sutil/k8sutil.go | 8 ++-- pkg/k8sutil/k8sutil_test.go | 7 ++- pkg/processor/processor.go | 13 +++--- 7 files changed, 90 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index e6fc95a9d..4b9d9161b 100644 --- a/README.md +++ b/README.md @@ -68,7 +68,26 @@ Following parameters are available to customize the elastic cluster: - image: Image to use (Note: Using [custom image](https://github.com/upmc-enterprises/kibana-docker) since upstream has x-pack installed and causes issues) - cerebro: Deploy [cerebro](https://github.com/lmenezes/cerebro) to cluster and automatically reference certs from secret - image: Image to use (Note: Using [custom image](https://github.com/upmc-enterprises/cerebro-docker) since upstream has no docker images available) - +- nodeSelector: list of k8s NodeSelectors which are applied to the Master Nodes and Data Nodes + - `key: "value"` +- tolerations: list of k8s Tolerations which are applied to the Master Nodes and Data Nodes + - `- effect:` eg: NoSchedule, NoExecute + `key:` eg: somekey + `operator:` eg: exists +- affinity: affinity rules to put on the client node deployments + - example: + ``` + affinity: + podAntiAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + - labelSelector: + matchExpressions: + - key: role + operator: In + values: + - client + topologyKey: kubernetes.io/hostname + ``` ## Certs secret The default 
image used adds TLS to the Elastic cluster. If not existing, secrets are automatically generated by the operator dynamically. diff --git a/pkg/apis/elasticsearchoperator/v1/cluster.go b/pkg/apis/elasticsearchoperator/v1/cluster.go index 3b1c0cb84..a57bd1596 100644 --- a/pkg/apis/elasticsearchoperator/v1/cluster.go +++ b/pkg/apis/elasticsearchoperator/v1/cluster.go @@ -25,6 +25,7 @@ ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT package v1 import ( + "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -72,6 +73,12 @@ type ClusterSpec struct { // labels. NodeSelector map[string]string `json:"nodeSelector,omitempty"` + // Tolerations specifies which tolerations the Master and Data nodes will have applied to them + Tolerations []v1.Toleration `json:"tolerations,omitempty"` + + // Affinity (podAffinity, podAntiAffinity, nodeAffinity) will be applied to the Client nodes + Affinity v1.Affinity `json:"affinity,omitempty"` + // Zones specifies a map of key-value pairs. Defines which zones // to deploy persistent volumes for data nodes Zones []string `json:"zones,omitempty"` diff --git a/pkg/apis/elasticsearchoperator/v1/zz_generated.deepcopy.go b/pkg/apis/elasticsearchoperator/v1/zz_generated.deepcopy.go index f0bc1a629..e048ff2ec 100644 --- a/pkg/apis/elasticsearchoperator/v1/zz_generated.deepcopy.go +++ b/pkg/apis/elasticsearchoperator/v1/zz_generated.deepcopy.go @@ -21,6 +21,7 @@ limitations under the License. 
package v1 import ( + corev1 "k8s.io/api/core/v1" runtime "k8s.io/apimachinery/pkg/runtime" ) @@ -82,6 +83,14 @@ func (in *ClusterSpec) DeepCopyInto(out *ClusterSpec) { (*out)[key] = val } } + if in.Tolerations != nil { + in, out := &in.Tolerations, &out.Tolerations + *out = make([]corev1.Toleration, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + in.Affinity.DeepCopyInto(&out.Affinity) if in.Zones != nil { in, out := &in.Zones, &out.Zones *out = make([]string, len(*in)) @@ -242,6 +251,38 @@ func (in *MemoryCPU) DeepCopy() *MemoryCPU { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RepoAuthentication) DeepCopyInto(out *RepoAuthentication) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RepoAuthentication. +func (in *RepoAuthentication) DeepCopy() *RepoAuthentication { + if in == nil { + return nil + } + out := new(RepoAuthentication) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RepoSchedulerAuthentication) DeepCopyInto(out *RepoSchedulerAuthentication) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RepoSchedulerAuthentication. +func (in *RepoSchedulerAuthentication) DeepCopy() *RepoSchedulerAuthentication { + if in == nil { + return nil + } + out := new(RepoSchedulerAuthentication) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *Resources) DeepCopyInto(out *Resources) { *out = *in @@ -264,6 +305,7 @@ func (in *Resources) DeepCopy() *Resources { func (in *Scheduler) DeepCopyInto(out *Scheduler) { *out = *in out.Auth = in.Auth + out.RepoAuth = in.RepoAuth return } @@ -297,6 +339,7 @@ func (in *SchedulerAuthentication) DeepCopy() *SchedulerAuthentication { func (in *Snapshot) DeepCopyInto(out *Snapshot) { *out = *in out.Authentication = in.Authentication + out.RepoAuthentication = in.RepoAuthentication return } diff --git a/pkg/k8sutil/deployments.go b/pkg/k8sutil/deployments.go index 02993fd1c..c5bcc9557 100644 --- a/pkg/k8sutil/deployments.go +++ b/pkg/k8sutil/deployments.go @@ -96,7 +96,7 @@ func (k *K8sutil) DeleteDeployment(clusterName, namespace, deploymentType string // CreateClientDeployment creates the client deployment func (k *K8sutil) CreateClientDeployment(baseImage string, replicas *int32, javaOptions, clientJavaOptions string, - resources myspec.Resources, imagePullSecrets []myspec.ImagePullSecrets, imagePullPolicy, serviceAccountName, clusterName, statsdEndpoint, networkHost, namespace string, useSSL *bool) error { + resources myspec.Resources, imagePullSecrets []myspec.ImagePullSecrets, imagePullPolicy, serviceAccountName, clusterName, statsdEndpoint, networkHost, namespace string, useSSL *bool, affinity v1.Affinity) error { component := fmt.Sprintf("elasticsearch-%s", clusterName) discoveryServiceNameCluster := fmt.Sprintf("%s-%s", discoveryServiceName, clusterName) @@ -170,6 +170,7 @@ func (k *K8sutil) CreateClientDeployment(baseImage string, replicas *int32, java }, }, Spec: v1.PodSpec{ + Affinity: &affinity, Containers: []v1.Container{ v1.Container{ Name: deploymentName, diff --git a/pkg/k8sutil/k8sutil.go b/pkg/k8sutil/k8sutil.go index c37e600cf..111d3dace 100644 --- a/pkg/k8sutil/k8sutil.go +++ b/pkg/k8sutil/k8sutil.go @@ -396,7 +396,7 @@ func processDeploymentType(deploymentType string, clusterName string) (string, s } func buildStatefulSet(statefulSetName, 
clusterName, deploymentType, baseImage, storageClass, dataDiskSize, javaOptions, masterJavaOptions, dataJavaOptions, serviceAccountName, - statsdEndpoint, networkHost string, replicas *int32, useSSL *bool, resources myspec.Resources, imagePullSecrets []myspec.ImagePullSecrets, imagePullPolicy string) *apps.StatefulSet { + statsdEndpoint, networkHost string, replicas *int32, useSSL *bool, resources myspec.Resources, imagePullSecrets []myspec.ImagePullSecrets, imagePullPolicy string, nodeSelector map[string]string, tolerations []v1.Toleration) *apps.StatefulSet { _, role, isNodeMaster, isNodeData := processDeploymentType(deploymentType, clusterName) @@ -485,6 +485,8 @@ func buildStatefulSet(statefulSetName, clusterName, deploymentType, baseImage, s }, }, Spec: v1.PodSpec{ + Tolerations: tolerations, + NodeSelector: nodeSelector, Affinity: &v1.Affinity{ PodAntiAffinity: &v1.PodAntiAffinity{ PreferredDuringSchedulingIgnoredDuringExecution: []v1.WeightedPodAffinityTerm{ @@ -665,7 +667,7 @@ func buildStatefulSet(statefulSetName, clusterName, deploymentType, baseImage, s // CreateDataNodeDeployment creates the data node deployment func (k *K8sutil) CreateDataNodeDeployment(deploymentType string, replicas *int32, baseImage, storageClass string, dataDiskSize string, resources myspec.Resources, - imagePullSecrets []myspec.ImagePullSecrets, imagePullPolicy, serviceAccountName, clusterName, statsdEndpoint, networkHost, namespace, javaOptions, masterJavaOptions, dataJavaOptions string, useSSL *bool, esUrl string) error { + imagePullSecrets []myspec.ImagePullSecrets, imagePullPolicy, serviceAccountName, clusterName, statsdEndpoint, networkHost, namespace, javaOptions, masterJavaOptions, dataJavaOptions string, useSSL *bool, esUrl string, nodeSelector map[string]string, tolerations []v1.Toleration) error { deploymentName, _, _, _ := processDeploymentType(deploymentType, clusterName) @@ -679,7 +681,7 @@ func (k *K8sutil) CreateDataNodeDeployment(deploymentType string, replicas 
*int3 logrus.Infof("StatefulSet %s not found, creating...", statefulSetName) statefulSet := buildStatefulSet(statefulSetName, clusterName, deploymentType, baseImage, storageClass, dataDiskSize, javaOptions, masterJavaOptions, dataJavaOptions, serviceAccountName, - statsdEndpoint, networkHost, replicas, useSSL, resources, imagePullSecrets, imagePullPolicy) + statsdEndpoint, networkHost, replicas, useSSL, resources, imagePullSecrets, imagePullPolicy, nodeSelector, tolerations) if _, err := k.Kclient.AppsV1beta2().StatefulSets(namespace).Create(statefulSet); err != nil { logrus.Error("Could not create stateful set: ", err) diff --git a/pkg/k8sutil/k8sutil_test.go b/pkg/k8sutil/k8sutil_test.go index 37f4444a2..7da68a2dd 100644 --- a/pkg/k8sutil/k8sutil_test.go +++ b/pkg/k8sutil/k8sutil_test.go @@ -5,6 +5,7 @@ import ( "testing" v1 "github.com/upmc-enterprises/elasticsearch-operator/pkg/apis/elasticsearchoperator/v1" + corev1 "k8s.io/api/core/v1" ) func TestGetESURL(t *testing.T) { @@ -39,8 +40,10 @@ func TestSSLCertConfig(t *testing.T) { } clusterName := "test" useSSL := false + nodeSelector := make(map[string]string) + tolerations := []corev1.Toleration{} statefulSet := buildStatefulSet("test", clusterName, "master", "foo/image", "test", "1G", "", - "", "", "", "", "", nil, &useSSL, resources, nil, "") + "", "", "", "", "", nil, &useSSL, resources, nil, "", nodeSelector, tolerations) for _, volume := range statefulSet.Spec.Template.Spec.Volumes { if volume.Name == fmt.Sprintf("%s-%s", secretName, clusterName) { @@ -50,7 +53,7 @@ func TestSSLCertConfig(t *testing.T) { useSSL = true statefulSet = buildStatefulSet("test", clusterName, "master", "foo/image", "test", "1G", "", - "", "", "", "", "", nil, &useSSL, resources, nil, "") + "", "", "", "", "", nil, &useSSL, resources, nil, "", nodeSelector, tolerations) found := false for _, volume := range statefulSet.Spec.Template.Spec.Volumes { diff --git a/pkg/processor/processor.go b/pkg/processor/processor.go index 
c9600bde5..43e901c93 100644 --- a/pkg/processor/processor.go +++ b/pkg/processor/processor.go @@ -222,6 +222,9 @@ func (p *Processor) refreshClusters() error { ImagePullPolicy: cluster.Spec.Cerebro.ImagePullPolicy, ServiceAccountName: cluster.Spec.Cerebro.ServiceAccountName, }, + NodeSelector: cluster.Spec.NodeSelector, + Tolerations: cluster.Spec.Tolerations, + Affinity: cluster.Spec.Affinity, UseSSL: &useSSL, ServiceAccountName: cluster.Spec.ServiceAccountName, }, @@ -369,7 +372,7 @@ func (p *Processor) processElasticSearchCluster(c *myspec.ElasticsearchCluster) } if err := p.k8sclient.CreateClientDeployment(baseImage, &c.Spec.ClientNodeReplicas, c.Spec.JavaOptions, c.Spec.ClientJavaOptions, - c.Spec.Resources, c.Spec.ImagePullSecrets, c.Spec.ImagePullPolicy, c.Spec.ServiceAccountName, c.ObjectMeta.Name, c.Spec.Instrumentation.StatsdHost, c.Spec.NetworkHost, c.ObjectMeta.Namespace, c.Spec.UseSSL); err != nil { + c.Spec.Resources, c.Spec.ImagePullSecrets, c.Spec.ImagePullPolicy, c.Spec.ServiceAccountName, c.ObjectMeta.Name, c.Spec.Instrumentation.StatsdHost, c.Spec.NetworkHost, c.ObjectMeta.Namespace, c.Spec.UseSSL, c.Spec.Affinity); err != nil { logrus.Error("Error creating client deployment ", err) return err } @@ -393,7 +396,7 @@ func (p *Processor) processElasticSearchCluster(c *myspec.ElasticsearchCluster) for index, count := range zoneDistributionMaster { if err := p.k8sclient.CreateDataNodeDeployment("master", &count, baseImage, c.Spec.Zones[index], c.Spec.DataDiskSize, c.Spec.Resources, c.Spec.ImagePullSecrets, c.Spec.ImagePullPolicy, c.Spec.ServiceAccountName, c.ObjectMeta.Name, c.Spec.Instrumentation.StatsdHost, c.Spec.NetworkHost, - c.ObjectMeta.Namespace, c.Spec.JavaOptions, c.Spec.MasterJavaOptions, c.Spec.DataJavaOptions, c.Spec.UseSSL, c.Spec.Scheduler.ElasticURL); err != nil { + c.ObjectMeta.Namespace, c.Spec.JavaOptions, c.Spec.MasterJavaOptions, c.Spec.DataJavaOptions, c.Spec.UseSSL, c.Spec.Scheduler.ElasticURL, c.Spec.NodeSelector, 
c.Spec.Tolerations); err != nil { logrus.Error("Error creating master node deployment ", err) return err } @@ -403,7 +406,7 @@ func (p *Processor) processElasticSearchCluster(c *myspec.ElasticsearchCluster) for index, count := range zoneDistributionData { if err := p.k8sclient.CreateDataNodeDeployment("data", &count, baseImage, c.Spec.Zones[index], c.Spec.DataDiskSize, c.Spec.Resources, c.Spec.ImagePullSecrets, c.Spec.ImagePullPolicy, c.Spec.ServiceAccountName, c.ObjectMeta.Name, c.Spec.Instrumentation.StatsdHost, c.Spec.NetworkHost, - c.ObjectMeta.Namespace, c.Spec.JavaOptions, c.Spec.MasterJavaOptions, c.Spec.DataJavaOptions, c.Spec.UseSSL, c.Spec.Scheduler.ElasticURL); err != nil { + c.ObjectMeta.Namespace, c.Spec.JavaOptions, c.Spec.MasterJavaOptions, c.Spec.DataJavaOptions, c.Spec.UseSSL, c.Spec.Scheduler.ElasticURL, c.Spec.NodeSelector, c.Spec.Tolerations); err != nil { logrus.Error("Error creating data node deployment ", err) return err @@ -419,7 +422,7 @@ func (p *Processor) processElasticSearchCluster(c *myspec.ElasticsearchCluster) // Create Master Nodes if err := p.k8sclient.CreateDataNodeDeployment("master", func() *int32 { i := int32(c.Spec.MasterNodeReplicas); return &i }(), baseImage, c.Spec.Storage.StorageClass, c.Spec.DataDiskSize, c.Spec.Resources, c.Spec.ImagePullSecrets, c.Spec.ImagePullPolicy, c.Spec.ServiceAccountName, c.ObjectMeta.Name, - c.Spec.Instrumentation.StatsdHost, c.Spec.NetworkHost, c.ObjectMeta.Namespace, c.Spec.JavaOptions, c.Spec.MasterJavaOptions, c.Spec.DataJavaOptions, c.Spec.UseSSL, c.Spec.Scheduler.ElasticURL); err != nil { + c.Spec.Instrumentation.StatsdHost, c.Spec.NetworkHost, c.ObjectMeta.Namespace, c.Spec.JavaOptions, c.Spec.MasterJavaOptions, c.Spec.DataJavaOptions, c.Spec.UseSSL, c.Spec.Scheduler.ElasticURL, c.Spec.NodeSelector, c.Spec.Tolerations); err != nil { logrus.Error("Error creating master node deployment ", err) return err @@ -428,7 +431,7 @@ func (p *Processor) processElasticSearchCluster(c 
*myspec.ElasticsearchCluster) // Create Data Nodes if err := p.k8sclient.CreateDataNodeDeployment("data", func() *int32 { i := int32(c.Spec.DataNodeReplicas); return &i }(), baseImage, c.Spec.Storage.StorageClass, c.Spec.DataDiskSize, c.Spec.Resources, c.Spec.ImagePullSecrets, c.Spec.ImagePullPolicy, c.Spec.ServiceAccountName, c.ObjectMeta.Name, - c.Spec.Instrumentation.StatsdHost, c.Spec.NetworkHost, c.ObjectMeta.Namespace, c.Spec.JavaOptions, c.Spec.MasterJavaOptions, c.Spec.DataJavaOptions, c.Spec.UseSSL, c.Spec.Scheduler.ElasticURL); err != nil { + c.Spec.Instrumentation.StatsdHost, c.Spec.NetworkHost, c.ObjectMeta.Namespace, c.Spec.JavaOptions, c.Spec.MasterJavaOptions, c.Spec.DataJavaOptions, c.Spec.UseSSL, c.Spec.Scheduler.ElasticURL, c.Spec.NodeSelector, c.Spec.Tolerations); err != nil { logrus.Error("Error creating data node deployment ", err) return err }