Skip to content

Commit

Permalink
feat: and 1st-class support for expand and flatten
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed May 17, 2021
1 parent 9e6ab3d commit 940632a
Show file tree
Hide file tree
Showing 19 changed files with 779 additions and 190 deletions.
18 changes: 18 additions & 0 deletions api/v1alpha1/expand.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package v1alpha1

import (
corev1 "k8s.io/api/core/v1"
)

type Expand struct{}

func (m *Expand) getContainer(req getContainerReq) corev1.Container {
return corev1.Container{
Name: CtrMain,
Image: req.runnerImage,
ImagePullPolicy: req.imagePullPolicy,
Args: []string{"expand"},
VolumeMounts: []corev1.VolumeMount{req.volumeMount},
Resources: SmallResourceRequirements,
}
}
18 changes: 18 additions & 0 deletions api/v1alpha1/flatten.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package v1alpha1

import (
corev1 "k8s.io/api/core/v1"
)

type Flatten struct{}

func (m *Flatten) getContainer(req getContainerReq) corev1.Container {
return corev1.Container{
Name: CtrMain,
Image: req.runnerImage,
ImagePullPolicy: req.imagePullPolicy,
Args: []string{"flatten"},
VolumeMounts: []corev1.VolumeMount{req.volumeMount},
Resources: SmallResourceRequirements,
}
}
734 changes: 544 additions & 190 deletions api/v1alpha1/generated.pb.go

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions api/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions api/v1alpha1/step_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ type StepSpec struct {
Filter Filter `json:"filter,omitempty" protobuf:"bytes,8,opt,name=filter,casttype=Filter"`
Map Map `json:"map,omitempty" protobuf:"bytes,9,opt,name=map,casttype=Map"`
Group *Group `json:"group,omitempty" protobuf:"bytes,11,opt,name=group"`
Flatten *Flatten `json:"flatten,omitempty" protobuf:"bytes,25,opt,name=flatten"`
Expand *Expand `json:"expand,omitempty" protobuf:"bytes,26,opt,name=expand"`

Replicas *uint32 `json:"replicas,omitempty" protobuf:"varint,23,opt,name=replicas"`
Scale *Scale `json:"scale,omitempty" protobuf:"bytes,24,opt,name=scale"`
Expand Down Expand Up @@ -130,8 +132,12 @@ func (in *StepSpec) getType() containerSupplier {
return x
} else if x := in.Container; x != nil {
return x
} else if x := in.Expand; x != nil {
return x
} else if x := in.Filter; x != "" {
return x
} else if x := in.Flatten; x != nil {
return x
} else if x := in.Git; x != nil {
return x
} else if x := in.Group; x != nil {
Expand Down
40 changes: 40 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions config/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -518,8 +518,12 @@ spec:
required:
- image
type: object
expand:
type: object
filter:
type: string
flatten:
type: object
git:
properties:
branch:
Expand Down Expand Up @@ -2329,8 +2333,12 @@ spec:
required:
- image
type: object
expand:
type: object
filter:
type: string
flatten:
type: object
git:
properties:
branch:
Expand Down
4 changes: 4 additions & 0 deletions config/crd/bases/dataflow.argoproj.io_pipelines.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -848,8 +848,12 @@ spec:
required:
- image
type: object
expand:
type: object
filter:
type: string
flatten:
type: object
git:
properties:
branch:
Expand Down
4 changes: 4 additions & 0 deletions config/crd/bases/dataflow.argoproj.io_steps.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -809,8 +809,12 @@ spec:
required:
- image
type: object
expand:
type: object
filter:
type: string
flatten:
type: object
git:
properties:
branch:
Expand Down
8 changes: 8 additions & 0 deletions config/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -518,8 +518,12 @@ spec:
required:
- image
type: object
expand:
type: object
filter:
type: string
flatten:
type: object
git:
properties:
branch:
Expand Down Expand Up @@ -2329,8 +2333,12 @@ spec:
required:
- image
type: object
expand:
type: object
filter:
type: string
flatten:
type: object
git:
properties:
branch:
Expand Down
8 changes: 8 additions & 0 deletions config/dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -518,8 +518,12 @@ spec:
required:
- image
type: object
expand:
type: object
filter:
type: string
flatten:
type: object
git:
properties:
branch:
Expand Down Expand Up @@ -2329,8 +2333,12 @@ spec:
required:
- image
type: object
expand:
type: object
filter:
type: string
flatten:
type: object
git:
properties:
branch:
Expand Down
8 changes: 8 additions & 0 deletions config/quick-start.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -518,8 +518,12 @@ spec:
required:
- image
type: object
expand:
type: object
filter:
type: string
flatten:
type: object
git:
properties:
branch:
Expand Down Expand Up @@ -2329,8 +2333,12 @@ spec:
required:
- image
type: object
expand:
type: object
filter:
type: string
flatten:
type: object
git:
properties:
branch:
Expand Down
9 changes: 9 additions & 0 deletions docs/EXAMPLES.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@ They have a single variable, `msg`, which is a byte array.
kubectl apply -f https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/102-filter-pipeline.yaml
```

### [Flatten and expand](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/102-flatten-expand-pipeline.yaml)

This is an example of built-in flattening and expanding.


```
kubectl apply -f https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/102-flatten-expand-pipeline.yaml
```

### [Map messages](https://raw.githubusercontent.com/argoproj-labs/argo-dataflow/main/examples/102-map-pipeline.yaml)

This is an example of built-in mapping.
Expand Down
36 changes: 36 additions & 0 deletions examples/102-flatten-expand-pipeline.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
apiVersion: dataflow.argoproj.io/v1alpha1
kind: Pipeline
metadata:
annotations:
dataflow.argoproj.io/description: |
This is an example of built-in flattening and expanding.
dataflow.argoproj.io/name: Flatten and expand
creationTimestamp: null
name: flatten-expand
spec:
steps:
- map: |
bytes('{"foo": {"bar": "' + string(msg) + '"}}')
name: generate
sinks:
- stan:
subject: data
sources:
- cron:
layout: "15:04:05"
schedule: '*/3 * * * * *'
- flatten: {}
name: flatten
sinks:
- stan:
subject: flattened
sources:
- stan:
subject: data
- expand: {}
name: expand
sinks:
- log: {}
sources:
- stan:
subject: flattened
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/Masterminds/sprig v2.22.0+incompatible
github.com/Shopify/sarama v1.28.0
github.com/antonmedv/expr v1.8.9
github.com/doublerebel/bellows v0.0.0-20160303004610-f177d92a03d3
github.com/evanphx/json-patch v4.9.0+incompatible
github.com/go-git/go-git/v5 v5.3.0
github.com/go-logr/logr v0.3.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDD
github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96 h1:cenwrSVm+Z7QLSV/BsnenAOcDXdX4cMv4wP0B/5QbPg=
github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZgvJUkLughtfhJv5dyTYa91l1fOUCrgjqmcifM=
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
github.com/doublerebel/bellows v0.0.0-20160303004610-f177d92a03d3 h1:7nllYTGLnq4CqBL27lV6oNfXzM2tJ2mrKF8E+aBXOV0=
github.com/doublerebel/bellows v0.0.0-20160303004610-f177d92a03d3/go.mod h1:v/MTKot4he5oRHGirOYGN4/hEOONNnWtDBLAzllSGMw=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/eapache/go-resiliency v1.2.0 h1:v7g92e/KSN71Rq7vSThKaWIq68fL4YHvWyiUKorFR1Q=
Expand Down
24 changes: 24 additions & 0 deletions runner/expand/expand.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package expand

import (
"context"
"encoding/json"

"github.com/doublerebel/bellows"

"github.com/argoproj-labs/argo-dataflow/runner/util"
)

func Exec(ctx context.Context) error {
return util.Do(ctx, func(msg []byte) ([][]byte, error) {
v := make(map[string]interface{})
if err := json.Unmarshal(msg, &v); err != nil {
return nil, err
}
if data, err := json.Marshal(bellows.Expand(v)); err != nil {
return nil, err
} else {
return [][]byte{data}, nil
}
})
}
24 changes: 24 additions & 0 deletions runner/flatten/flatten.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package flatten

import (
"context"
"encoding/json"

"github.com/doublerebel/bellows"

"github.com/argoproj-labs/argo-dataflow/runner/util"
)

func Exec(ctx context.Context) error {
return util.Do(ctx, func(msg []byte) ([][]byte, error) {
v := make(map[string]interface{})
if err := json.Unmarshal(msg, &v); err != nil {
return nil, err
}
if data, err := json.Marshal(bellows.Flatten(v)); err != nil {
return nil, err
} else {
return [][]byte{data}, nil
}
})
}
Loading

0 comments on commit 940632a

Please sign in to comment.