Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions internal/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (s String) Get(r *record.Record) (string, error) {
return resolveMacro, nil
}

return evaluateContext(resolveMacro, r)
return evaluateMeta(resolveMacro, r)

}

Expand Down Expand Up @@ -63,7 +63,7 @@ func Load(configFile string, obj interface{}) error {
"env": getEnvironmentVariable, // returns environment variable
"macro": setMacroPlaceholder, // set placeholder string for macro replacement
"secret": getSecret, // we use this template function to inject secrets from parameter store
"context": setContextPlaceholder, // set placeholder string for context replacement
"context": SetMetaPlaceholder, // set placeholder string for context replacement. Maintaining "context" as a template function name for backward compatibility
// indent: add `n` spaces after every newline in the value (useful when
// injecting multiline values into YAML block scalars)
"indent": func(n int, v string) string {
Expand Down
6 changes: 3 additions & 3 deletions internal/pkg/config/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ var (
)

// return placeholder string for context key
func setContextPlaceholder(key string) (string, error) {
func SetMetaPlaceholder(key string) (string, error) {
return fmt.Sprintf(contextPlaceholderString, key), nil
}

func evaluateContext(data string, record *record.Record) (string, error) {
func evaluateMeta(data string, record *record.Record) (string, error) {

// Find all context template patterns
matches := contextTemplateRegex.FindAllStringSubmatch(data, -1)
Expand All @@ -45,7 +45,7 @@ func evaluateContext(data string, record *record.Record) (string, error) {
key := match[1]

// Get the context value
value, ok := record.GetContextValue(key)
value, ok := record.GetMetaValue(key)
if !ok {
missingKeys = append(missingKeys, key)
continue
Expand Down
12 changes: 9 additions & 3 deletions internal/pkg/pipeline/pipeline.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pipeline

import (
"context"
"fmt"
"sync"

Expand All @@ -22,6 +23,8 @@ type Pipeline struct {
wg *sync.WaitGroup
locker *sync.Mutex
errors map[string]error
ctx context.Context
cancel context.CancelFunc
}

func (p *Pipeline) Init() error {
Expand Down Expand Up @@ -61,6 +64,9 @@ func (p *Pipeline) Run() error {
p.ChannelSize = defaultChannelSize
}

// Create pipeline-level context for cancellation
p.ctx, p.cancel = context.WithCancel(context.Background())

// sync
if p.DAG == nil {
// data streams
Expand Down Expand Up @@ -237,18 +243,18 @@ func (p *Pipeline) runTaskConcurrently(t task.Task, input <-chan *record.Record,
taskWg.Add(concurrency)

for i := 0; i < concurrency; i++ {
go func(task task.Task, in <-chan *record.Record, out chan<- *record.Record) {
go func(ctx context.Context, task task.Task, in <-chan *record.Record, out chan<- *record.Record) {
defer taskWg.Done()

if err := task.Run(in, out); err != nil {
if err := task.Run(ctx, in, out); err != nil {
fmt.Printf("error in %s: %s\n", task.GetName(), err)
if task.GetFailOnError() {
p.locker.Lock()
p.errors[task.GetName()] = err
p.locker.Unlock()
}
}
}(t, input, output)
}(p.ctx, t, input, output)
}

go func(wg *sync.WaitGroup, out chan<- *record.Record) {
Expand Down
27 changes: 0 additions & 27 deletions internal/pkg/pipeline/record/context.go

This file was deleted.

16 changes: 16 additions & 0 deletions internal/pkg/pipeline/record/meta.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package record

func (r *Record) SetMetaValue(key string, value string) {
if r.Meta == nil {
r.Meta = make(map[string]string)
}
r.Meta[key] = value
}

func (r *Record) GetMetaValue(key string) (string, bool) {
if r.Meta == nil {
return "", false
}
v, ok := r.Meta[key]
return v, ok
}
9 changes: 4 additions & 5 deletions internal/pkg/pipeline/record/record.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
package record

import (
"context"
"encoding/json"
)

type Record struct {
ID int `yaml:"id,omitempty" json:"id,omitempty"`
Origin string `yaml:"origin,omitempty" json:"origin,omitempty"`
Data []byte `yaml:"data,omitempty" json:"data,omitempty"`
Context context.Context `yaml:"-" json:"-"`
ID int `yaml:"id,omitempty" json:"id,omitempty"`
Origin string `yaml:"origin,omitempty" json:"origin,omitempty"`
Data []byte `yaml:"data,omitempty" json:"data,omitempty"`
Meta map[string]string `yaml:"context,omitempty" json:"context,omitempty"` // keeping json key as context for backward compatibility
}

func (m Record) MarshalJSON() ([]byte, error) {
Expand Down
3 changes: 2 additions & 1 deletion internal/pkg/pipeline/task/archive/archive.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package archive

import (
"context"
"fmt"

"github.com/patterninc/caterpillar/internal/pkg/pipeline/record"
Expand Down Expand Up @@ -83,7 +84,7 @@ func (c *core) UnmarshalYAML(unmarshal func(interface{}) error) error {
return nil
}

func (c *core) Run(input <-chan *record.Record, output chan<- *record.Record) (err error) {
func (c *core) Run(ctx context.Context, input <-chan *record.Record, output chan<- *record.Record) (err error) {

if input == nil {
return task.ErrNilInput
Expand Down
10 changes: 5 additions & 5 deletions internal/pkg/pipeline/task/archive/tar.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ func (t *tarArchive) Read() {
if _, err := io.ReadFull(r, buf); err != nil && err != io.EOF {
log.Fatal(err)
}
rc.SetContextValue(string(task.CtxKeyArchiveFileNameWrite), filepath.Base(header.Name))
t.SendData(rc.Context, buf, t.OutputChan)
rc.SetMetaValue(task.MetaKeyArchiveFileNameWrite, filepath.Base(header.Name))
t.SendData(rc.Meta, buf, t.OutputChan)
}

}
Expand All @@ -73,7 +73,7 @@ func (t *tarArchive) Write() {
continue
}

filePath, found := rec.GetContextValue(string(task.CtxKeyFileNameWrite))
filePath, found := rec.GetMetaValue(task.MetaKeyFileNameWrite)
if !found {
log.Fatal("filepath not set in context")
}
Expand All @@ -97,12 +97,12 @@ func (t *tarArchive) Write() {
log.Fatal(err)
}

rc.Context = rec.Context
rc.Meta = rec.Meta
}

if err := tw.Close(); err != nil {
log.Fatal(err)
}

t.SendData(rc.Context, buf.Bytes(), t.OutputChan)
t.SendData(rc.Meta, buf.Bytes(), t.OutputChan)
}
10 changes: 5 additions & 5 deletions internal/pkg/pipeline/task/archive/zip.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (z *zipArchive) Read() {
// check the file type is regular file
if f.FileInfo().Mode().IsRegular() {

rc.SetContextValue(string(task.CtxKeyArchiveFileNameWrite), filepath.Base(f.Name))
rc.SetMetaValue(task.MetaKeyArchiveFileNameWrite, filepath.Base(f.Name))

fs, err := f.Open()
if err != nil {
Expand All @@ -55,7 +55,7 @@ func (z *zipArchive) Read() {

fs.Close()

z.SendData(rc.Context, buf, z.OutputChan)
z.SendData(rc.Meta, buf, z.OutputChan)
}
}
}
Expand All @@ -73,7 +73,7 @@ func (z *zipArchive) Write() {
break
}

filePath, found := rec.GetContextValue(string(task.CtxKeyFileNameWrite))
filePath, found := rec.GetMetaValue(task.MetaKeyFileNameWrite)
if !found {
log.Fatal("filepath not set in context")
}
Expand All @@ -93,14 +93,14 @@ func (z *zipArchive) Write() {
log.Fatal(err)
}

rc.Context = rec.Context
rc.Meta = rec.Meta
}

if err := zipWriter.Close(); err != nil {
log.Fatal(err)
}

// Send the complete ZIP archive
z.SendData(rc.Context, zipBuf.Bytes(), z.OutputChan)
z.SendData(rc.Meta, zipBuf.Bytes(), z.OutputChan)

}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (p *parameterStore) Init() error {
return nil
}

func (p *parameterStore) Run(input <-chan *record.Record, output chan<- *record.Record) (err error) {
func (p *parameterStore) Run(ctx context.Context, input <-chan *record.Record, output chan<- *record.Record) (err error) {

for {
r, ok := p.GetRecord(input)
Expand Down
5 changes: 3 additions & 2 deletions internal/pkg/pipeline/task/compress/compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package compress

import (
"bytes"
"context"
"fmt"
"io"

Expand Down Expand Up @@ -47,7 +48,7 @@ func (c *core) UnmarshalYAML(unmarshal func(interface{}) error) error {
return nil
}

func (c *core) Run(input <-chan *record.Record, output chan<- *record.Record) (err error) {
func (c *core) Run(ctx context.Context, input <-chan *record.Record, output chan<- *record.Record) (err error) {

if input == nil {
return task.ErrNilInput
Expand Down Expand Up @@ -83,7 +84,7 @@ func (c *core) Run(input <-chan *record.Record, output chan<- *record.Record) (e
}

if output != nil {
c.SendData(r.Context, transformedData, output)
c.SendData(r.Meta, transformedData, output)
}
}

Expand Down
7 changes: 4 additions & 3 deletions internal/pkg/pipeline/task/converter/converter.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package converter

import (
"context"
"fmt"

"github.com/patterninc/caterpillar/internal/pkg/pipeline/record"
Expand Down Expand Up @@ -66,7 +67,7 @@ func (c *core) UnmarshalYAML(unmarshal func(interface{}) error) error {

}

func (c *core) Run(input <-chan *record.Record, output chan<- *record.Record) error {
func (c *core) Run(ctx context.Context, input <-chan *record.Record, output chan<- *record.Record) error {

for {
r, ok := c.GetRecord(input)
Expand All @@ -83,10 +84,10 @@ func (c *core) Run(input <-chan *record.Record, output chan<- *record.Record) er
if out.Data != nil {
// Add metadata to context
for k, v := range out.Metadata {
r.SetContextValue(k, v)
r.SetMetaValue(k, v)
}

c.SendData(r.Context, out.Data, output)
c.SendData(r.Meta, out.Data, output)
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion internal/pkg/pipeline/task/delay/delay.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package delay

import (
"context"
"time"

"github.com/patterninc/caterpillar/internal/pkg/duration"
Expand All @@ -23,7 +24,7 @@ func New() (task.Task, error) {
}, nil
}

func (d *delay) Run(input <-chan *record.Record, output chan<- *record.Record) error {
func (d *delay) Run(ctx context.Context, input <-chan *record.Record, output chan<- *record.Record) error {

for {
r, ok := d.GetRecord(input)
Expand Down
3 changes: 2 additions & 1 deletion internal/pkg/pipeline/task/echo/echo.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package echo

import (
"context"
"fmt"
"time"

Expand All @@ -21,7 +22,7 @@ func New() (task.Task, error) {
return &echo{}, nil
}

func (e *echo) Run(input <-chan *record.Record, output chan<- *record.Record) (err error) {
func (e *echo) Run(ctx context.Context, input <-chan *record.Record, output chan<- *record.Record) (err error) {

for {
r, ok := e.GetRecord(input)
Expand Down
12 changes: 6 additions & 6 deletions internal/pkg/pipeline/task/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func New() (task.Task, error) {
}, nil
}

func (f *file) Run(input <-chan *record.Record, output chan<- *record.Record) error {
func (f *file) Run(ctx context.Context, input <-chan *record.Record, output chan<- *record.Record) error {

// let's check if we read file or we write file...
if input != nil && output != nil {
Expand Down Expand Up @@ -134,12 +134,12 @@ func (f *file) readFile(output chan<- *record.Record) error {
return err
}

// Create a default record with context
rc := &record.Record{Context: ctx}
rc.SetContextValue(string(task.CtxKeyFileNameWrite), filepath.Base(path))
// create a default record with file name in context
rc := &record.Record{}
rc.SetMetaValue(task.MetaKeyFileNameWrite, filepath.Base(path))

// let's write content to output channel
f.SendData(rc.Context, content, output)
f.SendData(rc.Meta, content, output)

}

Expand Down Expand Up @@ -174,7 +174,7 @@ func (f *file) writeFile(input <-chan *record.Record) error {
var fs file

fs = *f
filePath, found := rc.GetContextValue(string(task.CtxKeyArchiveFileNameWrite))
filePath, found := rc.GetMetaValue(task.MetaKeyArchiveFileNameWrite)
if found {
if filePath == "" {
log.Fatal("required file path")
Expand Down
Loading
Loading