diff --git a/plugin.go b/plugin.go index f9e2c5c..c0326a8 100644 --- a/plugin.go +++ b/plugin.go @@ -34,6 +34,7 @@ const ( pipelines string = "pipelines" pipelinesRegisterConcurrencyLimit = 10 + pipelinesDestroyConcurrencyLimit = 10 // v2.7 and newer config key cfgKey string = "config" diff --git a/rpc.go b/rpc.go index 2175b81..86fb8a8 100644 --- a/rpc.go +++ b/rpc.go @@ -2,6 +2,7 @@ package jobs import ( "context" + "golang.org/x/sync/errgroup" "time" jobsProto "github.com/roadrunner-server/api/v4/build/jobs/v1" @@ -137,21 +138,37 @@ func (r *rpc) Declare(req *jobsProto.DeclareRequest, _ *jobsProto.Empty) error { func (r *rpc) Destroy(req *jobsProto.Pipelines, resp *jobsProto.Pipelines) error { const op = errors.Op("rpc_destroy_pipeline") + eg := &errgroup.Group{} + eg.SetLimit(pipelinesDestroyConcurrencyLimit) + var destroyed []string //nolint:prealloc + for i := 0; i < len(req.GetPipelines()); i++ { - ctx, span := r.p.tracer.Tracer(spanName).Start(context.Background(), "destroy_pipeline", trace.WithSpanKind(trace.SpanKindServer)) - err := r.p.Destroy(ctx, req.GetPipelines()[i]) + ii := i + eg.Go(func() error { + ctx, span := r.p.tracer.Tracer(spanName).Start(context.Background(), "destroy_pipeline", trace.WithSpanKind(trace.SpanKindServer)) + defer span.End() + err := r.p.Destroy(ctx, req.GetPipelines()[ii]) - if err != nil { - span.SetAttributes(attribute.KeyValue{ - Key: "error", - Value: attribute.StringValue(err.Error()), - }) - span.End() - return errors.E(op, err) - } - destroyed = append(destroyed, req.GetPipelines()[i]) - span.End() + if err != nil { + span.SetAttributes(attribute.KeyValue{ + Key: "error", + Value: attribute.StringValue(err.Error()), + }) + + return errors.E(op, err) + } + + destroyed = append(destroyed, req.GetPipelines()[ii]) + + return nil + }) + } + + err := eg.Wait() + + if err != nil { + return err } // return destroyed pipelines