Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
github.com/go-playground/validator/v10 v10.27.0
github.com/google/uuid v1.6.0
github.com/itchyny/gojq v0.12.17
github.com/jhillyerd/enmime v1.3.0
github.com/segmentio/kafka-go v0.4.49
github.com/stretchr/testify v1.11.1
github.com/xuri/excelize/v2 v2.10.0
Expand All @@ -28,6 +29,7 @@ require (
github.com/DataDog/zstd v1.5.7 // indirect
github.com/antchfx/xpath v1.3.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cention-sany/utf7 v0.0.0-20170124080048-26cad61bd60a // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cockroachdb/errors v1.12.0 // indirect
github.com/cockroachdb/fifo v0.0.0-20240816210425-c5d0cb0b6fc0 // indirect
Expand All @@ -40,12 +42,16 @@ require (
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/gogs/chardet v0.0.0-20211120154057-b7413eaefb8f // indirect
github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 // indirect
github.com/jaytaylor/html2text v0.0.0-20230321000545-74c2419ad056 // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
Expand All @@ -55,7 +61,9 @@ require (
github.com/prometheus/procfs v0.17.0 // indirect
github.com/richardlehane/mscfb v1.0.4 // indirect
github.com/richardlehane/msoleps v1.0.4 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/rogpeppe/go-internal v1.14.1 // indirect
github.com/ssor/bom v0.0.0-20170718123548-6386211fdfcf // indirect
github.com/tiendc/go-deepcopy v1.7.1 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
Expand Down
20 changes: 20 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bmatcuk/doublestar v1.3.4 h1:gPypJ5xD31uhX6Tf54sDPUOBXTqKH4c9aPY66CyQrS0=
github.com/bmatcuk/doublestar v1.3.4/go.mod h1:wiQtGV+rzVYxB7WIlirSN++5HPtPlXEo9MEoZQC/PmE=
github.com/cention-sany/utf7 v0.0.0-20170124080048-26cad61bd60a h1:MISbI8sU/PSK/ztvmWKFcI7UGb5/HQT7B+i3a2myKgI=
github.com/cention-sany/utf7 v0.0.0-20170124080048-26cad61bd60a/go.mod h1:2GxOXOlEPAMFPfp014mK1SWq8G8BN8o7/dfYqJrVGn8=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cockroachdb/datadriven v1.0.3-0.20230413201302-be42291fc80f h1:otljaYPt5hWxV3MUfO5dFPFiOXg9CyG5/kCfayTqsJ4=
Expand Down Expand Up @@ -88,8 +90,12 @@ github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJn
github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY=
github.com/go-playground/validator/v10 v10.27.0 h1:w8+XrWVMhGkxOaaowyKH35gFydVHOvC0/uWoy2Fzwn4=
github.com/go-playground/validator/v10 v10.27.0/go.mod h1:I5QpIEbmr8On7W0TktmJAumgzX4CA1XNl4ZmDuVHKKo=
github.com/go-test/deep v1.1.0 h1:WOcxcdHcvdgThNXjw0t76K42FXTU7HpNQWHpA2HHNlg=
github.com/go-test/deep v1.1.0/go.mod h1:5C2ZWiW0ErCdrYzpqxLbTX7MG14M9iiw8DgHncVwcsE=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/gogs/chardet v0.0.0-20211120154057-b7413eaefb8f h1:3BSP1Tbs2djlpprl7wCLuiqMaUh5SJkkzI2gDs+FgLs=
github.com/gogs/chardet v0.0.0-20211120154057-b7413eaefb8f/go.mod h1:Pcatq5tYkCW2Q6yrR2VRHlbHpZ/R4/7qyL1TCF7vl14=
github.com/golang-jwt/jwt/v5 v5.3.0 h1:pv4AsKCKKZuqlgs5sUmn4x8UlGa0kEVt/puTpKx9vvo=
github.com/golang-jwt/jwt/v5 v5.3.0/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
Expand All @@ -106,6 +112,10 @@ github.com/itchyny/gojq v0.12.17 h1:8av8eGduDb5+rvEdaOO+zQUjA04MS0m3Ps8HiD+fceg=
github.com/itchyny/gojq v0.12.17/go.mod h1:WBrEMkgAfAGO1LUcGOckBl5O726KPp+OlkKug0I/FEY=
github.com/itchyny/timefmt-go v0.1.6 h1:ia3s54iciXDdzWzwaVKXZPbiXzxxnv1SPGFfM/myJ5Q=
github.com/itchyny/timefmt-go v0.1.6/go.mod h1:RRDZYC5s9ErkjQvTvvU7keJjxUYzIISJGxm9/mAERQg=
github.com/jaytaylor/html2text v0.0.0-20230321000545-74c2419ad056 h1:iCHtR9CQyktQ5+f3dMVZfwD2KWJUgm7M0gdL9NGr8KA=
github.com/jaytaylor/html2text v0.0.0-20230321000545-74c2419ad056/go.mod h1:CVKlgaMiht+LXvHG173ujK6JUhZXKb2u/BQtjPDIvyk=
github.com/jhillyerd/enmime v1.3.0 h1:LV5kzfLidiOr8qRGIpYYmUZCnhrPbcFAnAFUnWn99rw=
github.com/jhillyerd/enmime v1.3.0/go.mod h1:6c6jg5HdRRV2FtvVL69LjiX1M8oE0xDX9VEhV3oy4gs=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
Expand All @@ -116,8 +126,13 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ=
github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI=
github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U=
github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY=
github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4=
Expand All @@ -140,11 +155,16 @@ github.com/richardlehane/mscfb v1.0.4/go.mod h1:YzVpcZg9czvAuhk9T+a3avCpcFPMUWm7
github.com/richardlehane/msoleps v1.0.1/go.mod h1:BWev5JBpU9Ko2WAgmZEuiz4/u3ZYTKbjLycmwiWUfWg=
github.com/richardlehane/msoleps v1.0.4 h1:WuESlvhX3gH2IHcd8UqyCuFY5yiq/GR/yqaSM/9/g00=
github.com/richardlehane/msoleps v1.0.4/go.mod h1:BWev5JBpU9Ko2WAgmZEuiz4/u3ZYTKbjLycmwiWUfWg=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
github.com/segmentio/kafka-go v0.4.49 h1:GJiNX1d/g+kG6ljyJEoi9++PUMdXGAxb7JGPiDCuNmk=
github.com/segmentio/kafka-go v0.4.49/go.mod h1:Y1gn60kzLEEaW28YshXyk2+VCUKbJ3Qr6DrnT3i4+9E=
github.com/ssor/bom v0.0.0-20170718123548-6386211fdfcf h1:pvbZ0lM0XWPBqUKqFU8cmavspvIl9nulOYwdy6IFRRo=
github.com/ssor/bom v0.0.0-20170718123548-6386211fdfcf/go.mod h1:RJID2RhlZKId02nZ62WenDCkgHFerpIOmW0iT7GKmXM=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/tiendc/go-deepcopy v1.7.1 h1:LnubftI6nYaaMOcaz0LphzwraqN8jiWTwm416sitff4=
Expand Down
32 changes: 30 additions & 2 deletions internal/pkg/pipeline/task/converter/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Converter Task

The `converter` task converts data between different formats, supporting CSV, HTML, XLSX (Excel), and other data format transformations.
The `converter` task converts data between different formats, supporting CSV, HTML, XLSX (Excel), EML (Email), and other data format transformations.

## Function

Expand All @@ -21,7 +21,7 @@ The converter task transforms data between different formats. It receives record
|-------|------|---------|-------------|
| `name` | string | - | Task name for identification |
| `type` | string | `converter` | Must be "converter" |
| `format` | string | - | Format to convert to (csv, html, sst, xlsx) |
| `format` | string | - | Format to convert to (csv, html, sst, xlsx, eml) |
| `delimiter` | string | \t | Used only in the sst converter for splitting key and value |

### CSV Format Options
Expand All @@ -39,6 +39,20 @@ The converter task transforms data between different formats. It receives record
|-------|------|---------|-------------|
| `container` | string | - | XPath expression to select specific container elements |

### EML Format Options

The EML converter does not have specific configuration options. It automatically parses the email content and extracts:

- **HTML Body**: Saved as `body.html`
- **Text Body**: Saved as `body.txt`
- **Attachments**: Saved with their original filenames (or sanitized/truncated if necessary)
- **Inline Images**: Saved under their declared filenames (sanitized/truncated the same way as attachments)

Metadata generated for each output:

- `filename`: The name of the output file
- `content_type`: The MIME type of the content

### SST Format Options
Converts a single line into an SSTable, which can be stored on S3 or written to a file. It expects a single line as input.

Expand All @@ -58,6 +72,7 @@ The converter supports the following formats:
- **CSV**: Converts CSV data to JSON with column mapping and type conversion
- **HTML**: Converts HTML to JSON representation with element structure
- **XLSX**: Converts Excel files to CSV format. **Note:** Each sheet in the Excel file is emitted as a separate record with the sheet name stored in the context (key: `xlsx_sheet_name`)
- **EML**: Converts EML (Email) files to their constituent parts (HTML body, Text body, Attachments)

## Example Configurations

Expand All @@ -84,6 +99,18 @@ tasks:
type: converter
format: html
container: "//div[@class='content']"

```

### EML processing:
```yaml
tasks:
- name: read_email
type: file
path: test_email
- name: process_email
type: converter
format: eml
# Output will correspond to body parts and attachments
```

### Excel to CSV conversion (all sheets):
Expand Down Expand Up @@ -140,6 +167,7 @@ tasks:
## Use Cases

- **Data format conversion**: Convert between different data formats
- **Email processing**: Parse MIME/EML files to extract bodies and attachments
- **Excel processing**: Extract and process data from Excel spreadsheets, with separate handling for each sheet
- **API integration**: Transform data for different API requirements
- **Database migration**: Convert data for different database systems
Expand Down
1 change: 1 addition & 0 deletions internal/pkg/pipeline/task/converter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func (c *core) UnmarshalYAML(unmarshal func(interface{}) error) error {
`html`: new(html),
`sst`: new(sst),
`xlsx`: new(xlsx),
`eml`: new(eml),
}

// let's figure out what converter we'll use
Expand Down
73 changes: 73 additions & 0 deletions internal/pkg/pipeline/task/converter/eml.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package converter

import (
"bytes"
"fmt"
"mime"
"path/filepath"

"github.com/jhillyerd/enmime"
)

// eml implements the EML (email) converter. It has no fields, so it carries
// no configuration or state of its own.
type eml struct{}

// emlMaxFileNameLen is the maximum length, in bytes, of a filename emitted by
// the EML converter. Longer names are truncated, keeping the extension when
// possible.
const emlMaxFileNameLen = 200

// convert parses data as a MIME/EML message and returns one output per
// non-empty part, in this order: the HTML body ("body.html"), the text body
// ("body.txt"), every attachment, and every inline part.
//
// Each output carries two metadata entries:
//   - "filename": the sanitized name of the part;
//   - "content_type": the MIME type of the part, guessed from the filename
//     extension (or "application/octet-stream") when the part declares none.
//
// The second parameter is unused. An error is returned only when the message
// cannot be parsed; the parser error is wrapped so callers can inspect it
// with errors.Is / errors.As.
func (c *eml) convert(data []byte, _ string) ([]converterOutput, error) {
	envelope, err := enmime.ReadEnvelope(bytes.NewReader(data))
	if err != nil {
		return nil, fmt.Errorf("failed to parse EML: %w", err)
	}

	var outputs []converterOutput

	// addOutput appends one part to outputs, skipping parts with no content.
	addOutput := func(content []byte, fileName, contentType string) {
		if len(content) == 0 {
			return
		}

		fileName = emlSafeFileName(fileName)

		// Fallback for a missing content type: guess from the extension first.
		if contentType == "" {
			contentType = mime.TypeByExtension(filepath.Ext(fileName))
		}
		if contentType == "" {
			contentType = "application/octet-stream"
		}

		// Parts without a usable name (common for inline parts) still need a
		// filename that downstream tasks can write to.
		if fileName == "" {
			fileName = emlFallbackFileName(len(outputs)+1, contentType)
		}

		outputs = append(outputs, converterOutput{
			Data: content,
			Metadata: map[string]string{
				"filename":     fileName,
				"content_type": contentType,
			},
		})
	}

	// Empty bodies are dropped by addOutput itself.
	addOutput([]byte(envelope.HTML), "body.html", "text/html")
	addOutput([]byte(envelope.Text), "body.txt", "text/plain")

	for _, attachment := range envelope.Attachments {
		addOutput(attachment.Content, attachment.FileName, attachment.ContentType)
	}

	for _, inline := range envelope.Inlines {
		addOutput(inline.Content, inline.FileName, inline.ContentType)
	}

	return outputs, nil
}

// emlSafeFileName reduces name to its final path element and caps it at
// emlMaxFileNameLen bytes. It returns "" when no usable name remains (empty
// input, ".", ".." or a bare path separator).
func emlSafeFileName(name string) string {
	name = filepath.Base(name)
	switch name {
	case ".", "..", string(filepath.Separator):
		// filepath.Base yields "." for an empty name and the separator for a
		// name made only of separators; none of these is a valid file name.
		return ""
	}
	if len(name) <= emlMaxFileNameLen {
		return name
	}

	ext := filepath.Ext(name)
	if len(ext) >= emlMaxFileNameLen {
		// A pathological "extension" leaves no room for a base name; truncate
		// the whole name instead of slicing with a negative bound.
		ext = ""
	}

	cut := emlMaxFileNameLen - len(ext)
	// Step back over UTF-8 continuation bytes (10xxxxxx) so that a multi-byte
	// character is never split in half.
	for cut > 0 && name[cut]&0xC0 == 0x80 {
		cut--
	}
	return name[:cut] + ext
}

// emlFallbackFileName builds a name of the form "part-<index>" for a part
// that has no usable filename, appending an extension registered for
// contentType when the mime package knows one.
func emlFallbackFileName(index int, contentType string) string {
	name := fmt.Sprintf("part-%d", index)
	if exts, err := mime.ExtensionsByType(contentType); err == nil && len(exts) > 0 {
		name += exts[0]
	}
	return name
}
10 changes: 10 additions & 0 deletions test/pipelines/converter/eml.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
tasks:
- name: read_eml_file
type: file
path: test/pipelines/test_mail
- name: convert_from_eml
type: converter
format: eml
- name: save_output
type: file
path: output/{{ context "filename" }}
49 changes: 49 additions & 0 deletions test/pipelines/test_mail
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
From: sender@example.com
To: receiver@example.com
Subject: Test Email with PDF, CSV, and HTML Attachments
Date: Mon, 16 Feb 2026 10:00:00 +0000
Message-ID: <test123@example.com>
MIME-Version: 1.0
Content-Type: multipart/mixed; boundary="BOUNDARY123"

--BOUNDARY123
Content-Type: text/plain; charset="UTF-8"
Content-Transfer-Encoding: 7bit

Hello,

Please find attached the PDF, CSV, and HTML files for testing.

Regards,
Test Sender

--BOUNDARY123
Content-Type: application/pdf
Content-Disposition: attachment; filename="test.pdf"
Content-Transfer-Encoding: base64

JVBERi0xLjQKMSAwIG9iago8PAovVHlwZSAvQ2F0YWxvZwovUGFnZXMgMiAwIFIKPj4KZW5k
b2JqCjIgMCBvYmoKPDwKL1R5cGUgL1BhZ2VzCi9LaWRzIFszIDAgUl0KL0NvdW50IDEKPj4K
ZW5kb2JqCjMgMCBvYmoKPDwKL1R5cGUgL1BhZ2UKL1BhcmVudCAyIDAgUgovTWVkaWFCb3gg
WzAgMCAzMDAgMTQ0XQovQ29udGVudHMgNCAwIFIKPj4KZW5kb2JqCjQgMCBvYmoKPDwKL0xl
bmd0aCA0NAo+PgpzdHJlYW0KQlQKL0YxIDEyIFRmCjEwMCAxMDAgVGQKKFRlc3QgUERGIEZp
bGUpIFRqCkVUCmVuZHN0cmVhbQplbmRvYmoKeHJlZgowIDUKMDAwMDAwMDAwMCA2NTUzNSBm
IAowMDAwMDAwMDEwIDAwMDAwIG4gCjAwMDAwMDAwNTMgMDAwMDAgbiAKMDAwMDAwMDEwMCAw
MDAwMCBuIAowMDAwMDAwMTc1IDAwMDAwIG4gCnRyYWlsZXIKPDwKL1NpemUgNQovUm9vdCAx
IDAgUgo+PgpzdGFydHhyZWYKMjUwCiUlRU9G

--BOUNDARY123
Content-Type: text/csv
Content-Disposition: attachment; filename="test.csv"
Content-Transfer-Encoding: base64

bmFtZSxhZ2UKSm9obiwzMApBbGljZSwyNQ==

--BOUNDARY123
Content-Type: text/html
Content-Disposition: attachment; filename="test.html"
Content-Transfer-Encoding: base64

PGh0bWw+Cjxib2R5PkhlbGxvIFdvcmxkPC9ib2R5Pgo8L2h0bWw+

--BOUNDARY123--