Skip to content

Commit

Permalink
Parallel destroy of pipelines
Browse files Browse the repository at this point in the history
Signed-off-by: Vladimir Plakhotnikov <[email protected]>
  • Loading branch information
Kaspiman committed Aug 8, 2023
1 parent d1b85ca commit 583a5ae
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 12 deletions.
1 change: 1 addition & 0 deletions plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
pipelines string = "pipelines"

pipelinesRegisterConcurrencyLimit = 10
pipelinesDestroyConcurrencyLimit = 10

// v2.7 and newer config key
cfgKey string = "config"
Expand Down
41 changes: 29 additions & 12 deletions rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package jobs

import (
"context"
"golang.org/x/sync/errgroup"

Check failure on line 5 in rpc.go

View workflow job for this annotation

GitHub Actions / Golang-CI (lint)

File is not `goimports`-ed (goimports)
"time"

jobsProto "github.com/roadrunner-server/api/v4/build/jobs/v1"
Expand Down Expand Up @@ -137,21 +138,37 @@ func (r *rpc) Declare(req *jobsProto.DeclareRequest, _ *jobsProto.Empty) error {
func (r *rpc) Destroy(req *jobsProto.Pipelines, resp *jobsProto.Pipelines) error {
const op = errors.Op("rpc_destroy_pipeline")

eg := &errgroup.Group{}
eg.SetLimit(pipelinesDestroyConcurrencyLimit)

var destroyed []string //nolint:prealloc

Check failure on line 144 in rpc.go

View workflow job for this annotation

GitHub Actions / Golang-CI (lint)

directive `//nolint:prealloc` is unused for linter "prealloc" (nolintlint)

for i := 0; i < len(req.GetPipelines()); i++ {
ctx, span := r.p.tracer.Tracer(spanName).Start(context.Background(), "destroy_pipeline", trace.WithSpanKind(trace.SpanKindServer))
err := r.p.Destroy(ctx, req.GetPipelines()[i])
ii := i
eg.Go(func() error {
ctx, span := r.p.tracer.Tracer(spanName).Start(context.Background(), "destroy_pipeline", trace.WithSpanKind(trace.SpanKindServer))
defer span.End()
err := r.p.Destroy(ctx, req.GetPipelines()[ii])

if err != nil {
span.SetAttributes(attribute.KeyValue{
Key: "error",
Value: attribute.StringValue(err.Error()),
})
span.End()
return errors.E(op, err)
}
destroyed = append(destroyed, req.GetPipelines()[i])
span.End()
if err != nil {
span.SetAttributes(attribute.KeyValue{
Key: "error",
Value: attribute.StringValue(err.Error()),
})

return errors.E(op, err)
}

destroyed = append(destroyed, req.GetPipelines()[ii])

return nil
})
}

err := eg.Wait()

if err != nil {
return err
}

// return destroyed pipelines
Expand Down

0 comments on commit 583a5ae

Please sign in to comment.