Skip to content

Commit 6cf9737

Browse files
authoredNov 8, 2022
feat: add script processor using Tengo (#428)
- Implement a new 'script' processor with 'tengo' as the only supported script engine. - Add internal packages for translating asset proto value into map[string]interface{} and for updating asset proto value with the given map.
1 parent 6495614 commit 6cf9737

17 files changed

+1331
-8
lines changed
 

‎.gitignore

+2
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,5 @@ meteor-plugin-*
3232

3333
# build
3434
/dist
35+
36+
.playground

‎docs/docs/reference/processors.md

+15-1
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,18 @@ processors:
2828

2929
This processor will append Asset's Labels with value from given config.
3030

31-
[More details](https://github.com/odpf/meteor/blob/main/plugins/processors/labels/README.md)
31+
[More details][labels-readme]
32+
33+
## Script
34+
35+
Script processor uses the user specified script to transform each asset emitted
36+
from the extractor. Currently, [Tengo][tengo] is the only supported script
37+
engine.
38+
39+
[More details][script-readme]
40+
41+
[labels-readme]: https://github.com/odpf/meteor/blob/main/plugins/processors/labels/README.md
42+
43+
[script-readme]: https://github.com/odpf/meteor/blob/main/plugins/processors/script/README.md
44+
45+
[tengo]: https://github.com/d5/tengo

‎go.mod

+4-5
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ require (
1616
github.com/benbjohnson/clock v1.3.0 // indirect
1717
github.com/blastrain/vitess-sqlparser v0.0.0-20201030050434-a139afbb1aba
1818
github.com/cenkalti/backoff/v4 v4.1.2
19+
github.com/d5/tengo/v2 v2.13.0
1920
github.com/denisenkom/go-mssqldb v0.10.0
2021
github.com/dnaeon/go-vcr/v2 v2.0.1
2122
github.com/elastic/go-elasticsearch v0.0.0
@@ -31,7 +32,9 @@ require (
3132
github.com/go-playground/validator/v10 v10.10.0
3233
github.com/go-sql-driver/mysql v1.6.0
3334
github.com/gocql/gocql v0.0.0-20210817081954-bc256bbb90de
35+
github.com/google/go-cmp v0.5.8
3436
github.com/google/go-github/v37 v37.0.0
37+
github.com/gopherjs/gopherjs v0.0.0-20210503212227-fb464eba2686 // indirect
3538
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
3639
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
3740
github.com/lib/pq v1.10.4
@@ -50,6 +53,7 @@ require (
5053
github.com/prestodb/presto-go-client v0.0.0-20211201125635-ad28cec17d6c
5154
github.com/schollz/progressbar/v3 v3.8.5
5255
github.com/segmentio/kafka-go v0.4.17
56+
github.com/sergi/go-diff v1.1.0 // indirect
5357
github.com/sijms/go-ora/v2 v2.2.22
5458
github.com/snowflakedb/gosnowflake v1.6.7
5559
github.com/spf13/cast v1.5.0 // indirect
@@ -74,8 +78,3 @@ require (
7478
gopkg.in/ini.v1 v1.66.6 // indirect
7579
gopkg.in/yaml.v3 v3.0.1
7680
)
77-
78-
require (
79-
github.com/gopherjs/gopherjs v0.0.0-20210503212227-fb464eba2686 // indirect
80-
github.com/sergi/go-diff v1.1.0 // indirect
81-
)

‎go.sum

+2
Original file line numberDiff line numberDiff line change
@@ -501,6 +501,8 @@ github.com/d2g/dhcp4 v0.0.0-20170904100407-a1d1b6c41b1c/go.mod h1:Ct2BUK8SB0YC1S
501501
github.com/d2g/dhcp4client v1.0.0/go.mod h1:j0hNfjhrt2SxUOw55nL0ATM/z4Yt3t2Kd1mW34z5W5s=
502502
github.com/d2g/dhcp4server v0.0.0-20181031114812-7d4a0a7f59a5/go.mod h1:Eo87+Kg/IX2hfWJfwxMzLyuSZyxSoAug2nGa1G2QAi8=
503503
github.com/d2g/hardwareaddr v0.0.0-20190221164911-e7d9fbe030e4/go.mod h1:bMl4RjIciD2oAxI7DmWRx6gbeqrkoLqv3MV0vzNad+I=
504+
github.com/d5/tengo/v2 v2.13.0 h1:4pZ5mR4vjOejpp+PMeIMpjZdObK7iwWoLTpVyhT+0Jk=
505+
github.com/d5/tengo/v2 v2.13.0/go.mod h1:XRGjEs5I9jYIKTxly6HCF8oiiilk5E/RYXOZ5b0DZC8=
504506
github.com/danwakefield/fnmatch v0.0.0-20160403171240-cbb64ac3d964 h1:y5HC9v93H5EPKqaS1UYVg1uYah5Xf51mBfIoWehClUQ=
505507
github.com/danwakefield/fnmatch v0.0.0-20160403171240-cbb64ac3d964/go.mod h1:Xd9hchkHSWYkEqJwUGisez3G1QY8Ryz0sdWrLPMGjLk=
506508
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package tengoutil
2+
3+
import (
4+
"github.com/d5/tengo/v2"
5+
"github.com/d5/tengo/v2/stdlib"
6+
)
7+
8+
const (
9+
maxAllocs = 5000
10+
maxConsts = 500
11+
)
12+
13+
func NewSecureScript(input []byte) *tengo.Script {
14+
s := tengo.NewScript(input)
15+
16+
s.SetImports(stdlib.GetModuleMap(stdlib.AllModuleNames()...))
17+
s.SetMaxAllocs(maxAllocs)
18+
s.SetMaxConstObjects(maxConsts)
19+
20+
return s
21+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
//go:build plugins
2+
// +build plugins
3+
4+
package tengoutil
5+
6+
import (
7+
"testing"
8+
9+
"github.com/MakeNowJust/heredoc"
10+
"github.com/stretchr/testify/assert"
11+
)
12+
13+
func TestNewSecureScript(t *testing.T) {
14+
t.Run("Allows import of builtin modules", func(t *testing.T) {
15+
s := NewSecureScript(([]byte)(heredoc.Doc(`
16+
math := import("math")
17+
os := import("os")
18+
text := import("text")
19+
times := import("times")
20+
rand := import("rand")
21+
fmt := import("fmt")
22+
json := import("json")
23+
base64 := import("base64")
24+
hex := import("hex")
25+
enum := import("enum")
26+
`)))
27+
_, err := s.Compile()
28+
assert.NoError(t, err)
29+
})
30+
31+
t.Run("File import disallowed", func(t *testing.T) {
32+
s := NewSecureScript(([]byte)(`sum := import("./testdata/sum")`))
33+
_, err := s.Compile()
34+
assert.ErrorContains(t, err, "Compile Error: module './testdata/sum' not found")
35+
})
36+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package structmap
2+
3+
import (
4+
"fmt"
5+
6+
v1beta2 "github.com/odpf/meteor/models/odpf/assets/v1beta2"
7+
"google.golang.org/protobuf/reflect/protoregistry"
8+
"google.golang.org/protobuf/types/known/anypb"
9+
)
10+
11+
type AssetWrapper struct {
12+
A *v1beta2.Asset
13+
}
14+
15+
func (w AssetWrapper) AsMap() (map[string]interface{}, error) {
16+
v, err := AsMap(w.A)
17+
if err != nil {
18+
return nil, fmt.Errorf("structmap: asset as map: %w", err)
19+
}
20+
21+
m, ok := v.(map[string]interface{})
22+
if !ok {
23+
return nil, fmt.Errorf("structmap: asset as map: unexpected type for asset map: %T", v)
24+
}
25+
26+
return m, err
27+
}
28+
29+
func (w *AssetWrapper) OverwriteWith(m map[string]interface{}) error {
30+
dataMap, ok := m["data"].(map[string]interface{})
31+
if !ok {
32+
return fmt.Errorf("structmap: overwrite asset: unexpected type for asset data: %T", m["data"])
33+
}
34+
35+
mt, err := protoregistry.GlobalTypes.FindMessageByName(w.A.Data.MessageName())
36+
if err != nil {
37+
return fmt.Errorf("structmap: overwrite asset: resolve type by full name %s: %w", w.A.Data.MessageName(), err)
38+
}
39+
40+
msg := mt.New().Interface()
41+
delete(dataMap, "@type")
42+
if err := AsStruct(m["data"], &msg); err != nil {
43+
return fmt.Errorf("structmap: overwrite asset: decode asset data: %w", err)
44+
}
45+
46+
delete(m, "data")
47+
if err := AsStruct(m, w.A); err != nil {
48+
return fmt.Errorf("structmap: overwrite asset: decode asset: %w", err)
49+
}
50+
51+
data, err := anypb.New(msg)
52+
if err != nil {
53+
return fmt.Errorf("structmap: overwrite asset: marshal data as any: %w", err)
54+
}
55+
56+
w.A.Data = data
57+
58+
return nil
59+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,319 @@
1+
//go:build plugins
2+
// +build plugins
3+
4+
package structmap
5+
6+
import (
7+
"testing"
8+
"time"
9+
10+
v1beta2 "github.com/odpf/meteor/models/odpf/assets/v1beta2"
11+
testutils "github.com/odpf/meteor/test/utils"
12+
"github.com/stretchr/testify/assert"
13+
"google.golang.org/protobuf/types/known/anypb"
14+
"google.golang.org/protobuf/types/known/structpb"
15+
"google.golang.org/protobuf/types/known/timestamppb"
16+
)
17+
18+
func TestAssetWrapper_AsMap(t *testing.T) {
19+
cases := []struct {
20+
name string
21+
w AssetWrapper
22+
expected map[string]interface{}
23+
}{
24+
{
25+
name: "AssetWithFeatureTable",
26+
w: AssetWrapper{A: &v1beta2.Asset{
27+
Urn: "urn:caramlstore:test-caramlstore:feature_table:avg_dispatch_arrival_time_10_mins",
28+
Name: "avg_dispatch_arrival_time_10_mins",
29+
Service: "caramlstore",
30+
Type: "feature_table",
31+
Data: testutils.BuildAny(t, &v1beta2.FeatureTable{
32+
Namespace: "sauron",
33+
Entities: []*v1beta2.FeatureTable_Entity{
34+
{Name: "merchant_uuid", Labels: map[string]string{
35+
"description": "merchant uuid",
36+
"value_type": "STRING",
37+
}},
38+
},
39+
Features: []*v1beta2.Feature{
40+
{Name: "ongoing_placed_and_waiting_acceptance_orders", DataType: "INT64"},
41+
{Name: "ongoing_orders", DataType: "INT64"},
42+
{Name: "merchant_avg_dispatch_arrival_time_10m", DataType: "FLOAT"},
43+
{Name: "ongoing_accepted_orders", DataType: "INT64"},
44+
},
45+
CreateTime: timestamppb.New(time.Date(2022, time.September, 19, 22, 42, 0o4, 0, time.UTC)),
46+
UpdateTime: timestamppb.New(time.Date(2022, time.September, 21, 13, 23, 0o2, 0, time.UTC)),
47+
}),
48+
Lineage: &v1beta2.Lineage{
49+
Upstreams: []*v1beta2.Resource{
50+
{
51+
Urn: "urn:kafka:int-dagstream-kafka.yonkou.io:topic:GO_FOOD-delay-allocation-merchant-feature-10m-log",
52+
Service: "kafka",
53+
Type: "topic",
54+
},
55+
},
56+
},
57+
}},
58+
expected: map[string]interface{}{
59+
"data": map[string]interface{}{
60+
"@type": "type.googleapis.com/odpf.assets.v1beta2.FeatureTable",
61+
"create_time": "2022-09-19T22:42:04Z",
62+
"entities": []interface{}{
63+
map[string]interface{}{
64+
"labels": map[string]interface{}{"description": "merchant uuid", "value_type": "STRING"},
65+
"name": "merchant_uuid",
66+
},
67+
},
68+
"features": []interface{}{
69+
map[string]interface{}{"data_type": "INT64", "name": "ongoing_placed_and_waiting_acceptance_orders"},
70+
map[string]interface{}{"data_type": "INT64", "name": "ongoing_orders"},
71+
map[string]interface{}{"data_type": "FLOAT", "name": "merchant_avg_dispatch_arrival_time_10m"},
72+
map[string]interface{}{"data_type": "INT64", "name": "ongoing_accepted_orders"},
73+
},
74+
"namespace": "sauron",
75+
"update_time": "2022-09-21T13:23:02Z",
76+
},
77+
"lineage": map[string]interface{}{
78+
"upstreams": []interface{}{
79+
map[string]interface{}{
80+
"service": "kafka",
81+
"type": "topic",
82+
"urn": "urn:kafka:int-dagstream-kafka.yonkou.io:topic:GO_FOOD-delay-allocation-merchant-feature-10m-log",
83+
},
84+
},
85+
},
86+
"name": "avg_dispatch_arrival_time_10_mins",
87+
"service": "caramlstore",
88+
"type": "feature_table",
89+
"urn": "urn:caramlstore:test-caramlstore:feature_table:avg_dispatch_arrival_time_10_mins",
90+
},
91+
},
92+
{
93+
name: "AssetWithTable",
94+
w: AssetWrapper{A: &v1beta2.Asset{
95+
Urn: "urn:cassandra:test-cassandra:table:cassandra_meteor_test.applicant",
96+
Name: "applicant",
97+
Type: "table",
98+
Service: "cassandra",
99+
Data: testutils.BuildAny(t, &v1beta2.Table{
100+
Columns: []*v1beta2.Column{
101+
{Name: "applicantid", DataType: "int"},
102+
{Name: "first_name", DataType: "text"},
103+
{Name: "last_name", DataType: "text"},
104+
},
105+
Attributes: &structpb.Struct{
106+
Fields: map[string]*structpb.Value{
107+
"id": structpb.NewStringValue("test-id"),
108+
"name": structpb.NewStringValue("test-name"),
109+
},
110+
},
111+
}),
112+
}},
113+
expected: map[string]interface{}{
114+
"data": map[string]interface{}{
115+
"@type": "type.googleapis.com/odpf.assets.v1beta2.Table",
116+
"columns": []interface{}{
117+
map[string]interface{}{"data_type": "int", "name": "applicantid"},
118+
map[string]interface{}{"data_type": "text", "name": "first_name"},
119+
map[string]interface{}{"data_type": "text", "name": "last_name"},
120+
},
121+
"attributes": map[string]interface{}{
122+
"id": "test-id",
123+
"name": "test-name",
124+
},
125+
},
126+
"name": "applicant",
127+
"service": "cassandra",
128+
"type": "table",
129+
"urn": "urn:cassandra:test-cassandra:table:cassandra_meteor_test.applicant",
130+
},
131+
},
132+
}
133+
for _, tc := range cases {
134+
t.Run(tc.name, func(t *testing.T) {
135+
res, err := tc.w.AsMap()
136+
assert.NoError(t, err)
137+
assert.Equal(t, tc.expected, res)
138+
})
139+
}
140+
}
141+
142+
func TestAssetWrapper_OverwriteWith(t *testing.T) {
143+
cases := []struct {
144+
name string
145+
w AssetWrapper
146+
input map[string]interface{}
147+
expected *v1beta2.Asset
148+
expectedErr bool
149+
}{
150+
{
151+
name: "AssetWithFeatureTable",
152+
w: AssetWrapper{A: &v1beta2.Asset{
153+
Data: &anypb.Any{TypeUrl: "type.googleapis.com/odpf.assets.v1beta2.FeatureTable"},
154+
}},
155+
input: map[string]interface{}{
156+
"data": map[string]interface{}{
157+
"@type": "type.googleapis.com/odpf.assets.v1beta2.FeatureTable",
158+
"create_time": "2022-09-19T22:42:04Z",
159+
"entities": []interface{}{
160+
map[string]interface{}{
161+
"labels": map[string]interface{}{"description": "merchant uuid", "value_type": "STRING"},
162+
"name": "merchant_uuid",
163+
},
164+
},
165+
"features": []interface{}{
166+
map[string]interface{}{"data_type": "INT64", "name": "ongoing_placed_and_waiting_acceptance_orders"},
167+
map[string]interface{}{"data_type": "INT64", "name": "ongoing_orders"},
168+
map[string]interface{}{"data_type": "FLOAT", "name": "merchant_avg_dispatch_arrival_time_10m"},
169+
map[string]interface{}{"data_type": "INT64", "name": "ongoing_accepted_orders"},
170+
},
171+
"namespace": "sauron",
172+
"update_time": "2022-09-21T13:23:02Z",
173+
},
174+
"lineage": map[string]interface{}{
175+
"upstreams": []interface{}{
176+
map[string]interface{}{
177+
"service": "kafka",
178+
"type": "topic",
179+
"urn": "urn:kafka:int-dagstream-kafka.yonkou.io:topic:GO_FOOD-delay-allocation-merchant-feature-10m-log",
180+
},
181+
},
182+
},
183+
"create_time": "2022-10-19T22:42:04Z",
184+
"name": "avg_dispatch_arrival_time_10_mins",
185+
"service": "caramlstore",
186+
"type": "feature_table",
187+
"urn": "urn:caramlstore:test-caramlstore:feature_table:avg_dispatch_arrival_time_10_mins",
188+
},
189+
expected: &v1beta2.Asset{
190+
Urn: "urn:caramlstore:test-caramlstore:feature_table:avg_dispatch_arrival_time_10_mins",
191+
Name: "avg_dispatch_arrival_time_10_mins",
192+
Service: "caramlstore",
193+
Type: "feature_table",
194+
Data: testutils.BuildAny(t, &v1beta2.FeatureTable{
195+
Namespace: "sauron",
196+
Entities: []*v1beta2.FeatureTable_Entity{
197+
{
198+
Name: "merchant_uuid",
199+
Labels: map[string]string{"description": "merchant uuid", "value_type": "STRING"},
200+
},
201+
},
202+
Features: []*v1beta2.Feature{
203+
{Name: "ongoing_placed_and_waiting_acceptance_orders", DataType: "INT64"},
204+
{Name: "ongoing_orders", DataType: "INT64"},
205+
{Name: "merchant_avg_dispatch_arrival_time_10m", DataType: "FLOAT"},
206+
{Name: "ongoing_accepted_orders", DataType: "INT64"},
207+
},
208+
CreateTime: timestamppb.New(time.Date(2022, time.September, 19, 22, 42, 4, 0, time.UTC)),
209+
UpdateTime: timestamppb.New(time.Date(2022, time.September, 21, 13, 23, 2, 0, time.UTC)),
210+
}),
211+
Lineage: &v1beta2.Lineage{
212+
Upstreams: []*v1beta2.Resource{
213+
{
214+
Urn: "urn:kafka:int-dagstream-kafka.yonkou.io:topic:GO_FOOD-delay-allocation-merchant-feature-10m-log",
215+
Service: "kafka",
216+
Type: "topic",
217+
},
218+
},
219+
},
220+
CreateTime: timestamppb.New(time.Date(2022, time.October, 19, 22, 42, 4, 0, time.UTC)),
221+
},
222+
},
223+
{
224+
name: "AssetWithTable",
225+
w: AssetWrapper{A: &v1beta2.Asset{
226+
Data: &anypb.Any{TypeUrl: "type.googleapis.com/odpf.assets.v1beta2.Table"},
227+
}},
228+
input: map[string]interface{}{
229+
"data": map[string]interface{}{
230+
"@type": "type.googleapis.com/odpf.assets.v1beta2.Table",
231+
"columns": []interface{}{
232+
map[string]interface{}{"data_type": "int", "name": "applicantid"},
233+
map[string]interface{}{"data_type": "text", "name": "first_name"},
234+
map[string]interface{}{"data_type": "text", "name": "last_name"},
235+
},
236+
"attributes": map[string]interface{}{"id": "test-id", "name": "test-name"},
237+
},
238+
"name": "applicant",
239+
"service": "cassandra",
240+
"type": "table",
241+
"urn": "urn:cassandra:test-cassandra:table:cassandra_meteor_test.applicant",
242+
},
243+
expected: &v1beta2.Asset{
244+
Urn: "urn:cassandra:test-cassandra:table:cassandra_meteor_test.applicant",
245+
Name: "applicant",
246+
Type: "table",
247+
Service: "cassandra",
248+
Data: testutils.BuildAny(t, &v1beta2.Table{
249+
Columns: []*v1beta2.Column{
250+
{Name: "applicantid", DataType: "int"},
251+
{Name: "first_name", DataType: "text"},
252+
{Name: "last_name", DataType: "text"},
253+
},
254+
Attributes: &structpb.Struct{
255+
Fields: map[string]*structpb.Value{
256+
"id": structpb.NewStringValue("test-id"),
257+
"name": structpb.NewStringValue("test-name"),
258+
},
259+
},
260+
}),
261+
},
262+
},
263+
{
264+
name: "WithoutData",
265+
w: AssetWrapper{A: &v1beta2.Asset{
266+
Data: &anypb.Any{TypeUrl: "type.googleapis.com/odpf.assets.v1beta2.Table"},
267+
}},
268+
input: map[string]interface{}{
269+
"name": "applicant",
270+
"service": "cassandra",
271+
"type": "table",
272+
"urn": "urn:cassandra:test-cassandra:table:cassandra_meteor_test.applicant",
273+
},
274+
expected: &v1beta2.Asset{
275+
Data: &anypb.Any{TypeUrl: "type.googleapis.com/odpf.assets.v1beta2.Table"},
276+
},
277+
expectedErr: true,
278+
},
279+
{
280+
name: "UnknownKeys",
281+
w: AssetWrapper{A: &v1beta2.Asset{
282+
Data: &anypb.Any{TypeUrl: "type.googleapis.com/odpf.assets.v1beta2.Table"},
283+
}},
284+
input: map[string]interface{}{
285+
"does-not-exist": "value",
286+
"urn": "urn:cassandra:test-cassandra:table:cassandra_meteor_test.applicant",
287+
"type": "table",
288+
"data": map[string]interface{}{},
289+
},
290+
expected: &v1beta2.Asset{
291+
Urn: "urn:cassandra:test-cassandra:table:cassandra_meteor_test.applicant",
292+
Type: "table",
293+
Data: &anypb.Any{TypeUrl: "type.googleapis.com/odpf.assets.v1beta2.Table"},
294+
},
295+
expectedErr: true,
296+
},
297+
{
298+
name: "UnknownMessageName",
299+
w: AssetWrapper{A: &v1beta2.Asset{
300+
Data: &anypb.Any{TypeUrl: "type.googleapis.com/odpf.assets.v1beta2.DoesNotExist"},
301+
}},
302+
input: map[string]interface{}{
303+
"data": map[string]interface{}{},
304+
},
305+
expected: &v1beta2.Asset{
306+
Data: &anypb.Any{TypeUrl: "type.googleapis.com/odpf.assets.v1beta2.DoesNotExist"},
307+
},
308+
expectedErr: true,
309+
},
310+
}
311+
for _, tc := range cases {
312+
t.Run(tc.name, func(t *testing.T) {
313+
err := tc.w.OverwriteWith(tc.input)
314+
assert.Equalf(t, tc.expectedErr, (err != nil),
315+
"AssetWrapper.OverwriteWith() err = %v,\nexpectedErr %v", err, tc.expectedErr)
316+
testutils.AssertEqualProto(t, tc.expected, tc.w.A)
317+
})
318+
}
319+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
package structmap
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
"reflect"
7+
"time"
8+
9+
"github.com/mitchellh/mapstructure"
10+
"google.golang.org/protobuf/encoding/protojson"
11+
"google.golang.org/protobuf/proto"
12+
"google.golang.org/protobuf/types/known/structpb"
13+
"google.golang.org/protobuf/types/known/timestamppb"
14+
)
15+
16+
func AsMap(v interface{}) (interface{}, error) {
17+
// Cannot use mapstructure here because of
18+
// 1. https://github.com/mitchellh/mapstructure/issues/249
19+
// 2. Handling for fields with type *timestamp.Timestamp
20+
var (
21+
data []byte
22+
err error
23+
)
24+
if m, ok := v.(proto.Message); ok {
25+
data, err = protojson.MarshalOptions{UseProtoNames: true}.Marshal(m)
26+
} else {
27+
data, err = json.Marshal(v)
28+
}
29+
if err != nil {
30+
return nil, fmt.Errorf("structmap: %T as map: marshal: %w", v, err)
31+
}
32+
33+
var res interface{}
34+
if err := json.Unmarshal(data, &res); err != nil {
35+
return nil, fmt.Errorf("structmap: %T as map: unmarshal: %w", v, err)
36+
}
37+
38+
return res, nil
39+
}
40+
41+
func AsStruct(input, output interface{}) error {
42+
dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
43+
DecodeHook: mapstructure.ComposeDecodeHookFunc(
44+
stringToTimestampHookFunc(time.RFC3339),
45+
timeToTimestampHookFunc(),
46+
mapstructure.StringToTimeHookFunc(time.RFC3339),
47+
mapToAttributesHookFunc(),
48+
),
49+
WeaklyTypedInput: true,
50+
ErrorUnused: true,
51+
ZeroFields: true,
52+
Result: output,
53+
TagName: "json",
54+
})
55+
if err != nil {
56+
return fmt.Errorf("structmap: decode into %T: create decoder: %w", output, err)
57+
}
58+
59+
if err := dec.Decode(input); err != nil {
60+
return fmt.Errorf("structmap: decode into %T: %w", output, err)
61+
}
62+
63+
return nil
64+
}
65+
66+
// stringToTimestampHookFunc returns a DecodeHookFunc that converts
67+
// strings to timestamppb.Timestamp.
68+
func stringToTimestampHookFunc(layout string) mapstructure.DecodeHookFuncType {
69+
return func(_ reflect.Type, t reflect.Type, data interface{}) (interface{}, error) {
70+
s, ok := data.(string)
71+
if !ok {
72+
return data, nil
73+
}
74+
if t != reflect.TypeOf(timestamppb.Timestamp{}) && t != reflect.TypeOf(&timestamppb.Timestamp{}) {
75+
return data, nil
76+
}
77+
78+
// Convert it by parsing
79+
ts, err := time.Parse(layout, s)
80+
if err != nil {
81+
return nil, fmt.Errorf("structmap: mapstructure string to timestamp hook: %w", err)
82+
}
83+
84+
return timestamppb.New(ts), nil
85+
}
86+
}
87+
88+
func timeToTimestampHookFunc() mapstructure.DecodeHookFuncType {
89+
return func(_ reflect.Type, t reflect.Type, data interface{}) (interface{}, error) {
90+
ts, ok := data.(time.Time)
91+
if !ok {
92+
return data, nil
93+
}
94+
if t != reflect.TypeOf(timestamppb.Timestamp{}) && t != reflect.TypeOf(&timestamppb.Timestamp{}) {
95+
return data, nil
96+
}
97+
98+
return timestamppb.New(ts), nil
99+
}
100+
}
101+
102+
func mapToAttributesHookFunc() mapstructure.DecodeHookFuncType {
103+
return func(_ reflect.Type, t reflect.Type, data interface{}) (interface{}, error) {
104+
m, ok := data.(map[string]interface{})
105+
if !ok {
106+
return data, nil
107+
}
108+
109+
if t != reflect.TypeOf(&structpb.Struct{}) && t != reflect.TypeOf(structpb.Struct{}) {
110+
return data, nil
111+
}
112+
113+
return structpb.NewStruct(m)
114+
}
115+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
//go:build plugins
2+
// +build plugins
3+
4+
package structmap
5+
6+
import (
7+
"testing"
8+
"time"
9+
10+
v1beta2 "github.com/odpf/meteor/models/odpf/assets/v1beta2"
11+
"github.com/odpf/meteor/utils"
12+
"github.com/stretchr/testify/assert"
13+
"google.golang.org/protobuf/types/known/structpb"
14+
"google.golang.org/protobuf/types/known/timestamppb"
15+
)
16+
17+
func TestAsMap(t *testing.T) {
18+
cases := []struct {
19+
name string
20+
input interface{}
21+
expected interface{}
22+
expectedErr bool
23+
}{
24+
{
25+
name: "MapStringToString",
26+
input: map[string]string{"key": "value"},
27+
expected: map[string]interface{}{"key": "value"},
28+
},
29+
{
30+
name: "MapIntToStringSlice",
31+
input: map[int][]string{1: {"s1", "s2"}},
32+
expected: map[string]interface{}{"1": []interface{}{"s1", "s2"}},
33+
},
34+
{
35+
name: "StringSlice",
36+
input: []string{"s1", "s2"},
37+
expected: []interface{}{"s1", "s2"},
38+
},
39+
{
40+
name: "WithProtoMessage",
41+
input: &v1beta2.Job{
42+
Attributes: utils.TryParseMapToProto(map[string]interface{}{
43+
"id": "test-id",
44+
"name": "test-name",
45+
}),
46+
CreateTime: timestamppb.New(time.Date(
47+
2022, time.September, 19, 22, 42, 0o4, 0, time.UTC,
48+
)),
49+
},
50+
expected: map[string]interface{}{
51+
"attributes": map[string]interface{}{
52+
"id": "test-id",
53+
"name": "test-name",
54+
},
55+
"create_time": "2022-09-19T22:42:04Z",
56+
},
57+
},
58+
{
59+
name: "MarshalFailure",
60+
input: make(chan int),
61+
expectedErr: true,
62+
},
63+
}
64+
for _, tc := range cases {
65+
t.Run(tc.name, func(t *testing.T) {
66+
res, err := AsMap(tc.input)
67+
assert.Equalf(t, tc.expectedErr, (err != nil),
68+
"AsMap() err = %v,\nexpectedErr %v", err, tc.expectedErr)
69+
assert.Equal(t, tc.expected, res)
70+
})
71+
}
72+
}
73+
74+
func TestAsStruct(t *testing.T) {
75+
cases := []struct {
76+
name string
77+
input interface{}
78+
output interface{}
79+
expected interface{}
80+
expectedErr bool
81+
}{
82+
{
83+
name: "MapStringToString",
84+
input: map[string]interface{}{"key": "value"},
85+
output: map[string]string{},
86+
expected: map[string]string{"key": "value"},
87+
},
88+
{
89+
name: "MapIntToStringSlice",
90+
input: map[string]interface{}{"1": []interface{}{"s1", "s2"}},
91+
output: map[int][]string{},
92+
expected: map[int][]string{1: {"s1", "s2"}},
93+
},
94+
{
95+
name: "StringSlice",
96+
input: []interface{}{"s1", "s2"},
97+
output: []string{},
98+
expected: []string{"s1", "s2"},
99+
},
100+
{
101+
name: "WithProtoMessage",
102+
input: map[string]interface{}{
103+
"attributes": map[string]interface{}{
104+
"id": "test-id",
105+
"name": "test-name",
106+
},
107+
"create_time": "2022-09-19T22:42:04Z",
108+
},
109+
output: &v1beta2.Job{},
110+
expected: &v1beta2.Job{
111+
Attributes: &structpb.Struct{
112+
Fields: map[string]*structpb.Value{
113+
"id": structpb.NewStringValue("test-id"),
114+
"name": structpb.NewStringValue("test-name"),
115+
},
116+
},
117+
CreateTime: timestamppb.New(time.Date(
118+
2022, time.September, 19, 22, 42, 0o4, 0, time.UTC,
119+
)),
120+
},
121+
},
122+
{
123+
name: "MismatchedType",
124+
input: []interface{}{"s1"},
125+
output: map[string]interface{}{},
126+
expected: map[string]interface{}{},
127+
expectedErr: true,
128+
},
129+
}
130+
for _, tc := range cases {
131+
t.Run(tc.name, func(t *testing.T) {
132+
err := AsStruct(tc.input, &tc.output)
133+
assert.Equalf(t, tc.expectedErr, (err != nil),
134+
"AsStruct() err = %v,\nexpectedErr %v", err, tc.expectedErr)
135+
assert.Equal(t, tc.expected, tc.output)
136+
})
137+
}
138+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
// Test fixture module: exports a function that returns its argument plus base.
base := 5
2+
3+
export func(x) {
4+
return x + base
5+
}

‎plugins/processors/populate.go

+1
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@ package processors
33
import (
44
_ "github.com/odpf/meteor/plugins/processors/enrich"
55
_ "github.com/odpf/meteor/plugins/processors/labels"
6+
_ "github.com/odpf/meteor/plugins/processors/script"
67
)

‎plugins/processors/script/README.md

+252
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,252 @@
1+
# script
2+
3+
`script` processor will run the user specified script to transform each asset
4+
that is emitted by the extractor. Currently, [Tengo][tengo] is the only
5+
supported script engine.
6+
7+
Refer to the Tengo documentation for script language syntax and supported functionality
8+
\- https://github.com/d5/tengo/tree/v2.13.0#references
9+
. [Tengo standard library modules][tengo-stdlib] can also be imported and used
10+
if required.
11+
12+
## Usage
13+
14+
```yaml
15+
processors:
16+
- name: script
17+
config:
18+
engine: tengo
19+
script: |
20+
asset.owners = append(asset.owners || [], { name: "Big Mom", email: "big.mom@wholecakeisland.com" })
21+
```
22+
23+
## Inputs
24+
25+
| Key | Value | Example | Description | Required? |
26+
|:---------|:---------|:---------------------------------------------------------------|:-----------------------------------------------------|:----------|
27+
| `engine` | `string` | `"tengo"` | Script engine. Only `"tengo"` is supported currently | ✅ |
28+
| `script` | `string` | `asset.labels = merge({script_engine: "tengo"}, asset.labels)` | [Tengo][tengo] script. | ✅ |
29+
30+
### Script Globals
31+
32+
#### `asset`
33+
34+
The asset record emitted by the extractor is made available in the script
35+
environment as `asset`. Any changes made to the asset will be reflected in the
36+
record that will be output from the script processor. The field names will be as
37+
per the [`Asset` proto definition][proton-asset]. Furthermore, the data
38+
structure for `asset.data` will be one of the following:
39+
40+
- [`Bucket`][proton-bucket]
41+
- [`Dashboard`][proton-dashboard]
42+
- [`Experiment`][proton-experiment]
43+
- [`FeatureTable`][proton-featuretable]
44+
- [`Group`][proton-group]
45+
- [`Job`][proton-job]
46+
- [`Metric`][proton-metric]
47+
- [`Model`][proton-model]
48+
- [`Service`][proton-service]
49+
- [`Table`][proton-table]
50+
- [`Topic`][proton-topic]
51+
- [`User`][proton-user]
52+
53+
The data type for `asset.data` depends on the specific type of extractor.
54+
55+
## Worked Example
56+
57+
Consider a [`FeatureTable`][proton-featuretable] asset with the following data:
58+
59+
```json
60+
{
61+
"urn": "urn:caramlstore:test-caramlstore:feature_table:avg_dispatch_arrival_time_10_mins",
62+
"name": "avg_dispatch_arrival_time_10_mins",
63+
"service": "caramlstore",
64+
"type": "feature_table",
65+
"data": {
66+
"@type": "type.googleapis.com/odpf.assets.v1beta2.FeatureTable",
67+
"namespace": "sauron",
68+
"entities": [
69+
{
70+
"name": "merchant_uuid",
71+
"labels": {"description": "merchant uuid", "value_type": "STRING"}
72+
}
73+
],
74+
"features": [
75+
{
76+
"name": "ongoing_placed_and_waiting_acceptance_orders",
77+
"data_type": "INT64"
78+
},
79+
{"name": "ongoing_orders", "data_type": "INT64"},
80+
{"name": "merchant_avg_dispatch_arrival_time_10m", "data_type": "FLOAT"},
81+
{"name": "ongoing_accepted_orders", "data_type": "INT64"}
82+
],
83+
"create_time": "2022-09-19T22:42:04Z",
84+
"update_time": "2022-09-21T13:23:02Z"
85+
},
86+
"lineage": {
87+
"upstreams": [
88+
{
89+
"urn": "urn:kafka:int-dagstream-kafka.yonkou.io:topic:GO_FOOD-delay-allocation-merchant-feature-10m-log",
90+
"service": "kafka",
91+
"type": "topic"
92+
}
93+
]
94+
}
95+
}
96+
```
97+
98+
With the following contrived requirements to transform the asset:
99+
100+
- Add a label to the asset - `"script_engine": "tengo"`.
101+
- Add a label to each entity. Ex: `"catch_phrase": "You talkin' to me?"`.
102+
- Set an EntityName for each feature based on the following mapping:
103+
- `ongoing_placed_and_waiting_acceptance_orders: customer_orders`
104+
- `ongoing_orders: customer_orders`
105+
- `merchant_avg_dispatch_arrival_time_10m: merchant_driver`
106+
- `ongoing_accepted_orders: merchant_orders`
107+
- Set the owner as `{Name: Big Mom, Email: big.mom@wholecakeisland.com}`.
108+
- For each lineage upstream, if the service is Kafka, apply a string replace op
109+
on the URN - `{.yonkou.io => }`.
110+
- Add 1 day to the `update_time` timestamp present under `asset.data`.
111+
112+
The script to apply the transformations above:
113+
114+
[//]: # (@formatter:off)
115+
116+
```go
117+
text := import("text")
118+
times := import("times")
119+
120+
merge := func(m1, m2) {
121+
for k, v in m2 {
122+
m1[k] = v
123+
}
124+
return m1
125+
}
126+
127+
asset.labels = merge({script_engine: "tengo"}, asset.labels)
128+
129+
for e in asset.data.entities {
130+
e.labels = merge({catch_phrase: "You talkin' to me?"}, e.labels)
131+
}
132+
133+
for f in asset.data.features {
134+
if f.name == "ongoing_placed_and_waiting_acceptance_orders" || f.name == "ongoing_orders" {
135+
f.entity_name = "customer_orders"
136+
} else if f.name == "merchant_avg_dispatch_arrival_time_10m" {
137+
f.entity_name = "merchant_driver"
138+
} else if f.name == "ongoing_accepted_orders" {
139+
f.entity_name = "merchant_orders"
140+
}
141+
}
142+
143+
asset.owners = append(asset.owners || [], { name: "Big Mom", email: "big.mom@wholecakeisland.com" })
144+
145+
for u in asset.lineage.upstreams {
146+
u.urn = u.service != "kafka" ? u.urn : text.replace(u.urn, ".yonkou.io", "", -1)
147+
}
148+
149+
update_time := times.parse("2006-01-02T15:04:05Z07:00", asset.data.update_time)
150+
asset.data.update_time = times.add_date(update_time, 0, 0, 1)
151+
```
152+
153+
[//]: # (@formatter:on)
154+
155+
With this script, the output from the processor would have the following asset:
156+
157+
```json
158+
{
159+
"urn": "urn:caramlstore:test-caramlstore:feature_table:avg_dispatch_arrival_time_10_mins",
160+
"name": "avg_dispatch_arrival_time_10_mins",
161+
"service": "caramlstore",
162+
"type": "feature_table",
163+
"data": {
164+
"@type": "type.googleapis.com/odpf.assets.v1beta2.FeatureTable",
165+
"namespace": "sauron",
166+
"entities": [
167+
{
168+
"name": "merchant_uuid",
169+
"labels": {
170+
"catch_phrase": "You talkin' to me?",
171+
"description": "merchant uuid",
172+
"value_type": "STRING"
173+
}
174+
}
175+
],
176+
"features": [
177+
{
178+
"name": "ongoing_placed_and_waiting_acceptance_orders",
179+
"data_type": "INT64",
180+
"entity_name": "customer_orders"
181+
},
182+
{
183+
"name": "ongoing_orders",
184+
"data_type": "INT64",
185+
"entity_name": "customer_orders"
186+
},
187+
{
188+
"name": "merchant_avg_dispatch_arrival_time_10m",
189+
"data_type": "FLOAT",
190+
"entity_name": "merchant_driver"
191+
},
192+
{
193+
"name": "ongoing_accepted_orders",
194+
"data_type": "INT64",
195+
"entity_name": "merchant_orders"
196+
}
197+
],
198+
"create_time": "2022-09-19T22:42:04Z",
199+
"update_time": "2022-09-22T13:23:02Z"
200+
},
201+
"owners": [
202+
{"name": "Big Mom", "email": "big.mom@wholecakeisland.com"}
203+
],
204+
"lineage": {
205+
"upstreams": [
206+
{
207+
"urn": "urn:kafka:int-dagstream-kafka:topic:GO_FOOD-delay-allocation-merchant-feature-10m-log",
208+
"service": "kafka",
209+
"type": "topic"
210+
}
211+
]
212+
},
213+
"labels": {"script_engine": "tengo"}
214+
}
215+
```
216+
217+
## Contributing
218+
219+
Refer to
220+
the [contribution guidelines](../../../docs/docs/contribute/guide.md#adding-a-new-processor)
221+
for information on contributing to this module.
222+
223+
[tengo]: https://github.com/d5/tengo
224+
225+
[tengo-stdlib]: https://github.com/d5/tengo/blob/v2.13.0/docs/stdlib.md
226+
227+
[proton-asset]: https://github.com/odpf/proton/blob/0acbe8a/odpf/assets/v1beta2/asset.proto#L14
228+
229+
[proton-bucket]: https://github.com/odpf/proton/blob/0acbe8a/odpf/assets/v1beta2/bucket.proto#L13
230+
231+
[proton-dashboard]: https://github.com/odpf/proton/blob/0acbe8a/odpf/assets/v1beta2/dashboard.proto#L14
232+
233+
[proton-experiment]: https://github.com/odpf/proton/blob/0acbe8a/odpf/assets/v1beta2/experiment.proto#L15
234+
235+
[proton-featuretable]: https://github.com/odpf/proton/blob/0acbe8a/odpf/assets/v1beta2/feature_table.proto#L32
236+
237+
[proton-group]: https://github.com/odpf/proton/blob/0acbe8a/odpf/assets/v1beta2/group.proto#L12
238+
239+
[proton-job]: https://github.com/odpf/proton/blob/0acbe8a/odpf/assets/v1beta2/job.proto#L13
240+
241+
[proton-metric]: https://github.com/odpf/proton/blob/0acbe8a/odpf/assets/v1beta2/metric.proto#L13
242+
243+
[proton-model]: https://github.com/odpf/proton/blob/0acbe8a/odpf/assets/v1beta2/model.proto#L17
244+
245+
[proton-service]: https://github.com/odpf/proton/blob/0acbe8a/odpf/assets/v1beta2/service.proto#L11
246+
247+
[proton-table]: https://github.com/odpf/proton/blob/0acbe8a/odpf/assets/v1beta2/table.proto#L14
248+
249+
[proton-topic]: https://github.com/odpf/proton/blob/0acbe8a/odpf/assets/v1beta2/topic.proto#L14
250+
251+
[proton-user]: https://github.com/odpf/proton/blob/0acbe8a/odpf/assets/v1beta2/user.proto#L15
252+
+120
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package script
2+
3+
import (
4+
"context"
5+
_ "embed" // used to print the embedded assets
6+
"fmt"
7+
8+
"github.com/MakeNowJust/heredoc"
9+
"github.com/d5/tengo/v2"
10+
"github.com/odpf/meteor/models"
11+
"github.com/odpf/meteor/plugins"
12+
"github.com/odpf/meteor/plugins/internal/tengoutil"
13+
"github.com/odpf/meteor/plugins/internal/tengoutil/structmap"
14+
"github.com/odpf/meteor/registry"
15+
"github.com/odpf/salt/log"
16+
)
17+
18+
func init() {
19+
if err := registry.Processors.Register("script", func() plugins.Processor {
20+
return New(plugins.GetLog())
21+
}); err != nil {
22+
return
23+
}
24+
}
25+
26+
//go:embed README.md
27+
var summary string
28+
29+
// Config holds the settings of the script processor as decoded from the
// plugin configuration.
type Config struct {
	// Engine names the script engine. It is required and validation only
	// accepts the value "tengo".
	Engine string `mapstructure:"engine" validate:"required,oneof=tengo"`

	// Script is the source code of the script to run. It is required.
	Script string `mapstructure:"script" validate:"required"`
}
33+
34+
// Processor executes the configured Tengo script to transform the given asset
35+
// record.
36+
type Processor struct {
37+
plugins.BasePlugin
38+
config Config
39+
logger log.Logger
40+
41+
compiled *tengo.Compiled
42+
}
43+
44+
var sampleConfig = heredoc.Doc(`
45+
engine: tengo
46+
script: |
47+
asset.owners = append(asset.owners || [], { name: "Big Mom", email: "big.mom@wholecakeisland.com" })
48+
`)
49+
50+
var info = plugins.Info{
51+
Description: "Transform the extracted asset with the configured Tengo script",
52+
SampleConfig: sampleConfig,
53+
Summary: summary,
54+
Tags: []string{"processor", "transform", "script"},
55+
}
56+
57+
// New create a new processor
58+
func New(logger log.Logger) *Processor {
59+
p := &Processor{
60+
logger: logger,
61+
}
62+
p.BasePlugin = plugins.NewBasePlugin(info, &p.config)
63+
64+
return p
65+
}
66+
67+
func (p *Processor) Init(ctx context.Context, config plugins.Config) error {
68+
if err := p.BasePlugin.Init(ctx, config); err != nil {
69+
return fmt.Errorf("script processor init: %w", err)
70+
}
71+
72+
s := tengoutil.NewSecureScript(([]byte)(p.config.Script))
73+
if err := p.declareGlobals(s); err != nil {
74+
return fmt.Errorf("script processor init: %w", err)
75+
}
76+
77+
compiled, err := s.Compile()
78+
if err != nil {
79+
return fmt.Errorf("script processor init: compile script: %w", err)
80+
}
81+
82+
p.compiled = compiled
83+
84+
return nil
85+
}
86+
87+
// Process processes the data
88+
func (p *Processor) Process(ctx context.Context, src models.Record) (models.Record, error) {
89+
astWrapper := structmap.AssetWrapper{A: src.Data()}
90+
m, err := astWrapper.AsMap()
91+
if err != nil {
92+
return models.Record{}, fmt.Errorf("script processor: %w", err)
93+
}
94+
95+
c := p.compiled.Clone()
96+
if err := c.Set("asset", m); err != nil {
97+
return models.Record{}, fmt.Errorf("script processor: set asset into vm: %w", err)
98+
}
99+
100+
if err := c.RunContext(ctx); err != nil {
101+
return models.Record{}, fmt.Errorf("script processor: run script: %w", err)
102+
}
103+
104+
if err := astWrapper.OverwriteWith(c.Get("asset").Map()); err != nil {
105+
return models.Record{}, fmt.Errorf("script processor: overwrite asset: %w", err)
106+
}
107+
108+
return models.NewRecord(astWrapper.A), nil
109+
}
110+
111+
func (p *Processor) declareGlobals(s *tengo.Script) error {
112+
for name, v := range map[string]interface{}{
113+
"asset": map[string]interface{}{},
114+
} {
115+
if err := s.Add(name, v); err != nil {
116+
return fmt.Errorf("declare script globals: %w", err)
117+
}
118+
}
119+
return nil
120+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
//go:build plugins
2+
// +build plugins
3+
4+
package script
5+
6+
import (
7+
"context"
8+
"testing"
9+
"time"
10+
11+
"github.com/MakeNowJust/heredoc"
12+
"github.com/odpf/meteor/models"
13+
v1beta2 "github.com/odpf/meteor/models/odpf/assets/v1beta2"
14+
"github.com/odpf/meteor/plugins"
15+
testutils "github.com/odpf/meteor/test/utils"
16+
"github.com/stretchr/testify/assert"
17+
"google.golang.org/protobuf/types/known/anypb"
18+
"google.golang.org/protobuf/types/known/timestamppb"
19+
)
20+
21+
var (
22+
ctx = context.Background()
23+
script = `asset.owners = append(asset.owners || [], { name: "Big Mom", email: "big.mom@wholecakeisland.com" })`
24+
)
25+
26+
func TestInit(t *testing.T) {
27+
t.Run("InvalidConfig", func(t *testing.T) {
28+
cases := []struct {
29+
name string
30+
cfg plugins.Config
31+
}{
32+
{
33+
name: "WithoutScript",
34+
cfg: plugins.Config{
35+
RawConfig: map[string]interface{}{"engine": "tengo"},
36+
},
37+
},
38+
{
39+
name: "WithoutEngine",
40+
cfg: plugins.Config{
41+
RawConfig: map[string]interface{}{"script": script},
42+
},
43+
},
44+
{
45+
name: "WithUnsupportedEngine",
46+
cfg: plugins.Config{
47+
RawConfig: map[string]interface{}{"script": script, "engine": "goja"},
48+
},
49+
},
50+
}
51+
for _, tc := range cases {
52+
t.Run(tc.name, func(t *testing.T) {
53+
err := New(testutils.Logger).Init(ctx, tc.cfg)
54+
assert.ErrorAs(t, err, &plugins.InvalidConfigError{})
55+
})
56+
}
57+
})
58+
59+
t.Run("InvalidScript", func(t *testing.T) {
60+
err := New(testutils.Logger).Init(ctx, plugins.Config{
61+
RawConfig: map[string]interface{}{
62+
"script": `ast.owners = []`,
63+
"engine": "tengo",
64+
},
65+
})
66+
assert.ErrorContains(t, err, "script processor init: compile script: Compile Error: unresolved reference 'ast'")
67+
})
68+
}
69+
70+
// TestProcess is a table-driven test of Processor.Process. Each case supplies
// a Tengo script and an input asset, and either the asset expected after the
// script has run or a substring of the expected error.
func TestProcess(t *testing.T) {
71+
cases := []struct {
72+
name string
73+
script string
74+
input *v1beta2.Asset
75+
expected *v1beta2.Asset
76+
errStr string
77+
}{
78+
{
79+
name: "FeatureTableAsset",
80+
script: heredoc.Doc(`
81+
text := import("text")
82+
times := import("times")
83+
84+
merge := func(m1, m2) {
85+
for k, v in m2 {
86+
m1[k] = v
87+
}
88+
return m1
89+
}
90+
91+
asset.labels = merge({script_engine: "tengo"}, asset.labels)
92+
93+
for e in asset.data.entities {
94+
e.labels = merge({catch_phrase: "You talkin' to me?"}, e.labels)
95+
}
96+
97+
for f in asset.data.features {
98+
if f.name == "ongoing_placed_and_waiting_acceptance_orders" || f.name == "ongoing_orders" {
99+
f.entity_name = "customer_orders"
100+
} else if f.name == "merchant_avg_dispatch_arrival_time_10m" {
101+
f.entity_name = "merchant_driver"
102+
} else if f.name == "ongoing_accepted_orders" {
103+
f.entity_name = "merchant_orders"
104+
}
105+
}
106+
107+
asset.owners = append(asset.owners || [], { name: "Big Mom", email: "big.mom@wholecakeisland.com" })
108+
109+
for u in asset.lineage.upstreams {
110+
u.urn = u.service != "kafka" ? u.urn : text.replace(u.urn, ".yonkou.io", "", -1)
111+
}
112+
113+
update_time := times.parse("2006-01-02T15:04:05Z07:00", asset.data.update_time)
114+
asset.data.update_time = times.add_date(update_time, 0, 0, 1)
115+
`),
116+
input: &v1beta2.Asset{
117+
Urn: "urn:caramlstore:test-caramlstore:feature_table:avg_dispatch_arrival_time_10_mins",
118+
Name: "avg_dispatch_arrival_time_10_mins",
119+
Service: "caramlstore",
120+
Type: "feature_table",
121+
Data: testutils.BuildAny(t, &v1beta2.FeatureTable{
122+
Namespace: "sauron",
123+
Entities: []*v1beta2.FeatureTable_Entity{
124+
{Name: "merchant_uuid", Labels: map[string]string{
125+
"description": "merchant uuid",
126+
"value_type": "STRING",
127+
}},
128+
},
129+
Features: []*v1beta2.Feature{
130+
{Name: "ongoing_placed_and_waiting_acceptance_orders", DataType: "INT64"},
131+
{Name: "ongoing_orders", DataType: "INT64"},
132+
{Name: "merchant_avg_dispatch_arrival_time_10m", DataType: "FLOAT"},
133+
{Name: "ongoing_accepted_orders", DataType: "INT64"},
134+
},
135+
CreateTime: timestamppb.New(time.Date(2022, time.September, 19, 22, 42, 0o4, 0, time.UTC)),
136+
UpdateTime: timestamppb.New(time.Date(2022, time.September, 21, 13, 23, 0o2, 0, time.UTC)),
137+
}),
138+
Lineage: &v1beta2.Lineage{
139+
Upstreams: []*v1beta2.Resource{
140+
{
141+
Urn: "urn:kafka:int-dagstream-kafka.yonkou.io:topic:GO_FOOD-delay-allocation-merchant-feature-10m-log",
142+
Service: "kafka",
143+
Type: "topic",
144+
},
145+
},
146+
},
147+
},
148+
expected: &v1beta2.Asset{
149+
Urn: "urn:caramlstore:test-caramlstore:feature_table:avg_dispatch_arrival_time_10_mins",
150+
Name: "avg_dispatch_arrival_time_10_mins",
151+
Service: "caramlstore",
152+
Type: "feature_table",
153+
Data: testutils.BuildAny(t, &v1beta2.FeatureTable{
154+
Namespace: "sauron",
155+
Entities: []*v1beta2.FeatureTable_Entity{
156+
{Name: "merchant_uuid", Labels: map[string]string{
157+
"catch_phrase": "You talkin' to me?",
158+
"description": "merchant uuid",
159+
"value_type": "STRING",
160+
}},
161+
},
162+
Features: []*v1beta2.Feature{
163+
{Name: "ongoing_placed_and_waiting_acceptance_orders", DataType: "INT64", EntityName: "customer_orders"},
164+
{Name: "ongoing_orders", DataType: "INT64", EntityName: "customer_orders"},
165+
{Name: "merchant_avg_dispatch_arrival_time_10m", DataType: "FLOAT", EntityName: "merchant_driver"},
166+
{Name: "ongoing_accepted_orders", DataType: "INT64", EntityName: "merchant_orders"},
167+
},
168+
CreateTime: timestamppb.New(time.Date(2022, time.September, 19, 22, 42, 0o4, 0, time.UTC)),
169+
UpdateTime: timestamppb.New(time.Date(2022, time.September, 22, 13, 23, 0o2, 0, time.UTC)),
170+
}),
171+
Lineage: &v1beta2.Lineage{
172+
Upstreams: []*v1beta2.Resource{
173+
{
174+
Urn: "urn:kafka:int-dagstream-kafka:topic:GO_FOOD-delay-allocation-merchant-feature-10m-log",
175+
Service: "kafka",
176+
Type: "topic",
177+
},
178+
},
179+
},
180+
Labels: map[string]string{"script_engine": "tengo"},
181+
Owners: []*v1beta2.Owner{{Name: "Big Mom", Email: "big.mom@wholecakeisland.com"}},
182+
},
183+
},
184+
{
185+
name: "UnknownFields",
186+
script: heredoc.Doc(`
187+
asset.does_not_exist = "value"
188+
`),
189+
input: &v1beta2.Asset{
190+
Data: &anypb.Any{TypeUrl: "type.googleapis.com/odpf.assets.v1beta2.Table"},
191+
},
192+
expected: nil,
193+
errStr: "invalid keys: does_not_exist",
194+
},
195+
}
// Every case builds a fresh processor, initialises it with the case's script
// and processes a record wrapping the case's input.
for _, tc := range cases {
196+
t.Run(tc.name, func(t *testing.T) {
197+
p := New(testutils.Logger)
198+
err := p.Init(ctx, plugins.Config{
199+
RawConfig: map[string]interface{}{
200+
"script": tc.script,
201+
"engine": "tengo",
202+
},
203+
})
204+
if !assert.NoError(t, err) {
205+
return
206+
}
207+
208+
res, err := p.Process(ctx, models.NewRecord(tc.input))
209+
if tc.errStr == "" {
210+
assert.NoError(t, err)
211+
} else {
212+
assert.ErrorContains(t, err, tc.errStr)
213+
}
214+
215+
// The result is compared in the error case too, where expected is nil.
testutils.AssertEqualProto(t, tc.expected, res.Data())
216+
})
217+
}
218+
}

‎test/utils/any.go

+2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import (
99
)
1010

1111
func BuildAny(t *testing.T, protoMessage protoreflect.ProtoMessage) *anypb.Any {
12+
t.Helper()
13+
1214
res, err := anypb.New(protoMessage)
1315
require.NoError(t, err)
1416

‎test/utils/assert.go

+21-2
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,35 @@
11
package utils
22

33
import (
4+
"fmt"
45
"os"
56
"testing"
67

7-
"google.golang.org/protobuf/encoding/protojson"
8-
8+
"github.com/google/go-cmp/cmp"
99
"github.com/nsf/jsondiff"
1010
v1beta2 "github.com/odpf/meteor/models/odpf/assets/v1beta2"
11+
"github.com/stretchr/testify/assert"
1112
"github.com/stretchr/testify/require"
13+
"google.golang.org/protobuf/encoding/protojson"
14+
"google.golang.org/protobuf/proto"
15+
"google.golang.org/protobuf/testing/protocmp"
1216
)
1317

18+
func AssertEqualProto(t *testing.T, expected, actual proto.Message) {
19+
t.Helper()
20+
21+
if diff := cmp.Diff(actual, expected, protocmp.Transform()); diff != "" {
22+
msg := fmt.Sprintf(
23+
"Not equal:\n"+
24+
"expected:\n\t'%s'\n"+
25+
"actual:\n\t'%s'\n"+
26+
"diff (-expected +actual):\n%s",
27+
expected, actual, diff,
28+
)
29+
assert.Fail(t, msg)
30+
}
31+
}
32+
1433
func AssertAssetsWithJSON(t *testing.T, expected, actuals []*v1beta2.Asset) {
1534
t.Helper()
1635

0 commit comments

Comments
 (0)
Please sign in to comment.