Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions internal/pkg/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,16 @@ func (p *Pipeline) distributeToChannels(input <-chan *record.Record, outputs []c
}()

for rec := range input {
for _, ch := range outputs {
for i, ch := range outputs {
if ch != nil {
ch <- rec
if i == 0 {
// First branch gets the original record
ch <- rec
} else {
// Other branches get cloned records to prevent shared references
// and allow independent GC of records in each branch
ch <- rec.Clone()
}
}
}
}
Expand Down
25 changes: 25 additions & 0 deletions internal/pkg/pipeline/record/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package record

import (
"encoding/json"
"maps"
)

type Record struct {
Expand All @@ -28,3 +29,27 @@ func (r *Record) Bytes() []byte {
return data

}

// Clone creates a deep copy of the record to prevent shared references across parallel pipeline branches.
func (r *Record) Clone() *Record {
	if r == nil {
		return nil
	}

	// Scalar fields are copied by value; Data and Meta need fresh storage
	// so each pipeline branch owns its record independently.
	clone := &Record{
		ID:     r.ID,
		Origin: r.Origin,
	}

	if r.Data != nil {
		clone.Data = append(make([]byte, 0, len(r.Data)), r.Data...)
	}

	if r.Meta != nil {
		clone.Meta = maps.Clone(r.Meta)
	}

	return clone
}
17 changes: 10 additions & 7 deletions internal/pkg/pipeline/task/converter/sst.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package converter

import (
"bytes"
"errors"
"os"
"sort"
"strings"

"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
Expand All @@ -19,25 +19,28 @@ const (
)

func (c *sst) convert(data []byte, d string) ([]converterOutput, error) {
lines := strings.Split(string(data), "\n")
delimBytes := []byte(d)
newline := []byte("\n")
lines := bytes.Split(data, newline)
values := map[string]string{}
for _, line := range lines {
if line == "" {
if len(line) == 0 {
continue
}
kv := strings.SplitN(line, d, 2)
kv := bytes.SplitN(line, delimBytes, 2)
if len(kv) != 2 {
return nil, errors.New("invalid input for sst converter: expected 'key" + d + "value' format (got: " + line + ")")
return nil, errors.New("invalid input for sst converter: expected 'key" + d + "value' format (got: " + string(line) + ")")
}
values[kv[0]] = kv[1]

values[string(kv[0])] = string(kv[1])
}

fileName, err := c.createSST(values)
if err != nil {
return nil, err
}
defer os.Remove(fileName)

sstData, err := os.ReadFile(fileName)
if err != nil {
return nil, err
Expand Down
14 changes: 9 additions & 5 deletions internal/pkg/pipeline/task/join/join.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package join

import (
"bytes"
"context"
"fmt"
"strings"
"time"

"github.com/patterninc/caterpillar/internal/pkg/duration"
Expand Down Expand Up @@ -88,21 +88,25 @@ func (j *join) Run(ctx context.Context, input <-chan *record.Record, output chan
// flushBuffer emits any buffered records as a single joined record, then
// empties the buffer while dropping element references so the buffered
// records become eligible for garbage collection.
func (j *join) flushBuffer(output chan<- *record.Record) {
	if len(j.buffer) == 0 {
		return
	}
	j.sendJoinedRecords(output)
	// Zero the elements before truncating so the GC can reclaim them,
	// then keep the backing array for reuse.
	clear(j.buffer)
	j.buffer = j.buffer[:0]
}

func (j *join) sendJoinedRecords(output chan<- *record.Record) {

// Join all data with the specified delimiter
var joinedData strings.Builder
var joinedData bytes.Buffer
delimBytes := []byte(j.Delimiter)
for i, r := range j.buffer {
if i > 0 {
joinedData.WriteString(j.Delimiter)
joinedData.Write(delimBytes)
}
joinedData.Write(r.Data)
}

j.SendData(nil, []byte(joinedData.String()), output)
j.SendData(nil, joinedData.Bytes(), output)

}
4 changes: 3 additions & 1 deletion internal/pkg/pipeline/task/replace/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ func (r *replace) Run(ctx context.Context, input <-chan *record.Record, output c
return err
}

replacementBytes := []byte(r.Replacement)

if output != nil {
for {
record, ok := r.GetRecord(input)
if !ok {
break
}
r.SendData(record.Meta, []byte(rx.ReplaceAllString(string(record.Data), r.Replacement)), output)
r.SendData(record.Meta, rx.ReplaceAll(record.Data, replacementBytes), output)
}
}

Expand Down
13 changes: 9 additions & 4 deletions internal/pkg/pipeline/task/split/split.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package split

import (
"bytes"
"context"
"strings"

"github.com/patterninc/caterpillar/internal/pkg/pipeline/record"
"github.com/patterninc/caterpillar/internal/pkg/pipeline/task"
Expand All @@ -22,16 +22,21 @@ func New() (task.Task, error) {
Delimiter: defaultDelimiter,
}, nil
}

func (s *split) Run(ctx context.Context, input <-chan *record.Record, output chan<- *record.Record) error {

delimBytes := []byte(s.Delimiter)

for {
r, ok := s.GetRecord(input)
if !ok {
break
}
lines := strings.Split(strings.TrimSuffix(string(r.Data), s.Delimiter), s.Delimiter)
for _, line := range lines {
s.SendData(r.Meta, []byte(line), output)

data := bytes.TrimSuffix(r.Data, delimBytes)
lines := bytes.SplitSeq(data, delimBytes)
for line := range lines {
s.SendData(r.Meta, line, output)
}
}

Expand Down
37 changes: 31 additions & 6 deletions internal/pkg/pipeline/task/task.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package task

import (
"bytes"
"context"
"encoding/json"
"fmt"
Expand All @@ -25,6 +26,12 @@ var (
ErrPresentInputOutput = fmt.Errorf(`either input or output must be set, not both`)
)

var byteBufferPool = sync.Pool{
New: func() any {
return new(bytes.Buffer)
},
}

type Task interface {
Run(ctx context.Context, input <-chan *record.Record, output chan<- *record.Record) error
GetName() string
Expand Down Expand Up @@ -119,13 +126,26 @@ func (b *Base) SendRecord(r *record.Record, output chan<- *record.Record) /* we
output <- r
}()

// get a buffer from the pool
buf := byteBufferPool.Get().(*bytes.Buffer)
defer byteBufferPool.Put(buf)
buf.Reset()

// before we set context, let's serialize the whole record
data, err := json.Marshal(r)
if err != nil {
// TODO: do prom metrics / log event to syslog
if err := json.NewEncoder(buf).Encode(r); err != nil {
fmt.Println(`ERROR (marshal):`, err)
return
}

data := buf.Bytes()

// get another buffer for the query results
ctxBuf := byteBufferPool.Get().(*bytes.Buffer)
defer byteBufferPool.Put(ctxBuf)
ctxBuf.Reset()

ctxEncoder := json.NewEncoder(ctxBuf)

// Set the context values for the record
for name, query := range b.Context {
queryResult, err := query.Execute(data)
Expand All @@ -135,13 +155,18 @@ func (b *Base) SendRecord(r *record.Record, output chan<- *record.Record) /* we
return
}
// now, let's marshal it to json and set in the context...
contextValueJson, err := json.Marshal(queryResult)
if err != nil {
if err := ctxEncoder.Encode(queryResult); err != nil {
// TODO: do prom metrics / log event to syslog
fmt.Println(`ERROR (result):`, err)
return
}
r.SetMetaValue(name, string(contextValueJson))
// Remove trailing newline added by json.Encoder.Encode()
value := ctxBuf.String()
if len(value) > 0 && value[len(value)-1] == '\n' {
value = value[:len(value)-1]
}
r.SetMetaValue(name, value)
ctxBuf.Reset()
}

}
Loading