diff --git a/internal/pkg/pipeline/task/archive/README.md b/internal/pkg/pipeline/task/archive/README.md new file mode 100644 index 0000000..90a8020 --- /dev/null +++ b/internal/pkg/pipeline/task/archive/README.md @@ -0,0 +1,85 @@ +# Archive Task + +The `archive` task packages or extracts files using various archive formats, enabling efficient file bundling and extraction for data processing pipelines. + +## Function + +The archive task can operate in two modes: +- **Pack mode**: Combines multiple files into a single archive +- **Unpack mode**: Extracts files from an archive + +## Behavior + +The archive task processes data based on the `action` field: +- **Pack**: Receives individual files and creates an archive file containing them +- **Unpack**: Receives an archive file and extracts its contents, outputting each file individually + +The task receives records from its input channel, applies the archiving operation, and sends the processed records to its output channel. + +## Configuration Fields + +| Field | Type | Default | Description | +|-------|------|---------|-------------| +| `name` | string | - | Task name for identification | +| `type` | string | `archive` | Must be "archive" | +| `format` | string | `zip` | Archive format (zip, tar) | +| `action` | string | `pack` | Action type (pack or unpack) | + +## Supported Formats + +The task supports the following archive formats: +- **zip**: Standard ZIP format, widely compatible +- **tar**: TAR format, commonly used in Unix/Linux environments + +## Example Configurations + +### Pack files into a ZIP archive: +```yaml +tasks: + - name: create_zip + type: archive + format: zip + action: pack +``` + +### Unpack a ZIP archive: +```yaml +tasks: + - name: extract_zip + type: archive + format: zip + action: unpack +``` + +### Pack files into a TAR archive: +```yaml +tasks: + - name: create_tar + type: archive + format: tar + action: pack +``` + +### Unpack a TAR archive: +```yaml +tasks: + - name: extract_tar + type: archive + format: tar + action: unpack +``` + +## Sample Pipelines + +- `test/pipelines/zip_pack_test.yaml` - ZIP packing example +- `test/pipelines/zip_unpack_test.yaml` - ZIP unpacking example +- `test/pipelines/tar_unpack_multifile_test.yaml` - TAR unpacking with multiple files + +## Use Cases + +- **File bundling**: Package multiple files into a single archive for distribution +- **Data consolidation**: Combine separate data files into archives for storage +- **Archive extraction**: Extract files from archives for processing +- **Backup operations**: Create archives of processed data for backup +- **Format conversion**: Convert between archive formats +- **Multi-file handling**: Process multiple files as a single archive unit diff --git a/internal/pkg/pipeline/task/archive/archive.go b/internal/pkg/pipeline/task/archive/archive.go new file mode 100644 index 0000000..f093e58 --- /dev/null +++ b/internal/pkg/pipeline/task/archive/archive.go @@ -0,0 +1,103 @@ +package archive + +import ( + "fmt" + + "github.com/patterninc/caterpillar/internal/pkg/pipeline/record" + "github.com/patterninc/caterpillar/internal/pkg/pipeline/task" +) + +type actionType string + +const ( + actionPack actionType = `pack` + actionUnpack actionType = `unpack` +) + +const ( + defaultFormat = `zip` + defaultAction = `pack` +) + +type archiver interface { + Read() + Write() +} + +type channelStruct struct { + InputChan <-chan *record.Record + OutputChan chan<- *record.Record +} + +var ( + supportedFormats = map[string]func(bs *task.Base, chStruct *channelStruct) archiver{ + "zip": func(bs *task.Base, ch *channelStruct) archiver { + return &zipArchive{ + Base: bs, + channelStruct: ch, + } + }, + "tar": func(bs *task.Base, ch *channelStruct) archiver { + return &tarArchive{ + Base: bs, + channelStruct: ch, + } + }, + } + + supportedActions = map[string]func(archiver) func(){ + `pack`: func(a archiver) func() { return a.Write }, + `unpack`: func(a archiver) func() { return a.Read }, + } +) + +type core struct { + task.Base `yaml:",inline" json:",inline"` + Format string `yaml:"format,omitempty" json:"format,omitempty"` + Action actionType `yaml:"action,omitempty" json:"action,omitempty"` +} + +func New() (task.Task, error) { + return &core{ + Format: defaultFormat, + Action: defaultAction, + }, nil +} + +func (c *core) UnmarshalYAML(unmarshal func(interface{}) error) error { + type raw core + obj := raw{ + Format: defaultFormat, + Action: defaultAction, + } + if err := unmarshal(&obj); err != nil { + return err + } + + if obj.Action != actionPack && obj.Action != actionUnpack { + return fmt.Errorf("invalid action: %s (must be 'pack' or 'unpack')", obj.Action) + } + + *c = core(obj) + + return nil +} + +func (c *core) Run(input <-chan *record.Record, output chan<- *record.Record) (err error) { + + if input == nil { + return task.ErrNilInput + } + + var archiv archiver + + archiv = supportedFormats[c.Format](&c.Base, &channelStruct{ + InputChan: input, + OutputChan: output, + }) + + actionFunc := supportedActions[string(c.Action)](archiv) + actionFunc() + + return nil +} diff --git a/internal/pkg/pipeline/task/archive/tar.go b/internal/pkg/pipeline/task/archive/tar.go new file mode 100644 index 0000000..2e9a194 --- /dev/null +++ b/internal/pkg/pipeline/task/archive/tar.go @@ -0,0 +1,108 @@ +package archive + +import ( + "archive/tar" + "bytes" + "io" + "log" + "path/filepath" + "strings" + + "github.com/patterninc/caterpillar/internal/pkg/pipeline/record" + "github.com/patterninc/caterpillar/internal/pkg/pipeline/task" +) + +type tarArchive struct { + *task.Base + *channelStruct +} + +func (t *tarArchive) Read() { + + for { + rc, ok := t.GetRecord(t.InputChan) + if !ok { + break + } + + if len(rc.Data) == 0 { + continue + } + + b := rc.Data + + r := tar.NewReader(bytes.NewReader(b)) + + for { + header, err := r.Next() + if err == io.EOF { + break + } + if err != nil { + log.Fatal(err) + } + + // check the file type is regular file + if header.Typeflag == tar.TypeReg { + buf := make([]byte, header.Size) + if _, err := io.ReadFull(r, buf); err != nil && err != io.EOF { + log.Fatal(err) + } + rc.SetContextValue(string(task.CtxKeyArchiveFileNameWrite), filepath.Base(header.Name)) + t.SendData(rc.Context, buf, t.OutputChan) + } + + } + } +} + +func (t *tarArchive) Write() { + + var buf bytes.Buffer + tw := tar.NewWriter(&buf) + var rc record.Record + + for { + rec, ok := t.GetRecord(t.InputChan) + if !ok { + break + } + b := rec.Data + + if len(b) == 0 { + continue + } + + filePath, found := rec.GetContextValue(string(task.CtxKeyFileNameWrite)) + if !found { + log.Fatal("filepath not set in context") + } + + if filePath == "" { + log.Fatal("empty filepath in context") + } + + filePath = strings.ReplaceAll(filePath, "\\", "/") + + header := &tar.Header{ + Name: filePath, + Mode: 0600, + Size: int64(len(b)), + } + if err := tw.WriteHeader(header); err != nil { + log.Fatal(err) + } + + if _, err := tw.Write(b); err != nil { + log.Fatal(err) + } + + rc.Context = rec.Context + } + + if err := tw.Close(); err != nil { + log.Fatal(err) + } + + t.SendData(rc.Context, buf.Bytes(), t.OutputChan) +} diff --git a/internal/pkg/pipeline/task/archive/zip.go b/internal/pkg/pipeline/task/archive/zip.go new file mode 100644 index 0000000..53b10bd --- /dev/null +++ b/internal/pkg/pipeline/task/archive/zip.go @@ -0,0 +1,106 @@ +package archive + +import ( + "archive/zip" + "bytes" + "io" + "log" + "path/filepath" + "strings" + + "github.com/patterninc/caterpillar/internal/pkg/pipeline/record" + "github.com/patterninc/caterpillar/internal/pkg/pipeline/task" +) + +type zipArchive struct { + *task.Base + *channelStruct +} + +func (z *zipArchive) Read() { + for { + rc, ok := z.GetRecord(z.InputChan) + if !ok { + break + } + + if len(rc.Data) == 0 { + continue + } + + b := rc.Data + + r, err := zip.NewReader(bytes.NewReader(b), int64(len(b))) + if err != nil { + log.Fatal(err) + } + for _, f := range r.File { + + // check the file type is regular file + if f.FileInfo().Mode().IsRegular() { + + rc.SetContextValue(string(task.CtxKeyArchiveFileNameWrite), filepath.Base(f.Name)) + + fs, err := f.Open() + if err != nil { + log.Fatal(err) + } + + buf := make([]byte, f.FileInfo().Size()) + + _, err = fs.Read(buf) + if err != nil && err != io.EOF { + log.Fatal(err) + } + + fs.Close() + + z.SendData(rc.Context, buf, z.OutputChan) + } + } + } +} + +func (z *zipArchive) Write() { + + zipBuf := new(bytes.Buffer) + zipWriter := zip.NewWriter(zipBuf) + var rc record.Record + + for { + rec, ok := z.GetRecord(z.InputChan) + if !ok { + break + } + + filePath, found := rec.GetContextValue(string(task.CtxKeyFileNameWrite)) + if !found { + log.Fatal("filepath not set in context") + } + + if filePath == "" { + log.Fatal("empty filepath in context") + } + + filePath = strings.ReplaceAll(filePath, "\\", "/") + + w, err := zipWriter.Create(filePath) + if err != nil { + log.Fatal(err) + } + _, err = w.Write(rec.Data) + if err != nil { + log.Fatal(err) + } + + rc.Context = rec.Context + } + + if err := zipWriter.Close(); err != nil { + log.Fatal(err) + } + + // Send the complete ZIP archive + z.SendData(rc.Context, zipBuf.Bytes(), z.OutputChan) + +} diff --git a/internal/pkg/pipeline/task/compress/formats.go b/internal/pkg/pipeline/task/compress/formats.go index 5424d0f..8ca7b5a 100644 --- a/internal/pkg/pipeline/task/compress/formats.go +++ b/internal/pkg/pipeline/task/compress/formats.go @@ -26,7 +26,7 @@ var ( return io.NopCloser(snappy.NewReader(r)), nil }, NewWriter: func(w io.Writer) io.WriteCloser { - return snappy.NewWriter(w) + return snappy.NewBufferedWriter(w) }}, } ) diff --git a/internal/pkg/pipeline/task/file/file.go b/internal/pkg/pipeline/task/file/file.go index 50caed6..722e315 100644 --- a/internal/pkg/pipeline/task/file/file.go +++ b/internal/pkg/pipeline/task/file/file.go @@ -5,7 +5,9 @@ import ( "context" "fmt" "io" + "log" "net/url" + "path/filepath" "strings" "github.com/patterninc/caterpillar/internal/pkg/config" @@ -134,6 +136,7 @@ func (f *file) readFile(output chan<- *record.Record) error { // Create a default record with context rc := &record.Record{Context: ctx} + rc.SetContextValue(string(task.CtxKeyFileNameWrite), filepath.Base(path)) // let's write content to output channel f.SendData(rc.Context, content, output) @@ -168,11 +171,25 @@ func (f *file) writeFile(input <-chan *record.Record) error { pathScheme = fileScheme } + var fs file + + fs = *f + filePath, found := rc.GetContextValue(string(task.CtxKeyArchiveFileNameWrite)) + if found { + if filePath == "" { + log.Fatal("required file path") + } + + filePath = strings.ReplaceAll(filePath, "\\", "/") + + fs.Path = f.Path + config.String(filePath) + } + writerFunction, found := writers[pathScheme] if !found { return unknownSchemeError(pathScheme) } - if err := writerFunction(f, rc, bytes.NewReader(rc.Data)); err != nil { + if err := writerFunction(&fs, rc, bytes.NewReader(rc.Data)); err != nil { return err } } diff --git a/internal/pkg/pipeline/task/file/local.go b/internal/pkg/pipeline/task/file/local.go index f24bac9..0ee232a 100644 --- a/internal/pkg/pipeline/task/file/local.go +++ b/internal/pkg/pipeline/task/file/local.go @@ -3,6 +3,7 @@ package file import ( "io" "os" + "path/filepath" "strings" "github.com/bmatcuk/doublestar" @@ -52,6 +53,12 @@ func writeLocalFile(f *file, rec *record.Record, reader io.Reader) error { return err } + dir := filepath.Dir(path) + err = os.MkdirAll(dir, os.ModePerm) + if err != nil { + return err + } + outputFile, err := os.Create((path[getPathIndex(path):])) if err != nil { return err diff --git a/internal/pkg/pipeline/task/task.go b/internal/pkg/pipeline/task/task.go index 688b3c0..749c36b 100644 --- a/internal/pkg/pipeline/task/task.go +++ b/internal/pkg/pipeline/task/task.go @@ -21,6 +21,13 @@ var ( ErrPresentInputOutput = fmt.Errorf(`either input or output must be set, not both`) ) +type contextKeyFile string + +const ( + CtxKeyFileNameWrite contextKeyFile = "CATERPILLAR_FILE_NAME_WRITE" + CtxKeyArchiveFileNameWrite contextKeyFile = "CATERPILLAR_ARCHIVE_FILE_NAME_WRITE" +) + type Task interface { Run(<-chan *record.Record, chan<- *record.Record) error GetName() string diff --git a/internal/pkg/pipeline/tasks.go b/internal/pkg/pipeline/tasks.go index 3098816..f54575d 100644 --- a/internal/pkg/pipeline/tasks.go +++ b/internal/pkg/pipeline/tasks.go @@ -7,6 +7,7 @@ import ( "gopkg.in/yaml.v3" "github.com/patterninc/caterpillar/internal/pkg/pipeline/task" + "github.com/patterninc/caterpillar/internal/pkg/pipeline/task/archive" "github.com/patterninc/caterpillar/internal/pkg/pipeline/task/aws/parameter_store" "github.com/patterninc/caterpillar/internal/pkg/pipeline/task/compress" "github.com/patterninc/caterpillar/internal/pkg/pipeline/task/converter" @@ -33,6 +34,7 @@ type tasks []task.Task var ( validate = validator.New() supportedTasks = map[string]func() (task.Task, error){ + `archive`: archive.New, `aws_parameter_store`: parameter_store.New, `compress`: compress.New, `converter`: converter.New, diff --git a/test/pipelines/birds.txt b/test/pipelines/birds.txt new file mode 100644 index 0000000..a4d4284 --- /dev/null +++ b/test/pipelines/birds.txt @@ -0,0 +1,124 @@ +Albatross +Acorn Woodpecker +American Kestrel +Anna's Hummingbird +Bald Eagle +Baltimore Oriole +Barn Swallow +Belted Kingfisher +Bicolored Antbird +Black Capped Chickadee +Black Skimmer +Blue Jay +Bluebird +Bobolink +Bohemian Waxwing +Brown Creeper +Brown Pelican +Burrowing Owl +California Condor +California Quail +Canada Goose +Cardinal +Caspian Tern +Cedar Waxwing +Chestnut Sided Warbler +Chimney Swift +Chipping Sparrow +Clark's Nutcracker +Clay Colored Sparrow +Cliff Swallow +Columbiformes +Common Eider +Common Goldeneye +Common Grackle +Common Loon +Common Merganser +Common Raven +Common Tern +Common Yellowthroat +Coopers Hawk +Cory's Shearwater +Crested Flycatcher +Curve Billed Thrasher +Dark Eyed Junco +Dickcissel +Dovekie +Downy Woodpecker +Drab Seedeater +Dunnock +Eastern Bluebird +Eastern Meadowlark +Eastern Phoebe +Eastern Screech Owl +Eastern Towhee +Eastern Wood Pewee +Eared Grebe +Egyptian Plover +Elanus leucurus +Evening Grosbeak +Eared Quetzal +Eurasian Wigeon +European Starling +Fabulous Flamingo +Ferruginous Hawk +Fiscal Flycatcher +Flammulated Owl +Flatbill +Flesh Footed Shearwater +Florida Jay +Fringilla coelebs +Fulmar +Gadwall +Gambel's Quail +Gannet +Garden Warbler +Gnatcatcher +Godwit +Golden Eagle +Golden Winged Warbler +Goldeneye +Goldfinch +Goosander +Goshawk +Grace's Warbler +Grasshopper Sparrow +Gray Catbird +Great Black Backed Gull +Great Blue Heron +Great Crested Flycatcher +Great Horned Owl +Great Kiskadee +Great Spotted Woodpecker +Great Tit +Grebe +Greenbul +Green Heron +Green Tailed Towhee +Green Winged Teal +Greenlet +Grey Kingbird +Grey Owl +Grosbeaks +Grouse +Gull +Hairy Woodpecker +Hammond's Flycatcher +Harris Hawk +Harris Sparrow +Hawaiian Creeper +Hawaiian Goose +Hawfinch +Heathland Francolin +Herring Gull +Hoary Puffleg +Hooded Merganser +Hooded Oriole +Hooded Warbler +Hoopoe +Horned Auk +Horned Grebe +Horned Lark +House Finch +House Sparrow +House Wren \ No newline at end of file diff --git a/test/pipelines/planets.tar.gz b/test/pipelines/planets.tar.gz new file mode 100644 index 0000000..756a4e8 Binary files /dev/null and b/test/pipelines/planets.tar.gz differ diff --git a/test/pipelines/tar_unpack_multifile_test.yaml b/test/pipelines/tar_unpack_multifile_test.yaml new file mode 100644 index 0000000..0a4ad03 --- /dev/null +++ b/test/pipelines/tar_unpack_multifile_test.yaml @@ -0,0 +1,15 @@ +tasks: + - name: planet_tar_file + type: file + path: test/pipelines/planets.tar.gz + - name: decompress_gzip + type: compress + format: gzip + action: decompress + - name: unpack_planets_file + type: archive + format: tar + action: unpack + - name: store_unpacked_file + type: file + path: unpacked_planets/ \ No newline at end of file diff --git a/test/pipelines/zip_pack_test.yaml b/test/pipelines/zip_pack_test.yaml new file mode 100644 index 0000000..8fa8fd2 --- /dev/null +++ b/test/pipelines/zip_pack_test.yaml @@ -0,0 +1,9 @@ +tasks: + - name: birds_file + type: file + path: test/pipelines/birds.txt + - name: pack_file + type: archive + - name: store_packed_file + type: file + path: packed_birds.zip \ No newline at end of file diff --git a/test/pipelines/zip_unpack_test.yaml b/test/pipelines/zip_unpack_test.yaml new file mode 100644 index 0000000..b95f4d8 --- /dev/null +++ b/test/pipelines/zip_unpack_test.yaml @@ -0,0 +1,10 @@ +tasks: + - name: birds_zip_file + type: file + path: test/pipelines/packed_birds.zip + - name: unpack_birds_file + type: archive + action: unpack + - name: store_unpacked_file + type: file + path: unpacked_birds.txt \ No newline at end of file