Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add script processor using Tengo #428

Merged
merged 1 commit into from
Nov 8, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -32,3 +32,5 @@ meteor-plugin-*

# build
/dist

.playground
16 changes: 15 additions & 1 deletion docs/docs/reference/processors.md
Original file line number Diff line number Diff line change
@@ -28,4 +28,18 @@ processors:

This processor appends the values from the given config to each Asset's labels.

[More details](https://github.com/odpf/meteor/blob/main/plugins/processors/labels/README.md)
[More details][labels-readme]

## Script

Script processor uses the user-specified script to transform each asset emitted
from the extractor. Currently, [Tengo][tengo] is the only supported script
engine.

[More details][script-readme]

[labels-readme]: https://github.com/odpf/meteor/blob/main/plugins/processors/labels/README.md

[script-readme]: https://github.com/odpf/meteor/blob/main/plugins/processors/script/README.md

[tengo]: https://github.com/d5/tengo
9 changes: 4 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@ require (
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/blastrain/vitess-sqlparser v0.0.0-20201030050434-a139afbb1aba
github.com/cenkalti/backoff/v4 v4.1.2
github.com/d5/tengo/v2 v2.13.0
github.com/denisenkom/go-mssqldb v0.10.0
github.com/dnaeon/go-vcr/v2 v2.0.1
github.com/elastic/go-elasticsearch v0.0.0
@@ -31,7 +32,9 @@ require (
github.com/go-playground/validator/v10 v10.10.0
github.com/go-sql-driver/mysql v1.6.0
github.com/gocql/gocql v0.0.0-20210817081954-bc256bbb90de
github.com/google/go-cmp v0.5.8
github.com/google/go-github/v37 v37.0.0
github.com/gopherjs/gopherjs v0.0.0-20210503212227-fb464eba2686 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/lib/pq v1.10.4
@@ -50,6 +53,7 @@ require (
github.com/prestodb/presto-go-client v0.0.0-20211201125635-ad28cec17d6c
github.com/schollz/progressbar/v3 v3.8.5
github.com/segmentio/kafka-go v0.4.17
github.com/sergi/go-diff v1.1.0 // indirect
github.com/sijms/go-ora/v2 v2.2.22
github.com/snowflakedb/gosnowflake v1.6.7
github.com/spf13/cast v1.5.0 // indirect
@@ -74,8 +78,3 @@ require (
gopkg.in/ini.v1 v1.66.6 // indirect
gopkg.in/yaml.v3 v3.0.1
)

require (
github.com/gopherjs/gopherjs v0.0.0-20210503212227-fb464eba2686 // indirect
github.com/sergi/go-diff v1.1.0 // indirect
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -501,6 +501,8 @@ github.com/d2g/dhcp4 v0.0.0-20170904100407-a1d1b6c41b1c/go.mod h1:Ct2BUK8SB0YC1S
github.com/d2g/dhcp4client v1.0.0/go.mod h1:j0hNfjhrt2SxUOw55nL0ATM/z4Yt3t2Kd1mW34z5W5s=
github.com/d2g/dhcp4server v0.0.0-20181031114812-7d4a0a7f59a5/go.mod h1:Eo87+Kg/IX2hfWJfwxMzLyuSZyxSoAug2nGa1G2QAi8=
github.com/d2g/hardwareaddr v0.0.0-20190221164911-e7d9fbe030e4/go.mod h1:bMl4RjIciD2oAxI7DmWRx6gbeqrkoLqv3MV0vzNad+I=
github.com/d5/tengo/v2 v2.13.0 h1:4pZ5mR4vjOejpp+PMeIMpjZdObK7iwWoLTpVyhT+0Jk=
github.com/d5/tengo/v2 v2.13.0/go.mod h1:XRGjEs5I9jYIKTxly6HCF8oiiilk5E/RYXOZ5b0DZC8=
github.com/danwakefield/fnmatch v0.0.0-20160403171240-cbb64ac3d964 h1:y5HC9v93H5EPKqaS1UYVg1uYah5Xf51mBfIoWehClUQ=
github.com/danwakefield/fnmatch v0.0.0-20160403171240-cbb64ac3d964/go.mod h1:Xd9hchkHSWYkEqJwUGisez3G1QY8Ryz0sdWrLPMGjLk=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
21 changes: 21 additions & 0 deletions plugins/internal/tengoutil/secure_script.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package tengoutil

import (
"github.com/d5/tengo/v2"
"github.com/d5/tengo/v2/stdlib"
)

// Limits applied to every script built by NewSecureScript.
const (
// maxAllocs is the value handed to Script.SetMaxAllocs.
maxAllocs = 5000
// maxConsts is the value handed to Script.SetMaxConstObjects.
maxConsts = 500
)

// NewSecureScript builds a Tengo script from the given source and configures
// it with the package's allocation and constant-object limits (maxAllocs and
// maxConsts). Every module reported by stdlib.AllModuleNames is registered as
// importable; nothing else is configured on the script here.
//
// NOTE(review): the importable set includes "os"; confirm that is intended for
// a script described as "secure".
func NewSecureScript(input []byte) *tengo.Script {
	modules := stdlib.GetModuleMap(stdlib.AllModuleNames()...)

	script := tengo.NewScript(input)
	script.SetMaxConstObjects(maxConsts)
	script.SetMaxAllocs(maxAllocs)
	script.SetImports(modules)

	return script
}
36 changes: 36 additions & 0 deletions plugins/internal/tengoutil/secure_script_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
//go:build plugins
// +build plugins

package tengoutil

import (
"testing"

"github.com/MakeNowJust/heredoc"
"github.com/stretchr/testify/assert"
)

func TestNewSecureScript(t *testing.T) {
t.Run("Allows import of builtin modules", func(t *testing.T) {
s := NewSecureScript(([]byte)(heredoc.Doc(`
math := import("math")
os := import("os")
text := import("text")
times := import("times")
rand := import("rand")
fmt := import("fmt")
json := import("json")
base64 := import("base64")
hex := import("hex")
enum := import("enum")
`)))
_, err := s.Compile()
assert.NoError(t, err)
})

t.Run("File import disallowed", func(t *testing.T) {
s := NewSecureScript(([]byte)(`sum := import("./testdata/sum")`))
_, err := s.Compile()
assert.ErrorContains(t, err, "Compile Error: module './testdata/sum' not found")
})
}
59 changes: 59 additions & 0 deletions plugins/internal/tengoutil/structmap/asset_wrapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package structmap

import (
"fmt"

v1beta2 "github.com/odpf/meteor/models/odpf/assets/v1beta2"
"google.golang.org/protobuf/reflect/protoregistry"
"google.golang.org/protobuf/types/known/anypb"
)

// AssetWrapper wraps an asset so it can be converted to a generic map (AsMap)
// and overwritten from one (OverwriteWith).
type AssetWrapper struct {
// A is the wrapped asset; OverwriteWith modifies it in place.
A *v1beta2.Asset
}

// AsMap converts the wrapped asset into a map[string]interface{} using the
// package-level AsMap helper. It returns an error if that conversion fails or
// if the converted value is not a map[string]interface{}.
func (w AssetWrapper) AsMap() (map[string]interface{}, error) {
	raw, err := AsMap(w.A)
	if err != nil {
		return nil, fmt.Errorf("structmap: asset as map: %w", err)
	}

	if assetMap, isMap := raw.(map[string]interface{}); isMap {
		return assetMap, nil
	}

	return nil, fmt.Errorf("structmap: asset as map: unexpected type for asset map: %T", raw)
}

// OverwriteWith replaces the contents of the wrapped asset with the values
// held in m.
//
// m["data"] must be a map[string]interface{}. It is decoded (ignoring any
// "@type" entry) into a fresh message of the type named by the asset's current
// Data, as resolved through protoregistry.GlobalTypes, and the result is stored
// back as the asset's Data. All other entries of m are decoded into the asset
// itself via AsStruct.
//
// Neither m nor its nested data map is modified, so the same map can safely be
// passed again. An error is returned when m["data"] has an unexpected type,
// when the wrapper holds no asset, when the data message type cannot be
// resolved, or when decoding or marshalling fails.
func (w *AssetWrapper) OverwriteWith(m map[string]interface{}) error {
	dataMap, ok := m["data"].(map[string]interface{})
	if !ok {
		return fmt.Errorf("structmap: overwrite asset: unexpected type for asset data: %T", m["data"])
	}

	// Report a missing asset as an error instead of panicking on w.A.Data.
	if w == nil || w.A == nil {
		return fmt.Errorf("structmap: overwrite asset: nil asset")
	}

	msgName := w.A.Data.MessageName()
	mt, err := protoregistry.GlobalTypes.FindMessageByName(msgName)
	if err != nil {
		return fmt.Errorf("structmap: overwrite asset: resolve type by full name %s: %w", msgName, err)
	}

	// Decode from shallow copies so the caller's maps are left untouched:
	// the data fields without "@type", and the asset fields without "data".
	dataFields := make(map[string]interface{}, len(dataMap))
	for k, v := range dataMap {
		if k != "@type" {
			dataFields[k] = v
		}
	}

	assetFields := make(map[string]interface{}, len(m))
	for k, v := range m {
		if k != "data" {
			assetFields[k] = v
		}
	}

	msg := mt.New().Interface()
	if err := AsStruct(dataFields, &msg); err != nil {
		return fmt.Errorf("structmap: overwrite asset: decode asset data: %w", err)
	}

	// Marshal the data before touching the asset so that a marshalling
	// failure does not leave the asset half-overwritten.
	data, err := anypb.New(msg)
	if err != nil {
		return fmt.Errorf("structmap: overwrite asset: marshal data as any: %w", err)
	}

	if err := AsStruct(assetFields, w.A); err != nil {
		return fmt.Errorf("structmap: overwrite asset: decode asset: %w", err)
	}

	w.A.Data = data

	return nil
}
Loading