Skip to content

Commit

Permalink
fix: sensible linger Kafka defaults (#432)
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Collins <[email protected]>
  • Loading branch information
alexec authored Oct 7, 2021
1 parent 11251f5 commit 6423c9e
Show file tree
Hide file tree
Showing 12 changed files with 605 additions and 85 deletions.
644 changes: 577 additions & 67 deletions api/v1alpha1/generated.pb.go

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion api/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 8 additions & 2 deletions api/v1alpha1/kafka_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ type KafkaSink struct {
Async bool `json:"async,omitempty" protobuf:"varint,2,opt,name=async"`
// +kubebuilder:default="100Ki"
BatchSize *resource.Quantity `json:"batchSize,omitempty" protobuf:"bytes,3,opt,name=batchSize"`
// +kubebuilder:default="0ms"
Linger *metav1.Duration `json:"linger,omitempty" protobuf:"bytes,4,opt,name=linger"`
Linger *metav1.Duration `json:"linger,omitempty" protobuf:"bytes,4,opt,name=linger"`
// +kubebuilder:default="lz4"
CompressionType string `json:"compressionType,omitempty" protobuf:"bytes,5,opt,name=compressionType"`
// +kubebuilder:default="all"
Expand All @@ -24,6 +23,13 @@ func (m *KafkaSink) GetBatchSize() int {
}

func (m *KafkaSink) GetLingerMs() int {
if m.Linger == nil {
if m.Async {
return 5
} else {
return 0
}
}
return int(m.Linger.Milliseconds())
}

Expand Down
14 changes: 12 additions & 2 deletions api/v1alpha1/kafka_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,18 @@ func TestKafkaSink_GetBatchSize(t *testing.T) {
}

func TestKafkaSink_GetLingerMs(t *testing.T) {
s := KafkaSink{Linger: &metav1.Duration{Duration: time.Second}}
assert.Equal(t, 1000, s.GetLingerMs())
t.Run("Sync", func(t *testing.T) {
s := KafkaSink{}
assert.Equal(t, 0, s.GetLingerMs())
})
t.Run("ASync", func(t *testing.T) {
s := KafkaSink{Async: true}
assert.Equal(t, 5, s.GetLingerMs())
})
t.Run("Specified", func(t *testing.T) {
s := KafkaSink{Linger: &metav1.Duration{Duration: time.Second}}
assert.Equal(t, 1000, s.GetLingerMs())
})
}

func TestKafkaSink_GetAcks(t *testing.T) {
Expand Down
2 changes: 0 additions & 2 deletions config/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1654,7 +1654,6 @@ spec:
default: lz4
type: string
linger:
default: 0ms
type: string
maxMessageBytes:
format: int32
Expand Down Expand Up @@ -8809,7 +8808,6 @@ spec:
default: lz4
type: string
linger:
default: 0ms
type: string
maxMessageBytes:
format: int32
Expand Down
1 change: 0 additions & 1 deletion config/crd/bases/dataflow.argoproj.io_pipelines.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1649,7 +1649,6 @@ spec:
default: lz4
type: string
linger:
default: 0ms
type: string
maxMessageBytes:
format: int32
Expand Down
1 change: 0 additions & 1 deletion config/crd/bases/dataflow.argoproj.io_steps.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1592,7 +1592,6 @@ spec:
default: lz4
type: string
linger:
default: 0ms
type: string
maxMessageBytes:
format: int32
Expand Down
2 changes: 0 additions & 2 deletions config/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1654,7 +1654,6 @@ spec:
default: lz4
type: string
linger:
default: 0ms
type: string
maxMessageBytes:
format: int32
Expand Down Expand Up @@ -8809,7 +8808,6 @@ spec:
default: lz4
type: string
linger:
default: 0ms
type: string
maxMessageBytes:
format: int32
Expand Down
2 changes: 0 additions & 2 deletions config/dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1654,7 +1654,6 @@ spec:
default: lz4
type: string
linger:
default: 0ms
type: string
maxMessageBytes:
format: int32
Expand Down Expand Up @@ -8809,7 +8808,6 @@ spec:
default: lz4
type: string
linger:
default: 0ms
type: string
maxMessageBytes:
format: int32
Expand Down
2 changes: 0 additions & 2 deletions config/quick-start.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1654,7 +1654,6 @@ spec:
default: lz4
type: string
linger:
default: 0ms
type: string
maxMessageBytes:
format: int32
Expand Down Expand Up @@ -8809,7 +8808,6 @@ spec:
default: lz4
type: string
linger:
default: 0ms
type: string
maxMessageBytes:
format: int32
Expand Down
5 changes: 5 additions & 0 deletions dsls/python/argo_dataflow/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ def dump(self):
x['stan'] = {'subject': self._topic}
return x


class JetStreamSink(Sink):
def __init__(self, subject, name=None):
super().__init__(name=name)
Expand All @@ -205,6 +206,7 @@ def dump(self):
x['jetstream'] = {'subject': self._subject}
return x


class Step:
def __init__(self, name, sources=None, sinks=None, volumes=None, terminator=False, sidecarResource=None):
self._name = name or 'main'
Expand Down Expand Up @@ -626,6 +628,7 @@ def dump(self):
x['stan'] = y
return x


class JetStreamSource(Source):
def __init__(self, subject, name=None, retry=None):
super().__init__(name=name, retry=retry)
Expand All @@ -638,6 +641,7 @@ def dump(self):
x['jetstream'] = y
return x


def cron(schedule=None, layout=None, name=None, retry=None):
return CronSource(schedule, layout=layout, name=name, retry=retry)

Expand All @@ -653,5 +657,6 @@ def kafka(topic=None, name=None, retry=None, startOffset=None, fetchMin=None, fe
def stan(subject=None, name=None, retry=None):
return STANSource(subject, name=name, retry=retry)


def jetstream(subject=None, name=None, retry=None):
return JetStreamSource(subject, name, retry=retry)
6 changes: 3 additions & 3 deletions test/kafka-stress/test-results.json
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
{
"TestKafkaAsyncSinkStress/.tps": 2300,
"TestKafkaSinkStress/.tps": 550,
"TestKafkaAsyncSinkStress/.tps": 1300,
"TestKafkaSinkStress/.tps": 400,
"TestKafkaSinkStress/N=10,messageSize=100.tps": 200,
"TestKafkaSinkStress/N=10,messageSize=1000.tps": 150,
"TestKafkaSinkStress/N=50000.tps": 750,
"TestKafkaSinkStress/async=true.tps": 400,
"TestKafkaSinkStress/messageSize=1000.tps": 300,
"TestKafkaSinkStress/replicas=2.tps": 400,
"TestKafkaSourceStress/.tps": 2300,
"TestKafkaSourceStress/.tps": 1550,
"TestKafkaSourceStress/N=10,messageSize=100.tps": 450,
"TestKafkaSourceStress/N=10,messageSize=1000.tps": 650,
"TestKafkaSourceStress/N=50000.tps": 3150,
Expand Down

0 comments on commit 6423c9e

Please sign in to comment.