Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
github.com/go-playground/validator/v10 v10.27.0
github.com/google/uuid v1.6.0
github.com/itchyny/gojq v0.12.17
github.com/jhillyerd/enmime v1.3.0
github.com/segmentio/kafka-go v0.4.49
github.com/stretchr/testify v1.11.1
github.com/xuri/excelize/v2 v2.10.0
Expand All @@ -28,6 +29,7 @@ require (
github.com/DataDog/zstd v1.5.7 // indirect
github.com/antchfx/xpath v1.3.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cention-sany/utf7 v0.0.0-20170124080048-26cad61bd60a // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cockroachdb/errors v1.12.0 // indirect
github.com/cockroachdb/fifo v0.0.0-20240816210425-c5d0cb0b6fc0 // indirect
Expand All @@ -40,12 +42,16 @@ require (
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/gogs/chardet v0.0.0-20211120154057-b7413eaefb8f // indirect
github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 // indirect
github.com/jaytaylor/html2text v0.0.0-20230321000545-74c2419ad056 // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
Expand All @@ -55,7 +61,9 @@ require (
github.com/prometheus/procfs v0.17.0 // indirect
github.com/richardlehane/mscfb v1.0.4 // indirect
github.com/richardlehane/msoleps v1.0.4 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/rogpeppe/go-internal v1.14.1 // indirect
github.com/ssor/bom v0.0.0-20170718123548-6386211fdfcf // indirect
github.com/tiendc/go-deepcopy v1.7.1 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
Expand Down
20 changes: 20 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bmatcuk/doublestar v1.3.4 h1:gPypJ5xD31uhX6Tf54sDPUOBXTqKH4c9aPY66CyQrS0=
github.com/bmatcuk/doublestar v1.3.4/go.mod h1:wiQtGV+rzVYxB7WIlirSN++5HPtPlXEo9MEoZQC/PmE=
github.com/cention-sany/utf7 v0.0.0-20170124080048-26cad61bd60a h1:MISbI8sU/PSK/ztvmWKFcI7UGb5/HQT7B+i3a2myKgI=
github.com/cention-sany/utf7 v0.0.0-20170124080048-26cad61bd60a/go.mod h1:2GxOXOlEPAMFPfp014mK1SWq8G8BN8o7/dfYqJrVGn8=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cockroachdb/datadriven v1.0.3-0.20230413201302-be42291fc80f h1:otljaYPt5hWxV3MUfO5dFPFiOXg9CyG5/kCfayTqsJ4=
Expand Down Expand Up @@ -88,8 +90,12 @@ github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJn
github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY=
github.com/go-playground/validator/v10 v10.27.0 h1:w8+XrWVMhGkxOaaowyKH35gFydVHOvC0/uWoy2Fzwn4=
github.com/go-playground/validator/v10 v10.27.0/go.mod h1:I5QpIEbmr8On7W0TktmJAumgzX4CA1XNl4ZmDuVHKKo=
github.com/go-test/deep v1.1.0 h1:WOcxcdHcvdgThNXjw0t76K42FXTU7HpNQWHpA2HHNlg=
github.com/go-test/deep v1.1.0/go.mod h1:5C2ZWiW0ErCdrYzpqxLbTX7MG14M9iiw8DgHncVwcsE=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/gogs/chardet v0.0.0-20211120154057-b7413eaefb8f h1:3BSP1Tbs2djlpprl7wCLuiqMaUh5SJkkzI2gDs+FgLs=
github.com/gogs/chardet v0.0.0-20211120154057-b7413eaefb8f/go.mod h1:Pcatq5tYkCW2Q6yrR2VRHlbHpZ/R4/7qyL1TCF7vl14=
github.com/golang-jwt/jwt/v5 v5.3.0 h1:pv4AsKCKKZuqlgs5sUmn4x8UlGa0kEVt/puTpKx9vvo=
github.com/golang-jwt/jwt/v5 v5.3.0/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
Expand All @@ -106,6 +112,10 @@ github.com/itchyny/gojq v0.12.17 h1:8av8eGduDb5+rvEdaOO+zQUjA04MS0m3Ps8HiD+fceg=
github.com/itchyny/gojq v0.12.17/go.mod h1:WBrEMkgAfAGO1LUcGOckBl5O726KPp+OlkKug0I/FEY=
github.com/itchyny/timefmt-go v0.1.6 h1:ia3s54iciXDdzWzwaVKXZPbiXzxxnv1SPGFfM/myJ5Q=
github.com/itchyny/timefmt-go v0.1.6/go.mod h1:RRDZYC5s9ErkjQvTvvU7keJjxUYzIISJGxm9/mAERQg=
github.com/jaytaylor/html2text v0.0.0-20230321000545-74c2419ad056 h1:iCHtR9CQyktQ5+f3dMVZfwD2KWJUgm7M0gdL9NGr8KA=
github.com/jaytaylor/html2text v0.0.0-20230321000545-74c2419ad056/go.mod h1:CVKlgaMiht+LXvHG173ujK6JUhZXKb2u/BQtjPDIvyk=
github.com/jhillyerd/enmime v1.3.0 h1:LV5kzfLidiOr8qRGIpYYmUZCnhrPbcFAnAFUnWn99rw=
github.com/jhillyerd/enmime v1.3.0/go.mod h1:6c6jg5HdRRV2FtvVL69LjiX1M8oE0xDX9VEhV3oy4gs=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
Expand All @@ -116,8 +126,13 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ=
github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI=
github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U=
github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY=
github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4=
Expand All @@ -140,11 +155,16 @@ github.com/richardlehane/mscfb v1.0.4/go.mod h1:YzVpcZg9czvAuhk9T+a3avCpcFPMUWm7
github.com/richardlehane/msoleps v1.0.1/go.mod h1:BWev5JBpU9Ko2WAgmZEuiz4/u3ZYTKbjLycmwiWUfWg=
github.com/richardlehane/msoleps v1.0.4 h1:WuESlvhX3gH2IHcd8UqyCuFY5yiq/GR/yqaSM/9/g00=
github.com/richardlehane/msoleps v1.0.4/go.mod h1:BWev5JBpU9Ko2WAgmZEuiz4/u3ZYTKbjLycmwiWUfWg=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
github.com/segmentio/kafka-go v0.4.49 h1:GJiNX1d/g+kG6ljyJEoi9++PUMdXGAxb7JGPiDCuNmk=
github.com/segmentio/kafka-go v0.4.49/go.mod h1:Y1gn60kzLEEaW28YshXyk2+VCUKbJ3Qr6DrnT3i4+9E=
github.com/ssor/bom v0.0.0-20170718123548-6386211fdfcf h1:pvbZ0lM0XWPBqUKqFU8cmavspvIl9nulOYwdy6IFRRo=
github.com/ssor/bom v0.0.0-20170718123548-6386211fdfcf/go.mod h1:RJID2RhlZKId02nZ62WenDCkgHFerpIOmW0iT7GKmXM=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/tiendc/go-deepcopy v1.7.1 h1:LnubftI6nYaaMOcaz0LphzwraqN8jiWTwm416sitff4=
Expand Down
33 changes: 31 additions & 2 deletions internal/pkg/pipeline/task/converter/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Converter Task

The `converter` task converts data between different formats, supporting CSV, HTML, XLSX (Excel), and other data format transformations.
The `converter` task converts data between different formats, supporting CSV, HTML, XLSX (Excel), EML (Email), and other data format transformations.

## Function

Expand All @@ -21,7 +21,7 @@ The converter task transforms data between different formats. It receives record
|-------|------|---------|-------------|
| `name` | string | - | Task name for identification |
| `type` | string | `converter` | Must be "converter" |
| `format` | string | - | Format to convert to (csv, html, sst, xlsx) |
| `format` | string | - | Format to convert to (csv, html, sst, xlsx, eml) |
| `delimiter` | string| \t | Used only in sst converter for spliting key and value|

### CSV Format Options
Expand All @@ -39,6 +39,21 @@ The converter task transforms data between different formats. It receives record
|-------|------|---------|-------------|
| `container` | string | - | XPath expression to select specific container elements |

### EML Format Options

The EML converter does not have specific configuration options. It automatically parses the email content and extracts:

- **HTML Body**: Saved as `body.html`
- **Text Body**: Saved as `body.txt`
- **Headers**: Saved as `headers.json` (key-value pairs of all email headers)
- **Attachments**: Saved with their original filenames (or sanitized/truncated if necessary)
- **Inline Images**: Saved with confirmable filenames

Metadata generated for each output:

- `converter_filename`: The name of the output file
- `content_type`: The MIME type of the content

### SST Format Options
Convert a single line to the SSTable which could be stored on s3 or via file. It expects a single line as input

Expand All @@ -58,6 +73,7 @@ The converter supports the following formats:
- **CSV**: Converts CSV data to JSON with column mapping and type conversion
- **HTML**: Converts HTML to JSON representation with element structure
- **XLSX**: Converts Excel files to CSV format. **Note:** Each sheet in the Excel file is emitted as a separate record with the sheet name stored in the context (key: `xlsx_sheet_name`)
- **EML**: Converts EML (Email) files to their constituent parts (HTML body, Text body, Attachments)

## Example Configurations

Expand All @@ -84,6 +100,18 @@ tasks:
type: converter
format: html
container: "//div[@class='content']"

### EML processing:
```yaml
tasks:
- name: read_email
type: file
path: test_email
- name: process_email
type: converter
format: eml
# Output will correspond to body parts and attachments
```
```

### Excel to CSV conversion (all sheets):
Expand Down Expand Up @@ -140,6 +168,7 @@ tasks:
## Use Cases

- **Data format conversion**: Convert between different data formats
- **Email processing**: Parse MIME/EML files to extract bodies and attachments
- **Excel processing**: Extract and process data from Excel spreadsheets, with separate handling for each sheet
- **API integration**: Transform data for different API requirements
- **Database migration**: Convert data for different database systems
Expand Down
1 change: 1 addition & 0 deletions internal/pkg/pipeline/task/converter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func (c *core) UnmarshalYAML(unmarshal func(interface{}) error) error {
`html`: new(html),
`sst`: new(sst),
`xlsx`: new(xlsx),
`eml`: new(eml),
}

// let's figure out what converter we'll use
Expand Down
101 changes: 101 additions & 0 deletions internal/pkg/pipeline/task/converter/eml.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package converter

import (
"bytes"
"encoding/json"
"fmt"
"mime"
"path/filepath"

"github.com/jhillyerd/enmime"
)

type eml struct{}

const maxFilenameLength = 200

func (c *eml) convert(data []byte, _ string) ([]converterOutput, error) {

envelope, err := enmime.ReadEnvelope(bytes.NewReader(data))
if err != nil {
return nil, fmt.Errorf("failed to parse EML: %v", err)
}

var outputs []converterOutput

if envelope.HTML != "" {
if out := c.processOutput([]byte(envelope.HTML), "body.html", "text/html"); out != nil {
outputs = append(outputs, *out)
}
}
if envelope.Text != "" {
if out := c.processOutput([]byte(envelope.Text), "body.txt", "text/plain"); out != nil {
outputs = append(outputs, *out)
}
}

// Extract headers
headerMap := make(map[string]string)
for _, key := range envelope.GetHeaderKeys() {
headerMap[key] = envelope.GetHeader(key)
}
if len(headerMap) > 0 {
var buf bytes.Buffer
enc := json.NewEncoder(&buf)
enc.SetEscapeHTML(false)
enc.SetIndent("", " ")
if err := enc.Encode(headerMap); err == nil {
if out := c.processOutput(buf.Bytes(), "headers.json", "application/json"); out != nil {
outputs = append(outputs, *out)
}
}
}

for _, attachment := range envelope.Attachments {
if out := c.processOutput(attachment.Content, attachment.FileName, attachment.ContentType); out != nil {
outputs = append(outputs, *out)
}
}

for _, inline := range envelope.Inlines {
if out := c.processOutput(inline.Content, inline.FileName, inline.ContentType); out != nil {
outputs = append(outputs, *out)
}
}

return outputs, nil
}

func (c *eml) processOutput(content []byte, fileName string, contentType string) *converterOutput {
if len(content) == 0 {
return nil
}

fileName = filepath.Base(fileName)

// Fallback for filename length
if len(fileName) > maxFilenameLength {
ext := filepath.Ext(fileName)
base := fileName[:len(fileName)-len(ext)]
base = base[:maxFilenameLength-len(ext)]
fileName = base + ext
}

// Fallback for missing content type
if contentType == "" {
opts := mime.TypeByExtension(filepath.Ext(fileName))
if opts != "" {
contentType = opts
} else {
contentType = "application/octet-stream"
}
}

return &converterOutput{
Data: content,
Metadata: map[string]string{
"converter_filename": fileName,
"content_type": contentType,
},
}
}
10 changes: 10 additions & 0 deletions test/pipelines/converter/eml.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
tasks:
- name: read_eml_file
type: file
path: test/pipelines/converter/test_mail
- name: convert_from_eml
type: converter
format: eml
- name: save_output
type: file
path: output/{{ context "converter_filename" }}
49 changes: 49 additions & 0 deletions test/pipelines/converter/test_mail
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
From: sender@example.com
To: receiver@example.com
Subject: Test Email with PDF, CSV, and HTML Attachments
Date: Mon, 16 Feb 2026 10:00:00 +0000
Message-ID: <test123@example.com>
MIME-Version: 1.0
Content-Type: multipart/mixed; boundary="BOUNDARY123"

--BOUNDARY123
Content-Type: text/plain; charset="UTF-8"
Content-Transfer-Encoding: 7bit

Hello,

Please find attached the PDF, CSV, and HTML files for testing.

Regards,
Test Sender

--BOUNDARY123
Content-Type: application/pdf
Content-Disposition: attachment; filename="test.pdf"
Content-Transfer-Encoding: base64

JVBERi0xLjQKMSAwIG9iago8PAovVHlwZSAvQ2F0YWxvZwovUGFnZXMgMiAwIFIKPj4KZW5k
b2JqCjIgMCBvYmoKPDwKL1R5cGUgL1BhZ2VzCi9LaWRzIFszIDAgUl0KL0NvdW50IDEKPj4K
ZW5kb2JqCjMgMCBvYmoKPDwKL1R5cGUgL1BhZ2UKL1BhcmVudCAyIDAgUgovTWVkaWFCb3gg
WzAgMCAzMDAgMTQ0XQovQ29udGVudHMgNCAwIFIKPj4KZW5kb2JqCjQgMCBvYmoKPDwKL0xl
bmd0aCA0NAo+PgpzdHJlYW0KQlQKL0YxIDEyIFRmCjEwMCAxMDAgVGQKKFRlc3QgUERGIEZp
bGUpIFRqCkVUCmVuZHN0cmVhbQplbmRvYmoKeHJlZgowIDUKMDAwMDAwMDAwMCA2NTUzNSBm
IAowMDAwMDAwMDEwIDAwMDAwIG4gCjAwMDAwMDAwNTMgMDAwMDAgbiAKMDAwMDAwMDEwMCAw
MDAwMCBuIAowMDAwMDAwMTc1IDAwMDAwIG4gCnRyYWlsZXIKPDwKL1NpemUgNQovUm9vdCAx
IDAgUgo+PgpzdGFydHhyZWYKMjUwCiUlRU9G

--BOUNDARY123
Content-Type: text/csv
Content-Disposition: attachment; filename="test.csv"
Content-Transfer-Encoding: base64

bmFtZSxhZ2UKSm9obiwzMApBbGljZSwyNQ==

--BOUNDARY123
Content-Type: text/html
Content-Disposition: attachment; filename="test.html"
Content-Transfer-Encoding: base64

PGh0bWw+Cjxib2R5PkhlbGxvIFdvcmxkPC9ib2R5Pgo8L2h0bWw+

--BOUNDARY123--