From 7ba245a663c0a94358568e38b47b770fd70016b2 Mon Sep 17 00:00:00 2001 From: Mario Rodriguez Molins Date: Wed, 2 Oct 2024 18:46:42 +0200 Subject: [PATCH] Add missing context to api ingest pipelines (#2137) --- .../benchrunner/runners/pipeline/runner.go | 2 +- internal/benchrunner/runners/rally/runner.go | 2 +- internal/benchrunner/runners/system/runner.go | 2 +- internal/elasticsearch/ingest/datastream.go | 30 +++++++++++-------- .../testrunner/runners/pipeline/tester.go | 4 +-- 5 files changed, 21 insertions(+), 19 deletions(-) diff --git a/internal/benchrunner/runners/pipeline/runner.go b/internal/benchrunner/runners/pipeline/runner.go index c43c47e05..17b6532d9 100644 --- a/internal/benchrunner/runners/pipeline/runner.go +++ b/internal/benchrunner/runners/pipeline/runner.go @@ -48,7 +48,7 @@ func (r *runner) SetUp(ctx context.Context) error { return errors.New("data stream root not found") } - r.entryPipeline, r.pipelines, err = ingest.InstallDataStreamPipelines(r.options.API, dataStreamPath) + r.entryPipeline, r.pipelines, err = ingest.InstallDataStreamPipelines(ctx, r.options.API, dataStreamPath) if err != nil { return fmt.Errorf("installing ingest pipelines failed: %w", err) } diff --git a/internal/benchrunner/runners/rally/runner.go b/internal/benchrunner/runners/rally/runner.go index e4b9e3799..95744a99a 100644 --- a/internal/benchrunner/runners/rally/runner.go +++ b/internal/benchrunner/runners/rally/runner.go @@ -956,7 +956,7 @@ func (r *runner) reindexData(ctx context.Context) error { logger.Debug("starting reindexing of data...") - logger.Debug("getting orignal mappings...") + logger.Debug("getting original mappings...") // Get the mapping from the source data stream mappingRes, err := r.options.ESAPI.Indices.GetMapping( r.options.ESAPI.Indices.GetMapping.WithContext(ctx), diff --git a/internal/benchrunner/runners/system/runner.go b/internal/benchrunner/runners/system/runner.go index 887fbcc4d..9f33cb45f 100644 --- a/internal/benchrunner/runners/system/runner.go +++ b/internal/benchrunner/runners/system/runner.go @@ -748,7 +748,7 @@ func (r *runner) reindexData(ctx context.Context) error { logger.Debug("starting reindexing of data...") - logger.Debug("getting orignal mappings...") + logger.Debug("getting original mappings...") // Get the mapping from the source data stream mappingRes, err := r.options.ESAPI.Indices.GetMapping( r.options.ESAPI.Indices.GetMapping.WithContext(ctx), diff --git a/internal/elasticsearch/ingest/datastream.go b/internal/elasticsearch/ingest/datastream.go index 213e0aa70..6d4848446 100644 --- a/internal/elasticsearch/ingest/datastream.go +++ b/internal/elasticsearch/ingest/datastream.go @@ -6,6 +6,7 @@ package ingest import ( "bytes" + "context" "fmt" "io" "net/http" @@ -45,7 +46,7 @@ type RerouteProcessor struct { Namespace []string `yaml:"namespace"` } -func InstallDataStreamPipelines(api *elasticsearch.API, dataStreamPath string) (string, []Pipeline, error) { +func InstallDataStreamPipelines(ctx context.Context, api *elasticsearch.API, dataStreamPath string) (string, []Pipeline, error) { dataStreamManifest, err := packages.ReadDataStreamManifest(filepath.Join(dataStreamPath, packages.DataStreamManifestFile)) if err != nil { return "", nil, fmt.Errorf("reading data stream manifest failed: %w", err) @@ -59,7 +60,7 @@ func InstallDataStreamPipelines(api *elasticsearch.API, dataStreamPath string) ( return "", nil, fmt.Errorf("loading ingest pipeline files failed: %w", err) } - err = installPipelinesInElasticsearch(api, pipelines) + err = installPipelinesInElasticsearch(ctx, api, pipelines) if err != nil { return "", nil, err } @@ -232,9 +233,9 @@ func convertValue(value interface{}, label string) ([]string, error) { } } -func installPipelinesInElasticsearch(api *elasticsearch.API, pipelines []Pipeline) error { +func installPipelinesInElasticsearch(ctx context.Context, api *elasticsearch.API, pipelines []Pipeline) error { for _, p := range pipelines { - if err := installPipeline(api, p); err != nil { + if err := installPipeline(ctx, api, p); err != nil { return err } } @@ -251,20 +252,22 @@ func pipelineError(err error, pipeline Pipeline, format string, args ...interfac return fmt.Errorf("%s: %w", errorStr, err) } -func installPipeline(api *elasticsearch.API, pipeline Pipeline) error { - if err := putIngestPipeline(api, pipeline); err != nil { +func installPipeline(ctx context.Context, api *elasticsearch.API, pipeline Pipeline) error { + if err := putIngestPipeline(ctx, api, pipeline); err != nil { return err } // Just to be sure the pipeline has been uploaded. - return getIngestPipeline(api, pipeline) + return getIngestPipeline(ctx, api, pipeline) } -func putIngestPipeline(api *elasticsearch.API, pipeline Pipeline) error { +func putIngestPipeline(ctx context.Context, api *elasticsearch.API, pipeline Pipeline) error { source, err := pipeline.MarshalJSON() if err != nil { return err } - r, err := api.Ingest.PutPipeline(pipeline.Name, bytes.NewReader(source)) + r, err := api.Ingest.PutPipeline(pipeline.Name, bytes.NewReader(source), + api.Ingest.PutPipeline.WithContext(ctx), + ) if err != nil { return pipelineError(err, pipeline, "PutPipeline API call failed") } @@ -283,10 +286,11 @@ func putIngestPipeline(api *elasticsearch.API, pipeline Pipeline) error { return nil } -func getIngestPipeline(api *elasticsearch.API, pipeline Pipeline) error { - r, err := api.Ingest.GetPipeline(func(request *elasticsearch.IngestGetPipelineRequest) { - request.PipelineID = pipeline.Name - }) +func getIngestPipeline(ctx context.Context, api *elasticsearch.API, pipeline Pipeline) error { + r, err := api.Ingest.GetPipeline( + api.Ingest.GetPipeline.WithContext(ctx), + api.Ingest.GetPipeline.WithPipelineID(pipeline.Name), + ) if err != nil { return pipelineError(err, pipeline, "GetPipeline API call failed") } diff --git a/internal/testrunner/runners/pipeline/tester.go b/internal/testrunner/runners/pipeline/tester.go index 6789cec5d..ad6f53b70 100644 --- a/internal/testrunner/runners/pipeline/tester.go +++ b/internal/testrunner/runners/pipeline/tester.go @@ -168,7 +168,7 @@ func (r *tester) run(ctx context.Context) ([]testrunner.TestResult, error) { startTesting := time.Now() var entryPipeline string - entryPipeline, r.pipelines, err = ingest.InstallDataStreamPipelines(r.esAPI, dataStreamPath) + entryPipeline, r.pipelines, err = ingest.InstallDataStreamPipelines(ctx, r.esAPI, dataStreamPath) if err != nil { return nil, fmt.Errorf("installing ingest pipelines failed: %w", err) } @@ -278,7 +278,6 @@ func (r *tester) checkElasticsearchLogs(ctx context.Context, startTesting time.T return nil }) - if err != nil { return nil, fmt.Errorf("error at parsing logs of elasticseach: %w", err) } @@ -297,7 +296,6 @@ func (r *tester) checkElasticsearchLogs(ctx context.Context, startTesting time.T } return []testrunner.TestResult{tr}, nil - } func (r *tester) runTestCase(ctx context.Context, testCaseFile string, dsPath string, dsType string, pipeline string, validatorOptions []fields.ValidatorOption) ([]testrunner.TestResult, error) {