Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
75badd8
Support for ZIP file compression and decompression.
ma-gk Jan 28, 2026
ff59ec8
Merge remote-tracking branch 'origin/main' into zip_support
ma-gk Jan 28, 2026
0a6eeca
Merge remote-tracking branch 'origin' into zip_support
ma-gk Jan 29, 2026
a52ae4f
Support for tar and zip archive
ma-gk Jan 29, 2026
e735358
refactor: remove zip format support from compression handlers
ma-gk Jan 29, 2026
1e654d2
feat: add zip packing and unpacking test pipelines
ma-gk Jan 29, 2026
073193a
fix: correct tar archive writing logic and buffer handling
ma-gk Jan 30, 2026
9e0cf2b
test file for the tar with multi file output
ma-gk Jan 30, 2026
7a94408
feat: add comprehensive README for archive task with ZIP and TAR support
ma-gk Jan 30, 2026
7c136db
feat: add birds file and update zip pack/unpack test configurations
ma-gk Jan 30, 2026
8496ea5
refactor: rename extraction tasks for clarity and consistency in README
ma-gk Jan 30, 2026
297abdb
refactor: rename action types for clarity in archiving process
ma-gk Jan 30, 2026
c3a74fa
fix: correct error handling in tar archive read function and improve …
ma-gk Jan 30, 2026
829ac05
fix: improve error handling in zip archive read function
ma-gk Jan 30, 2026
018786a
removed duplicate file
ma-gk Jan 30, 2026
309c1dc
multi file support with proper naming conventions
ma-gk Jan 30, 2026
5b8246a
Refactored code used map instead of switch case
ma-gk Feb 2, 2026
972476b
refactor: replace string literals with context keys for file path han…
ma-gk Feb 4, 2026
f3d603c
Merge branch 'main' into zip_support
Mayureshpawar29 Feb 4, 2026
d6d4331
fix: update log message for empty filepath in context
ma-gk Feb 5, 2026
8430e13
Merge remote-tracking branch 'origin' into zip_support
ma-gk Feb 5, 2026
782ab23
Merge branch 'zip_support' of ssh://github.com/patterninc/caterpillar…
ma-gk Feb 5, 2026
e2370c0
refactor: rename context keys for file path handling in archive and f…
ma-gk Feb 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions internal/pkg/pipeline/task/archive/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# Archive Task

The `archive` task packages or extracts files using various archive formats, enabling efficient file bundling and extraction for data processing pipelines.

## Function

The archive task can operate in two modes:
- **Pack mode**: Combines multiple files into a single archive
- **Unpack mode**: Extracts files from an archive

## Behavior

The archive task processes data based on the `action` field:
- **Pack**: Receives individual files and creates an archive file containing them
- **Unpack**: Receives an archive file and extracts its contents, outputting each file individually

The task receives records from its input channel, applies the archiving operation, and sends the processed records to its output channel.

## Configuration Fields

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `name` | string | - | Task name for identification |
| `type` | string | `archive` | Must be "archive" |
| `format` | string | `zip` | Archive format (zip, tar) |
| `action` | string | `pack` | Action type (pack or unpack) |

## Supported Formats

The task supports the following archive formats:
- **zip**: Standard ZIP format, widely compatible
- **tar**: TAR format, commonly used in Unix/Linux environments

## Example Configurations

### Pack files into a ZIP archive:
```yaml
tasks:
- name: create_zip
type: archive
format: zip
action: pack
```

### Unpack a ZIP archive:
```yaml
tasks:
- name: extract_zip
type: archive
format: zip
action: unpack
```

### Pack files into a TAR archive:
```yaml
tasks:
- name: create_tar
type: archive
format: tar
action: pack
```

### Unpack a TAR archive:
```yaml
tasks:
- name: extract_tar
type: archive
format: tar
action: unpack
```

## Sample Pipelines

- `test/pipelines/zip_pack_test.yaml` - ZIP packing example
- `test/pipelines/zip_unpack_test.yaml` - ZIP unpacking example
- `test/pipelines/tar_unpack_multifile_test.yaml` - TAR unpacking with multiple files

## Use Cases

- **File bundling**: Package multiple files into a single archive for distribution
- **Data consolidation**: Combine separate data files into archives for storage
- **Archive extraction**: Extract files from archives for processing
- **Backup operations**: Create archives of processed data for backup
- **Format conversion**: Convert between archive formats
- **Multi-file handling**: Process multiple files as a single archive unit
92 changes: 92 additions & 0 deletions internal/pkg/pipeline/task/archive/archive.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package archive

import (
"fmt"

"github.com/patterninc/caterpillar/internal/pkg/pipeline/record"
"github.com/patterninc/caterpillar/internal/pkg/pipeline/task"
)

// actionType enumerates the supported archive operations.
type actionType string

const (
	// actionPack bundles all incoming records into a single archive record.
	actionPack actionType = `pack`
	// actionUnpack extracts the entries of incoming archive records.
	actionUnpack actionType = `unpack`
)

const (
	// Defaults applied when the pipeline configuration omits the fields.
	defaultFormat = `zip`
	defaultAction = `pack`
)

// archiver abstracts one archive container format. Read unpacks archive
// records from the input channel; Write packs input records into a single
// archive and emits it.
//
// NOTE(review): neither method returns an error, so implementations cannot
// propagate failures back through Run — they currently terminate the process
// with log.Fatal instead. Consider changing both methods to return error.
type archiver interface {
	Read()
	Write()
}
Comment on lines +22 to +25
Copy link

Copilot AI Jan 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The archiver interface methods Read and Write don't return errors, which prevents proper error handling. These methods should return error values so that errors can be propagated back through the Run method to the caller. Compare with the compress task where compress/decompress methods return errors.

Copilot uses AI. Check for mistakes.

// core is the configuration for the archive task. It embeds the shared
// task.Base and adds the two archive-specific settings.
type core struct {
	task.Base `yaml:",inline" json:",inline"`
	// Format selects the archive container: `zip` (default) or `tar`.
	Format string `yaml:"format,omitempty" json:"format,omitempty"`
	// Action selects `pack` (create an archive) or `unpack` (extract one);
	// defaults to pack.
	Action actionType `yaml:"action,omitempty" json:"action,omitempty"`
}

// New returns an archive task preconfigured with the default format (zip)
// and the default action (pack).
func New() (task.Task, error) {
	c := &core{}
	c.Format = defaultFormat
	c.Action = defaultAction
	return c, nil
}

// UnmarshalYAML decodes the task configuration, applying defaults and
// validating both the action and the format, so that a misconfigured
// pipeline fails at load time rather than mid-run.
func (c *core) UnmarshalYAML(unmarshal func(interface{}) error) error {
	// Decode into an alias type so unmarshal does not recurse into this method.
	type raw core
	obj := raw{
		Format: defaultFormat,
		Action: defaultAction,
	}
	if err := unmarshal(&obj); err != nil {
		return err
	}

	if obj.Action != actionPack && obj.Action != actionUnpack {
		return fmt.Errorf("invalid action: %s (must be 'pack' or 'unpack')", obj.Action)
	}

	// Validate the format here as well: without this check an unsupported
	// format (e.g. `rar`) is only rejected at runtime inside Run.
	if obj.Format != `zip` && obj.Format != `tar` {
		return fmt.Errorf("unsupported format: %s (must be 'zip' or 'tar')", obj.Format)
	}

	*c = core(obj)

	return nil
}
Comment on lines +67 to +84
Copy link

Copilot AI Jan 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Format field is not validated in UnmarshalYAML. Unlike the compress task which validates the format against a map of supported formats, this code only validates the action. Invalid formats like "rar" or "7z" would only be caught at runtime in the Run method's switch statement. Format validation should happen during configuration unmarshaling for early error detection.

Copilot uses AI. Check for mistakes.

// Run wires the configured archiver to the task's input and output channels
// and executes the requested action. Pack consumes all input records and
// emits a single archive record; unpack consumes archive records and emits
// one record per extracted file.
func (c *core) Run(input <-chan *record.Record, output chan<- *record.Record) (err error) {

	if input == nil {
		return task.ErrNilInput
	}

	var archiv archiver

	switch c.Format {
	case "tar":
		archiv = &tarArchive{
			Base:       &c.Base,
			OutputChan: output,
			InputChan:  input,
		}
	case "zip":
		archiv = &zipArchive{
			Base:       &c.Base,
			OutputChan: output,
			InputChan:  input,
		}
	default:
		return fmt.Errorf("unsupported format: %s", c.Format)
	}

	switch c.Action {
	case actionPack:
		archiv.Write()
	case actionUnpack:
		archiv.Read()
	default:
		// Defensive: UnmarshalYAML validates the action for YAML-loaded
		// configs, but a core constructed directly could carry an unknown
		// action; previously that case silently did nothing and returned nil.
		return fmt.Errorf("unsupported action: %s", c.Action)
	}

	return nil
}
Comment on lines +1 to +103
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new archive task implementation lacks unit tests. Given that the repository has comprehensive test coverage for other components (e.g., internal/pkg/pipeline/dag_test.go), unit tests should be added to verify the pack and unpack operations for both ZIP and TAR formats, error handling paths, and edge cases such as empty archives, corrupted archives, and files with various sizes.

Copilot uses AI. Check for mistakes.
108 changes: 108 additions & 0 deletions internal/pkg/pipeline/task/archive/tar.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package archive

import (
"archive/tar"
"bytes"
"io"
"log"
"strings"

"github.com/patterninc/caterpillar/internal/pkg/pipeline/record"
"github.com/patterninc/caterpillar/internal/pkg/pipeline/task"
)

// tarArchive implements the archiver interface for the TAR format,
// reading records from InputChan and emitting results on OutputChan.
type tarArchive struct {
	*task.Base
	OutputChan chan<- *record.Record
	InputChan  <-chan *record.Record
}

func (t *tarArchive) Read() {

for {
rc, ok := t.GetRecord(t.InputChan)
if !ok {
break
}

if len(rc.Data) == 0 {
continue
}

b := rc.Data

r := tar.NewReader(bytes.NewReader(b))

for {
header, err := r.Next()
if err == io.EOF {
break
}
if err != nil {
log.Fatal(err)
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using log.Fatal in library/task code will abruptly terminate the entire application, which is inappropriate for a pipeline task. Errors should be propagated back through the Run method's return value to allow the calling code to handle them gracefully. The archiver interface methods should return errors, and the Run method should handle those errors appropriately.

Copilot uses AI. Check for mistakes.
}

// check the file type is regular file
if header.Typeflag == tar.TypeReg {
buf := make([]byte, header.Size)
if _, err := io.ReadFull(r, buf); err != nil && err != io.EOF {
log.Fatal(err)
}
rc.SetContextValue("CATERPILLER_FILE_PATH_READ", header.Name)
t.SendData(rc.Context, buf, t.OutputChan)
}

}
}
}

// Write packs every non-empty input record into a single tar archive and
// emits the archive as one output record. Each record must carry its
// destination path in the CATERPILLER_FILE_PATH context value; backslashes
// are normalized to forward slashes for portable entry names.
//
// NOTE(review): errors terminate the process via log.Fatal because the
// archiver interface offers no error return; the emitted archive record
// reuses the context of the last record packed.
func (t *tarArchive) Write() {

	var buf bytes.Buffer
	tw := tar.NewWriter(&buf)
	var rc record.Record

	for {
		rec, ok := t.GetRecord(t.InputChan)
		if !ok {
			break
		}
		b := rec.Data

		// Records without a payload are skipped.
		if len(b) == 0 {
			continue
		}

		filePath, found := rec.GetContextValue("CATERPILLER_FILE_PATH")
		if !found {
			log.Fatal("filepath not set in context")
		}

		// The key can exist with an empty value, which is equally unusable.
		// (Previous message wrongly claimed the filepath was absent.)
		if filePath == "" {
			log.Fatal("filepath in context is empty")
		}

		// Normalize Windows-style separators so entry names stay portable.
		filePath = strings.ReplaceAll(filePath, "\\", "/")

		header := &tar.Header{
			Name: filePath,
			Mode: 0600,
			Size: int64(len(b)),
		}
		if err := tw.WriteHeader(header); err != nil {
			log.Fatal(err)
		}

		if _, err := tw.Write(b); err != nil {
			log.Fatal(err)
		}

		rc.Context = rec.Context
	}

	// Close flushes the tar footer; without it the archive is truncated.
	if err := tw.Close(); err != nil {
		log.Fatal(err)
	}

	t.SendData(rc.Context, buf.Bytes(), t.OutputChan)
}
106 changes: 106 additions & 0 deletions internal/pkg/pipeline/task/archive/zip.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package archive

import (
"archive/zip"
"bytes"
"io"
"log"
"strings"

"github.com/patterninc/caterpillar/internal/pkg/pipeline/record"
"github.com/patterninc/caterpillar/internal/pkg/pipeline/task"
)

// zipArchive implements the archiver interface for the ZIP format,
// reading records from InputChan and emitting results on OutputChan.
type zipArchive struct {
	*task.Base
	OutputChan chan<- *record.Record
	InputChan  <-chan *record.Record
}

// Read treats each input record's data as a complete zip archive, extracts
// every regular file it contains, and emits one output record per file.
// The entry's path is stored in the record context under the
// CATERPILLER_FILE_PATH_READ key before each send.
func (z *zipArchive) Read() {
	for {
		rc, ok := z.GetRecord(z.InputChan)
		if !ok {
			break
		}

		// Records without a payload are skipped entirely.
		if len(rc.Data) == 0 {
			continue
		}

		b := rc.Data

		r, err := zip.NewReader(bytes.NewReader(b), int64(len(b)))
		if err != nil {
			log.Fatal(err)
		}
		for _, f := range r.File {

			// Only regular files are extracted; directories and other
			// entry types are skipped.
			if !f.FileInfo().Mode().IsRegular() {
				continue
			}

			rc.SetContextValue("CATERPILLER_FILE_PATH_READ", f.Name)

			fs, err := f.Open()
			if err != nil {
				log.Fatal(err)
			}

			buf := make([]byte, f.FileInfo().Size())

			// io.ReadFull guarantees the whole entry is read: a single
			// fs.Read call may legally return fewer bytes than the entry
			// size, which previously left the tail of buf zero-filled.
			if _, err := io.ReadFull(fs, buf); err != nil && err != io.EOF {
				log.Fatal(err)
			}

			fs.Close()

			z.SendData(rc.Context, buf, z.OutputChan)
		}
	}
}

// Write packs every input record into a single zip archive and emits the
// archive as one output record. Each record must carry its destination path
// in the CATERPILLER_FILE_PATH context value; backslashes are normalized to
// forward slashes for portable entry names.
//
// NOTE(review): errors terminate the process via log.Fatal because the
// archiver interface offers no error return; the emitted archive record
// reuses the context of the last record packed. Unlike tar's Write, empty
// records are not skipped here — confirm whether that asymmetry is intended.
func (z *zipArchive) Write() {

	zipBuf := new(bytes.Buffer)
	zipWriter := zip.NewWriter(zipBuf)
	var rc record.Record

	for {
		rec, ok := z.GetRecord(z.InputChan)
		if !ok {
			break
		}

		filePath, found := rec.GetContextValue("CATERPILLER_FILE_PATH")
		if !found {
			log.Fatal("filepath not set in context")
		}

		// The key can exist with an empty value, which is equally unusable.
		// (Previous message wrongly claimed the filepath was absent.)
		if filePath == "" {
			log.Fatal("filepath in context is empty")
		}

		// Normalize Windows-style separators so entry names stay portable.
		filePath = strings.ReplaceAll(filePath, "\\", "/")

		w, err := zipWriter.Create(filePath)
		if err != nil {
			log.Fatal(err)
		}
		_, err = w.Write(rec.Data)
		if err != nil {
			log.Fatal(err)
		}

		rc.Context = rec.Context
	}

	// Close writes the zip central directory; without it the archive is invalid.
	if err := zipWriter.Close(); err != nil {
		log.Fatal(err)
	}

	// Send the complete ZIP archive
	z.SendData(rc.Context, zipBuf.Bytes(), z.OutputChan)

}
Loading