diff --git a/.gitignore b/.gitignore index 7c7c0a68..795b0735 100644 --- a/.gitignore +++ b/.gitignore @@ -355,3 +355,10 @@ admin/Cargo.lock /target .envrc + +# locally generated certs for testing TLS +*.crt +*.pem +*.csr +*.srl +*.ext diff --git a/CHANGELOG.md b/CHANGELOG.md index bf046ace..a6ed0ce8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,7 +29,7 @@ This changelog keeps track of work items that have been completed and are ready ### Improvements -- **General**: TODO ([#TODO](https://github.com/kedacore/http-add-on/issues/TODO)) +- **General**: Add configurable TLS on the wire support to the interceptor proxy ([#907](https://github.com/kedacore/http-add-on/issues/907)) ### Fixes diff --git a/Makefile b/Makefile index 7cb8d1da..1bb081b1 100644 --- a/Makefile +++ b/Makefile @@ -32,6 +32,17 @@ GO_LDFLAGS="-X github.com/kedacore/http-add-on/pkg/build.version=${VERSION} -X g GIT_COMMIT ?= $(shell git rev-list -1 HEAD) GIT_COMMIT_SHORT ?= $(shell git rev-parse --short HEAD) +define DOMAINS +basicConstraints=CA:FALSE +keyUsage = digitalSignature, nonRepudiation, keyEncipherment, dataEncipherment +subjectAltName = @alt_names +[alt_names] +DNS.1 = localhost +DNS.2 = *.keda +DNS.3 = *.interceptor-tls-test-ns +endef +export DOMAINS + # Build targets build-operator: @@ -45,8 +56,22 @@ build-scaler: build: build-operator build-interceptor build-scaler +# generate certs for local unit and e2e tests +rootca-test-certs: + mkdir -p certs + openssl req -x509 -nodes -new -sha256 -days 1024 -newkey rsa:2048 -keyout certs/RootCA.key -out certs/RootCA.pem -subj "/C=US/CN=Keda-Root-CA" + openssl x509 -outform pem -in certs/RootCA.pem -out certs/RootCA.crt + +test-certs: rootca-test-certs + echo "$$DOMAINS" > certs/domains.ext + openssl req -new -nodes -newkey rsa:2048 -keyout certs/tls.key -out certs/tls.csr -subj "/C=US/ST=KedaState/L=KedaCity/O=Keda-Certificates/CN=keda.local" + openssl x509 -req -sha256 -days 1024 -in certs/tls.csr -CA certs/RootCA.pem -CAkey certs/RootCA.key -CAcreateserial -extfile certs/domains.ext -out certs/tls.crt + +clean-test-certs: + rm -r certs || true + # Test targets -test: fmt vet +test: fmt vet test-certs go test ./... e2e-test: @@ -156,6 +181,12 @@ deploy: manifests kustomize ## Deploy to the K8s cluster specified in ~/.kube/co cd config/interceptor && \ $(KUSTOMIZE) edit add patch --path e2e-test/scaledobject.yaml --group keda.sh --kind ScaledObject --name interceptor --version v1alpha1 + cd config/interceptor && \ + $(KUSTOMIZE) edit add patch --path tls/deployment.yaml --group apps --kind Deployment --name interceptor --version v1 + + cd config/interceptor && \ + $(KUSTOMIZE) edit add patch --path tls/proxy.service.yaml --kind Service --name interceptor-proxy --version v1 + cd config/scaler && \ $(KUSTOMIZE) edit set image ghcr.io/kedacore/http-add-on-scaler=${IMAGE_SCALER_VERSIONED_TAG} @@ -174,3 +205,8 @@ kind-load: kind load docker-image ghcr.io/kedacore/http-add-on-operator:${VERSION} kind load docker-image ghcr.io/kedacore/http-add-on-interceptor:${VERSION} kind load docker-image ghcr.io/kedacore/http-add-on-scaler:${VERSION} + +k3d-import: + k3d image import ghcr.io/kedacore/http-add-on-operator:main + k3d image import ghcr.io/kedacore/http-add-on-interceptor:main + k3d image import ghcr.io/kedacore/http-add-on-scaler:main diff --git a/config/interceptor/tls/deployment.yaml b/config/interceptor/tls/deployment.yaml new file mode 100644 index 00000000..0b836524 --- /dev/null +++ b/config/interceptor/tls/deployment.yaml @@ -0,0 +1,30 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: interceptor +spec: + replicas: 1 + template: + spec: + containers: + - name: interceptor + ports: + - name: proxy-tls + containerPort: 8443 + env: + - name: KEDA_HTTP_PROXY_TLS_ENABLED + value: "true" + - name: KEDA_HTTP_PROXY_TLS_CERT_PATH + value: "/certs/tls.crt" + - name: KEDA_HTTP_PROXY_TLS_KEY_PATH + value: "/certs/tls.key" + - name: KEDA_HTTP_PROXY_TLS_PORT + value: "8443" + volumeMounts: + - readOnly: true + mountPath: "/certs" + name: certs + volumes: + - name: certs + secret: + secretName: keda-tls diff --git a/config/interceptor/tls/kustomization.yaml b/config/interceptor/tls/kustomization.yaml new file mode 100644 index 00000000..511a3a88 --- /dev/null +++ b/config/interceptor/tls/kustomization.yaml @@ -0,0 +1,5 @@ +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization +resources: +- deployment.yaml +- proxy.service.yaml diff --git a/config/interceptor/tls/proxy.service.yaml b/config/interceptor/tls/proxy.service.yaml new file mode 100644 index 00000000..08d0bbfe --- /dev/null +++ b/config/interceptor/tls/proxy.service.yaml @@ -0,0 +1,11 @@ +apiVersion: v1 +kind: Service +metadata: + name: interceptor-proxy +spec: + type: ClusterIP + ports: + - name: proxy-tls + protocol: TCP + port: 8443 + targetPort: proxy-tls diff --git a/docs/operate.md b/docs/operate.md index 9c066dfa..a3b67a9c 100644 --- a/docs/operate.md +++ b/docs/operate.md @@ -19,3 +19,9 @@ The OTEL exporter can be enabled by setting the `KEDA_HTTP_OTEL_HTTP_EXPORTER_EN If the collector is exposed on a unsecured endpoint then you can set the `KEDA_HTTP_OTEL_HTTP_COLLECTOR_INSECURE` environment variable to `true` (`false` by default) which will disable client security on the exporter. If you need to provide any headers such as authentication details in order to utilise your OTEL collector you can add them into the `KEDA_HTTP_OTEL_HTTP_HEADERS` environment variable. The frequency at which the metrics are exported can be configured by setting `KEDA_HTTP_OTEL_METRIC_EXPORT_INTERVAL` to the number of seconds you require between each export interval (`30` by default). + +# Configuring TLS for the KEDA HTTP Add-on interceptor proxy + +The interceptor proxy has the ability to run both a HTTP and HTTPS server simultaneously to allow you to scale workloads that use either protocol. By default, the interceptor proxy will only serve over HTTP, but this behavior can be changed by configuring the appropriate environment variables on the deployment. + +The TLS server can be enabled by setting the environment variable `KEDA_HTTP_PROXY_TLS_ENABLED` to `true` on the interceptor deployment (`false` by default). The TLS server will start on port `8443` by default, but this can be configured by setting `KEDA_HTTP_PROXY_TLS_PORT` to your desired port number. The TLS server will require valid TLS certificates to start, the path to the certificates can be configured via the `KEDA_HTTP_PROXY_TLS_CERT_PATH` and `KEDA_HTTP_PROXY_TLS_KEY_PATH` environment variables (`/certs/tls.crt` and `/certs/tls.key` by default). diff --git a/interceptor/config/serving.go b/interceptor/config/serving.go index 50267f51..fb191394 100644 --- a/interceptor/config/serving.go +++ b/interceptor/config/serving.go @@ -34,6 +34,15 @@ type Serving struct { // // This is the interval (in milliseconds) representing how often to do a fetch EndpointsCachePollIntervalMS int `envconfig:"KEDA_HTTP_ENDPOINTS_CACHE_POLLING_INTERVAL_MS" default:"250"` + // ProxyTLSEnabled is a flag to specify whether the interceptor proxy should + // be running using a TLS enabled server + ProxyTLSEnabled bool `envconfig:"KEDA_HTTP_PROXY_TLS_ENABLED" default:"false"` + // TLSCertPath is the path to read the certificate file from for the TLS server + TLSCertPath string `envconfig:"KEDA_HTTP_PROXY_TLS_CERT_PATH" default:"/certs/tls.crt"` + // TLSKeyPath is the path to read the private key file from for the TLS server + TLSKeyPath string `envconfig:"KEDA_HTTP_PROXY_TLS_KEY_PATH" default:"/certs/tls.key"` + // TLSPort is the port that the server should serve on if TLS is enabled + TLSPort int `envconfig:"KEDA_HTTP_PROXY_TLS_PORT" default:"8443"` } // Parse parses standard configs using envconfig and returns a pointer to the diff --git a/interceptor/main.go b/interceptor/main.go index a3812205..2d98776c 100644 --- a/interceptor/main.go +++ b/interceptor/main.go @@ -2,6 +2,8 @@ package main import ( "context" + "crypto/tls" + "crypto/x509" "errors" "flag" "fmt" @@ -68,6 +70,7 @@ func main() { proxyPort := servingCfg.ProxyPort adminPort := servingCfg.AdminPort + proxyTLSEnabled := servingCfg.ProxyTLSEnabled // setup the configured metrics collectors metrics.NewMetricsCollectors(metricsCfg) @@ -160,12 +163,29 @@ func main() { }) } - // start the proxy server. this is the server that + // start the proxy servers. This is the server that // accepts, holds and forwards user requests + // start a proxy server with TLS + if proxyTLSEnabled { + eg.Go(func() error { + proxyTLSConfig := map[string]string{"certificatePath": servingCfg.TLSCertPath, "keyPath": servingCfg.TLSKeyPath} + proxyTLSPort := servingCfg.TLSPort + + setupLog.Info("starting the proxy server with TLS enabled", "port", proxyTLSPort) + + if err := runProxyServer(ctx, ctrl.Log, queues, waitFunc, routingTable, timeoutCfg, proxyTLSPort, proxyTLSEnabled, proxyTLSConfig); !util.IsIgnoredErr(err) { + setupLog.Error(err, "tls proxy server failed") + return err + } + return nil + }) + } + + // start a proxy server without TLS. eg.Go(func() error { - setupLog.Info("starting the proxy server", "port", proxyPort) + setupLog.Info("starting the proxy server with TLS disabled", "port", proxyPort) - if err := runProxyServer(ctx, ctrl.Log, queues, waitFunc, routingTable, timeoutCfg, proxyPort); !util.IsIgnoredErr(err) { + if err := runProxyServer(ctx, ctrl.Log, queues, waitFunc, routingTable, timeoutCfg, proxyPort, false, nil); !util.IsIgnoredErr(err) { setupLog.Error(err, "proxy server failed") return err } @@ -199,7 +219,7 @@ func runAdminServer( addr := fmt.Sprintf("0.0.0.0:%d", port) lggr.Info("admin server starting", "address", addr) - return kedahttp.ServeContext(ctx, addr, adminServer) + return kedahttp.ServeContext(ctx, addr, adminServer, false, nil) } func runMetricsServer( @@ -209,7 +229,7 @@ func runMetricsServer( ) error { lggr.Info("starting the prometheus metrics server", "port", metricsCfg.OtelPrometheusExporterPort, "path", "/metrics") addr := fmt.Sprintf("0.0.0.0:%d", metricsCfg.OtelPrometheusExporterPort) - return kedahttp.ServeContext(ctx, addr, promhttp.Handler()) + return kedahttp.ServeContext(ctx, addr, promhttp.Handler(), false, nil) } func runProxyServer( @@ -220,6 +240,8 @@ func runProxyServer( routingTable routing.Table, timeouts *config.Timeouts, port int, + tlsEnabled bool, + tlsConfig map[string]string, ) error { dialer := kedanet.NewNetDialer(timeouts.Connect, timeouts.KeepAlive) dialContextFunc := kedanet.DialContextWithRetry(dialer, timeouts.DefaultBackoff()) @@ -229,12 +251,33 @@ func runProxyServer( }) go probeHandler.Start(ctx) + tlsCfg := tls.Config{} + if tlsEnabled { + caCert, err := os.ReadFile(tlsConfig["certificatePath"]) + if err != nil { + logger.Error(fmt.Errorf("error reading file from TLSCertPath"), "error", err) + os.Exit(1) + } + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + cert, err := tls.LoadX509KeyPair(tlsConfig["certificatePath"], tlsConfig["keyPath"]) + + if err != nil { + logger.Error(fmt.Errorf("error creating TLS configuration for proxy server"), "error", err) + os.Exit(1) + } + + tlsCfg.RootCAs = caCertPool + tlsCfg.Certificates = []tls.Certificate{cert} + } + var upstreamHandler http.Handler upstreamHandler = newForwardingHandler( logger, dialContextFunc, waitFunc, newForwardingConfigFromTimeouts(timeouts), + &tlsCfg, ) upstreamHandler = middleware.NewCountingMiddleware( q, @@ -246,6 +289,7 @@ func runProxyServer( routingTable, probeHandler, upstreamHandler, + tlsEnabled, ) rootHandler = middleware.NewLogging( logger, @@ -258,5 +302,5 @@ func runProxyServer( addr := fmt.Sprintf("0.0.0.0:%d", port) logger.Info("proxy server starting", "address", addr) - return kedahttp.ServeContext(ctx, addr, rootHandler) + return kedahttp.ServeContext(ctx, addr, rootHandler, tlsEnabled, tlsConfig) } diff --git a/interceptor/main_test.go b/interceptor/main_test.go index 4a39c28a..3a08be53 100644 --- a/interceptor/main_test.go +++ b/interceptor/main_test.go @@ -2,8 +2,11 @@ package main import ( "context" + "crypto/tls" + "crypto/x509" "fmt" "net/http" + "os" "strconv" "testing" "time" @@ -74,6 +77,8 @@ func TestRunProxyServerCountMiddleware(t *testing.T) { routingTable, timeouts, port, + false, + map[string]string{}, ) }) // wait for server to start @@ -147,3 +152,148 @@ func TestRunProxyServerCountMiddleware(t *testing.T) { done() r.Error(g.Wait()) } + +func TestRunProxyServerWithTLSCountMiddleware(t *testing.T) { + const ( + port = 8443 + host = "samplehost" + ) + r := require.New(t) + ctx, done := context.WithCancel( + context.Background(), + ) + defer done() + + originHdl := kedanet.NewTestHTTPHandlerWrapper( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + }), + ) + originSrv, originURL, err := kedanet.StartTestServer(originHdl) + r.NoError(err) + defer originSrv.Close() + originPort, err := strconv.Atoi(originURL.Port()) + r.NoError(err) + g, ctx := errgroup.WithContext(ctx) + q := queue.NewFakeCounter() + + httpso := targetFromURL( + originURL, + originPort, + "testdepl", + "testsvc", + ) + namespacedName := k8s.NamespacedNameFromObject(httpso).String() + + // set up a fake host that we can spoof + // when we later send request to the proxy, + // so that the proxy calculates a URL for that + // host that points to the (above) fake origin + // server + routingTable := routingtest.NewTable() + routingTable.Memory[host] = httpso + + timeouts := &config.Timeouts{} + waiterCh := make(chan struct{}) + waitFunc := func(_ context.Context, _, _ string) (bool, error) { + <-waiterCh + return false, nil + } + + g.Go(func() error { + return runProxyServer( + ctx, + logr.Discard(), + q, + waitFunc, + routingTable, + timeouts, + port, + true, + map[string]string{"certificatePath": "../certs/tls.crt", "keyPath": "../certs/tls.key"}, + ) + }) + + // wait for server to start + time.Sleep(500 * time.Millisecond) + + // make an HTTPs request in the background + g.Go(func() error { + f, err := os.ReadFile("../certs/RootCA.pem") + if err != nil { + t.Errorf("Unable to find RootCA for test, please run tests via `make test`") + } + rootCAs, _ := x509.SystemCertPool() + rootCAs.AppendCertsFromPEM(f) + + http.DefaultClient.Transport = &http.Transport{ + TLSClientConfig: &tls.Config{RootCAs: rootCAs}, + } + + req, err := http.NewRequest( + "GET", + fmt.Sprintf( + "https://localhost:%d", port, + ), nil, + ) + if err != nil { + return err + } + req.Host = host + // Allow us to use our self made certs + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return fmt.Errorf( + "unexpected status code: %d", + resp.StatusCode, + ) + } + if resp.Header.Get("X-KEDA-HTTP-Cold-Start") != "false" { + return fmt.Errorf("expected X-KEDA-HTTP-Cold-Start false, but got %s", resp.Header.Get("X-KEDA-HTTP-Cold-Start")) + } + return nil + }) + time.Sleep(100 * time.Millisecond) + select { + case hostAndCount := <-q.ResizedCh: + r.Equal(namespacedName, hostAndCount.Host) + r.Equal(1, hostAndCount.Count) + case <-time.After(2000 * time.Millisecond): + r.Fail("timeout waiting for +1 queue resize") + } + + // tell the wait func to proceed + select { + case waiterCh <- struct{}{}: + case <-time.After(5 * time.Second): + r.Fail("timeout producing on waiterCh") + } + + select { + case hostAndCount := <-q.ResizedCh: + r.Equal(namespacedName, hostAndCount.Host) + r.Equal(1, hostAndCount.Count) + case <-time.After(2 * time.Second): + r.Fail("timeout waiting for -1 queue resize") + } + + // check the queue to make sure all counts are at 0 + countsPtr, err := q.Current() + r.NoError(err) + counts := countsPtr.Counts + r.Equal(1, len(counts)) + _, foundHost := counts[namespacedName] + r.True( + foundHost, + "couldn't find host %s in the queue", + host, + ) + r.Equal(0, counts[namespacedName].Concurrency) + + done() + r.Error(g.Wait()) +} diff --git a/interceptor/middleware/routing.go b/interceptor/middleware/routing.go index 41dd1081..197f5c50 100644 --- a/interceptor/middleware/routing.go +++ b/interceptor/middleware/routing.go @@ -21,13 +21,15 @@ type Routing struct { routingTable routing.Table probeHandler http.Handler upstreamHandler http.Handler + tlsEnabled bool } -func NewRouting(routingTable routing.Table, probeHandler http.Handler, upstreamHandler http.Handler) *Routing { +func NewRouting(routingTable routing.Table, probeHandler http.Handler, upstreamHandler http.Handler, tlsEnabled bool) *Routing { return &Routing{ routingTable: routingTable, probeHandler: probeHandler, upstreamHandler: upstreamHandler, + tlsEnabled: tlsEnabled, } } @@ -63,6 +65,14 @@ func (rm *Routing) ServeHTTP(w http.ResponseWriter, r *http.Request) { } func (rm *Routing) streamFromHTTPSO(httpso *httpv1alpha1.HTTPScaledObject) (*url.URL, error) { + if rm.tlsEnabled { + return url.Parse(fmt.Sprintf( + "https://%s.%s:%d", + httpso.Spec.ScaleTargetRef.Service, + httpso.GetNamespace(), + httpso.Spec.ScaleTargetRef.Port, + )) + } //goland:noinspection HttpUrlsUsage return url.Parse(fmt.Sprintf( "http://%s.%s:%d", diff --git a/interceptor/middleware/routing_test.go b/interceptor/middleware/routing_test.go index 63c82479..b26f8086 100644 --- a/interceptor/middleware/routing_test.go +++ b/interceptor/middleware/routing_test.go @@ -23,7 +23,7 @@ var _ = Describe("RoutingMiddleware", func() { probeHandler.Handle("/probe", emptyHandler) upstreamHandler.Handle("/upstream", emptyHandler) - rm := NewRouting(routingTable, probeHandler, upstreamHandler) + rm := NewRouting(routingTable, probeHandler, upstreamHandler, false) Expect(rm).NotTo(BeNil()) Expect(rm.routingTable).To(Equal(routingTable)) Expect(rm.probeHandler).To(Equal(probeHandler)) @@ -58,7 +58,7 @@ var _ = Describe("RoutingMiddleware", func() { upstreamHandler = http.NewServeMux() probeHandler = http.NewServeMux() routingTable = routingtest.NewTable() - routingMiddleware = NewRouting(routingTable, probeHandler, upstreamHandler) + routingMiddleware = NewRouting(routingTable, probeHandler, upstreamHandler, false) w = httptest.NewRecorder() diff --git a/interceptor/proxy_handlers.go b/interceptor/proxy_handlers.go index 4dbf96ff..e39cf6d1 100644 --- a/interceptor/proxy_handlers.go +++ b/interceptor/proxy_handlers.go @@ -2,6 +2,7 @@ package main import ( "context" + "crypto/tls" "fmt" "net/http" "strconv" @@ -48,6 +49,7 @@ func newForwardingHandler( dialCtxFunc kedanet.DialContextFunc, waitFunc forwardWaitFunc, fwdCfg forwardingConfig, + tlsCfg *tls.Config, ) http.Handler { roundTripper := &http.Transport{ Proxy: http.ProxyFromEnvironment, @@ -58,6 +60,7 @@ func newForwardingHandler( TLSHandshakeTimeout: fwdCfg.tlsHandshakeTimeout, ExpectContinueTimeout: fwdCfg.expectContinueTimeout, ResponseHeaderTimeout: fwdCfg.respHeaderTimeout, + TLSClientConfig: tlsCfg, } return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ctx := r.Context() diff --git a/interceptor/proxy_handlers_integration_test.go b/interceptor/proxy_handlers_integration_test.go index 61b9a012..6ae02691 100644 --- a/interceptor/proxy_handlers_integration_test.go +++ b/interceptor/proxy_handlers_integration_test.go @@ -2,6 +2,7 @@ package main import ( "context" + "crypto/tls" "fmt" "net" "net/http" @@ -306,7 +307,9 @@ func newHarness( waitTimeout: activeEndpointsTimeout, respHeaderTimeout: time.Second, }, - )) + &tls.Config{}), + false, + ) proxySrv, proxySrvURL, err := kedanet.StartTestServer(proxyHdl) if err != nil { diff --git a/interceptor/proxy_handlers_test.go b/interceptor/proxy_handlers_test.go index b282e09f..a09517ca 100644 --- a/interceptor/proxy_handlers_test.go +++ b/interceptor/proxy_handlers_test.go @@ -2,10 +2,14 @@ package main import ( "context" + "crypto/tls" + "crypto/x509" "fmt" + "log" "net/http" "net/http/httptest" "net/url" + "os" "strconv" "strings" "testing" @@ -23,6 +27,25 @@ import ( "github.com/kedacore/http-add-on/pkg/util" ) +var TestTLSConfig = tls.Config{} + +func init() { + caCert, err := os.ReadFile("../certs/tls.crt") + if err != nil { + log.Fatalf("Error getting tests certs - make sure to run make test to generate them: %v", err) + } + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + cert, err := tls.LoadX509KeyPair("../certs/tls.crt", "../certs/tls.key") + + if err != nil { + log.Fatalf("Error getting tests certs - make sure to run make test to generate them %v", err) + } + + TestTLSConfig.RootCAs = caCertPool + TestTLSConfig.Certificates = []tls.Certificate{cert} +} + // the proxy should successfully forward a request to a running server func TestImmediatelySuccessfulProxy(t *testing.T) { host := fmt.Sprintf("%s.testing", t.Name()) @@ -54,6 +77,7 @@ func TestImmediatelySuccessfulProxy(t *testing.T) { waitTimeout: timeouts.WorkloadReplicas, respHeaderTimeout: timeouts.ResponseHeader, }, + &tls.Config{}, ) const path = "/testfwd" res, req, err := reqAndRes(path) @@ -74,6 +98,57 @@ func TestImmediatelySuccessfulProxy(t *testing.T) { r.Equal("test response", res.Body.String()) } +func TestImmediatelySuccessfulProxyTLS(t *testing.T) { + host := fmt.Sprintf("%s.testing", t.Name()) + r := require.New(t) + + originHdl := kedanet.NewTestHTTPHandlerWrapper( + http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(200) + _, err := w.Write([]byte("test response")) + r.NoError(err) + }), + ) + srv, originURL, err := kedanet.StartTestServer(originHdl) + r.NoError(err) + defer srv.Close() + originPort, err := strconv.Atoi(originURL.Port()) + r.NoError(err) + + timeouts := defaultTimeouts() + dialCtxFunc := retryDialContextFunc(timeouts, timeouts.DefaultBackoff()) + waitFunc := func(context.Context, string, string) (bool, error) { + return false, nil + } + hdl := newForwardingHandler( + logr.Discard(), + dialCtxFunc, + waitFunc, + forwardingConfig{ + waitTimeout: timeouts.WorkloadReplicas, + respHeaderTimeout: timeouts.ResponseHeader, + }, + &TestTLSConfig, + ) + const path = "/testfwd" + res, req, err := reqAndRes(path) + r.NoError(err) + req = util.RequestWithHTTPSO(req, targetFromURL( + originURL, + originPort, + "testdepl", + "testsvc", + )) + req = util.RequestWithStream(req, originURL) + req.Host = host + + hdl.ServeHTTP(res, req) + + r.Equal("false", res.Header().Get("X-KEDA-HTTP-Cold-Start"), "expected X-KEDA-HTTP-Cold-Start false") + r.Equal(200, res.Code, "expected response code 200") + r.Equal("test response", res.Body.String()) +} + // the proxy should wait for a timeout and fail if there is no // origin to which to connect func TestWaitFailedConnection(t *testing.T) { @@ -98,6 +173,7 @@ func TestWaitFailedConnection(t *testing.T) { waitTimeout: timeouts.WorkloadReplicas, respHeaderTimeout: timeouts.ResponseHeader, }, + &tls.Config{}, ) stream, err := url.Parse("http://0.0.0.0:0") r.NoError(err) @@ -125,6 +201,57 @@ func TestWaitFailedConnection(t *testing.T) { r.Equal(502, res.Code, "response code was unexpected") } +func TestWaitFailedConnectionTLS(t *testing.T) { + const host = "TestWaitFailedConnection.testing" + r := require.New(t) + + timeouts := defaultTimeouts() + backoff := timeouts.DefaultBackoff() + backoff.Steps = 2 + dialCtxFunc := retryDialContextFunc( + timeouts, + backoff, + ) + waitFunc := func(context.Context, string, string) (bool, error) { + return false, nil + } + hdl := newForwardingHandler( + logr.Discard(), + dialCtxFunc, + waitFunc, + forwardingConfig{ + waitTimeout: timeouts.WorkloadReplicas, + respHeaderTimeout: timeouts.ResponseHeader, + }, + &TestTLSConfig, + ) + stream, err := url.Parse("http://0.0.0.0:0") + r.NoError(err) + const path = "/testfwd" + res, req, err := reqAndRes(path) + r.NoError(err) + req = util.RequestWithHTTPSO(req, &httpv1alpha1.HTTPScaledObject{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "testns", + }, + Spec: httpv1alpha1.HTTPScaledObjectSpec{ + ScaleTargetRef: httpv1alpha1.ScaleTargetRef{ + Deployment: "nosuchdepl", + Service: "nosuchdepl", + Port: 8081, + }, + TargetPendingRequests: ptr.To[int32](1234), + }, + }) + req = util.RequestWithStream(req, stream) + req.Host = host + + hdl.ServeHTTP(res, req) + + r.Equal("false", res.Header().Get("X-KEDA-HTTP-Cold-Start"), "expected X-KEDA-HTTP-Cold-Start false") + r.Equal(502, res.Code, "response code was unexpected") +} + // the proxy handler should wait for the wait function until it hits // a timeout, then it should fail func TestTimesOutOnWaitFunc(t *testing.T) { @@ -147,6 +274,7 @@ func TestTimesOutOnWaitFunc(t *testing.T) { waitTimeout: timeouts.WorkloadReplicas, respHeaderTimeout: timeouts.ResponseHeader, }, + &tls.Config{}, ) stream, err := url.Parse("http://1.1.1.1") r.NoError(err) @@ -198,6 +326,79 @@ func TestTimesOutOnWaitFunc(t *testing.T) { r.True(waitFuncCalled, "wait function was not called") } +func TestTimesOutOnWaitFuncTLS(t *testing.T) { + r := require.New(t) + + timeouts := defaultTimeouts() + timeouts.WorkloadReplicas = 25 * time.Millisecond + timeouts.ResponseHeader = 25 * time.Millisecond + dialCtxFunc := retryDialContextFunc(timeouts, timeouts.DefaultBackoff()) + + waitFunc, waitFuncCalledCh, finishWaitFunc := notifyingFunc() + defer finishWaitFunc() + noSuchHost := fmt.Sprintf("%s.testing", t.Name()) + + hdl := newForwardingHandler( + logr.Discard(), + dialCtxFunc, + waitFunc, + forwardingConfig{ + waitTimeout: timeouts.WorkloadReplicas, + respHeaderTimeout: timeouts.ResponseHeader, + }, + &TestTLSConfig, + ) + stream, err := url.Parse("http://1.1.1.1") + r.NoError(err) + const path = "/testfwd" + res, req, err := reqAndRes(path) + r.NoError(err) + req = util.RequestWithHTTPSO(req, &httpv1alpha1.HTTPScaledObject{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "testns", + }, + Spec: httpv1alpha1.HTTPScaledObjectSpec{ + ScaleTargetRef: httpv1alpha1.ScaleTargetRef{ + Deployment: "nosuchdepl", + Service: "nosuchsvc", + Port: 9091, + }, + TargetPendingRequests: ptr.To[int32](1234), + }, + }) + req = util.RequestWithStream(req, stream) + req.Host = noSuchHost + + start := time.Now() + hdl.ServeHTTP(res, req) + elapsed := time.Since(start) + + t.Logf("elapsed time was %s", elapsed) + // serving should take at least timeouts.DeploymentReplicas, but no more than + // timeouts.DeploymentReplicas*4 + r.GreaterOrEqual(elapsed, timeouts.WorkloadReplicas) + r.LessOrEqual(elapsed, timeouts.WorkloadReplicas*4) + r.Equal(502, res.Code, "response code was unexpected") + + // we will always return the X-KEDA-HTTP-Cold-Start header + // when we are able to forward the + // request to the backend but not if we have failed due + // to a timeout from a waitFunc or earlier in the pipeline, + // for example, if we cannot reach the Kubernetes control + // plane. + r.Equal("", res.Header().Get("X-KEDA-HTTP-Cold-Start"), "expected X-KEDA-HTTP-Cold-Start to be empty") + + // waitFunc should have been called, even though it timed out + waitFuncCalled := false + select { + case <-waitFuncCalledCh: + waitFuncCalled = true + default: + } + + r.True(waitFuncCalled, "wait function was not called") +} + // Test to make sure the proxy handler will wait for the waitFunc to // complete func TestWaitsForWaitFunc(t *testing.T) { @@ -228,6 +429,7 @@ func TestWaitsForWaitFunc(t *testing.T) { waitTimeout: timeouts.WorkloadReplicas, respHeaderTimeout: timeouts.ResponseHeader, }, + &tls.Config{}, ) const path = "/testfwd" res, req, err := reqAndRes(path) @@ -265,6 +467,71 @@ func TestWaitsForWaitFunc(t *testing.T) { ) } +func TestWaitsForWaitFuncTLS(t *testing.T) { + r := require.New(t) + + timeouts := defaultTimeouts() + dialCtxFunc := retryDialContextFunc(timeouts, timeouts.DefaultBackoff()) + + waitFunc, waitFuncCalledCh, finishWaitFunc := notifyingFunc() + const ( + noSuchHost = "TestWaitsForWaitFunc.test" + originRespCode = 201 + ) + testSrv, testSrvURL, err := kedanet.StartTestServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(originRespCode) + }), + ) + r.NoError(err) + defer testSrv.Close() + _, originPort, err := splitHostPort(testSrvURL.Host) + r.NoError(err) + hdl := newForwardingHandler( + logr.Discard(), + dialCtxFunc, + waitFunc, + forwardingConfig{ + waitTimeout: timeouts.WorkloadReplicas, + respHeaderTimeout: timeouts.ResponseHeader, + }, + &TestTLSConfig, + ) + const path = "/testfwd" + res, req, err := reqAndRes(path) + r.NoError(err) + req = util.RequestWithHTTPSO(req, targetFromURL( + testSrvURL, + originPort, + "nosuchdepl", + "nosuchsvc", + )) + req = util.RequestWithStream(req, testSrvURL) + req.Host = noSuchHost + + // make the wait function finish after a short duration + const waitDur = 100 * time.Millisecond + go func() { + time.Sleep(waitDur) + finishWaitFunc() + }() + + start := time.Now() + hdl.ServeHTTP(res, req) + elapsed := time.Since(start) + r.NoError(waitForSignal(waitFuncCalledCh, 1*time.Second)) + + // should take at least waitDur, but no more than waitDur*4 + r.GreaterOrEqual(elapsed, waitDur) + r.Less(elapsed, waitDur*4) + + r.Equal( + originRespCode, + res.Code, + "response code was unexpected", + ) +} + // the proxy should connect to a server, and then time out if // the server doesn't respond in time func TestWaitHeaderTimeout(t *testing.T) { @@ -298,6 +565,7 @@ func TestWaitHeaderTimeout(t *testing.T) { waitTimeout: timeouts.WorkloadReplicas, respHeaderTimeout: timeouts.ResponseHeader, }, + &tls.Config{}, ) const path = "/testfwd" res, req, err := reqAndRes(path) @@ -324,6 +592,65 @@ func TestWaitHeaderTimeout(t *testing.T) { close(originHdlCh) } +func TestWaitHeaderTimeoutTLS(t *testing.T) { + r := require.New(t) + + // the origin will wait for this channel to receive or close before it sends any data back to the + // proxy + originHdlCh := make(chan struct{}) + originHdl := kedanet.NewTestHTTPHandlerWrapper( + http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + <-originHdlCh + w.WriteHeader(200) + _, err := w.Write([]byte("test response")) + r.NoError(err) + }), + ) + srv, originURL, err := kedanet.StartTestServer(originHdl) + r.NoError(err) + defer srv.Close() + + timeouts := defaultTimeouts() + dialCtxFunc := retryDialContextFunc(timeouts, timeouts.DefaultBackoff()) + waitFunc := func(context.Context, string, string) (bool, error) { + return false, nil + } + hdl := newForwardingHandler( + logr.Discard(), + dialCtxFunc, + waitFunc, + forwardingConfig{ + waitTimeout: timeouts.WorkloadReplicas, + respHeaderTimeout: timeouts.ResponseHeader, + }, + &TestTLSConfig, + ) + const path = "/testfwd" + res, req, err := reqAndRes(path) + r.NoError(err) + req = util.RequestWithHTTPSO(req, &httpv1alpha1.HTTPScaledObject{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "testns", + }, + Spec: httpv1alpha1.HTTPScaledObjectSpec{ + ScaleTargetRef: httpv1alpha1.ScaleTargetRef{ + Deployment: "nosuchdepl", + Service: "testsvc", + Port: 9094, + }, + TargetPendingRequests: ptr.To[int32](1234), + }, + }) + req = util.RequestWithStream(req, originURL) + req.Host = originURL.Host + + hdl.ServeHTTP(res, req) + + r.Equal("false", res.Header().Get("X-KEDA-HTTP-Cold-Start"), "expected X-KEDA-HTTP-Cold-Start false") + r.Equal(502, res.Code, "response code was unexpected") + close(originHdlCh) +} + func waitForSignal(sig <-chan struct{}, waitDur time.Duration) error { tmr := time.NewTimer(waitDur) defer tmr.Stop() diff --git a/pkg/http/server.go b/pkg/http/server.go index 501887ea..062154bd 100644 --- a/pkg/http/server.go +++ b/pkg/http/server.go @@ -7,7 +7,7 @@ import ( "github.com/kedacore/http-add-on/pkg/util" ) -func ServeContext(ctx context.Context, addr string, hdl http.Handler) error { +func ServeContext(ctx context.Context, addr string, hdl http.Handler, tlsEnabled bool, tlsConfig map[string]string) error { srv := &http.Server{ Handler: hdl, Addr: addr, @@ -22,5 +22,9 @@ func ServeContext(ctx context.Context, addr string, hdl http.Handler) error { } }() + if tlsEnabled { + return srv.ListenAndServeTLS(tlsConfig["certificatePath"], tlsConfig["keyPath"]) + } + return srv.ListenAndServe() } diff --git a/pkg/http/server_test.go b/pkg/http/server_test.go index 2cea3d88..a4b38a3a 100644 --- a/pkg/http/server_test.go +++ b/pkg/http/server_test.go @@ -29,7 +29,35 @@ func TestServeContext(t *testing.T) { done() }() start := time.Now() - err := ServeContext(ctx, addr, hdl) + err := ServeContext(ctx, addr, hdl, false, map[string]string{}) + elapsed := time.Since(start) + + r.Error(err) + r.True(errors.Is(err, http.ErrServerClosed), "error is not a http.ErrServerClosed (%w)", err) + r.Greater(elapsed, cancelDur) + r.Less(elapsed, cancelDur*4) +} + +func TestServeContextWithTLS(t *testing.T) { + r := require.New(t) + ctx, done := context.WithCancel( + context.Background(), + ) + hdl := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("foo", "bar") + _, err := w.Write([]byte("hello world")) + if err != nil { + t.Fatalf("error writing message to client from handler") + } + }) + addr := "localhost:1234" + const cancelDur = 500 * time.Millisecond + go func() { + time.Sleep(cancelDur) + done() + }() + start := time.Now() + err := ServeContext(ctx, addr, hdl, true, map[string]string{"certificatePath": "../../certs/tls.crt", "keyPath": "../../certs/tls.key"}) elapsed := time.Since(start) r.Error(err) diff --git a/tests/checks/interceptor_tls/interceptor_tls_test.go b/tests/checks/interceptor_tls/interceptor_tls_test.go new file mode 100644 index 00000000..8ae88b8f --- /dev/null +++ b/tests/checks/interceptor_tls/interceptor_tls_test.go @@ -0,0 +1,201 @@ +//go:build e2e +// +build e2e + +package interceptor_tls_test + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + . "github.com/kedacore/http-add-on/tests/helper" +) + +const ( + testName = "interceptor-tls-test" +) + +var ( + testNamespace = fmt.Sprintf("%s-ns", testName) + deploymentName = fmt.Sprintf("%s-deployment", testName) + serviceName = fmt.Sprintf("%s-service", testName) + clientName = fmt.Sprintf("%s-client", testName) + httpScaledObjectName = fmt.Sprintf("%s-http-so", testName) + host = testName + minReplicaCount = 0 + maxReplicaCount = 1 +) + +type templateData struct { + TestNamespace string + DeploymentName string + ServiceName string + ClientName string + HTTPScaledObjectName string + Host string + MinReplicas int + MaxReplicas int +} + +const ( + serviceTemplate = ` +apiVersion: v1 +kind: Service +metadata: + name: {{.ServiceName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + ports: + - port: 8080 + targetPort: http + protocol: TCP + name: http + - port: 8443 + targetPort: https + protocol: TCP + name: https + selector: + app: {{.DeploymentName}} +` + + deploymentTemplate = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{.DeploymentName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + replicas: 0 + selector: + matchLabels: + app: {{.DeploymentName}} + template: + metadata: + labels: + app: {{.DeploymentName}} + spec: + containers: + - name: {{.DeploymentName}} + image: registry.k8s.io/e2e-test-images/agnhost:2.45 + args: + - netexec + - --http-port + - "8443" + - --tls-cert-file + - /certs/tls.crt + - --tls-private-key-file + - /certs/tls.key + ports: + - name: http + containerPort: 8080 + protocol: TCP + - name: https + containerPort: 8443 + protocol: TCP + volumeMounts: + - readOnly: true + mountPath: "/certs" + name: certs + readinessProbe: + httpGet: + path: / + port: https + scheme: HTTPS + volumes: + - name: certs + secret: + secretName: test-tls +` + + httpScaledObjectTemplate = ` +kind: HTTPScaledObject +apiVersion: http.keda.sh/v1alpha1 +metadata: + name: {{.HTTPScaledObjectName}} + namespace: {{.TestNamespace}} +spec: + hosts: + - {{.Host}} + targetPendingRequests: 100 + scaledownPeriod: 10 + scaleTargetRef: + deployment: {{.DeploymentName}} + service: {{.ServiceName}} + port: 8443 + replicas: + min: {{ .MinReplicas }} + max: {{ .MaxReplicas }} +` + + clientTemplate = ` +apiVersion: v1 +kind: Pod +metadata: + name: {{.ClientName}} + namespace: {{.TestNamespace}} +spec: + containers: + - name: {{.ClientName}} + image: curlimages/curl + command: + - sh + - -c + - "exec tail -f /dev/null"` +) + +func TestInterceptorTLS(t *testing.T) { + // setup + t.Log("--- setting up ---") + + // create kubernetes resources + kc := GetKubernetesClient(t) + data, templates := getTemplateData() + CreateKubernetesResources(t, kc, testNamespace, data, templates) + + // setup certs + _, err := ExecuteCommand(fmt.Sprintf("kubectl -n %s create secret tls test-tls --cert ../../../certs/tls.crt --key ../../../certs/tls.key", testNamespace)) + require.NoErrorf(t, err, "could not create tls cert secret in %s namespace - %s", testNamespace, err) + + // wait for test pod to start + assert.True(t, WaitForAllPodRunningInNamespace(t, kc, testNamespace, 10, 2), + "test client count should be available after 20 seconds") + + // send test request and validate response body + sendRequest(t) + + // cleanup + DeleteKubernetesResources(t, testNamespace, data, templates) +} + +func sendRequest(t *testing.T) { + t.Log("--- sending request ---") + + stdout, _, err := ExecCommandOnSpecificPod(t, clientName, testNamespace, fmt.Sprintf("curl -k -H 'Host: %s' https://keda-http-add-on-interceptor-proxy.keda:8443/echo?msg=tls_test", host)) + require.NoErrorf(t, err, "could not run command on test client pod - %s", err) + + assert.Equal(t, "tls_test", stdout, fmt.Sprintf("incorrect response body from test request: expected %s, got %s", "tls_test", stdout)) +} + +func getTemplateData() (templateData, []Template) { + return templateData{ + TestNamespace: testNamespace, + DeploymentName: deploymentName, + ServiceName: serviceName, + ClientName: clientName, + HTTPScaledObjectName: httpScaledObjectName, + Host: host, + MinReplicas: minReplicaCount, + MaxReplicas: maxReplicaCount, + }, []Template{ + {Name: "deploymentTemplate", Config: deploymentTemplate}, + {Name: "serviceNameTemplate", Config: serviceTemplate}, + {Name: "clientTemplate", Config: clientTemplate}, + {Name: "httpScaledObjectTemplate", Config: httpScaledObjectTemplate}, + } +} diff --git a/tests/utils/cleanup_test.go b/tests/utils/cleanup_test.go index f379dce8..8a9fd464 100644 --- a/tests/utils/cleanup_test.go +++ b/tests/utils/cleanup_test.go @@ -47,3 +47,10 @@ func TestRemoveOpentelemetryComponents(t *testing.T) { require.NoErrorf(t, err, "cannot uninstall opentelemetry-collector - %s", err) DeleteNamespace(t, OpentelemetryNamespace) } + +func TestCleanUpCerts(t *testing.T) { + out, err := ExecuteCommandWithDir("make clean-test-certs", "../..") + require.NoErrorf(t, err, "error cleaning up test certs - %s", err) + t.Log(string(out)) + t.Log("test certificates successfully cleaned up") +} diff --git a/tests/utils/setup_test.go b/tests/utils/setup_test.go index 63182655..0c28df6d 100644 --- a/tests/utils/setup_test.go +++ b/tests/utils/setup_test.go @@ -172,6 +172,16 @@ func TestSetupKEDA(t *testing.T) { "replica count should be 1 after 3 minutes") } +func TestSetupTLSConfiguration(t *testing.T) { + out, err := ExecuteCommandWithDir("make test-certs", "../..") + require.NoErrorf(t, err, "error generating test certs - %s", err) + t.Log(string(out)) + t.Log("test certificates successfully generated") + + _, err = ExecuteCommand("kubectl -n keda create secret tls keda-tls --cert ../../certs/tls.crt --key ../../certs/tls.key") + require.NoErrorf(t, err, "could not create tls cert secret in keda namespace - %s", err) +} + func TestDeployKEDAHttpAddOn(t *testing.T) { out, err := ExecuteCommandWithDir("make deploy", "../..") require.NoErrorf(t, err, "error deploying KEDA Http Add-on - %s", err) @@ -205,7 +215,6 @@ func TestSetupOpentelemetryComponents(t *testing.T) { CreateNamespace(t, KubeClient, OpentelemetryNamespace) _, err = ExecuteCommand(fmt.Sprintf("helm upgrade --install opentelemetry-collector open-telemetry/opentelemetry-collector -f %s --namespace %s", otlpTempFileName, OpentelemetryNamespace)) - require.NoErrorf(t, err, "cannot install opentelemetry - %s", err) _, err = ExecuteCommand(fmt.Sprintf("kubectl apply -f %s -n %s", otlpServiceTempFileName, OpentelemetryNamespace))