Skip to content

Commit

Permalink
feat: Limit Kafka sink async message in-flight (#513)
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Collins <[email protected]>
  • Loading branch information
alexec authored Dec 6, 2021
1 parent de6a76e commit 2179640
Show file tree
Hide file tree
Showing 14 changed files with 1,053 additions and 404 deletions.
1,340 changes: 937 additions & 403 deletions api/v1alpha1/generated.pb.go

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions api/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions api/v1alpha1/kafka_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ type KafkaSink struct {
EnableIdempotence bool `json:"enableIdempotence,omitempty" protobuf:"varint,7,opt,name=enableIdempotence"`
// +kubebuilder:default="30s"
MessageTimeout *metav1.Duration `json:"messageTimeout,omitempty" protobuf:"bytes,8,opt,name=messageTimeout"`
// The maximum number of messages to be in-flight when async.
// +kubebuilder:default=20
MaxInflight uint32 `json:"maxInflight,omitempty" protobuf:"varint,9,opt,name=maxInflight"`
}

func (m *KafkaSink) GetBatchSize() int {
Expand Down Expand Up @@ -51,3 +54,10 @@ func (m *KafkaSink) GetAcks() interface{} {
func (m *KafkaSink) GetMessageMaxBytes() int {
return m.Kafka.GetMessageMaxBytes()
}

func (m *KafkaSink) GetMessageInflight() int {
if m.MaxInflight < 1 {
return CommitN
}
return int(m.MaxInflight)
}
12 changes: 12 additions & 0 deletions config/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1667,6 +1667,12 @@ spec:
type: boolean
linger:
type: string
maxInflight:
default: 20
description: The maximum number of messages to be
in-flight when async.
format: int32
type: integer
maxMessageBytes:
format: int32
type: integer
Expand Down Expand Up @@ -8841,6 +8847,12 @@ spec:
type: boolean
linger:
type: string
maxInflight:
default: 20
description: The maximum number of messages to be in-flight
when async.
format: int32
type: integer
maxMessageBytes:
format: int32
type: integer
Expand Down
12 changes: 12 additions & 0 deletions config/cluster-quick-start.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1667,6 +1667,12 @@ spec:
type: boolean
linger:
type: string
maxInflight:
default: 20
description: The maximum number of messages to be
in-flight when async.
format: int32
type: integer
maxMessageBytes:
format: int32
type: integer
Expand Down Expand Up @@ -8841,6 +8847,12 @@ spec:
type: boolean
linger:
type: string
maxInflight:
default: 20
description: The maximum number of messages to be in-flight
when async.
format: int32
type: integer
maxMessageBytes:
format: int32
type: integer
Expand Down
6 changes: 6 additions & 0 deletions config/crd/bases/dataflow.argoproj.io_pipelines.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1662,6 +1662,12 @@ spec:
type: boolean
linger:
type: string
maxInflight:
default: 20
description: The maximum number of messages to be
in-flight when async.
format: int32
type: integer
maxMessageBytes:
format: int32
type: integer
Expand Down
6 changes: 6 additions & 0 deletions config/crd/bases/dataflow.argoproj.io_steps.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1605,6 +1605,12 @@ spec:
type: boolean
linger:
type: string
maxInflight:
default: 20
description: The maximum number of messages to be in-flight
when async.
format: int32
type: integer
maxMessageBytes:
format: int32
type: integer
Expand Down
12 changes: 12 additions & 0 deletions config/default-cluster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1667,6 +1667,12 @@ spec:
type: boolean
linger:
type: string
maxInflight:
default: 20
description: The maximum number of messages to be
in-flight when async.
format: int32
type: integer
maxMessageBytes:
format: int32
type: integer
Expand Down Expand Up @@ -8841,6 +8847,12 @@ spec:
type: boolean
linger:
type: string
maxInflight:
default: 20
description: The maximum number of messages to be in-flight
when async.
format: int32
type: integer
maxMessageBytes:
format: int32
type: integer
Expand Down
12 changes: 12 additions & 0 deletions config/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1667,6 +1667,12 @@ spec:
type: boolean
linger:
type: string
maxInflight:
default: 20
description: The maximum number of messages to be
in-flight when async.
format: int32
type: integer
maxMessageBytes:
format: int32
type: integer
Expand Down Expand Up @@ -8841,6 +8847,12 @@ spec:
type: boolean
linger:
type: string
maxInflight:
default: 20
description: The maximum number of messages to be in-flight
when async.
format: int32
type: integer
maxMessageBytes:
format: int32
type: integer
Expand Down
12 changes: 12 additions & 0 deletions config/dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1667,6 +1667,12 @@ spec:
type: boolean
linger:
type: string
maxInflight:
default: 20
description: The maximum number of messages to be
in-flight when async.
format: int32
type: integer
maxMessageBytes:
format: int32
type: integer
Expand Down Expand Up @@ -8841,6 +8847,12 @@ spec:
type: boolean
linger:
type: string
maxInflight:
default: 20
description: The maximum number of messages to be in-flight
when async.
format: int32
type: integer
maxMessageBytes:
format: int32
type: integer
Expand Down
12 changes: 12 additions & 0 deletions config/quick-start.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1667,6 +1667,12 @@ spec:
type: boolean
linger:
type: string
maxInflight:
default: 20
description: The maximum number of messages to be
in-flight when async.
format: int32
type: integer
maxMessageBytes:
format: int32
type: integer
Expand Down Expand Up @@ -8841,6 +8847,12 @@ spec:
type: boolean
linger:
type: string
maxInflight:
default: 20
description: The maximum number of messages to be in-flight
when async.
format: int32
type: integer
maxMessageBytes:
format: int32
type: integer
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ require (
github.com/uber/jaeger-lib v2.4.1+incompatible
github.com/weaveworks/promrus v1.2.0
golang.org/x/crypto v0.0.0-20210915214749-c084706c2272
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
k8s.io/api v0.20.4
k8s.io/apimachinery v0.20.4
k8s.io/client-go v0.20.4
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,7 @@ golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
17 changes: 16 additions & 1 deletion runner/sidecar/sink/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
kafka "github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/sync/semaphore"
"k8s.io/apimachinery/pkg/util/wait"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
)
Expand All @@ -24,6 +25,7 @@ type kafkaSink struct {
producer *kafka.Producer
topic string
async bool
inflight *semaphore.Weighted
}

func New(ctx context.Context, sinkName string, secretInterface corev1.SecretInterface, x dfv1.KafkaSink, errorsCounter prometheus.Counter) (sink.Interface, error) {
Expand Down Expand Up @@ -61,11 +63,14 @@ func New(ctx context.Context, sinkName string, secretInterface corev1.SecretInte
}
}, 3*time.Second, 1.2, true)

inflight := semaphore.NewWeighted(int64(x.GetMessageInflight()))

go wait.JitterUntilWithContext(ctx, func(context.Context) {
logger.Info("starting producer event consuming loop")
for e := range producer.Events() {
switch ev := e.(type) {
case *kafka.Message:
inflight.Release(1)
if err := ev.TopicPartition.Error; err != nil {
logger.Error(err, "Async to Kafka failed", "topic", x.Topic)
errorsCounter.Inc()
Expand All @@ -74,7 +79,13 @@ func New(ctx context.Context, sinkName string, secretInterface corev1.SecretInte
}
}, time.Second, 1.2, true)

return &kafkaSink{sinkName, producer, x.Topic, x.Async}, nil
return &kafkaSink{
sinkName,
producer,
x.Topic,
x.Async,
inflight,
}, nil
}

func (h *kafkaSink) Sink(ctx context.Context, msg []byte) error {
Expand All @@ -89,6 +100,9 @@ func (h *kafkaSink) Sink(ctx context.Context, msg []byte) error {
deliveryChan = make(chan kafka.Event)
defer close(deliveryChan)
}
if err := h.inflight.Acquire(ctx, 1); err != nil {
return err
}
if err := h.producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &h.topic, Partition: kafka.PartitionAny},
Headers: []kafka.Header{
Expand All @@ -100,6 +114,7 @@ func (h *kafkaSink) Sink(ctx context.Context, msg []byte) error {
return err
}
if deliveryChan != nil {
defer h.inflight.Release(1)
select {
case <-ctx.Done():
return fmt.Errorf("failed to get delivery: %w", ctx.Err())
Expand Down

0 comments on commit 2179640

Please sign in to comment.