Merged

23 commits
75badd8
Support for ZIP file compression and decompression.
ma-gk Jan 28, 2026
ff59ec8
Merge remote-tracking branch 'origin/main' into zip_support
ma-gk Jan 28, 2026
0a6eeca
Merge remote-tracking branch 'origin' into zip_support
ma-gk Jan 29, 2026
a52ae4f
Support for tar and zip archive
ma-gk Jan 29, 2026
e735358
refactor: remove zip format support from compression handlers
ma-gk Jan 29, 2026
1e654d2
feat: add zip packing and unpacking test pipelines
ma-gk Jan 29, 2026
073193a
fix: correct tar archive writing logic and buffer handling
ma-gk Jan 30, 2026
9e0cf2b
test file for the tar with multi file output
ma-gk Jan 30, 2026
7a94408
feat: add comprehensive README for archive task with ZIP and TAR support
ma-gk Jan 30, 2026
7c136db
feat: add birds file and update zip pack/unpack test configurations
ma-gk Jan 30, 2026
8496ea5
refactor: rename extraction tasks for clarity and consistency in README
ma-gk Jan 30, 2026
297abdb
refactor: rename action types for clarity in archiving process
ma-gk Jan 30, 2026
c3a74fa
fix: correct error handling in tar archive read function and improve …
ma-gk Jan 30, 2026
829ac05
fix: improve error handling in zip archive read function
ma-gk Jan 30, 2026
018786a
removed duplicate file
ma-gk Jan 30, 2026
309c1dc
multi file support with proper naming conventions
ma-gk Jan 30, 2026
5b8246a
Refactored code used map instead of switch case
ma-gk Feb 2, 2026
972476b
refactor: replace string literals with context keys for file path han…
ma-gk Feb 4, 2026
f3d603c
Merge branch 'main' into zip_support
Mayureshpawar29 Feb 4, 2026
d6d4331
fix: update log message for empty filepath in context
ma-gk Feb 5, 2026
8430e13
Merge remote-tracking branch 'origin' into zip_support
ma-gk Feb 5, 2026
782ab23
Merge branch 'zip_support' of ssh://github.com/patterninc/caterpillar…
ma-gk Feb 5, 2026
e2370c0
refactor: rename context keys for file path handling in archive and f…
ma-gk Feb 5, 2026
111 changes: 111 additions & 0 deletions internal/pkg/pipeline/task/archive/archive.go
@@ -0,0 +1,111 @@
package archive

import (
	"fmt"

	"github.com/patterninc/caterpillar/internal/pkg/pipeline/record"
	"github.com/patterninc/caterpillar/internal/pkg/pipeline/task"
)

type actionType string

const (
	actionCompress   actionType = `pack`
	actionDecompress actionType = `unpack`
)

const (
	defaultFormat = `zip`
	defaultAction = `pack`
)

type archiver interface {
	Read(b []byte)
	Write(b []byte)
}
Comment on lines +22 to +25
Copilot AI Jan 29, 2026

The archiver interface methods Read and Write don't return errors, which prevents proper error handling. These methods should return error values so that errors can be propagated back through the Run method to the caller. Compare with the compress task where compress/decompress methods return errors.

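The error-returning interface the comment describes can be sketched as below; `failingArchive` and `runOne` are illustrative stand-ins, not code from this PR, since the real task depends on the caterpillar `record` and `task` packages:

```go
package main

import "fmt"

// Hypothetical revision of the archiver interface: both methods return an
// error so the task's Run loop can propagate failures instead of terminating
// the process with log.Fatal.
type archiver interface {
	Read(b []byte) error
	Write(b []byte) error
}

// failingArchive is a stand-in implementation used only to show propagation.
type failingArchive struct{}

func (failingArchive) Read(b []byte) error  { return nil }
func (failingArchive) Write(b []byte) error { return fmt.Errorf("disk full") }

// runOne mirrors the shape of the Run loop body: the error surfaces to the
// caller, which can then honor settings such as FailOnError.
func runOne(a archiver, data []byte) error {
	if err := a.Write(data); err != nil {
		return fmt.Errorf("archive write: %w", err)
	}
	return nil
}

func main() {
	fmt.Println(runOne(failingArchive{}, []byte("x"))) // prints "archive write: disk full"
}
```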

type core struct {
	task.Base `yaml:",inline" json:",inline"`
	Format    string     `yaml:"format,omitempty" json:"format,omitempty"`
	Action    actionType `yaml:"action,omitempty" json:"action,omitempty"`
	FileName  string     `yaml:"file_name,omitempty" json:"file_name,omitempty"`
}

func New() (task.Task, error) {
	return &core{
		Format: defaultFormat,
		Action: defaultAction,
	}, nil
}

func (c *core) UnmarshalYAML(unmarshal func(interface{}) error) error {
	type raw core
	obj := raw{
		Format: defaultFormat,
		Action: defaultAction,
	}
	if err := unmarshal(&obj); err != nil {
		return err
	}

	if obj.Action != actionCompress && obj.Action != actionDecompress {
		return fmt.Errorf("invalid action: %s (must be 'compress' or 'decompress')", obj.Action)
Copilot AI Jan 29, 2026

The error message mentions 'compress' or 'decompress' but the actual valid action values are 'pack' and 'unpack' as defined in the constants at lines 13-14. This inconsistency between the error message and the actual values will confuse users.

Suggested change
return fmt.Errorf("invalid action: %s (must be 'compress' or 'decompress')", obj.Action)
return fmt.Errorf("invalid action: %s (must be 'pack' or 'unpack')", obj.Action)

	}

	if obj.Action == actionCompress {
		if obj.FileName == "" {
			return fmt.Errorf("file_name must be specified when action is 'pack'")
		}
	}

	*c = core(obj)

	return nil
}
Comment on lines +67 to +84
Copilot AI Jan 29, 2026

The Format field is not validated in UnmarshalYAML. Unlike the compress task which validates the format against a map of supported formats, this code only validates the action. Invalid formats like "rar" or "7z" would only be caught at runtime in the Run method's switch statement. Format validation should happen during configuration unmarshaling for early error detection.

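Early validation of the kind this comment asks for might look like the following sketch; the map and function names are assumptions modeled on the compress task's map-of-supported-formats approach:

```go
package main

import "fmt"

// supportedFormats lists the archive formats the task knows how to handle,
// so an invalid format fails at configuration time rather than inside Run.
var supportedFormats = map[string]struct{}{
	"zip": {},
	"tar": {},
}

// validateFormat would be called from UnmarshalYAML after decoding.
func validateFormat(format string) error {
	if _, ok := supportedFormats[format]; !ok {
		return fmt.Errorf("unsupported format: %s (must be 'zip' or 'tar')", format)
	}
	return nil
}

func main() {
	fmt.Println(validateFormat("tar")) // nil for a supported format
	fmt.Println(validateFormat("rar")) // error surfaces at config time, not mid-pipeline
}
```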

func (c *core) Run(input <-chan *record.Record, output chan<- *record.Record) (err error) {

	if input == nil {
		return task.ErrNilInput
	}

	for {
		r, ok := c.GetRecord(input)
		if !ok {
			break
		}

		if len(r.Data) == 0 {
			continue
		}

		var archiv archiver

		switch c.Format {
		case "tar":
			archiv = &tarArchive{
				Base:       &c.Base,
				FileName:   c.FileName,
				Record:     r,
				OutputChan: output,
			}
		case "zip":
			archiv = &zipArchive{
				Base:       &c.Base,
				FileName:   c.FileName,
				Record:     r,
				OutputChan: output,
			}
		default:
			return fmt.Errorf("unsupported format: %s", c.Format)
		}

		switch c.Action {
		case actionCompress:
			archiv.Write(r.Data)
		case actionDecompress:
			archiv.Read(r.Data)
		}
Copilot AI Jan 29, 2026

The Read and Write calls don't capture or handle any errors because the archiver interface methods don't return errors. If an error occurs in Read or Write, it will either terminate the program via log.Fatal or be silently ignored, but cannot be properly handled by the Run method. This needs to be addressed by updating the archiver interface to return errors.

	}
	return nil
}
Comment on lines +1 to +103
Copilot AI Jan 30, 2026

The new archive task implementation lacks unit tests. Given that the repository has comprehensive test coverage for other components (e.g., internal/pkg/pipeline/dag_test.go), unit tests should be added to verify the pack and unpack operations for both ZIP and TAR formats, error handling paths, and edge cases such as empty archives, corrupted archives, and files with various sizes.

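A minimal round-trip check of the kind this comment asks for might look like the sketch below, written as a self-contained program rather than a `_test.go` file; the helper name is illustrative:

```go
package main

import (
	"archive/tar"
	"bytes"
	"fmt"
	"io"
)

// tarRoundTrip packs data into a single-entry tar archive and reads it back,
// which is the core property a pack/unpack unit test would assert.
func tarRoundTrip(name string, data []byte) ([]byte, error) {
	var buf bytes.Buffer
	tw := tar.NewWriter(&buf)
	if err := tw.WriteHeader(&tar.Header{Name: name, Mode: 0o644, Size: int64(len(data))}); err != nil {
		return nil, err
	}
	if _, err := tw.Write(data); err != nil {
		return nil, err
	}
	if err := tw.Close(); err != nil {
		return nil, err
	}
	tr := tar.NewReader(bytes.NewReader(buf.Bytes()))
	if _, err := tr.Next(); err != nil {
		return nil, err
	}
	return io.ReadAll(tr)
}

func main() {
	got, err := tarRoundTrip("birds.txt", []byte("Blue Jay\n"))
	fmt.Println(err == nil, string(got))
}
```

A real test suite would extend the same shape to ZIP, multiple entries, empty input, and corrupted archive bytes.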
65 changes: 65 additions & 0 deletions internal/pkg/pipeline/task/archive/tar.go
@@ -0,0 +1,65 @@
package archive

import (
	"archive/tar"
	"bytes"
	"io"
	"log"

	"github.com/patterninc/caterpillar/internal/pkg/pipeline/record"
	"github.com/patterninc/caterpillar/internal/pkg/pipeline/task"
)

type tarArchive struct {
	*task.Base
	FileName   string
	Record     *record.Record
	OutputChan chan<- *record.Record
}

func (t *tarArchive) Read(b []byte) {
	r := tar.NewReader(bytes.NewReader(b))

	for {
		header, err := r.Next()
		if err != nil {
			break
		}

		// check the file type is regular file
		if header.Typeflag == tar.TypeReg {
			buf := make([]byte, header.Size)
			if _, err := io.ReadFull(r, buf); err != nil && err != io.EOF {
				log.Fatal(err)
Copilot AI Jan 29, 2026

Using log.Fatal will terminate the entire program. This should return an error instead so the pipeline can handle it according to the FailOnError configuration. The Read and Write methods should return errors.

Copilot AI Jan 30, 2026

Using log.Fatal in library/task code will abruptly terminate the entire application, which is inappropriate for a pipeline task. Errors should be propagated back through the Run method's return value to allow the calling code to handle them gracefully. The archiver interface methods should return errors, and the Run method should handle those errors appropriately.

			}
			t.SendData(t.Record.Context, buf, t.OutputChan)
		}

	}
}

func (t *tarArchive) Write(b []byte) {

	if t.FileName == "" {
		log.Fatal("file name is required to create tar archive")
	}

Copilot AI Jan 29, 2026

This error check should not use log.Fatal as it terminates the entire program. The validation should occur in UnmarshalYAML during configuration parsing, not during runtime execution. This check is redundant since validation already happens in archive.go at line 55-58.

Suggested change
if t.FileName == "" {
log.Fatal("file name is required to create tar archive")
}

	tw := tar.NewWriter(bytes.NewBuffer(b))
Copilot AI Jan 29, 2026

The tar.NewWriter is incorrectly initialized with bytes.NewBuffer(b), which creates a new buffer containing a copy of the input data. This should be an empty buffer like bytes.NewBuffer(nil) or new(bytes.Buffer). The current implementation would prepend the raw input data before the tar archive data, resulting in a corrupted output.


	tw.WriteHeader(
		&tar.Header{Name: t.FileName,
			Mode: 0777,
			Size: int64(len(b))})

	_, err := tw.Write(b)
	if err != nil {
		log.Fatal(err)
Copilot AI Jan 30, 2026

Using log.Fatal in library/task code will abruptly terminate the entire application, which is inappropriate for a pipeline task. Errors should be propagated back through the Run method's return value to allow the calling code to handle them gracefully. The archiver interface methods should return errors, and the Run method should handle those errors appropriately.

	}

	err = tw.Close()
	if err != nil {
		log.Fatal(err)
	}
Copilot AI Jan 29, 2026

Using log.Fatal will terminate the entire program. These errors should be returned instead for proper pipeline error handling.


	t.SendData(t.Record.Context, b, t.OutputChan)
Copilot AI Jan 29, 2026

After writing to the tar archive, this sends the original input data (b) instead of the tar archive buffer. This should send the tar writer's buffer contents (e.g., tarBuf.Bytes()) instead. The current implementation would send the uncompressed data rather than the tar archive.

}
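Putting the two buffer fixes from the review comments together, a corrected pack step might look like the sketch below; the function name and shape are illustrative, since the real method also sends the result to an output channel:

```go
package main

import (
	"archive/tar"
	"bytes"
	"fmt"
)

// tarPack demonstrates both fixes: the writer targets an empty buffer
// (not bytes.NewBuffer(b), which would prepend the raw input to the
// archive), and the function returns the archive bytes rather than the
// original payload. Errors return to the caller instead of log.Fatal.
func tarPack(name string, data []byte) ([]byte, error) {
	buf := new(bytes.Buffer) // empty destination buffer
	tw := tar.NewWriter(buf)
	if err := tw.WriteHeader(&tar.Header{Name: name, Mode: 0o644, Size: int64(len(data))}); err != nil {
		return nil, err
	}
	if _, err := tw.Write(data); err != nil {
		return nil, err
	}
	if err := tw.Close(); err != nil { // Close flushes the trailing blocks
		return nil, err
	}
	return buf.Bytes(), nil // send these bytes downstream, not data
}

func main() {
	out, err := tarPack("birds.txt", []byte("Albatross\n"))
	fmt.Println(err == nil, len(out) > len("Albatross\n"))
}
```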
66 changes: 66 additions & 0 deletions internal/pkg/pipeline/task/archive/zip.go
@@ -0,0 +1,66 @@
package archive

import (
	"archive/zip"
	"bytes"
	"errors"
	"io"
	"log"

	"github.com/patterninc/caterpillar/internal/pkg/pipeline/record"
	"github.com/patterninc/caterpillar/internal/pkg/pipeline/task"
)

type zipArchive struct {
	*task.Base
	FileName   string
	Record     *record.Record
	OutputChan chan<- *record.Record
}

func (z *zipArchive) Read(b []byte) {
	r, err := zip.NewReader(bytes.NewReader(b), int64(len(b)))
	if err != nil {
		log.Fatal(err)
Copilot AI Jan 29, 2026

Using log.Fatal here will terminate the entire program instead of allowing the error to be handled gracefully by the pipeline. Based on the pattern used in similar tasks (like compress), errors should be returned to the caller so the pipeline can handle them according to the FailOnError configuration. The Read and Write methods should return errors that can be propagated up through the Run method.

	}
	for _, f := range r.File {

		// check the file type is regular file
		if f.FileInfo().Mode().IsRegular() {

			rc, err := f.Open()
			if err != nil {
				log.Fatal(err)
Copilot AI Jan 30, 2026

Using log.Fatal in library/task code will abruptly terminate the entire application, which is inappropriate for a pipeline task. Errors should be propagated back through the Run method's return value to allow the calling code to handle them gracefully. The archiver interface methods should return errors, and the Run method should handle those errors appropriately.

			}

			buf := make([]byte, f.FileInfo().Size())
Copilot AI Jan 30, 2026

The archive task has no protection against ZIP bomb attacks or excessively large archives. When unpacking, the code allocates memory based on the file size reported in the archive header without any validation. A malicious archive could claim extremely large file sizes, causing out-of-memory errors or system crashes. Consider adding limits on individual file sizes and total archive size, or at least document the risk more explicitly with recommended mitigations.


			_, err = rc.Read(buf)
			if err != nil && !errors.Is(err, io.EOF) {
Copilot AI Jan 30, 2026

Using rc.Read may not read the entire file content in a single call, especially for larger files. The current code assumes all data is read in one Read call, which could result in truncated file extraction. Use io.ReadFull or io.ReadAll to ensure the complete file content is read.

Suggested change
buf := make([]byte, f.FileInfo().Size())
_, err = rc.Read(buf)
if err != nil && !errors.Is(err, io.EOF) {
buf, err := io.ReadAll(rc)
if err != nil {

				log.Fatal(err)
Copilot AI Jan 30, 2026

Using log.Fatal in library/task code will abruptly terminate the entire application, which is inappropriate for a pipeline task. Errors should be propagated back through the Run method's return value to allow the calling code to handle them gracefully. The archiver interface methods should return errors, and the Run method should handle those errors appropriately.

			}

			rc.Close()
Copilot AI Jan 29, 2026

Using log.Fatal will terminate the entire program. This error should be returned instead. Additionally, rc.Close() on line 43 should be deferred to ensure the file is closed even if an error occurs during Read. The pattern should be similar to other tasks in the codebase that return errors for proper error handling.

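The io.ReadAll and close-handling suggestions from the comments above combine into a sketch like the following; `zipUnpack` and `makeZip` are illustrative names, not the PR's API, and error checks are elided in the fixture helper:

```go
package main

import (
	"archive/zip"
	"bytes"
	"fmt"
	"io"
)

// zipUnpack applies both review suggestions: io.ReadAll reads each entry to
// completion regardless of how the reader chunks data, and errors return to
// the caller instead of hitting log.Fatal. The entry reader is closed on
// every path before the error is inspected.
func zipUnpack(b []byte) ([][]byte, error) {
	r, err := zip.NewReader(bytes.NewReader(b), int64(len(b)))
	if err != nil {
		return nil, err
	}
	var files [][]byte
	for _, f := range r.File {
		if !f.FileInfo().Mode().IsRegular() {
			continue
		}
		rc, err := f.Open()
		if err != nil {
			return nil, err
		}
		data, readErr := io.ReadAll(rc)
		rc.Close() // close before returning so no path leaks the reader
		if readErr != nil {
			return nil, readErr
		}
		files = append(files, data)
	}
	return files, nil
}

// makeZip builds an in-memory single-file archive as a demo fixture.
func makeZip(name string, data []byte) []byte {
	buf := new(bytes.Buffer)
	zw := zip.NewWriter(buf)
	w, _ := zw.Create(name)
	w.Write(data)
	zw.Close()
	return buf.Bytes()
}

func main() {
	files, err := zipUnpack(makeZip("birds.txt", []byte("Bald Eagle\n")))
	fmt.Println(err == nil, len(files), string(files[0]))
}
```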

			z.SendData(z.Record.Context, buf, z.OutputChan)
		}
	}
}

func (z *zipArchive) Write(b []byte) {

	zipBuf := new(bytes.Buffer)
	zipWriter := zip.NewWriter(zipBuf)

	if z.FileName == "" {
		log.Fatal("file name is required to create zip archive")
	}

Copilot AI Jan 29, 2026

This error check should not use log.Fatal as it terminates the entire program. Additionally, the validation should occur in UnmarshalYAML during configuration parsing, not during runtime execution. This check is redundant since validation already happens in archive.go at line 55-58.

Suggested change
if z.FileName == "" {
log.Fatal("file name is required to create zip archive")
}

	w, _ := zipWriter.Create(z.FileName)
	w.Write(b)

	zipWriter.Close()

Copilot AI Jan 29, 2026

Multiple error handling issues: (1) The error from zipWriter.Create is being silently ignored with the blank identifier. (2) The error from w.Write is being ignored. (3) The error from zipWriter.Close is being ignored. All of these errors should be checked and handled appropriately, similar to how the compress task handles errors.

Suggested change
w, _ := zipWriter.Create(z.FileName)
w.Write(b)
zipWriter.Close()
w, err := zipWriter.Create(z.FileName)
if err != nil {
log.Fatal(err)
}
if _, err := w.Write(b); err != nil {
log.Fatal(err)
}
if err := zipWriter.Close(); err != nil {
log.Fatal(err)
}

	z.SendData(z.Record.Context, zipBuf.Bytes(), z.OutputChan)

}
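The fully error-checked pack path from the suggested change above, rewritten to return errors rather than call log.Fatal, might look like this sketch (function name is illustrative):

```go
package main

import (
	"archive/zip"
	"bytes"
	"fmt"
)

// zipPack checks every error the original code ignored: Create, Write, and
// Close. Close in particular flushes the central directory, so skipping its
// error check can hide a truncated, unreadable archive.
func zipPack(name string, data []byte) ([]byte, error) {
	buf := new(bytes.Buffer)
	zw := zip.NewWriter(buf)
	w, err := zw.Create(name)
	if err != nil {
		return nil, fmt.Errorf("create entry: %w", err)
	}
	if _, err := w.Write(data); err != nil {
		return nil, fmt.Errorf("write entry: %w", err)
	}
	if err := zw.Close(); err != nil {
		return nil, fmt.Errorf("close archive: %w", err)
	}
	return buf.Bytes(), nil
}

func main() {
	out, err := zipPack("birds.txt", []byte("Cardinal\n"))
	fmt.Println(err == nil, len(out) > 0)
}
```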
2 changes: 1 addition & 1 deletion internal/pkg/pipeline/task/compress/formats.go
@@ -26,7 +26,7 @@ var (
return io.NopCloser(snappy.NewReader(r)), nil
},
NewWriter: func(w io.Writer) io.WriteCloser {
return snappy.NewWriter(w)
return snappy.NewBufferedWriter(w)
Copilot AI Jan 28, 2026

This change from snappy.NewWriter(w) to snappy.NewBufferedWriter(w) appears to be unrelated to adding ZIP support. While NewBufferedWriter may provide better performance by buffering writes, this change should either:

1. Be mentioned in the PR description as an additional improvement
2. Be separated into its own PR
3. Be reverted if it was changed unintentionally

Please clarify if this change is intentional and document the reason for it.

Suggested change
return snappy.NewBufferedWriter(w)
return snappy.NewWriter(w)

}},
}
)
2 changes: 2 additions & 0 deletions internal/pkg/pipeline/tasks.go
@@ -7,6 +7,7 @@ import (
	"gopkg.in/yaml.v3"

	"github.com/patterninc/caterpillar/internal/pkg/pipeline/task"
	"github.com/patterninc/caterpillar/internal/pkg/pipeline/task/archive"
Copilot AI Jan 29, 2026

The PR description states "Support for ZIP file compression and decompression" and mentions only ZIP/DEFLATE format, but the code actually implements support for both ZIP and TAR archive formats. The description should be updated to accurately reflect that both archive formats are being added, or the TAR implementation should be removed if it's out of scope for this PR.

	"github.com/patterninc/caterpillar/internal/pkg/pipeline/task/aws/parameter_store"
	"github.com/patterninc/caterpillar/internal/pkg/pipeline/task/compress"
	"github.com/patterninc/caterpillar/internal/pkg/pipeline/task/converter"
@@ -33,6 +34,7 @@ type tasks []task.Task
var (
	validate       = validator.New()
	supportedTasks = map[string]func() (task.Task, error){
		`archive`:             archive.New,
		`aws_parameter_store`: parameter_store.New,
		`compress`:            compress.New,
		`converter`:           converter.New,
124 changes: 124 additions & 0 deletions test/pipelines/birds.txt
@@ -0,0 +1,124 @@
Albatross
Acorn Woodpecker
American Kestrel
Anna's Hummingbird
Bald Eagle
Baltimore Oriole
Barn Swallow
Belted Kingfisher
Bicolored Antbird
Black Capped Chickadee
Black Skimmer
Blue Jay
Bluebird
Bobolink
Bohemian Waxwing
Brown Creeper
Brown Pelican
Burrowing Owl
California Condor
California Quail
Canada Goose
Cardinal
Caspian Tern
Cedar Waxwing
Chestnut Sided Warbler
Chimney Swift
Chipping Sparrow
Clark's Nutcracker
Clay Colored Sparrow
Cliff Swallow
Columbiformes
Common Eider
Common Goldeneye
Common Grackle
Common Loon
Common Merganser
Common Raven
Common Tern
Common Yellowthroat
Coopers Hawk
Cory's Shearwater
Crested Flycatcher
Curve Billed Thrasher
Dark Eyed Junco
Dickcissel
Dovekie
Downy Woodpecker
Drab Seedeater
Dunnock
Eastern Bluebird
Eastern Meadowlark
Eastern Phoebe
Eastern Screech Owl
Eastern Towhee
Eastern Wood Pewee
Eared Grebe
Egyptian Plover
Elanus leucurus
Evening Grosbeak
Eared Quetzal
Eurasian Wigeon
Eurpean Starling
Fabulous Flamingo
Ferruginous Hawk
Fiscal Flycatcher
Flammulated Owl
Flatbill
Flesh Footed Shearwater
Florida Jay
Fringilla coelebs
Fulmar
Gadwall
Gambel's Quail
Gannet
Garden Warbler
Gnatcatcher
Godwit
Golden Eagle
Golden Winged Warbler
Goldeneye
Goldfinch
Goosander
Goshawk
Grace's Warbler
Grasshopper Sparrow
Gray Catbird
Great Black Backed Gull
Great Blue Heron
Great Crested Flycatcher
Great Horned Owl
Great Kiskadee
Great Spotted Woodpecker
Great Tit
Grebe
Greenbul
Green Heron
Green Tailed Towhee
Green Winged Teal
Greenlet
Grey Kingbird
Grey Owl
Grosbeaks
Grouse
Gull
Hairy Woodpecker
Hammond's Flycatcher
Harris Hawk
Harris Sparrow
Hawaiian Creeper
Hawaiian Goose
Hawfinch
Heathland Francolin
Herring Gull
Hoary Puffleg
Hooded Merganser
Hooded Oriole
Hooded Warbler
Hoopoe
Horned Auk
Horned Grebe
Horned Lark
House Finch
House Sparrow
House Wren