diff --git a/pkg/apis/elasticsearchoperator/v1/cluster.go b/pkg/apis/elasticsearchoperator/v1/cluster.go index 252b172bc..151b751b7 100644 --- a/pkg/apis/elasticsearchoperator/v1/cluster.go +++ b/pkg/apis/elasticsearchoperator/v1/cluster.go @@ -118,7 +118,8 @@ type ClusterSpec struct { //KeepSecretsOnDelete tells the operator to not delete secrets when a cluster is destroyed KeepSecretsOnDelete bool `json:"keep-secrets-on-delete"` - UseSSL bool `json:"use-ssl"` + // Use SSL for clients connections + UseSSL *bool `json:"use-ssl,omitempty"` } // ImagePullSecrets defines credentials to pull image from private repository diff --git a/pkg/k8sutil/deployments.go b/pkg/k8sutil/deployments.go index 612a18411..249d2ea83 100644 --- a/pkg/k8sutil/deployments.go +++ b/pkg/k8sutil/deployments.go @@ -96,7 +96,7 @@ func (k *K8sutil) DeleteDeployment(clusterName, namespace, deploymentType string // CreateClientDeployment creates the client deployment func (k *K8sutil) CreateClientDeployment(baseImage string, replicas *int32, javaOptions string, - resources myspec.Resources, imagePullSecrets []myspec.ImagePullSecrets, clusterName, statsdEndpoint, networkHost, namespace string, useSSL bool) error { + resources myspec.Resources, imagePullSecrets []myspec.ImagePullSecrets, clusterName, statsdEndpoint, networkHost, namespace string, useSSL *bool) error { component := fmt.Sprintf("elasticsearch-%s", clusterName) discoveryServiceNameCluster := fmt.Sprintf("%s-%s", discoveryServiceName, clusterName) @@ -108,6 +108,11 @@ func (k *K8sutil) CreateClientDeployment(baseImage string, replicas *int32, java // Check if deployment exists deployment, err := k.Kclient.ExtensionsV1beta1().Deployments(namespace).Get(deploymentName, metav1.GetOptions{}) + enableSSL := "false" + if useSSL != nil && *useSSL { + enableSSL = "true" + } + if len(deployment.Name) == 0 { logrus.Infof("Deployment %s not found, creating...", deploymentName) @@ -117,7 +122,7 @@ func (k *K8sutil) CreateClientDeployment(baseImage string, replicas *int32, java requestCPU, _ := resource.ParseQuantity(resources.Requests.CPU) requestMemory, _ := resource.ParseQuantity(resources.Requests.Memory) scheme := v1.URISchemeHTTP - if useSSL { + if useSSL != nil && *useSSL { scheme = v1.URISchemeHTTPS } probe := &v1.Probe{ @@ -195,6 +200,14 @@ func (k *K8sutil) CreateClientDeployment(baseImage string, replicas *int32, java Name: "HTTP_ENABLE", Value: "true", }, + v1.EnvVar{ + Name: "SEARCHGUARD_SSL_TRANSPORT_ENABLED", + Value: enableSSL, + }, + v1.EnvVar{ + Name: "SEARCHGUARD_SSL_HTTP_ENABLED", + Value: enableSSL, + }, v1.EnvVar{ Name: "ES_JAVA_OPTS", Value: javaOptions, @@ -297,17 +310,19 @@ func (k *K8sutil) CreateClientDeployment(baseImage string, replicas *int32, java } // CreateKibanaDeployment creates a deployment of Kibana -func (k *K8sutil) CreateKibanaDeployment(baseImage, clusterName, namespace string, imagePullSecrets []myspec.ImagePullSecrets, elasticsearchUseSSL bool) error { +func (k *K8sutil) CreateKibanaDeployment(baseImage, clusterName, namespace string, imagePullSecrets []myspec.ImagePullSecrets, useSSL *bool) error { replicaCount := int32(1) component := fmt.Sprintf("elasticsearch-%s", clusterName) - elasticHTTPEndpoint := fmt.Sprintf("https://%s:9200", component) - if !elasticsearchUseSSL { - elasticHTTPEndpoint = fmt.Sprintf("http://%s:9200", component) - } + deploymentName := fmt.Sprintf("%s-%s", kibanaDeploymentName, clusterName) + enableSSL := "false" + if useSSL != nil && *useSSL { + enableSSL = "true" + } + // Check if deployment exists deployment, err := k.Kclient.ExtensionsV1beta1().Deployments(namespace).Get(deploymentName, metav1.GetOptions{}) probe := &v1.Probe{ @@ -353,7 +368,7 @@ func (k *K8sutil) CreateKibanaDeployment(baseImage, clusterName, namespace strin Env: []v1.EnvVar{ v1.EnvVar{ Name: "ELASTICSEARCH_URL", - Value: elasticHTTPEndpoint, + Value: GetESURL(component, useSSL), }, v1.EnvVar{ Name: "ELASTICSEARCH_SSL_CERTIFICATEAUTHORITIES", @@ -361,7 +376,7 @@ func (k *K8sutil) CreateKibanaDeployment(baseImage, clusterName, namespace strin }, v1.EnvVar{ Name: "SERVER_SSL_ENABLED", - Value: "true", + Value: enableSSL, }, v1.EnvVar{ Name: "SERVER_SSL_KEY", diff --git a/pkg/k8sutil/k8sutil.go b/pkg/k8sutil/k8sutil.go index 445002575..49af21ec0 100644 --- a/pkg/k8sutil/k8sutil.go +++ b/pkg/k8sutil/k8sutil.go @@ -360,9 +360,20 @@ func TemplateImagePullSecrets(ips []myspec.ImagePullSecrets) []v1.LocalObjectRef return outSecrets } +// GetESURL Returns Elasticsearch URL +func GetESURL(esHost string, useSSL *bool) string { + + if useSSL == nil || !*useSSL { + return fmt.Sprintf("http://%s:9200", esHost) + } + + return fmt.Sprintf("https://%s:9200", esHost) + +} + // CreateDataNodeDeployment creates the data node deployment func (k *K8sutil) CreateDataNodeDeployment(deploymentType string, replicas *int32, baseImage, storageClass string, dataDiskSize string, resources myspec.Resources, - imagePullSecrets []myspec.ImagePullSecrets, clusterName, statsdEndpoint, networkHost, namespace, javaOptions string, useSSL bool) error { + imagePullSecrets []myspec.ImagePullSecrets, clusterName, statsdEndpoint, networkHost, namespace, javaOptions string, useSSL *bool) error { var deploymentName, role, isNodeMaster, isNodeData string @@ -378,6 +389,11 @@ func (k *K8sutil) CreateDataNodeDeployment(deploymentType string, replicas *int3 isNodeData = "false" } + enableSSL := "false" + if useSSL != nil && *useSSL { + enableSSL = "true" + } + component := fmt.Sprintf("elasticsearch-%s", clusterName) discoveryServiceNameCluster := fmt.Sprintf("%s-%s", discoveryServiceName, clusterName) statefulSetName := fmt.Sprintf("%s-%s", deploymentName, storageClass) @@ -396,7 +412,7 @@ func (k *K8sutil) CreateDataNodeDeployment(deploymentType string, replicas *int3 logrus.Infof("StatefulSet %s not found, creating...", statefulSetName) scheme := v1.URISchemeHTTP - if useSSL { + if useSSL != nil && *useSSL { scheme = v1.URISchemeHTTPS } probe := &v1.Probe{ @@ -500,6 +516,14 @@ func (k *K8sutil) CreateDataNodeDeployment(deploymentType string, replicas *int3 Name: "HTTP_ENABLE", Value: "true", }, + v1.EnvVar{ + Name: "SEARCHGUARD_SSL_TRANSPORT_ENABLED", + Value: enableSSL, + }, + v1.EnvVar{ + Name: "SEARCHGUARD_SSL_HTTP_ENABLED", + Value: enableSSL, + }, v1.EnvVar{ Name: "ES_JAVA_OPTS", Value: javaOptions, @@ -598,9 +622,7 @@ func (k *K8sutil) CreateDataNodeDeployment(deploymentType string, replicas *int3 } } - _, err := k.Kclient.AppsV1beta2().StatefulSets(namespace).Create(statefulSet) - - if err != nil { + if _, err := k.Kclient.AppsV1beta2().StatefulSets(namespace).Create(statefulSet); err != nil { logrus.Error("Could not create stateful set: ", err) return err } @@ -618,6 +640,7 @@ func (k *K8sutil) CreateDataNodeDeployment(deploymentType string, replicas *int3 if err != nil { logrus.Error("Could not scale statefulSet: ", err) + return err } } } @@ -625,7 +648,8 @@ func (k *K8sutil) CreateDataNodeDeployment(deploymentType string, replicas *int3 return nil } -func (k *K8sutil) CreateCerebroConfiguration(clusterName string) map[string]string { +// CreateCerebroConfiguration creates Cerebro configuration +func (k *K8sutil) CreateCerebroConfiguration(esHost string, useSSL *bool) map[string]string { x := map[string]string{} x["application.conf"] = fmt.Sprintf(` @@ -657,10 +681,9 @@ data.path = "./cerebro.db" hosts = [ { host = "%s" - name = "es-servers" + name = "%s" } ] - `, elasticsearchCertspath, elasticsearchCertspath, fmt.Sprintf("https://%s:9200", - fmt.Sprintf(fmt.Sprintf("elasticsearch-%s", clusterName)))) + `, elasticsearchCertspath, elasticsearchCertspath, GetESURL(esHost, useSSL), esHost) return x } diff --git a/pkg/k8sutil/k8sutil_test.go b/pkg/k8sutil/k8sutil_test.go new file mode 100644 index 000000000..bd87c7a21 --- /dev/null +++ b/pkg/k8sutil/k8sutil_test.go @@ -0,0 +1,26 @@ +package k8sutil + +import ( + "fmt" + "testing" +) + +func TestGetESURL(t *testing.T) { + + for _, v := range []struct { + host string + expected string + useSSL bool + }{ + {"es-ssl", "https://es-ssl:9200", true}, + {"es-bla", "http://es-bla:9200", false}, + } { + + esURL := GetESURL(v.host, &v.useSSL) + + if esURL != v.expected { + t.Errorf(fmt.Sprintf("Expected %s, got %s", v.expected, esURL)) + } + + } +} diff --git a/pkg/processor/processor.go b/pkg/processor/processor.go index bfda33359..81183ea15 100644 --- a/pkg/processor/processor.go +++ b/pkg/processor/processor.go @@ -119,6 +119,17 @@ func (p *Processor) WatchDataPodEvents(done chan struct{}, wg *sync.WaitGroup) { }() } +func (p *Processor) defaultUseSSL(specUseSSL *bool) bool { + // Default to true + if specUseSSL == nil { + logrus.Infof("use-ssl not specified, defaulting to UseSSL=true") + return true + } else { + logrus.Infof("use-ssl %v", *specUseSSL) + return *specUseSSL + } +} + func (p *Processor) refreshClusters() error { for key, cluster := range p.clusters { @@ -139,7 +150,7 @@ func (p *Processor) refreshClusters() error { for _, cluster := range currentClusters.Items { logrus.Infof("Found cluster: %s", cluster.ObjectMeta.Name) - + useSSL := p.defaultUseSSL(cluster.Spec.UseSSL) p.clusters[fmt.Sprintf("%s-%s", cluster.ObjectMeta.Name, cluster.ObjectMeta.Namespace)] = Cluster{ ESCluster: &myspec.ElasticsearchCluster{ Spec: myspec.ClusterSpec{ @@ -171,7 +182,7 @@ func (p *Processor) refreshClusters() error { UserName: cluster.Spec.Snapshot.Authentication.UserName, Password: cluster.Spec.Snapshot.Authentication.Password, }, - ElasticURL: p.k8sclient.GetClientServiceNameFullDNS(cluster.ObjectMeta.Name, cluster.ObjectMeta.Namespace), + ElasticURL: k8sutil.GetESURL(p.k8sclient.GetClientServiceNameFullDNS(cluster.ObjectMeta.Name, cluster.ObjectMeta.Namespace), cluster.Spec.UseSSL), ClusterName: cluster.ObjectMeta.Name, Namespace: cluster.ObjectMeta.Namespace, }, @@ -194,7 +205,7 @@ func (p *Processor) refreshClusters() error { Cerebro: myspec.Cerebro{ Image: cluster.Spec.Cerebro.Image, }, - UseSSL: cluster.Spec.UseSSL, + UseSSL: &useSSL, }, }, Scheduler: snapshot.New( @@ -254,6 +265,10 @@ func (p *Processor) processElasticSearchCluster(c *myspec.ElasticsearchCluster) logrus.Infof("Using [%s] as image for es cluster", baseImage) + // Default UseSSL to true + useSSL := p.defaultUseSSL(c.Spec.UseSSL) + c.Spec.UseSSL = &useSSL + if p.k8sclient.CertsSecretExists(c.ObjectMeta.Namespace, c.ObjectMeta.Name) == false { // Create certs logrus.Info("Creating new certs!") @@ -366,27 +381,28 @@ func (p *Processor) processElasticSearchCluster(c *myspec.ElasticsearchCluster) // Deploy Cerebro if c.Spec.Cerebro.Image != "" { + host := fmt.Sprintf("elasticsearch-%s", c.ObjectMeta.Name) + cerebroConf := p.k8sclient.CreateCerebroConfiguration(host, c.Spec.UseSSL) name := fmt.Sprintf("%s-%s", c.ObjectMeta.Name, "cerebro") - if err := p.k8sclient.CreateCerebroDeployment(c.Spec.Cerebro.Image, c.ObjectMeta.Name, c.ObjectMeta.Namespace, name, c.Spec.ImagePullSecrets); err != nil { - logrus.Error("Error creating cerebro deployment ", err) - return err - } - // TODO create service - - cerebroConf := p.k8sclient.CreateCerebroConfiguration(c.ObjectMeta.Name) // create/update cerebro configMap - if p.k8sclient.ConfigmapExists(c.ObjectMeta.Namespace, fmt.Sprintf("%s-%s", c.ObjectMeta.Name, "cerebro")) { - if err := p.k8sclient.UpdateConfigMap(c.ObjectMeta.Namespace, fmt.Sprintf("%s-%s", c.ObjectMeta.Name, "cerebro"), cerebroConf); err != nil { + if p.k8sclient.ConfigmapExists(c.ObjectMeta.Namespace, name) { + if err := p.k8sclient.UpdateConfigMap(c.ObjectMeta.Namespace, name, cerebroConf); err != nil { logrus.Error("Error updating configmap ", err) return err } } else { - if err := p.k8sclient.CreateConfigMap(c.ObjectMeta.Namespace, fmt.Sprintf("%s-%s", c.ObjectMeta.Name, "cerebro"), cerebroConf); err != nil { + if err := p.k8sclient.CreateConfigMap(c.ObjectMeta.Namespace, name, cerebroConf); err != nil { logrus.Error("Error creating configmaop ", err) return err } } + + if err := p.k8sclient.CreateCerebroDeployment(c.Spec.Cerebro.Image, c.ObjectMeta.Name, c.ObjectMeta.Namespace, name, c.Spec.ImagePullSecrets); err != nil { + logrus.Error("Error creating cerebro deployment ", err) + return err + } + } // Setup CronSchedule diff --git a/pkg/processor/processor_test.go b/pkg/processor/processor_test.go index 615779d8a..c4ee78139 100644 --- a/pkg/processor/processor_test.go +++ b/pkg/processor/processor_test.go @@ -24,7 +24,9 @@ ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT package processor -import "testing" +import ( + "testing" +) func TestBaseImage(t *testing.T) { expectedImage := "foo/image" @@ -111,3 +113,24 @@ func TestNoZonesPassed(t *testing.T) { t.Error("Expected 1 zone, got ", len(zoneDistribution)) } } + +func TestDefaultUseSSL(t *testing.T) { + processor, _ := New(nil, "foo/image") + + useSSL := processor.defaultUseSSL(nil) + if useSSL != true { + t.Errorf("Expected useSSL to default to true when not specified, got %v", useSSL) + } + + useSSL = true + useSSL = processor.defaultUseSSL(&useSSL) + if useSSL != true { + t.Errorf("Expected useSSL to be true when specified as true, got %v", useSSL) + } + + useSSL = false + useSSL = processor.defaultUseSSL(&useSSL) + if useSSL != false { + t.Errorf("Expected useSSL to be false when specified as false, got %v", useSSL) + } +} diff --git a/pkg/snapshot/scheduler.go b/pkg/snapshot/scheduler.go index 305a776ce..9b191ca43 100644 --- a/pkg/snapshot/scheduler.go +++ b/pkg/snapshot/scheduler.go @@ -52,8 +52,7 @@ type Scheduler struct { } // New creates an instance of Scheduler -func New(bucketName, cronSchedule string, enabled bool, userName, password, svcURL, clusterName, namespace string, kc kubernetes.Interface) *Scheduler { - elasticURL := fmt.Sprintf("https://%s:9200", svcURL) // Internal service name of cluster +func New(bucketName, cronSchedule string, enabled bool, userName, password, elasticURL, clusterName, namespace string, kc kubernetes.Interface) *Scheduler { return &Scheduler{ Kclient: kc,