Skip to content

Commit

Permalink
Merge pull request #270 from while1eq1/add_tolerations
Browse files Browse the repository at this point in the history
Add tolerations, nodeSelectors for master/data nodes and affinity for client nodes
  • Loading branch information
stevesloka authored Jan 2, 2019
2 parents 8b2535d + 4791c0f commit 37e443b
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 12 deletions.
21 changes: 20 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,26 @@ Following parameters are available to customize the elastic cluster:
- image: Image to use (Note: Using [custom image](https://github.com/upmc-enterprises/kibana-docker) since upstream has x-pack installed and causes issues)
- cerebro: Deploy [cerebro](https://github.com/lmenezes/cerebro) to cluster and automatically reference certs from secret
- image: Image to use (Note: Using [custom image](https://github.com/upmc-enterprises/cerebro-docker) since upstream has no docker images available)

- nodeSelector: list of k8s NodeSelectors which are applied to the Master Nodes and Data Nodes
- `key: "value`
- tolerations: list of k8s Tolerations which are applied to the Master Nodes and Data Nodes
- `- effect:` eg: NoSchedule, NoExecute
`key:` eg: somekey
`operator:` eg: exists
- affinity: affinity rules to put on the client node deployments
- example:
```
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: role
operator: In
values:
- client
topologyKey: kubernetes.io/hostname
```
## Certs secret

The default image used adds TLS to the Elastic cluster. If not existing, secrets are automatically generated by the operator dynamically.
Expand Down
7 changes: 7 additions & 0 deletions pkg/apis/elasticsearchoperator/v1/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
package v1

import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand Down Expand Up @@ -72,6 +73,12 @@ type ClusterSpec struct {
// labels.
NodeSelector map[string]string `json:"nodeSelector,omitempty"`

// Tolerations specifies which tolerations the Master and Data nodes will have applied to them
Tolerations []v1.Toleration `json:"tolerations,omitempty"`

// Affinity (podAffinity, podAntiAffinity, nodeAffinity) will be applied to the Client nodes
Affinity v1.Affinity `json:"affinity,omitempty"`

// Zones specifies a map of key-value pairs. Defines which zones
// to deploy persistent volumes for data nodes
Zones []string `json:"zones,omitempty"`
Expand Down
43 changes: 43 additions & 0 deletions pkg/apis/elasticsearchoperator/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pkg/k8sutil/deployments.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (k *K8sutil) DeleteDeployment(clusterName, namespace, deploymentType string

// CreateClientDeployment creates the client deployment
func (k *K8sutil) CreateClientDeployment(baseImage string, replicas *int32, javaOptions, clientJavaOptions string,
resources myspec.Resources, imagePullSecrets []myspec.ImagePullSecrets, imagePullPolicy, serviceAccountName, clusterName, statsdEndpoint, networkHost, namespace string, useSSL *bool) error {
resources myspec.Resources, imagePullSecrets []myspec.ImagePullSecrets, imagePullPolicy, serviceAccountName, clusterName, statsdEndpoint, networkHost, namespace string, useSSL *bool, affinity v1.Affinity) error {

component := fmt.Sprintf("elasticsearch-%s", clusterName)
discoveryServiceNameCluster := fmt.Sprintf("%s-%s", discoveryServiceName, clusterName)
Expand Down Expand Up @@ -170,6 +170,7 @@ func (k *K8sutil) CreateClientDeployment(baseImage string, replicas *int32, java
},
},
Spec: v1.PodSpec{
Affinity: &affinity,
Containers: []v1.Container{
v1.Container{
Name: deploymentName,
Expand Down
8 changes: 5 additions & 3 deletions pkg/k8sutil/k8sutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ func processDeploymentType(deploymentType string, clusterName string) (string, s
}

func buildStatefulSet(statefulSetName, clusterName, deploymentType, baseImage, storageClass, dataDiskSize, javaOptions, masterJavaOptions, dataJavaOptions, serviceAccountName,
statsdEndpoint, networkHost string, replicas *int32, useSSL *bool, resources myspec.Resources, imagePullSecrets []myspec.ImagePullSecrets, imagePullPolicy string) *apps.StatefulSet {
statsdEndpoint, networkHost string, replicas *int32, useSSL *bool, resources myspec.Resources, imagePullSecrets []myspec.ImagePullSecrets, imagePullPolicy string, nodeSelector map[string]string, tolerations []v1.Toleration) *apps.StatefulSet {

_, role, isNodeMaster, isNodeData := processDeploymentType(deploymentType, clusterName)

Expand Down Expand Up @@ -485,6 +485,8 @@ func buildStatefulSet(statefulSetName, clusterName, deploymentType, baseImage, s
},
},
Spec: v1.PodSpec{
Tolerations: tolerations,
NodeSelector: nodeSelector,
Affinity: &v1.Affinity{
PodAntiAffinity: &v1.PodAntiAffinity{
PreferredDuringSchedulingIgnoredDuringExecution: []v1.WeightedPodAffinityTerm{
Expand Down Expand Up @@ -665,7 +667,7 @@ func buildStatefulSet(statefulSetName, clusterName, deploymentType, baseImage, s

// CreateDataNodeDeployment creates the data node deployment
func (k *K8sutil) CreateDataNodeDeployment(deploymentType string, replicas *int32, baseImage, storageClass string, dataDiskSize string, resources myspec.Resources,
imagePullSecrets []myspec.ImagePullSecrets, imagePullPolicy, serviceAccountName, clusterName, statsdEndpoint, networkHost, namespace, javaOptions, masterJavaOptions, dataJavaOptions string, useSSL *bool, esUrl string) error {
imagePullSecrets []myspec.ImagePullSecrets, imagePullPolicy, serviceAccountName, clusterName, statsdEndpoint, networkHost, namespace, javaOptions, masterJavaOptions, dataJavaOptions string, useSSL *bool, esUrl string, nodeSelector map[string]string, tolerations []v1.Toleration) error {

deploymentName, _, _, _ := processDeploymentType(deploymentType, clusterName)

Expand All @@ -679,7 +681,7 @@ func (k *K8sutil) CreateDataNodeDeployment(deploymentType string, replicas *int3
logrus.Infof("StatefulSet %s not found, creating...", statefulSetName)

statefulSet := buildStatefulSet(statefulSetName, clusterName, deploymentType, baseImage, storageClass, dataDiskSize, javaOptions, masterJavaOptions, dataJavaOptions, serviceAccountName,
statsdEndpoint, networkHost, replicas, useSSL, resources, imagePullSecrets, imagePullPolicy)
statsdEndpoint, networkHost, replicas, useSSL, resources, imagePullSecrets, imagePullPolicy, nodeSelector, tolerations)

if _, err := k.Kclient.AppsV1beta2().StatefulSets(namespace).Create(statefulSet); err != nil {
logrus.Error("Could not create stateful set: ", err)
Expand Down
7 changes: 5 additions & 2 deletions pkg/k8sutil/k8sutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"

v1 "github.com/upmc-enterprises/elasticsearch-operator/pkg/apis/elasticsearchoperator/v1"
corev1 "k8s.io/api/core/v1"
)

func TestGetESURL(t *testing.T) {
Expand Down Expand Up @@ -39,8 +40,10 @@ func TestSSLCertConfig(t *testing.T) {
}
clusterName := "test"
useSSL := false
nodeSelector := make(map[string]string)
tolerations := []corev1.Toleration{}
statefulSet := buildStatefulSet("test", clusterName, "master", "foo/image", "test", "1G", "",
"", "", "", "", "", nil, &useSSL, resources, nil, "")
"", "", "", "", "", nil, &useSSL, resources, nil, "", nodeSelector, tolerations)

for _, volume := range statefulSet.Spec.Template.Spec.Volumes {
if volume.Name == fmt.Sprintf("%s-%s", secretName, clusterName) {
Expand All @@ -50,7 +53,7 @@ func TestSSLCertConfig(t *testing.T) {

useSSL = true
statefulSet = buildStatefulSet("test", clusterName, "master", "foo/image", "test", "1G", "",
"", "", "", "", "", nil, &useSSL, resources, nil, "")
"", "", "", "", "", nil, &useSSL, resources, nil, "", nodeSelector, tolerations)

found := false
for _, volume := range statefulSet.Spec.Template.Spec.Volumes {
Expand Down
13 changes: 8 additions & 5 deletions pkg/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,9 @@ func (p *Processor) refreshClusters() error {
ImagePullPolicy: cluster.Spec.Cerebro.ImagePullPolicy,
ServiceAccountName: cluster.Spec.Cerebro.ServiceAccountName,
},
NodeSelector: cluster.Spec.NodeSelector,
Tolerations: cluster.Spec.Tolerations,
Affinity: cluster.Spec.Affinity,
UseSSL: &useSSL,
ServiceAccountName: cluster.Spec.ServiceAccountName,
},
Expand Down Expand Up @@ -369,7 +372,7 @@ func (p *Processor) processElasticSearchCluster(c *myspec.ElasticsearchCluster)
}

if err := p.k8sclient.CreateClientDeployment(baseImage, &c.Spec.ClientNodeReplicas, c.Spec.JavaOptions, c.Spec.ClientJavaOptions,
c.Spec.Resources, c.Spec.ImagePullSecrets, c.Spec.ImagePullPolicy, c.Spec.ServiceAccountName, c.ObjectMeta.Name, c.Spec.Instrumentation.StatsdHost, c.Spec.NetworkHost, c.ObjectMeta.Namespace, c.Spec.UseSSL); err != nil {
c.Spec.Resources, c.Spec.ImagePullSecrets, c.Spec.ImagePullPolicy, c.Spec.ServiceAccountName, c.ObjectMeta.Name, c.Spec.Instrumentation.StatsdHost, c.Spec.NetworkHost, c.ObjectMeta.Namespace, c.Spec.UseSSL, c.Spec.Affinity); err != nil {
logrus.Error("Error creating client deployment ", err)
return err
}
Expand All @@ -393,7 +396,7 @@ func (p *Processor) processElasticSearchCluster(c *myspec.ElasticsearchCluster)
for index, count := range zoneDistributionMaster {
if err := p.k8sclient.CreateDataNodeDeployment("master", &count, baseImage, c.Spec.Zones[index], c.Spec.DataDiskSize, c.Spec.Resources,
c.Spec.ImagePullSecrets, c.Spec.ImagePullPolicy, c.Spec.ServiceAccountName, c.ObjectMeta.Name, c.Spec.Instrumentation.StatsdHost, c.Spec.NetworkHost,
c.ObjectMeta.Namespace, c.Spec.JavaOptions, c.Spec.MasterJavaOptions, c.Spec.DataJavaOptions, c.Spec.UseSSL, c.Spec.Scheduler.ElasticURL); err != nil {
c.ObjectMeta.Namespace, c.Spec.JavaOptions, c.Spec.MasterJavaOptions, c.Spec.DataJavaOptions, c.Spec.UseSSL, c.Spec.Scheduler.ElasticURL, c.Spec.NodeSelector, c.Spec.Tolerations); err != nil {
logrus.Error("Error creating master node deployment ", err)
return err
}
Expand All @@ -403,7 +406,7 @@ func (p *Processor) processElasticSearchCluster(c *myspec.ElasticsearchCluster)
for index, count := range zoneDistributionData {
if err := p.k8sclient.CreateDataNodeDeployment("data", &count, baseImage, c.Spec.Zones[index], c.Spec.DataDiskSize, c.Spec.Resources,
c.Spec.ImagePullSecrets, c.Spec.ImagePullPolicy, c.Spec.ServiceAccountName, c.ObjectMeta.Name, c.Spec.Instrumentation.StatsdHost, c.Spec.NetworkHost,
c.ObjectMeta.Namespace, c.Spec.JavaOptions, c.Spec.MasterJavaOptions, c.Spec.DataJavaOptions, c.Spec.UseSSL, c.Spec.Scheduler.ElasticURL); err != nil {
c.ObjectMeta.Namespace, c.Spec.JavaOptions, c.Spec.MasterJavaOptions, c.Spec.DataJavaOptions, c.Spec.UseSSL, c.Spec.Scheduler.ElasticURL, c.Spec.NodeSelector, c.Spec.Tolerations); err != nil {
logrus.Error("Error creating data node deployment ", err)

return err
Expand All @@ -419,7 +422,7 @@ func (p *Processor) processElasticSearchCluster(c *myspec.ElasticsearchCluster)
// Create Master Nodes
if err := p.k8sclient.CreateDataNodeDeployment("master", func() *int32 { i := int32(c.Spec.MasterNodeReplicas); return &i }(), baseImage, c.Spec.Storage.StorageClass,
c.Spec.DataDiskSize, c.Spec.Resources, c.Spec.ImagePullSecrets, c.Spec.ImagePullPolicy, c.Spec.ServiceAccountName, c.ObjectMeta.Name,
c.Spec.Instrumentation.StatsdHost, c.Spec.NetworkHost, c.ObjectMeta.Namespace, c.Spec.JavaOptions, c.Spec.MasterJavaOptions, c.Spec.DataJavaOptions, c.Spec.UseSSL, c.Spec.Scheduler.ElasticURL); err != nil {
c.Spec.Instrumentation.StatsdHost, c.Spec.NetworkHost, c.ObjectMeta.Namespace, c.Spec.JavaOptions, c.Spec.MasterJavaOptions, c.Spec.DataJavaOptions, c.Spec.UseSSL, c.Spec.Scheduler.ElasticURL, c.Spec.NodeSelector, c.Spec.Tolerations); err != nil {
logrus.Error("Error creating master node deployment ", err)

return err
Expand All @@ -428,7 +431,7 @@ func (p *Processor) processElasticSearchCluster(c *myspec.ElasticsearchCluster)
// Create Data Nodes
if err := p.k8sclient.CreateDataNodeDeployment("data", func() *int32 { i := int32(c.Spec.DataNodeReplicas); return &i }(), baseImage, c.Spec.Storage.StorageClass,
c.Spec.DataDiskSize, c.Spec.Resources, c.Spec.ImagePullSecrets, c.Spec.ImagePullPolicy, c.Spec.ServiceAccountName, c.ObjectMeta.Name,
c.Spec.Instrumentation.StatsdHost, c.Spec.NetworkHost, c.ObjectMeta.Namespace, c.Spec.JavaOptions, c.Spec.MasterJavaOptions, c.Spec.DataJavaOptions, c.Spec.UseSSL, c.Spec.Scheduler.ElasticURL); err != nil {
c.Spec.Instrumentation.StatsdHost, c.Spec.NetworkHost, c.ObjectMeta.Namespace, c.Spec.JavaOptions, c.Spec.MasterJavaOptions, c.Spec.DataJavaOptions, c.Spec.UseSSL, c.Spec.Scheduler.ElasticURL, c.Spec.NodeSelector, c.Spec.Tolerations); err != nil {
logrus.Error("Error creating data node deployment ", err)
return err
}
Expand Down

0 comments on commit 37e443b

Please sign in to comment.