Skip to content

Commit

Permalink
functionaltests: restart Integrations Server after ingestion (#15356)
Browse files Browse the repository at this point in the history
* expose deployment id

* add ecclient package

* restart apm-server after ingest

* refactor RunBlockingWait to restart APM Server

This removes the previous implementation with document
count checks and the logic that was trying to ensure
ingestion was completed.
The method now relies only on restarting the APM Server.

(cherry picked from commit c230240)

# Conflicts:
#	functionaltests/go.sum
  • Loading branch information
endorama authored and mergify[bot] committed Jan 24, 2025
1 parent 167f1f8 commit c2383b9
Show file tree
Hide file tree
Showing 8 changed files with 547 additions and 58 deletions.
12 changes: 10 additions & 2 deletions functionaltests/8_15_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"go.uber.org/zap"
"go.uber.org/zap/zaptest"

"github.com/elastic/apm-server/functionaltests/internal/ecclient"
"github.com/elastic/apm-server/functionaltests/internal/esclient"
"github.com/elastic/apm-server/functionaltests/internal/gen"
"github.com/elastic/apm-server/functionaltests/internal/terraform"
Expand Down Expand Up @@ -59,6 +60,8 @@ func TestUpgrade_8_15_4_to_8_16_0(t *testing.T) {
}
})

var deploymentID string
require.NoError(t, tf.Output("deployment_id", &deploymentID))
var escfg esclient.Config
tf.Output("apm_url", &escfg.APMServerURL)
tf.Output("es_url", &escfg.ElasticsearchURL)
Expand All @@ -68,6 +71,9 @@ func TestUpgrade_8_15_4_to_8_16_0(t *testing.T) {

t.Logf("created deployment %s", escfg.KibanaURL)

c, err := ecclient.New(endpointFrom(*target))
require.NoError(t, err)

ecc, err := esclient.New(escfg)
require.NoError(t, err)

Expand All @@ -81,7 +87,8 @@ func TestUpgrade_8_15_4_to_8_16_0(t *testing.T) {
previous, err := getDocsCountPerDS(t, ctx, ecc)
require.NoError(t, err)

g.RunBlockingWait(ctx, ecc, expectedIngestForASingleRun(), previous, 1*time.Minute)
require.NoError(t, g.RunBlockingWait(ctx, c, deploymentID))
t.Logf("time elapsed: %s", time.Now().Sub(start))

beforeUpgradeCount, err := getDocsCountPerDS(t, ctx, ecc)
require.NoError(t, err)
Expand Down Expand Up @@ -124,7 +131,8 @@ func TestUpgrade_8_15_4_to_8_16_0(t *testing.T) {
IndicesManagedBy: []string{"Data stream lifecycle"},
}, dss)

g.RunBlockingWait(ctx, ecc, expectedIngestForASingleRun(), previous, 1*time.Minute)
require.NoError(t, g.RunBlockingWait(ctx, c, deploymentID))
t.Logf("time elapsed: %s", time.Now().Sub(start))

t.Log("check number of documents")
afterUpgradeIngestionCount, err := getDocsCountPerDS(t, ctx, ecc)
Expand Down
4 changes: 4 additions & 0 deletions functionaltests/TestUpgrade_8_15_4_to_8_16_0/outputs.tf
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,7 @@ output "password" {
output "kb_url" {
value = module.ec_deployment.kibana_url
}

output "deployment_id" {
value = module.ec_deployment.deployment_id
}
22 changes: 22 additions & 0 deletions functionaltests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.23.2

require (
github.com/elastic/apm-perf v0.0.0-20241230130730-2ad47482b731
github.com/elastic/cloud-sdk-go v1.23.0
github.com/elastic/go-elasticsearch/v8 v8.16.0
github.com/hashicorp/terraform-exec v0.21.0
github.com/stretchr/testify v1.10.0
Expand All @@ -13,13 +14,32 @@ require (
require (
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/apparentlymart/go-textseg/v15 v15.0.0 // indirect
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-openapi/analysis v0.21.2 // indirect
github.com/go-openapi/errors v0.20.2 // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
github.com/go-openapi/jsonreference v0.20.0 // indirect
github.com/go-openapi/loads v0.21.1 // indirect
github.com/go-openapi/runtime v0.23.0 // indirect
github.com/go-openapi/spec v0.20.4 // indirect
github.com/go-openapi/strfmt v0.21.2 // indirect
github.com/go-openapi/swag v0.21.1 // indirect
github.com/go-openapi/validate v0.20.3 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-version v1.6.0 // indirect
github.com/hashicorp/terraform-json v0.22.1 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/tidwall/gjson v1.18.0 // indirect
Expand All @@ -29,6 +49,7 @@ require (
github.com/zclconf/go-cty v1.14.4 // indirect
go.elastic.co/apm/v2 v2.6.2 // indirect
go.elastic.co/fastjson v1.4.0 // indirect
go.mongodb.org/mongo-driver v1.10.0 // indirect
go.opentelemetry.io/otel v1.32.0 // indirect
go.opentelemetry.io/otel/metric v1.32.0 // indirect
go.opentelemetry.io/otel/trace v1.32.0 // indirect
Expand All @@ -37,5 +58,6 @@ require (
golang.org/x/sync v0.10.0 // indirect
golang.org/x/text v0.20.0 // indirect
golang.org/x/time v0.8.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
346 changes: 345 additions & 1 deletion functionaltests/go.sum

Large diffs are not rendered by default.

132 changes: 132 additions & 0 deletions functionaltests/internal/ecclient/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package ecclient

import (
"context"
"fmt"
"io"
"net/http"
"os"
"time"

"github.com/elastic/cloud-sdk-go/pkg/api"
"github.com/elastic/cloud-sdk-go/pkg/api/deploymentapi"
"github.com/elastic/cloud-sdk-go/pkg/auth"
"github.com/elastic/cloud-sdk-go/pkg/client/deployments"
)

type Client struct {
*api.API
endpoint string
}

func New(endpoint string) (*Client, error) {
apiKey := os.Getenv("EC_API_KEY")
if apiKey == "" {
return nil, fmt.Errorf("unable to obtain value from EC_API_KEY environment variable")
}

if endpoint == "" {
return nil, fmt.Errorf("endpoint is required")
}

ess, err := api.NewAPI(api.Config{
AuthWriter: auth.APIKey(apiKey),
Client: new(http.Client),
Host: endpoint,
})
if err != nil {
return nil, fmt.Errorf("cannot create Elastic Cloud API client: %w", err)
}

c := Client{ess, endpoint}

return &c, nil
}

func (c *Client) RestartIntegrationServer(ctx context.Context, deploymentID string) error {
res, err := deploymentapi.Get(deploymentapi.GetParams{
API: c.API,
DeploymentID: deploymentID,
})
if err != nil {

return fmt.Errorf("cannot retrieve ref id of integrations server for deployment %s: %w", deploymentID, err)
}

refID := *res.Resources.IntegrationsServer[0].RefID

// This is an undocumented API, but it works.
// Is like https://www.elastic.co/docs/api/doc/cloud/operation/operation-restart-deployment-es-resource
// but using integrations_server instead of elasticsearch. integrations_server is the expected Kind for
// the 8.x APM server setup on Elastic Cloud.
url := fmt.Sprintf("%s/api/v1/deployments/%s/integrations_server/%s/_restart", c.endpoint, deploymentID, refID)
req, err := http.NewRequest(http.MethodPost, url, nil)
if err != nil {
return fmt.Errorf("cannot create integrations server restart request for deployment %s: %w", deploymentID, err)
}

req = c.API.AuthWriter.AuthRequest(req)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("cannot execute HTTP request for restarting deployment %s: %w", deploymentID, err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusAccepted {
b, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("cannot read body after receiving a %d response while restarting integrations server: %w", resp.StatusCode, err)
}
return fmt.Errorf("restarting integrations server returned %d response with content: %s", resp.StatusCode, b)
}

// Wait until the integration server is back online.
status := func() (string, error) {
r, err := c.API.V1API.Deployments.GetDeploymentIntegrationsServerResourceInfo(
deployments.NewGetDeploymentIntegrationsServerResourceInfoParams().
WithDeploymentID(deploymentID).
WithRefID(refID),
c.API.AuthWriter)
if err != nil {
return "", err
}

return *r.Payload.Info.Status, nil
}
timeout := 10 * time.Minute
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
tctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
for {
select {
case <-tctx.Done():
return fmt.Errorf("timeout reached waiting for integrations server to restart")
case <-ticker.C:
s, err := status()
if err != nil {
return fmt.Errorf("cannot retrieve integrations server status: %w", err)
}
if s == "started" {
return nil
}
}
}
}
62 changes: 9 additions & 53 deletions functionaltests/internal/gen/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@ import (
"context"
"fmt"
"net/url"
"time"

"go.uber.org/zap"

"github.com/elastic/apm-perf/pkg/telemetrygen"
"github.com/elastic/apm-server/functionaltests/internal/esclient"
"github.com/elastic/apm-server/functionaltests/internal/ecclient"
)

type Generator struct {
Expand Down Expand Up @@ -68,61 +67,18 @@ func (g *Generator) RunBlocking(ctx context.Context) error {
}

// RunBlockingWait runs the underlying generator in blocking mode and waits until the
// document count retrieved through ecc matches expected or timeout. It supports
// specifying previous document count to offset the expectation based on a previous state.
// expected and previous must be maps of <datastream name, document count>.
func (g *Generator) RunBlockingWait(ctx context.Context, ecc *esclient.Client, expected, previous map[string]int, timeout time.Duration) error {
// cluster Integrations Server has been restarted.
// Restarting APM Server ensures all data, including aggregations, in flushed before
// shutdown, ensuring ingestion and 1m aggregations to be completed.
func (g *Generator) RunBlockingWait(ctx context.Context, c *ecclient.Client, deploymentID string) error {
if err := g.RunBlocking(ctx); err != nil {
return fmt.Errorf("cannot run generator: %w", err)
}

// this function checks that expected docs count is reached,
// accounting for any previous state.
checkDocsCount := func(docsCount map[string]int) bool {
equal := false
for ds, c := range docsCount {
if e, ok := expected[ds]; ok {
got := c - previous[ds]
equal = (e == got)
}
}
return equal
}
prevdocs := map[string]int{}
equaltoprevdocs := 0
// this function checks that all returned data streams doc counts
// stay still for some iterations. This forces a 15 seconds longer wait
// but ensures that aggredation data streams have stabilized before
// allowing the code to proceed. This should prevent situation where
// aggregation are still running when expected data streams reached
// their expected doc count.
checkAggregationDocs := func(prevdocs, docsCount map[string]int) bool {
if equaltoprevdocs == 3 {
return true
}
if fmt.Sprint(prevdocs) == fmt.Sprint(docsCount) {
equaltoprevdocs += 1
}
return false
g.Logger.Info("restarting integrations server to flush apm server data")
if err := c.RestartIntegrationServer(ctx, deploymentID); err != nil {
return fmt.Errorf("cannot restart integrations server: %w", err)
}

ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
tctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
for {
select {
case <-tctx.Done():
return nil
case <-ticker.C:
docsCount, err := ecc.ApmDocCount(ctx)
if err != nil {
return fmt.Errorf("cannot retrieve APM doc count: %w", err)
}
if checkDocsCount(docsCount) && checkAggregationDocs(prevdocs, docsCount) {
return nil
}
prevdocs = docsCount
}
}
return nil
}
22 changes: 20 additions & 2 deletions functionaltests/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,16 +103,34 @@ func assertDatastreams(t *testing.T, expected checkDatastreamWant, actual []type

}

const (
targetQA = "qa"
// we use 'pro' because is the target passed by the Buildkite pipeline running
// these tests.
targetProd = "pro"
)

// regionFrom returns the appropriate region to run test
// againts based on specified target.
// https://www.elastic.co/guide/en/cloud/current/ec-regions-templates-instances.html
func regionFrom(target string) string {
switch target {
case "qa":
case targetQA:
return "aws-eu-west-1"
case "pro":
case targetProd:
return "eu-west-1"
default:
panic("target value is not accepted")
}
}

func endpointFrom(target string) string {
switch target {
case targetQA:
return "https://public-api.qa.cld.elstc.co"
case targetProd:
return "https://api.elastic-cloud.com"
default:
panic("target value is not accepted")
}
}
5 changes: 5 additions & 0 deletions testing/infra/terraform/modules/ec_deployment/outputs.tf
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
output "deployment_id" {
value = ec_deployment.deployment.id
description = "The deployment ID for the created cluster"
}

output "kibana_url" {
value = ec_deployment.deployment.kibana.0.https_endpoint
description = "The secure Kibana URL"
Expand Down

0 comments on commit c2383b9

Please sign in to comment.