Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scaling new #277

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,4 @@ helm-package:
helm repo index --merge charts/index.yaml charts

test: clean
go test $$(go list ./... | grep -v /vendor/)
go test -v $$(go list ./... | grep -v /vendor/)
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,9 @@ $ go get -u github.com/cloudflare/cfssl/cmd/cfssl
$ go get -u github.com/cloudflare/cfssl/cmd/cfssljson
$ go run cmd/operator/main.go --kubecfg-file=${HOME}/.kube/config
```
# Scaling feature:

[Details of scaling](docs/scaling.md)

# About
Built by UPMC Enterprises in Pittsburgh, PA. http://enterprises.upmc.com/
127 changes: 127 additions & 0 deletions docs/scaling.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@

# Patch Details:

The following are changes present in scaling patch

- Scaling is optional feature: Seperate section is defined for scaling as shown in below example spec. If the scaling section is not present then entire scaling feature will be disabled.
- Added changes to support local disk, but it is not tested for multiple nodes, MAY need to add affinity between POD and node.
- when Scaling will be triggered: Scaling will be triggered if there is any change in the one of the following 3 fields inside the scaling section: javaoptions,cpu and memory. javaoptions and resources entries corresponds only to non-data nodes incase scaling section is present. If scaling section is abscent then it corresponds to all nodes.
- JavaOptions:
- CPU inside resources : number of cpu cores.
- Memory inside resources : Memory size.
- Steps involved in vertical scaling of Elastic cluster: Repeating the following steps for each data node one after another, if there is any failure then scaling will be halted for the rest of data nodes:
- Step-1: check if there is any changes in 3-resources inside scaling section: javaoptions,cpu and memory.
- Step-2: ES-setting-change: change default time from 1 min to 6 min to avoid copying of shards belonging to the data node that is going to be scaled.
- step-2: ES-setting-change: change translong durability from async to sync/request basis, this is to make no data is loss.
- Step-3: ES-change: check if ES cluster is green state, suppose if one of the data node is down and state is yellow then do not proceed with scaling.
- Step-4: scale the Data node by updating the new resources in the statefull set. Here the Data node will be restarted.
- Step-5: check if the POD is restarted and in running state from k8 point of view.
- Step-6: check if the POD is up from the ES point of view. means Data node is register with the master.
- Step-7: check if all shards are registered with Master, At this ES should turn in to green from yellow, now it is safe to scale next data node.
- Step-8: Undo the settings done in step-2.
- Future Enhancements:
- Horizontal scaling of Data nodes: increasing the "data-node-replica" will function as expected, but if "data-node-replica" is decreased by more then 2 then the Elastic search cluster can enter in to red state and there will be data loss, this can prevented by executing similar to vertical scaling one after another without entering into red state.
- Picking "masternodeip" from services k8 objects instead from user.
- Vertical scaling improvements: periodic scaling: currently vertical scaling is triggered from user, instead it can be triggered based on time automatically.
- Multiple threads: Currently vertical for each elasticsearch cluster takes considerable amount of time, during this time other elastic cluster MAY not be done, this can be parallelised by running scaling operation in multiple threads.
- Dependency on ElasticSearch Version: It it depends on elasticsearch version as 6.3, if there is lot of changes in api's then scaling operation will fail.
- Local disk support: added changes to support local disk, but not tested with multiple nodes, it need to check that the pod will have affinity to the node, this can be enforced during the statefull set creation..
- Usecases:
- case1: scale down during start of non-peak time , and scale-up during the start of peak time every day. doing scaling operation twice in a day, this can done using a cron job without interrupting the service.
- case2: scale down and scale up once in a while.


```
Example Spec containing optional scaling section

apiVersion: enterprises.upmc.com/v1
kind: ElasticsearchCluster
metadata:
clusterName: ""
creationTimestamp: 2018-11-19T08:49:23Z
generation: 1
name: es-cluster
namespace: default
resourceVersion: "1636643"
selfLink: /apis/enterprises.upmc.com/v1/namespaces/default/elasticsearchclusters/es-cluster
uid: 03267c65-ebd8-11e8-8e6a-000d3a000cf8
spec:
cerebro:
image: hub.docker.prod.walmart.com/upmcenterprises/cerebro:0.6.8
client-node-replicas: 1
data-node-replicas: 4
data-volume-size: 10Gi
elastic-search-image: quay.docker.prod.walmart.com/pires/docker-elasticsearch-kubernetes:6.3.2
java-options: -Xms1052m -Xmx1052m
master-node-replicas: 1
network-host: 0.0.0.0
resources:
limits:
cpu: 4m
memory: 3048Mi
requests:
cpu: 3m
memory: 2024Mi
scaling:
java-options: -Xms1078m -Xmx1078m
resources:
limits:
cpu: 5m
memory: 2048Mi
requests:
cpu: 4m
memory: 1024Mi
storage:
storage-class: standard-disk
zones: []
```

# TestCases :

# Testcase-1 : Normal scaling operation on Network block storage.
- Description: Scaling operation can be triggered by changing the scaling parameters in ElasticSearch Cluster configuration,this can be done using "kubectl edit ..". If there is any change in jvm-heap memory in java-options or cpu cores inside the scaling section then scaling operation will be triggered. This can be observed in elastic search operator log. During scaling operation, the following should be full filled: a) At any time only one data node should be restarted, b) The State of ElasticSearch cluster should not be entered into Red state anytime during scaling operation. In case if the Elasticsearch cluster enter in to Redstate then the scaling operation will be automatically halted by the elasaticsearch opeartor.
- Steps to Reproduce the test:
- step-1: edit the configuration of elastic cluster, this can done using "kubectl edit,..", change the parameters like jvm heap memory size or cpu cores inside the scaling section and save the file.
- step-2: After step-1, elastic search operator will receive the signal about the change in configuration of cluster, then the scaling code will check if there is any change in parameters related to scaling, this triggers scaling operation, this can be monitored in operator log.
- step-3: elasticsearch operator will update the data node pods one after another, and making sure the elastic search cluster is in yellow/green.
- Test Status: Passed.

# Testcase-2 : Stop and start the ES operator during scaling operation.
- Description: when the scaling operation is halfway, stop the elasticsearch operator. example: out 20 nodes, 10 nodes have completed scaling, during that time stop the elastic operator, and start the elasticsearch operator after few minutes, when the elasticsearch operator is started then scaling of the rest of nodes should continue where it was stopped last time instead of starting from the beginning.
- Steps to Reproduce the test:
- step-1: edit the configuration of elastic cluster, this can done using "kubectl edit,..", change the parameters like jvm heap memory size or cpu cores inside the scaling section and save the file.
- step-2: After step-1, elastic search operator will receive the signal about the change in configuration of cluster, then the scaling code will check if there is any change in parameters related to scaling, this triggers scaling operation, this can be monitored in operator log.
- step-3: Elasticsearch operator will update the data node pods one after another, and making sure the elastic search cluster is in yellow/green.
- step-4: After scaling is half way of the data nodes, stop the elastic search operator.example: out of 20 nodes, after completing 10 nodes, stop the ES operator.
- step-5: Wait for 10 min, and start ES operator
- step-6: When the ES operator is started, rest of the datanodes will get scaled without starting from the start.
- step-7: check in elasticsearch cluster the starting time of each data node. half of the data nodes would have started 10 min later.
- Test Status: Passed.

# Testcase-3 : Trigger scaling operation when Elastic cluster is in "Yellow/Red" state
- Description: When the Elastic cluster in non-green state, and if scaling operation is started it should not start scaling operation. means not single data pod should be restarted. Scaling operation should be started only if the Elastic cluster is in green state.
- Steps to Reproduce the test:
- step-1: make the elastic search cluster to change in to yellow state by stopping one of the data node or by other means.
- step-2: edit the configuration of elastic cluster, this can done using "kubectl edit,..", change the parameters like jvm heap memory size or cpu cores inside the scaling section and save the file.
- step-3: After step-2, elastic search operator will receive the signal about the change in configuration of cluster, then the scaling code will check if there is any change in parameters related to scaling, this triggers scaling operation, this can be monitored in operator log.
- step-4: scaling of Data nodes will started, but immedietly it will generate an error saying ES cluster is in yellow state and scaling operation is aborted.
- Test Status : Passed.

# Testcase-4 : Trigger of scaling operation: changes in Non-scaling parameter
- Description: If there is any change in non-scalar parameter, then scaling operation should not be triggered. Scaling operation should be triggered only of there is change in Java-options, cpu or memory inside scaling section. To trigger scaling atleast any one of jvm heap or cpu is required.
- Steps to Reproduce the test:
- step-1: edit the configuration of elastic cluster, this can done using "kubectl edit,..", change the parameters not related to scaling.
- step-2: After step-1, elastic search operator will receive the signal about the change in configuration of cluster, but scaling operaton of data nodes will not started saying there is no change in memory and cpu.
- Test Status: Passed

# Testcase-5 : Normal scaling operation on local/nfs storage.
- Description: Similar to Testcase-1 except localdisk is set as storage-class instead of network block storage.
- Steps to Reproduce the test:
- step-1: edit the configuration of elastic cluster, this can done using "kubectl edit,..", change the parameters like jvm heap memory size or cpu cores inside the scaling section and save the file.
- step-2: After step-1, elastic search operator will receive the signal about the change in configuration of cluster, then the scaling code will check if there is any change in parameters related to scaling, this triggers scaling operation, this can be monitored in operator log.
- step-3: elasticsearch operator will update the data node pods one after another, and making sure the elastic search cluster is in yellow/green.
- Test status : Failed .
- Reason for Failure: Since all data nodes share same statefull set, the mount point for all data nodes is same, due to this second data nodes will not comesup, this need to addressed in future.



14 changes: 13 additions & 1 deletion pkg/apis/elasticsearchoperator/v1/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,10 @@ type ClusterSpec struct {

// Snapshot defines how snapshots are scheduled
Snapshot Snapshot `json:"snapshot"`


// Scaling defines how data nodes does virtual-scaling
Scaling Scaling `json:"scaling"`

// Storage defines how volumes are provisioned
Storage Storage `json:"storage"`

Expand Down Expand Up @@ -150,6 +153,15 @@ type ImagePullSecrets struct {
Name string `json:"name"`
}

// Scaling defines all params for vertical scaling of data nodes
type Scaling struct {
// Resources defines memory / cpu constraints
Resources Resources `json:"resources"`

// JavaOptions defines args passed to elastic nodes
JavaOptions string `json:"java-options"`
}

// Snapshot defines all params to create / store snapshots
type Snapshot struct {
// Enabled determines if snapshots are enabled
Expand Down
135 changes: 135 additions & 0 deletions pkg/k8sutil/es_crud.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@


package k8sutil

import (
"bytes"
"errors"
"fmt"
"io/ioutil"
"net/http"
"strings"
"time"
"github.com/Sirupsen/logrus"
)


/*
TODO:
Assuming orginal settings are default values like delayed_timeout is 1m and translog.durability is async, incase if the user is changed then
user changed settings will be lost after scaling operations.
this can be corrected by reading the setting before change.
*/

func ES_change_settings(es_ip , duration,translog_durability string)(error){
var ret error
ret = errors.New("Scaling: error in setting delay timeout")

logrus.Infof("Scaling: change ES setting: delay timeout:%s and translog durability: %s", duration, translog_durability)
client := &http.Client{
}
body:="{ "
body = body + "\"settings\": { \"index.unassigned.node_left.delayed_timeout\": \""+duration+"\" ,"
body = body + "\"index.translog.durability\": \""+translog_durability +"\" }"
body = body + " }"

req, _ := http.NewRequest("PUT", "http://"+es_ip+"/_all/_settings", bytes.NewBufferString(body))
req.Header.Add("Content-Type", `application/json`)
resp, _ := client.Do(req)
if (resp == nil){
return ret;
}
data, _ := ioutil.ReadAll(resp.Body)

if (resp.StatusCode == 200){
ret = nil
}else{
if (strings.Contains(string(data), "index_not_found_exception")){
ret = nil /* if there are no indexes , then this function need po pass */
fmt.Println("WARNING in setting delaytimeout: response: ",resp, string(data))
return ret
}
//string str:= "Scaling: setting delaytimeout: response: " + resp + string(data)
ret = errors.New("Scaling: setting delaytimeout: response: ")
}
return ret;
}
func ES_checkForGreen(es_ip string)(error){
logrus.Infof("Scaling: ES checking for green ")
return ES_checkForShards(es_ip , "", MAX_EsCommunicationTime)
}
func util_wordcount(input string, nodename string) (int,int) {
node_count := 0
unassigned_count :=0
initializing_count :=0
words := strings.Fields(input)
for _, word := range words {
if (nodename != "" && word == nodename){
node_count++
} else if (word == "UNASSIGNED"){
unassigned_count++
}
if (word == "INITIALIZING"){
initializing_count++
}
}
//fmt.Println(nodename," shards: ",node_count," UNASSIGNED shards: ",unassigned_count," initialising shards: ",initializing_count)
return node_count,unassigned_count+initializing_count
}

func ES_checkForShards(es_ip string, nodeName string, waitSeconds int)(error) {
var ret error
ret = errors.New("still unassigned shards are there")

logrus.Infof("Scaling: ES checking for shards name: %s",nodeName)
for i := 0; i < waitSeconds; i++ {
response, err := http.Get("http://" + es_ip + "/_cat/shards")
if err != nil {
logrus.Infof("Error: The HTTP request failed with error %s\n", err)
} else {
data, _ := ioutil.ReadAll(response.Body)
_,unassigned_count := util_wordcount(string(data), nodeName)
if (unassigned_count>0) {
time.Sleep(1 * time.Second)
continue
} else {

if (response.StatusCode == 200){
ret = nil
}else{
logrus.Infof("Scaling: Error checkForShards response :%s: statuscode %d:",data,response.StatusCode)
}
break
}
}
time.Sleep(1 * time.Second)
}

return ret;
}

func ES_checkForNodeUp(es_ip string, nodeName string, waitSeconds int)(error){
var ret error
ret = errors.New("ES node is not up")

logrus.Infof("Scaling: ES checking if the data node: %s joined master ",nodeName)
for i := 0; i < waitSeconds; i++ {
response, err := http.Get("http://" + es_ip + "/_cat/nodes?v&h=n,ip,v")
if err != nil {
fmt.Printf("Scaling: The HTTP request failed with error %s\n", err)
ret = err
} else {
data, _ := ioutil.ReadAll(response.Body)
str_ret := strings.Contains(string(data), nodeName)
if (str_ret){
ret = nil
return ret;
}
}
time.Sleep(1 * time.Second)
}

return ret;
}


24 changes: 22 additions & 2 deletions pkg/k8sutil/k8sutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,9 @@ func buildStatefulSet(statefulSetName, clusterName, deploymentType, baseImage, s
},
Spec: apps.StatefulSetSpec{
Replicas: replicas,
UpdateStrategy: apps.StatefulSetUpdateStrategy {
Type: apps.OnDeleteStatefulSetStrategyType,
},
ServiceName: statefulSetName,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
Expand Down Expand Up @@ -533,6 +536,14 @@ func buildStatefulSet(statefulSetName, clusterName, deploymentType, baseImage, s
Name: "CLUSTER_NAME",
Value: clusterName,
},
v1.EnvVar{
Name: "NODE_NAME",
ValueFrom: &v1.EnvVarSource{
FieldRef: &v1.ObjectFieldSelector{
FieldPath: "metadata.name",
},
},
},
v1.EnvVar{
Name: "NODE_MASTER",
Value: isNodeMaster,
Expand Down Expand Up @@ -667,7 +678,7 @@ func buildStatefulSet(statefulSetName, clusterName, deploymentType, baseImage, s

// CreateDataNodeDeployment creates the data node deployment
func (k *K8sutil) CreateDataNodeDeployment(deploymentType string, replicas *int32, baseImage, storageClass string, dataDiskSize string, resources myspec.Resources,
imagePullSecrets []myspec.ImagePullSecrets, imagePullPolicy, serviceAccountName, clusterName, statsdEndpoint, networkHost, namespace, javaOptions, masterJavaOptions, dataJavaOptions string, useSSL *bool, esUrl string, nodeSelector map[string]string, tolerations []v1.Toleration) error {
imagePullSecrets []myspec.ImagePullSecrets, imagePullPolicy, serviceAccountName, clusterName, statsdEndpoint, networkHost, namespace, javaOptions, masterJavaOptions, dataJavaOptions string, useSSL *bool, esUrl string, nodeSelector map[string]string, tolerations []v1.Toleration, scaling bool) error {

deploymentName, _, _, _ := processDeploymentType(deploymentType, clusterName)

Expand All @@ -692,7 +703,16 @@ func (k *K8sutil) CreateDataNodeDeployment(deploymentType string, replicas *int3
logrus.Error("Could not get stateful set! ", err)
return err
}

if deploymentType == "data" && scaling {
esJavaOps := ""
if dataJavaOptions != "" {
esJavaOps = dataJavaOptions
} else {
esJavaOps = javaOptions
}
ret := scale_statefulset(k, namespace, clusterName, statefulSetName, resources, esJavaOps, statefulSet, *replicas)
return ret
}
//scale replicas?
if statefulSet.Spec.Replicas != replicas {
currentReplicas := *statefulSet.Spec.Replicas
Expand Down
Loading