diff --git a/internal/pkg/pipeline/pipeline.go b/internal/pkg/pipeline/pipeline.go
index 4665c06..8fba732 100644
--- a/internal/pkg/pipeline/pipeline.go
+++ b/internal/pkg/pipeline/pipeline.go
@@ -196,9 +196,11 @@ func (p *Pipeline) distributeToChannels(input <-chan *record.Record, outputs []c
 	}()
 
 	for rec := range input {
+		// Send clones to all channels to avoid data races
+		// and ensure GC can clean up records after processing
 		for _, ch := range outputs {
 			if ch != nil {
-				ch <- rec
+				ch <- rec.Clone()
 			}
 		}
 	}
diff --git a/internal/pkg/pipeline/record/record.go b/internal/pkg/pipeline/record/record.go
index 0f406bb..525aa7d 100644
--- a/internal/pkg/pipeline/record/record.go
+++ b/internal/pkg/pipeline/record/record.go
@@ -2,6 +2,7 @@ package record
 
 import (
 	"encoding/json"
+	"maps"
 )
 
 type Record struct {
@@ -28,3 +29,27 @@ func (r *Record) Bytes() []byte {
 
 	return data
 }
+
+// Clone creates a deep copy of the record to prevent shared references across parallel pipeline branches.
+func (r *Record) Clone() *Record {
+	if r == nil {
+		return nil
+	}
+
+	newRec := &Record{
+		ID:     r.ID,
+		Origin: r.Origin,
+	}
+
+	if r.Data != nil {
+		newRec.Data = make([]byte, len(r.Data))
+		copy(newRec.Data, r.Data)
+	}
+
+	if r.Meta != nil {
+		newRec.Meta = make(map[string]string, len(r.Meta))
+		maps.Copy(newRec.Meta, r.Meta)
+	}
+
+	return newRec
+}
diff --git a/internal/pkg/pipeline/task/converter/sst.go b/internal/pkg/pipeline/task/converter/sst.go
index 5cc2af4..c24ddaf 100644
--- a/internal/pkg/pipeline/task/converter/sst.go
+++ b/internal/pkg/pipeline/task/converter/sst.go
@@ -1,10 +1,10 @@
 package converter
 
 import (
+	"bytes"
 	"errors"
 	"os"
 	"sort"
-	"strings"
 
 	"github.com/cockroachdb/pebble"
 	"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
@@ -19,17 +19,20 @@ const (
 )
 
 func (c *sst) convert(data []byte, d string) ([]converterOutput, error) {
-	lines := strings.Split(string(data), "\n")
+	delimBytes := []byte(d)
+	newline := []byte("\n")
+	lines := bytes.Split(data, newline)
 	values := map[string]string{}
 	for _, line := range lines {
-		if line == "" {
+		if len(line) == 0 {
 			continue
 		}
-		kv := strings.SplitN(line, d, 2)
+		kv := bytes.SplitN(line, delimBytes, 2)
 		if len(kv) != 2 {
-			return nil, errors.New("invalid input for sst converter: expected 'key" + d + "value' format (got: " + line + ")")
+			return nil, errors.New("invalid input for sst converter: expected 'key" + d + "value' format (got: " + string(line) + ")")
 		}
-		values[kv[0]] = kv[1]
+
+		values[string(kv[0])] = string(kv[1])
 	}
 
 	fileName, err := c.createSST(values)
@@ -37,7 +40,7 @@ func (c *sst) convert(data []byte, d string) ([]converterOutput, error) {
 		return nil, err
 	}
 	defer os.Remove(fileName)
-	
+
 	sstData, err := os.ReadFile(fileName)
 	if err != nil {
 		return nil, err
diff --git a/internal/pkg/pipeline/task/join/join.go b/internal/pkg/pipeline/task/join/join.go
index 994f2e2..5146f04 100644
--- a/internal/pkg/pipeline/task/join/join.go
+++ b/internal/pkg/pipeline/task/join/join.go
@@ -1,9 +1,9 @@
 package join
 
 import (
+	"bytes"
 	"context"
 	"fmt"
-	"strings"
 	"time"
 
 	"github.com/patterninc/caterpillar/internal/pkg/duration"
@@ -88,21 +88,25 @@ func (j *join) Run(ctx context.Context, input <-chan *record.Record, output chan
 
 func (j *join) flushBuffer(output chan<- *record.Record) {
 	if len(j.buffer) > 0 {
 		j.sendJoinedRecords(output)
+		// clear the buffer, explicitly nil out the elements to help GC
+		for i := range j.buffer {
+			j.buffer[i] = nil
+		}
 		j.buffer = j.buffer[:0]
 	}
 }
 
 func (j *join) sendJoinedRecords(output chan<- *record.Record) {
-	// Join all data with the specified delimiter
-	var joinedData strings.Builder
+	var joinedData bytes.Buffer
+	delimBytes := []byte(j.Delimiter)
 	for i, r := range j.buffer {
 		if i > 0 {
-			joinedData.WriteString(j.Delimiter)
+			joinedData.Write(delimBytes)
 		}
 		joinedData.Write(r.Data)
 	}
-	j.SendData(nil, []byte(joinedData.String()), output)
-
+	data := bytes.Clone(joinedData.Bytes())
+	j.SendData(nil, data, output)
 }
 
diff --git a/internal/pkg/pipeline/task/replace/replace.go b/internal/pkg/pipeline/task/replace/replace.go
index bd2dabb..5587a57 100644
--- a/internal/pkg/pipeline/task/replace/replace.go
+++ b/internal/pkg/pipeline/task/replace/replace.go
@@ -25,13 +25,15 @@ func (r *replace) Run(ctx context.Context, input <-chan *record.Record, output c
 		return err
 	}
 
+	replacementBytes := []byte(r.Replacement)
+
 	if output != nil {
 		for {
 			record, ok := r.GetRecord(input)
 			if !ok {
 				break
 			}
-			r.SendData(record.Meta, []byte(rx.ReplaceAllString(string(record.Data), r.Replacement)), output)
+			r.SendData(record.Meta, rx.ReplaceAll(record.Data, replacementBytes), output)
 		}
 	}
 
diff --git a/internal/pkg/pipeline/task/split/split.go b/internal/pkg/pipeline/task/split/split.go
index d29dd87..d53863d 100644
--- a/internal/pkg/pipeline/task/split/split.go
+++ b/internal/pkg/pipeline/task/split/split.go
@@ -1,8 +1,8 @@
 package split
 
 import (
+	"bytes"
 	"context"
-	"strings"
 
 	"github.com/patterninc/caterpillar/internal/pkg/pipeline/record"
 	"github.com/patterninc/caterpillar/internal/pkg/pipeline/task"
@@ -22,16 +22,21 @@ func New() (task.Task, error) {
 	return &split{
 		Delimiter: defaultDelimiter,
 	}, nil
}
+
 func (s *split) Run(ctx context.Context, input <-chan *record.Record, output chan<- *record.Record) error {
+	delimBytes := []byte(s.Delimiter)
+
 	for {
 		r, ok := s.GetRecord(input)
 		if !ok {
 			break
 		}
-		lines := strings.Split(strings.TrimSuffix(string(r.Data), s.Delimiter), s.Delimiter)
+
+		data := bytes.TrimSuffix(r.Data, delimBytes)
+		lines := bytes.Split(data, delimBytes)
 		for _, line := range lines {
-			s.SendData(r.Meta, []byte(line), output)
+			s.SendData(r.Meta, line, output)
 		}
 	}
 }
diff --git a/internal/pkg/pipeline/task/task.go b/internal/pkg/pipeline/task/task.go
index 781ef07..d6e5f12 100644
--- a/internal/pkg/pipeline/task/task.go
+++ b/internal/pkg/pipeline/task/task.go
@@ -1,6 +1,7 @@
 package task
 
 import (
+	"bytes"
 	"context"
 	"encoding/json"
 	"fmt"
@@ -25,6 +26,12 @@ var (
 	ErrPresentInputOutput = fmt.Errorf(`either input or output must be set, not both`)
 )
 
+var byteBufferPool = sync.Pool{
+	New: func() any {
+		return new(bytes.Buffer)
+	},
+}
+
 type Task interface {
 	Run(ctx context.Context, input <-chan *record.Record, output chan<- *record.Record) error
 	GetName() string
@@ -119,13 +126,26 @@ func (b *Base) SendRecord(r *record.Record, output chan<- *record.Record) /* we
 		output <- r
 	}()
 
+	// get a buffer from the pool
+	buf := byteBufferPool.Get().(*bytes.Buffer)
+	defer byteBufferPool.Put(buf)
+	buf.Reset()
+
 	// before we set context, let's serialize the whole record
-	data, err := json.Marshal(r)
-	if err != nil {
-		// TODO: do prom metrics / log event to syslog
+	if err := json.NewEncoder(buf).Encode(r); err != nil {
 		fmt.Println(`ERROR (marshal):`, err)
 		return
 	}
+
+	data := bytes.TrimSuffix(buf.Bytes(), []byte("\n")) // Encode appends a trailing '\n' json.Marshal never produced; strip it so queries see identical input
+
+	// get another buffer for the query results
+	ctxBuf := byteBufferPool.Get().(*bytes.Buffer)
+	defer byteBufferPool.Put(ctxBuf)
+	ctxBuf.Reset()
+
+	ctxEncoder := json.NewEncoder(ctxBuf)
+
 	// Set the context values for the record
 	for name, query := range b.Context {
 		queryResult, err := query.Execute(data)
@@ -135,13 +155,18 @@ func (b *Base) SendRecord(r *record.Record, output chan<- *record.Record) /* we
 			return
 		}
 		// now, let's marshal it to json and set in the context...
-		contextValueJson, err := json.Marshal(queryResult)
-		if err != nil {
+		if err := ctxEncoder.Encode(queryResult); err != nil {
 			// TODO: do prom metrics / log event to syslog
 			fmt.Println(`ERROR (result):`, err)
 			return
 		}
-		r.SetMetaValue(name, string(contextValueJson))
+		// Remove trailing newline added by json.Encoder.Encode()
+		value := ctxBuf.String()
+		if len(value) > 0 && value[len(value)-1] == '\n' {
+			value = value[:len(value)-1]
+		}
+		r.SetMetaValue(name, value)
+		ctxBuf.Reset()
 	}
 }
 