diff --git a/ci/autoscaler/scripts/deploy-autoscaler.sh b/ci/autoscaler/scripts/deploy-autoscaler.sh index 7cdbc0c6c8..d153e31312 100755 --- a/ci/autoscaler/scripts/deploy-autoscaler.sh +++ b/ci/autoscaler/scripts/deploy-autoscaler.sh @@ -134,9 +134,9 @@ function check_ops_files(){ function deploy() { # Try to silence Prometheus but do not fail deployment if there's an error - ${script_dir}/silence_prometheus_alert.sh "BOSHJobEphemeralDiskPredictWillFill" || true - ${script_dir}/silence_prometheus_alert.sh "BOSHJobProcessUnhealthy" || true - ${script_dir}/silence_prometheus_alert.sh "BOSHJobUnhealthy" || true + #${script_dir}/silence_prometheus_alert.sh "BOSHJobEphemeralDiskPredictWillFill" || true + #${script_dir}/silence_prometheus_alert.sh "BOSHJobProcessUnhealthy" || true + #${script_dir}/silence_prometheus_alert.sh "BOSHJobUnhealthy" || true create_manifest diff --git a/flake.lock b/flake.lock index 789724cc76..35448561b1 100644 --- a/flake.lock +++ b/flake.lock @@ -2,11 +2,11 @@ "nodes": { "nixpkgs": { "locked": { - "lastModified": 1725103162, - "narHash": "sha256-Ym04C5+qovuQDYL/rKWSR+WESseQBbNAe5DsXNx5trY=", + "lastModified": 1727348695, + "narHash": "sha256-J+PeFKSDV+pHL7ukkfpVzCOO7mBSrrpJ3svwBFABbhI=", "owner": "NixOS", "repo": "nixpkgs", - "rev": "12228ff1752d7b7624a54e9c1af4b222b3c1073b", + "rev": "1925c603f17fc89f4c8f6bf6f631a802ad85d784", "type": "github" }, "original": { diff --git a/jobs/metricsforwarder/spec b/jobs/metricsforwarder/spec index d8feaf1be3..9413c8dc96 100644 --- a/jobs/metricsforwarder/spec +++ b/jobs/metricsforwarder/spec @@ -24,6 +24,10 @@ templates: policy_db.crt.erb: config/certs/policy_db/crt policy_db.key.erb: config/certs/policy_db/key + binding_db_ca.crt.erb: config/certs/binding_db/ca.crt + binding_db.crt.erb: config/certs/binding_db/crt + binding_db.key.erb: config/certs/binding_db/key + storedprocedure_db_ca.crt.erb: config/certs/storedprocedure_db/ca.crt storedprocedure_db.crt.erb: config/certs/storedprocedure_db/crt storedprocedure_db.key.erb: config/certs/storedprocedure_db/key @@ -126,6 +130,37 @@ properties: autoscaler.policy_db_connection_config.connection_max_lifetime: default: 60s + autoscaler.binding_db.address: + description: "IP address on which the bindingdb server will listen" + default: "autoscalerpostgres.service.cf.internal" + autoscaler.binding_db.databases: + description: "The list of databases used in bindingdb database including name" + autoscaler.binding_db.db_scheme: + description: "Database scheme to be used to access bindingdb" + default: postgres + autoscaler.binding_db.port: + description: "Port on which the bindingdb server will listen" + autoscaler.binding_db.roles: + description: "The list of database roles used in bindingdb database including name/password" + autoscaler.binding_db.tls.ca: + default: '' + description: 'PEM-encoded ca certificate for TLS database server' + autoscaler.binding_db.tls.certificate: + default: '' + description: 'PEM-encoded certificate for TLS database client' + autoscaler.binding_db.tls.private_key: + default: '' + description: 'PEM-encoded key for TLS database client' + autoscaler.binding_db.sslmode: + default: disable + description: "sslmode to connect to postgres server" + autoscaler.binding_db_connection_config.max_open_connections: + default: 20 + autoscaler.binding_db_connection_config.max_idle_connections: + default: 10 + autoscaler.binding_db_connection_config.connection_max_lifetime: + default: 60s + autoscaler.storedprocedure_db.address: description: "IP address on which the storedproceduredb server will listen" default: "" diff --git a/jobs/metricsforwarder/templates/binding_db.crt.erb b/jobs/metricsforwarder/templates/binding_db.crt.erb new file mode 100644 index 0000000000..f848f28d79 --- /dev/null +++ b/jobs/metricsforwarder/templates/binding_db.crt.erb @@ -0,0 +1,3 @@ +<% if_p("autoscaler.binding_db.tls.certificate") do |value| %> +<%= value %> +<% end %> \ No newline at end of file diff --git a/jobs/metricsforwarder/templates/binding_db.key.erb b/jobs/metricsforwarder/templates/binding_db.key.erb new file mode 100644 index 0000000000..5aee693cbc --- /dev/null +++ b/jobs/metricsforwarder/templates/binding_db.key.erb @@ -0,0 +1,3 @@ +<% if_p("autoscaler.binding_db.tls.private_key") do |value| %> +<%= value %> +<% end %> \ No newline at end of file diff --git a/jobs/metricsforwarder/templates/binding_db_ca.crt.erb b/jobs/metricsforwarder/templates/binding_db_ca.crt.erb new file mode 100644 index 0000000000..5935a893b1 --- /dev/null +++ b/jobs/metricsforwarder/templates/binding_db_ca.crt.erb @@ -0,0 +1,3 @@ +<% if_p("autoscaler.binding_db.tls.ca") do |value| %> +<%= value %> +<% end %> \ No newline at end of file diff --git a/jobs/metricsforwarder/templates/metricsforwarder.yml.erb b/jobs/metricsforwarder/templates/metricsforwarder.yml.erb index e518676955..7bb5df81dc 100644 --- a/jobs/metricsforwarder/templates/metricsforwarder.yml.erb +++ b/jobs/metricsforwarder/templates/metricsforwarder.yml.erb @@ -38,6 +38,7 @@ end ########################################### job_name = 'metricsforwarder' policy_db_url = build_db_url('policy_db', job_name) + binding_db_url = build_db_url('binding_db', job_name) if p("autoscaler.storedprocedure_db.address") != '' storedprocedure_db_url = build_db_url('storedprocedure_db', job_name) end @@ -80,6 +81,11 @@ db: max_open_connections: <%= p("autoscaler.policy_db_connection_config.max_open_connections") %> max_idle_connections: <%= p("autoscaler.policy_db_connection_config.max_idle_connections") %> connection_max_lifetime: <%= p("autoscaler.policy_db_connection_config.connection_max_lifetime") %> + binding_db: + url: <%= binding_db_url %> + max_open_connections: <%= p("autoscaler.binding_db_connection_config.max_open_connections") %> + max_idle_connections: <%= p("autoscaler.binding_db_connection_config.max_idle_connections") %> + connection_max_lifetime: <%= p("autoscaler.binding_db_connection_config.connection_max_lifetime") %> <% if p("autoscaler.storedprocedure_db.address") != '' %> storedprocedure_db: url: <%= storedprocedure_db_url %> diff --git a/new-test-policy.json b/new-test-policy.json new file mode 100644 index 0000000000..c8883b9715 --- /dev/null +++ b/new-test-policy.json @@ -0,0 +1,24 @@ +{ + "configuration": { + "custom_metrics": { + "auth": { + "credential_type": "" + }, + "metric_submission_strategy": { + "allow_from": "bound_app" + } + } + }, + "instance_min_count": 1, + "instance_max_count": 2, + "scaling_rules": [ + { + "metric_type": "test_metric", + "breach_duration_secs": 60, + "threshold": 500, + "operator": ">=", + "cool_down_secs": 60, + "adjustment": "+1" + } + ] +} diff --git a/scripts/generate_test_certs.sh b/scripts/generate_test_certs.sh index d306ac0d4d..d65a9e3d24 100755 --- a/scripts/generate_test_certs.sh +++ b/scripts/generate_test_certs.sh @@ -102,7 +102,7 @@ if [[ "$OPENSSL_VERSION" == LibreSSL* ]]; then echo "OpenSSL needs to be used rather than LibreSSL" exit 1 fi -# valid certificate +# valid client certificates echo "${depot_path}" openssl req -new -newkey rsa:2048 -nodes -subj "/CN=sap.com/O=SAP SE/OU=organization:AB1234ORG/OU=app:an-app-id/OU=space:AB1234SPACE" -out "${depot_path}"/validmtls_client-1.csr openssl x509 -req -in "${depot_path}"/validmtls_client-1.csr -CA "${depot_path}"/valid-mtls-local-ca-1.crt -CAkey "${depot_path}"/valid-mtls-local-ca-1.key -CAcreateserial -out "${depot_path}"/validmtls_client-1.crt -days 365 -sha256 diff --git a/spec/fixtures/metricsforwarder.yml b/spec/fixtures/metricsforwarder.yml index 434d1bbfcb..8d7528741a 100644 --- a/spec/fixtures/metricsforwarder.yml +++ b/spec/fixtures/metricsforwarder.yml @@ -14,6 +14,21 @@ autoscaler: ca: BEGIN---CA---END certificate: BEGIN---CERT---END private_key: BEGIN---KEY---END + binding_db: + address: 10.11.137.101 + databases: + - name: foo + tag: default + db_scheme: postgres + port: 5432 + roles: + - name: foo + password: default + tag: default + tls: + ca: BEGIN---CA---END + certificate: BEGIN---CERT---END + private_key: BEGIN---KEY---END cf: api: https://api.cf.domain auth_endpoint: https://login.cf.domain diff --git a/spec/jobs/metricsforwarder/metricsforwarder_spec.rb b/spec/jobs/metricsforwarder/metricsforwarder_spec.rb index 140ca46a92..1496fc7249 100644 --- a/spec/jobs/metricsforwarder/metricsforwarder_spec.rb +++ b/spec/jobs/metricsforwarder/metricsforwarder_spec.rb @@ -114,6 +114,20 @@ end end end + context "binding_db" do + it "includes the ca, cert and key in url when configured" do + rendered_template["db"]["binding_db"]["url"].tap do |url| + check_if_certs_in_url(url, "binding_db") + end + end + + it "does not include the ca, cert and key in url when not configured" do + properties["autoscaler"]["binding_db"]["tls"] = nil + rendered_template["db"]["binding_db"]["url"].tap do |url| + check_if_certs_not_in_url(url, "binding_db") + end + end + end end end end diff --git a/src/acceptance/app/app_suite_test.go b/src/acceptance/app/app_suite_test.go index 1f506b778d..f519dca65d 100644 --- a/src/acceptance/app/app_suite_test.go +++ b/src/acceptance/app/app_suite_test.go @@ -29,8 +29,12 @@ var ( instanceName string initialInstanceCount int - appName string - appGUID string + appToScaleName string + appToScaleGUID string + + neighbourAppName string + + neighbourAppGUID string ) const componentName = "Application Scale Suite" @@ -59,10 +63,10 @@ func AppAfterEach() { if os.Getenv("SKIP_TEARDOWN") == "true" { fmt.Println("Skipping Teardown...") } else { - DebugInfo(cfg, setup, appName) - if appName != "" { - DeleteService(cfg, instanceName, appName) - DeleteTestApp(appName, cfg.DefaultTimeoutDuration()) + DebugInfo(cfg, setup, appToScaleName) + if appToScaleName != "" { + DeleteService(cfg, instanceName, appToScaleName) + DeleteTestApp(appToScaleName, cfg.DefaultTimeoutDuration()) } } } diff --git a/src/acceptance/app/cf_metadata_test.go b/src/acceptance/app/cf_metadata_test.go index 4683f454df..96d4246f5e 100644 --- a/src/acceptance/app/cf_metadata_test.go +++ b/src/acceptance/app/cf_metadata_test.go @@ -15,19 +15,19 @@ var _ = Describe("AutoScaler CF metadata support", func() { ) BeforeEach(func() { policy = GenerateDynamicScaleOutAndInPolicy(1, 2, "test_metric", 500, 500) - appName = CreateTestApp(cfg, "labeled-go_app", 1) - appGUID, err = GetAppGuid(cfg, appName) + appToScaleName = CreateTestApp(cfg, "labeled-go_app", 1) + appToScaleGUID, err = GetAppGuid(cfg, appToScaleName) Expect(err).NotTo(HaveOccurred()) - instanceName = CreatePolicy(cfg, appName, appGUID, policy) - StartApp(appName, cfg.CfPushTimeoutDuration()) + instanceName = CreatePolicy(cfg, appToScaleName, appToScaleGUID, policy) + StartApp(appToScaleName, cfg.CfPushTimeoutDuration()) }) AfterEach(AppAfterEach) When("the label app-autoscaler.cloudfoundry.org/disable-autoscaling is set", func() { It("should not scale out", func() { By("Set the label app-autoscaler.cloudfoundry.org/disable-autoscaling to true") - SetLabel(cfg, appGUID, "app-autoscaler.cloudfoundry.org/disable-autoscaling", "true") - scaleOut := sendMetricToAutoscaler(cfg, appGUID, appName, 550, true) + SetLabel(cfg, appToScaleGUID, "app-autoscaler.cloudfoundry.org/disable-autoscaling", "true") + scaleOut := sendMetricToAutoscaler(cfg, appToScaleGUID, appToScaleName, 550, true) Consistently(scaleOut). WithTimeout(5 * time.Minute). WithPolling(15 * time.Second). diff --git a/src/acceptance/app/custom_metric_test.go b/src/acceptance/app/custom_metric_test.go index dd098a7dcb..b53d14d090 100644 --- a/src/acceptance/app/custom_metric_test.go +++ b/src/acceptance/app/custom_metric_test.go @@ -4,6 +4,7 @@ import ( "acceptance" "acceptance/config" . "acceptance/helpers" + "fmt" "time" . "github.com/onsi/ginkgo/v2" @@ -17,11 +18,11 @@ var _ = Describe("AutoScaler custom metrics policy", func() { ) BeforeEach(func() { policy = GenerateDynamicScaleOutAndInPolicy(1, 2, "test_metric", 500, 500) - appName = CreateTestApp(cfg, "node-custom-metric", 1) - appGUID, err = GetAppGuid(cfg, appName) + appToScaleName = CreateTestApp(cfg, "node-custom-metric", 1) + appToScaleGUID, err = GetAppGuid(cfg, appToScaleName) Expect(err).NotTo(HaveOccurred()) - instanceName = CreatePolicy(cfg, appName, appGUID, policy) - StartApp(appName, cfg.CfPushTimeoutDuration()) + instanceName = CreatePolicy(cfg, appToScaleName, appToScaleGUID, policy) + StartApp(appToScaleName, cfg.CfPushTimeoutDuration()) }) AfterEach(AppAfterEach) @@ -30,14 +31,14 @@ var _ = Describe("AutoScaler custom metrics policy", func() { Context("when scaling by custom metrics", func() { It("should scale out and scale in", Label(acceptance.LabelSmokeTests), func() { By("Scale out to 2 instances") - scaleOut := sendMetricToAutoscaler(cfg, appGUID, appName, 550, false) + scaleOut := sendMetricToAutoscaler(cfg, appToScaleGUID, appToScaleName, 550, false) Eventually(scaleOut). WithTimeout(5 * time.Minute). WithPolling(15 * time.Second). Should(Equal(2)) By("Scale in to 1 instances") - scaleIn := sendMetricToAutoscaler(cfg, appGUID, appName, 100, false) + scaleIn := sendMetricToAutoscaler(cfg, appToScaleGUID, appToScaleName, 100, false) Eventually(scaleIn). WithTimeout(5 * time.Minute). WithPolling(15 * time.Second). @@ -49,14 +50,14 @@ var _ = Describe("AutoScaler custom metrics policy", func() { Context("when scaling by custom metrics via mtls", func() { It("should scale out and scale in", Label(acceptance.LabelSmokeTests), func() { By("Scale out to 2 instances") - scaleOut := sendMetricToAutoscaler(cfg, appGUID, appName, 550, true) + scaleOut := sendMetricToAutoscaler(cfg, appToScaleGUID, appToScaleName, 550, true) Eventually(scaleOut). WithTimeout(5 * time.Minute). WithPolling(15 * time.Second). Should(Equal(2)) By("Scale in to 1 instance") - scaleIn := sendMetricToAutoscaler(cfg, appGUID, appName, 100, true) + scaleIn := sendMetricToAutoscaler(cfg, appToScaleGUID, appToScaleName, 100, true) Eventually(scaleIn). WithTimeout(5 * time.Minute). WithPolling(15 * time.Second). @@ -64,15 +65,81 @@ var _ = Describe("AutoScaler custom metrics policy", func() { }) }) + Describe("Custom metrics policy with neighbour app", func() { + JustBeforeEach(func() { + neighbourAppName = CreateTestApp(cfg, "go-neighbour-app", 1) + neighbourAppGUID, err = GetAppGuid(cfg, neighbourAppName) + Expect(err).NotTo(HaveOccurred()) + err := BindServiceToAppWithPolicy(cfg, neighbourAppName, instanceName, policy) + Expect(err).NotTo(HaveOccurred()) + StartApp(neighbourAppName, cfg.CfPushTimeoutDuration()) + }) + Context("neighbour app send custom metrics for app B via mtls", func() { + BeforeEach(func() { + policy = GenerateBindingsWithScalingPolicy("bound_app", 1, 2, "test_metric", 500, 100) + }) + It("should scale out and scale in app B", Label(acceptance.LabelSmokeTests), func() { + By(fmt.Sprintf("Scale out %s to 2 instance", appToScaleName)) + scaleOut := sendMetricToAutoscaler(cfg, appToScaleGUID, neighbourAppName, 550, true) + Eventually(scaleOut). + WithTimeout(5 * time.Minute). + WithPolling(15 * time.Second). + Should(Equal(2)) + + By(fmt.Sprintf("Scale in %s to 1 instance", appToScaleName)) + scaleIn := sendMetricToAutoscaler(cfg, appToScaleGUID, neighbourAppName, 100, true) + Eventually(scaleIn). + WithTimeout(5 * time.Minute). + WithPolling(15 * time.Second). + Should(Equal(1)) + + }) + }) + Context("neighbour app send metrics if metrics strategy is not set i.e same_app", func() { + BeforeEach(func() { + policy = GenerateBindingsWithScalingPolicy("", 1, 2, "test_metric", 100, 550) + }) + When("policy is attached with neighbour app", func() { + It("should scale out and scale the neighbour app", func() { + By(fmt.Sprintf("Scale out %s to 2 instance", neighbourAppName)) + scaleOut := sendMetricToAutoscaler(cfg, neighbourAppGUID, neighbourAppName, 550, true) + Eventually(scaleOut). + WithTimeout(5 * time.Minute). + WithPolling(15 * time.Second). + Should(Equal(2)) + + By(fmt.Sprintf("Scale in %s to 1 instance", neighbourAppName)) + scaleIn := sendMetricToAutoscaler(cfg, neighbourAppGUID, neighbourAppName, 90, true) + Eventually(scaleIn). + WithTimeout(5 * time.Minute). + WithPolling(15 * time.Second). + Should(Equal(1)) + + }) + }) + When("no policy is attached with neighbour app", func() { + BeforeEach(func() { + policy = "" + }) + It("should not scale neighbour app", func() { + sendMetricToAutoscaler(cfg, neighbourAppGUID, neighbourAppName, 550, true) + Expect(RunningInstances(neighbourAppGUID, 5*time.Second)).To(Equal(1)) + + }) + }) + + }) + }) }) -func sendMetricToAutoscaler(config *config.Config, appGUID string, appName string, metricThreshold int, mtls bool) func() (int, error) { +func sendMetricToAutoscaler(config *config.Config, appToScaleGUID string, neighbourAppName string, metricThreshold int, mtls bool) func() (int, error) { return func() (int, error) { + if mtls { - SendMetricMTLS(config, appName, metricThreshold) + SendMetricMTLS(config, appToScaleGUID, neighbourAppName, metricThreshold) } else { - SendMetric(config, appName, metricThreshold) + SendMetric(config, neighbourAppName, metricThreshold) } - return RunningInstances(appGUID, 5*time.Second) + return RunningInstances(appToScaleGUID, 5*time.Second) } } diff --git a/src/acceptance/app/dynamic_policy_test.go b/src/acceptance/app/dynamic_policy_test.go index a161c22e5c..79cfbd92e2 100644 --- a/src/acceptance/app/dynamic_policy_test.go +++ b/src/acceptance/app/dynamic_policy_test.go @@ -26,12 +26,12 @@ var _ = Describe("AutoScaler dynamic policy", func() { const minimalMemoryUsage = 17 // observed minimal memory usage by the test app JustBeforeEach(func() { - appName = CreateTestApp(cfg, "dynamic-policy", initialInstanceCount) + appToScaleName = CreateTestApp(cfg, "dynamic-policy", initialInstanceCount) - appGUID, err = GetAppGuid(cfg, appName) + appToScaleGUID, err = GetAppGuid(cfg, appToScaleName) Expect(err).NotTo(HaveOccurred()) - StartApp(appName, cfg.CfPushTimeoutDuration()) - instanceName = CreatePolicy(cfg, appName, appGUID, policy) + StartApp(appToScaleName, cfg.CfPushTimeoutDuration()) + instanceName = CreatePolicy(cfg, appToScaleName, appToScaleGUID, policy) }) BeforeEach(func() { maxHeapLimitMb = cfg.NodeMemoryLimit - minimalMemoryUsage @@ -52,16 +52,16 @@ var _ = Describe("AutoScaler dynamic policy", func() { It("should scale out and then back in.", func() { By(fmt.Sprintf("Use heap %d MB of heap on app", int64(heapToUse))) - CurlAppInstance(cfg, appName, 0, fmt.Sprintf("/memory/%d/5", int64(heapToUse))) + CurlAppInstance(cfg, appToScaleName, 0, fmt.Sprintf("/memory/%d/5", int64(heapToUse))) By("wait for scale to 2") - WaitForNInstancesRunning(appGUID, 2, 5*time.Minute) + WaitForNInstancesRunning(appToScaleGUID, 2, 5*time.Minute) By("Drop memory used by app") - CurlAppInstance(cfg, appName, 0, "/memory/close") + CurlAppInstance(cfg, appToScaleName, 0, "/memory/close") By("Wait for scale to minimum instances") - WaitForNInstancesRunning(appGUID, 1, 5*time.Minute) + WaitForNInstancesRunning(appToScaleGUID, 1, 5*time.Minute) }) }) }) @@ -78,16 +78,16 @@ var _ = Describe("AutoScaler dynamic policy", func() { It("should scale out and back in", func() { heapToUse := min(maxHeapLimitMb, int(float64(cfg.NodeMemoryLimit)*0.80)) By(fmt.Sprintf("use 80%% or %d MB of memory in app", heapToUse)) - CurlAppInstance(cfg, appName, 0, fmt.Sprintf("/memory/%d/5", heapToUse)) + CurlAppInstance(cfg, appToScaleName, 0, fmt.Sprintf("/memory/%d/5", heapToUse)) By("Wait for scale to 2 instances") - WaitForNInstancesRunning(appGUID, 2, 5*time.Minute) + WaitForNInstancesRunning(appToScaleGUID, 2, 5*time.Minute) By("drop memory used") - CurlAppInstance(cfg, appName, 0, "/memory/close") + CurlAppInstance(cfg, appToScaleName, 0, "/memory/close") By("Wait for scale down to 1 instance") - WaitForNInstancesRunning(appGUID, 1, 5*time.Minute) + WaitForNInstancesRunning(appToScaleGUID, 1, 5*time.Minute) }) }) }) @@ -112,7 +112,7 @@ var _ = Describe("AutoScaler dynamic policy", func() { }) JustBeforeEach(func() { - appUri := cfh.AppUri(appName, "/responsetime/slow/100", cfg) + appUri := cfh.AppUri(appToScaleName, "/responsetime/slow/100", cfg) ticker = time.NewTicker(1 * time.Second) rps := 5 go func(chan bool) { @@ -131,7 +131,7 @@ var _ = Describe("AutoScaler dynamic policy", func() { }) It("should scale out", Label(acceptance.LabelSmokeTests), func() { - WaitForNInstancesRunning(appGUID, 2, 5*time.Minute) + WaitForNInstancesRunning(appToScaleGUID, 2, 5*time.Minute) }) }) @@ -143,7 +143,7 @@ var _ = Describe("AutoScaler dynamic policy", func() { }) JustBeforeEach(func() { - appUri := cfh.AppUri(appName, "/responsetime/slow/100", cfg) + appUri := cfh.AppUri(appToScaleName, "/responsetime/slow/100", cfg) ticker = time.NewTicker(1 * time.Second) rps := 5 go func(chan bool) { @@ -162,7 +162,7 @@ var _ = Describe("AutoScaler dynamic policy", func() { }) It("should scale in", func() { - WaitForNInstancesRunning(appGUID, 1, 5*time.Minute) + WaitForNInstancesRunning(appToScaleGUID, 1, 5*time.Minute) }) }) @@ -187,7 +187,7 @@ var _ = Describe("AutoScaler dynamic policy", func() { }) JustBeforeEach(func() { - appUri := cfh.AppUri(appName, "/responsetime/fast", cfg) + appUri := cfh.AppUri(appToScaleName, "/responsetime/fast", cfg) ticker = time.NewTicker(1 * time.Second) rps := 20 go func(chan bool) { @@ -206,7 +206,7 @@ var _ = Describe("AutoScaler dynamic policy", func() { }) It("should scale out", func() { - WaitForNInstancesRunning(appGUID, 2, 5*time.Minute) + WaitForNInstancesRunning(appToScaleGUID, 2, 5*time.Minute) }) }) @@ -218,7 +218,7 @@ var _ = Describe("AutoScaler dynamic policy", func() { }) JustBeforeEach(func() { - appUri := cfh.AppUri(appName, "/responsetime/fast", cfg) + appUri := cfh.AppUri(appToScaleName, "/responsetime/fast", cfg) ticker = time.NewTicker(1 * time.Second) rps := 20 go func(chan bool) { @@ -239,7 +239,7 @@ var _ = Describe("AutoScaler dynamic policy", func() { It("should scale in", func() { // because we are generating 20rps but starting with 2 instances, // there should be on average 10rps per instance, which should trigger the scale in - WaitForNInstancesRunning(appGUID, 1, 5*time.Minute) + WaitForNInstancesRunning(appToScaleGUID, 1, 5*time.Minute) }) }) }) @@ -254,14 +254,14 @@ var _ = Describe("AutoScaler dynamic policy", func() { It("when cpu is greater than scaling out threshold", func() { By("should scale out to 2 instances") - StartCPUUsage(cfg, appName, int(float64(cfg.CPUUpperThreshold)*0.9), 5) - WaitForNInstancesRunning(appGUID, 2, 5*time.Minute) + StartCPUUsage(cfg, appToScaleName, int(float64(cfg.CPUUpperThreshold)*0.9), 5) + WaitForNInstancesRunning(appToScaleGUID, 2, 5*time.Minute) By("should scale in to 1 instance after cpu usage is reduced") //only hit the one instance that was asked to run hot. - StopCPUUsage(cfg, appName, 0) + StopCPUUsage(cfg, appToScaleName, 0) - WaitForNInstancesRunning(appGUID, 1, 10*time.Minute) + WaitForNInstancesRunning(appToScaleGUID, 1, 10*time.Minute) }) }) @@ -285,16 +285,16 @@ var _ = Describe("AutoScaler dynamic policy", func() { // - app memory = 1GB // - app CPU entitlement = 4096[total shares] / (32[GB host ram] * 1024) * (1[app memory in GB] * 1024) * 0,1953 ~= 25% - ScaleMemory(cfg, appName, cfg.CPUUtilScalingPolicyTest.AppMemory) + ScaleMemory(cfg, appToScaleName, cfg.CPUUtilScalingPolicyTest.AppMemory) // cpuutil will be 100% if cpu usage is reaching the value of cpu entitlement maxCPUUsage := cfg.CPUUtilScalingPolicyTest.AppCPUEntitlement - StartCPUUsage(cfg, appName, maxCPUUsage, 5) - WaitForNInstancesRunning(appGUID, 2, 5*time.Minute) + StartCPUUsage(cfg, appToScaleName, maxCPUUsage, 5) + WaitForNInstancesRunning(appToScaleGUID, 2, 5*time.Minute) // only hit the one instance that was asked to run hot - StopCPUUsage(cfg, appName, 0) - WaitForNInstancesRunning(appGUID, 1, 5*time.Minute) + StopCPUUsage(cfg, appToScaleName, 0) + WaitForNInstancesRunning(appToScaleGUID, 1, 5*time.Minute) }) }) @@ -305,14 +305,14 @@ var _ = Describe("AutoScaler dynamic policy", func() { }) It("should scale out and in", func() { - ScaleDisk(cfg, appName, "1GB") + ScaleDisk(cfg, appToScaleName, "1GB") - StartDiskUsage(cfg, appName, 800, 5) - WaitForNInstancesRunning(appGUID, 2, 5*time.Minute) + StartDiskUsage(cfg, appToScaleName, 800, 5) + WaitForNInstancesRunning(appToScaleGUID, 2, 5*time.Minute) // only hit the one instance that was asked to occupy disk space - StopDiskUsage(cfg, appName, 0) - WaitForNInstancesRunning(appGUID, 1, 5*time.Minute) + StopDiskUsage(cfg, appToScaleName, 0) + WaitForNInstancesRunning(appToScaleGUID, 1, 5*time.Minute) }) }) @@ -323,14 +323,14 @@ var _ = Describe("AutoScaler dynamic policy", func() { }) It("should scale out and in", func() { - ScaleDisk(cfg, appName, "1GB") + ScaleDisk(cfg, appToScaleName, "1GB") - StartDiskUsage(cfg, appName, 800, 5) - WaitForNInstancesRunning(appGUID, 2, 5*time.Minute) + StartDiskUsage(cfg, appToScaleName, 800, 5) + WaitForNInstancesRunning(appToScaleGUID, 2, 5*time.Minute) // only hit the one instance that was asked to occupy disk space - StopDiskUsage(cfg, appName, 0) - WaitForNInstancesRunning(appGUID, 1, 5*time.Minute) + StopDiskUsage(cfg, appToScaleName, 0) + WaitForNInstancesRunning(appToScaleGUID, 1, 5*time.Minute) }) }) }) diff --git a/src/acceptance/app/lead_times_test.go b/src/acceptance/app/lead_times_test.go index 876a15e34b..f833ee348c 100644 --- a/src/acceptance/app/lead_times_test.go +++ b/src/acceptance/app/lead_times_test.go @@ -15,11 +15,11 @@ var _ = Describe("Autoscaler lead times for scaling", func() { ) BeforeEach(func() { policy = GenerateDynamicScaleOutAndInPolicy(1, 2, "test_metric", 500, 500) - appName = CreateTestApp(cfg, "labeled-go_app", 1) - appGUID, err = GetAppGuid(cfg, appName) + appToScaleName = CreateTestApp(cfg, "labeled-go_app", 1) + appToScaleGUID, err = GetAppGuid(cfg, appToScaleName) Expect(err).NotTo(HaveOccurred()) - instanceName = CreatePolicy(cfg, appName, appGUID, policy) - StartApp(appName, cfg.CfPushTimeoutDuration()) + instanceName = CreatePolicy(cfg, appToScaleName, appToScaleGUID, policy) + StartApp(appToScaleName, cfg.CfPushTimeoutDuration()) }) AfterEach(AppAfterEach) @@ -29,8 +29,8 @@ var _ = Describe("Autoscaler lead times for scaling", func() { coolDown := TestCoolDownSeconds * time.Second scalingTimewindow := 130 * time.Second // be friendly and allow some time for "internal autoscaler processes" (metric polling interval etc.) to take place before actual scaling happens - sendMetricForScaleOutAndReturnNumInstancesFunc := sendMetricToAutoscaler(cfg, appGUID, appName, 510, false) - sendMetricForScaleInAndReturnNumInstancesFunc := sendMetricToAutoscaler(cfg, appGUID, appName, 490, false) + sendMetricForScaleOutAndReturnNumInstancesFunc := sendMetricToAutoscaler(cfg, appToScaleGUID, appToScaleName, 510, false) + sendMetricForScaleInAndReturnNumInstancesFunc := sendMetricToAutoscaler(cfg, appToScaleGUID, appToScaleName, 490, false) By("checking that no scaling out happens before breach_duration_secs have passed") Consistently(sendMetricForScaleOutAndReturnNumInstancesFunc). diff --git a/src/acceptance/app/recurring_schedule_policy_test.go b/src/acceptance/app/recurring_schedule_policy_test.go index 5888e6b265..8082131d12 100644 --- a/src/acceptance/app/recurring_schedule_policy_test.go +++ b/src/acceptance/app/recurring_schedule_policy_test.go @@ -22,8 +22,8 @@ var _ = Describe("AutoScaler recurring schedule policy", func() { BeforeEach(func() { instanceName = CreateService(cfg) initialInstanceCount = 1 - appName = CreateTestApp(cfg, "recurring-schedule", initialInstanceCount) - appGUID, err = GetAppGuid(cfg, appName) + appToScaleName = CreateTestApp(cfg, "recurring-schedule", initialInstanceCount) + appToScaleGUID, err = GetAppGuid(cfg, appToScaleName) Expect(err).NotTo(HaveOccurred()) }) AfterEach(AppAfterEach) @@ -36,21 +36,21 @@ var _ = Describe("AutoScaler recurring schedule policy", func() { JustBeforeEach(func() { startTime, endTime = getStartAndEndTime(time.UTC, 70*time.Second, time.Duration(interval+120)*time.Second) policy = GenerateDynamicAndRecurringSchedulePolicy(instanceMinCount, 4, 50, "UTC", startTime, endTime, daysOfMonthOrWeek, scheduleInstanceMinCount, 5, scheduleInitialMinInstanceCount) - instanceName = CreatePolicy(cfg, appName, appGUID, policy) - StartApp(appName, cfg.CfPushTimeoutDuration()) + instanceName = CreatePolicy(cfg, appToScaleName, appToScaleGUID, policy) + StartApp(appToScaleName, cfg.CfPushTimeoutDuration()) }) scaleDown := func() { By("setting to initial_min_instance_count") jobRunTime := time.Until(startTime.Add(5 * time.Minute)) - WaitForNInstancesRunning(appGUID, scheduleInitialMinInstanceCount, jobRunTime, "The schedule should initially trigger scaling to initial_min_instance_count %i", scheduleInitialMinInstanceCount) + WaitForNInstancesRunning(appToScaleGUID, scheduleInitialMinInstanceCount, jobRunTime, "The schedule should initially trigger scaling to initial_min_instance_count %i", scheduleInitialMinInstanceCount) By("setting schedule's instance_min_count") jobRunTime = time.Until(endTime) - WaitForNInstancesRunning(appGUID, scheduleInstanceMinCount, jobRunTime, "The schedule should allow scaling down to instance_min_count %i", scheduleInstanceMinCount) + WaitForNInstancesRunning(appToScaleGUID, scheduleInstanceMinCount, jobRunTime, "The schedule should allow scaling down to instance_min_count %i", scheduleInstanceMinCount) By("setting to default instance_min_count") - WaitForNInstancesRunning(appGUID, instanceMinCount, time.Until(endTime.Add(time.Duration(interval+60)*time.Second)), "After the schedule ended scaling down to instance_min_count %i should be possible", instanceMinCount) + WaitForNInstancesRunning(appToScaleGUID, instanceMinCount, time.Until(endTime.Add(time.Duration(interval+60)*time.Second)), "After the schedule ended scaling down to instance_min_count %i should be possible", instanceMinCount) } Context("with days of month", func() { diff --git a/src/acceptance/app/specificdate_schedule_policy_test.go b/src/acceptance/app/specificdate_schedule_policy_test.go index 54af63779f..90ee10e7c9 100644 --- a/src/acceptance/app/specificdate_schedule_policy_test.go +++ b/src/acceptance/app/specificdate_schedule_policy_test.go @@ -21,8 +21,8 @@ var _ = Describe("AutoScaler specific date schedule policy", func() { BeforeEach(func() { instanceName = CreateService(cfg) initialInstanceCount = 1 - appName = CreateTestApp(cfg, "date-schedule", initialInstanceCount) - appGUID, err = GetAppGuid(cfg, appName) + appToScaleName = CreateTestApp(cfg, "date-schedule", initialInstanceCount) + appToScaleGUID, err = GetAppGuid(cfg, appToScaleName) Expect(err).NotTo(HaveOccurred()) }) @@ -34,22 +34,22 @@ var _ = Describe("AutoScaler specific date schedule policy", func() { const scheduledInstanceInit = 3 JustBeforeEach(func() { //TODO the start app needs to be after the binding but the timings require the app been up already. - StartApp(appName, cfg.CfPushTimeoutDuration()) + StartApp(appToScaleName, cfg.CfPushTimeoutDuration()) startDateTime = time.Now().In(time.UTC).Add(1 * time.Minute) endDateTime = startDateTime.Add(time.Duration(interval+120) * time.Second) policy = GenerateSpecificDateSchedulePolicy(startDateTime, endDateTime, scheduleInstanceMin, scheduleInstanceMax, scheduledInstanceInit) - instanceName = CreatePolicy(cfg, appName, appGUID, policy) + instanceName = CreatePolicy(cfg, appToScaleName, appToScaleGUID, policy) }) It("should scale", func() { pollTime := 15 * time.Second By(fmt.Sprintf("waiting for scheduledInstanceInit: %d", scheduledInstanceInit)) - WaitForNInstancesRunning(appGUID, 3, time.Until(startDateTime.Add(1*time.Minute))) + WaitForNInstancesRunning(appToScaleGUID, 3, time.Until(startDateTime.Add(1*time.Minute))) By(fmt.Sprintf("waiting for scheduleInstanceMin: %d", scheduleInstanceMin)) jobRunTime := time.Until(endDateTime) - Eventually(func() (int, error) { return RunningInstances(appGUID, cfg.DefaultTimeoutDuration()) }). + Eventually(func() (int, error) { return RunningInstances(appToScaleGUID, cfg.DefaultTimeoutDuration()) }). //+/- poll time error margin. WithTimeout(jobRunTime + pollTime). WithPolling(pollTime). @@ -58,12 +58,12 @@ var _ = Describe("AutoScaler specific date schedule policy", func() { //+/- poll time error margin. timeout := time.Until(endDateTime) - pollTime By(fmt.Sprintf("waiting till end of schedule %dS and should stay %d instances", int(timeout.Seconds()), scheduleInstanceMin)) - Consistently(func() (int, error) { return RunningInstances(appGUID, jobRunTime) }). + Consistently(func() (int, error) { return RunningInstances(appToScaleGUID, jobRunTime) }). WithTimeout(timeout). WithPolling(pollTime). Should(Equal(2)) - WaitForNInstancesRunning(appGUID, 1, time.Duration(interval+60)*time.Second) + WaitForNInstancesRunning(appToScaleGUID, 1, time.Duration(interval+60)*time.Second) }) }) }) diff --git a/src/acceptance/assets/app/go_app/internal/app/custom_metrics.go b/src/acceptance/assets/app/go_app/internal/app/custom_metrics.go index 26d919ab8c..7e0d37bdf4 100644 --- a/src/acceptance/assets/app/go_app/internal/app/custom_metrics.go +++ b/src/acceptance/assets/app/go_app/internal/app/custom_metrics.go @@ -19,7 +19,7 @@ import ( //counterfeiter:generate . CustomMetricClient type CustomMetricClient interface { - PostCustomMetric(ctx context.Context, appConfig *cfenv.App, metricsValue float64, metricName string, useMtls bool) error + PostCustomMetric(ctx context.Context, logger logr.Logger, appConfig *cfenv.App, metricsValue float64, metricName string, useMtls bool) error } type CustomMetricAPIClient struct{} @@ -52,8 +52,17 @@ func handleCustomMetricsEndpoint(logger logr.Logger, customMetricTest CustomMetr Error(c, http.StatusBadRequest, "invalid metric value: %s", err.Error()) return } - - err = customMetricTest.PostCustomMetric(c, nil, float64(metricValue), metricName, useMtls) + // required if neighbour app is sending metric for appToScaleGuid + appConfig := &cfenv.App{} + appToScaleGuid := c.Query("appToScaleGuid") + if appToScaleGuid != "" { + logger.Info("neighbour-app-relationship-found", "appToScaleGuid", appToScaleGuid) + appConfig.AppID = appToScaleGuid + //assuming the neighbour app has the same autoscaler service as the appToScale + currentApp, _ := cfenv.Current() + appConfig.Services = currentApp.Services + } + err = customMetricTest.PostCustomMetric(c, logger, appConfig, float64(metricValue), metricName, useMtls) if err != nil { logger.Error(err, "failed to submit custom metric") Error(c, http.StatusInternalServerError, "failed to submit custom metric: %s", err.Error()) @@ -63,9 +72,10 @@ func handleCustomMetricsEndpoint(logger logr.Logger, customMetricTest CustomMetr } } -func (*CustomMetricAPIClient) PostCustomMetric(ctx context.Context, appConfig *cfenv.App, metricValue float64, metricName string, useMtls bool) error { +func (*CustomMetricAPIClient) PostCustomMetric(ctx context.Context, logger logr.Logger, appConfig *cfenv.App, metricValue float64, metricName string, useMtls bool) error { var err error - if appConfig == nil { + logger.Info("sending custom metric", "appConfig", appConfig) + if appConfig == nil || appConfig.AppID == "" { appConfig, err = cfenv.Current() if err != nil { return fmt.Errorf("cloud foundry environment not found %w", err) @@ -103,7 +113,7 @@ func (*CustomMetricAPIClient) PostCustomMetric(ctx context.Context, appConfig *c } metrics := createSingletonMetric(metricName, metricValue) - + logger.Info("sending metric to autoscaler for app", "appId", appId, "metricName", metricName, "metricValue", metricValue) params := api.V1AppsAppGuidMetricsPostParams{AppGuid: appId} return apiClient.V1AppsAppGuidMetricsPost(ctx, metrics, params) diff --git a/src/acceptance/assets/app/go_app/internal/app/custom_metrics_test.go b/src/acceptance/assets/app/go_app/internal/app/custom_metrics_test.go index 83e0ca6087..f460155457 100644 --- a/src/acceptance/assets/app/go_app/internal/app/custom_metrics_test.go +++ b/src/acceptance/assets/app/go_app/internal/app/custom_metrics_test.go @@ -2,6 +2,7 @@ package app_test import ( "context" + "github.com/go-logr/logr" "net/http" "code.cloudfoundry.org/app-autoscaler-release/src/acceptance/assets/app/go_app/internal/app" @@ -42,7 +43,7 @@ var _ = Describe("custom metrics tests", func() { Body(`{"mtls":false}`). End() Expect(fakeCustomMetricClient.PostCustomMetricCallCount()).To(Equal(1)) - _, _, sentValue, sentMetric, mtlsUsed := fakeCustomMetricClient.PostCustomMetricArgsForCall(0) + _, _, _, sentValue, sentMetric, mtlsUsed := fakeCustomMetricClient.PostCustomMetricArgsForCall(0) Expect(sentMetric).Should(Equal("test")) Expect(sentValue).Should(Equal(4.0)) Expect(mtlsUsed).Should(Equal(false)) @@ -90,7 +91,7 @@ var _ = Describe("custom metrics tests", func() { } client := &app.CustomMetricAPIClient{} - err := client.PostCustomMetric(context.TODO(), &appEnv, 42, "test", false) + err := client.PostCustomMetric(context.TODO(), logr.Logger{}, &appEnv, 42, "test", false) Expect(err).ToNot(HaveOccurred()) Expect(len(fakeServer.ReceivedRequests())).To(Equal(1)) diff --git a/src/acceptance/helpers/apps.go b/src/acceptance/helpers/apps.go index 44390f2882..806430a1c5 100644 --- a/src/acceptance/helpers/apps.go +++ b/src/acceptance/helpers/apps.go @@ -82,9 +82,9 @@ func SendMetric(cfg *config.Config, appName string, metric int) { cfh.CurlApp(cfg, appName, fmt.Sprintf("/custom-metrics/test_metric/%d", metric), "-f") } -func SendMetricMTLS(cfg *config.Config, appName string, metric int) { +func SendMetricMTLS(cfg *config.Config, appGuid string, appName string, metric int) { GinkgoHelper() - cfh.CurlApp(cfg, appName, fmt.Sprintf("/custom-metrics/mtls/test_metric/%d", metric), "-f") + cfh.CurlApp(cfg, appName, fmt.Sprintf("/custom-metrics/mtls/test_metric/%d?appToScaleGuid=%s", metric, appGuid), "-f") } func StartAppWithErr(appName string, timeout time.Duration) error { diff --git a/src/acceptance/helpers/helpers.go b/src/acceptance/helpers/helpers.go index 5af7ec3c68..f493342a85 100644 --- a/src/acceptance/helpers/helpers.go +++ b/src/acceptance/helpers/helpers.go @@ -34,6 +34,27 @@ const ( type Days string +type BindingConfig struct { + Configuration Configuration `json:"configuration"` + ScalingPolicy +} + +type Configuration struct { + CustomMetrics CustomMetricsConfig `json:"custom_metrics"` +} + +type CustomMetricsConfig struct { + Auth Auth `json:"auth,omitempty"` + MetricSubmissionStrategy MetricsSubmissionStrategy `json:"metric_submission_strategy"` +} + +type Auth struct { + CredentialType string `json:"credential_type"` +} +type MetricsSubmissionStrategy struct { + AllowFrom string `json:"allow_from"` +} + type ScalingPolicy struct { InstanceMin int `json:"instance_min_count"` InstanceMax int `json:"instance_max_count"` @@ -162,7 +183,28 @@ func ServicePlansUrl(cfg *config.Config, spaceGuid string) string { return url.String() } +func GenerateBindingsWithScalingPolicy(allowFrom string, instanceMin, instanceMax int, metricName string, scaleInThreshold, scaleOutThreshold int64) string { + bindingConfig := BindingConfig{ + Configuration: Configuration{CustomMetrics: CustomMetricsConfig{ + MetricSubmissionStrategy: MetricsSubmissionStrategy{AllowFrom: allowFrom}, + }}, + ScalingPolicy: buildScaleOutScaleInPolicy(instanceMin, instanceMax, metricName, scaleInThreshold, scaleOutThreshold), + } + marshalledBinding, err := MarshalWithoutHTMLEscape(bindingConfig) + Expect(err).NotTo(HaveOccurred()) + return string(marshalledBinding) +} + func GenerateDynamicScaleOutPolicy(instanceMin, instanceMax int, metricName string, threshold int64) string { + + policy := buildScalingPolicy(instanceMin, instanceMax, metricName, threshold) + marshaled, err := MarshalWithoutHTMLEscape(policy) + Expect(err).NotTo(HaveOccurred()) + + return string(marshaled) +} + +func buildScalingPolicy(instanceMin int, instanceMax int, metricName string, threshold int64) ScalingPolicy { scalingOutRule := ScalingRule{ MetricType: metricName, BreachDurationSeconds: TestBreachDurationSeconds, @@ -171,16 +213,12 @@ func GenerateDynamicScaleOutPolicy(instanceMin, instanceMax int, metricName stri CoolDownSeconds: TestCoolDownSeconds, Adjustment: "+1", } - policy := ScalingPolicy{ InstanceMin: instanceMin, InstanceMax: instanceMax, ScalingRules: []*ScalingRule{&scalingOutRule}, } - marshaled, err := MarshalWithoutHTMLEscape(policy) - Expect(err).NotTo(HaveOccurred()) - - return string(marshaled) + return policy } func GenerateDynamicScaleOutPolicyWithExtraFields(instanceMin, instanceMax int, metricName string, threshold int64) (string, string) { @@ -246,6 +284,14 @@ func GenerateDynamicScaleInPolicy(instanceMin, instanceMax int, metricName strin } func GenerateDynamicScaleOutAndInPolicy(instanceMin, instanceMax int, metricName string, scaleInWhenBelowThreshold int64, scaleOutWhenGreaterOrEqualThreshold int64) string { + policy := buildScaleOutScaleInPolicy(instanceMin, instanceMax, metricName, scaleInWhenBelowThreshold, scaleOutWhenGreaterOrEqualThreshold) + marshaled, err := MarshalWithoutHTMLEscape(policy) + Expect(err).NotTo(HaveOccurred()) + + return string(marshaled) +} + +func buildScaleOutScaleInPolicy(instanceMin int, instanceMax int, metricName string, scaleInWhenBelowThreshold int64, scaleOutWhenGreaterOrEqualThreshold int64) ScalingPolicy { scalingOutRule := ScalingRule{ MetricType: metricName, BreachDurationSeconds: TestBreachDurationSeconds, @@ -267,11 +313,7 @@ func GenerateDynamicScaleOutAndInPolicy(instanceMin, instanceMax int, metricName InstanceMax: instanceMax, ScalingRules: []*ScalingRule{&scalingOutRule, &scalingInRule}, } - - marshaled, err := MarshalWithoutHTMLEscape(policy) - Expect(err).NotTo(HaveOccurred()) - - return string(marshaled) + return policy } // GenerateDynamicScaleInPolicyBetween creates a scaling policy that scales down from 2 instances to 1, if the metric value is in a range of [upper, lower]. diff --git a/src/autoscaler/api/broker/broker.go b/src/autoscaler/api/broker/broker.go index 05d15baada..4183c6a88e 100644 --- a/src/autoscaler/api/broker/broker.go +++ b/src/autoscaler/api/broker/broker.go @@ -39,15 +39,17 @@ type Broker struct { } var ( - emptyJSONObject = regexp.MustCompile(`^\s*{\s*}\s*$`) - ErrCreatingServiceBinding = errors.New("error creating service binding") - ErrUpdatingServiceInstance = errors.New("error updating service instance") - ErrDeleteSchedulesForUnbinding = errors.New("failed to delete schedules for unbinding") - ErrBindingDoesNotExist = errors.New("service binding does not exist") - ErrDeletePolicyForUnbinding = errors.New("failed to delete policy for unbinding") - ErrDeleteServiceBinding = errors.New("error deleting service binding") - ErrCredentialNotDeleted = errors.New("failed to delete custom metrics credential for unbinding") - ErrInvalidCredentialType = errors.New("invalid credential type provided: allowed values are [binding-secret, x509]") + emptyJSONObject = regexp.MustCompile(`^\s*{\s*}\s*$`) + ErrCreatingServiceBinding = errors.New("error creating service binding") + ErrUpdatingServiceInstance = errors.New("error updating service instance") + ErrDeleteSchedulesForUnbinding = errors.New("failed to delete schedules for unbinding") + ErrBindingDoesNotExist = errors.New("service binding does not exist") + ErrDeletePolicyForUnbinding = errors.New("failed to delete policy for unbinding") + ErrDeleteServiceBinding = errors.New("error deleting service binding") + ErrCredentialNotDeleted = errors.New("failed to delete custom metrics credential for unbinding") + ErrInvalidCredentialType = errors.New("invalid credential type provided: allowed values are [binding-secret, x509]") + ErrInvalidConfigurations = errors.New("invalid binding configurations provided") + ErrInvalidCustomMetricsStrategy = errors.New("error: custom metrics strategy not supported") ) type Errors []error @@ -497,6 +499,24 @@ func (b *Broker) Bind(ctx context.Context, instanceID string, bindingID string, policyJson = details.RawParameters } + // extract custom metrics configs to determine metric submission strategy and set to default if not provided + + bindingConfiguration := &models.BindingConfig{} + if policyJson != nil { + err := json.Unmarshal(policyJson, &bindingConfiguration) + if err != nil { + actionReadBindingConfiguration := "read-binding-configurations" + logger.Error("unmarshal-binding-configuration", err) + return result, apiresponses.NewFailureResponseBuilder( + ErrInvalidConfigurations, http.StatusBadRequest, actionReadBindingConfiguration). + WithErrorKey(actionReadBindingConfiguration). + Build() + } + } + logger.Info("binding-configuration", lager.Data{"bindingConfiguration": bindingConfiguration}) + if bindingConfiguration.GetCustomMetricsStrategy() == "" { + bindingConfiguration.SetDefaultCustomMetricsStrategy("same_app") + } policy, err := b.getPolicyFromJsonRawMessage(policyJson, instanceID, details.PlanID) if err != nil { logger.Error("get-default-policy", err) @@ -529,15 +549,18 @@ func (b *Broker) Bind(ctx context.Context, instanceID string, bindingID string, if err := b.handleExistingBindingsResiliently(ctx, instanceID, appGUID, logger); err != nil { return result, err } + // save custom metrics strategy check - bindingConfiguration.CustomMetricsConfig.MetricSubmissionStrategy ! == "" + err = createServiceBinding(ctx, b.bindingdb, bindingID, instanceID, appGUID, bindingConfiguration.GetCustomMetricsStrategy()) - // create binding in DB - err = b.bindingdb.CreateServiceBinding(ctx, bindingID, instanceID, appGUID) if err != nil { actionCreateServiceBinding := "create-service-binding" logger.Error(actionCreateServiceBinding, err) if errors.Is(err, db.ErrAlreadyExists) { return result, apiresponses.NewFailureResponse(errors.New("error: an autoscaler service instance is already bound to the application and multiple bindings are not supported"), http.StatusConflict, actionCreateServiceBinding) } + if errors.Is(err, ErrInvalidCustomMetricsStrategy) { + return result, apiresponses.NewFailureResponse(err, http.StatusBadRequest, actionCreateServiceBinding) + } return result, apiresponses.NewFailureResponse(ErrCreatingServiceBinding, http.StatusInternalServerError, actionCreateServiceBinding) } customMetricsCredentials := &models.CustomMetricsCredentials{ @@ -844,3 +867,12 @@ func (b *Broker) deleteBinding(ctx context.Context, bindingId string, serviceIns func isValidCredentialType(credentialType string) bool { return credentialType == models.BindingSecret || credentialType == models.X509Certificate } + +func createServiceBinding(ctx context.Context, bindingDB db.BindingDB, bindingID, instanceID, appGUID string, customMetricsStrategy string) error { + //TODO call bindingDB.CreateServiceBindingWithConfigs method. No need to call CreateServiceBinding method + // Caution: CHECK the below code may break the existing functionality ?? + if customMetricsStrategy == "bound_app" || customMetricsStrategy == "same_app" { + return bindingDB.CreateServiceBinding(ctx, bindingID, instanceID, appGUID, customMetricsStrategy) + } + return ErrInvalidCustomMetricsStrategy +} diff --git a/src/autoscaler/api/brokerserver/broker_handler_test.go b/src/autoscaler/api/brokerserver/broker_handler_test.go index a92889867d..6aa56509fc 100644 --- a/src/autoscaler/api/brokerserver/broker_handler_test.go +++ b/src/autoscaler/api/brokerserver/broker_handler_test.go @@ -928,7 +928,84 @@ var _ = Describe("BrokerHandler", func() { Expect(creds.Credentials.CustomMetrics.MtlsUrl).To(Equal("Mtls-someURL")) }) }) - // test for credential-type + Context("Binding configurations are present", func() { + BeforeEach(func() { + bindingPolicy = `{ + "configuration": { + "custom_metrics": { + "auth": { + "credential_type": "binding_secret" + }, + "metric_submission_strategy": { + "allow_from": "bound_app" + } + } + }, + "instance_max_count":4, + "instance_min_count":1, + "schedules": { + "timezone": "Asia/Shanghai", + "recurring_schedule": [{ + "start_time": "10:00", + "end_time": "18:00", + "days_of_week": [ + 1, + 2, + 3 + ], + "instance_min_count": 1, + "instance_max_count": 10, + "initial_min_instance_count": 5 + }] + }, + "scaling_rules":[ + { + "metric_type":"memoryused", + "threshold":30, + "operator":"<", + "adjustment":"-1" + }] + }` + bindingRequestBody.Policy = json.RawMessage(bindingPolicy) + body, err = json.Marshal(bindingRequestBody) + Expect(err).NotTo(HaveOccurred()) + bindingPolicy = `{ + "instance_max_count":4, + "instance_min_count":1, + "schedules": { + "timezone": "Asia/Shanghai", + "recurring_schedule": [{ + "start_time": "10:00", + "end_time": "18:00", + "days_of_week": [ + 1, + 2, + 3 + ], + "instance_min_count": 1, + "instance_max_count": 10, + "initial_min_instance_count": 5 + }] + }, + "scaling_rules":[ + { + "metric_type":"memoryused", + "threshold":30, + "operator":"<", + "adjustment":"-1" + }] + }` + verifyScheduleIsUpdatedInScheduler(testAppId, bindingPolicy) + }) + It("succeeds with 201", func() { + Expect(resp.Code).To(Equal(http.StatusCreated)) + + By("updating the scheduler") + Expect(schedulerServer.ReceivedRequests()).To(HaveLen(1)) + Expect(bindingdb.CreateServiceBindingCallCount()).To(Equal(1)) + }) + }) + Context("credential-type is provided while binding", func() { BeforeEach(func() { schedulerExpectedJSON = `{ @@ -1188,7 +1265,6 @@ var _ = Describe("BrokerHandler", func() { }) }) - // Context("When a default policy was provided when creating the service instance", func() { BeforeEach(func() { bindingdb.GetServiceInstanceReturns(&models.ServiceInstance{testInstanceId, testOrgId, testSpaceId, testDefaultPolicy, testDefaultGuid}, nil) diff --git a/src/autoscaler/api/db/servicebroker.db.changelog.yaml b/src/autoscaler/api/db/servicebroker.db.changelog.yaml index 0a7a80c466..d6c6978c8e 100644 --- a/src/autoscaler/api/db/servicebroker.db.changelog.yaml +++ b/src/autoscaler/api/db/servicebroker.db.changelog.yaml @@ -75,8 +75,42 @@ databaseChangeLog: type: timestamp constraints: nullable: false + - column: + name: custom_metrics_strategy + type: varchar(40) - changeSet: id: 3 + author: Arsalan + logicalFilePath: /var/vcap/packages/golangapiserver/servicebroker.db.changelog.json + preConditions: + - onFail: MARK_RAN + not: + - tableExists: + tableName: metrics_submission + changes: + - createTable: + tableName: metrics_submission + columns: + - column: + name: custom_metrics_strategy + type: varchar(40) + constraints: + primaryKey: true + - insert: + tableName: metrics_submission + columns: + - column: + name: custom_metrics_strategy + value: 'bound_app' + - insert: + tableName: metrics_submission + columns: + - column: + name: custom_metrics_strategy + value: 'same_app' + + - changeSet: + id: 4 author: qy logicalFilePath: /var/vcap/packages/golangapiserver/servicebroker.db.changelog.json preConditions: @@ -84,6 +118,8 @@ databaseChangeLog: not: - foreignKeyConstraintExists: foreignKeyName: fk_binding_service_instance_id + - foreignKeyConstraintExists: + foreignKeyName: fk_binding_custom_metrics_strategy changes: - addForeignKeyConstraint: baseColumnNames: service_instance_id @@ -95,8 +131,19 @@ databaseChangeLog: onUpdate: RESTRICT referencedColumnNames: service_instance_id referencedTableName: service_instance + + - addForeignKeyConstraint: + baseColumnNames: custom_metrics_strategy + baseTableName: binding + constraintName: fk_binding_custom_metrics_strategy + deferrable: false + initiallyDeferred: false + onDelete: RESTRICT + onUpdate: RESTRICT + referencedColumnNames: custom_metrics_strategy + referencedTableName: metrics_submission - changeSet: - id: 4 + id: 5 author: silvestre logicalFilePath: /var/vcap/packages/golangapiserver/servicebroker.db.changelog.yaml preConditions: diff --git a/src/autoscaler/db/db.go b/src/autoscaler/db/db.go index 0b78eac1dd..86564fc91f 100644 --- a/src/autoscaler/db/db.go +++ b/src/autoscaler/db/db.go @@ -69,7 +69,7 @@ type BindingDB interface { GetServiceInstanceByAppId(appId string) (*models.ServiceInstance, error) UpdateServiceInstance(ctx context.Context, serviceInstance models.ServiceInstance) error DeleteServiceInstance(ctx context.Context, serviceInstanceId string) error - CreateServiceBinding(ctx context.Context, bindingId string, serviceInstanceId string, appId string) error + CreateServiceBinding(ctx context.Context, bindingId string, serviceInstanceId string, appId string, customMetricsStrategy string) error DeleteServiceBinding(ctx context.Context, bindingId string) error DeleteServiceBindingByAppId(ctx context.Context, appId string) error CheckServiceBinding(appId string) bool @@ -78,6 +78,10 @@ type BindingDB interface { CountServiceInstancesInOrg(orgId string) (int, error) GetServiceBinding(ctx context.Context, serviceBindingId string) (*models.ServiceBinding, error) GetBindingIdsByInstanceId(ctx context.Context, instanceId string) ([]string, error) + GetAppBindingByAppId(ctx context.Context, appId string) (string, error) + IsAppBoundToSameAutoscaler(ctx context.Context, appId string, appToScaleId string) (bool, error) + CreateServiceBindingWithConfigs(ctx context.Context, bindingId string, serviceInstanceId string, appId string, strategy string) error + GetCustomMetricStrategyByAppId(ctx context.Context, appId string) (string, error) } type AppMetricDB interface { diff --git a/src/autoscaler/db/sqldb/binding_sqldb.go b/src/autoscaler/db/sqldb/binding_sqldb.go index 759ace0979..5f8bb1c1b4 100644 --- a/src/autoscaler/db/sqldb/binding_sqldb.go +++ b/src/autoscaler/db/sqldb/binding_sqldb.go @@ -131,19 +131,27 @@ func (bdb *BindingSQLDB) GetServiceInstance(ctx context.Context, serviceInstance } func (bdb *BindingSQLDB) GetServiceInstanceByAppId(appId string) (*models.ServiceInstance, error) { + serviceInstanceId, err := bdb.GetServiceInstanceIdByAppId(appId) + if err != nil { + bdb.logger.Error("get-service-instance-for-app-id", err, lager.Data{"appId": appId}) + return nil, err + } + return bdb.GetServiceInstance(context.Background(), serviceInstanceId) +} + +func (bdb *BindingSQLDB) GetServiceInstanceIdByAppId(appId string) (string, error) { query := bdb.sqldb.Rebind("SELECT service_instance_id FROM binding WHERE app_id = ?") serviceInstanceId := "" err := bdb.sqldb.Get(&serviceInstanceId, query, appId) if err != nil { - bdb.logger.Error("get-service-binding-for-app-id", err, lager.Data{"query": query, "appId": appId}) + bdb.logger.Error("get-service-instance-for-app-id", err, lager.Data{"query": query, "appId": appId}) if errors.Is(err, sql.ErrNoRows) { - return nil, db.ErrDoesNotExist + return serviceInstanceId, db.ErrDoesNotExist } - return nil, err + return serviceInstanceId, err } - - return bdb.GetServiceInstance(context.Background(), serviceInstanceId) + return serviceInstanceId, nil } func (bdb *BindingSQLDB) UpdateServiceInstance(ctx context.Context, serviceInstance models.ServiceInstance) error { @@ -192,11 +200,29 @@ func (bdb *BindingSQLDB) DeleteServiceInstance(ctx context.Context, serviceInsta return db.ErrDoesNotExist } -func (bdb *BindingSQLDB) CreateServiceBinding(ctx context.Context, bindingId string, serviceInstanceId string, appId string) error { +func (bdb *BindingSQLDB) CreateServiceBinding(ctx context.Context, bindingId string, serviceInstanceId string, appId string, customMetricsStrategy string) error { + + err := bdb.isBindingExists(ctx, bindingId, serviceInstanceId, appId) + if err != nil { + return err + } + + query := bdb.sqldb.Rebind("INSERT INTO binding" + + "(binding_id, service_instance_id, app_id, created_at) " + + "VALUES(?, ?, ?, ?)") + _, err = bdb.sqldb.ExecContext(ctx, query, bindingId, serviceInstanceId, appId, time.Now()) + + if err != nil { + bdb.logger.Error("create-service-binding", err, lager.Data{"query": query, "serviceinstanceid": serviceInstanceId, "bindingid": bindingId, "appid": appId}) + } + return err +} + +func (bdb *BindingSQLDB) isBindingExists(ctx context.Context, bindingId string, serviceInstanceId string, appId string) error { query := bdb.sqldb.Rebind("SELECT * FROM binding WHERE app_id =?") rows, err := bdb.sqldb.QueryContext(ctx, query, appId) if err != nil { - bdb.logger.Error("create-service-binding", err, lager.Data{"query": query, "appId": appId, "serviceId": serviceInstanceId, "bindingId": bindingId}) + bdb.logger.Error("is-binding-already-exists", err, lager.Data{"query": query, "appId": appId, "serviceId": serviceInstanceId, "bindingId": bindingId}) return err } @@ -208,19 +234,10 @@ func (bdb *BindingSQLDB) CreateServiceBinding(ctx context.Context, bindingId str err = rows.Err() if err != nil { - bdb.logger.Error("create-service-binding", err, lager.Data{"query": query, "appId": appId, "serviceId": serviceInstanceId, "bindingId": bindingId}) + bdb.logger.Error("is-binding-already-exists", err, lager.Data{"query": query, "appId": appId, "serviceId": serviceInstanceId, "bindingId": bindingId}) return err } - - query = bdb.sqldb.Rebind("INSERT INTO binding" + - "(binding_id, service_instance_id, app_id, created_at) " + - "VALUES(?, ?, ?, ?)") - _, err = bdb.sqldb.ExecContext(ctx, query, bindingId, serviceInstanceId, appId, time.Now()) - - if err != nil { - bdb.logger.Error("create-service-binding", err, lager.Data{"query": query, "serviceinstanceid": serviceInstanceId, "bindingid": bindingId, "appid": appId}) - } - return err + return nil } func (bdb *BindingSQLDB) GetServiceBinding(ctx context.Context, serviceBindingId string) (*models.ServiceBinding, error) { @@ -280,6 +297,18 @@ func (bdb *BindingSQLDB) DeleteServiceBindingByAppId(ctx context.Context, appId } return nil } + +func (bdb *BindingSQLDB) GetAppBindingByAppId(ctx context.Context, appId string) (string, error) { + var bindingId string + query := bdb.sqldb.Rebind("SELECT binding_id FROM binding WHERE app_id =?") + err := bdb.sqldb.QueryRowContext(ctx, query, appId).Scan(&bindingId) + + if err != nil { + bdb.logger.Error("get-service-binding-by-appid", err, lager.Data{"query": query, "appId": appId}) + return "", err + } + return bindingId, nil +} func (bdb *BindingSQLDB) CheckServiceBinding(appId string) bool { var count int query := bdb.sqldb.Rebind("SELECT COUNT(*) FROM binding WHERE app_id=?") @@ -359,3 +388,80 @@ func (bdb *BindingSQLDB) GetBindingIdsByInstanceId(ctx context.Context, instance return bindingIds, rows.Err() } + +func (bdb *BindingSQLDB) IsAppBoundToSameAutoscaler(ctx context.Context, metricSubmitterAppId string, appToScaleId string) (bool, error) { + + serviceInstanceId, err := bdb.GetServiceInstanceIdByAppId(metricSubmitterAppId) + if err != nil { + bdb.logger.Error("get-service-instance-by-appId", err, lager.Data{"appId": metricSubmitterAppId}) + return false, err + } + if serviceInstanceId == "" { + bdb.logger.Error("no-service-instance-found-by-appId", err, lager.Data{"appId": metricSubmitterAppId, "serviceInstanceId": serviceInstanceId}) + return false, nil + } + // find all apps which are bound to the same service instance + appIds, err := bdb.GetAppIdsByInstanceId(ctx, serviceInstanceId) + if err != nil { + bdb.logger.Error("get-apps-by-service-instance-id", err, lager.Data{"serviceInstanceId": serviceInstanceId}) + return false, err + } + + if len(appIds) == 0 { + bdb.logger.Error("no-apps-bounded-with-serviceInstance", err, lager.Data{"serviceInstanceId": serviceInstanceId}) + return false, nil + } + // check if the app to scale is in the list of apps bound to the same service instance and return true .otherwise return false + for _, app := range appIds { + if app == appToScaleId { + return true, nil + } + } + return false, nil +} + +func (bdb *BindingSQLDB) CreateServiceBindingWithConfigs(ctx context.Context, bindingId string, serviceInstanceId string, appId string, strategy string) error { + err := bdb.isBindingExists(ctx, bindingId, serviceInstanceId, appId) + if err != nil { + return err + } + query := bdb.sqldb.Rebind("INSERT INTO binding" + + "(binding_id, service_instance_id, app_id, created_at, custom_metrics_strategy) " + + "VALUES(?, ?, ?, ?,?)") + _, err = bdb.sqldb.ExecContext(ctx, query, bindingId, serviceInstanceId, appId, time.Now(), strategy) + + if err != nil { + bdb.logger.Error("create-service-binding", err, lager.Data{"query": query, "serviceinstanceid": serviceInstanceId, "bindingid": bindingId, "appid": appId, "strategy": strategy}) + } + return err +} + +func (bdb *BindingSQLDB) GetCustomMetricStrategyByAppId(ctx context.Context, appId string) (string, error) { + customMetricsStrategy, err := bdb.fetchCustomMetricStrategyByAppId(ctx, appId) + if err != nil { + return "", err + } + return customMetricsStrategy, nil +} + +func (bdb *BindingSQLDB) fetchCustomMetricStrategyByAppId(ctx context.Context, appId string) (string, error) { + var customMetricsStrategy sql.NullString + query := bdb.sqldb.Rebind("SELECT custom_metrics_strategy FROM binding WHERE app_id =?") + rows, err := bdb.sqldb.QueryContext(ctx, query, appId) + + if err != nil { + bdb.logger.Error("get-custom-metrics-strategy-by-appid", err, lager.Data{"query": query, "appId": appId}) + return "", err + } + defer func() { _ = rows.Close() }() + + for rows.Next() { + if err = rows.Scan(&customMetricsStrategy); err != nil { + bdb.logger.Error("scan-customMetricsStrategy-from-binding-table", err) + return "", err + } + return customMetricsStrategy.String, nil + + } + return customMetricsStrategy.String, rows.Err() +} diff --git a/src/autoscaler/db/sqldb/binding_sqldb_test.go b/src/autoscaler/db/sqldb/binding_sqldb_test.go index 26f6472b1a..0e78cce908 100644 --- a/src/autoscaler/db/sqldb/binding_sqldb_test.go +++ b/src/autoscaler/db/sqldb/binding_sqldb_test.go @@ -343,7 +343,7 @@ var _ = Describe("BindingSqldb", func() { BeforeEach(func() { err = bdb.CreateServiceInstance(context.Background(), models.ServiceInstance{ServiceInstanceId: testInstanceId, OrgId: testOrgGuid, SpaceId: testSpaceGuid, DefaultPolicy: policyJsonStr, DefaultPolicyGuid: policyGuid}) Expect(err).NotTo(HaveOccurred()) - err = bdb.CreateServiceBinding(context.Background(), testBindingId, testInstanceId, testAppId) + err = bdb.CreateServiceBinding(context.Background(), testBindingId, testInstanceId, testAppId, "") }) It("should return what was created", func() { expectServiceInstancesToEqual(retrievedServiceInstance, &models.ServiceInstance{ServiceInstanceId: testInstanceId, OrgId: testOrgGuid, SpaceId: testSpaceGuid, DefaultPolicy: policyJsonStr, DefaultPolicyGuid: policyGuid}) @@ -353,7 +353,7 @@ var _ = Describe("BindingSqldb", func() { Describe("CreateServiceBinding", func() { JustBeforeEach(func() { - err = bdb.CreateServiceBinding(context.Background(), testBindingId, testInstanceId, testAppId) + err = bdb.CreateServiceBinding(context.Background(), testBindingId, testInstanceId, testAppId, "") }) Context("When service instance doesn't exist", func() { It("should error", func() { @@ -376,7 +376,7 @@ var _ = Describe("BindingSqldb", func() { }) Context("When service binding already exists", func() { It("should error", func() { - err := bdb.CreateServiceBinding(context.Background(), testBindingId, testInstanceId, testAppId) + err := bdb.CreateServiceBinding(context.Background(), testBindingId, testInstanceId, testAppId, "") Expect(err).To(HaveOccurred()) Expect(err).To(Equal(db.ErrAlreadyExists)) }) @@ -402,7 +402,7 @@ var _ = Describe("BindingSqldb", func() { BeforeEach(func() { err = bdb.CreateServiceInstance(context.Background(), models.ServiceInstance{ServiceInstanceId: testInstanceId, OrgId: testOrgGuid, SpaceId: testSpaceGuid, DefaultPolicy: policyJsonStr, DefaultPolicyGuid: policyGuid}) Expect(err).NotTo(HaveOccurred()) - err = bdb.CreateServiceBinding(context.Background(), testBindingId, testInstanceId, testAppId) + err = bdb.CreateServiceBinding(context.Background(), testBindingId, testInstanceId, testAppId, "") Expect(err).NotTo(HaveOccurred()) }) It("should return what was created", func() { @@ -440,7 +440,7 @@ var _ = Describe("BindingSqldb", func() { }) Context("When service binding is present", func() { BeforeEach(func() { - err = bdb.CreateServiceBinding(context.Background(), testBindingId, testInstanceId, testAppId) + err = bdb.CreateServiceBinding(context.Background(), testBindingId, testInstanceId, testAppId, "") Expect(err).NotTo(HaveOccurred()) }) It("should succeed", func() { @@ -456,7 +456,7 @@ var _ = Describe("BindingSqldb", func() { BeforeEach(func() { err = bdb.CreateServiceInstance(context.Background(), models.ServiceInstance{ServiceInstanceId: testInstanceId, OrgId: testOrgGuid, SpaceId: testSpaceGuid, DefaultPolicy: policyJsonStr, DefaultPolicyGuid: policyGuid}) Expect(err).NotTo(HaveOccurred()) - err = bdb.CreateServiceBinding(context.Background(), testBindingId, testInstanceId, testAppId) + err = bdb.CreateServiceBinding(context.Background(), testBindingId, testInstanceId, testAppId, "") Expect(err).NotTo(HaveOccurred()) err = bdb.DeleteServiceBindingByAppId(context.Background(), testAppId) }) @@ -475,7 +475,7 @@ var _ = Describe("BindingSqldb", func() { BeforeEach(func() { err = bdb.CreateServiceInstance(context.Background(), models.ServiceInstance{ServiceInstanceId: testInstanceId, OrgId: testOrgGuid, SpaceId: testSpaceGuid, DefaultPolicy: policyJsonStr, DefaultPolicyGuid: policyGuid}) Expect(err).NotTo(HaveOccurred()) - err = bdb.CreateServiceBinding(context.Background(), testBindingId, testInstanceId, testAppId) + err = bdb.CreateServiceBinding(context.Background(), testBindingId, testInstanceId, testAppId, "") Expect(err).NotTo(HaveOccurred()) }) It("should return true", func() { @@ -501,7 +501,7 @@ var _ = Describe("BindingSqldb", func() { BeforeEach(func() { err = bdb.CreateServiceInstance(context.Background(), models.ServiceInstance{ServiceInstanceId: testInstanceId, OrgId: testOrgGuid, SpaceId: testSpaceGuid, DefaultPolicy: policyJsonStr, DefaultPolicyGuid: policyGuid}) Expect(err).NotTo(HaveOccurred()) - err = bdb.CreateServiceBinding(context.Background(), testBindingId, testInstanceId, testAppId) + err = bdb.CreateServiceBinding(context.Background(), testBindingId, testInstanceId, testAppId, "") Expect(err).NotTo(HaveOccurred()) }) It("should succeed", func() { @@ -526,15 +526,15 @@ var _ = Describe("BindingSqldb", func() { BeforeEach(func() { err = bdb.CreateServiceInstance(context.Background(), models.ServiceInstance{ServiceInstanceId: testInstanceId, OrgId: testOrgGuid, SpaceId: testSpaceGuid, DefaultPolicy: policyJsonStr, DefaultPolicyGuid: policyGuid}) Expect(err).NotTo(HaveOccurred()) - err = bdb.CreateServiceBinding(context.Background(), testBindingId, testInstanceId, testAppId) + err = bdb.CreateServiceBinding(context.Background(), testBindingId, testInstanceId, testAppId, "") Expect(err).NotTo(HaveOccurred()) - err = bdb.CreateServiceBinding(context.Background(), testBindingId2, testInstanceId, testAppId2) + err = bdb.CreateServiceBinding(context.Background(), testBindingId2, testInstanceId, testAppId2, "") Expect(err).NotTo(HaveOccurred()) // other unrelated service instance with bindings err = bdb.CreateServiceInstance(context.Background(), models.ServiceInstance{ServiceInstanceId: testInstanceId3, OrgId: testOrgGuid, SpaceId: testSpaceGuid, DefaultPolicy: policyJsonStr, DefaultPolicyGuid: policyGuid}) Expect(err).NotTo(HaveOccurred()) - err = bdb.CreateServiceBinding(context.Background(), testBindingId3, testInstanceId3, testAppId3) + err = bdb.CreateServiceBinding(context.Background(), testBindingId3, testInstanceId3, testAppId3, "") Expect(err).NotTo(HaveOccurred()) }) It("should succeed", func() { @@ -599,17 +599,17 @@ var _ = Describe("BindingSqldb", func() { err = bdb.CreateServiceInstance(context.Background(), models.ServiceInstance{testInstanceId, testOrgGuid, testSpaceGuid, policyJsonStr, policyGuid}) Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("CreateServiceInstance, failed: testInstanceId %s procId %d", testInstanceId, GinkgoParallelProcess())) - err = bdb.CreateServiceBinding(context.Background(), testBindingId, testInstanceId, testAppId) + err = bdb.CreateServiceBinding(context.Background(), testBindingId, testInstanceId, testAppId, "") Expect(err).NotTo(HaveOccurred()) - err = bdb.CreateServiceBinding(context.Background(), testBindingId2, testInstanceId, testAppId2) + err = bdb.CreateServiceBinding(context.Background(), testBindingId2, testInstanceId, testAppId2, "") Expect(err).NotTo(HaveOccurred()) // other unrelated service instance with bindings err = bdb.CreateServiceInstance(context.Background(), models.ServiceInstance{testInstanceId3, testOrgGuid, testSpaceGuid, policyJsonStr, policyGuid}) Expect(err).NotTo(HaveOccurred()) - err = bdb.CreateServiceBinding(context.Background(), testBindingId3, testInstanceId3, testAppId3) + err = bdb.CreateServiceBinding(context.Background(), testBindingId3, testInstanceId3, testAppId3, "") Expect(err).NotTo(HaveOccurred()) }) @@ -625,6 +625,99 @@ var _ = Describe("BindingSqldb", func() { }) }) }) + + Describe("isAppBoundToSameAutoscaler", func() { + var isTestApp1Bounded bool + JustBeforeEach(func() { + isTestApp1Bounded, _ = bdb.IsAppBoundToSameAutoscaler(context.Background(), testAppId, testAppId2) + Expect(err).NotTo(HaveOccurred()) + }) + When("apps are bounded to same autoscaler instance", func() { + BeforeEach(func() { + err = bdb.CreateServiceInstance(context.Background(), models.ServiceInstance{ServiceInstanceId: testInstanceId, OrgId: testOrgGuid, SpaceId: testSpaceGuid, DefaultPolicy: policyJsonStr, DefaultPolicyGuid: policyGuid}) + Expect(err).NotTo(HaveOccurred()) + err = bdb.CreateServiceBinding(context.Background(), testBindingId, testInstanceId, testAppId, "") + Expect(err).NotTo(HaveOccurred()) + err = bdb.CreateServiceBinding(context.Background(), testBindingId2, testInstanceId, testAppId2, "") + Expect(err).NotTo(HaveOccurred()) + }) + It("should return true", func() { + Expect(isTestApp1Bounded).To(BeTrue()) + }) + }) + Context("when neighbouring app is bounded to different autoscaler instance", func() { + BeforeEach(func() { + err = bdb.CreateServiceInstance(context.Background(), models.ServiceInstance{ServiceInstanceId: testInstanceId, OrgId: testOrgGuid, SpaceId: testSpaceGuid, DefaultPolicy: policyJsonStr, DefaultPolicyGuid: policyGuid}) + Expect(err).NotTo(HaveOccurred()) + err = bdb.CreateServiceInstance(context.Background(), models.ServiceInstance{ServiceInstanceId: testInstanceId2, OrgId: testOrgGuid, SpaceId: testSpaceGuid, DefaultPolicy: policyJsonStr, DefaultPolicyGuid: policyGuid}) + Expect(err).NotTo(HaveOccurred()) + err = bdb.CreateServiceBinding(context.Background(), testBindingId, testInstanceId, testAppId, "") + Expect(err).NotTo(HaveOccurred()) + err = bdb.CreateServiceBinding(context.Background(), testBindingId2, testInstanceId2, testAppId2, "") + Expect(err).NotTo(HaveOccurred()) + }) + It("should return false", func() { + Expect(err).NotTo(HaveOccurred()) + Expect(isTestApp1Bounded).To(BeFalse()) + }) + }) + + }) + + Describe("CreateServiceBindingWithConfigs", func() { + BeforeEach(func() { + err = bdb.CreateServiceInstance(context.Background(), models.ServiceInstance{ServiceInstanceId: testInstanceId, OrgId: testOrgGuid, SpaceId: testSpaceGuid, DefaultPolicy: policyJsonStr, DefaultPolicyGuid: policyGuid}) + Expect(err).NotTo(HaveOccurred()) + }) + When("configuration bounded_app is provided", func() { + JustBeforeEach(func() { + err = bdb.CreateServiceBindingWithConfigs(context.Background(), testBindingId, testInstanceId, testAppId, "bound_app") + Expect(err).NotTo(HaveOccurred()) + }) + It("should save the binding in the database", func() { + Expect(hasServiceBindingWithCustomMetricStrategy(testBindingId, testInstanceId)).To(BeTrue()) + + }) + }) + When("default configuration is provided", func() { + JustBeforeEach(func() { + err = bdb.CreateServiceBindingWithConfigs(context.Background(), testBindingId, testInstanceId, testAppId, "same_app") + Expect(err).NotTo(HaveOccurred()) + }) + It("should save the binding in the database", func() { + Expect(hasServiceBindingWithCustomMetricStrategy(testBindingId, testInstanceId)).To(BeFalse()) + //TODO check if the default was set + + }) + }) + When("configuration is not provided", func() { + JustBeforeEach(func() { + err = bdb.CreateServiceBindingWithConfigs(context.Background(), testBindingId, testInstanceId, testAppId, "") + + }) + It("should throw an error with foreign key violation", func() { + Expect(err).To(HaveOccurred()) + }) + }) + + }) + + Describe("GetCustomMetricStrategyByAppId", func() { + BeforeEach(func() { + err = bdb.CreateServiceInstance(context.Background(), models.ServiceInstance{ServiceInstanceId: testInstanceId, OrgId: testOrgGuid, SpaceId: testSpaceGuid, DefaultPolicy: policyJsonStr, DefaultPolicyGuid: policyGuid}) + Expect(err).NotTo(HaveOccurred()) + err = bdb.CreateServiceBindingWithConfigs(context.Background(), testBindingId, testInstanceId, testAppId, "bound_app") + Expect(err).NotTo(HaveOccurred()) + + }) + Context("When service instance and binding exists", func() { + It("should get the custom metrics strategy from the database", func() { + customMetricStrategy, _ := bdb.GetCustomMetricStrategyByAppId(context.Background(), testAppId) + Expect(customMetricStrategy).To(Equal("bound_app")) + }) + }) + + }) }) func addProcessIdTo(id string) string { diff --git a/src/autoscaler/db/sqldb/factories.go b/src/autoscaler/db/sqldb/factories.go index 759d5da2dc..00d195a30c 100644 --- a/src/autoscaler/db/sqldb/factories.go +++ b/src/autoscaler/db/sqldb/factories.go @@ -15,3 +15,12 @@ func CreatePolicyDb(dbConf db.DatabaseConfig, logger lager.Logger) *PolicySQLDB } return policyDB } + +func CreateBindingDB(dbConf db.DatabaseConfig, logger lager.Logger) *BindingSQLDB { + bindingDB, err := NewBindingSQLDB(dbConf, logger.Session("binding-db")) + if err != nil { + logger.Fatal("Failed To connect to bindingDB", err, lager.Data{"dbConfig": dbConf}) + os.Exit(1) + } + return bindingDB +} diff --git a/src/autoscaler/db/sqldb/sqldb_suite_test.go b/src/autoscaler/db/sqldb/sqldb_suite_test.go index 8a5c43f258..d9a90b004f 100644 --- a/src/autoscaler/db/sqldb/sqldb_suite_test.go +++ b/src/autoscaler/db/sqldb/sqldb_suite_test.go @@ -117,6 +117,16 @@ func hasServiceBinding(bindingId string, serviceInstanceId string) bool { return item } +func hasServiceBindingWithCustomMetricStrategy(bindingId string, serviceInstanceId string) bool { + query := dbHelper.Rebind("SELECT * FROM binding WHERE binding_id = ? AND service_instance_id = ? AND custom_metrics_strategy = 'bound_app'") + rows, e := dbHelper.Query(query, bindingId, serviceInstanceId) + FailOnError("can not query table binding", e) + defer func() { _ = rows.Close() }() + item := rows.Next() + FailOnError("can not query table binding", rows.Err()) + return item +} + func cleanPolicyTable() { _, e := dbHelper.Exec("DELETE from policy_json") if e != nil { diff --git a/src/autoscaler/metricsforwarder/cmd/metricsforwarder/main.go b/src/autoscaler/metricsforwarder/cmd/metricsforwarder/main.go index 4b92530cd7..0bea46f93d 100644 --- a/src/autoscaler/metricsforwarder/cmd/metricsforwarder/main.go +++ b/src/autoscaler/metricsforwarder/cmd/metricsforwarder/main.go @@ -59,13 +59,16 @@ func main() { policyDb := sqldb.CreatePolicyDb(conf.Db[db.PolicyDb], logger) defer func() { _ = policyDb.Close() }() + bindingDB := sqldb.CreateBindingDB(conf.Db[db.BindingDb], logger) + defer func() { _ = bindingDB.Close() }() + credentialProvider := cred_helper.CredentialsProvider(conf.CredHelperImpl, conf.StoredProcedureConfig, conf.Db, conf.CacheTTL, conf.CacheCleanupInterval, logger, policyDb) defer func() { _ = credentialProvider.Close() }() httpStatusCollector := healthendpoint.NewHTTPStatusCollector("autoscaler", "metricsforwarder") allowedMetricCache := cache.New(conf.CacheTTL, conf.CacheCleanupInterval) - customMetricsServer := createCustomMetricsServer(conf, logger, policyDb, credentialProvider, allowedMetricCache, httpStatusCollector) + customMetricsServer := createCustomMetricsServer(conf, logger, policyDb, bindingDB, credentialProvider, allowedMetricCache, httpStatusCollector) cacheUpdater := cacheUpdater(logger, mfClock, conf, policyDb, allowedMetricCache) members := grouper.Members{ @@ -97,9 +100,9 @@ func cacheUpdater(logger lager.Logger, mfClock clock.Clock, conf *config.Config, return cacheUpdater } -func createCustomMetricsServer(conf *config.Config, logger lager.Logger, policyDB *sqldb.PolicySQLDB, credentialProvider cred_helper.Credentials, allowedMetricCache *cache.Cache, httpStatusCollector healthendpoint.HTTPStatusCollector) ifrit.Runner { +func createCustomMetricsServer(conf *config.Config, logger lager.Logger, policyDB *sqldb.PolicySQLDB, bindingDB *sqldb.BindingSQLDB, credentialProvider cred_helper.Credentials, allowedMetricCache *cache.Cache, httpStatusCollector healthendpoint.HTTPStatusCollector) ifrit.Runner { rateLimiter := ratelimiter.DefaultRateLimiter(conf.RateLimit.MaxAmount, conf.RateLimit.ValidDuration, logger.Session("metricforwarder-ratelimiter")) - httpServer, err := server.NewServer(logger.Session("custom_metrics_server"), conf, policyDB, credentialProvider, *allowedMetricCache, httpStatusCollector, rateLimiter) + httpServer, err := server.NewServer(logger.Session("custom_metrics_server"), conf, policyDB, bindingDB, credentialProvider, *allowedMetricCache, httpStatusCollector, rateLimiter) if err != nil { logger.Fatal("Failed to create client to custom metrics server", err) os.Exit(1) diff --git a/src/autoscaler/metricsforwarder/cmd/metricsforwarder/metricsforwarder_suite_test.go b/src/autoscaler/metricsforwarder/cmd/metricsforwarder/metricsforwarder_suite_test.go index fb67b8ce82..3b385b8174 100644 --- a/src/autoscaler/metricsforwarder/cmd/metricsforwarder/metricsforwarder_suite_test.go +++ b/src/autoscaler/metricsforwarder/cmd/metricsforwarder/metricsforwarder_suite_test.go @@ -63,53 +63,8 @@ var _ = SynchronizedBeforeSuite(func() []byte { if err != nil { AbortSuite(fmt.Sprintf("DBURL not found: %s", err.Error())) } - - policyDB, err := sqlx.Open(database.DriverName, database.DataSourceName) - Expect(err).NotTo(HaveOccurred()) - - _, err = policyDB.Exec("DELETE from policy_json") - if err != nil { - AbortSuite(fmt.Sprintf("Failed clean policy_json %s", err.Error())) - } - _, err = policyDB.Exec("DELETE from credentials") - if err != nil { - AbortSuite(fmt.Sprintf("Failed clean credentials %s", err.Error())) - } - - policy := ` - { - "instance_min_count": 1, - "instance_max_count": 5, - "scaling_rules":[ - { - "metric_type":"custom", - "breach_duration_secs":600, - "threshold":30, - "operator":"<", - "cool_down_secs":300, - "adjustment":"-1" - } - ] - }` - query := policyDB.Rebind("INSERT INTO policy_json(app_id, policy_json, guid) values(?, ?, ?)") - _, err = policyDB.Exec(query, "an-app-id", policy, "1234") - if err != nil { - AbortSuite(fmt.Sprintf("Failed clean credentials %s", err.Error())) - } - - encryptedUsername, _ := bcrypt.GenerateFromPassword([]byte(username), 8) - encryptedPassword, _ := bcrypt.GenerateFromPassword([]byte(password), 8) - - query = policyDB.Rebind("INSERT INTO credentials(id, username, password, updated_at) values(?, ?, ?, ?)") - _, err = policyDB.Exec(query, "an-app-id", encryptedUsername, encryptedPassword, "2011-06-18 15:36:38") - if err != nil { - AbortSuite(fmt.Sprintf("Failed to add credentials: %s", err.Error())) - } - - err = policyDB.Close() - if err != nil { - AbortSuite(fmt.Sprintf("Failed to close connection: %s", err.Error())) - } + preparePolicyDb(database) + prepareBindingDb(database) return []byte(mf) }, func(pathsByte []byte) { @@ -153,6 +108,12 @@ var _ = SynchronizedBeforeSuite(func() []byte { MaxIdleConnections: 5, ConnectionMaxLifetime: 10 * time.Second, } + cfg.Db[db.BindingDb] = db.DatabaseConfig{ + URL: dbUrl, + MaxOpenConnections: 10, + MaxIdleConnections: 5, + ConnectionMaxLifetime: 10 * time.Second, + } cfg.CredHelperImpl = "default" @@ -162,6 +123,69 @@ var _ = SynchronizedBeforeSuite(func() []byte { healthHttpClient = &http.Client{} }) +func preparePolicyDb(database *db.Database) { + policyDB, err := sqlx.Open(database.DriverName, database.DataSourceName) + Expect(err).NotTo(HaveOccurred()) + + _, err = policyDB.Exec("DELETE from policy_json") + if err != nil { + AbortSuite(fmt.Sprintf("Failed clean policy_json %s", err.Error())) + } + _, err = policyDB.Exec("DELETE from credentials") + if err != nil { + AbortSuite(fmt.Sprintf("Failed clean credentials %s", err.Error())) + } + + policy := ` + { + "instance_min_count": 1, + "instance_max_count": 5, + "scaling_rules":[ + { + "metric_type":"custom", + "breach_duration_secs":600, + "threshold":30, + "operator":"<", + "cool_down_secs":300, + "adjustment":"-1" + } + ] + }` + query := policyDB.Rebind("INSERT INTO policy_json(app_id, policy_json, guid) values(?, ?, ?)") + _, err = policyDB.Exec(query, "an-app-id", policy, "1234") + if err != nil { + AbortSuite(fmt.Sprintf("Failed clean credentials %s", err.Error())) + } + + encryptedUsername, _ := bcrypt.GenerateFromPassword([]byte(username), 8) + encryptedPassword, _ := bcrypt.GenerateFromPassword([]byte(password), 8) + + query = policyDB.Rebind("INSERT INTO credentials(id, username, password, updated_at) values(?, ?, ?, ?)") + _, err = policyDB.Exec(query, "an-app-id", encryptedUsername, encryptedPassword, "2011-06-18 15:36:38") + if err != nil { + AbortSuite(fmt.Sprintf("Failed to add credentials: %s", err.Error())) + } + + err = policyDB.Close() + if err != nil { + AbortSuite(fmt.Sprintf("Failed to close connection: %s", err.Error())) + } +} + +func prepareBindingDb(database *db.Database) { + bindingDB, err := sqlx.Open(database.DriverName, database.DataSourceName) + Expect(err).NotTo(HaveOccurred()) + + _, err = bindingDB.Exec("DELETE from binding") + if err != nil { + AbortSuite(fmt.Sprintf("Failed clean policy_json %s", err.Error())) + } + err = bindingDB.Close() + if err != nil { + AbortSuite(fmt.Sprintf("Failed to close connection: %s", err.Error())) + } +} + var _ = SynchronizedAfterSuite(func() { grpcIngressTestServer.Stop() os.Remove(configFile.Name()) diff --git a/src/autoscaler/metricsforwarder/config/config.go b/src/autoscaler/metricsforwarder/config/config.go index 743ff79b61..5118de1636 100644 --- a/src/autoscaler/metricsforwarder/config/config.go +++ b/src/autoscaler/metricsforwarder/config/config.go @@ -182,6 +182,9 @@ func (c *Config) Validate() error { if c.Db[db.PolicyDb].URL == "" { return fmt.Errorf("Configuration error: Policy DB url is empty") } + if c.Db[db.BindingDb].URL == "" { + return fmt.Errorf("Configuration error: Binding DB url is empty") + } if c.UsingSyslog() { if c.SyslogConfig.TLS.CACertFile == "" { return fmt.Errorf("Configuration error: SyslogServer Loggregator CACert is empty") diff --git a/src/autoscaler/metricsforwarder/config/config_test.go b/src/autoscaler/metricsforwarder/config/config_test.go index 01c00332a9..b6f0addf0d 100644 --- a/src/autoscaler/metricsforwarder/config/config_test.go +++ b/src/autoscaler/metricsforwarder/config/config_test.go @@ -287,6 +287,12 @@ health: MaxIdleConnections: 5, ConnectionMaxLifetime: 60 * time.Second, } + conf.Db[db.BindingDb] = db.DatabaseConfig{ + URL: "postgres://pqgotest:password@localhost/pqgotest", + MaxOpenConnections: 10, + MaxIdleConnections: 5, + ConnectionMaxLifetime: 60 * time.Second, + } conf.RateLimit.MaxAmount = 10 conf.RateLimit.ValidDuration = 1 * time.Second @@ -362,6 +368,16 @@ health: }) }) + When("binding db url is not set", func() { + BeforeEach(func() { + conf.Db[db.BindingDb] = db.DatabaseConfig{URL: ""} + }) + + It("should error", func() { + Expect(err).To(MatchError(MatchRegexp("Configuration error: Binding DB url is empty"))) + }) + }) + When("Loggregator CACert is not set", func() { BeforeEach(func() { conf.LoggregatorConfig.TLS.CACertFile = "" diff --git a/src/autoscaler/metricsforwarder/server/auth/auth_suite_test.go b/src/autoscaler/metricsforwarder/server/auth/auth_suite_test.go index e6c49bbbc5..7a706f6f56 100644 --- a/src/autoscaler/metricsforwarder/server/auth/auth_suite_test.go +++ b/src/autoscaler/metricsforwarder/server/auth/auth_suite_test.go @@ -26,6 +26,7 @@ var ( serverProcess ifrit.Process serverUrl string policyDB *fakes.FakePolicyDB + fakeBindingDB *fakes.FakeBindingDB rateLimiter *fakes.FakeLimiter fakeCredentials *fakes.FakeCredentials @@ -75,13 +76,14 @@ var _ = SynchronizedBeforeSuite(func() []byte { LoggregatorConfig: loggregatorConfig, } policyDB = &fakes.FakePolicyDB{} + fakeBindingDB = &fakes.FakeBindingDB{} credentialCache = *cache.New(10*time.Minute, -1) allowedMetricCache = *cache.New(10*time.Minute, -1) httpStatusCollector := &fakes.FakeHTTPStatusCollector{} rateLimiter = &fakes.FakeLimiter{} fakeCredentials = &fakes.FakeCredentials{} - httpServer, err := NewServer(lager.NewLogger("test"), conf, policyDB, + httpServer, err := NewServer(lager.NewLogger("test"), conf, policyDB, fakeBindingDB, fakeCredentials, allowedMetricCache, httpStatusCollector, rateLimiter) Expect(err).NotTo(HaveOccurred()) serverUrl = fmt.Sprintf("http://127.0.0.1:%d", conf.Server.Port) diff --git a/src/autoscaler/metricsforwarder/server/auth/auth_test.go b/src/autoscaler/metricsforwarder/server/auth/auth_test.go index 3f39950be7..43b93b8d5d 100644 --- a/src/autoscaler/metricsforwarder/server/auth/auth_test.go +++ b/src/autoscaler/metricsforwarder/server/auth/auth_test.go @@ -24,22 +24,26 @@ var _ = Describe("Authentication", func() { var ( authTest *auth.Auth fakeCredentials *fakes.FakeCredentials + fakeBindingDB *fakes.FakeBindingDB resp *httptest.ResponseRecorder req *http.Request body []byte vars map[string]string + testAppId string ) BeforeEach(func() { fakeCredentials = &fakes.FakeCredentials{} + fakeBindingDB = &fakes.FakeBindingDB{} vars = make(map[string]string) + testAppId = "an-app-id" resp = httptest.NewRecorder() }) JustBeforeEach(func() { logger := lager.NewLogger("auth-test") var err error - authTest, err = auth.New(logger, fakeCredentials) + authTest, err = auth.New(logger, fakeCredentials, fakeBindingDB) Expect(err).ToNot(HaveOccurred()) }) @@ -47,7 +51,7 @@ var _ = Describe("Authentication", func() { Context("a request to publish custom metrics comes", func() { Context("credentials are valid", func() { It("should validate the credentials", func() { - req = CreateRequest(body) + req = CreateRequest(body, testAppId) req.Header.Add("Authorization", "Basic dXNlcm5hbWU6cGFzc3dvcmQ=") vars["appid"] = "an-app-id" nextCalled := 0 @@ -65,7 +69,7 @@ var _ = Describe("Authentication", func() { Context("credentials are valid but db error occurs", func() { It("should validate the credentials", func() { - req = CreateRequest(body) + req = CreateRequest(body, testAppId) req.Header.Add("Authorization", "Basic dXNlcm5hbWU6cGFzc3dvcmQ=") vars["appid"] = "an-app-id" nextCalled := 0 @@ -83,7 +87,7 @@ var _ = Describe("Authentication", func() { Context("credentials are invalid", func() { It("should validate the credentials", func() { - req = CreateRequest(body) + req = CreateRequest(body, testAppId) req.Header.Add("Authorization", "Basic dXNlcm5hbWU6cGFzc3dvcmQ=") vars["appid"] = "an-app-id" nextCalled := 0 @@ -107,7 +111,7 @@ var _ = Describe("Authentication", func() { const validClientCert1 = "../../../../../test-certs/validmtls_client-1.crt" Context("correct xfcc header with correct CA is supplied for cert 1", func() { It("should call next handler", func() { - req = CreateRequest(body) + req = CreateRequest(body, testAppId) req.Header.Add("X-Forwarded-Client-Cert", MustReadXFCCcert(validClientCert1)) vars["appid"] = "an-app-id" nextCalled := 0 @@ -125,7 +129,7 @@ var _ = Describe("Authentication", func() { Context("correct xfcc header with correct CA is supplied for cert 2", func() { It("should call next handler", func() { - req = CreateRequest(body) + req = CreateRequest(body, testAppId) const validClientCert2 = "../../../../../test-certs/validmtls_client-2.crt" req.Header.Add("X-Forwarded-Client-Cert", MustReadXFCCcert(validClientCert2)) vars["appid"] = "an-app-id" @@ -144,7 +148,7 @@ var _ = Describe("Authentication", func() { Context("correct xfcc header including \"'s around the cert", func() { It("should call next handler", func() { - req = CreateRequest(body) + req = CreateRequest(body, testAppId) req.Header.Add("X-Forwarded-Client-Cert", fmt.Sprintf("%q", MustReadXFCCcert(validClientCert1))) vars["appid"] = "an-app-id" nextCalled := 0 @@ -162,7 +166,7 @@ var _ = Describe("Authentication", func() { Context("valid cert with wrong app-id is supplied", func() { It("should return status code 403", func() { - req = CreateRequest(body) + req = CreateRequest(body, testAppId) req.Header.Add("X-Forwarded-Client-Cert", MustReadXFCCcert(validClientCert1)) nextCalled := 0 nextFunc := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -178,6 +182,53 @@ var _ = Describe("Authentication", func() { Expect(nextCalled).To(Equal(0)) }) }) + + Context("Request from neighbour (different) app arrives for app B", func() { + const validClientCert2 = "../../../../../test-certs/validmtls_client-2.crt" + When("custom-metrics-submission-strategy is not set in the scaling policy", func() { + It("It should not call next handler and return with status code 403", func() { + testAppId = "app-to-scale-id" + req = CreateRequest(body, testAppId) + vars["appid"] = testAppId + req.Header.Add("X-Forwarded-Client-Cert", MustReadXFCCcert(validClientCert2)) + fakeBindingDB.GetCustomMetricStrategyByAppIdReturns("same_app", nil) + nextCalled := 0 + nextFunc := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + nextCalled = nextCalled + 1 + }) + + authTest.AuthenticateHandler(nextFunc)(resp, req, vars) + + Expect(policyDB.GetCredentialCallCount()).To(Equal(0)) + Expect(resp.Code).To(Equal(http.StatusForbidden)) + Expect(resp.Body.String()).To(Equal(`{"code":"Forbidden","message":"Unauthorized"}`)) + Expect(nextCalled).To(Equal(0)) + }) + }) + Context("custom-metrics-submission-strategy is set to bound_app in the scaling policy", func() { + It("It should call next handler and return with status code 200", func() { + req = CreateRequest(body, testAppId) + testAppId = "app-to-scale-id" + vars["appid"] = testAppId + req.Header.Add("X-Forwarded-Client-Cert", MustReadXFCCcert(validClientCert2)) + // ToDO: this should be read via configurations aka scaling policy binding parameters + fakeBindingDB.GetCustomMetricStrategyByAppIdReturns("bound_app", nil) + nextCalled := 0 + nextFunc := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + nextCalled = nextCalled + 1 + }) + fakeBindingDB.IsAppBoundToSameAutoscalerReturns(true, nil) + + authTest.AuthenticateHandler(nextFunc)(resp, req, vars) + + Expect(fakeBindingDB.IsAppBoundToSameAutoscalerCallCount()).To(Equal(1)) + Expect(resp.Code).To(Equal(http.StatusOK)) + Expect(resp.Body.String()).To(BeEmpty()) + Expect(nextCalled).To(Equal(1)) + }) + }) + + }) }) }) @@ -190,8 +241,8 @@ func MustReadXFCCcert(fileName string) string { return base64.StdEncoding.EncodeToString(block.Bytes) } -func CreateRequest(body []byte) *http.Request { - req, err := http.NewRequest(http.MethodPost, serverUrl+"/v1/apps/an-app-id/metrics", bytes.NewReader(body)) +func CreateRequest(body []byte, appId string) *http.Request { + req, err := http.NewRequest(http.MethodPost, serverUrl+"/v1/apps/"+appId+"/metrics", bytes.NewReader(body)) Expect(err).ToNot(HaveOccurred()) req.Header.Add("Content-Type", "application/json") return req diff --git a/src/autoscaler/metricsforwarder/server/auth/authenticator.go b/src/autoscaler/metricsforwarder/server/auth/authenticator.go index eee5bc7e08..8d3b76d4f0 100644 --- a/src/autoscaler/metricsforwarder/server/auth/authenticator.go +++ b/src/autoscaler/metricsforwarder/server/auth/authenticator.go @@ -1,6 +1,7 @@ package auth import ( + "code.cloudfoundry.org/app-autoscaler/src/autoscaler/db" "errors" "net/http" @@ -17,12 +18,14 @@ var ErrorAuthNotFound = errors.New("authentication method not found") type Auth struct { logger lager.Logger credentials cred_helper.Credentials + bindingDB db.BindingDB } -func New(logger lager.Logger, credentials cred_helper.Credentials) (*Auth, error) { +func New(logger lager.Logger, credentials cred_helper.Credentials, bindingDB db.BindingDB) (*Auth, error) { return &Auth{ logger: logger, credentials: credentials, + bindingDB: bindingDB, }, nil } @@ -52,7 +55,8 @@ func (a *Auth) AuthenticateHandler(next http.Handler) func(w http.ResponseWriter func (a *Auth) CheckAuth(r *http.Request, appID string) error { var errAuth error - errAuth = a.XFCCAuth(r, appID) + a.logger.Info("checking authentication for app", lager.Data{"app_id": appID}) + errAuth = a.XFCCAuth(r, a.bindingDB, appID) if errAuth != nil { if errors.Is(errAuth, ErrXFCCHeaderNotFound) { a.logger.Info("Trying basic auth") diff --git a/src/autoscaler/metricsforwarder/server/auth/custom_metrics_strategy.go b/src/autoscaler/metricsforwarder/server/auth/custom_metrics_strategy.go new file mode 100644 index 0000000000..5b65ab1b78 --- /dev/null +++ b/src/autoscaler/metricsforwarder/server/auth/custom_metrics_strategy.go @@ -0,0 +1,49 @@ +package auth + +import ( + "code.cloudfoundry.org/app-autoscaler/src/autoscaler/db" + "code.cloudfoundry.org/lager/v3" + "net/http" +) + +type MetricsSubmissionStrategy interface { + validate(appId string, submitterAppIdFromCert string, logger lager.Logger, bindingDB db.BindingDB, r *http.Request) error +} + +var _ MetricsSubmissionStrategy = &DefaultMetricsSubmissionStrategy{} + +type DefaultMetricsSubmissionStrategy struct{} + +func (d *DefaultMetricsSubmissionStrategy) validate(appId string, submitterAppIdFromCert string, logger lager.Logger, bindingDB db.BindingDB, r *http.Request) error { + // check if appID is same as AppIdFromCert + if appId != submitterAppIdFromCert { + return ErrorAppIDWrong + } + return nil +} + +type BoundedMetricsSubmissionStrategy struct{} + +func (c *BoundedMetricsSubmissionStrategy) validate(appId string, submitterAppIdFromCert string, logger lager.Logger, bindingDB db.BindingDB, r *http.Request) error { + if appId != submitterAppIdFromCert { + c.verifyMetricSubmissionStrategy(r, logger, bindingDB, submitterAppIdFromCert, appId) + } + return nil +} + +func (c *BoundedMetricsSubmissionStrategy) verifyMetricSubmissionStrategy(r *http.Request, logger lager.Logger, bindingDB db.BindingDB, submitterAppCert string, appID string) (bool, error) { + + logger.Info("custom-metrics-submission-strategy-found", lager.Data{"appID": appID, "submitterAppCertID": submitterAppCert}) + // check if the app is bound to same autoscaler instance by check the binding id from the bindingdb + // if the app is bound to the same autoscaler instance, then allow the request to the next handler i.e publish custom metrics + isAppBound, err := bindingDB.IsAppBoundToSameAutoscaler(r.Context(), submitterAppCert, appID) + if err != nil { + logger.Error("error-checking-app-bound-to-same-service", err, lager.Data{"metric-submitter-app-id": submitterAppCert}) + return false, err + } + if isAppBound == false { + logger.Info("app-not-bound-to-same-service", lager.Data{"app-id": submitterAppCert}) + return false, ErrorAppNotBound + } + return true, nil +} diff --git a/src/autoscaler/metricsforwarder/server/auth/xfcc_auth.go b/src/autoscaler/metricsforwarder/server/auth/xfcc_auth.go index a91980cbe8..0b6ab1e818 100644 --- a/src/autoscaler/metricsforwarder/server/auth/xfcc_auth.go +++ b/src/autoscaler/metricsforwarder/server/auth/xfcc_auth.go @@ -1,6 +1,8 @@ package auth import ( + "code.cloudfoundry.org/app-autoscaler/src/autoscaler/db" + "code.cloudfoundry.org/lager/v3" "crypto/x509" "encoding/base64" "errors" @@ -9,11 +11,14 @@ import ( "strings" ) +const customMetricsStrategyType = "bound_app" + var ErrXFCCHeaderNotFound = errors.New("mTLS authentication method not found") var ErrorNoAppIDFound = errors.New("certificate does not contain an app id") -var ErrorAppIDWrong = errors.New("app id in certificate is not valid") +var ErrorAppIDWrong = errors.New("app is not allowed to send metrics due to invalid app id in certificate") +var ErrorAppNotBound = errors.New("application is not bound to the same service instance") -func (a *Auth) XFCCAuth(r *http.Request, appID string) error { +func (a *Auth) XFCCAuth(r *http.Request, bindingDB db.BindingDB, appID string) error { xfccHeader := r.Header.Get("X-Forwarded-Client-Cert") if xfccHeader == "" { return ErrXFCCHeaderNotFound @@ -29,20 +34,77 @@ func (a *Auth) XFCCAuth(r *http.Request, appID string) error { return fmt.Errorf("failed to parse certificate: %w", err) } - certAppId := getAppId(cert) + submitterAppCert := readAppIdFromCert(cert) - if len(certAppId) == 0 { + if len(submitterAppCert) == 0 { return ErrorNoAppIDFound } - if appID != certAppId { - return ErrorAppIDWrong + // appID = custom metrics producer + // submitterAppCert = app id in certificate + // Case 1 : custom metrics can only be published by the app itself + // Case 2 : custom metrics can be published by any app bound to the same autoscaler instance + // In short, if the requester is not same as the scaling app + if appID != submitterAppCert { + var metricSubmissionStrategy MetricsSubmissionStrategy + customMetricSubmissionStrategy, err := bindingDB.GetCustomMetricStrategyByAppId(r.Context(), submitterAppCert) + a.logger.Info("custom-metrics-submission-strategy", lager.Data{"submitterAppCert": submitterAppCert, "strategy": customMetricSubmissionStrategy}) + if customMetricSubmissionStrategy == customMetricsStrategyType { + metricSubmissionStrategy = &BoundedMetricsSubmissionStrategy{} + } else { + metricSubmissionStrategy = &DefaultMetricsSubmissionStrategy{} + } + err = metricSubmissionStrategy.validate(appID, submitterAppCert, a.logger, bindingDB, r) + if err != nil { + return err + } + //////// + /*a.logger.Info("Checking custom metrics submission strategy") + validSubmitter, err := verifyMetricSubmissionStrategy(r, a.logger, bindingDB, submitterAppCert, appID) + if err != nil { + a.logger.Error("error-verifying-custom-metrics-submitter-app", err, lager.Data{"metric-submitter-app-id": submitterAppCert}) + return err + } /* no need to check as this is the default case + else if customMetricSubmissionStrategy == "same_app" || customMetricSubmissionStrategy == "" { // default case + // if the app is the same app, then allow the request to the next handler i.e 403 + a.logger.Info("custom-metrics-submission-strategy", lager.Data{"strategy": customMetricSubmissionStrategy}) + return ErrorAppIDWrong + } */ + /*if validSubmitter == true { + return nil + } else { + return ErrorAppIDWrong */ } return nil } -func getAppId(cert *x509.Certificate) string { +func verifyMetricSubmissionStrategy(r *http.Request, logger lager.Logger, bindingDB db.BindingDB, submitterAppCert string, appID string) (bool, error) { + + customMetricSubmissionStrategy := r.Header.Get("custom-metrics-submission-strategy") + customMetricSubmissionStrategy = strings.ToLower(customMetricSubmissionStrategy) + if customMetricSubmissionStrategy == "" { + logger.Info("custom-metrics-submission-strategy-not-found", lager.Data{"appID": appID}) + return false, nil + } + if customMetricSubmissionStrategy == "bound_app" { + logger.Info("custom-metrics-submission-strategy-found", lager.Data{"appID": appID, "strategy": customMetricSubmissionStrategy}) + // check if the app is bound to same autoscaler instance by check the binding id from the bindingdb + // if the app is bound to the same autoscaler instance, then allow the request to the next handler i.e publish custom metrics + isAppBound, err := bindingDB.IsAppBoundToSameAutoscaler(r.Context(), submitterAppCert, appID) + if err != nil { + logger.Error("error-checking-app-bound-to-same-service", err, lager.Data{"metric-submitter-app-id": submitterAppCert}) + return false, err + } + if isAppBound == false { + logger.Info("app-not-bound-to-same-service", lager.Data{"app-id": submitterAppCert}) + return false, ErrorAppNotBound + } + } + return true, nil +} + +func readAppIdFromCert(cert *x509.Certificate) string { var certAppId string for _, ou := range cert.Subject.OrganizationalUnit { if strings.Contains(ou, "app:") { diff --git a/src/autoscaler/metricsforwarder/server/custom_metrics_handlers.go b/src/autoscaler/metricsforwarder/server/custom_metrics_handlers.go index e3807c7a28..bd32de39ad 100644 --- a/src/autoscaler/metricsforwarder/server/custom_metrics_handlers.go +++ b/src/autoscaler/metricsforwarder/server/custom_metrics_handlers.go @@ -20,23 +20,25 @@ import ( ) var ( - ErrorReadingBody = errors.New("error reading custom metrics request body") - ErrorUnmarshalingBody = errors.New("error unmarshaling custom metrics request body") - ErrorParsingBody = errors.New("error parsing request body") + ErrorReadingBody = errors.New("error reading custom metrics request body") + ErrorUnmarshallingBody = errors.New("error unmarshalling custom metrics request body") + ErrorParsingBody = errors.New("error parsing request body") ) type CustomMetricsHandler struct { metricForwarder forwarder.MetricForwarder policyDB db.PolicyDB + bindingDB db.BindingDB allowedMetricCache cache.Cache cacheTTL time.Duration logger lager.Logger } -func NewCustomMetricsHandler(logger lager.Logger, metricForwarder forwarder.MetricForwarder, policyDB db.PolicyDB, allowedMetricCache cache.Cache) *CustomMetricsHandler { +func NewCustomMetricsHandler(logger lager.Logger, metricForwarder forwarder.MetricForwarder, policyDB db.PolicyDB, bindingDB db.BindingDB, allowedMetricCache cache.Cache) *CustomMetricsHandler { return &CustomMetricsHandler{ metricForwarder: metricForwarder, policyDB: policyDB, + bindingDB: bindingDB, allowedMetricCache: allowedMetricCache, logger: logger, } @@ -51,7 +53,7 @@ func (mh *CustomMetricsHandler) VerifyCredentialsAndPublishMetrics(w http.Respon handlers.WriteJSONResponse(w, http.StatusInternalServerError, models.ErrorResponse{ Code: "Internal-Server-Error", Message: "error reading custom metrics request body"}) - } else if errors.Is(err, ErrorUnmarshalingBody) { + } else if errors.Is(err, ErrorUnmarshallingBody) { handlers.WriteJSONResponse(w, http.StatusBadRequest, models.ErrorResponse{ Code: "Bad-Request", Message: "Error unmarshaling custom metrics request body"}) @@ -97,12 +99,12 @@ func (mh *CustomMetricsHandler) PublishMetrics(w http.ResponseWriter, r *http.Re var metricsConsumer *models.MetricsConsumer err = json.Unmarshal(body, &metricsConsumer) if err != nil { - mh.logger.Error("error-unmarshaling-metrics", err, lager.Data{"body": r.Body}) - return ErrorUnmarshalingBody + mh.logger.Error("error-unmarshalling-metrics", err, lager.Data{"body": r.Body}) + return ErrorUnmarshallingBody } err = mh.validateCustomMetricTypes(appID, metricsConsumer) if err != nil { - mh.logger.Error("failed-validating-metrictypes", err, lager.Data{"metrics": metricsConsumer}) + mh.logger.Error("failed-validating-metric-types", err, lager.Data{"metrics": metricsConsumer}) return fmt.Errorf("metric validation Failed %w", err) } @@ -138,7 +140,17 @@ func (mh *CustomMetricsHandler) validateCustomMetricTypes(appGUID string, metric // AllowedMetrics found in cache allowedMetricTypeSet = res.(map[string]struct{}) } else { - // AllowedMetrics not found in cache, find AllowedMetrics from Database + // allow app with strategy as bound_app to submit metrics without policy + isAppWithBoundStrategy, err := mh.isAppWithBoundStrategy(appGUID) + if err != nil { + mh.logger.Error("error-finding-app-submission-strategy", err, lager.Data{"appId": appGUID}) + return err + } + if isAppWithBoundStrategy { + mh.logger.Info("app-with-bound-strategy-found", lager.Data{"appId": appGUID}) + return nil + } + scalingPolicy, err := mh.policyDB.GetAppPolicy(context.TODO(), appGUID) if err != nil { mh.logger.Error("error-getting-policy", err, lager.Data{"appId": appGUID}) @@ -172,6 +184,20 @@ func (mh *CustomMetricsHandler) validateCustomMetricTypes(appGUID string, metric return nil } +func (mh *CustomMetricsHandler) isAppWithBoundStrategy(appGUID string) (bool, error) { + // allow app with submission_strategy as bound_app to submit custom metrics even without policy + submissionStrategy, err := mh.bindingDB.GetCustomMetricStrategyByAppId(context.TODO(), appGUID) + if err != nil { + mh.logger.Error("error-getting-custom-metrics-strategy", err, lager.Data{"appId": appGUID}) + return false, err + } + if submissionStrategy == "bound_app" { + mh.logger.Info("bounded-metrics-submission-strategy", lager.Data{"appId": appGUID, "submission_strategy": submissionStrategy}) + return true, nil + } + return false, nil +} + func (mh *CustomMetricsHandler) getMetrics(appID string, metricsConsumer *models.MetricsConsumer) []*models.CustomMetric { var metrics []*models.CustomMetric for _, metric := range metricsConsumer.CustomMetrics { diff --git a/src/autoscaler/metricsforwarder/server/custom_metrics_handlers_test.go b/src/autoscaler/metricsforwarder/server/custom_metrics_handlers_test.go index 9eea0d12ef..31bf3d8e94 100644 --- a/src/autoscaler/metricsforwarder/server/custom_metrics_handlers_test.go +++ b/src/autoscaler/metricsforwarder/server/custom_metrics_handlers_test.go @@ -29,6 +29,7 @@ var _ = Describe("MetricHandler", func() { allowedMetricTypeSet map[string]struct{} policyDB *fakes.FakePolicyDB + fakeBindingDB *fakes.FakeBindingDB metricsforwarder *fakes.FakeMetricForwarder resp *httptest.ResponseRecorder @@ -46,12 +47,13 @@ var _ = Describe("MetricHandler", func() { BeforeEach(func() { logger := lager.NewLogger("metrichandler-test") policyDB = &fakes.FakePolicyDB{} + fakeBindingDB = &fakes.FakeBindingDB{} metricsforwarder = &fakes.FakeMetricForwarder{} allowedMetricCache = *cache.New(10*time.Minute, -1) allowedMetricTypeSet = make(map[string]struct{}) vars = make(map[string]string) resp = httptest.NewRecorder() - handler = NewCustomMetricsHandler(logger, metricsforwarder, policyDB, allowedMetricCache) + handler = NewCustomMetricsHandler(logger, metricsforwarder, policyDB, fakeBindingDB, allowedMetricCache) allowedMetricCache.Flush() }) @@ -71,7 +73,7 @@ var _ = Describe("MetricHandler", func() { }, nil) body = []byte(`{ "instance_index":0, - "test" : + "test" : "metrics":[ { "name":"custom_metric1", @@ -265,6 +267,55 @@ var _ = Describe("MetricHandler", func() { }) }) + + Context("when a valid request to publish custom metrics comes from a neighbour App", func() { + When("neighbour app is bound to same autoscaler instance with policy", func() { + BeforeEach(func() { + scalingPolicy = &models.ScalingPolicy{ + InstanceMin: 1, + InstanceMax: 6, + ScalingRules: []*models.ScalingRule{{ + MetricType: "queuelength", + BreachDurationSeconds: 60, + Threshold: 10, + Operator: ">", + CoolDownSeconds: 60, + Adjustment: "+1"}}} + policyDB.GetAppPolicyReturns(scalingPolicy, nil) + customMetrics := []*models.CustomMetric{ + { + Name: "queuelength", Value: 12, Unit: "unit", InstanceIndex: 1, AppGUID: "an-app-id", + }, + } + body, err = json.Marshal(models.MetricsConsumer{InstanceIndex: 0, CustomMetrics: customMetrics}) + Expect(err).NotTo(HaveOccurred()) + }) + + It("should returns status code 200 and policy exists", func() { + Expect(resp.Code).To(Equal(http.StatusOK)) + Expect(policyDB.GetAppPolicyCallCount()).To(Equal(1)) + + }) + }) + When("neighbour app is bound to same autoscaler instance without policy", func() { + BeforeEach(func() { + fakeBindingDB.GetCustomMetricStrategyByAppIdReturns("bound_app", nil) + customMetrics := []*models.CustomMetric{ + { + Name: "queuelength", Value: 12, Unit: "unit", InstanceIndex: 1, AppGUID: "an-app-id", + }, + } + body, err = json.Marshal(models.MetricsConsumer{InstanceIndex: 0, CustomMetrics: customMetrics}) + Expect(err).NotTo(HaveOccurred()) + }) + + It("should returns status code 200", func() { + Expect(resp.Code).To(Equal(http.StatusOK)) + Expect(fakeBindingDB.GetCustomMetricStrategyByAppIdCallCount()).To(Equal(1)) + + }) + }) + }) }) }) diff --git a/src/autoscaler/metricsforwarder/server/server.go b/src/autoscaler/metricsforwarder/server/server.go index 582ee37b0e..f98f6798cf 100644 --- a/src/autoscaler/metricsforwarder/server/server.go +++ b/src/autoscaler/metricsforwarder/server/server.go @@ -23,14 +23,14 @@ import ( "github.com/tedsuo/ifrit" ) -func NewServer(logger lager.Logger, conf *config.Config, policyDb db.PolicyDB, credentials cred_helper.Credentials, allowedMetricCache cache.Cache, httpStatusCollector healthendpoint.HTTPStatusCollector, rateLimiter ratelimiter.Limiter) (ifrit.Runner, error) { +func NewServer(logger lager.Logger, conf *config.Config, policyDb db.PolicyDB, bindingDB db.BindingDB, credentials cred_helper.Credentials, allowedMetricCache cache.Cache, httpStatusCollector healthendpoint.HTTPStatusCollector, rateLimiter ratelimiter.Limiter) (ifrit.Runner, error) { metricForwarder, err := createMetricForwarder(logger, conf) if err != nil { return nil, fmt.Errorf("failed to create metric forwarder: %w", err) } - mh := NewCustomMetricsHandler(logger, *metricForwarder, policyDb, allowedMetricCache) - authenticator, err := auth.New(logger, credentials) + mh := NewCustomMetricsHandler(logger, *metricForwarder, policyDb, bindingDB, allowedMetricCache) + authenticator, err := auth.New(logger, credentials, bindingDB) if err != nil { return nil, fmt.Errorf("failed to create auth middleware: %w", err) } diff --git a/src/autoscaler/metricsforwarder/server/server_suite_test.go b/src/autoscaler/metricsforwarder/server/server_suite_test.go index 23dbdbcf31..f620989f49 100644 --- a/src/autoscaler/metricsforwarder/server/server_suite_test.go +++ b/src/autoscaler/metricsforwarder/server/server_suite_test.go @@ -24,10 +24,13 @@ import ( ) var ( - conf *config.Config - serverProcess ifrit.Process - serverUrl string - policyDB *fakes.FakePolicyDB + conf *config.Config + serverProcess ifrit.Process + serverUrl string + policyDB *fakes.FakePolicyDB + + fakeBindingDB *fakes.FakeBindingDB + rateLimiter *fakes.FakeLimiter fakeCredentials *fakes.FakeCredentials @@ -82,6 +85,8 @@ var _ = SynchronizedBeforeSuite(func() []byte { Health: healthConfig, } policyDB = &fakes.FakePolicyDB{} + fakeBindingDB = &fakes.FakeBindingDB{} + allowedMetricCache = *cache.New(10*time.Minute, -1) httpStatusCollector := healthendpoint.NewHTTPStatusCollector("autoscaler", "metricsforwarder") @@ -91,7 +96,7 @@ var _ = SynchronizedBeforeSuite(func() []byte { logger := lager.NewLogger("server_suite_test") logger.RegisterSink(lager.NewWriterSink(GinkgoWriter, lager.DEBUG)) - httpServer, err := NewServer(logger, conf, policyDB, + httpServer, err := NewServer(logger, conf, policyDB, fakeBindingDB, fakeCredentials, allowedMetricCache, httpStatusCollector, rateLimiter) Expect(err).NotTo(HaveOccurred()) serverUrl = fmt.Sprintf("http://127.0.0.1:%d", conf.Server.Port) diff --git a/src/autoscaler/metricsforwarder/server/server_test.go b/src/autoscaler/metricsforwarder/server/server_test.go index 8c3d121249..fb7698ed95 100644 --- a/src/autoscaler/metricsforwarder/server/server_test.go +++ b/src/autoscaler/metricsforwarder/server/server_test.go @@ -44,7 +44,7 @@ func setupRequest(method, url, authHeader string, body []byte) (*http.Client, *h return client, req, nil } -var _ = Describe("CustomMetrics Server", func() { +var _ = Describe("CustomMetricsConfig Server", func() { var ( resp *http.Response req *http.Request diff --git a/src/autoscaler/models/binding_configs.go b/src/autoscaler/models/binding_configs.go new file mode 100644 index 0000000000..7b2a9249fd --- /dev/null +++ b/src/autoscaler/models/binding_configs.go @@ -0,0 +1,42 @@ +package models + +// BindingConfig +/* The configuration object received as part of the binding parameters. Example config: +{ + "configuration": { + "custom_metrics": { + "auth": { + "credential_type": "binding_secret" + }, + "metric_submission_strategy": { + "allow_from": "bound_app or same_app" + } + } + } +*/ +type BindingConfig struct { + Configuration Configuration `json:"configuration"` +} +type Configuration struct { + CustomMetrics CustomMetricsConfig `json:"custom_metrics"` +} + +type CustomMetricsConfig struct { + Auth Auth `json:"auth,omitempty"` + MetricSubmissionStrategy MetricsSubmissionStrategy `json:"metric_submission_strategy"` +} + +type Auth struct { + CredentialType string `json:"credential_type"` +} +type MetricsSubmissionStrategy struct { + AllowFrom string `json:"allow_from"` +} + +func (b *BindingConfig) GetCustomMetricsStrategy() string { + return b.Configuration.CustomMetrics.MetricSubmissionStrategy.AllowFrom +} + +func (b *BindingConfig) SetDefaultCustomMetricsStrategy(allowFrom string) { + b.Configuration.CustomMetrics.MetricSubmissionStrategy.AllowFrom = allowFrom +} diff --git a/templates/app-autoscaler.yml b/templates/app-autoscaler.yml index fc72485f0f..66b62f37c5 100644 --- a/templates/app-autoscaler.yml +++ b/templates/app-autoscaler.yml @@ -479,6 +479,7 @@ instance_groups: properties: autoscaler: policy_db: *database + binding_db: *database policy_db_connection_config: *databaseConnectionConfig metricsforwarder: health: