diff --git a/go.mod b/go.mod index aa674e3..3eb63c9 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/go-playground/validator/v10 v10.27.0 github.com/google/uuid v1.6.0 github.com/itchyny/gojq v0.12.17 + github.com/jhillyerd/enmime v1.3.0 github.com/segmentio/kafka-go v0.4.49 github.com/stretchr/testify v1.11.1 github.com/xuri/excelize/v2 v2.10.0 @@ -28,6 +29,7 @@ require ( github.com/DataDog/zstd v1.5.7 // indirect github.com/antchfx/xpath v1.3.5 // indirect github.com/beorn7/perks v1.0.1 // indirect + github.com/cention-sany/utf7 v0.0.0-20170124080048-26cad61bd60a // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cockroachdb/errors v1.12.0 // indirect github.com/cockroachdb/fifo v0.0.0-20240816210425-c5d0cb0b6fc0 // indirect @@ -40,12 +42,16 @@ require ( github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/gogs/chardet v0.0.0-20211120154057-b7413eaefb8f // indirect github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 // indirect + github.com/jaytaylor/html2text v0.0.0-20230321000545-74c2419ad056 // indirect github.com/klauspost/compress v1.18.0 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/kr/text v0.2.0 // indirect github.com/leodido/go-urn v1.4.0 // indirect + github.com/mattn/go-runewidth v0.0.15 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/olekukonko/tablewriter v0.0.5 // indirect github.com/pierrec/lz4/v4 v4.1.15 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect @@ -55,7 +61,9 @@ require ( github.com/prometheus/procfs v0.17.0 // indirect github.com/richardlehane/mscfb v1.0.4 // indirect github.com/richardlehane/msoleps v1.0.4 // indirect + github.com/rivo/uniseg v0.4.7 // indirect github.com/rogpeppe/go-internal v1.14.1 // indirect + github.com/ssor/bom 
v0.0.0-20170718123548-6386211fdfcf // indirect github.com/tiendc/go-deepcopy v1.7.1 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/scram v1.1.2 // indirect diff --git a/go.sum b/go.sum index a04a48b..2c49616 100644 --- a/go.sum +++ b/go.sum @@ -55,6 +55,8 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bmatcuk/doublestar v1.3.4 h1:gPypJ5xD31uhX6Tf54sDPUOBXTqKH4c9aPY66CyQrS0= github.com/bmatcuk/doublestar v1.3.4/go.mod h1:wiQtGV+rzVYxB7WIlirSN++5HPtPlXEo9MEoZQC/PmE= +github.com/cention-sany/utf7 v0.0.0-20170124080048-26cad61bd60a h1:MISbI8sU/PSK/ztvmWKFcI7UGb5/HQT7B+i3a2myKgI= +github.com/cention-sany/utf7 v0.0.0-20170124080048-26cad61bd60a/go.mod h1:2GxOXOlEPAMFPfp014mK1SWq8G8BN8o7/dfYqJrVGn8= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cockroachdb/datadriven v1.0.3-0.20230413201302-be42291fc80f h1:otljaYPt5hWxV3MUfO5dFPFiOXg9CyG5/kCfayTqsJ4= @@ -88,8 +90,12 @@ github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJn github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY= github.com/go-playground/validator/v10 v10.27.0 h1:w8+XrWVMhGkxOaaowyKH35gFydVHOvC0/uWoy2Fzwn4= github.com/go-playground/validator/v10 v10.27.0/go.mod h1:I5QpIEbmr8On7W0TktmJAumgzX4CA1XNl4ZmDuVHKKo= +github.com/go-test/deep v1.1.0 h1:WOcxcdHcvdgThNXjw0t76K42FXTU7HpNQWHpA2HHNlg= +github.com/go-test/deep v1.1.0/go.mod h1:5C2ZWiW0ErCdrYzpqxLbTX7MG14M9iiw8DgHncVwcsE= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/gogs/chardet v0.0.0-20211120154057-b7413eaefb8f 
h1:3BSP1Tbs2djlpprl7wCLuiqMaUh5SJkkzI2gDs+FgLs= +github.com/gogs/chardet v0.0.0-20211120154057-b7413eaefb8f/go.mod h1:Pcatq5tYkCW2Q6yrR2VRHlbHpZ/R4/7qyL1TCF7vl14= github.com/golang-jwt/jwt/v5 v5.3.0 h1:pv4AsKCKKZuqlgs5sUmn4x8UlGa0kEVt/puTpKx9vvo= github.com/golang-jwt/jwt/v5 v5.3.0/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -106,6 +112,10 @@ github.com/itchyny/gojq v0.12.17 h1:8av8eGduDb5+rvEdaOO+zQUjA04MS0m3Ps8HiD+fceg= github.com/itchyny/gojq v0.12.17/go.mod h1:WBrEMkgAfAGO1LUcGOckBl5O726KPp+OlkKug0I/FEY= github.com/itchyny/timefmt-go v0.1.6 h1:ia3s54iciXDdzWzwaVKXZPbiXzxxnv1SPGFfM/myJ5Q= github.com/itchyny/timefmt-go v0.1.6/go.mod h1:RRDZYC5s9ErkjQvTvvU7keJjxUYzIISJGxm9/mAERQg= +github.com/jaytaylor/html2text v0.0.0-20230321000545-74c2419ad056 h1:iCHtR9CQyktQ5+f3dMVZfwD2KWJUgm7M0gdL9NGr8KA= +github.com/jaytaylor/html2text v0.0.0-20230321000545-74c2419ad056/go.mod h1:CVKlgaMiht+LXvHG173ujK6JUhZXKb2u/BQtjPDIvyk= +github.com/jhillyerd/enmime v1.3.0 h1:LV5kzfLidiOr8qRGIpYYmUZCnhrPbcFAnAFUnWn99rw= +github.com/jhillyerd/enmime v1.3.0/go.mod h1:6c6jg5HdRRV2FtvVL69LjiX1M8oE0xDX9VEhV3oy4gs= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= @@ -116,8 +126,13 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= +github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= +github.com/mattn/go-runewidth v0.0.15 
h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U= +github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= +github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0= github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= @@ -140,11 +155,16 @@ github.com/richardlehane/mscfb v1.0.4/go.mod h1:YzVpcZg9czvAuhk9T+a3avCpcFPMUWm7 github.com/richardlehane/msoleps v1.0.1/go.mod h1:BWev5JBpU9Ko2WAgmZEuiz4/u3ZYTKbjLycmwiWUfWg= github.com/richardlehane/msoleps v1.0.4 h1:WuESlvhX3gH2IHcd8UqyCuFY5yiq/GR/yqaSM/9/g00= github.com/richardlehane/msoleps v1.0.4/go.mod h1:BWev5JBpU9Ko2WAgmZEuiz4/u3ZYTKbjLycmwiWUfWg= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= +github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= github.com/segmentio/kafka-go v0.4.49 h1:GJiNX1d/g+kG6ljyJEoi9++PUMdXGAxb7JGPiDCuNmk= github.com/segmentio/kafka-go v0.4.49/go.mod h1:Y1gn60kzLEEaW28YshXyk2+VCUKbJ3Qr6DrnT3i4+9E= +github.com/ssor/bom v0.0.0-20170718123548-6386211fdfcf 
h1:pvbZ0lM0XWPBqUKqFU8cmavspvIl9nulOYwdy6IFRRo= +github.com/ssor/bom v0.0.0-20170718123548-6386211fdfcf/go.mod h1:RJID2RhlZKId02nZ62WenDCkgHFerpIOmW0iT7GKmXM= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/tiendc/go-deepcopy v1.7.1 h1:LnubftI6nYaaMOcaz0LphzwraqN8jiWTwm416sitff4= diff --git a/internal/pkg/pipeline/task/converter/README.md b/internal/pkg/pipeline/task/converter/README.md index 57ff96d..6638198 100644 --- a/internal/pkg/pipeline/task/converter/README.md +++ b/internal/pkg/pipeline/task/converter/README.md @@ -1,6 +1,6 @@ # Converter Task -The `converter` task converts data between different formats, supporting CSV, HTML, XLSX (Excel), and other data format transformations. +The `converter` task converts data between different formats, supporting CSV, HTML, XLSX (Excel), EML (Email), and other data format transformations. ## Function @@ -21,7 +21,7 @@ The converter task transforms data between different formats. It receives record |-------|------|---------|-------------| | `name` | string | - | Task name for identification | | `type` | string | `converter` | Must be "converter" | -| `format` | string | - | Format to convert to (csv, html, sst, xlsx) | +| `format` | string | - | Format to convert to (csv, html, sst, xlsx, eml) | | `delimiter` | string| \t | Used only in sst converter for spliting key and value| ### CSV Format Options @@ -39,6 +39,21 @@ The converter task transforms data between different formats. It receives record |-------|------|---------|-------------| | `container` | string | - | XPath expression to select specific container elements | +### EML Format Options + +The EML converter does not have specific configuration options. 
It automatically parses the email content and extracts: + +- **HTML Body**: Saved as `body.html` +- **Text Body**: Saved as `body.txt` +- **Headers**: Saved as `headers.json` (key-value pairs of all email headers) +- **Attachments**: Saved with their original filenames (or sanitized/truncated if necessary) +- **Inline Images**: Saved like attachments, under their original filenames (or sanitized/truncated if necessary) + +Metadata generated for each output: + +- `converter_filename`: The name of the output file +- `content_type`: The MIME type of the content + ### SST Format Options Convert a single line to the SSTable which could be stored on s3 or via file. It expects a single line as input @@ -58,6 +73,7 @@ The converter supports the following formats: - **CSV**: Converts CSV data to JSON with column mapping and type conversion - **HTML**: Converts HTML to JSON representation with element structure - **XLSX**: Converts Excel files to CSV format. **Note:** Each sheet in the Excel file is emitted as a separate record with the sheet name stored in the context (key: `xlsx_sheet_name`) +- **EML**: Converts EML (Email) files to their constituent parts (HTML body, text body, headers, attachments, inline images) ## Example Configurations @@ -84,6 +100,18 @@ tasks: type: converter format: html container: "//div[@class='content']" ``` + +### EML processing: +```yaml +tasks: + - name: read_email + type: file + path: test_email + - name: process_email + type: converter + format: eml + # Output will correspond to body parts and attachments +``` ### Excel to CSV conversion (all sheets): @@ -140,6 +168,7 @@ tasks: ## Use Cases - **Data format conversion**: Convert between different data formats +- **Email processing**: Parse MIME/EML files to extract bodies and attachments - **Excel processing**: Extract and process data from Excel spreadsheets, with separate handling for each sheet - **API integration**: Transform data for different API requirements - **Database migration**: Convert data for different database systems diff --git
a/internal/pkg/pipeline/task/converter/converter.go b/internal/pkg/pipeline/task/converter/converter.go index 8a774bc..b9d38b7 100644 --- a/internal/pkg/pipeline/task/converter/converter.go +++ b/internal/pkg/pipeline/task/converter/converter.go @@ -40,6 +40,7 @@ func (c *core) UnmarshalYAML(unmarshal func(interface{}) error) error { `html`: new(html), `sst`: new(sst), `xlsx`: new(xlsx), + `eml`: new(eml), } // let's figure out what converter we'll use diff --git a/internal/pkg/pipeline/task/converter/eml.go b/internal/pkg/pipeline/task/converter/eml.go new file mode 100644 index 0000000..16c85c5 --- /dev/null +++ b/internal/pkg/pipeline/task/converter/eml.go @@ -0,0 +1,101 @@ +package converter + +import ( + "bytes" + "encoding/json" + "fmt" + "mime" + "path/filepath" + + "github.com/jhillyerd/enmime" +) + +type eml struct{} + +const maxFilenameLength = 200 + +func (c *eml) convert(data []byte, _ string) ([]converterOutput, error) { + + envelope, err := enmime.ReadEnvelope(bytes.NewReader(data)) + if err != nil { + return nil, fmt.Errorf("failed to parse EML: %v", err) + } + + var outputs []converterOutput + + if envelope.HTML != "" { + if out := c.processOutput([]byte(envelope.HTML), "body.html", "text/html"); out != nil { + outputs = append(outputs, *out) + } + } + if envelope.Text != "" { + if out := c.processOutput([]byte(envelope.Text), "body.txt", "text/plain"); out != nil { + outputs = append(outputs, *out) + } + } + + // Extract headers + headerMap := make(map[string]string) + for _, key := range envelope.GetHeaderKeys() { + headerMap[key] = envelope.GetHeader(key) + } + if len(headerMap) > 0 { + var buf bytes.Buffer + enc := json.NewEncoder(&buf) + enc.SetEscapeHTML(false) + enc.SetIndent("", " ") + if err := enc.Encode(headerMap); err == nil { + if out := c.processOutput(buf.Bytes(), "headers.json", "application/json"); out != nil { + outputs = append(outputs, *out) + } + } + } + + for _, attachment := range envelope.Attachments { + if out := 
c.processOutput(attachment.Content, attachment.FileName, attachment.ContentType); out != nil { + outputs = append(outputs, *out) + } + } + + for _, inline := range envelope.Inlines { + if out := c.processOutput(inline.Content, inline.FileName, inline.ContentType); out != nil { + outputs = append(outputs, *out) + } + } + + return outputs, nil +} + +func (c *eml) processOutput(content []byte, fileName string, contentType string) *converterOutput { + if len(content) == 0 { + return nil + } + + fileName = filepath.Base(fileName) + + // Fallback for filename length + if len(fileName) > maxFilenameLength { + ext := filepath.Ext(fileName) + base := fileName[:len(fileName)-len(ext)] + base = base[:maxFilenameLength-len(ext)] + fileName = base + ext + } + + // Fallback for missing content type + if contentType == "" { + opts := mime.TypeByExtension(filepath.Ext(fileName)) + if opts != "" { + contentType = opts + } else { + contentType = "application/octet-stream" + } + } + + return &converterOutput{ + Data: content, + Metadata: map[string]string{ + "converter_filename": fileName, + "content_type": contentType, + }, + } +} diff --git a/test/pipelines/converter/eml.yaml b/test/pipelines/converter/eml.yaml new file mode 100644 index 0000000..683fb38 --- /dev/null +++ b/test/pipelines/converter/eml.yaml @@ -0,0 +1,10 @@ +tasks: + - name: read_eml_file + type: file + path: test/pipelines/converter/test_mail + - name: convert_from_eml + type: converter + format: eml + - name: save_output + type: file + path: output/{{ context "converter_filename" }} \ No newline at end of file diff --git a/test/pipelines/converter/test_mail b/test/pipelines/converter/test_mail new file mode 100644 index 0000000..1c1d5c4 --- /dev/null +++ b/test/pipelines/converter/test_mail @@ -0,0 +1,49 @@ +From: sender@example.com +To: receiver@example.com +Subject: Test Email with PDF, CSV, and HTML Attachments +Date: Mon, 16 Feb 2026 10:00:00 +0000 +Message-ID: +MIME-Version: 1.0 +Content-Type: 
multipart/mixed; boundary="BOUNDARY123" + +--BOUNDARY123 +Content-Type: text/plain; charset="UTF-8" +Content-Transfer-Encoding: 7bit + +Hello, + +Please find attached the PDF, CSV, and HTML files for testing. + +Regards, +Test Sender + +--BOUNDARY123 +Content-Type: application/pdf +Content-Disposition: attachment; filename="test.pdf" +Content-Transfer-Encoding: base64 + +JVBERi0xLjQKMSAwIG9iago8PAovVHlwZSAvQ2F0YWxvZwovUGFnZXMgMiAwIFIKPj4KZW5k +b2JqCjIgMCBvYmoKPDwKL1R5cGUgL1BhZ2VzCi9LaWRzIFszIDAgUl0KL0NvdW50IDEKPj4K +ZW5kb2JqCjMgMCBvYmoKPDwKL1R5cGUgL1BhZ2UKL1BhcmVudCAyIDAgUgovTWVkaWFCb3gg +WzAgMCAzMDAgMTQ0XQovQ29udGVudHMgNCAwIFIKPj4KZW5kb2JqCjQgMCBvYmoKPDwKL0xl +bmd0aCA0NAo+PgpzdHJlYW0KQlQKL0YxIDEyIFRmCjEwMCAxMDAgVGQKKFRlc3QgUERGIEZp +bGUpIFRqCkVUCmVuZHN0cmVhbQplbmRvYmoKeHJlZgowIDUKMDAwMDAwMDAwMCA2NTUzNSBm +IAowMDAwMDAwMDEwIDAwMDAwIG4gCjAwMDAwMDAwNTMgMDAwMDAgbiAKMDAwMDAwMDEwMCAw +MDAwMCBuIAowMDAwMDAwMTc1IDAwMDAwIG4gCnRyYWlsZXIKPDwKL1NpemUgNQovUm9vdCAx +IDAgUgo+PgpzdGFydHhyZWYKMjUwCiUlRU9G + +--BOUNDARY123 +Content-Type: text/csv +Content-Disposition: attachment; filename="test.csv" +Content-Transfer-Encoding: base64 + +bmFtZSxhZ2UKSm9obiwzMApBbGljZSwyNQ== + +--BOUNDARY123 +Content-Type: text/html +Content-Disposition: attachment; filename="test.html" +Content-Transfer-Encoding: base64 + +PGh0bWw+Cjxib2R5PkhlbGxvIFdvcmxkPC9ib2R5Pgo8L2h0bWw+ + +--BOUNDARY123--