Skip to content

Commit

Permalink
Merge pull request #175 from splisson-altair/fix-PR-#161
Browse files Browse the repository at this point in the history
Fixes for usage of use-ssl option
  • Loading branch information
stevesloka authored Apr 27, 2018
2 parents 31eae2d + a76a162 commit 872dad5
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 35 deletions.
3 changes: 2 additions & 1 deletion pkg/apis/elasticsearchoperator/v1/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ type ClusterSpec struct {
//KeepSecretsOnDelete tells the operator to not delete secrets when a cluster is destroyed
KeepSecretsOnDelete bool `json:"keep-secrets-on-delete"`

UseSSL bool `json:"use-ssl"`
// Use SSL for clients connections
UseSSL *bool `json:"use-ssl,omitempty"`
}

// ImagePullSecrets defines credentials to pull image from private repository
Expand Down
33 changes: 24 additions & 9 deletions pkg/k8sutil/deployments.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (k *K8sutil) DeleteDeployment(clusterName, namespace, deploymentType string

// CreateClientDeployment creates the client deployment
func (k *K8sutil) CreateClientDeployment(baseImage string, replicas *int32, javaOptions string,
resources myspec.Resources, imagePullSecrets []myspec.ImagePullSecrets, clusterName, statsdEndpoint, networkHost, namespace string, useSSL bool) error {
resources myspec.Resources, imagePullSecrets []myspec.ImagePullSecrets, clusterName, statsdEndpoint, networkHost, namespace string, useSSL *bool) error {

component := fmt.Sprintf("elasticsearch-%s", clusterName)
discoveryServiceNameCluster := fmt.Sprintf("%s-%s", discoveryServiceName, clusterName)
Expand All @@ -108,6 +108,11 @@ func (k *K8sutil) CreateClientDeployment(baseImage string, replicas *int32, java
// Check if deployment exists
deployment, err := k.Kclient.ExtensionsV1beta1().Deployments(namespace).Get(deploymentName, metav1.GetOptions{})

enableSSL := "false"
if useSSL != nil && *useSSL {
enableSSL = "true"
}

if len(deployment.Name) == 0 {
logrus.Infof("Deployment %s not found, creating...", deploymentName)

Expand All @@ -117,7 +122,7 @@ func (k *K8sutil) CreateClientDeployment(baseImage string, replicas *int32, java
requestCPU, _ := resource.ParseQuantity(resources.Requests.CPU)
requestMemory, _ := resource.ParseQuantity(resources.Requests.Memory)
scheme := v1.URISchemeHTTP
if useSSL {
if useSSL != nil && *useSSL {
scheme = v1.URISchemeHTTPS
}
probe := &v1.Probe{
Expand Down Expand Up @@ -195,6 +200,14 @@ func (k *K8sutil) CreateClientDeployment(baseImage string, replicas *int32, java
Name: "HTTP_ENABLE",
Value: "true",
},
v1.EnvVar{
Name: "SEARCHGUARD_SSL_TRANSPORT_ENABLED",
Value: enableSSL,
},
v1.EnvVar{
Name: "SEARCHGUARD_SSL_HTTP_ENABLED",
Value: enableSSL,
},
v1.EnvVar{
Name: "ES_JAVA_OPTS",
Value: javaOptions,
Expand Down Expand Up @@ -297,17 +310,19 @@ func (k *K8sutil) CreateClientDeployment(baseImage string, replicas *int32, java
}

// CreateKibanaDeployment creates a deployment of Kibana
func (k *K8sutil) CreateKibanaDeployment(baseImage, clusterName, namespace string, imagePullSecrets []myspec.ImagePullSecrets, elasticsearchUseSSL bool) error {
func (k *K8sutil) CreateKibanaDeployment(baseImage, clusterName, namespace string, imagePullSecrets []myspec.ImagePullSecrets, useSSL *bool) error {

replicaCount := int32(1)

component := fmt.Sprintf("elasticsearch-%s", clusterName)
elasticHTTPEndpoint := fmt.Sprintf("https://%s:9200", component)
if !elasticsearchUseSSL {
elasticHTTPEndpoint = fmt.Sprintf("http://%s:9200", component)
}

deploymentName := fmt.Sprintf("%s-%s", kibanaDeploymentName, clusterName)

enableSSL := "false"
if useSSL != nil && *useSSL {
enableSSL = "true"
}

// Check if deployment exists
deployment, err := k.Kclient.ExtensionsV1beta1().Deployments(namespace).Get(deploymentName, metav1.GetOptions{})
probe := &v1.Probe{
Expand Down Expand Up @@ -353,15 +368,15 @@ func (k *K8sutil) CreateKibanaDeployment(baseImage, clusterName, namespace strin
Env: []v1.EnvVar{
v1.EnvVar{
Name: "ELASTICSEARCH_URL",
Value: elasticHTTPEndpoint,
Value: GetESURL(component, useSSL),
},
v1.EnvVar{
Name: "ELASTICSEARCH_SSL_CERTIFICATEAUTHORITIES",
Value: fmt.Sprintf("%s/ca.pem", elasticsearchCertspath),
},
v1.EnvVar{
Name: "SERVER_SSL_ENABLED",
Value: "true",
Value: enableSSL,
},
v1.EnvVar{
Name: "SERVER_SSL_KEY",
Expand Down
41 changes: 32 additions & 9 deletions pkg/k8sutil/k8sutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,9 +360,20 @@ func TemplateImagePullSecrets(ips []myspec.ImagePullSecrets) []v1.LocalObjectRef
return outSecrets
}

// GetESURL Returns Elasticsearch URL
func GetESURL(esHost string, useSSL *bool) string {

if useSSL == nil || !*useSSL {
return fmt.Sprintf("http://%s:9200", esHost)
}

return fmt.Sprintf("https://%s:9200", esHost)

}

// CreateDataNodeDeployment creates the data node deployment
func (k *K8sutil) CreateDataNodeDeployment(deploymentType string, replicas *int32, baseImage, storageClass string, dataDiskSize string, resources myspec.Resources,
imagePullSecrets []myspec.ImagePullSecrets, clusterName, statsdEndpoint, networkHost, namespace, javaOptions string, useSSL bool) error {
imagePullSecrets []myspec.ImagePullSecrets, clusterName, statsdEndpoint, networkHost, namespace, javaOptions string, useSSL *bool) error {

var deploymentName, role, isNodeMaster, isNodeData string

Expand All @@ -378,6 +389,11 @@ func (k *K8sutil) CreateDataNodeDeployment(deploymentType string, replicas *int3
isNodeData = "false"
}

enableSSL := "false"
if useSSL != nil && *useSSL {
enableSSL = "true"
}

component := fmt.Sprintf("elasticsearch-%s", clusterName)
discoveryServiceNameCluster := fmt.Sprintf("%s-%s", discoveryServiceName, clusterName)
statefulSetName := fmt.Sprintf("%s-%s", deploymentName, storageClass)
Expand All @@ -396,7 +412,7 @@ func (k *K8sutil) CreateDataNodeDeployment(deploymentType string, replicas *int3

logrus.Infof("StatefulSet %s not found, creating...", statefulSetName)
scheme := v1.URISchemeHTTP
if useSSL {
if useSSL != nil && *useSSL {
scheme = v1.URISchemeHTTPS
}
probe := &v1.Probe{
Expand Down Expand Up @@ -500,6 +516,14 @@ func (k *K8sutil) CreateDataNodeDeployment(deploymentType string, replicas *int3
Name: "HTTP_ENABLE",
Value: "true",
},
v1.EnvVar{
Name: "SEARCHGUARD_SSL_TRANSPORT_ENABLED",
Value: enableSSL,
},
v1.EnvVar{
Name: "SEARCHGUARD_SSL_HTTP_ENABLED",
Value: enableSSL,
},
v1.EnvVar{
Name: "ES_JAVA_OPTS",
Value: javaOptions,
Expand Down Expand Up @@ -598,9 +622,7 @@ func (k *K8sutil) CreateDataNodeDeployment(deploymentType string, replicas *int3
}
}

_, err := k.Kclient.AppsV1beta2().StatefulSets(namespace).Create(statefulSet)

if err != nil {
if _, err := k.Kclient.AppsV1beta2().StatefulSets(namespace).Create(statefulSet); err != nil {
logrus.Error("Could not create stateful set: ", err)
return err
}
Expand All @@ -618,14 +640,16 @@ func (k *K8sutil) CreateDataNodeDeployment(deploymentType string, replicas *int3

if err != nil {
logrus.Error("Could not scale statefulSet: ", err)
return err
}
}
}

return nil
}

func (k *K8sutil) CreateCerebroConfiguration(clusterName string) map[string]string {
// CreateCerebroConfiguration creates Cerebro configuration
func (k *K8sutil) CreateCerebroConfiguration(esHost string, useSSL *bool) map[string]string {

x := map[string]string{}
x["application.conf"] = fmt.Sprintf(`
Expand Down Expand Up @@ -657,10 +681,9 @@ data.path = "./cerebro.db"
hosts = [
{
host = "%s"
name = "es-servers"
name = "%s"
}
]
`, elasticsearchCertspath, elasticsearchCertspath, fmt.Sprintf("https://%s:9200",
fmt.Sprintf(fmt.Sprintf("elasticsearch-%s", clusterName))))
`, elasticsearchCertspath, elasticsearchCertspath, GetESURL(esHost, useSSL), esHost)
return x
}
26 changes: 26 additions & 0 deletions pkg/k8sutil/k8sutil_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package k8sutil

import (
"fmt"
"testing"
)

func TestGetESURL(t *testing.T) {

for _, v := range []struct {
host string
expected string
useSSL bool
}{
{"es-ssl", "https://es-ssl:9200", true},
{"es-bla", "http://es-bla:9200", false},
} {

esURL := GetESURL(v.host, &v.useSSL)

if esURL != v.expected {
t.Errorf(fmt.Sprintf("Expected %s, got %s", v.expected, esURL))
}

}
}
42 changes: 29 additions & 13 deletions pkg/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,17 @@ func (p *Processor) WatchDataPodEvents(done chan struct{}, wg *sync.WaitGroup) {
}()
}

func (p *Processor) defaultUseSSL(specUseSSL *bool) bool {
// Default to true
if specUseSSL == nil {
logrus.Infof("use-ssl not specified, defaulting to UseSSL=true")
return true
} else {
logrus.Infof("use-ssl %v", *specUseSSL)
return *specUseSSL
}
}

func (p *Processor) refreshClusters() error {

for key, cluster := range p.clusters {
Expand All @@ -139,7 +150,7 @@ func (p *Processor) refreshClusters() error {

for _, cluster := range currentClusters.Items {
logrus.Infof("Found cluster: %s", cluster.ObjectMeta.Name)

useSSL := p.defaultUseSSL(cluster.Spec.UseSSL)
p.clusters[fmt.Sprintf("%s-%s", cluster.ObjectMeta.Name, cluster.ObjectMeta.Namespace)] = Cluster{
ESCluster: &myspec.ElasticsearchCluster{
Spec: myspec.ClusterSpec{
Expand Down Expand Up @@ -171,7 +182,7 @@ func (p *Processor) refreshClusters() error {
UserName: cluster.Spec.Snapshot.Authentication.UserName,
Password: cluster.Spec.Snapshot.Authentication.Password,
},
ElasticURL: p.k8sclient.GetClientServiceNameFullDNS(cluster.ObjectMeta.Name, cluster.ObjectMeta.Namespace),
ElasticURL: k8sutil.GetESURL(p.k8sclient.GetClientServiceNameFullDNS(cluster.ObjectMeta.Name, cluster.ObjectMeta.Namespace), cluster.Spec.UseSSL),
ClusterName: cluster.ObjectMeta.Name,
Namespace: cluster.ObjectMeta.Namespace,
},
Expand All @@ -194,7 +205,7 @@ func (p *Processor) refreshClusters() error {
Cerebro: myspec.Cerebro{
Image: cluster.Spec.Cerebro.Image,
},
UseSSL: cluster.Spec.UseSSL,
UseSSL: &useSSL,
},
},
Scheduler: snapshot.New(
Expand Down Expand Up @@ -254,6 +265,10 @@ func (p *Processor) processElasticSearchCluster(c *myspec.ElasticsearchCluster)

logrus.Infof("Using [%s] as image for es cluster", baseImage)

// Default UseSSL to true
useSSL := p.defaultUseSSL(c.Spec.UseSSL)
c.Spec.UseSSL = &useSSL

if p.k8sclient.CertsSecretExists(c.ObjectMeta.Namespace, c.ObjectMeta.Name) == false {
// Create certs
logrus.Info("Creating new certs!")
Expand Down Expand Up @@ -366,27 +381,28 @@ func (p *Processor) processElasticSearchCluster(c *myspec.ElasticsearchCluster)

// Deploy Cerebro
if c.Spec.Cerebro.Image != "" {
host := fmt.Sprintf("elasticsearch-%s", c.ObjectMeta.Name)
cerebroConf := p.k8sclient.CreateCerebroConfiguration(host, c.Spec.UseSSL)
name := fmt.Sprintf("%s-%s", c.ObjectMeta.Name, "cerebro")
if err := p.k8sclient.CreateCerebroDeployment(c.Spec.Cerebro.Image, c.ObjectMeta.Name, c.ObjectMeta.Namespace, name, c.Spec.ImagePullSecrets); err != nil {
logrus.Error("Error creating cerebro deployment ", err)
return err
}
// TODO create service

cerebroConf := p.k8sclient.CreateCerebroConfiguration(c.ObjectMeta.Name)

// create/update cerebro configMap
if p.k8sclient.ConfigmapExists(c.ObjectMeta.Namespace, fmt.Sprintf("%s-%s", c.ObjectMeta.Name, "cerebro")) {
if err := p.k8sclient.UpdateConfigMap(c.ObjectMeta.Namespace, fmt.Sprintf("%s-%s", c.ObjectMeta.Name, "cerebro"), cerebroConf); err != nil {
if p.k8sclient.ConfigmapExists(c.ObjectMeta.Namespace, name) {
if err := p.k8sclient.UpdateConfigMap(c.ObjectMeta.Namespace, name, cerebroConf); err != nil {
logrus.Error("Error updating configmap ", err)
return err
}
} else {
if err := p.k8sclient.CreateConfigMap(c.ObjectMeta.Namespace, fmt.Sprintf("%s-%s", c.ObjectMeta.Name, "cerebro"), cerebroConf); err != nil {
if err := p.k8sclient.CreateConfigMap(c.ObjectMeta.Namespace, name, cerebroConf); err != nil {
logrus.Error("Error creating configmaop ", err)
return err
}
}

if err := p.k8sclient.CreateCerebroDeployment(c.Spec.Cerebro.Image, c.ObjectMeta.Name, c.ObjectMeta.Namespace, name, c.Spec.ImagePullSecrets); err != nil {
logrus.Error("Error creating cerebro deployment ", err)
return err
}

}

// Setup CronSchedule
Expand Down
25 changes: 24 additions & 1 deletion pkg/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT

package processor

import "testing"
import (
"testing"
)

func TestBaseImage(t *testing.T) {
expectedImage := "foo/image"
Expand Down Expand Up @@ -111,3 +113,24 @@ func TestNoZonesPassed(t *testing.T) {
t.Error("Expected 1 zone, got ", len(zoneDistribution))
}
}

func TestDefaultUseSSL(t *testing.T) {
processor, _ := New(nil, "foo/image")

useSSL := processor.defaultUseSSL(nil)
if useSSL != true {
t.Errorf("Expected useSSL to default to true when not specified, got %v", useSSL)
}

useSSL = true
useSSL = processor.defaultUseSSL(&useSSL)
if useSSL != true {
t.Errorf("Expected useSSL to be true when specified as true, got %v", useSSL)
}

useSSL = false
useSSL = processor.defaultUseSSL(&useSSL)
if useSSL != false {
t.Errorf("Expected useSSL to be false when specified as false, got %v", useSSL)
}
}
3 changes: 1 addition & 2 deletions pkg/snapshot/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ type Scheduler struct {
}

// New creates an instance of Scheduler
func New(bucketName, cronSchedule string, enabled bool, userName, password, svcURL, clusterName, namespace string, kc kubernetes.Interface) *Scheduler {
elasticURL := fmt.Sprintf("https://%s:9200", svcURL) // Internal service name of cluster
func New(bucketName, cronSchedule string, enabled bool, userName, password, elasticURL, clusterName, namespace string, kc kubernetes.Interface) *Scheduler {

return &Scheduler{
Kclient: kc,
Expand Down

0 comments on commit 872dad5

Please sign in to comment.