From 75badd890b88088da94baa7f19876588f97e9e0c Mon Sep 17 00:00:00 2001 From: Mahesh Date: Wed, 28 Jan 2026 12:38:38 +0530 Subject: [PATCH 01/18] Support for ZIP file compression and decompression. --- .../pkg/pipeline/task/compress/formats.go | 11 +- test/pipelines/birds.txt | 124 ++++++++++++++++++ test/pipelines/zip_compress_test.yaml | 10 ++ test/pipelines/zip_decompress_test.yaml | 11 ++ 4 files changed, 155 insertions(+), 1 deletion(-) create mode 100644 test/pipelines/birds.txt create mode 100644 test/pipelines/zip_compress_test.yaml create mode 100644 test/pipelines/zip_decompress_test.yaml diff --git a/internal/pkg/pipeline/task/compress/formats.go b/internal/pkg/pipeline/task/compress/formats.go index 5424d0f..f781003 100644 --- a/internal/pkg/pipeline/task/compress/formats.go +++ b/internal/pkg/pipeline/task/compress/formats.go @@ -1,6 +1,7 @@ package compress import ( + "compress/flate" "compress/gzip" "io" @@ -26,7 +27,15 @@ var ( return io.NopCloser(snappy.NewReader(r)), nil }, NewWriter: func(w io.Writer) io.WriteCloser { - return snappy.NewWriter(w) + return snappy.NewBufferedWriter(w) + }}, + `zip`: { + NewReader: func(r io.Reader) (io.ReadCloser, error) { + return flate.NewReader(r), nil + }, + NewWriter: func(w io.Writer) io.WriteCloser { + writer, _ := flate.NewWriter(w, flate.DefaultCompression) + return writer }}, } ) diff --git a/test/pipelines/birds.txt b/test/pipelines/birds.txt new file mode 100644 index 0000000..b103723 --- /dev/null +++ b/test/pipelines/birds.txt @@ -0,0 +1,124 @@ +Albatross +Acorn Woodpecker +American Kestrel +Anna's Hummingbird +Bald Eagle +Baltimore Oriole +Barn Swallow +Belted Kingfisher +Bicolored Antbird +Black Capped Chickadee +Black Skimmer +Blue Jay +Bluebird +Bobolink +Bohemian Waxwing +Brown Creeper +Brown Pelican +Burrowing Owl +California Condor +California Quail +Canada Goose +Cardinal +Caspian Tern +Cedar Waxwing +Chestnut Sided Warbler +Chimney Swift +Chipping Sparrow +Clark's Nutcracker +Clay Colored Sparrow +Cliff Swallow +Columbiformes +Common Eider +Common Goldeneye +Common Grackle +Common Loon +Common Merganser +Common Raven +Common Tern +Common Yellowthroat +Coopers Hawk +Cory's Shearwater +Crested Flycatcher +Curve Billed Thrasher +Dark Eyed Junco +Dickcissel +Dovekie +Downy Woodpecker +Drab Seedeater +Dunnock +Eastern Bluebird +Eastern Meadowlark +Eastern Phoebe +Eastern Screech Owl +Eastern Towhee +Eastern Wood Pewee +Eared Grebe +Egyptian Plover +Elanus leucurus +Evening Grosbeak +Eared Quetzal +Eurasian Wigeon +Eurpean Starling +Fabulous Flamingo +Ferruginous Hawk +Fiscal Flycatcher +Flammulated Owl +Flatbill +Flesh Footed Shearwater +Florida Jay +Fringilla coelebs +Fulmar +Gadwall +Gambel's Quail +Gannet +Garden Warbler +Gnatcatcher +Godwit +Golden Eagle +Golden Winged Warbler +Goldeneye +Goldfinch +Goosander +Goshawk +Grace's Warbler +Grasshopper Sparrow +Gray Catbird +Great Black Backed Gull +Great Blue Heron +Great Crested Flycatcher +Great Horned Owl +Great Kiskadee +Great Spotted Woodpecker +Great Tit +Grebe +Greenbul +Green Heron +Green Tailed Towhee +Green Winged Teal +Greenlet +Grey Kingbird +Grey Owl +Grosbeaks +Grouse +Gull +Hairy Woodpecker +Hammond's Flycatcher +Harris Hawk +Harris Sparrow +Hawaiian Creeper +Hawaiian Goose +Hawfinch +Heathland Francolin +Herring Gull +Hoary Puffleg +Hooded Merganser +Hooded Oriole +Hooded Warbler +Hoopoe +Horned Auk +Horned Grebe +Horned Lark +House Finch +House Sparrow +House Wren \ No newline at end of file diff --git a/test/pipelines/zip_compress_test.yaml b/test/pipelines/zip_compress_test.yaml new file mode 100644 index 0000000..d58fc7b --- /dev/null +++ b/test/pipelines/zip_compress_test.yaml @@ -0,0 +1,10 @@ +tasks: + - name: pull_names_from_s3 + type: file + path: test/pipelines/birds.txt + - name: compress_file + type: compress + format: zip + - name: store_compressed_file + type: file + path: compressed_birds.zip \ No newline at end of file diff --git a/test/pipelines/zip_decompress_test.yaml b/test/pipelines/zip_decompress_test.yaml new file mode 100644 index 0000000..4bb572e --- /dev/null +++ b/test/pipelines/zip_decompress_test.yaml @@ -0,0 +1,11 @@ +tasks: + - name: pull_names_from_s3 + type: file + path: test/pipelines/compressed_birds.zip + - name: compress_file + type: compress + format: zip + action: decompress + - name: store_compressed_file + type: file + path: uncompressed_birds.txt \ No newline at end of file From a52ae4fe10b871a4cfeee58ca1052082812a2d67 Mon Sep 17 00:00:00 2001 From: Mahesh Date: Thu, 29 Jan 2026 21:33:55 +0530 Subject: [PATCH 02/18] Support for tar and zip archive --- internal/pkg/pipeline/task/archive/archive.go | 111 ++++++++++++++++++ internal/pkg/pipeline/task/archive/tar.go | 65 ++++++++++ internal/pkg/pipeline/task/archive/zip.go | 66 +++++++++++ internal/pkg/pipeline/tasks.go | 2 + test/pipelines/zip_compress_test.yaml | 12 +- test/pipelines/zip_decompress_test.yaml | 15 ++- 6 files changed, 257 insertions(+), 14 deletions(-) create mode 100644 internal/pkg/pipeline/task/archive/archive.go create mode 100644 internal/pkg/pipeline/task/archive/tar.go create mode 100644 internal/pkg/pipeline/task/archive/zip.go diff --git a/internal/pkg/pipeline/task/archive/archive.go b/internal/pkg/pipeline/task/archive/archive.go new file mode 100644 index 0000000..8e8791c --- /dev/null +++ b/internal/pkg/pipeline/task/archive/archive.go @@ -0,0 +1,111 @@ +package archive + +import ( + "fmt" + + "github.com/patterninc/caterpillar/internal/pkg/pipeline/record" + "github.com/patterninc/caterpillar/internal/pkg/pipeline/task" +) + +type actionType string + +const ( + actionCompress actionType = `pack` + actionDecompress actionType = `unpack` +) + +const ( + defaultFormat = `zip` + defaultAction = `pack` +) + +type archiver interface { + Read(b []byte) + Write(b []byte) +} + +type core struct { + task.Base `yaml:",inline" json:",inline"` + Format string `yaml:"format,omitempty" json:"format,omitempty"` + Action actionType `yaml:"action,omitempty" json:"action,omitempty"` + FileName string `yaml:"file_name,omitempty" json:"file_name,omitempty"` +} + +func New() (task.Task, error) { + return &core{ + Format: defaultFormat, + Action: defaultAction, + }, nil +} + +func (c *core) UnmarshalYAML(unmarshal func(interface{}) error) error { + type raw core + obj := raw{ + Format: defaultFormat, + Action: defaultAction, + } + if err := unmarshal(&obj); err != nil { + return err + } + + if obj.Action != actionCompress && obj.Action != actionDecompress { + return fmt.Errorf("invalid action: %s (must be 'compress' or 'decompress')", obj.Action) + } + + if obj.Action == actionCompress { + if obj.FileName == "" { + return fmt.Errorf("file_name must be specified when action is 'pack'") + } + } + + *c = core(obj) + + return nil +} + +func (c *core) Run(input <-chan *record.Record, output chan<- *record.Record) (err error) { + + if input == nil { + return task.ErrNilInput + } + + for { + r, ok := c.GetRecord(input) + if !ok { + break + } + + if len(r.Data) == 0 { + continue + } + + var archiv archiver + + switch c.Format { + case "tar": + archiv = &tarArchive{ + Base: &c.Base, + FileName: c.FileName, + Record: r, + OutputChan: output, + } + case "zip": + archiv = &zipArchive{ + Base: &c.Base, + FileName: c.FileName, + Record: r, + OutputChan: output, + } + default: + return fmt.Errorf("unsupported format: %s", c.Format) + } + + switch c.Action { + case actionCompress: + archiv.Write(r.Data) + case actionDecompress: + archiv.Read(r.Data) + } + } + return nil +} diff --git a/internal/pkg/pipeline/task/archive/tar.go b/internal/pkg/pipeline/task/archive/tar.go new file mode 100644 index 0000000..202fbdb --- /dev/null +++ b/internal/pkg/pipeline/task/archive/tar.go @@ -0,0 +1,65 @@ +package archive + +import ( + "archive/tar" + "bytes" + "io" + "log" + + "github.com/patterninc/caterpillar/internal/pkg/pipeline/record" + "github.com/patterninc/caterpillar/internal/pkg/pipeline/task" +) + +type tarArchive struct { + *task.Base + FileName string + Record *record.Record + OutputChan chan<- *record.Record +} + +func (t *tarArchive) Read(b []byte) { + r := tar.NewReader(bytes.NewReader(b)) + + for { + header, err := r.Next() + if err != nil { + break + } + + // check the file type is regular file + if header.Typeflag == tar.TypeReg { + buf := make([]byte, header.Size) + if _, err := io.ReadFull(r, buf); err != nil && err != io.EOF { + log.Fatal(err) + } + t.SendData(t.Record.Context, buf, t.OutputChan) + } + + } +} + +func (t *tarArchive) Write(b []byte) { + + if t.FileName == "" { + log.Fatal("file name is required to create tar archive") + } + + tw := tar.NewWriter(bytes.NewBuffer(b)) + + tw.WriteHeader( + &tar.Header{Name: t.FileName, + Mode: 0777, + Size: int64(len(b))}) + + _, err := tw.Write(b) + if err != nil { + log.Fatal(err) + } + + err = tw.Close() + if err != nil { + log.Fatal(err) + } + + t.SendData(t.Record.Context, b, t.OutputChan) +} diff --git a/internal/pkg/pipeline/task/archive/zip.go b/internal/pkg/pipeline/task/archive/zip.go new file mode 100644 index 0000000..1c3b2b9 --- /dev/null +++ b/internal/pkg/pipeline/task/archive/zip.go @@ -0,0 +1,66 @@ +package archive + +import ( + "archive/zip" + "bytes" + "errors" + "io" + "log" + + "github.com/patterninc/caterpillar/internal/pkg/pipeline/record" + "github.com/patterninc/caterpillar/internal/pkg/pipeline/task" +) + +type zipArchive struct { + *task.Base + FileName string + Record *record.Record + OutputChan chan<- *record.Record +} + +func (z *zipArchive) Read(b []byte) { + r, err := zip.NewReader(bytes.NewReader(b), int64(len(b))) + if err != nil { + log.Fatal(err) + } + for _, f := range r.File { + + // check the file type is regular file + if f.FileInfo().Mode().IsRegular() { + + rc, err := f.Open() + if err != nil { + log.Fatal(err) + } + + buf := make([]byte, f.FileInfo().Size()) + + _, err = rc.Read(buf) + if err != nil && !errors.Is(err, io.EOF) { + log.Fatal(err) + } + + rc.Close() + + z.SendData(z.Record.Context, buf, z.OutputChan) + } + } +} + +func (z *zipArchive) Write(b []byte) { + + zipBuf := new(bytes.Buffer) + zipWriter := zip.NewWriter(zipBuf) + + if z.FileName == "" { + log.Fatal("file name is required to create zip archive") + } + + w, _ := zipWriter.Create(z.FileName) + w.Write(b) + + zipWriter.Close() + + z.SendData(z.Record.Context, zipBuf.Bytes(), z.OutputChan) + +} diff --git a/internal/pkg/pipeline/tasks.go b/internal/pkg/pipeline/tasks.go index 3098816..f54575d 100644 --- a/internal/pkg/pipeline/tasks.go +++ b/internal/pkg/pipeline/tasks.go @@ -7,6 +7,7 @@ import ( "gopkg.in/yaml.v3" "github.com/patterninc/caterpillar/internal/pkg/pipeline/task" + "github.com/patterninc/caterpillar/internal/pkg/pipeline/task/archive" "github.com/patterninc/caterpillar/internal/pkg/pipeline/task/aws/parameter_store" "github.com/patterninc/caterpillar/internal/pkg/pipeline/task/compress" "github.com/patterninc/caterpillar/internal/pkg/pipeline/task/converter" @@ -33,6 +34,7 @@ type tasks []task.Task var ( validate = validator.New() supportedTasks = map[string]func() (task.Task, error){ + `archive`: archive.New, `aws_parameter_store`: parameter_store.New, `compress`: compress.New, `converter`: converter.New, diff --git a/test/pipelines/zip_compress_test.yaml b/test/pipelines/zip_compress_test.yaml index d58fc7b..a75418d 100644 --- a/test/pipelines/zip_compress_test.yaml +++ b/test/pipelines/zip_compress_test.yaml @@ -1,10 +1,10 @@ tasks: - - name: pull_names_from_s3 + - name: birds_file type: file path: test/pipelines/birds.txt - - name: compress_file - type: compress - format: zip - - name: store_compressed_file + - name: pack_file + type: archive + file_name: birds.txt + - name: store_packed_file type: file - path: compressed_birds.zip \ No newline at end of file + path: packed_birds.zip \ No newline at end of file diff --git a/test/pipelines/zip_decompress_test.yaml b/test/pipelines/zip_decompress_test.yaml index 4bb572e..fff1325 100644 --- a/test/pipelines/zip_decompress_test.yaml +++ b/test/pipelines/zip_decompress_test.yaml @@ -1,11 +1,10 @@ tasks: - - name: pull_names_from_s3 + - name: birds_zip_file type: file - path: test/pipelines/compressed_birds.zip - - name: compress_file - type: compress - format: zip - action: decompress - - name: store_compressed_file + path: test/pipelines/packed_birds.zip + - name: unpack_birds_file + type: archive + action: unpack + - name: store_unpacked_file type: file - path: uncompressed_birds.txt \ No newline at end of file + path: unpacked_birds.txt \ No newline at end of file From e735358c7c0d7a0782bda644df68d00cd29ef0bc Mon Sep 17 00:00:00 2001 From: Mahesh Date: Thu, 29 Jan 2026 21:34:57 +0530 Subject: [PATCH 03/18] refactor: remove zip format support from compression handlers --- internal/pkg/pipeline/task/compress/formats.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/internal/pkg/pipeline/task/compress/formats.go b/internal/pkg/pipeline/task/compress/formats.go index f781003..8ca7b5a 100644 --- a/internal/pkg/pipeline/task/compress/formats.go +++ b/internal/pkg/pipeline/task/compress/formats.go @@ -1,7 +1,6 @@ package compress import ( - "compress/flate" "compress/gzip" "io" @@ -29,13 +28,5 @@ var ( NewWriter: func(w io.Writer) io.WriteCloser { return snappy.NewBufferedWriter(w) }}, - `zip`: { - NewReader: func(r io.Reader) (io.ReadCloser, error) { - return flate.NewReader(r), nil - }, - NewWriter: func(w io.Writer) io.WriteCloser { - writer, _ := flate.NewWriter(w, flate.DefaultCompression) - return writer - }}, } ) From 1e654d246671fa4da24a2e4923f3d84702406efe Mon Sep 17 00:00:00 2001 From: Mahesh Date: Thu, 29 Jan 2026 21:39:11 +0530 Subject: [PATCH 04/18] feat: add zip packing and unpacking test pipelines --- test/pipelines/{zip_compress_test.yaml => zip_pack_test.yaml} | 0 test/pipelines/{zip_decompress_test.yaml => zip_unpack_test.yaml} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename test/pipelines/{zip_compress_test.yaml => zip_pack_test.yaml} (100%) rename test/pipelines/{zip_decompress_test.yaml => zip_unpack_test.yaml} (100%) diff --git a/test/pipelines/zip_compress_test.yaml b/test/pipelines/zip_pack_test.yaml similarity index 100% rename from test/pipelines/zip_compress_test.yaml rename to test/pipelines/zip_pack_test.yaml diff --git a/test/pipelines/zip_decompress_test.yaml b/test/pipelines/zip_unpack_test.yaml similarity index 100% rename from test/pipelines/zip_decompress_test.yaml rename to test/pipelines/zip_unpack_test.yaml From 073193a20aa351ee9d29623f6e0a3e67f9e39ef1 Mon Sep 17 00:00:00 2001 From: Mahesh Date: Fri, 30 Jan 2026 11:39:01 +0530 Subject: [PATCH 05/18] fix: correct tar archive writing logic and buffer handling --- internal/pkg/pipeline/task/archive/tar.go | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/internal/pkg/pipeline/task/archive/tar.go b/internal/pkg/pipeline/task/archive/tar.go index 202fbdb..1b7581b 100644 --- a/internal/pkg/pipeline/task/archive/tar.go +++ b/internal/pkg/pipeline/task/archive/tar.go @@ -44,22 +44,25 @@ func (t *tarArchive) Write(b []byte) { log.Fatal("file name is required to create tar archive") } - tw := tar.NewWriter(bytes.NewBuffer(b)) + var buf bytes.Buffer + tw := tar.NewWriter(&buf) - tw.WriteHeader( - &tar.Header{Name: t.FileName, - Mode: 0777, - Size: int64(len(b))}) + header := &tar.Header{ + Name: t.FileName, + Mode: 0600, + Size: int64(len(b)), + } + if err := tw.WriteHeader(header); err != nil { + log.Fatal(err) + } - _, err := tw.Write(b) - if err != nil { + if _, err := tw.Write(b); err != nil { log.Fatal(err) } - err = tw.Close() - if err != nil { + if err := tw.Close(); err != nil { log.Fatal(err) } - t.SendData(t.Record.Context, b, t.OutputChan) + t.SendData(t.Record.Context, buf.Bytes(), t.OutputChan) } From 9e0cf2b7d30ddd38554f8cb246e639112de1d5ae Mon Sep 17 00:00:00 2001 From: Mahesh Date: Fri, 30 Jan 2026 12:03:41 +0530 Subject: [PATCH 06/18] test file for the tar with multi file output --- internal/pkg/pipeline/task/archive/tar.go | 2 +- test/pipelines/packed_birds.tar | Bin 0 -> 3584 bytes test/pipelines/planets.tar.gz | Bin 0 -> 1245 bytes test/pipelines/tar_unpack_multifile_test.yaml | 17 +++++++++++++++++ 4 files changed, 18 insertions(+), 1 deletion(-) create mode 100644 test/pipelines/packed_birds.tar create mode 100644 test/pipelines/planets.tar.gz create mode 100644 test/pipelines/tar_unpack_multifile_test.yaml diff --git a/internal/pkg/pipeline/task/archive/tar.go b/internal/pkg/pipeline/task/archive/tar.go index 1b7581b..b1cd9f5 100644 --- a/internal/pkg/pipeline/task/archive/tar.go +++ b/internal/pkg/pipeline/task/archive/tar.go @@ -22,7 +22,7 @@ func (t *tarArchive) Read(b []byte) { for { header, err := r.Next() - if err != nil { + if err != nil || err != io.EOF { break } diff --git a/test/pipelines/packed_birds.tar b/test/pipelines/packed_birds.tar new file mode 100644 index 0000000000000000000000000000000000000000..f179b0d564748226e8344e7674a3c2e12bcd6347 GIT binary patch literal 3584 zcmeHIO>-MJ4E346g16o}c9ZE{lJzlh(m0i7JUL=_AuUH-a)(@cqyG9na6hDWddf`F zi*@i4AV2^Q4{%{Z6@Tr%b$37RcaPui_xE@5#qZzWckdtGO}gR!;o<)MZ+H6s$E9?A zImM0w!e2Gtcgm&R6!(3v{B*B{EvO4VMztt?aQemjs^#*`K`j~%rX;7IICg=pS~y3) zM7>K*W8A3#bJUV-rPp+_EXU3?KCu2AjL!W6tjw}5H$D@fgo?n{MTbNgR z;kw~5OXXRwsBNKK)uudC#dtWY_h-`}AxMe!2fF0L;N}Z&jXSHQuemWu_=Udp2&9(5 z_fD?@b1R-%dS)v{sbva~VWmI!R;|dIBkDJ#SKd`V+-klgGGgMWBE9uKGK5eWN1~0b zggkI?YQ+_WIr^#wZZ36tZz>e@LZPro1BEq?FF?i|JCWK}ve>sI38|H(a0beMq^=CW zLp&^9kk$a!)y^D`GrYJ>O(7*T9M#Gij3Y@_aj#I#_;X>26mCPpHLv;0%ILe})h)C}UqUg`PTjD*z7wv9Xtux>7i> zO5v6D(pU@qp$?Rp@dyO;`U2I5S3VoYw|AFYvOWeX^q#rm9P2ST z=gYHN6GDQ9!kSE$N%4uP^1Wa^+dS8v3!cS&3Eq@-rpM%U@O=$NPNEEpe&#-_gvr~G zUpQUbPDt@=k#10HOD;uiIh84-sMY{eTE4~H6-<(WIDbjp{RK*`6L82hHYY|?Ah(Ry z_E=Ql`AKc4NY>-UjU|~xuQnV)IvFR*jQ+;NlI$(iV%sDObgEnuWK5m4YGXOpdgHy^ z+_Y;0z8Mf+W^EG?2F9dI&z6g*Hpw;=)Rrn4aD1DBE&4so%9fnt4%z_D&CK6Aw0-RS z)>plOMP}u=K}O*Pv5?9%FIVEq>1bS8!_7x>!tSk)wa{$~HZ$VSUcfNc9;+nGqXiqx zk(7%ABElNh%rj*fKi~fPYZ@2YH8!9k#{@>7aNraoAgfxNm(Y$lfg0 z;o>0WXTIPsJG4|pz%_@(`GCy>=aF~A{fN(j!IiAB0(HPjk?lGacgbCzC>CJ=ar%u=X5+;KB*lB3guq+&!`yxqm0tjV~DM1IOPs3(s?=p3=Hz9|0pxQlj8Lz elV>>3rR1BJ!0xC2?|lpShQJ#FZwUOK2>cD0&y&pn literal 0 HcmV?d00001 diff --git a/test/pipelines/planets.tar.gz b/test/pipelines/planets.tar.gz new file mode 100644 index 0000000000000000000000000000000000000000..756a4e8ad416c3bc98ca8e260face1d601b66940 GIT binary patch literal 1245 zcmV<31S0z%iwFQbOnhko1MQs6kK05Pz*8hZgfpDyzyUJ(M`QGLx(8D1J3G zuE*JjU0%N9brv55-Ch1)c>mK6Tko}6-$s4({R{L9S~A<$`p9n7Zm;b&yZ!riFWEgm zf4;mrd&X~e`?&Cs2dn*`^piM<#^a$0#%c1#jG|HBv>w>^`{1)b{`yCJwSx}{tmB91 z=BbG;O!~0Cd0=DwV^xTMNeOGAn;MR#M8o5sY7}RK*gPUiY9*KyIN+KQ#R+Q*g`S;# z-F^P}>DkZU1UFHZrGYD2A9a5`>vp>@uV1HczWnYyY;%oX*e5-E`K~9u72@rEe+`Xg z8TAkAe2-1>KZ}>b`p<}HAyMZ+=L3d|{uc&JQObCTB?#crlgEF3tN-TsCrTC9 ze`^6ll0Xfi-tXW_aM1AMRuM+7tw8< zRYzbOR{n2i<>};5{GT%h@$Uuj|3*{x|J~06Rs3HP@P8k;{GStosb(q!|37;2_^)sE z-^TxWas8*1!20il`M9t5f9&#q9N0lBrT_%e8aQZ)0m=J{(`2W$9$A5jR|2Fpu;h^?w}{{9oXXqGkU-AzHZ-fRLkG6Qz&|oX~1yt~V zB?QF37r_4;P1*l>;(`(Qcl z%LL#_G`XJuXyHl#MA82VryU)-5&(j20)Rg650mKy%98OHrDVVkJ|qKsW99#`9fSSk z{|^7B4E)~@;Qx)L?Ekx;2R6n(F82Q_rJ(=E3+3}a!+?bZOQxax-vILXuW$9Q)c=nu zod4&A*|@K7|Dn$Ee<1GLzw9Wb9sU1m*8hLvYypz?PG|u-0N(O{llG_S?Sa()CG`LK z0sOzwIQ)O-^FXEgzXJdFfwTUPsaBdXsQ({7cm7}7>c36?U+VuC9M*p?EXVynp8>%1 z{TzUILRStz6q_XhoVxpfLP!Gk%*y{KvoRYTO8-BG{y#r}|2LYl|L=YtsNnx>_WK|B zfBiWB|3^zALKaf+|Dz|5|N2({O8q}fLjB(hi*aAq{zJHQ>Ax=i0zfG4*tx^u|9oTr z57p5Bvp-h;KQiNNIx+`R{}+(|`vLsF(Ukpv_w&H!`kyS<|Ka~Xy+A{k|5MC4{QFN6 z$m74h)xXmDzk Date: Fri, 30 Jan 2026 12:49:23 +0530 Subject: [PATCH 07/18] feat: add comprehensive README for archive task with ZIP and TAR support --- internal/pkg/pipeline/task/archive/README.md | 235 +++++++++++++++++++ 1 file changed, 235 insertions(+) create mode 100644 internal/pkg/pipeline/task/archive/README.md diff --git a/internal/pkg/pipeline/task/archive/README.md b/internal/pkg/pipeline/task/archive/README.md new file mode 100644 index 0000000..dad7afc --- /dev/null +++ b/internal/pkg/pipeline/task/archive/README.md @@ -0,0 +1,235 @@ +# Archive Task + +The `archive` task pack and unpack file data in various archive formats (TAR, ZIP), enabling efficient data packaging and extraction within pipelines. + +## Function + +The archive task handles two primary operations: +- **Pack**: Creates archives from input data (e.g., create a ZIP or TAR file) +- **Unpack**: Extract archives to retrieve individual files + +## Behavior + +The archive task operates in two modes depending on the specified action: + +- **Pack mode** (`action: pack`): Takes input data records and creates an archive file. Each record's data is packaged into the specified archive format with the configured filename. The task outputs the complete archive data. + +- **Unpack mode** (`action: unpack`): Takes archive file data as input and extracts individual files. For each file found in the archive, the task outputs a separate record containing that file's data. + +## Configuration Fields + +| Field | Type | Default | Description | +|-------|------|---------|-------------| +| `name` | string | - | Task name for identification | +| `type` | string | `archive` | Must be "archive" | +| `format` | string | `zip` | Archive format: `zip` or `tar` | +| `action` | string | `pack` | Operation to perform: `pack` or `unpack` | +| `file_name` | string | - | Name of the file within the archive (required for `pack` action) | + +### File Name Format + +The `file_name` field specifies how files are stored within archives. Different formats have specific requirements: + +#### ZIP Archives +- **Paths**: Filenames can include directory paths, represented using forward slashes (/) + - Example: `docs/readme.txt` +- **Separators**: Only forward slashes (/) are allowed as folder separators, regardless of platform +- **Relative Paths**: Filenames must be relative (no drive letters like C: and no leading slash /) +- **Case Sensitivity**: ZIP stores filenames as is, but whether they are case-sensitive depends on the extraction platform +- **Directories**: End directory names with a trailing slash (/) to indicate a folder +- **Duplicates**: Duplicate names are allowed, but may cause confusion for some zip tools +- **Allowed Characters**: Supports Unicode, but stick to common, portable characters for best compatibility + +#### TAR Archives +- **Paths**: Filenames can include paths separated by forward slashes (/) + - Example: `src/main.c` +- **Relative and Absolute Paths**: Both relative (foo.txt) and absolute paths (/foo.txt) can technically be stored, but using relative paths is strongly recommended for portability and to avoid extraction issues +- **Case Sensitivity**: Tar files store names as is; case sensitivity depends on the underlying filesystem +- **Long Paths**: Traditional tar limits path length to 100 bytes for the filename, but modern tar formats (ustar, pax) allow longer names +- **Directories**: Represented as entries ending in a slash (/) +- **Duplicates**: Duplicate filenames are possible; later entries usually overwrite earlier ones on extraction +- **Allowed Characters**: Generally supports any characters, but best practice is to stick to ASCII (letters, digits, underscores, dashes, periods, slashes) for maximum compatibility + +## Supported Formats + +### ZIP +- **Extension**: `.zip` +- **Use case**: Cross-platform, widely supported compression format +- **Features**: Individual file pack, preserves file structure +- **Pack**: Creates a ZIP archive with single or multiple files +- **Unpack**: Extracts all regular files from ZIP archive + +### TAR +- **Extension**: `.tar` or `.tar.gz` +- **Use case**: Unix/Linux native format, streaming support +- **Features**: Preserves file metadata, supports packing (with gzip) +- **Pack**: Creates a TAR archive with file metadata +- **Unpack**: Extracts all regular files from TAR archive (including gzip-compressed) + +## Example Configurations + +### Pack a file into ZIP +```yaml +tasks: + - name: create_zip + type: archive + format: zip + action: pack + file_name: output.txt +``` + +### Unpack ZIP archive +```yaml +tasks: + - name: extract_zip + type: archive + format: zip + action: unpack +``` + +### Pack a file into TAR +```yaml +tasks: + - name: create_tar + type: archive + format: tar + action: pack + file_name: data.txt +``` + +### Unpack TAR.GZ archive +```yaml +tasks: + - name: extract_tar_gz + type: archive + format: tar + action: unpack +``` + +## Complete Pipeline Examples + +### Read files, compress to ZIP, write to file +```yaml +tasks: + - name: read_source + type: file + path: source/*.txt + + - name: compress_to_zip + type: archive + format: zip + action: pack + file_name: archive.txt + + - name: write_archive + type: file + path: output/archive.zip +``` + +### Extract TAR.GZ and write individual files +```yaml +tasks: + - name: read_archive + type: file + path: data.tar.gz + + - name: extract_files + type: archive + format: tar + action: unpack + + - name: write_extracted + type: file + path: /output/{{ context "filename" }} +``` + +### Multi-step compression pipeline +```yaml +tasks: + - name: read_data + type: file + path: test/pipelines/birds.txt + + - name: compress_zip + type: archive + format: zip + action: pack + file_name: birds.zip + + - name: decompress_zip + type: archive + format: zip + action: unpack + + - name: write_result + type: file + path: unpacked_birds/birds.txt +``` + +## Data Flow + +### Pack Operation +``` +Input Records + ↓ +[Record Data] → Archive Creation → [Archive Bytes] → Output +``` + +### Unpack Operation +``` +Input Records + ↓ +[Archive Bytes] → File Extraction → [File 1], [File 2], ... → Output +``` + +## Use Cases + +- **Data compression**: Reduce data size for transmission or storage +- **Data packaging**: Bundle multiple files into a single archive +- **Data extraction**: Process archived data within pipelines +- **Archive conversion**: Convert between ZIP and TAR formats +- **Backup workflows**: Create and manage compressed backups +- **Data distribution**: Package files for downstream consumption + +## Error Handling + +- **Missing file_name**: Throws error if `file_name` is not specified for `pack` action +- **Invalid format**: Throws error if format is not `zip` or `tar` +- **Invalid action**: Throws error if action is not `pack` or `unpack` +- **Corrupt archive**: May throw error when unpacking malformed archives +- **Empty data**: Skips processing of empty records + +## Technical Details + +### ZIP Format +- Uses Go's `archive/zip` package +- Supports standard ZIP compression +- Preserves file metadata (size, modification time) +- Regular files only (directories not included in unpacking) + +### TAR Format +- Uses Go's `archive/tar` package +- Supports raw TAR and gzip-compressed TAR files +- Automatic format detection for gzip compression +- Preserves tar header information +- Regular files only (directories and special files filtered) + +## Performance Considerations + +- **Memory usage**: Entire archive loaded into memory for processing +- **Compression ratio**: ZIP typically provides better compression than TAR alone +- **Processing speed**: TAR is generally faster than ZIP due to simpler format +- **Large files**: For very large archives, consider chunking or streaming approaches + +## Sample Pipelines + +- `test/pipelines/zip_pack_test.yaml` - Create ZIP archives +- `test/pipelines/zip_unpack_test.yaml` - Extract from ZIP archives +- `test/pipelines/tar_unpack_multifile_test.yaml` - Extract multiple files from TAR archive + +## Security Considerations + +- Archives are processed in-memory; ensure sufficient memory for large files +- ZIP bomb protection: Be cautious with untrusted archive sources +- Path traversal: Archive extraction validates file paths to prevent escaping base directory +- File permissions: TAR format supports Unix permissions; ZIP has limited permission support From 7c136db58a720fc70f5a09c6008fbf6471b3d07f Mon Sep 17 00:00:00 2001 From: Mahesh Date: Fri, 30 Jan 2026 12:57:19 +0530 Subject: [PATCH 08/18] feat: add birds file and update zip pack/unpack test configurations --- birds_file.txt | 124 ++++++++++++++++++++++++++++ test/pipelines/packed_birds.tar | Bin 3584 -> 0 bytes test/pipelines/zip_pack_test.yaml | 2 +- test/pipelines/zip_unpack_test.yaml | 2 +- 4 files changed, 126 insertions(+), 2 deletions(-) create mode 100644 birds_file.txt delete mode 100644 test/pipelines/packed_birds.tar diff --git a/birds_file.txt b/birds_file.txt new file mode 100644 index 0000000..a4d4284 --- /dev/null +++ b/birds_file.txt @@ -0,0 +1,124 @@ +Albatross +Acorn Woodpecker +American Kestrel +Anna's Hummingbird +Bald Eagle +Baltimore Oriole +Barn Swallow +Belted Kingfisher +Bicolored Antbird +Black Capped Chickadee +Black Skimmer +Blue Jay +Bluebird +Bobolink +Bohemian Waxwing +Brown Creeper +Brown Pelican +Burrowing Owl +California Condor +California Quail +Canada Goose +Cardinal +Caspian Tern +Cedar Waxwing +Chestnut Sided Warbler +Chimney Swift +Chipping Sparrow +Clark's Nutcracker +Clay Colored Sparrow +Cliff Swallow +Columbiformes +Common Eider +Common Goldeneye +Common Grackle +Common Loon +Common Merganser +Common Raven +Common Tern +Common Yellowthroat +Coopers Hawk +Cory's Shearwater +Crested Flycatcher +Curve Billed Thrasher +Dark Eyed Junco +Dickcissel +Dovekie +Downy Woodpecker +Drab Seedeater +Dunnock +Eastern Bluebird +Eastern Meadowlark +Eastern Phoebe +Eastern Screech Owl +Eastern Towhee +Eastern Wood Pewee +Eared Grebe +Egyptian Plover +Elanus leucurus +Evening Grosbeak +Eared Quetzal +Eurasian Wigeon +European Starling +Fabulous Flamingo +Ferruginous Hawk +Fiscal Flycatcher +Flammulated Owl +Flatbill +Flesh Footed Shearwater +Florida Jay +Fringilla coelebs +Fulmar +Gadwall +Gambel's Quail +Gannet +Garden Warbler +Gnatcatcher +Godwit +Golden Eagle +Golden Winged Warbler +Goldeneye +Goldfinch +Goosander +Goshawk +Grace's Warbler +Grasshopper Sparrow +Gray Catbird +Great Black Backed Gull +Great Blue Heron +Great Crested Flycatcher +Great Horned Owl +Great Kiskadee +Great Spotted Woodpecker +Great Tit +Grebe +Greenbul +Green Heron +Green Tailed Towhee +Green Winged Teal +Greenlet +Grey Kingbird +Grey Owl +Grosbeaks +Grouse +Gull +Hairy Woodpecker +Hammond's Flycatcher +Harris Hawk +Harris Sparrow +Hawaiian Creeper +Hawaiian Goose +Hawfinch +Heathland Francolin +Herring Gull +Hoary Puffleg +Hooded Merganser +Hooded Oriole +Hooded Warbler +Hoopoe +Horned Auk +Horned Grebe +Horned Lark +House Finch +House Sparrow +House Wren \ No newline at end of file diff --git a/test/pipelines/packed_birds.tar b/test/pipelines/packed_birds.tar deleted file mode 100644 index f179b0d564748226e8344e7674a3c2e12bcd6347..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 3584 zcmeHIO>-MJ4E346g16o}c9ZE{lJzlh(m0i7JUL=_AuUH-a)(@cqyG9na6hDWddf`F zi*@i4AV2^Q4{%{Z6@Tr%b$37RcaPui_xE@5#qZzWckdtGO}gR!;o<)MZ+H6s$E9?A zImM0w!e2Gtcgm&R6!(3v{B*B{EvO4VMztt?aQemjs^#*`K`j~%rX;7IICg=pS~y3) zM7>K*W8A3#bJUV-rPp+_EXU3?KCu2AjL!W6tjw}5H$D@fgo?n{MTbNgR z;kw~5OXXRwsBNKK)uudC#dtWY_h-`}AxMe!2fF0L;N}Z&jXSHQuemWu_=Udp2&9(5 z_fD?@b1R-%dS)v{sbva~VWmI!R;|dIBkDJ#SKd`V+-klgGGgMWBE9uKGK5eWN1~0b zggkI?YQ+_WIr^#wZZ36tZz>e@LZPro1BEq?FF?i|JCWK}ve>sI38|H(a0beMq^=CW zLp&^9kk$a!)y^D`GrYJ>O(7*T9M#Gij3Y@_aj#I#_;X>26mCPpHLv;0%ILe})h)C}UqUg`PTjD*z7wv9Xtux>7i> zO5v6D(pU@qp$?Rp@dyO;`U2I5S3VoYw|AFYvOWeX^q#rm9P2ST z=gYHN6GDQ9!kSE$N%4uP^1Wa^+dS8v3!cS&3Eq@-rpM%U@O=$NPNEEpe&#-_gvr~G zUpQUbPDt@=k#10HOD;uiIh84-sMY{eTE4~H6-<(WIDbjp{RK*`6L82hHYY|?Ah(Ry z_E=Ql`AKc4NY>-UjU|~xuQnV)IvFR*jQ+;NlI$(iV%sDObgEnuWK5m4YGXOpdgHy^ z+_Y;0z8Mf+W^EG?2F9dI&z6g*Hpw;=)Rrn4aD1DBE&4so%9fnt4%z_D&CK6Aw0-RS z)>plOMP}u=K}O*Pv5?9%FIVEq>1bS8!_7x>!tSk)wa{$~HZ$VSUcfNc9;+nGqXiqx zk(7%ABElNh%rj*fKi~fPYZ@2YH8!9k#{@>7aNraoAgfxNm(Y$lfg0 z;o>0WXTIPsJG4|pz%_@(`GCy>=aF~A{fN(j!IiAB0(HPjk?lGacgbCzC>CJ=ar%u=X5+;KB*lB3guq+&!`yxqm0tjV~DM1IOPs3(s?=p3=Hz9|0pxQlj8Lz elV>>3rR1BJ!0xC2?|lpShQJ#FZwUOK2>cD0&y&pn diff --git a/test/pipelines/zip_pack_test.yaml b/test/pipelines/zip_pack_test.yaml index a75418d..8085352 100644 --- a/test/pipelines/zip_pack_test.yaml +++ b/test/pipelines/zip_pack_test.yaml @@ -6,5 +6,5 @@ tasks: type: archive file_name: birds.txt - name: store_packed_file - type: file + type: file path: packed_birds.zip \ No newline at end of file diff --git a/test/pipelines/zip_unpack_test.yaml b/test/pipelines/zip_unpack_test.yaml index fff1325..b95f4d8 100644 --- a/test/pipelines/zip_unpack_test.yaml +++ b/test/pipelines/zip_unpack_test.yaml @@ -6,5 +6,5 @@ tasks: type: archive action: unpack - name: store_unpacked_file - type: file + type: file path: unpacked_birds.txt \ No newline at end of file From 8496ea56aaaaddd1af02ccd212cab56e3b1154dd Mon Sep 17 00:00:00 2001 From: Mahesh Date: Fri, 30 Jan 2026 13:05:52 +0530 Subject: [PATCH 09/18] refactor: rename extraction tasks for clarity and consistency in README --- internal/pkg/pipeline/task/archive/README.md | 26 +++++++++++--------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/internal/pkg/pipeline/task/archive/README.md b/internal/pkg/pipeline/task/archive/README.md index dad7afc..9b4a9a8 100644 --- a/internal/pkg/pipeline/task/archive/README.md +++ b/internal/pkg/pipeline/task/archive/README.md @@ -81,7 +81,7 @@ tasks: ### Unpack ZIP archive ```yaml tasks: - - name: extract_zip + - name: unpack_zip type: archive format: zip action: unpack @@ -100,7 +100,7 @@ tasks: ### Unpack TAR.GZ archive ```yaml tasks: - - name: extract_tar_gz + - name: unpack_tar_gz type: archive format: tar action: unpack @@ -108,14 +108,14 @@ tasks: ## Complete Pipeline Examples -### Read files, compress to ZIP, write to file +### Read files, pack to ZIP, write to file ```yaml tasks: - name: read_source type: file path: source/*.txt - - name: compress_to_zip + - name: pack_to_zip type: archive format: zip action: pack @@ -132,31 +132,36 @@ tasks: - name: read_archive type: file path: data.tar.gz + + - name: decompress_file + type: compress + format: gzip + action: decompress - - name: extract_files + - name: unpack_files type: archive format: tar action: unpack - - name: write_extracted + - name: write_unpacked type: file - path: /output/{{ context "filename" }} + path: /output/data.txt ``` -### Multi-step compression pipeline +### Multi-step packing pipeline ```yaml tasks: - name: read_data type: file path: test/pipelines/birds.txt - - name: compress_zip + - name: pack_zip type: archive format: zip action: pack file_name: birds.zip - - name: decompress_zip + - name: unpack_zip type: archive format: zip action: unpack @@ -184,7 +189,6 @@ Input Records ## Use Cases -- **Data compression**: Reduce data size for transmission or storage - **Data packaging**: Bundle multiple files into a single archive - **Data extraction**: Process archived data within pipelines - **Archive conversion**: Convert between ZIP and TAR formats From 297abdb27bf18c822cb7bb58974dba4445e2ed4f Mon Sep 17 00:00:00 2001 From: Mahesh Date: Fri, 30 Jan 2026 13:10:52 +0530 Subject: [PATCH 10/18] refactor: rename action types for clarity in archiving process --- internal/pkg/pipeline/task/archive/archive.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/internal/pkg/pipeline/task/archive/archive.go b/internal/pkg/pipeline/task/archive/archive.go index 8e8791c..cfdb646 100644 --- a/internal/pkg/pipeline/task/archive/archive.go +++ b/internal/pkg/pipeline/task/archive/archive.go @@ -10,8 +10,8 @@ import ( type actionType string const ( - actionCompress actionType = `pack` - actionDecompress actionType = `unpack` + actionPack actionType = `pack` + actionUnpack actionType = `unpack` ) const ( @@ -48,11 +48,11 @@ func (c *core) UnmarshalYAML(unmarshal func(interface{}) error) error { return err } - if obj.Action != actionCompress && obj.Action != actionDecompress { - return fmt.Errorf("invalid action: %s (must be 'compress' or 'decompress')", obj.Action) + if obj.Action != actionPack && obj.Action != actionUnpack { + return fmt.Errorf("invalid action: %s (must be 'pack' or 'unpack')", obj.Action) } - if obj.Action == actionCompress { + if obj.Action == actionPack { if obj.FileName == "" { return fmt.Errorf("file_name must be specified when action is 'pack'") } @@ -101,9 +101,9 @@ func (c *core) Run(input <-chan *record.Record, output chan<- *record.Record) (e } switch c.Action { - case actionCompress: + case actionPack: archiv.Write(r.Data) - case actionDecompress: + case actionUnpack: archiv.Read(r.Data) } } From c3a74fabbe9039b0076772d1b348632222496f4d Mon Sep 17 00:00:00 2001 From: Mahesh Date: Fri, 30 Jan 2026 14:34:58 +0530 Subject: [PATCH 11/18] fix: correct error handling in tar archive read function and improve README clarity --- internal/pkg/pipeline/task/archive/README.md | 4 ++-- internal/pkg/pipeline/task/archive/tar.go | 5 ++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/internal/pkg/pipeline/task/archive/README.md b/internal/pkg/pipeline/task/archive/README.md index 9b4a9a8..7b1c837 100644 --- a/internal/pkg/pipeline/task/archive/README.md +++ b/internal/pkg/pipeline/task/archive/README.md @@ -1,12 +1,12 @@ # Archive Task -The `archive` task pack and unpack file data in various archive formats (TAR, ZIP), enabling efficient data packaging and extraction within pipelines. +The `archive` task packs and unpacks file data in various archive formats (TAR, ZIP), enabling efficient data packaging and extraction within pipelines. ## Function The archive task handles two primary operations: - **Pack**: Creates archives from input data (e.g., create a ZIP or TAR file) -- **Unpack**: Extract archives to retrieve individual files +- **Unpack**: Extracts archives to retrieve individual files ## Behavior diff --git a/internal/pkg/pipeline/task/archive/tar.go b/internal/pkg/pipeline/task/archive/tar.go index b1cd9f5..f314cb7 100644 --- a/internal/pkg/pipeline/task/archive/tar.go +++ b/internal/pkg/pipeline/task/archive/tar.go @@ -22,9 +22,12 @@ func (t *tarArchive) Read(b []byte) { for { header, err := r.Next() - if err != nil || err != io.EOF { + if err == io.EOF { break } + if err != nil { + log.Fatal(err) + } // check the file type is regular file if header.Typeflag == tar.TypeReg { From 829ac05e3f1777eb095c7889c2cc0e931600ec09 Mon Sep 17 00:00:00 2001 From: Mahesh Date: Fri, 30 Jan 2026 14:47:51 +0530 Subject: [PATCH 12/18] fix: improve error handling in zip archive read function --- internal/pkg/pipeline/task/archive/zip.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/internal/pkg/pipeline/task/archive/zip.go b/internal/pkg/pipeline/task/archive/zip.go index 1c3b2b9..0d8dafc 100644 --- a/internal/pkg/pipeline/task/archive/zip.go +++ b/internal/pkg/pipeline/task/archive/zip.go @@ -3,7 +3,6 @@ package archive import ( "archive/zip" "bytes" - "errors" "io" "log" @@ -36,7 +35,10 @@ func (z *zipArchive) Read(b []byte) { buf := make([]byte, f.FileInfo().Size()) _, err = rc.Read(buf) - if err != nil && !errors.Is(err, io.EOF) { + if err == io.EOF { + break + } + if err != nil { log.Fatal(err) } From 018786abfacf7b05b312c41bcfd3f061658d3237 Mon Sep 17 00:00:00 2001 From: Mahesh Date: Fri, 30 Jan 2026 16:20:17 +0530 Subject: [PATCH 13/18] removed duplicate file --- birds_file.txt | 124 --------------------------------------- test/pipelines/birds.txt | 2 +- 2 files changed, 1 insertion(+), 125 deletions(-) delete mode 100644 birds_file.txt diff --git a/birds_file.txt b/birds_file.txt deleted file mode 100644 index a4d4284..0000000 --- a/birds_file.txt +++ /dev/null @@ -1,124 +0,0 @@ -Albatross -Acorn Woodpecker -American Kestrel -Anna's Hummingbird -Bald Eagle -Baltimore Oriole -Barn Swallow -Belted Kingfisher -Bicolored Antbird -Black Capped Chickadee -Black Skimmer -Blue Jay -Bluebird -Bobolink -Bohemian Waxwing -Brown Creeper -Brown Pelican -Burrowing Owl -California Condor -California Quail -Canada Goose -Cardinal -Caspian Tern -Cedar Waxwing -Chestnut Sided Warbler -Chimney Swift -Chipping Sparrow -Clark's Nutcracker -Clay Colored Sparrow -Cliff Swallow -Columbiformes -Common Eider -Common Goldeneye -Common Grackle -Common Loon -Common Merganser -Common Raven -Common Tern -Common Yellowthroat -Coopers Hawk -Cory's Shearwater -Crested Flycatcher -Curve Billed Thrasher -Dark Eyed Junco -Dickcissel -Dovekie -Downy Woodpecker -Drab Seedeater -Dunnock -Eastern Bluebird -Eastern Meadowlark -Eastern Phoebe -Eastern Screech Owl -Eastern Towhee -Eastern Wood Pewee -Eared Grebe -Egyptian Plover -Elanus leucurus -Evening Grosbeak -Eared Quetzal -Eurasian Wigeon -European Starling -Fabulous Flamingo -Ferruginous Hawk -Fiscal Flycatcher -Flammulated Owl -Flatbill -Flesh Footed Shearwater -Florida Jay -Fringilla coelebs -Fulmar -Gadwall -Gambel's Quail -Gannet -Garden Warbler -Gnatcatcher -Godwit -Golden Eagle -Golden Winged Warbler -Goldeneye -Goldfinch -Goosander -Goshawk -Grace's Warbler -Grasshopper Sparrow -Gray Catbird -Great Black Backed Gull -Great Blue Heron -Great Crested Flycatcher -Great Horned Owl -Great Kiskadee -Great Spotted Woodpecker -Great Tit -Grebe -Greenbul -Green Heron -Green Tailed Towhee -Green Winged Teal -Greenlet -Grey Kingbird -Grey Owl -Grosbeaks -Grouse -Gull -Hairy Woodpecker -Hammond's Flycatcher -Harris Hawk -Harris Sparrow -Hawaiian Creeper -Hawaiian Goose -Hawfinch -Heathland Francolin -Herring Gull -Hoary Puffleg -Hooded Merganser -Hooded Oriole -Hooded Warbler -Hoopoe -Horned Auk -Horned Grebe -Horned Lark -House Finch -House Sparrow -House Wren \ No newline at end of file diff --git a/test/pipelines/birds.txt b/test/pipelines/birds.txt index b103723..a4d4284 100644 --- a/test/pipelines/birds.txt +++ b/test/pipelines/birds.txt @@ -59,7 +59,7 @@ Elanus leucurus Evening Grosbeak Eared Quetzal Eurasian Wigeon -Eurpean Starling +European Starling Fabulous Flamingo Ferruginous Hawk Fiscal Flycatcher From 309c1dc5324b56c30b8673e68a603313b3000550 Mon Sep 17 00:00:00 2001 From: Mahesh Date: Fri, 30 Jan 2026 20:51:38 +0530 Subject: [PATCH 14/18] multi file support with proper naming conventions --- internal/pkg/pipeline/task/archive/README.md | 212 +++--------------- internal/pkg/pipeline/task/archive/archive.go | 65 ++---- internal/pkg/pipeline/task/archive/tar.go | 97 +++++--- internal/pkg/pipeline/task/archive/zip.go | 102 ++++++--- internal/pkg/pipeline/task/file/file.go | 18 +- internal/pkg/pipeline/task/file/local.go | 7 + test/pipelines/tar_unpack_multifile_test.yaml | 8 +- test/pipelines/zip_pack_test.yaml | 1 - 8 files changed, 216 insertions(+), 294 deletions(-) diff --git a/internal/pkg/pipeline/task/archive/README.md b/internal/pkg/pipeline/task/archive/README.md index 7b1c837..90a8020 100644 --- a/internal/pkg/pipeline/task/archive/README.md +++ b/internal/pkg/pipeline/task/archive/README.md @@ -1,20 +1,20 @@ # Archive Task -The `archive` task packs and unpacks file data in various archive formats (TAR, ZIP), enabling efficient data packaging and extraction within pipelines. +The `archive` task packages or extracts files using various archive formats, enabling efficient file bundling and extraction for data processing pipelines. ## Function -The archive task handles two primary operations: -- **Pack**: Creates archives from input data (e.g., create a ZIP or TAR file) -- **Unpack**: Extracts archives to retrieve individual files +The archive task can operate in two modes: +- **Pack mode**: Combines multiple files into a single archive +- **Unpack mode**: Extracts files from an archive ## Behavior -The archive task operates in two modes depending on the specified action: +The archive task processes data based on the `action` field: +- **Pack**: Receives individual files and creates an archive file containing them +- **Unpack**: Receives an archive file and extracts its contents, outputting each file individually -- **Pack mode** (`action: pack`): Takes input data records and creates an archive file. Each record's data is packaged into the specified archive format with the configured filename. The task outputs the complete archive data. - -- **Unpack mode** (`action: unpack`): Takes archive file data as input and extracts individual files. For each file found in the archive, the task outputs a separate record containing that file's data. +The task receives records from its input channel, applies the archiving operation, and sends the processed records to its output channel. ## Configuration Fields @@ -22,218 +22,64 @@ The archive task operates in two modes depending on the specified action: |-------|------|---------|-------------| | `name` | string | - | Task name for identification | | `type` | string | `archive` | Must be "archive" | -| `format` | string | `zip` | Archive format: `zip` or `tar` | -| `action` | string | `pack` | Operation to perform: `pack` or `unpack` | -| `file_name` | string | - | Name of the file within the archive (required for `pack` action) | - -### File Name Format - -The `file_name` field specifies how files are stored within archives. Different formats have specific requirements: - -#### ZIP Archives -- **Paths**: Filenames can include directory paths, represented using forward slashes (/) - - Example: `docs/readme.txt` -- **Separators**: Only forward slashes (/) are allowed as folder separators, regardless of platform -- **Relative Paths**: Filenames must be relative (no drive letters like C: and no leading slash /) -- **Case Sensitivity**: ZIP stores filenames as is, but whether they are case-sensitive depends on the extraction platform -- **Directories**: End directory names with a trailing slash (/) to indicate a folder -- **Duplicates**: Duplicate names are allowed, but may cause confusion for some zip tools -- **Allowed Characters**: Supports Unicode, but stick to common, portable characters for best compatibility - -#### TAR Archives -- **Paths**: Filenames can include paths separated by forward slashes (/) - - Example: `src/main.c` -- **Relative and Absolute Paths**: Both relative (foo.txt) and absolute paths (/foo.txt) can technically be stored, but using relative paths is strongly recommended for portability and to avoid extraction issues -- **Case Sensitivity**: Tar files store names as is; case sensitivity depends on the underlying filesystem -- **Long Paths**: Traditional tar limits path length to 100 bytes for the filename, but modern tar formats (ustar, pax) allow longer names -- **Directories**: Represented as entries ending in a slash (/) -- **Duplicates**: Duplicate filenames are possible; later entries usually overwrite earlier ones on extraction -- **Allowed Characters**: Generally supports any characters, but best practice is to stick to ASCII (letters, digits, underscores, dashes, periods, slashes) for maximum compatibility +| `format` | string | `zip` | Archive format (zip, tar) | +| `action` | string | `pack` | Action type (pack or unpack) | ## Supported Formats -### ZIP -- **Extension**: `.zip` -- **Use case**: Cross-platform, widely supported compression format -- **Features**: Individual file pack, preserves file structure -- **Pack**: Creates a ZIP archive with single or multiple files -- **Unpack**: Extracts all regular files from ZIP archive - -### TAR -- **Extension**: `.tar` or `.tar.gz` -- **Use case**: Unix/Linux native format, streaming support -- **Features**: Preserves file metadata, supports packing (with gzip) -- **Pack**: Creates a TAR archive with file metadata -- **Unpack**: Extracts all regular files from TAR archive (including gzip-compressed) +The task supports the following archive formats: +- **zip**: Standard ZIP format, widely compatible +- **tar**: TAR format, commonly used in Unix/Linux environments ## Example Configurations -### Pack a file into ZIP +### Pack files into a ZIP archive: ```yaml tasks: - name: create_zip type: archive format: zip action: pack - file_name: output.txt ``` -### Unpack ZIP archive +### Unpack a ZIP archive: ```yaml tasks: - - name: unpack_zip + - name: extract_zip type: archive format: zip action: unpack ``` -### Pack a file into TAR +### Pack files into a TAR archive: ```yaml tasks: - name: create_tar type: archive format: tar action: pack - file_name: data.txt ``` -### Unpack TAR.GZ archive +### Unpack a TAR archive: ```yaml tasks: - - name: unpack_tar_gz + - name: extract_tar type: archive format: tar action: unpack ``` -## Complete Pipeline Examples - -### Read files, pack to ZIP, write to file -```yaml -tasks: - - name: read_source - type: file - path: source/*.txt - - - name: pack_to_zip - type: archive - format: zip - action: pack - file_name: archive.txt - - - name: write_archive - type: file - path: output/archive.zip -``` - -### Extract TAR.GZ and write individual files -```yaml -tasks: - - name: read_archive - type: file - path: data.tar.gz - - - name: decompress_file - type: compress - format: gzip - action: decompress - - - name: unpack_files - type: archive - format: tar - action: unpack - - - name: write_unpacked - type: file - path: /output/data.txt -``` - -### Multi-step packing pipeline -```yaml -tasks: - - name: read_data - type: file - path: test/pipelines/birds.txt - - - name: pack_zip - type: archive - format: zip - action: pack - file_name: birds.zip - - - name: unpack_zip - type: archive - format: zip - action: unpack - - - name: write_result - type: file - path: unpacked_birds/birds.txt -``` - -## Data Flow - -### Pack Operation -``` -Input Records - ↓ -[Record Data] → Archive Creation → [Archive Bytes] → Output -``` - -### Unpack Operation -``` -Input Records - ↓ -[Archive Bytes] → File Extraction → [File 1], [File 2], ... → Output -``` - -## Use Cases - -- **Data packaging**: Bundle multiple files into a single archive -- **Data extraction**: Process archived data within pipelines -- **Archive conversion**: Convert between ZIP and TAR formats -- **Backup workflows**: Create and manage compressed backups -- **Data distribution**: Package files for downstream consumption - -## Error Handling - -- **Missing file_name**: Throws error if `file_name` is not specified for `pack` action -- **Invalid format**: Throws error if format is not `zip` or `tar` -- **Invalid action**: Throws error if action is not `pack` or `unpack` -- **Corrupt archive**: May throw error when unpacking malformed archives -- **Empty data**: Skips processing of empty records - -## Technical Details - -### ZIP Format -- Uses Go's `archive/zip` package -- Supports standard ZIP compression -- Preserves file metadata (size, modification time) -- Regular files only (directories not included in unpacking) - -### TAR Format -- Uses Go's `archive/tar` package -- Supports raw TAR and gzip-compressed TAR files -- Automatic format detection for gzip compression -- Preserves tar header information -- Regular files only (directories and special files filtered) - -## Performance Considerations - -- **Memory usage**: Entire archive loaded into memory for processing -- **Compression ratio**: ZIP typically provides better compression than TAR alone -- **Processing speed**: TAR is generally faster than ZIP due to simpler format -- **Large files**: For very large archives, consider chunking or streaming approaches - ## Sample Pipelines -- `test/pipelines/zip_pack_test.yaml` - Create ZIP archives -- `test/pipelines/zip_unpack_test.yaml` - Extract from ZIP archives -- `test/pipelines/tar_unpack_multifile_test.yaml` - Extract multiple files from TAR archive +- `test/pipelines/zip_pack_test.yaml` - ZIP packing example +- `test/pipelines/zip_unpack_test.yaml` - ZIP unpacking example +- `test/pipelines/tar_unpack_multifile_test.yaml` - TAR unpacking with multiple files -## Security Considerations +## Use Cases -- Archives are processed in-memory; ensure sufficient memory for large files -- ZIP bomb protection: Be cautious with untrusted archive sources -- Path traversal: Archive extraction validates file paths to prevent escaping base directory -- File permissions: TAR format supports Unix permissions; ZIP has limited permission support +- **File bundling**: Package multiple files into a single archive for distribution +- **Data consolidation**: Combine separate data files into archives for storage +- **Archive extraction**: Extract files from archives for processing +- **Backup operations**: Create archives of processed data for backup +- **Format conversion**: Convert between archive formats +- **Multi-file handling**: Process multiple files as a single archive unit diff --git a/internal/pkg/pipeline/task/archive/archive.go b/internal/pkg/pipeline/task/archive/archive.go index cfdb646..08001e1 100644 --- a/internal/pkg/pipeline/task/archive/archive.go +++ b/internal/pkg/pipeline/task/archive/archive.go @@ -20,15 +20,14 @@ const ( ) type archiver interface { - Read(b []byte) - Write(b []byte) + Read() + Write() } type core struct { task.Base `yaml:",inline" json:",inline"` Format string `yaml:"format,omitempty" json:"format,omitempty"` Action actionType `yaml:"action,omitempty" json:"action,omitempty"` - FileName string `yaml:"file_name,omitempty" json:"file_name,omitempty"` } func New() (task.Task, error) { @@ -52,12 +51,6 @@ func (c *core) UnmarshalYAML(unmarshal func(interface{}) error) error { return fmt.Errorf("invalid action: %s (must be 'pack' or 'unpack')", obj.Action) } - if obj.Action == actionPack { - if obj.FileName == "" { - return fmt.Errorf("file_name must be specified when action is 'pack'") - } - } - *c = core(obj) return nil @@ -69,43 +62,31 @@ func (c *core) Run(input <-chan *record.Record, output chan<- *record.Record) (e return task.ErrNilInput } - for { - r, ok := c.GetRecord(input) - if !ok { - break - } + var archiv archiver - if len(r.Data) == 0 { - continue + switch c.Format { + case "tar": + archiv = &tarArchive{ + Base: &c.Base, + OutputChan: output, + InputChan: input, } - - var archiv archiver - - switch c.Format { - case "tar": - archiv = &tarArchive{ - Base: &c.Base, - FileName: c.FileName, - Record: r, - OutputChan: output, - } - case "zip": - archiv = &zipArchive{ - Base: &c.Base, - FileName: c.FileName, - Record: r, - OutputChan: output, - } - default: - return fmt.Errorf("unsupported format: %s", c.Format) + case "zip": + archiv = &zipArchive{ + Base: &c.Base, + OutputChan: output, + InputChan: input, } + default: + return fmt.Errorf("unsupported format: %s", c.Format) + } - switch c.Action { - case actionPack: - archiv.Write(r.Data) - case actionUnpack: - archiv.Read(r.Data) - } + switch c.Action { + case actionPack: + archiv.Write() + case actionUnpack: + archiv.Read() } + return nil } diff --git a/internal/pkg/pipeline/task/archive/tar.go b/internal/pkg/pipeline/task/archive/tar.go index f314cb7..27f66a9 100644 --- a/internal/pkg/pipeline/task/archive/tar.go +++ b/internal/pkg/pipeline/task/archive/tar.go @@ -5,6 +5,7 @@ import ( "bytes" "io" "log" + "strings" "github.com/patterninc/caterpillar/internal/pkg/pipeline/record" "github.com/patterninc/caterpillar/internal/pkg/pipeline/task" @@ -12,60 +13,96 @@ import ( type tarArchive struct { *task.Base - FileName string - Record *record.Record OutputChan chan<- *record.Record + InputChan <-chan *record.Record } -func (t *tarArchive) Read(b []byte) { - r := tar.NewReader(bytes.NewReader(b)) +func (t *tarArchive) Read() { for { - header, err := r.Next() - if err == io.EOF { + rc, ok := t.GetRecord(t.InputChan) + if !ok { break } - if err != nil { - log.Fatal(err) + + if len(rc.Data) == 0 { + continue } - // check the file type is regular file - if header.Typeflag == tar.TypeReg { - buf := make([]byte, header.Size) - if _, err := io.ReadFull(r, buf); err != nil && err != io.EOF { + b := rc.Data + + r := tar.NewReader(bytes.NewReader(b)) + + for { + header, err := r.Next() + if err == io.EOF { + break + } + if err != nil { log.Fatal(err) } - t.SendData(t.Record.Context, buf, t.OutputChan) - } + // check the file type is regular file + if header.Typeflag == tar.TypeReg { + buf := make([]byte, header.Size) + if _, err := io.ReadFull(r, buf); err != nil && err != io.EOF { + log.Fatal(err) + } + rc.SetContextValue("CATERPILLER_FILE_PATH_READ", header.Name) + t.SendData(rc.Context, buf, t.OutputChan) + } + + } } } -func (t *tarArchive) Write(b []byte) { - - if t.FileName == "" { - log.Fatal("file name is required to create tar archive") - } +func (t *tarArchive) Write() { var buf bytes.Buffer tw := tar.NewWriter(&buf) + var rc record.Record - header := &tar.Header{ - Name: t.FileName, - Mode: 0600, - Size: int64(len(b)), - } - if err := tw.WriteHeader(header); err != nil { - log.Fatal(err) - } + for { + rec, ok := t.GetRecord(t.InputChan) + if !ok { + break + } + b := rec.Data - if _, err := tw.Write(b); err != nil { - log.Fatal(err) + if len(b) == 0 { + continue + } + + filePath, found := rec.GetContextValue("CATERPILLER_FILE_PATH") + if !found { + log.Fatal("filepath not set in context") + } + + if filePath == "" { + log.Fatal("file_name is required when filepath is not in context") + } + + filePath = strings.ReplaceAll(filePath, "\\", "/") + + header := &tar.Header{ + Name: filePath, + Mode: 0600, + Size: int64(len(b)), + } + if err := tw.WriteHeader(header); err != nil { + log.Fatal(err) + } + + if _, err := tw.Write(b); err != nil { + log.Fatal(err) + } + + rc.Context = rec.Context } if err := tw.Close(); err != nil { log.Fatal(err) } - t.SendData(t.Record.Context, buf.Bytes(), t.OutputChan) + t.SendData(rc.Context, buf.Bytes(), t.OutputChan) } diff --git a/internal/pkg/pipeline/task/archive/zip.go b/internal/pkg/pipeline/task/archive/zip.go index 0d8dafc..09018a8 100644 --- a/internal/pkg/pipeline/task/archive/zip.go +++ b/internal/pkg/pipeline/task/archive/zip.go @@ -5,6 +5,7 @@ import ( "bytes" "io" "log" + "strings" "github.com/patterninc/caterpillar/internal/pkg/pipeline/record" "github.com/patterninc/caterpillar/internal/pkg/pipeline/task" @@ -12,57 +13,94 @@ import ( type zipArchive struct { *task.Base - FileName string - Record *record.Record OutputChan chan<- *record.Record + InputChan <-chan *record.Record } -func (z *zipArchive) Read(b []byte) { - r, err := zip.NewReader(bytes.NewReader(b), int64(len(b))) - if err != nil { - log.Fatal(err) - } - for _, f := range r.File { +func (z *zipArchive) Read() { + for { + rc, ok := z.GetRecord(z.InputChan) + if !ok { + break + } - // check the file type is regular file - if f.FileInfo().Mode().IsRegular() { + if len(rc.Data) == 0 { + continue + } - rc, err := f.Open() - if err != nil { - log.Fatal(err) - } + b := rc.Data - buf := make([]byte, f.FileInfo().Size()) + r, err := zip.NewReader(bytes.NewReader(b), int64(len(b))) + if err != nil { + log.Fatal(err) + } + for _, f := range r.File { - _, err = rc.Read(buf) - if err == io.EOF { - break - } - if err != nil { - log.Fatal(err) - } + // check the file type is regular file + if f.FileInfo().Mode().IsRegular() { + + rc.SetContextValue("CATERPILLER_FILE_PATH_READ", f.Name) + + fs, err := f.Open() + if err != nil { + log.Fatal(err) + } - rc.Close() + buf := make([]byte, f.FileInfo().Size()) - z.SendData(z.Record.Context, buf, z.OutputChan) + _, err = fs.Read(buf) + if err != nil && err != io.EOF { + log.Fatal(err) + } + + fs.Close() + + z.SendData(rc.Context, buf, z.OutputChan) + } } } } -func (z *zipArchive) Write(b []byte) { +func (z *zipArchive) Write() { zipBuf := new(bytes.Buffer) zipWriter := zip.NewWriter(zipBuf) + var rc record.Record - if z.FileName == "" { - log.Fatal("file name is required to create zip archive") - } + for { + rec, ok := z.GetRecord(z.InputChan) + if !ok { + break + } - w, _ := zipWriter.Create(z.FileName) - w.Write(b) + filePath, found := rec.GetContextValue("CATERPILLER_FILE_PATH") + if !found { + log.Fatal("filepath not set in context") + } - zipWriter.Close() + if filePath == "" { + log.Fatal("file_name is required when filepath is not in context") + } + + filePath = strings.ReplaceAll(filePath, "\\", "/") + + w, err := zipWriter.Create(filePath) + if err != nil { + log.Fatal(err) + } + _, err = w.Write(rec.Data) + if err != nil { + log.Fatal(err) + } + + rc.Context = rec.Context + } + + if err := zipWriter.Close(); err != nil { + log.Fatal(err) + } - z.SendData(z.Record.Context, zipBuf.Bytes(), z.OutputChan) + // Send the complete ZIP archive + z.SendData(rc.Context, zipBuf.Bytes(), z.OutputChan) } diff --git a/internal/pkg/pipeline/task/file/file.go b/internal/pkg/pipeline/task/file/file.go index 50caed6..6062527 100644 --- a/internal/pkg/pipeline/task/file/file.go +++ b/internal/pkg/pipeline/task/file/file.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "io" + "log" "net/url" "strings" @@ -134,6 +135,7 @@ func (f *file) readFile(output chan<- *record.Record) error { // Create a default record with context rc := &record.Record{Context: ctx} + rc.SetContextValue("CATERPILLER_FILE_PATH", path) // let's write content to output channel f.SendData(rc.Context, content, output) @@ -168,11 +170,25 @@ func (f *file) writeFile(input <-chan *record.Record) error { pathScheme = fileScheme } + var fs file + + fs = *f + filePath, found := rc.GetContextValue("CATERPILLER_FILE_PATH_READ") + if found { + if filePath == "" { + log.Fatal("file_name is required when filepath is not in context") + } + + filePath = strings.ReplaceAll(filePath, "\\", "/") + + fs.Path = f.Path + config.String(filePath) + } + writerFunction, found := writers[pathScheme] if !found { return unknownSchemeError(pathScheme) } - if err := writerFunction(f, rc, bytes.NewReader(rc.Data)); err != nil { + if err := writerFunction(&fs, rc, bytes.NewReader(rc.Data)); err != nil { return err } } diff --git a/internal/pkg/pipeline/task/file/local.go b/internal/pkg/pipeline/task/file/local.go index f24bac9..0ee232a 100644 --- a/internal/pkg/pipeline/task/file/local.go +++ b/internal/pkg/pipeline/task/file/local.go @@ -3,6 +3,7 @@ package file import ( "io" "os" + "path/filepath" "strings" "github.com/bmatcuk/doublestar" @@ -52,6 +53,12 @@ func writeLocalFile(f *file, rec *record.Record, reader io.Reader) error { return err } + dir := filepath.Dir(path) + err = os.MkdirAll(dir, os.ModePerm) + if err != nil { + return err + } + outputFile, err := os.Create((path[getPathIndex(path):])) if err != nil { return err diff --git a/test/pipelines/tar_unpack_multifile_test.yaml b/test/pipelines/tar_unpack_multifile_test.yaml index 97398b8..0a4ad03 100644 --- a/test/pipelines/tar_unpack_multifile_test.yaml +++ b/test/pipelines/tar_unpack_multifile_test.yaml @@ -2,16 +2,14 @@ tasks: - name: planet_tar_file type: file path: test/pipelines/planets.tar.gz - - type: decompress + - name: decompress_gzip + type: compress format: gzip action: decompress - name: unpack_planets_file type: archive format: tar action: unpack - - name: combine - type: join - size: 20 - name: store_unpacked_file type: file - path: unpacked_planets/{{ macro "uuid" }}.txt \ No newline at end of file + path: unpacked_planets/ \ No newline at end of file diff --git a/test/pipelines/zip_pack_test.yaml b/test/pipelines/zip_pack_test.yaml index 8085352..8fa8fd2 100644 --- a/test/pipelines/zip_pack_test.yaml +++ b/test/pipelines/zip_pack_test.yaml @@ -4,7 +4,6 @@ tasks: path: test/pipelines/birds.txt - name: pack_file type: archive - file_name: birds.txt - name: store_packed_file type: file path: packed_birds.zip \ No newline at end of file From 5b8246adf98b2ae5e967ba993134c797c44d2117 Mon Sep 17 00:00:00 2001 From: Mahesh Date: Mon, 2 Feb 2026 12:26:46 +0530 Subject: [PATCH 15/18] Refactored code used map instead of switch case --- internal/pkg/pipeline/task/archive/archive.go | 55 +++++++++++-------- internal/pkg/pipeline/task/archive/tar.go | 3 +- internal/pkg/pipeline/task/archive/zip.go | 3 +- 3 files changed, 35 insertions(+), 26 deletions(-) diff --git a/internal/pkg/pipeline/task/archive/archive.go b/internal/pkg/pipeline/task/archive/archive.go index 08001e1..f093e58 100644 --- a/internal/pkg/pipeline/task/archive/archive.go +++ b/internal/pkg/pipeline/task/archive/archive.go @@ -24,6 +24,33 @@ type archiver interface { Write() } +type channelStruct struct { + InputChan <-chan *record.Record + OutputChan chan<- *record.Record +} + +var ( + supportedFormats = map[string]func(bs *task.Base, chStruct *channelStruct) archiver{ + "zip": func(bs *task.Base, ch *channelStruct) archiver { + return &zipArchive{ + Base: bs, + channelStruct: ch, + } + }, + "tar": func(bs *task.Base, ch *channelStruct) archiver { + return &tarArchive{ + Base: bs, + channelStruct: ch, + } + }, + } + + supportedActions = map[string]func(archiver) func(){ + `pack`: func(a archiver) func() { return a.Write }, + `unpack`: func(a archiver) func() { return a.Read }, + } +) + type core struct { task.Base `yaml:",inline" json:",inline"` Format string `yaml:"format,omitempty" json:"format,omitempty"` @@ -64,29 +91,13 @@ func (c *core) Run(input <-chan *record.Record, output chan<- *record.Record) (e var archiv archiver - switch c.Format { - case "tar": - archiv = &tarArchive{ - Base: &c.Base, - OutputChan: output, - InputChan: input, - } - case "zip": - archiv = &zipArchive{ - Base: &c.Base, - OutputChan: output, - InputChan: input, - } - default: - return fmt.Errorf("unsupported format: %s", c.Format) - } + archiv = supportedFormats[c.Format](&c.Base, &channelStruct{ + InputChan: input, + OutputChan: output, + }) - switch c.Action { - case actionPack: - archiv.Write() - case actionUnpack: - archiv.Read() - } + actionFunc := supportedActions[string(c.Action)](archiv) + actionFunc() return nil } diff --git a/internal/pkg/pipeline/task/archive/tar.go b/internal/pkg/pipeline/task/archive/tar.go index 27f66a9..390f9ce 100644 --- a/internal/pkg/pipeline/task/archive/tar.go +++ b/internal/pkg/pipeline/task/archive/tar.go @@ -13,8 +13,7 @@ import ( type tarArchive struct { *task.Base - OutputChan chan<- *record.Record - InputChan <-chan *record.Record + *channelStruct } func (t *tarArchive) Read() { diff --git a/internal/pkg/pipeline/task/archive/zip.go b/internal/pkg/pipeline/task/archive/zip.go index 09018a8..10ec194 100644 --- a/internal/pkg/pipeline/task/archive/zip.go +++ b/internal/pkg/pipeline/task/archive/zip.go @@ -13,8 +13,7 @@ import ( type zipArchive struct { *task.Base - OutputChan chan<- *record.Record - InputChan <-chan *record.Record + *channelStruct } func (z *zipArchive) Read() { From 972476b7eb2095667f2c9625d8e23586c4d0ae90 Mon Sep 17 00:00:00 2001 From: Mahesh Date: Wed, 4 Feb 2026 13:11:09 +0530 Subject: [PATCH 16/18] refactor: replace string literals with context keys for file path handling --- internal/pkg/pipeline/task/archive/tar.go | 5 +++-- internal/pkg/pipeline/task/archive/zip.go | 5 +++-- internal/pkg/pipeline/task/file/file.go | 7 ++++--- internal/pkg/pipeline/task/task.go | 7 +++++++ 4 files changed, 17 insertions(+), 7 deletions(-) diff --git a/internal/pkg/pipeline/task/archive/tar.go b/internal/pkg/pipeline/task/archive/tar.go index 390f9ce..be8179b 100644 --- a/internal/pkg/pipeline/task/archive/tar.go +++ b/internal/pkg/pipeline/task/archive/tar.go @@ -5,6 +5,7 @@ import ( "bytes" "io" "log" + "path/filepath" "strings" "github.com/patterninc/caterpillar/internal/pkg/pipeline/record" @@ -47,7 +48,7 @@ func (t *tarArchive) Read() { if _, err := io.ReadFull(r, buf); err != nil && err != io.EOF { log.Fatal(err) } - rc.SetContextValue("CATERPILLER_FILE_PATH_READ", header.Name) + rc.SetContextValue(string(task.CtxKeyFilePathRead), filepath.Base(header.Name)) t.SendData(rc.Context, buf, t.OutputChan) } @@ -72,7 +73,7 @@ func (t *tarArchive) Write() { continue } - filePath, found := rec.GetContextValue("CATERPILLER_FILE_PATH") + filePath, found := rec.GetContextValue(string(task.CtxKeyFilePath)) if !found { log.Fatal("filepath not set in context") } diff --git a/internal/pkg/pipeline/task/archive/zip.go b/internal/pkg/pipeline/task/archive/zip.go index 10ec194..8c531e9 100644 --- a/internal/pkg/pipeline/task/archive/zip.go +++ b/internal/pkg/pipeline/task/archive/zip.go @@ -5,6 +5,7 @@ import ( "bytes" "io" "log" + "path/filepath" "strings" "github.com/patterninc/caterpillar/internal/pkg/pipeline/record" @@ -38,7 +39,7 @@ func (z *zipArchive) Read() { // check the file type is regular file if f.FileInfo().Mode().IsRegular() { - rc.SetContextValue("CATERPILLER_FILE_PATH_READ", f.Name) + rc.SetContextValue(string(task.CtxKeyFilePathRead), filepath.Base(f.Name)) fs, err := f.Open() if err != nil { @@ -72,7 +73,7 @@ func (z *zipArchive) Write() { break } - filePath, found := rec.GetContextValue("CATERPILLER_FILE_PATH") + filePath, found := rec.GetContextValue(string(task.CtxKeyFilePath)) if !found { log.Fatal("filepath not set in context") } diff --git a/internal/pkg/pipeline/task/file/file.go b/internal/pkg/pipeline/task/file/file.go index 6062527..ec0ddaa 100644 --- a/internal/pkg/pipeline/task/file/file.go +++ b/internal/pkg/pipeline/task/file/file.go @@ -7,6 +7,7 @@ import ( "io" "log" "net/url" + "path/filepath" "strings" "github.com/patterninc/caterpillar/internal/pkg/config" @@ -135,7 +136,7 @@ func (f *file) readFile(output chan<- *record.Record) error { // Create a default record with context rc := &record.Record{Context: ctx} - rc.SetContextValue("CATERPILLER_FILE_PATH", path) + rc.SetContextValue(string(task.CtxKeyFilePath), filepath.Base(path)) // let's write content to output channel f.SendData(rc.Context, content, output) @@ -173,10 +174,10 @@ func (f *file) writeFile(input <-chan *record.Record) error { var fs file fs = *f - filePath, found := rc.GetContextValue("CATERPILLER_FILE_PATH_READ") + filePath, found := rc.GetContextValue(string(task.CtxKeyFilePathRead)) if found { if filePath == "" { - log.Fatal("file_name is required when filepath is not in context") + log.Fatal("required file path") } filePath = strings.ReplaceAll(filePath, "\\", "/") diff --git a/internal/pkg/pipeline/task/task.go b/internal/pkg/pipeline/task/task.go index 688b3c0..fc39a7d 100644 --- a/internal/pkg/pipeline/task/task.go +++ b/internal/pkg/pipeline/task/task.go @@ -21,6 +21,13 @@ var ( ErrPresentInputOutput = fmt.Errorf(`either input or output must be set, not both`) ) +type contextKeyFile string + +const ( + CtxKeyFilePath contextKeyFile = "CATERPILLAR_FILE_PATH" + CtxKeyFilePathRead contextKeyFile = "CATERPILLAR_FILE_PATH_READ" +) + type Task interface { Run(<-chan *record.Record, chan<- *record.Record) error GetName() string From d6d4331da0d55747ccfd5a111fc5e98af28ebaed Mon Sep 17 00:00:00 2001 From: Mahesh Date: Thu, 5 Feb 2026 11:27:27 +0530 Subject: [PATCH 17/18] fix: update log message for empty filepath in context --- internal/pkg/pipeline/task/archive/tar.go | 2 +- internal/pkg/pipeline/task/archive/zip.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/pkg/pipeline/task/archive/tar.go b/internal/pkg/pipeline/task/archive/tar.go index be8179b..4d410b0 100644 --- a/internal/pkg/pipeline/task/archive/tar.go +++ b/internal/pkg/pipeline/task/archive/tar.go @@ -79,7 +79,7 @@ func (t *tarArchive) Write() { } if filePath == "" { - log.Fatal("file_name is required when filepath is not in context") + log.Fatal("empty filepath in context") } filePath = strings.ReplaceAll(filePath, "\\", "/") diff --git a/internal/pkg/pipeline/task/archive/zip.go b/internal/pkg/pipeline/task/archive/zip.go index 8c531e9..2d76441 100644 --- a/internal/pkg/pipeline/task/archive/zip.go +++ b/internal/pkg/pipeline/task/archive/zip.go @@ -79,7 +79,7 @@ func (z *zipArchive) Write() { } if filePath == "" { - log.Fatal("file_name is required when filepath is not in context") + log.Fatal("empty filepath in context") } filePath = strings.ReplaceAll(filePath, "\\", "/") From e2370c074cdfb34614f342abc9a9d811d7712ed9 Mon Sep 17 00:00:00 2001 From: Mahesh Date: Thu, 5 Feb 2026 14:37:54 +0530 Subject: [PATCH 18/18] refactor: rename context keys for file path handling in archive and file tasks --- internal/pkg/pipeline/task/archive/tar.go | 4 ++-- internal/pkg/pipeline/task/archive/zip.go | 4 ++-- internal/pkg/pipeline/task/file/file.go | 4 ++-- internal/pkg/pipeline/task/task.go | 4 ++-- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/internal/pkg/pipeline/task/archive/tar.go b/internal/pkg/pipeline/task/archive/tar.go index 4d410b0..2e9a194 100644 --- a/internal/pkg/pipeline/task/archive/tar.go +++ b/internal/pkg/pipeline/task/archive/tar.go @@ -48,7 +48,7 @@ func (t *tarArchive) Read() { if _, err := io.ReadFull(r, buf); err != nil && err != io.EOF { log.Fatal(err) } - rc.SetContextValue(string(task.CtxKeyFilePathRead), filepath.Base(header.Name)) + rc.SetContextValue(string(task.CtxKeyArchiveFileNameWrite), filepath.Base(header.Name)) t.SendData(rc.Context, buf, t.OutputChan) } @@ -73,7 +73,7 @@ func (t *tarArchive) Write() { continue } - filePath, found := rec.GetContextValue(string(task.CtxKeyFilePath)) + filePath, found := rec.GetContextValue(string(task.CtxKeyFileNameWrite)) if !found { log.Fatal("filepath not set in context") } diff --git a/internal/pkg/pipeline/task/archive/zip.go b/internal/pkg/pipeline/task/archive/zip.go index 2d76441..53b10bd 100644 --- a/internal/pkg/pipeline/task/archive/zip.go +++ b/internal/pkg/pipeline/task/archive/zip.go @@ -39,7 +39,7 @@ func (z *zipArchive) Read() { // check the file type is regular file if f.FileInfo().Mode().IsRegular() { - rc.SetContextValue(string(task.CtxKeyFilePathRead), filepath.Base(f.Name)) + rc.SetContextValue(string(task.CtxKeyArchiveFileNameWrite), filepath.Base(f.Name)) fs, err := f.Open() if err != nil { @@ -73,7 +73,7 @@ func (z *zipArchive) Write() { break } - filePath, found := rec.GetContextValue(string(task.CtxKeyFilePath)) + filePath, found := rec.GetContextValue(string(task.CtxKeyFileNameWrite)) if !found { log.Fatal("filepath not set in context") } diff --git a/internal/pkg/pipeline/task/file/file.go b/internal/pkg/pipeline/task/file/file.go index ec0ddaa..722e315 100644 --- a/internal/pkg/pipeline/task/file/file.go +++ b/internal/pkg/pipeline/task/file/file.go @@ -136,7 +136,7 @@ func (f *file) readFile(output chan<- *record.Record) error { // Create a default record with context rc := &record.Record{Context: ctx} - rc.SetContextValue(string(task.CtxKeyFilePath), filepath.Base(path)) + rc.SetContextValue(string(task.CtxKeyFileNameWrite), filepath.Base(path)) // let's write content to output channel f.SendData(rc.Context, content, output) @@ -174,7 +174,7 @@ func (f *file) writeFile(input <-chan *record.Record) error { var fs file fs = *f - filePath, found := rc.GetContextValue(string(task.CtxKeyFilePathRead)) + filePath, found := rc.GetContextValue(string(task.CtxKeyArchiveFileNameWrite)) if found { if filePath == "" { log.Fatal("required file path") diff --git a/internal/pkg/pipeline/task/task.go b/internal/pkg/pipeline/task/task.go index fc39a7d..749c36b 100644 --- a/internal/pkg/pipeline/task/task.go +++ b/internal/pkg/pipeline/task/task.go @@ -24,8 +24,8 @@ var ( type contextKeyFile string const ( - CtxKeyFilePath contextKeyFile = "CATERPILLAR_FILE_PATH" - CtxKeyFilePathRead contextKeyFile = "CATERPILLAR_FILE_PATH_READ" + CtxKeyFileNameWrite contextKeyFile = "CATERPILLAR_FILE_NAME_WRITE" + CtxKeyArchiveFileNameWrite contextKeyFile = "CATERPILLAR_ARCHIVE_FILE_NAME_WRITE" ) type Task interface {