diff --git a/integrationservertest/apm-version-bug_test.go b/integrationservertest/apm-version-bug_test.go
new file mode 100644
index 00000000000..e85cca9f88e
--- /dev/null
+++ b/integrationservertest/apm-version-bug_test.go
@@ -0,0 +1,195 @@
+package integrationservertest
+
+import (
+	"errors"
+	"fmt"
+	"regexp"
+	"testing"
+
+	"github.com/elastic/apm-server/integrationservertest/internal/ech"
+	"github.com/elastic/go-elasticsearch/v8/typedapi/types"
+)
+
+// See https://github.com/elastic/ingest-dev/issues/5701
+// Available tests:
+// TestAPMResourcesVersionBug/8.16.2_to_8.17.8
+// TestAPMResourcesVersionBug/8.17.7_to_8.17.8
+// TestAPMResourcesVersionBug/8.17.8_to_8.18.3
+// TestAPMResourcesVersionBug/8.18.2_to_8.18.3
+// TestAPMResourcesVersionBug/8.18.3_to_8.19.0
+func TestAPMResourcesVersionBug(t *testing.T) {
+	config, err := parseConfig("upgrade-config.yaml")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	zeroEventIngestedDocs := checkFieldExistsInDocsStep{
+		dataStreamName: "traces-apm-default",
+		fieldName:      "event.ingested",
+		checkFn:        func(i int64) bool { return i == 0 },
+	}
+	noEventIngestedInMappings := checkMappingStep{
+		datastreamname: "traces-apm-default",
+		indexName:      regexp.MustCompile(".ds-traces-apm-default-[0-9.]+-000001"),
+		checkFn: func(mappings types.TypeMapping) error {
+			if !hasNestedField(mappings, "event.ingested") {
+				return nil
+			}
+			return errors.New("there should be no event.ingested here")
+		},
+	}
+	someEventIngestedDocs := checkFieldExistsInDocsStep{
+		dataStreamName: "traces-apm-default",
+		fieldName:      "event.ingested",
+	}
+
+	// oneUpgradeZeroThenSome wraps the usual test-step builder to add additional checks in between.
+	// It expects one upgrade, with no ingested docs before the upgrade and some after.
+	oneUpgradeZeroThenSome := func(t *testing.T, versions ech.Versions, config upgradeTestConfig) []testStep {
+		steps := buildTestSteps(t, versions, config, false)
+		return []testStep{
+			steps[0], // create
+			steps[1], // ingest
+			zeroEventIngestedDocs,
+			noEventIngestedInMappings,
+			steps[2], // upgrade
+			steps[3], // ingest
+			someEventIngestedDocs,
+			checkMappingStep{
+				datastreamname: "traces-apm-default",
+				indexName:      regexp.MustCompile(".ds-traces-apm-default-[0-9.]+-000002"),
+				checkFn: func(mappings types.TypeMapping) error {
+					if hasNestedField(mappings, "event.ingested") {
+						return nil
+					}
+					return fmt.Errorf("there should be an event.ingested here")
+				},
+			},
+		}
+	}
+
+	// twoUpgradesSomeThenSome wraps the usual test-step builder to add additional checks in between.
+	// It expects two upgrades, with some ingested docs after the first upgrade and more after the second.
+	twoUpgradesSomeThenSome := func(t *testing.T, versions ech.Versions, config upgradeTestConfig) []testStep {
+		steps := buildTestSteps(t, versions, config, false)
+		howmany := int64(0)
+		return []testStep{
+			steps[0], // create
+			steps[1], // ingest
+			steps[2], // upgrade
+			steps[3], // ingest
+			checkFieldExistsInDocsStep{
+				dataStreamName: "traces-apm-default",
+				fieldName:      "event.ingested",
+				checkFn: func(i int64) bool {
+					if i == 0 {
+						return false
+					}
+					// Store howmany to cross-check that it is higher after the next upgrade/ingest cycle.
+					howmany = i
+					return true
+				},
+			},
+			checkMappingStep{
+				datastreamname: "traces-apm-default",
+				indexName:      regexp.MustCompile(".ds-traces-apm-default-[0-9.]+-000002"),
+				checkFn: func(mappings types.TypeMapping) error {
+					if hasNestedField(mappings, "event.ingested") {
+						return nil
+					}
+					return fmt.Errorf("there should be an event.ingested here")
+				},
+			},
+			steps[4], // upgrade
+			steps[5], // ingest
+			checkFieldExistsInDocsStep{
+				dataStreamName: "traces-apm-default",
+				fieldName:      "event.ingested",
+				checkFn: func(i int64) bool {
+					if i == 0 {
+						return false
+					}
+					if i <= howmany {
+						return false
+					}
+					return true
+				},
+			},
+			checkMappingStep{
+				datastreamname: "traces-apm-default",
+				indexName:      regexp.MustCompile(".ds-traces-apm-default-[0-9.]+-000003"),
+				checkFn: func(mappings types.TypeMapping) error {
+					if hasNestedField(mappings, "event.ingested") {
+						return nil
+					}
+					return fmt.Errorf("there should be an event.ingested here")
+				},
+			},
+		}
+	}
+
+	version8162 := vsCache.GetLatestVersionOrSkip(t, "8.16.2")
+	version8177 := vsCache.GetLatestVersionOrSkip(t, "8.17.7")
+	version8178 := vsCache.GetLatestBCOrSkip(t, "8.17.8") // latest 8.17 as of today
+	version8182 := vsCache.GetLatestVersionOrSkip(t, "8.18.2")
+	version8183 := vsCache.GetLatestBCOrSkip(t, "8.18.3") // latest 8.18 as of today
+	// version8190 := vsCache.GetLatestBCOrSkip(t, "8.19.0") // latest 8.19 as of today
+	// The event.ingested change was introduced in 8.16.3. Since the bug only
+	// shows when upgrading a cluster from a version that still has it, we
+	// need to start from 8.16.2.
+	start := version8162
+
+	t.Run("8.16.2 to 8.17.8", func(t *testing.T) {
+		versions := []ech.Version{start, version8177}
+		runner := testStepsRunner{
+			Target: *target,
+			Steps:  oneUpgradeZeroThenSome(t, versions, config),
+		}
+		runner.Run(t)
+	})
+
+	t.Run("8.17.7 to 8.17.8", func(t *testing.T) {
+		versions := []ech.Version{start, version8177, version8178}
+		runner := testStepsRunner{
+			Target: *target,
+			Steps:  twoUpgradesSomeThenSome(t, versions, config),
+		}
+		runner.Run(t)
+	})
+
+	t.Run("8.17.8 to 8.18.3", func(t *testing.T) {
+		versions := []ech.Version{version8178, version8183}
+		runner := testStepsRunner{
+			Target: *target,
+			Steps:  buildTestSteps(t, versions, config, false),
+		}
+		runner.Run(t)
+	})
+
+	t.Run("8.18.2 to 8.18.3", func(t *testing.T) {
+		versions := []ech.Version{start, version8182, version8183}
+		runner := testStepsRunner{
+			Target: *target,
+			Steps:  twoUpgradesSomeThenSome(t, versions, config),
+		}
+		runner.Run(t)
+	})
+
+	t.Run("8.18.3 to 8.19.0", func(t *testing.T) {
+		// FIXME: use 8.19.0 BC when it becomes available.
+		start := vsCache.GetLatestSnapshot(t, "8.16.2")
+		version8183 := vsCache.GetLatestSnapshot(t, "8.18.3")
+		version8190 := vsCache.GetLatestSnapshot(t, "8.19.0")
+		versions := []ech.Version{start, version8183, version8190}
+		steps := twoUpgradesSomeThenSome(t, versions, config)
+		steps = append(steps, checkFieldExistsInDocsStep{
+			dataStreamName: "traces-apm-default",
+			fieldName:      "event.success_count",
+		})
+		runner := testStepsRunner{
+			Target: *target,
+			Steps:  steps,
+		}
+		runner.Run(t)
+	})
+}
diff --git a/integrationservertest/buglib.go b/integrationservertest/buglib.go
new file mode 100644
index 00000000000..5f0c13dbf0c
--- /dev/null
+++ b/integrationservertest/buglib.go
@@ -0,0 +1,152 @@
+package integrationservertest
+
+import (
+	"context"
+	"fmt"
+	"regexp"
+	"strings"
+	"testing"
+
+	"github.com/elastic/go-elasticsearch/v8"
+	"github.com/elastic/go-elasticsearch/v8/typedapi/core/count"
+	"github.com/elastic/go-elasticsearch/v8/typedapi/types"
+)
+
+// hasNestedField checks if a nested field (dot-separated path) exists in the properties map.
+func hasNestedField(mappings types.TypeMapping, fieldPath string) bool {
+	parts := strings.Split(fieldPath, ".")
+	current := mappings.Properties
+	for i, part := range parts {
+		// Check if the part exists in the current properties map.
+		val, exists := current[part]
+		if !exists {
+			return false
+		}
+		// If this is the last part, the field exists.
+		if i == len(parts)-1 {
+			return true
+		}
+		// Try to descend into sub-properties (for nested/object fields).
+		fieldMap, ok := val.(*types.ObjectProperty)
+		if !ok {
+			return false
+		}
+		current = fieldMap.Properties
+	}
+	return false
+}
+
+type checkMappingStep struct {
+	datastreamname string
+	indexName      *regexp.Regexp
+	checkFn        func(mappings types.TypeMapping) error
+}
+
+func (c checkMappingStep) Step(t *testing.T, ctx context.Context, e *testStepEnv) {
+	t.Logf("------ check data stream mappings in %s ------", e.currentVersion())
+	t.Logf("checking data stream mapping on %s", c.datastreamname)
+	err := c.checkDataStreamMappings(t, ctx, e.esc.TypedClient())
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	t.Log("success")
+}
+
+func (c checkMappingStep) checkDataStreamMappings(t *testing.T, ctx context.Context, es *elasticsearch.TypedClient) error {
+	dataStreamName := c.datastreamname
+
+	info, err := es.Indices.GetDataStream().Name(dataStreamName).Do(ctx)
+	if err != nil {
+		return fmt.Errorf("error getting data stream info: %w", err)
+	}
+	is := []string{}
+	for _, v := range info.DataStreams[0].Indices {
+		is = append(is, v.IndexName)
+	}
+	t.Logf("indices: %s", is)
+
+	var backingIndex string
+	for _, v := range info.DataStreams[0].Indices {
+		if c.indexName.MatchString(v.IndexName) {
+			backingIndex = v.IndexName
+		}
+	}
+	if backingIndex == "" {
+		// Report the available indices for easier troubleshooting.
+		return fmt.Errorf("no index matches the filter regexp; filter=%s indices=%s", c.indexName, is)
+	}
+
+	mappingRes, err := es.Indices.GetMapping().Index(backingIndex).Do(ctx)
+	if err != nil {
+		return fmt.Errorf("error getting mapping: %w", err)
+	}
+
+	indexMappingRecord, ok := mappingRes[backingIndex]
+	if !ok {
+		return fmt.Errorf("backing index %s not found in mapping", backingIndex)
+	}
+
+	if err := c.checkFn(indexMappingRecord.Mappings); err != nil {
+		return fmt.Errorf("mapping check failed: %w", err)
+	}
+
+	return nil
+}
+
+type checkFieldExistsInDocsStep struct {
+	dataStreamName string
+	fieldName      string
+	// checkFn can be used to customize the check this step performs on the returned
+	// doc count. By default it checks that the value is greater than 0.
+	checkFn func(i int64) bool
+}
+
+func (s checkFieldExistsInDocsStep) Step(t *testing.T, ctx context.Context, e *testStepEnv) {
+	t.Logf("------ check data stream docs with field %s in %s ------", s.fieldName, e.currentVersion())
+	count, err := getDocCountWithField(ctx, e.esc.TypedClient(), s.dataStreamName, s.fieldName)
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	t.Logf("documents in data stream %s with '%s': %d\n", s.dataStreamName, s.fieldName, count)
+
+	if s.checkFn == nil {
+		s.checkFn = func(i int64) bool { return i > 0 }
+	}
+
+	if !s.checkFn(count) {
+		t.Log("checkFieldExistsInDocsStep: check function failed")
+		t.Fail()
+		return
+	}
+	t.Log("check successful")
+}
+
+func getDocCountWithField(ctx context.Context, c *elasticsearch.TypedClient, dataStreamName, fieldName string) (int64, error) {
+	req := &count.Request{
+		Query: &types.Query{
+			Exists: &types.ExistsQuery{
+				Field: fieldName,
+			},
+		},
+	}
+	resp, err := c.Core.Count().
+		Index(dataStreamName).
+		Request(req).
+		Do(ctx)
+	if err != nil {
+		return 0, fmt.Errorf("cannot retrieve doc count: %w", err)
+	}
+	return resp.Count, nil
+}
diff --git a/integrationservertest/go.mod b/integrationservertest/go.mod
index 8ed576a14b9..fa772f918af 100644
--- a/integrationservertest/go.mod
+++ b/integrationservertest/go.mod
@@ -19,6 +19,7 @@ require (
 	github.com/blang/semver/v4 v4.0.0 // indirect
 	github.com/davecgh/go-spew v1.1.1 // indirect
 	github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect
+	github.com/elastic/elasticsearch-serverless-go v0.1.1-20231031 // indirect
 	github.com/go-logr/logr v1.4.2 // indirect
 	github.com/go-logr/stdr v1.2.2 // indirect
 	github.com/go-openapi/analysis v0.21.2 // indirect
diff --git a/integrationservertest/go.sum b/integrationservertest/go.sum
index 507e74ed6d3..4acd6d71d6e 100644
--- a/integrationservertest/go.sum
+++ b/integrationservertest/go.sum
@@ -40,6 +40,8 @@ github.com/elastic/cloud-sdk-go v1.23.0 h1:IH41W2E+GIlHjuPtkNPbpYoACAJ8ffk3QiyDj
 github.com/elastic/cloud-sdk-go v1.23.0/go.mod h1:k0ZebhZKX22l6Ysl5Zbpc8VLF54hfwDtHppEEEVUJ04=
 github.com/elastic/elastic-transport-go/v8 v8.6.0 h1:Y2S/FBjx1LlCv5m6pWAF2kDJAHoSjSRSJCApolgfthA=
 github.com/elastic/elastic-transport-go/v8 v8.6.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk=
+github.com/elastic/elasticsearch-serverless-go v0.1.1-20231031 h1:6zBFaT5Klj/0zHi/35wfTUhy1lgvf/tC9mStYRW9dYA=
+github.com/elastic/elasticsearch-serverless-go v0.1.1-20231031/go.mod h1:6DwhqJIaaVnN8AOAfAG0BRG3EsxoQ4ai/NAOxK20sJE=
 github.com/elastic/go-elasticsearch/v8 v8.16.0 h1:f7bR+iBz8GTAVhwyFO3hm4ixsz2eMaEy0QroYnXV3jE=
 github.com/elastic/go-elasticsearch/v8 v8.16.0/go.mod h1:lGMlgKIbYoRvay3xWBeKahAiJOgmFDsjZC39nmO3H64=
 github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
diff --git a/integrationservertest/internal/ech/version.go b/integrationservertest/internal/ech/version.go
index 3d138271875..b6458aabc70 100644
--- a/integrationservertest/internal/ech/version.go
+++ b/integrationservertest/internal/ech/version.go
@@ -190,6 +190,10 @@ func (v Version) String() string {
fmt.Sprintf("%d.%d.%d%s", v.Major, v.Minor, v.Patch, suffix) } +func (v Version) MajorMinorPatch() string { + return fmt.Sprintf("%d.%d.%d", v.Major, v.Minor, v.Patch) +} + func (v Version) MajorMinor() string { return fmt.Sprintf("%d.%d", v.Major, v.Minor) } diff --git a/integrationservertest/internal/elasticsearch/client.go b/integrationservertest/internal/elasticsearch/client.go index e308166c7ad..e45fac43989 100644 --- a/integrationservertest/internal/elasticsearch/client.go +++ b/integrationservertest/internal/elasticsearch/client.go @@ -79,6 +79,10 @@ func formatDurationElasticsearch(d time.Duration) string { return fmt.Sprintf("%dnanos", d) } +func (c *Client) TypedClient() *elasticsearch.TypedClient { + return c.es +} + // CreateAPIKey creates an API Key, and returns it in the base64-encoded form // that agents should provide. // diff --git a/integrationservertest/internal/terraform/terraform.go b/integrationservertest/internal/terraform/terraform.go index 29b07b2ed53..aa2e082439e 100644 --- a/integrationservertest/internal/terraform/terraform.go +++ b/integrationservertest/internal/terraform/terraform.go @@ -89,6 +89,16 @@ func (t *Runner) Destroy(ctx context.Context, vars ...tfexec.DestroyOption) erro } func (t *Runner) Output(name string, res any) error { + // try to load outputs from the terraform state if + // our cache is empty + if len(t.outputs) == 0 { + outputs, err := t.tf.Output(context.Background()) + // only load if output command succeeded + if err == nil { + t.outputs = outputs + } + } + o, ok := t.outputs[name] if !ok { return fmt.Errorf("output named %s not found", name) diff --git a/integrationservertest/main_test.go b/integrationservertest/main_test.go index bcf0e443906..465377b93be 100644 --- a/integrationservertest/main_test.go +++ b/integrationservertest/main_test.go @@ -28,6 +28,12 @@ import ( ) var ( + doCleanup = flag.Bool( + "cleanup", + true, + "Whether to cleanup terraform resources or not.", + ) + // cleanupOnFailure determines whether the created resources should be cleaned up on test failure. 
 	cleanupOnFailure = flag.Bool(
 		"cleanup-on-failure",
diff --git a/integrationservertest/setup.go b/integrationservertest/setup.go
index f7a2a454136..8a4e03ef352 100644
--- a/integrationservertest/setup.go
+++ b/integrationservertest/setup.go
@@ -175,15 +175,28 @@ func createCluster(
 	name := terraform.Var("name", deployName)
 	require.NoError(t, tf.Apply(ctx, ecTarget, ecRegion, ecDeploymentTpl, ver, integrations, name))
 
-	t.Cleanup(func() {
-		if !t.Failed() || cleanupOnFailure {
-			t.Log("cleanup terraform resources")
-			require.NoError(t, tf.Destroy(ctx, ecTarget, ecRegion, ecDeploymentTpl, name, ver))
-		} else {
-			t.Log("test failed and cleanup-on-failure is false, skipping cleanup")
-		}
-	})
+	if *doCleanup {
+		t.Cleanup(func() {
+			if !t.Failed() || cleanupOnFailure {
+				t.Log("cleanup terraform resources")
+				require.NoError(t, tf.Destroy(ctx, ecTarget, ecRegion, ecDeploymentTpl, name, ver))
+			} else {
+				t.Log("test failed and cleanup-on-failure is false, skipping cleanup")
+			}
+		})
+	}
+
+	deploymentID, apmID, info := getDeploymentInfo(t, tf, deployName)
+
+	standaloneOrManaged := "standalone"
+	if enableIntegrations {
+		standaloneOrManaged = "managed"
+	}
+	t.Logf("created deployment %s (%s) with %s APM (%s)", deployName, deploymentID, standaloneOrManaged, apmID)
+	return info
+}
+
+func getDeploymentInfo(t *testing.T, tf *terraform.Runner, deployName string) (string, string, deploymentInfo) {
 
 	var deploymentID string
 	require.NoError(t, tf.Output("deployment_id", &deploymentID))
 	var apmID string
@@ -195,12 +208,7 @@ func createCluster(
 	require.NoError(t, tf.Output("password", &info.Password))
 	require.NoError(t, tf.Output("kb_url", &info.KibanaURL))
 
-	standaloneOrManaged := "standalone"
-	if enableIntegrations {
-		standaloneOrManaged = "managed"
-	}
-	t.Logf("created deployment %s (%s) with %s APM (%s)", deployName, deploymentID, standaloneOrManaged, apmID)
-	return info
+	return deploymentID, apmID, info
 }
 
 // upgradeCluster applies the terraform configuration from the test terraform folder.
diff --git a/integrationservertest/upgrade-config.yaml b/integrationservertest/upgrade-config.yaml
index 1704103e774..59445848d82 100644
--- a/integrationservertest/upgrade-config.yaml
+++ b/integrationservertest/upgrade-config.yaml
@@ -4,15 +4,63 @@ data-stream-lifecycle:
   8.15: DSL
   8.16: DSL
 
-# Define versions that have lazy rollover, i.e. when upgrading from some older
-# version to the specified minor version, there will be lazy rollover.
-# Exceptions are specified as a list, i.e. no lazy rollover if upgrade is from
-# that minor version.
+# Define version exceptions to lazy rollovers.
+#
+# Upgrading to any version listed here is expected to trigger a lazy rollover.
+# An exception may be needed to specify that upgrading from a particular previous
+# version does not trigger a lazy rollover; in that case the previous version must
+# be listed as an entry under the minor or patch release key.
+# Keys can be in the form of major.minor or major.minor.patch.
+# Values can be in the form of major.minor or major.minor.patch.
+#
+# Example: upgrading to 8.18 will trigger a lazy rollover from any previous
+# version.
+# ```
+# 8.18:
+# ```
+#
+# Example: upgrading to 8.18 will trigger a lazy rollover from all previous
+# versions except any 8.17 ones (all patches included):
+# ```
+# 8.18:
+#   - 8.17
+# ```
+#
+# Example: upgrading to 8.18.1 will trigger a lazy rollover from any previous
+# version.
+# ```
+# 8.18.1:
+# ```
+#
+# Example: upgrading to 8.18 will trigger a lazy rollover from all previous
+# versions except 8.17.8.
+# ```
+# 8.18:
+#   - 8.17.8
+# ```
+#
+# NOTE: If a version is not in this list it is **not** expected to roll over.
+# NOTE: quote major.minor values, or YAML will parse them as floats.
 lazy-rollover-with-exceptions:
   8.16:
+    # No rollover expected when upgrading from any 8.16 to another 8.16. Exceptions are listed as top-level keys.
+    - "8.16"
   8.17:
-    # 8.19 and 9.1 have the same template due to
-    # https://github.com/elastic/elasticsearch/pull/123166.
+    # No rollover expected when upgrading from any 8.17 to another 8.17. Exceptions are listed as top-level keys.
+    - "8.17"
+  # 8.17.8 forces a rollover as we updated the elasticsearch resource version number.
+  8.17.8:
+  8.18:
+    # No rollover expected when upgrading from any 8.18 to another 8.18. Exceptions are listed as top-level keys.
+    - "8.18"
+  # 8.18.3 forces a rollover as we updated the elasticsearch resource version number.
+  8.18.3:
+    # 8.17.8 and 8.18.3 share the same templates.
+    - 8.17.8
   8.19:
+    # No rollover expected when upgrading from any 8.19 to another 8.19. Exceptions are listed as top-level keys.
+    - "8.19"
   9.1:
-    - "8.19"
\ No newline at end of file
+    # 8.19 and 9.1 have the same template due to
+    # https://github.com/elastic/elasticsearch/pull/123166.
+    - "8.19"
diff --git a/integrationservertest/upgrade_test.go b/integrationservertest/upgrade_test.go
index ef3972af895..3afaf2820ba 100644
--- a/integrationservertest/upgrade_test.go
+++ b/integrationservertest/upgrade_test.go
@@ -121,7 +121,10 @@ func buildTestSteps(t *testing.T, versions ech.Versions, config upgradeTestConfi
 		prev := versions[i-1]
 		oldIndicesManagedBy := slices.Clone(indicesManagedBy)
 		if config.HasLazyRollover(prev, ver) {
+			t.Logf("expecting a lazy rollover between %s and %s", prev, ver)
 			indicesManagedBy = append(indicesManagedBy, lifecycle)
+		} else {
+			t.Logf("**no** lazy rollover expected between %s and %s", prev, ver)
 		}
 
 		steps = append(steps, upgradeStep{
@@ -203,15 +206,28 @@ func (cfg upgradeTestConfig) ExpectedLifecycle(version ech.Version) string {
 
 // HasLazyRollover checks if the upgrade path is expected to have lazy rollover.
 func (cfg upgradeTestConfig) HasLazyRollover(from, to ech.Version) bool {
-	exceptions, ok := cfg.LazyRolloverWithExceptions[to.MajorMinor()]
+	// First check if there is a config entry for the full version under test.
+	exceptions, ok := cfg.LazyRolloverWithExceptions[to.MajorMinorPatch()]
+	// If there is not, check if there is an entry for the minor.
+	if !ok {
+		exceptions, ok = cfg.LazyRolloverWithExceptions[to.MajorMinor()]
+	}
+	// If neither entry exists, no rollover is expected.
 	if !ok {
 		return false
 	}
+	// Otherwise, match the exception list items against the from version's
+	// major.minor.patch and major.minor values to signal that no rollover is expected.
 	for _, exception := range exceptions {
+		if strings.EqualFold(from.MajorMinorPatch(), exception) {
+			return false
+		}
+		if strings.EqualFold(from.MajorMinor(), exception) {
+			return false
+		}
 	}
+	// At this point we have checked all the exceptions and we know a rollover
+	// is expected.
 	return true
 }
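
Reviewer note: the lookup order that `HasLazyRollover` applies to `upgrade-config.yaml` (exact `major.minor.patch` key first, then `major.minor` fallback, with matching exception entries suppressing the rollover) can be exercised on its own. The sketch below is a hypothetical, self-contained mirror of that logic with a made-up config map and helper names; it is illustrative only and not part of this change set.

```go
package main

import "fmt"

// hasLazyRollover mirrors the resolution order described above: look up the
// target version's major.minor.patch key first, fall back to major.minor,
// and treat a match in the exception list (patch or minor) as "no rollover".
func hasLazyRollover(cfg map[string][]string, fromPatch, fromMinor, toPatch, toMinor string) bool {
	exceptions, ok := cfg[toPatch]
	if !ok {
		exceptions, ok = cfg[toMinor]
	}
	if !ok {
		return false // target version not listed: no rollover expected
	}
	for _, e := range exceptions {
		if e == fromPatch || e == fromMinor {
			return false // upgrading from this version is an exception
		}
	}
	return true
}

func main() {
	// Hypothetical config shaped like lazy-rollover-with-exceptions.
	cfg := map[string][]string{
		"8.18":   {"8.18"},
		"8.18.3": {"8.17.8"},
	}
	fmt.Println(hasLazyRollover(cfg, "8.17.7", "8.17", "8.18.3", "8.18")) // true: 8.17.7 -> 8.18.3 rolls over
	fmt.Println(hasLazyRollover(cfg, "8.17.8", "8.17", "8.18.3", "8.18")) // false: 8.17.8 is an exception
	fmt.Println(hasLazyRollover(cfg, "8.18.2", "8.18", "8.18.5", "8.18")) // false: same-minor upgrade
}
```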