diff --git a/Makefile b/Makefile index de78859b7..8a49eb04c 100644 --- a/Makefile +++ b/Makefile @@ -34,4 +34,4 @@ helm-package: helm repo index --merge charts/index.yaml charts test: clean - go test $$(go list ./... | grep -v /vendor/) + go test -v $$(go list ./... | grep -v /vendor/) diff --git a/README.md b/README.md index a45697ed7..0c09b1892 100644 --- a/README.md +++ b/README.md @@ -283,6 +283,9 @@ $ go get -u github.com/cloudflare/cfssl/cmd/cfssl $ go get -u github.com/cloudflare/cfssl/cmd/cfssljson $ go run cmd/operator/main.go --kubecfg-file=${HOME}/.kube/config ``` +# Scaling feature: + +[Details of scaling](docs/scaling.md) # About Built by UPMC Enterprises in Pittsburgh, PA. http://enterprises.upmc.com/ diff --git a/docs/scaling.md b/docs/scaling.md new file mode 100644 index 000000000..704aa220f --- /dev/null +++ b/docs/scaling.md @@ -0,0 +1,127 @@
+# Patch Details:
+
+The following changes are present in the scaling patch:
+
+- Scaling is an optional feature: a separate section is defined for scaling, as shown in the example spec below. If the scaling section is not present, the entire scaling feature is disabled.
+- Added changes to support local disk. This is not tested for multiple nodes; affinity between the pod and the node MAY need to be added.
+- When scaling is triggered: scaling is triggered when any of the following three fields inside the scaling section changes: java-options, cpu and memory. When the scaling section is present, the top-level java-options and resources entries apply only to non-data nodes; if the scaling section is absent, they apply to all nodes.
+    - JavaOptions: JVM options (heap size) passed to the data nodes.
+    - CPU inside resources: number of CPU cores.
+    - Memory inside resources: memory size.
+- Steps involved in vertical scaling of the Elasticsearch cluster: the following steps are repeated for each data node, one after another; if any step fails, scaling is halted for the remaining data nodes. (A sketch of the Step-2/Step-8 settings calls appears right after this section.)
+    - Step-1: Check whether any of the three resources inside the scaling section changed: java-options, cpu or memory.
+    - Step-2: ES setting change: raise the delayed-allocation timeout from the default 1 minute to 6 minutes, so that shards belonging to the data node being scaled are not copied elsewhere, and change translog durability from async to request so that no data is lost.
+    - Step-3: ES check: verify that the ES cluster is in the green state; if, for example, one of the data nodes is down and the state is yellow, do not proceed with scaling.
+    - Step-4: Scale the data node by updating the new resources in the StatefulSet. The data node is restarted at this point.
+    - Step-5: Check that the pod has restarted and is in the Running state from the Kubernetes point of view.
+    - Step-6: Check that the pod is up from the ES point of view, i.e. the data node has registered with the master.
+    - Step-7: Check that all shards are assigned again; at this point ES should turn from yellow back to green, and it is safe to scale the next data node.
+    - Step-8: Undo the settings changed in Step-2.
+- Future Enhancements:
+    - Horizontal scaling of data nodes: increasing "data-node-replicas" works as expected, but if "data-node-replicas" is decreased by more than 2, the Elasticsearch cluster can enter the red state and data can be lost. This can be prevented by scaling down one node at a time, similar to vertical scaling, without entering the red state.
+    - Picking "masternodeip" from the Kubernetes Service objects instead of from the user.
+    - Vertical scaling improvements (periodic scaling): currently vertical scaling is triggered by the user; it could instead be triggered automatically on a schedule.
+    - Multiple threads: vertical scaling of one Elasticsearch cluster takes a considerable amount of time, and other clusters MAY not be processed during that time; this can be parallelised by running the scaling operation in multiple threads.
+    - Dependency on Elasticsearch version: the implementation depends on Elasticsearch 6.3; if the APIs change significantly, the scaling operation will fail.
+    - Local disk support: changes were added to support local disk, but they are not tested with multiple nodes; it must be verified that the pod has affinity to the node, which can be enforced during StatefulSet creation.
+- Usecases:
+    - Case 1: scale down at the start of off-peak hours and scale up at the start of peak hours every day, i.e. two scaling operations per day; this can be done with a cron job without interrupting the service.
+    - Case 2: scale down and scale up occasionally.
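+
+Below is a minimal Go sketch of the settings calls behind Step-2 and Step-8. It mirrors the `PUT /_all/_settings` request issued by `ES_change_settings` in `pkg/k8sutil/es_crud.go`; the endpoint value is a placeholder, and the real helper additionally tolerates `index_not_found_exception` when the cluster has no indices yet.
+
+```go
+package main
+
+import (
+	"fmt"
+	"net/http"
+	"strings"
+)
+
+// changeRecoverySettings adjusts the two index settings used around a data-node restart:
+// the delayed-allocation timeout and the translog durability mode (Step-2 / Step-8).
+func changeRecoverySettings(esIP, delayedTimeout, durability string) error {
+	body := fmt.Sprintf(
+		`{"settings":{"index.unassigned.node_left.delayed_timeout":"%s","index.translog.durability":"%s"}}`,
+		delayedTimeout, durability)
+	req, err := http.NewRequest(http.MethodPut, "http://"+esIP+"/_all/_settings", strings.NewReader(body))
+	if err != nil {
+		return err
+	}
+	req.Header.Set("Content-Type", "application/json")
+	resp, err := http.DefaultClient.Do(req)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode != http.StatusOK {
+		return fmt.Errorf("settings update failed: %s", resp.Status)
+	}
+	return nil
+}
+
+func main() {
+	esIP := "127.0.0.1:9200"                          // placeholder master endpoint
+	_ = changeRecoverySettings(esIP, "6m", "request") // Step-2: before restarting a data node
+	_ = changeRecoverySettings(esIP, "1m", "async")   // Step-8: after the node has rejoined and shards are assigned
+}
+```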
+
+```
+Example Spec containing the optional scaling section
+
+apiVersion: enterprises.upmc.com/v1
+kind: ElasticsearchCluster
+metadata:
+  clusterName: ""
+  creationTimestamp: 2018-11-19T08:49:23Z
+  generation: 1
+  name: es-cluster
+  namespace: default
+  resourceVersion: "1636643"
+  selfLink: /apis/enterprises.upmc.com/v1/namespaces/default/elasticsearchclusters/es-cluster
+  uid: 03267c65-ebd8-11e8-8e6a-000d3a000cf8
+spec:
+  cerebro:
+    image: hub.docker.prod.walmart.com/upmcenterprises/cerebro:0.6.8
+  client-node-replicas: 1
+  data-node-replicas: 4
+  data-volume-size: 10Gi
+  elastic-search-image: quay.docker.prod.walmart.com/pires/docker-elasticsearch-kubernetes:6.3.2
+  java-options: -Xms1052m -Xmx1052m
+  master-node-replicas: 1
+  network-host: 0.0.0.0
+  resources:
+    limits:
+      cpu: 4m
+      memory: 3048Mi
+    requests:
+      cpu: 3m
+      memory: 2024Mi
+  scaling:
+    java-options: -Xms1078m -Xmx1078m
+    resources:
+      limits:
+        cpu: 5m
+        memory: 2048Mi
+      requests:
+        cpu: 4m
+        memory: 1024Mi
+  storage:
+    storage-class: standard-disk
+  zones: []
+```
+
+# TestCases:
+
+# Testcase-1: Normal scaling operation on network block storage
+- Description: A scaling operation is triggered by changing the scaling parameters in the ElasticsearchCluster configuration, e.g. with "kubectl edit ...". If the JVM heap in java-options or the CPU cores inside the scaling section change, the scaling operation is triggered; this can be observed in the Elasticsearch operator log. During the scaling operation the following must be fulfilled: a) at any time only one data node is restarted; b) the Elasticsearch cluster must not enter the red state at any point during the scaling operation. If the cluster does enter the red state, the scaling operation is automatically halted by the Elasticsearch operator.
+- Steps to Reproduce the test:
+    - step-1: Edit the cluster configuration with "kubectl edit ...", change parameters such as the JVM heap size or CPU cores inside the scaling section, and save.
+    - step-2: The Elasticsearch operator receives the change event for the cluster configuration; the scaling code checks whether any scaling-related parameter changed and, if so, triggers the scaling operation. This can be monitored in the operator log.
+    - step-3: The operator updates the data-node pods one after another, making sure the Elasticsearch cluster stays in the yellow/green state.
+- Test Status: Passed.
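+
+The trigger check exercised by Testcase-1 (and, in the negative, by Testcase-4) is essentially a field comparison between the scaling section of the spec and the running data-node StatefulSet. The sketch below is a condensed illustration of that comparison, assuming the `apps/v1beta2` StatefulSet type this operator uses; the authoritative logic is `scale_statefulset` in `pkg/k8sutil/scaling.go`.
+
+```go
+package main
+
+import (
+	"fmt"
+
+	appsv1beta2 "k8s.io/api/apps/v1beta2"
+	"k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+)
+
+// scalingParamsChanged compares the desired cpu / ES_JAVA_OPTS values from the scaling
+// section with what the data-node StatefulSet currently runs (the Step-1 check).
+func scalingParamsChanged(desiredCPU, desiredJavaOpts string, sts *appsv1beta2.StatefulSet) bool {
+	cpu, err := resource.ParseQuantity(desiredCPU)
+	if err != nil {
+		return false
+	}
+	c := sts.Spec.Template.Spec.Containers[0]
+	if cpu.Cmp(c.Resources.Requests["cpu"]) != 0 {
+		return true // cpu request differs from what is currently deployed
+	}
+	for _, env := range c.Env {
+		if env.Name == "ES_JAVA_OPTS" {
+			return env.Value != desiredJavaOpts // jvm options differ
+		}
+	}
+	return false
+}
+
+func main() {
+	// A stripped-down StatefulSet carrying only the fields the check looks at.
+	sts := &appsv1beta2.StatefulSet{}
+	sts.Spec.Template.Spec.Containers = []v1.Container{{
+		Resources: v1.ResourceRequirements{
+			Requests: v1.ResourceList{"cpu": resource.MustParse("3m")},
+		},
+		Env: []v1.EnvVar{{Name: "ES_JAVA_OPTS", Value: "-Xms1052m -Xmx1052m"}},
+	}}
+	fmt.Println(scalingParamsChanged("4m", "-Xms1078m -Xmx1078m", sts)) // true -> scaling would run
+}
+```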
+
+# Testcase-2: Stop and start the ES operator during a scaling operation
+- Description: Stop the Elasticsearch operator when the scaling operation is halfway done. Example: out of 20 nodes, 10 have completed scaling; stop the operator at that point and start it again after a few minutes. When the operator comes back up, scaling of the remaining nodes should continue from where it stopped instead of starting from the beginning.
+- Steps to Reproduce the test:
+    - step-1: Edit the cluster configuration with "kubectl edit ...", change parameters such as the JVM heap size or CPU cores inside the scaling section, and save.
+    - step-2: The Elasticsearch operator receives the change event for the cluster configuration; the scaling code checks whether any scaling-related parameter changed and, if so, triggers the scaling operation. This can be monitored in the operator log.
+    - step-3: The operator updates the data-node pods one after another, making sure the Elasticsearch cluster stays in the yellow/green state.
+    - step-4: When scaling is halfway through the data nodes, stop the Elasticsearch operator (example: out of 20 nodes, stop the operator after 10 have completed).
+    - step-5: Wait for 10 minutes, then start the ES operator.
+    - step-6: Once the ES operator is running again, the remaining data nodes are scaled without starting over from the beginning.
+    - step-7: Check the start time of each data node in the Elasticsearch cluster; half of the data nodes should have started about 10 minutes later than the others.
+- Test Status: Passed.
+
+# Testcase-3: Trigger a scaling operation while the Elasticsearch cluster is in the yellow/red state
+- Description: When the Elasticsearch cluster is in a non-green state, a requested scaling operation must not start; not a single data pod should be restarted. Scaling should start only when the Elasticsearch cluster is in the green state.
+- Steps to Reproduce the test:
+    - step-1: Put the Elasticsearch cluster into the yellow state, for example by stopping one of the data nodes.
+    - step-2: Edit the cluster configuration with "kubectl edit ...", change parameters such as the JVM heap size or CPU cores inside the scaling section, and save.
+    - step-3: The Elasticsearch operator receives the change event for the cluster configuration; the scaling code checks whether any scaling-related parameter changed and, if so, triggers the scaling operation. This can be monitored in the operator log.
+    - step-4: Scaling of the data nodes starts, but it immediately reports an error that the ES cluster is in the yellow state, and the scaling operation is aborted.
+- Test Status: Passed.
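+
+The green-state gate that Testcase-3 exercises can be thought of as a small health probe that runs before any data pod is deleted. The sketch below uses the standard `/_cluster/health` endpoint because its JSON is easy to read in a document; the operator code in this patch derives the same information from `/_cat/shards` (see `ES_checkForGreen` in `pkg/k8sutil/es_crud.go`), and the endpoint address is a placeholder.
+
+```go
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"net/http"
+)
+
+// clusterStatus returns "green", "yellow" or "red" for the cluster behind esIP.
+func clusterStatus(esIP string) (string, error) {
+	resp, err := http.Get("http://" + esIP + "/_cluster/health")
+	if err != nil {
+		return "", err
+	}
+	defer resp.Body.Close()
+	var health struct {
+		Status string `json:"status"`
+	}
+	if err := json.NewDecoder(resp.Body).Decode(&health); err != nil {
+		return "", err
+	}
+	return health.Status, nil
+}
+
+func main() {
+	status, err := clusterStatus("127.0.0.1:9200") // placeholder endpoint
+	if err != nil || status != "green" {
+		fmt.Println("cluster is not green - scaling must not start (Testcase-3)")
+		return
+	}
+	fmt.Println("cluster is green - safe to scale the next data node")
+}
+```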
+
+# Testcase-4: Trigger of the scaling operation: change in a non-scaling parameter
+- Description: If a parameter unrelated to scaling changes, the scaling operation must not be triggered. Scaling should be triggered only when java-options, cpu or memory inside the scaling section changes; at least one of the JVM heap or CPU values must change to trigger scaling.
+- Steps to Reproduce the test:
+    - step-1: Edit the cluster configuration with "kubectl edit ..." and change parameters that are not related to scaling.
+    - step-2: The Elasticsearch operator receives the change event for the cluster configuration, but scaling of the data nodes is not started, and the log states that there is no change in memory and CPU.
+- Test Status: Passed.
+
+# Testcase-5: Normal scaling operation on local/NFS storage
+- Description: Same as Testcase-1, except that localdisk is set as the storage-class instead of network block storage.
+- Steps to Reproduce the test:
+    - step-1: Edit the cluster configuration with "kubectl edit ...", change parameters such as the JVM heap size or CPU cores inside the scaling section, and save.
+    - step-2: The Elasticsearch operator receives the change event for the cluster configuration; the scaling code checks whether any scaling-related parameter changed and, if so, triggers the scaling operation. This can be monitored in the operator log.
+    - step-3: The operator updates the data-node pods one after another, making sure the Elasticsearch cluster stays in the yellow/green state.
+- Test Status: Failed.
+- Reason for failure: Since all data nodes share the same StatefulSet, the host-path mount point is the same for every data node; because of this the second data node does not come up. This needs to be addressed in the future.
+
\ No newline at end of file
diff --git a/pkg/apis/elasticsearchoperator/v1/cluster.go b/pkg/apis/elasticsearchoperator/v1/cluster.go index b33303226..5d406ce74 100644 --- a/pkg/apis/elasticsearchoperator/v1/cluster.go +++ b/pkg/apis/elasticsearchoperator/v1/cluster.go @@ -25,6 +25,7 @@ ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT package v1 import ( + "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -72,6 +73,12 @@ type ClusterSpec struct { // labels. NodeSelector map[string]string `json:"nodeSelector,omitempty"` + // Tolerations specifies which tolerations the Master and Data nodes will have applied to them + Tolerations []v1.Toleration `json:"tolerations,omitempty"` + + // Affinity (podAffinity, podAntiAffinity, nodeAffinity) will be applied to the Client nodes + Affinity v1.Affinity `json:"affinity,omitempty"` + + // Zones specifies a map of key-value pairs.
Defines which zones // to deploy persistent volumes for data nodes Zones []string `json:"zones,omitempty"` @@ -88,12 +95,24 @@ type ClusterSpec struct { // Snapshot defines how snapshots are scheduled Snapshot Snapshot `json:"snapshot"` - + + // Scaling defines how data nodes does virtual-scaling + Scaling Scaling `json:"scaling"` + // Storage defines how volumes are provisioned Storage Storage `json:"storage"` - // JavaOptions defines args passed to elastic nodes + // JavaOptions defines args passed to all elastic nodes JavaOptions string `json:"java-options"` + + // ClientJavaOptions defines args passed to client nodes (Overrides JavaOptions) + ClientJavaOptions string `json:"client-java-options"` + + // DataJavaOptions defines args passed to data nodes (Overrides JavaOptions) + DataJavaOptions string `json:"data-java-options"` + + // MasterJavaOptions defines args passed to master nodes (Overrides JavaOptions) + MasterJavaOptions string `json:"master-java-options"` // ImagePullSecrets defines credentials to pull image from private repository (optional) ImagePullSecrets []ImagePullSecrets `json:"image-pull-secrets"` @@ -134,6 +153,15 @@ type ImagePullSecrets struct { Name string `json:"name"` } +// Scaling defines all params for vertical scaling of data nodes +type Scaling struct { + // Resources defines memory / cpu constraints + Resources Resources `json:"resources"` + + // JavaOptions defines args passed to elastic nodes + JavaOptions string `json:"java-options"` +} + // Snapshot defines all params to create / store snapshots type Snapshot struct { // Enabled determines if snapshots are enabled diff --git a/pkg/k8sutil/deployments.go b/pkg/k8sutil/deployments.go index f50fd7250..205bb7a31 100644 --- a/pkg/k8sutil/deployments.go +++ b/pkg/k8sutil/deployments.go @@ -194,6 +194,10 @@ func (k *K8sutil) CreateClientDeployment(baseImage string, replicas *int32, java Name: "NODE_DATA", Value: "false", }, + v1.EnvVar{ + Name: "NODE_INGEST", + Value: "true", + }, v1.EnvVar{ Name: "HTTP_ENABLE", Value: "true", diff --git a/pkg/k8sutil/es_crud.go b/pkg/k8sutil/es_crud.go new file mode 100644 index 000000000..821f0001f --- /dev/null +++ b/pkg/k8sutil/es_crud.go @@ -0,0 +1,135 @@ + + +package k8sutil + +import ( + "bytes" + "errors" + "fmt" + "io/ioutil" + "net/http" + "strings" + "time" + "github.com/Sirupsen/logrus" +) + + +/* +TODO: + Assuming orginal settings are default values like delayed_timeout is 1m and translog.durability is async, incase if the user is changed then + user changed settings will be lost after scaling operations. + this can be corrected by reading the setting before change. 
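+
+    A possible follow-up (sketch only, not part of this patch): before Step-2, issue
+    GET http://<es_ip>/_all/_settings, remember the current values of
+    index.unassigned.node_left.delayed_timeout and index.translog.durability per index,
+    and restore those remembered values in Step-8 instead of hard-coding "1m" and "async".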
+*/ + +func ES_change_settings(es_ip , duration,translog_durability string)(error){ + var ret error + ret = errors.New("Scaling: error in setting delay timeout") + + logrus.Infof("Scaling: change ES setting: delay timeout:%s and translog durability: %s", duration, translog_durability) + client := &http.Client{ + } + body:="{ " + body = body + "\"settings\": { \"index.unassigned.node_left.delayed_timeout\": \""+duration+"\" ," + body = body + "\"index.translog.durability\": \""+translog_durability +"\" }" + body = body + " }" + + req, _ := http.NewRequest("PUT", "http://"+es_ip+"/_all/_settings", bytes.NewBufferString(body)) + req.Header.Add("Content-Type", `application/json`) + resp, _ := client.Do(req) + if (resp == nil){ + return ret; + } + data, _ := ioutil.ReadAll(resp.Body) + + if (resp.StatusCode == 200){ + ret = nil + }else{ + if (strings.Contains(string(data), "index_not_found_exception")){ + ret = nil /* if there are no indexes , then this function need po pass */ + fmt.Println("WARNING in setting delaytimeout: response: ",resp, string(data)) + return ret + } + //string str:= "Scaling: setting delaytimeout: response: " + resp + string(data) + ret = errors.New("Scaling: setting delaytimeout: response: ") + } + return ret; +} +func ES_checkForGreen(es_ip string)(error){ + logrus.Infof("Scaling: ES checking for green ") + return ES_checkForShards(es_ip , "", MAX_EsCommunicationTime) +} +func util_wordcount(input string, nodename string) (int,int) { + node_count := 0 + unassigned_count :=0 + initializing_count :=0 + words := strings.Fields(input) + for _, word := range words { + if (nodename != "" && word == nodename){ + node_count++ + } else if (word == "UNASSIGNED"){ + unassigned_count++ + } + if (word == "INITIALIZING"){ + initializing_count++ + } + } + //fmt.Println(nodename," shards: ",node_count," UNASSIGNED shards: ",unassigned_count," initialising shards: ",initializing_count) + return node_count,unassigned_count+initializing_count +} + +func ES_checkForShards(es_ip string, nodeName string, waitSeconds int)(error) { + var ret error + ret = errors.New("still unassigned shards are there") + + logrus.Infof("Scaling: ES checking for shards name: %s",nodeName) + for i := 0; i < waitSeconds; i++ { + response, err := http.Get("http://" + es_ip + "/_cat/shards") + if err != nil { + logrus.Infof("Error: The HTTP request failed with error %s\n", err) + } else { + data, _ := ioutil.ReadAll(response.Body) + _,unassigned_count := util_wordcount(string(data), nodeName) + if (unassigned_count>0) { + time.Sleep(1 * time.Second) + continue + } else { + + if (response.StatusCode == 200){ + ret = nil + }else{ + logrus.Infof("Scaling: Error checkForShards response :%s: statuscode %d:",data,response.StatusCode) + } + break + } + } + time.Sleep(1 * time.Second) + } + + return ret; +} + + func ES_checkForNodeUp(es_ip string, nodeName string, waitSeconds int)(error){ + var ret error + ret = errors.New("ES node is not up") + + logrus.Infof("Scaling: ES checking if the data node: %s joined master ",nodeName) + for i := 0; i < waitSeconds; i++ { + response, err := http.Get("http://" + es_ip + "/_cat/nodes?v&h=n,ip,v") + if err != nil { + fmt.Printf("Scaling: The HTTP request failed with error %s\n", err) + ret = err + } else { + data, _ := ioutil.ReadAll(response.Body) + str_ret := strings.Contains(string(data), nodeName) + if (str_ret){ + ret = nil + return ret; + } + } + time.Sleep(1 * time.Second) + } + + return ret; + } + + diff --git a/pkg/k8sutil/k8sutil.go b/pkg/k8sutil/k8sutil.go index 
f1358969b..fb6dbb6ba 100644 --- a/pkg/k8sutil/k8sutil.go +++ b/pkg/k8sutil/k8sutil.go @@ -441,6 +441,48 @@ func buildStatefulSet(statefulSetName, clusterName, deploymentType, baseImage, s component := fmt.Sprintf("elasticsearch-%s", clusterName) discoveryServiceNameCluster := fmt.Sprintf("%s-%s", discoveryServiceName, clusterName) + + volumes := []v1.Volume{} + volumeClaimTemplates := []v1.PersistentVolumeClaim{} + if storageClass == "localdisk" { + hostpath := "/mnt/"+statefulSetName + // TODO: localdisk does not work, since the local path is same for all data nodes, this need to be fixed. + volumes = []v1.Volume{ + v1.Volume{ + Name: "es-data", + VolumeSource: v1.VolumeSource{ + HostPath: &v1.HostPathVolumeSource{ + Path : hostpath, + }, + }, + }, + + } + }else{ + volumeClaimTemplates = []v1.PersistentVolumeClaim{ + v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: "es-data", + Labels: map[string]string{ + "component": "elasticsearch", + "role": role, + "name": statefulSetName, + "cluster": clusterName, + }, + }, + Spec: v1.PersistentVolumeClaimSpec{ + AccessModes: []v1.PersistentVolumeAccessMode{ + v1.ReadWriteOnce, + }, + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceStorage: volumeSize, + }, + }, + }, + }, + } + } statefulSet := &apps.StatefulSet{ ObjectMeta: metav1.ObjectMeta{ @@ -454,6 +496,9 @@ func buildStatefulSet(statefulSetName, clusterName, deploymentType, baseImage, s }, Spec: apps.StatefulSetSpec{ Replicas: replicas, + UpdateStrategy: apps.StatefulSetUpdateStrategy { + Type: apps.OnDeleteStatefulSetStrategyType, + }, ServiceName: statefulSetName, Selector: &metav1.LabelSelector{ MatchLabels: map[string]string{ @@ -519,6 +564,14 @@ func buildStatefulSet(statefulSetName, clusterName, deploymentType, baseImage, s Name: "CLUSTER_NAME", Value: clusterName, }, + v1.EnvVar{ + Name: "NODE_NAME", + ValueFrom: &v1.EnvVarSource{ + FieldRef: &v1.ObjectFieldSelector{ + FieldPath: "metadata.name", + }, + }, + }, v1.EnvVar{ Name: "NODE_MASTER", Value: isNodeMaster, @@ -531,6 +584,10 @@ func buildStatefulSet(statefulSetName, clusterName, deploymentType, baseImage, s Name: "HTTP_ENABLE", Value: "true", }, + v1.EnvVar{ + Name: "path.data", + Value: "/data11/test", + }, v1.EnvVar{ Name: "SEARCHGUARD_SSL_TRANSPORT_ENABLED", Value: enableSSL, @@ -588,33 +645,11 @@ func buildStatefulSet(statefulSetName, clusterName, deploymentType, baseImage, s }, }, }, - Volumes: []v1.Volume{}, + Volumes: volumes, ImagePullSecrets: TemplateImagePullSecrets(imagePullSecrets), }, }, - VolumeClaimTemplates: []v1.PersistentVolumeClaim{ - v1.PersistentVolumeClaim{ - ObjectMeta: metav1.ObjectMeta{ - Name: "es-data", - Labels: map[string]string{ - "component": "elasticsearch", - "role": role, - "name": statefulSetName, - "cluster": clusterName, - }, - }, - Spec: v1.PersistentVolumeClaimSpec{ - AccessModes: []v1.PersistentVolumeAccessMode{ - v1.ReadWriteOnce, - }, - Resources: v1.ResourceRequirements{ - Requests: v1.ResourceList{ - v1.ResourceStorage: volumeSize, - }, - }, - }, - }, - }, + VolumeClaimTemplates: volumeClaimTemplates, }, } @@ -642,7 +677,7 @@ func buildStatefulSet(statefulSetName, clusterName, deploymentType, baseImage, s statefulSet.Spec.Template.Spec.ServiceAccountName = serviceAccountName } - if storageClass != "default" { + if storageClass != "default" && storageClass != "localdisk"{ statefulSet.Spec.VolumeClaimTemplates[0].Annotations = map[string]string{ "volume.beta.kubernetes.io/storage-class": storageClass, } @@ -653,7 +688,7 @@ func 
buildStatefulSet(statefulSetName, clusterName, deploymentType, baseImage, s // CreateDataNodeDeployment creates the data node deployment func (k *K8sutil) CreateDataNodeDeployment(deploymentType string, replicas *int32, baseImage, storageClass string, dataDiskSize string, resources myspec.Resources, - imagePullSecrets []myspec.ImagePullSecrets, imagePullPolicy, serviceAccountName, clusterName, statsdEndpoint, networkHost, namespace, javaOptions string, useSSL *bool, esUrl string) error { + imagePullSecrets []myspec.ImagePullSecrets, imagePullPolicy, serviceAccountName, clusterName, statsdEndpoint, networkHost, namespace, javaOptions, masterJavaOptions, dataJavaOptions string, useSSL *bool, esUrl string, nodeSelector map[string]string, tolerations []v1.Toleration, scaling bool) error { deploymentName, _, _, _ := processDeploymentType(deploymentType, clusterName) @@ -678,7 +713,10 @@ func (k *K8sutil) CreateDataNodeDeployment(deploymentType string, replicas *int3 logrus.Error("Could not get stateful set! ", err) return err } - + if deploymentType == "data" && scaling { + ret := scale_statefulset(k, namespace, clusterName, statefulSetName, resources, javaOptions, statefulSet, *replicas) + return ret + } //scale replicas? if statefulSet.Spec.Replicas != replicas { currentReplicas := *statefulSet.Spec.Replicas @@ -689,7 +727,6 @@ func (k *K8sutil) CreateDataNodeDeployment(deploymentType string, replicas *int3 } statefulSet.Spec.Replicas = replicas _, err := k.Kclient.AppsV1beta2().StatefulSets(namespace).Update(statefulSet) - if err != nil { logrus.Error("Could not scale statefulSet: ", err) minMasterNodes := elasticsearchutil.MinMasterNodes(int(currentReplicas)) diff --git a/pkg/k8sutil/scaling.go b/pkg/k8sutil/scaling.go new file mode 100644 index 000000000..72742bb61 --- /dev/null +++ b/pkg/k8sutil/scaling.go @@ -0,0 +1,236 @@ +/* + + */ + +package k8sutil + +import ( + "fmt" + "github.com/Sirupsen/logrus" + myspec "github.com/upmc-enterprises/elasticsearch-operator/pkg/apis/elasticsearchoperator/v1" + "k8s.io/api/apps/v1beta2" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + //"k8s.io/api/core/v1" + "strconv" + "strings" + "time" +) + +const ( + MAX_DataNodePodRestartTime = 400 // in seconds + MAX_EsCommunicationTime = 180 // in seconds + MAX_EsWaitForDataNode = "6m" // time for the master to wait for data node to reboot before it rebalances the shards +) +func compare_pod_vs_statefulset(namespace, statefulSetName,podName string, k *K8sutil) bool { + resourceMatch := true + logrus.Infof("Scaling: Compare Spec POD Vs statefulset : ",podName) + statefulSet, err := k.Kclient.AppsV1beta2().StatefulSets(namespace).Get(statefulSetName, metav1.GetOptions{}) + if (err != nil){ + return resourceMatch + } + pod, err := k.Kclient.CoreV1().Pods(namespace).Get(podName, metav1.GetOptions{}) + if (err != nil){ + return resourceMatch + } + + if pod.Spec.Containers[0].Resources.Requests["cpu"] != statefulSet.Spec.Template.Spec.Containers[0].Resources.Requests["cpu"] { + logrus.Infof("Scaling: Compare POD Vs SS cpu changed by user: ", pod.Spec.Containers[0].Resources.Requests["cpu"], " current value: ", statefulSet.Spec.Template.Spec.Containers[0].Resources.Requests["cpu"]) + resourceMatch = false + return resourceMatch + } + /*if memory != statefulSet.Spec.Template.Spec.Containers[0].Resources.Requests["memory"] { + resourceMatch = false + return resourceMatch + }*/ + javaOptions:="" + for _, env := range pod.Spec.Containers[0].Env { + if env.Name == "ES_JAVA_OPTS" 
{ + javaOptions = env.Value + break + } + } + for _, env := range statefulSet.Spec.Template.Spec.Containers[0].Env { + if env.Name == "ES_JAVA_OPTS" { + if env.Value != javaOptions { + logrus.Infof("Scaling: Compare POD Vs SS java_opts Changed by user: ", javaOptions, " current value: ", env.Value, " name: ", podName) + resourceMatch = false + return resourceMatch + } + break + } + } + + return resourceMatch +} + +func k8_check_DataNodeRestarted(namespace, statefulSetName,dnodename string, k *K8sutil) error { + ret := fmt.Errorf("Scaling: POD is not up: ") + statefulset, _ := k.Kclient.AppsV1beta2().StatefulSets(namespace).Get(statefulSetName, metav1.GetOptions{}) + + ss_version, _ := strconv.Atoi(statefulset.ObjectMeta.ResourceVersion) + pod, _ := k.Kclient.CoreV1().Pods(namespace).Get(dnodename, metav1.GetOptions{}) + logrus.Infof("Scaling: checking Datanode if it restarted or not by k8:ss: %s datanode:%s", statefulSetName,dnodename) + waitSeconds := MAX_DataNodePodRestartTime + for i := 0; i < waitSeconds; i++ { + pod, _ = k.Kclient.CoreV1().Pods(namespace).Get(dnodename, metav1.GetOptions{}) + if pod == nil { + //logrus.Infof("Scaling: POD Terminated ") + } else { + pod_version, _ := strconv.Atoi(pod.ObjectMeta.ResourceVersion) + if pod_version > ss_version { + //logrus.Infof("Scaling: POD started pod_version:",pod_version," dd_version:",ss_version," state: ",(pod.Status.ContainerStatuses)) + status := "" + if len(pod.Status.ContainerStatuses) > 0 { + if pod.Status.ContainerStatuses[0].State.Running != nil { + status = status + pod.Status.ContainerStatuses[0].State.Running.String() + } + if pod.Status.ContainerStatuses[0].State.Waiting != nil { + status = status + pod.Status.ContainerStatuses[0].State.Waiting.String() + } + if pod.Status.ContainerStatuses[0].State.Terminated != nil { + status = status + pod.Status.ContainerStatuses[0].State.Terminated.String() + } + } + if strings.Contains(status, "ContainerStateRunning") { + logrus.Infof("Scaling: POD started pod_version: %d", pod_version, " ss_version: %d", ss_version, " Status: %s", status) + ret = nil + break + } + } + } + time.Sleep(1 * time.Second) + } + return ret +} +func get_masterIP(k *K8sutil, namespace, clusterName string) string { + listN, _ := k.Kclient.CoreV1().Services(namespace).List(metav1.ListOptions{}) + for _, s := range listN.Items { + service_name := "elasticsearch-" + clusterName + if service_name == s.Name { + if len(s.Spec.ExternalIPs) > 0 && (len(s.Spec.Ports) > 0) { + ret := s.Spec.ExternalIPs[0] + ":" + strconv.Itoa(int(s.Spec.Ports[0].NodePort)) + logrus.Infof("Scaling: MasterIP external port: %s", ret) + return ret + } + if len(s.Spec.ClusterIP) > 0 && (len(s.Spec.Ports) > 0) { + ret := s.Spec.ClusterIP + ":" + strconv.Itoa(int(s.Spec.Ports[0].Port)) + logrus.Infof("Scaling: MasterIP Internal port: %s", ret) + return ret + } + } + } + return "" +} +func scale_datanode(k *K8sutil, namespace, clusterName, statefulSetName, podName string, resources myspec.Resources, javaOptions string, statefulSet *v1beta2.StatefulSet, pod_index int32, masterip string) error { + // Step-3: ES-change: check if ES cluster is green state, suppose if one of the data node is down and state is yellow then do not proceed with scaling. + if err := ES_checkForGreen(masterip); err != nil { + err = fmt.Errorf("Scaling: ES cluster is not in green state") + return err + } + + // Step-2: ES-chanage: change default time from 1 min to 3 to n min to avoid copying of shards belonging to the data node that is going to be scaled. 
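+	// The call below issues a single PUT to /_all/_settings with both settings in one body:
+	//   {"settings":{"index.unassigned.node_left.delayed_timeout":"6m","index.translog.durability":"request"}}
+	// "request" durability makes Elasticsearch fsync the translog on every request, trading some
+	// indexing throughput for durability while the data node is being restarted.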
+ if err := ES_change_settings(masterip, MAX_EsWaitForDataNode, "request"); err != nil { // TODO before overwriting save the orginal setting + return err + } + + // Step-4: scale the Data node by deleting the POD + deletePolicy := metav1.DeletePropagationForeground + + err := k.Kclient.Core().Pods(namespace).Delete(podName, &metav1.DeleteOptions{ + PropagationPolicy: &deletePolicy}) + if err != nil { + logrus.Error("ERROR: Scaling: Could not delete POD: ", err) + return err + } + + // Step-5: check if the POD is restarted and in running state from k8 point of view. + if err = k8_check_DataNodeRestarted(namespace, statefulSetName, podName, k); err != nil { + return err + } + + // Step-6: check if the POD is up from the ES point of view. + if err = ES_checkForNodeUp(masterip, podName, MAX_EsCommunicationTime); err != nil { + return err + } + + // Step-7: check if all shards are registered with Master, At this ES should turn in to green from yellow, now it is safe to scale next data node. + if err = ES_checkForShards(masterip, podName, MAX_EsCommunicationTime); err != nil { + return err + } + // Step-8: Undo the timeout settings + if err := ES_change_settings(masterip, "1m", "async"); err != nil { + return err + } + logrus.Infof("Scaling:------------- sucessfully Completed for: %s :--------------------------------", podName) + return nil +} + +func scale_statefulset(k *K8sutil, namespace, clusterName, statefulSetName string, resources myspec.Resources, javaOptions string, statefulSet *v1beta2.StatefulSet, replicas int32) error { + cpu, _ := resource.ParseQuantity(resources.Requests.CPU) + memory, _ := resource.ParseQuantity(resources.Requests.Memory) + + // Step-1: check if there is any change in the 3-resources: javaoptions,cpu and memory + resourceMatch := true + if cpu != statefulSet.Spec.Template.Spec.Containers[0].Resources.Requests["cpu"] { + logrus.Infof("Scaling: cpu changed by user: ", cpu, " current value: ", statefulSet.Spec.Template.Spec.Containers[0].Resources.Requests["cpu"]) + resourceMatch = false + statefulSet.Spec.Template.Spec.Containers[0].Resources.Requests["cpu"] = cpu + } + if memory != statefulSet.Spec.Template.Spec.Containers[0].Resources.Requests["memory"] { + // TODO: there is a slight mismatch, Memory Changed USER: %!(EXTRA resource.Quantity={{1073741824 0} {} BinarySI}, string= from k8: , resource.Quantity={{1073741824 0} {} 1Gi BinarySI}) + //logrus.Infof(" Memory Changed USER: ",memory," from k8: ",statefulSet.Spec.Template.Spec.Containers[0].Resources.Requests["memory"].i.value) + //resourceMatch = false + } + for index, env := range statefulSet.Spec.Template.Spec.Containers[0].Env { + if env.Name == "ES_JAVA_OPTS" { + if env.Value != javaOptions { + logrus.Infof("Scaling: java_opts Changed by user: ", javaOptions, " current value: ", env.Value, " name: ", statefulSetName) + resourceMatch = false + statefulSet.Spec.Template.Spec.Containers[0].Env[index].Value = javaOptions + } + break + } + } + + masterip := get_masterIP(k, namespace, clusterName) + if masterip == "" { + err := fmt.Errorf("MasterIP is empty") + return err + } + + if !resourceMatch { + logrus.Infof("Scaling:STARTED scaling with new resources ... : %s", statefulSetName) + // TODO : only memory request is updated, memory limit as need to be updated. 
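+		// One way to address the TODO above (a sketch, not wired in by this patch): parse the limit
+		// from the scaling section as well and update it alongside the request, e.g.
+		//   limitMem, _ := resource.ParseQuantity(resources.Limits.Memory)
+		//   statefulSet.Spec.Template.Spec.Containers[0].Resources.Limits["memory"] = limitMem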
+ statefulSet.Spec.Template.Spec.Containers[0].Resources.Requests["memory"] = memory + + _, err := k.Kclient.AppsV1beta2().StatefulSets(namespace).Update(statefulSet) + if err != nil { + logrus.Error("ERROR: Scaling: Could not update statefulSet: ", err) + return err + } + + var index int32 = 0 + for index = 0; index < replicas; index++ { + podName := fmt.Sprintf("%s-%d", statefulSetName, index) + ret := scale_datanode(k, namespace, clusterName, statefulSetName, podName, resources, javaOptions, statefulSet, index,masterip) + if ret != nil { + return ret + } + } + }else{ + // check if Scaling is stopped half the way, this may be because server is stopped or for various reasons */ + var index int32 = 0 + for index = 0; index < replicas; index++ { + podName := fmt.Sprintf("%s-%d", statefulSetName, index) + if (compare_pod_vs_statefulset(namespace, statefulSetName, podName, k)){ + continue + } + ret := scale_datanode(k, namespace, clusterName, statefulSetName, podName, resources, javaOptions, statefulSet, index,masterip) + if ret != nil { + return ret + } + } + } + return nil +} diff --git a/pkg/k8sutil/scaling_test.go b/pkg/k8sutil/scaling_test.go new file mode 100644 index 000000000..8bb9e3d88 --- /dev/null +++ b/pkg/k8sutil/scaling_test.go @@ -0,0 +1,36 @@ +package k8sutil + +import ( + "testing" +) + + +const ( + NonExistingIP = "10.1.1.1" /* some non existing IP */ + NonExistingPOD = "nonexistingpodname" +) + +/* + Test for Failures, by sending the unrechable ES Master IP, the below function under test should always return the error, + incase if testing function does not return the error then the unit test should fail. +*/ +func Test_scaling_change_setting(t *testing.T) { + err := ES_change_settings(NonExistingIP, "1m", "request"); + if (err == nil){ + t.Errorf("Scaling unittest change setting failed"); + } +} + +func Test_check_for_green(t *testing.T) { + err := ES_checkForGreen(NonExistingIP); + if (err == nil){ + t.Errorf("Scaling unittest check_for_green failed"); + } +} + +func Test_check_for_nodeUp(t *testing.T) { + err := ES_checkForNodeUp(NonExistingIP, NonExistingPOD, 5); + if (err == nil){ + t.Errorf("Scaling unittest check_for_nodeUp failed"); + } +} diff --git a/pkg/processor/processor.go b/pkg/processor/processor.go index af09b7c07..bc9e7dd55 100644 --- a/pkg/processor/processor.go +++ b/pkg/processor/processor.go @@ -27,7 +27,6 @@ package processor import ( "fmt" "sync" - "github.com/upmc-enterprises/elasticsearch-operator/pkg/elasticsearchutil" "github.com/upmc-enterprises/elasticsearch-operator/pkg/snapshot" @@ -125,7 +124,8 @@ func (p *Processor) defaultUseSSL(specUseSSL *bool) bool { // Default to true if specUseSSL == nil { logrus.Infof("use-ssl not specified, defaulting to UseSSL=true") - return true + return false + // return true } else { logrus.Infof("use-ssl %v", *specUseSSL) return *specUseSSL @@ -171,6 +171,19 @@ func (p *Processor) refreshClusters() error { CronSchedule: cluster.Spec.Snapshot.CronSchedule, RepoRegion: cluster.Spec.Snapshot.RepoRegion, }, + Scaling: myspec.Scaling{ + JavaOptions: cluster.Spec.Scaling.JavaOptions, + Resources: myspec.Resources{ + Limits: myspec.MemoryCPU{ + Memory: cluster.Spec.Scaling.Resources.Limits.Memory, + CPU: cluster.Spec.Scaling.Resources.Limits.CPU, + }, + Requests: myspec.MemoryCPU{ + Memory: cluster.Spec.Scaling.Resources.Requests.Memory, + CPU: cluster.Spec.Scaling.Resources.Requests.CPU, + }, + }, + }, Storage: myspec.Storage{ StorageType: cluster.Spec.Storage.StorageType, StorageClassProvisoner: 
cluster.Spec.Storage.StorageClassProvisoner, @@ -364,13 +377,13 @@ func (p *Processor) processElasticSearchCluster(c *myspec.ElasticsearchCluster) logrus.Error("Error creating client service ", err) return err } - + if err := p.k8sclient.CreateClientDeployment(baseImage, &c.Spec.ClientNodeReplicas, c.Spec.JavaOptions, c.Spec.Resources, c.Spec.ImagePullSecrets, c.Spec.ImagePullPolicy, c.Spec.ServiceAccountName, c.ObjectMeta.Name, c.Spec.Instrumentation.StatsdHost, c.Spec.NetworkHost, c.ObjectMeta.Namespace, c.Spec.UseSSL); err != nil { logrus.Error("Error creating client deployment ", err) return err } - + zoneCount := 0 if len(c.Spec.Zones) != 0 { zoneCount = len(c.Spec.Zones) @@ -390,17 +403,30 @@ func (p *Processor) processElasticSearchCluster(c *myspec.ElasticsearchCluster) for index, count := range zoneDistributionMaster { if err := p.k8sclient.CreateDataNodeDeployment("master", &count, baseImage, c.Spec.Zones[index], c.Spec.DataDiskSize, c.Spec.Resources, c.Spec.ImagePullSecrets, c.Spec.ImagePullPolicy, c.Spec.ServiceAccountName, c.ObjectMeta.Name, c.Spec.Instrumentation.StatsdHost, c.Spec.NetworkHost, - c.ObjectMeta.Namespace, c.Spec.JavaOptions, c.Spec.UseSSL, c.Spec.Scheduler.ElasticURL); err != nil { + c.ObjectMeta.Namespace, c.Spec.JavaOptions, c.Spec.MasterJavaOptions, c.Spec.DataJavaOptions, c.Spec.UseSSL, c.Spec.Scheduler.ElasticURL, c.Spec.NodeSelector, c.Spec.Tolerations, false); err != nil { logrus.Error("Error creating master node deployment ", err) return err } } // Create Data Nodes + javaOptions := c.Spec.JavaOptions + resources := c.Spec.Resources + scaling := false + if len(c.Spec.Scaling.JavaOptions) != 0 { + javaOptions = c.Spec.Scaling.JavaOptions + scaling = true + } + if len(c.Spec.Scaling.Resources.Requests.CPU) != 0 { + resources = c.Spec.Scaling.Resources + scaling = true + } + + logrus.Info("Scaling enabled: ",scaling," zone distribution JavaOptions:",javaOptions," Resources: ",resources) for index, count := range zoneDistributionData { - if err := p.k8sclient.CreateDataNodeDeployment("data", &count, baseImage, c.Spec.Zones[index], c.Spec.DataDiskSize, c.Spec.Resources, + if err := p.k8sclient.CreateDataNodeDeployment("data", &count, baseImage, c.Spec.Zones[index], c.Spec.DataDiskSize, resources, c.Spec.ImagePullSecrets, c.Spec.ImagePullPolicy, c.Spec.ServiceAccountName, c.ObjectMeta.Name, c.Spec.Instrumentation.StatsdHost, c.Spec.NetworkHost, - c.ObjectMeta.Namespace, c.Spec.JavaOptions, c.Spec.UseSSL, c.Spec.Scheduler.ElasticURL); err != nil { + c.ObjectMeta.Namespace, javaOptions, c.Spec.MasterJavaOptions, c.Spec.DataJavaOptions, c.Spec.UseSSL, c.Spec.Scheduler.ElasticURL, c.Spec.NodeSelector, c.Spec.Tolerations,scaling); err != nil { logrus.Error("Error creating data node deployment ", err) return err @@ -416,21 +442,35 @@ func (p *Processor) processElasticSearchCluster(c *myspec.ElasticsearchCluster) // Create Master Nodes if err := p.k8sclient.CreateDataNodeDeployment("master", func() *int32 { i := int32(c.Spec.MasterNodeReplicas); return &i }(), baseImage, c.Spec.Storage.StorageClass, c.Spec.DataDiskSize, c.Spec.Resources, c.Spec.ImagePullSecrets, c.Spec.ImagePullPolicy, c.Spec.ServiceAccountName, c.ObjectMeta.Name, - c.Spec.Instrumentation.StatsdHost, c.Spec.NetworkHost, c.ObjectMeta.Namespace, c.Spec.JavaOptions, c.Spec.UseSSL, c.Spec.Scheduler.ElasticURL); err != nil { + c.Spec.Instrumentation.StatsdHost, c.Spec.NetworkHost, c.ObjectMeta.Namespace, c.Spec.JavaOptions, c.Spec.MasterJavaOptions, c.Spec.DataJavaOptions, c.Spec.UseSSL, 
c.Spec.Scheduler.ElasticURL, c.Spec.NodeSelector, c.Spec.Tolerations, false); err != nil { logrus.Error("Error creating master node deployment ", err) return err } // Create Data Nodes - if err := p.k8sclient.CreateDataNodeDeployment("data", func() *int32 { i := int32(c.Spec.DataNodeReplicas); return &i }(), baseImage, c.Spec.Storage.StorageClass, - c.Spec.DataDiskSize, c.Spec.Resources, c.Spec.ImagePullSecrets, c.Spec.ImagePullPolicy, c.Spec.ServiceAccountName, c.ObjectMeta.Name, - c.Spec.Instrumentation.StatsdHost, c.Spec.NetworkHost, c.ObjectMeta.Namespace, c.Spec.JavaOptions, c.Spec.UseSSL, c.Spec.Scheduler.ElasticURL); err != nil { - logrus.Error("Error creating data node deployment ", err) - return err + javaOptions := c.Spec.JavaOptions + resources := c.Spec.Resources + scaling := false + if len(c.Spec.Scaling.JavaOptions) != 0 { + javaOptions = c.Spec.Scaling.JavaOptions + scaling = true + } + if len(c.Spec.Scaling.Resources.Requests.CPU) != 0 { + resources = c.Spec.Scaling.Resources + scaling = true } - } + logrus.Info("Scaling enabled: ",scaling," JavaOptions:",javaOptions," Resources: ",resources) + + if err := p.k8sclient.CreateDataNodeDeployment("data", func() *int32 { i := int32(c.Spec.DataNodeReplicas); return &i }(), baseImage, c.Spec.Storage.StorageClass, + c.Spec.DataDiskSize, resources, c.Spec.ImagePullSecrets, c.Spec.ImagePullPolicy, c.Spec.ServiceAccountName, c.ObjectMeta.Name, + c.Spec.Instrumentation.StatsdHost, c.Spec.NetworkHost, c.ObjectMeta.Namespace, javaOptions, c.Spec.MasterJavaOptions, c.Spec.DataJavaOptions, c.Spec.UseSSL, c.Spec.Scheduler.ElasticURL, c.Spec.NodeSelector, c.Spec.Tolerations, scaling); err != nil { + logrus.Error("Error creating data node deployment ", err) + return err + } + } + // Deploy Kibana if c.Spec.Kibana.Image != "" { @@ -475,7 +515,7 @@ func (p *Processor) processElasticSearchCluster(c *myspec.ElasticsearchCluster) } } - + // Setup CronSchedule p.clusters[fmt.Sprintf("%s-%s", c.ObjectMeta.Name, c.ObjectMeta.Namespace)].Scheduler.Init() logrus.Println("--------> ElasticSearch Event finished!")
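
A small test like the sketch below could complement pkg/k8sutil/scaling_test.go: it exercises util_wordcount from es_crud.go against a fabricated /_cat/shards-style response (the sample text is made up for illustration) and, living in the same k8sutil package, can call the unexported helper directly.

```go
package k8sutil

import "testing"

// Fabricated /_cat/shards-style output: two shards on es-data-0, one UNASSIGNED shard,
// and one INITIALIZING shard. Only whitespace-separated tokens matter to util_wordcount.
const fakeCatShards = `
index1 0 p STARTED      120 10mb 10.0.0.1 es-data-0
index1 1 r UNASSIGNED
index2 0 p INITIALIZING              10.0.0.2 es-data-1
index2 1 p STARTED      300 20mb 10.0.0.1 es-data-0
`

func Test_util_wordcount(t *testing.T) {
	nodeShards, pending := util_wordcount(fakeCatShards, "es-data-0")
	if nodeShards != 2 {
		t.Errorf("expected 2 shards on es-data-0, got %d", nodeShards)
	}
	// pending counts UNASSIGNED plus INITIALIZING shards.
	if pending != 2 {
		t.Errorf("expected 2 unassigned/initializing shards, got %d", pending)
	}
}
```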