-
Notifications
You must be signed in to change notification settings - Fork 536
add integration server tests for resources version checks #17312
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
91754fb
1eae9bb
07b3644
cf94108
6252267
af20d51
b09f302
8efcfac
0b64a86
53bd820
9874d2d
e068415
bf1abc5
cf7f10f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,195 @@ | ||
package integrationservertest | ||
|
||
import ( | ||
"errors" | ||
"fmt" | ||
"regexp" | ||
"testing" | ||
|
||
"github.com/elastic/apm-server/integrationservertest/internal/ech" | ||
"github.com/elastic/go-elasticsearch/v8/typedapi/types" | ||
) | ||
|
||
// See https://github.com/elastic/ingest-dev/issues/5701 | ||
// Available tests: | ||
// TestAPMResourcesVersionBug/8.16.2_to_8.17.8 | ||
// TestAPMResourcesVersionBug/8.17.7_to_8.17.8 | ||
// TestAPMResourcesVersionBug/8.17.8_to_8.18.3 | ||
// TestAPMResourcesVersionBug/8.18.2_to_8.18.3 | ||
// TestAPMResourcesVersionBug/8.18.3_to_8.19.0 | ||
func TestAPMResourcesVersionBug(t *testing.T) { | ||
config, err := parseConfig("upgrade-config.yaml") | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
zeroEventIngestedDocs := checkFieldExistsInDocsStep{ | ||
dataStreamName: "traces-apm-default", | ||
fieldName: "event.ingested", | ||
checkFn: func(i int64) bool { return i == 0 }, | ||
} | ||
noEventIngestedInMappings := checkMappingStep{ | ||
datastreamname: "traces-apm-default", | ||
indexName: regexp.MustCompile(".ds-traces-apm-default-[0-9.]+-000001"), | ||
checkFn: func(mappings types.TypeMapping) error { | ||
if !hasNestedField(mappings, "event.ingested") { | ||
return nil | ||
} | ||
return errors.New("there should be no event.ingested here") | ||
}, | ||
} | ||
someEventIngestedDocs := checkFieldExistsInDocsStep{ | ||
dataStreamName: "traces-apm-default", | ||
fieldName: "event.ingested", | ||
} | ||
|
||
// this wraps the usual steps build to add additional checks in between. | ||
// Expect one upgrade, no ingested docs before, some after. | ||
oneUpgradeZeroThenSome := func(t *testing.T, versions ech.Versions, config upgradeTestConfig) []testStep { | ||
steps := buildTestSteps(t, versions, config, false) | ||
return []testStep{ | ||
steps[0], // create | ||
steps[1], // ingest | ||
zeroEventIngestedDocs, | ||
noEventIngestedInMappings, | ||
steps[2], // upgrade | ||
steps[3], // ingest | ||
someEventIngestedDocs, | ||
checkMappingStep{ | ||
datastreamname: "traces-apm-default", | ||
indexName: regexp.MustCompile(".ds-traces-apm-default-[0-9.]+-000002"), | ||
checkFn: func(mappings types.TypeMapping) error { | ||
if hasNestedField(mappings, "event.ingested") { | ||
return nil | ||
} | ||
return fmt.Errorf("there should be an event.ingested here") | ||
}, | ||
}, | ||
} | ||
} | ||
|
||
// this wraps the usual steps build to add additional checks in between. | ||
// Expect two upgrades, some ingested docs after the first upgrade, some after the second. | ||
twoUpgradesSomeThenSome := func(t *testing.T, versions ech.Versions, config upgradeTestConfig) []testStep { | ||
steps := buildTestSteps(t, versions, config, false) | ||
howmany := int64(0) | ||
return []testStep{ | ||
steps[0], // create | ||
steps[1], // ingest | ||
steps[2], // upgrade | ||
steps[3], // ingest | ||
checkFieldExistsInDocsStep{ | ||
dataStreamName: "traces-apm-default", | ||
fieldName: "event.ingested", | ||
checkFn: func(i int64) bool { | ||
if i == 0 { | ||
return false | ||
} | ||
// Store howmany to cross check it's higher after next upgrade/ingest cycle. | ||
howmany = i | ||
return true | ||
}, | ||
}, | ||
checkMappingStep{ | ||
datastreamname: "traces-apm-default", | ||
indexName: regexp.MustCompile(".ds-traces-apm-default-[0-9.]+-000002"), | ||
checkFn: func(mappings types.TypeMapping) error { | ||
if hasNestedField(mappings, "event.ingested") { | ||
return nil | ||
} | ||
return fmt.Errorf("there should be an event.ingested here") | ||
}, | ||
}, | ||
steps[4], // upgrade | ||
steps[5], // upgrade | ||
checkFieldExistsInDocsStep{ | ||
dataStreamName: "traces-apm-default", | ||
fieldName: "event.ingested", | ||
checkFn: func(i int64) bool { | ||
if i == 0 { | ||
return false | ||
} | ||
if i <= howmany { | ||
return false | ||
} | ||
return true | ||
}, | ||
}, | ||
checkMappingStep{ | ||
datastreamname: "traces-apm-default", | ||
indexName: regexp.MustCompile(".ds-traces-apm-default-[0-9.]+-000003"), | ||
checkFn: func(mappings types.TypeMapping) error { | ||
if hasNestedField(mappings, "event.ingested") { | ||
return nil | ||
} | ||
return fmt.Errorf("there should be an event.ingested here") | ||
}, | ||
}, | ||
} | ||
} | ||
|
||
version8162 := vsCache.GetLatestVersionOrSkip(t, "8.16.2") | ||
version8177 := vsCache.GetLatestVersionOrSkip(t, "8.17.7") | ||
version8178 := vsCache.GetLatestBCOrSkip(t, "8.17.8") // latest 8.17 as of today | ||
version8182 := vsCache.GetLatestVersionOrSkip(t, "8.18.2") | ||
version8183 := vsCache.GetLatestBCOrSkip(t, "8.18.3") // latest 8.18 as of today | ||
// version8190 := vsCache.GetLatestBCOrSkip(t, "8.19.0") // latest 8.19 as of today | ||
// the event.ingested change was introduced in 8.16.3, as | ||
// it only shows if we upgrade a cluster from a version with | ||
// the bug we need to start from 8.16.2. | ||
start := version8162 | ||
|
||
t.Run("8.16.2 to 8.17.8", func(t *testing.T) { | ||
versions := []ech.Version{start, version8177} | ||
runner := testStepsRunner{ | ||
Target: *target, | ||
Steps: oneUpgradeZeroThenSome(t, versions, config), | ||
} | ||
runner.Run(t) | ||
}) | ||
|
||
t.Run("8.17.7 to 8.17.8", func(t *testing.T) { | ||
versions := []ech.Version{start, version8177, version8178} | ||
runner := testStepsRunner{ | ||
Target: *target, | ||
Steps: twoUpgradesSomeThenSome(t, versions, config), | ||
} | ||
runner.Run(t) | ||
}) | ||
|
||
t.Run("8.17.8 to 8.18.3", func(t *testing.T) { | ||
versions := []ech.Version{version8178, version8183} | ||
runner := testStepsRunner{ | ||
Target: *target, | ||
Steps: buildTestSteps(t, versions, config, false), | ||
} | ||
runner.Run(t) | ||
}) | ||
|
||
t.Run("8.18.2 to 8.18.3", func(t *testing.T) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If this includes There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I named this after our test cases for this bug. I don't think we want to merge this PR as it is, with all these test cases for all these versions. This is now useful to ensure our fixes are working but I would make it more general if we want to include it as our regular testing. I think a necessary test case to add is an upgrade test: from the "previous previous" minor (ie in this case 8.16.2) to the previous minor and than to the current minor. Such a test could find issues that only appears with an upgrade, a scenario we don't yet cover. |
||
versions := []ech.Version{start, version8182, version8183} | ||
runner := testStepsRunner{ | ||
Target: *target, | ||
Steps: twoUpgradesSomeThenSome(t, versions, config), | ||
} | ||
runner.Run(t) | ||
}) | ||
|
||
t.Run("8.18.3 to 8.19.0", func(t *testing.T) { | ||
// FIXME: use 8.19.0 BC when it becomes available. | ||
start := vsCache.GetLatestSnapshot(t, "8.16.2") | ||
version8183 := vsCache.GetLatestSnapshot(t, "8.18.3") | ||
version8190 := vsCache.GetLatestSnapshot(t, "8.19.0") | ||
versions := []ech.Version{start, version8183, version8190} | ||
steps := twoUpgradesSomeThenSome(t, versions, config) | ||
steps = append(steps, checkFieldExistsInDocsStep{ | ||
dataStreamName: "traces-apm-default", | ||
fieldName: "event.success_count", | ||
}) | ||
runner := testStepsRunner{ | ||
Target: *target, | ||
Steps: steps, | ||
} | ||
runner.Run(t) | ||
}) | ||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,152 @@ | ||||||
package integrationservertest | ||||||
|
||||||
import ( | ||||||
"context" | ||||||
"errors" | ||||||
"fmt" | ||||||
"regexp" | ||||||
"strings" | ||||||
"testing" | ||||||
|
||||||
"github.com/elastic/go-elasticsearch/v8" | ||||||
"github.com/elastic/go-elasticsearch/v8/typedapi/core/count" | ||||||
"github.com/elastic/go-elasticsearch/v8/typedapi/types" | ||||||
) | ||||||
|
||||||
// hasNestedField checks if a nested field (dot-separated path) exists in the properties map. | ||||||
func hasNestedField(mappings types.TypeMapping, fieldPath string) bool { | ||||||
parts := strings.Split(fieldPath, ".") | ||||||
current := mappings.Properties | ||||||
// fmt.Println("parts", parts) | ||||||
// fmt.Println("curre", current) | ||||||
for i, part := range parts { | ||||||
// Check if the part exists in the current properties map | ||||||
val, exists := current[part] | ||||||
// fmt.Println("val", val, "parts", i == len(parts)-1) | ||||||
if !exists { | ||||||
return false | ||||||
} | ||||||
// If this is the last part, the field exists | ||||||
if i == len(parts)-1 { | ||||||
return true | ||||||
} | ||||||
// Try to descend into sub-properties (for nested/object fields) | ||||||
fieldMap, ok := val.(*types.ObjectProperty) | ||||||
if !ok { | ||||||
return false | ||||||
} | ||||||
// fmt.Println("fieldMap", fieldMap) | ||||||
subProps := fieldMap.Properties | ||||||
current = subProps | ||||||
} | ||||||
return false | ||||||
} | ||||||
|
||||||
type checkMappingStep struct { | ||||||
datastreamname string | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
indexName *regexp.Regexp | ||||||
checkFn func(mappings types.TypeMapping) error | ||||||
} | ||||||
|
||||||
func (c checkMappingStep) Step(t *testing.T, ctx context.Context, e *testStepEnv) { | ||||||
t.Logf("------ check data stream mappings in %s ------", e.currentVersion()) | ||||||
t.Logf("checking data stream mapping on %s", c.datastreamname) | ||||||
err := c.checkDataStreamMappings(t, ctx, e.esc.TypedClient()) | ||||||
if err != nil { | ||||||
t.Log(err) | ||||||
t.Fail() | ||||||
} | ||||||
t.Log("success") | ||||||
} | ||||||
|
||||||
func (c checkMappingStep) checkDataStreamMappings(t *testing.T, ctx context.Context, es *elasticsearch.TypedClient) error { | ||||||
dataStreamName := c.datastreamname | ||||||
|
||||||
info, err := es.Indices.GetDataStream().Name(dataStreamName).Do(ctx) | ||||||
if err != nil { | ||||||
return fmt.Errorf("Error getting data stream info: %s", err) | ||||||
} | ||||||
is := []string{} | ||||||
for _, v := range info.DataStreams[0].Indices { | ||||||
is = append(is, v.IndexName) | ||||||
} | ||||||
t.Logf("indices: %s", is) | ||||||
|
||||||
var backingIndex string | ||||||
for _, v := range info.DataStreams[0].Indices { | ||||||
if c.indexName.Match([]byte(v.IndexName)) { | ||||||
backingIndex = v.IndexName | ||||||
} | ||||||
} | ||||||
if backingIndex == "" { | ||||||
// collect available indices for easier troubleshooting | ||||||
indices := []string{} | ||||||
for _, v := range info.DataStreams[0].Indices { | ||||||
indices = append(indices, v.IndexName) | ||||||
} | ||||||
return errors.New(fmt.Sprintf("no index matches the filter regexp; filter=%s indices=%s", c.indexName, indices)) | ||||||
} | ||||||
|
||||||
mappingRes, err := es.Indices.GetMapping().Index(backingIndex).Do(ctx) | ||||||
if err != nil { | ||||||
return fmt.Errorf("Error getting mapping: %w", err) | ||||||
} | ||||||
|
||||||
indexMappingRecord, ok := mappingRes[backingIndex] | ||||||
if !ok { | ||||||
return errors.New(fmt.Sprintf("Backing index %s not found in mapping", backingIndex)) | ||||||
} | ||||||
|
||||||
if err := c.checkFn(indexMappingRecord.Mappings); err != nil { | ||||||
return fmt.Errorf("mapping check failed: %w", err) | ||||||
} | ||||||
|
||||||
return nil | ||||||
} | ||||||
|
||||||
type checkFieldExistsInDocsStep struct { | ||||||
dataStreamName string | ||||||
fieldName string | ||||||
// checkFn can be used to customize the check this step performs on the returned | ||||||
// doc count. By default it checks if the vlaue is greater than 0. | ||||||
checkFn func(i int64) bool | ||||||
} | ||||||
|
||||||
func (s checkFieldExistsInDocsStep) Step(t *testing.T, ctx context.Context, e *testStepEnv) { | ||||||
t.Logf("------ check data stream docs with field %s in %s ------", s.fieldName, e.currentVersion()) | ||||||
count, err := getDocCountWithField(ctx, e.esc.TypedClient(), s.dataStreamName, s.fieldName) | ||||||
if err != nil { | ||||||
t.Log(err) | ||||||
t.Fail() | ||||||
} | ||||||
t.Logf("documents in data stream %s with '%s': %d\n", s.dataStreamName, s.fieldName, count) | ||||||
|
||||||
if s.checkFn == nil { | ||||||
s.checkFn = func(i int64) bool { return i > 0 } | ||||||
} | ||||||
|
||||||
if !s.checkFn(count) { | ||||||
t.Log("checkFieldExistsInDocsStep: check function failed") | ||||||
t.Fail() | ||||||
return | ||||||
} | ||||||
t.Log("check successful") | ||||||
} | ||||||
|
||||||
func getDocCountWithField(ctx context.Context, c *elasticsearch.TypedClient, dataStreamName, fieldName string) (int64, error) { | ||||||
req := &count.Request{ | ||||||
Query: &types.Query{ | ||||||
Exists: &types.ExistsQuery{ | ||||||
Field: fieldName, | ||||||
}, | ||||||
}, | ||||||
} | ||||||
resp, err := c.Core.Count(). | ||||||
Index(dataStreamName). | ||||||
Request(req). | ||||||
Do(ctx) | ||||||
if err != nil { | ||||||
return 0, fmt.Errorf("cannot retrieve doc count: %w", err) | ||||||
} | ||||||
return resp.Count, nil | ||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will skip the whole test entirely if the BC does not exist. Since both
GetLatestVersionOrSkip
andGetLatestBCOrSkip
are not used currently, you can update them to achieve what you need instead.