Skip to content

Commit

Permalink
feat: Allow users to specify image for code processor. Fixes #482 (#491)
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Collins <[email protected]>
  • Loading branch information
alexec authored Nov 1, 2021
1 parent 096e906 commit 0c5a5e8
Show file tree
Hide file tree
Showing 13 changed files with 454 additions and 354 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ Intermediate:
* [Events interop](docs/EVENTS_INTEROP.md)
* [Workflow interop](docs/WORKFLOW_INTEROP.md)
* [Meta-data](docs/META.md)
* [Idempotence](docs/IDEMPOTENCE.md)

Advanced

Expand Down
12 changes: 9 additions & 3 deletions api/v1alpha1/code.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,19 @@ import (
)

type Code struct {
Runtime Runtime `json:"runtime" protobuf:"bytes,4,opt,name=runtime,casttype=Runtime"`
Source string `json:"source" protobuf:"bytes,3,opt,name=source"`
Runtime Runtime `json:"runtime,omitempty" protobuf:"bytes,4,opt,name=runtime,casttype=Runtime"`
// Image is used in preference to Runtime.
Image string `json:"image,omitempty" protobuf:"bytes,5,opt,name=image"`
Source string `json:"source" protobuf:"bytes,3,opt,name=source"`
}

func (in Code) getContainer(req getContainerReq) corev1.Container {
image := in.Image
if image == "" {
image = fmt.Sprintf(req.imageFormat, "dataflow-"+in.Runtime)
}
return containerBuilder{}.
init(req).
image(fmt.Sprintf(req.imageFormat, "dataflow-"+in.Runtime)).
image(image).
build()
}
15 changes: 10 additions & 5 deletions api/v1alpha1/code_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,14 @@ import (
)

func TestCode_getContainer(t *testing.T) {
x := Code{
Runtime: "my-runtime",
}
c := x.getContainer(getContainerReq{imageFormat: "fmt-%s"})
assert.Equal(t, "fmt-dataflow-my-runtime", c.Image)
t.Run("Runtime", func(t *testing.T) {
x := Code{Runtime: "my-runtime"}
c := x.getContainer(getContainerReq{imageFormat: "fmt-%s"})
assert.Equal(t, "fmt-dataflow-my-runtime", c.Image)
})
t.Run("Runtime", func(t *testing.T) {
x := Code{Image: "my-image"}
c := x.getContainer(getContainerReq{})
assert.Equal(t, "my-image", c.Image)
})
}
703 changes: 372 additions & 331 deletions api/v1alpha1/generated.pb.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions api/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 6 additions & 2 deletions config/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,9 @@ spec:
type: object
code:
properties:
image:
description: Image is used in preference to Runtime.
type: string
runtime:
enum:
- golang1-16
Expand All @@ -731,7 +734,6 @@ spec:
source:
type: string
required:
- runtime
- source
type: object
container:
Expand Down Expand Up @@ -7907,6 +7909,9 @@ spec:
type: object
code:
properties:
image:
description: Image is used in preference to Runtime.
type: string
runtime:
enum:
- golang1-16
Expand All @@ -7918,7 +7923,6 @@ spec:
source:
type: string
required:
- runtime
- source
type: object
container:
Expand Down
4 changes: 3 additions & 1 deletion config/crd/bases/dataflow.argoproj.io_pipelines.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,9 @@ spec:
type: object
code:
properties:
image:
description: Image is used in preference to Runtime.
type: string
runtime:
enum:
- golang1-16
Expand All @@ -726,7 +729,6 @@ spec:
source:
type: string
required:
- runtime
- source
type: object
container:
Expand Down
4 changes: 3 additions & 1 deletion config/crd/bases/dataflow.argoproj.io_steps.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,9 @@ spec:
type: object
code:
properties:
image:
description: Image is used in preference to Runtime.
type: string
runtime:
enum:
- golang1-16
Expand All @@ -691,7 +694,6 @@ spec:
source:
type: string
required:
- runtime
- source
type: object
container:
Expand Down
8 changes: 6 additions & 2 deletions config/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,9 @@ spec:
type: object
code:
properties:
image:
description: Image is used in preference to Runtime.
type: string
runtime:
enum:
- golang1-16
Expand All @@ -731,7 +734,6 @@ spec:
source:
type: string
required:
- runtime
- source
type: object
container:
Expand Down Expand Up @@ -7907,6 +7909,9 @@ spec:
type: object
code:
properties:
image:
description: Image is used in preference to Runtime.
type: string
runtime:
enum:
- golang1-16
Expand All @@ -7918,7 +7923,6 @@ spec:
source:
type: string
required:
- runtime
- source
type: object
container:
Expand Down
8 changes: 6 additions & 2 deletions config/dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,9 @@ spec:
type: object
code:
properties:
image:
description: Image is used in preference to Runtime.
type: string
runtime:
enum:
- golang1-16
Expand All @@ -731,7 +734,6 @@ spec:
source:
type: string
required:
- runtime
- source
type: object
container:
Expand Down Expand Up @@ -7907,6 +7909,9 @@ spec:
type: object
code:
properties:
image:
description: Image is used in preference to Runtime.
type: string
runtime:
enum:
- golang1-16
Expand All @@ -7918,7 +7923,6 @@ spec:
source:
type: string
required:
- runtime
- source
type: object
container:
Expand Down
8 changes: 6 additions & 2 deletions config/quick-start.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,9 @@ spec:
type: object
code:
properties:
image:
description: Image is used in preference to Runtime.
type: string
runtime:
enum:
- golang1-16
Expand All @@ -731,7 +734,6 @@ spec:
source:
type: string
required:
- runtime
- source
type: object
container:
Expand Down Expand Up @@ -7907,6 +7909,9 @@ spec:
type: object
code:
properties:
image:
description: Image is used in preference to Runtime.
type: string
runtime:
enum:
- golang1-16
Expand All @@ -7918,7 +7923,6 @@ spec:
source:
type: string
required:
- runtime
- source
type: object
container:
Expand Down
19 changes: 19 additions & 0 deletions docs/IDEMPOTENCE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Idempotence

Because network connections can always fail to either deliver a message, or receive an acknowledgement, processors and
sinks may receive the same message twice.

It maybe that you don't mind the occasional duplicate messages. But maybe that is not the case. If so, processors and
external systems, need to be written to cater for this, to be idempotent.

The safest way to do that is for each message to contain a unique identifier that is intrinsic to the message. For
example:

* If each message is the result of a horse race, the the location and time would be enough to de-duplicate messages.
* If each message was the final year grade of a student, then the year, class, and the student's IDs would be suitable.

When there is no intrinsic identifier, then you can use the identifiers that are added as [meta-data](META.md) to each
message. You should add an identifier to you messages as soon as possible.

Some sinks have inherent idempotence, e.g. when sinking to a volume, if duplicate processing results in a file being
created with the same name, the the old file will be overwritten.
15 changes: 10 additions & 5 deletions dsls/python/argo_dataflow/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ def dump(self):


class CodeStep(Step):
def __init__(self, name=None, source=None, code=None, runtime=None, sources=None, sinks=None, terminator=False):
def __init__(self, name=None, source=None, code=None, runtime=None, image=None, sources=None, sinks=None, terminator=False):
super().__init__(name, sources=sources, sinks=sinks, terminator=terminator)
if source:
self._source = inspect.getsource(source).replace('def ' + source.__name__ + str(inspect.signature(source)),
Expand All @@ -457,13 +457,18 @@ def __init__(self, name=None, source=None, code=None, runtime=None, sources=None
self._runtime = runtime
else:
self._runtime = DEFAULT_RUNTIME
self._image = image

def dump(self):
x = super().dump()
x['code'] = {
'runtime': self._runtime,
y = {
'source': self._source,
}
if self._runtime:
y['runtime'] = self._runtime
if self._image:
y['image'] = self._image
x['code'] = y
return x


Expand Down Expand Up @@ -521,8 +526,8 @@ def group(self, name=None, key=None, format=None, endOfGroup=None, storage=None)
def flatten(self, name=None):
return FlattenStep(name, sources=[self])

def code(self, name=None, source=None, code=None, runtime=None):
return CodeStep(name, source=source, code=code, runtime=runtime, sources=[self])
def code(self, name=None, source=None, code=None, runtime=None, image=None):
return CodeStep(name, source=source, code=code, runtime=runtime, image=image, sources=[self])

def map(self, name=None, expression=None):
return MapStep(name, expression, sources=[self])
Expand Down

0 comments on commit 0c5a5e8

Please sign in to comment.