-
Notifications
You must be signed in to change notification settings - Fork 118
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
System tests for transforms #2242
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,6 +12,7 @@ import ( | |
"fmt" | ||
"net/http" | ||
"os" | ||
"path" | ||
"path/filepath" | ||
"regexp" | ||
"slices" | ||
|
@@ -1896,6 +1897,12 @@ func (r *tester) checkTransforms(ctx context.Context, config *testConfig, pkgMan | |
return fmt.Errorf("no documents found in preview for transform %q", transformId) | ||
} | ||
|
||
// Check that there is no problem running the actual transform. | ||
err = r.checkRunningTransformHealth(ctx, transformId) | ||
if err != nil { | ||
return fmt.Errorf("there are issues with installed transform %q: %w", transformId, err) | ||
} | ||
|
||
transformRootPath := filepath.Dir(transform.Path) | ||
fieldsValidator, err := fields.CreateValidatorForDirectory(transformRootPath, | ||
fields.WithSpecVersion(pkgManifest.SpecVersion), | ||
|
@@ -1978,6 +1985,252 @@ func (r *tester) previewTransform(ctx context.Context, transformId string) ([]co | |
return preview.Documents, nil | ||
} | ||
|
||
/* XXX: unused code now, commented out for the linter till we investigate the issues with reset | ||
|
||
func (r *tester) resetTransform(ctx context.Context, transformId string) error { | ||
resp, err := r.esAPI.TransformResetTransform(transformId, | ||
r.esAPI.TransformResetTransform.WithContext(ctx), | ||
r.esAPI.TransformResetTransform.WithForce(true), | ||
) | ||
if err != nil { | ||
return err | ||
} | ||
defer resp.Body.Close() | ||
|
||
if resp.IsError() { | ||
return fmt.Errorf("failed to reset transform %q: %s", transformId, resp.String()) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (r *tester) startTransform(ctx context.Context, transformId string) error { | ||
resp, err := r.esAPI.TransformStartTransform(transformId, | ||
r.esAPI.TransformStartTransform.WithContext(ctx), | ||
) | ||
if err != nil { | ||
return err | ||
} | ||
defer resp.Body.Close() | ||
|
||
if resp.IsError() { | ||
return fmt.Errorf("failed to start transform %q: %s", transformId, resp.String()) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
*/ | ||
|
||
func (r *tester) scheduleTransform(ctx context.Context, transformId string) error { | ||
resp, err := r.esAPI.TransformScheduleNowTransform(transformId, | ||
r.esAPI.TransformScheduleNowTransform.WithContext(ctx), | ||
) | ||
if err != nil { | ||
return err | ||
} | ||
defer resp.Body.Close() | ||
|
||
if resp.IsError() { | ||
return fmt.Errorf("failed to schedule transform %q: %s", transformId, resp.String()) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
type transformStats struct { | ||
Checkpointing struct { | ||
Last struct { | ||
Checkpoint int `json:"checkpoint"` | ||
} `json:"last"` | ||
Next struct { | ||
Checkpoint int `json:"checkpoint"` | ||
} `json:"next"` | ||
LastSearchTime int `json:"last_search_time"` | ||
} `json:"checkpointing"` | ||
Health transformHealth `json:"health"` | ||
Reason string `json:"reason"` | ||
State string `json:"state"` | ||
} | ||
|
||
type transformHealth struct { | ||
Status string `json:"status"` | ||
Issues []struct { | ||
Issue string `json:"issue"` | ||
Details string `json:"details"` | ||
Count int `json:"count"` | ||
FirstOccurrence time.Time `json:"first_occurrence"` | ||
} `json:"issues"` | ||
} | ||
|
||
func (r *tester) getTransformStats(ctx context.Context, transformId string) (*transformStats, error) { | ||
resp, err := r.esAPI.TransformGetTransformStats(transformId, | ||
r.esAPI.TransformGetTransformStats.WithContext(ctx), | ||
r.esAPI.TransformGetTransformStats.WithAllowNoMatch(false), | ||
) | ||
|
||
if err != nil { | ||
return nil, err | ||
} | ||
defer resp.Body.Close() | ||
|
||
if resp.IsError() { | ||
return nil, fmt.Errorf("failed to get transform stats for %q: %s", transformId, resp.String()) | ||
} | ||
|
||
var response struct { | ||
Transforms []transformStats `json:"transforms"` | ||
} | ||
err = json.NewDecoder(resp.Body).Decode(&response) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to decode response: %w", err) | ||
} | ||
if len(response.Transforms) != 1 { | ||
return nil, fmt.Errorf("stats for %d transforms received when requesting only for %s", len(response.Transforms), transformId) | ||
} | ||
return &response.Transforms[0], nil | ||
} | ||
|
||
func (r *tester) checkTransformAuditMessages(ctx context.Context, transformId string) error { | ||
// XXX: This is an internal API, are these audit messages available somewhere else? | ||
const internalTransformsPath = "/internal/transform/transforms" | ||
messagesPath := path.Join(internalTransformsPath, transformId, "messages") | ||
query := "?sortField=timestamp&sortDirection=desc" // Required | ||
statusCode, body, err := r.kibanaClient.SendRequest(ctx, http.MethodGet, messagesPath+query, nil) | ||
if err != nil { | ||
return fmt.Errorf("could not get transform audit messages: %w", err) | ||
} | ||
if statusCode >= 400 { | ||
return fmt.Errorf("could not get transform audit messages: status code %d, body: %s", statusCode, body) | ||
} | ||
|
||
var resp struct { | ||
Messages []struct { | ||
TransformID string `json:"transform_id"` | ||
Message string `json:"message"` | ||
Level string `json:"level"` | ||
Timestamp int `json:"timestamp"` | ||
NodeName string `json:"node_name"` | ||
} `json:"messages"` | ||
} | ||
err = json.Unmarshal(body, &resp) | ||
if err != nil { | ||
return fmt.Errorf("could not decode response: %w", err) | ||
} | ||
|
||
for _, message := range resp.Messages { | ||
if message.Level == "error" { | ||
return fmt.Errorf("failure found in transform: %s", message.Message) | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
// checkRunningTransformHealth checks the following for a given transform: | ||
// - That it is started. | ||
// - That it can execute at least once during the check. | ||
// - That it hasn't generated any error message. | ||
// - That it is healthy after executing at least once. | ||
func (r *tester) checkRunningTransformHealth(ctx context.Context, transformId string) error { | ||
const ( | ||
period = 1 * time.Second | ||
timeout = 60 * time.Second | ||
) | ||
lastSearchTime := 0 | ||
last := -1 | ||
running := false | ||
|
||
// Reset transform to clean any previous state. | ||
/* XXX: It fails to create the index after reset :? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is failing now because it tries to create indexes with a name that is matched by a data stream template, and transforms need to use indexes, not data streams. |
||
err := r.resetTransform(ctx, transformId) | ||
if err != nil { | ||
return fmt.Errorf("failed to reset transform: %w", err) | ||
} | ||
err = r.startTransform(ctx, transformId) | ||
if err != nil { | ||
return fmt.Errorf("failed to start transform after reset: %w", err) | ||
} | ||
*/ | ||
ok, err := wait.UntilTrue(ctx, func(ctx context.Context) (bool, error) { | ||
stats, err := r.getTransformStats(ctx, transformId) | ||
if err != nil { | ||
return false, err | ||
} | ||
if last < 0 { | ||
last = stats.Checkpointing.Last.Checkpoint | ||
lastSearchTime = stats.Checkpointing.LastSearchTime | ||
} | ||
logger.Debugf("transform %s state: %s, health: %s, checkpoint %d, last search %d", | ||
transformId, | ||
stats.State, stats.Health.Status, | ||
stats.Checkpointing.Last.Checkpoint, stats.Checkpointing.LastSearchTime) | ||
switch stats.State { | ||
case "failed": | ||
return false, fmt.Errorf("transform in failed state: %s", stats.Reason) | ||
case "aborting", "stopping", "stopped": | ||
return false, fmt.Errorf("transform unexpectedly %s", stats.State) | ||
case "indexing": | ||
// It is already running, wait till indexing finishes. | ||
running = true | ||
return false, nil | ||
case "started": | ||
if !running { | ||
logger.Debugf("scheduling transform %s now", transformId) | ||
err := r.scheduleTransform(ctx, transformId) | ||
if err != nil { | ||
return false, fmt.Errorf("failed to schedule transform: %w", err) | ||
} | ||
running = true | ||
return false, nil | ||
} | ||
default: | ||
return false, fmt.Errorf("unexpected transform state %q", stats.State) | ||
} | ||
|
||
if stats.Checkpointing.Last.Checkpoint <= last && stats.Checkpointing.LastSearchTime <= lastSearchTime { | ||
// There hasn't been any update yet, try again. | ||
return false, nil | ||
} | ||
|
||
// We need to check the audit messages in case a document is removed but caused issues. | ||
err = r.checkTransformAuditMessages(ctx, transformId) | ||
if err != nil { | ||
return false, err | ||
} | ||
|
||
err = healthError(stats.Health) | ||
return err == nil, err | ||
}, period, timeout) | ||
if err != nil { | ||
return err | ||
} | ||
if !ok { | ||
return fmt.Errorf("could not confirm successful executions of transform %s", transformId) | ||
} | ||
return nil | ||
} | ||
|
||
func healthError(health transformHealth) error { | ||
if health.Status == "green" { | ||
return nil | ||
} | ||
|
||
var msg strings.Builder | ||
msg.WriteString("unexpected transform health status (" + health.Status + ")") | ||
|
||
if len(health.Issues) > 0 { | ||
msg.WriteString(": ") | ||
for i, issue := range health.Issues { | ||
msg.WriteString(issue.Issue + "(" + issue.Details + ")") | ||
if i+1 < len(health.Issues) { | ||
msg.WriteString(", ") | ||
} | ||
} | ||
} | ||
|
||
return errors.New(msg.String()) | ||
} | ||
|
||
func filterAgents(allAgents []kibana.Agent, svcInfo servicedeployer.ServiceInfo) []kibana.Agent { | ||
if svcInfo.Agent.Host.NamePrefix != "" { | ||
logger.Debugf("filter agents using criteria: NamePrefix=%s", svcInfo.Agent.Host.NamePrefix) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have found that some failures only appear in the audit messages, but these messages seem to be available only on internal APIs, that I guess don't have compatibility guarantees, nor will be available in all deployments.
We have to look for some other API.
@kpollich do you know how safe these internal APIs are to use? Or do you know if there are alternatives?
Though the errors found here also make the transform to fail with real-time data, so maybe this is an issue with test data that is discarded by being old. Then we'd have to review the test data too.