Skip to content

Commit

Permalink
feat: Expand Kafka sink auto-commit config (#293)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec authored Sep 2, 2021
1 parent 6b406c2 commit 45f7847
Show file tree
Hide file tree
Showing 15 changed files with 1,005 additions and 391 deletions.
1,211 changes: 833 additions & 378 deletions api/v1alpha1/generated.pb.go

Large diffs are not rendered by default.

9 changes: 8 additions & 1 deletion api/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions api/v1alpha1/kafka_auto_commit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package v1alpha1

type KafkaAutoCommit struct {
Enable bool `json:"enable" protobuf:"varint,1,opt,name=enable"`
}
2 changes: 2 additions & 0 deletions api/v1alpha1/kafka_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ type KafkaSource struct {
Kafka `json:",inline" protobuf:"bytes,1,opt,name=kafka"`
// +kubebuilder:default=Last
StartOffset KafkaOffset `json:"startOffset,omitempty" protobuf:"bytes,2,opt,name=startOffset,casttype=KafkaOffset"`
// +kubebuilder:default={enable:false}
AutoCommit KafkaAutoCommit `json:"autoCommit,omitempty" protobuf:"bytes,3,opt,name=autoCommit"`
}
16 changes: 16 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions config/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3362,6 +3362,15 @@ spec:
type: object
kafka:
properties:
autoCommit:
default:
enable: false
properties:
enable:
type: boolean
required:
- enable
type: object
brokers:
items:
type: string
Expand Down Expand Up @@ -10136,6 +10145,15 @@ spec:
type: object
kafka:
properties:
autoCommit:
default:
enable: false
properties:
enable:
type: boolean
required:
- enable
type: object
brokers:
items:
type: string
Expand Down
9 changes: 9 additions & 0 deletions config/crd/bases/dataflow.argoproj.io_pipelines.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3357,6 +3357,15 @@ spec:
type: object
kafka:
properties:
autoCommit:
default:
enable: false
properties:
enable:
type: boolean
required:
- enable
type: object
brokers:
items:
type: string
Expand Down
9 changes: 9 additions & 0 deletions config/crd/bases/dataflow.argoproj.io_steps.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3193,6 +3193,15 @@ spec:
type: object
kafka:
properties:
autoCommit:
default:
enable: false
properties:
enable:
type: boolean
required:
- enable
type: object
brokers:
items:
type: string
Expand Down
18 changes: 18 additions & 0 deletions config/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3362,6 +3362,15 @@ spec:
type: object
kafka:
properties:
autoCommit:
default:
enable: false
properties:
enable:
type: boolean
required:
- enable
type: object
brokers:
items:
type: string
Expand Down Expand Up @@ -10136,6 +10145,15 @@ spec:
type: object
kafka:
properties:
autoCommit:
default:
enable: false
properties:
enable:
type: boolean
required:
- enable
type: object
brokers:
items:
type: string
Expand Down
18 changes: 18 additions & 0 deletions config/dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3362,6 +3362,15 @@ spec:
type: object
kafka:
properties:
autoCommit:
default:
enable: false
properties:
enable:
type: boolean
required:
- enable
type: object
brokers:
items:
type: string
Expand Down Expand Up @@ -10136,6 +10145,15 @@ spec:
type: object
kafka:
properties:
autoCommit:
default:
enable: false
properties:
enable:
type: boolean
required:
- enable
type: object
brokers:
items:
type: string
Expand Down
18 changes: 18 additions & 0 deletions config/quick-start.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3362,6 +3362,15 @@ spec:
type: object
kafka:
properties:
autoCommit:
default:
enable: false
properties:
enable:
type: boolean
required:
- enable
type: object
brokers:
items:
type: string
Expand Down Expand Up @@ -10136,6 +10145,15 @@ spec:
type: object
kafka:
properties:
autoCommit:
default:
enable: false
properties:
enable:
type: boolean
required:
- enable
type: object
brokers:
items:
type: string
Expand Down
11 changes: 7 additions & 4 deletions dsls/python/argo_dataflow/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -522,13 +522,16 @@ def dump(self):


class KafkaSource(Source):
def __init__(self, topic, name=None, retry=None):
def __init__(self, topic, name=None, retry=None, autoCommit=None):
super().__init__(name=name, retry=retry)
self._topic = topic
self._autoCommit = autoCommit

def dump(self):
x = super().dump()
x['topic'] = self._topic
if self._autoCommit:
x['autoCommit'] = self._autoCommit
return {'kafka': x}


Expand All @@ -549,11 +552,11 @@ def cron(schedule, layout=None, name=None, retry=None):


def http(name=None, retry=None, serviceName=None):
return HTTPSource(name=name, serviceName=serviceName)
return HTTPSource(name=name, serviceName=serviceName, retry=retry)


def kafka(topic, name=None, retry=None):
return KafkaSource(topic, name=name, retry=retry)
def kafka(topic, name=None, retry=None, autoCommit=None):
return KafkaSource(topic, name=name, retry=retry, autoCommit=autoCommit)


def stan(subject, name=None, retry=None):
Expand Down
7 changes: 4 additions & 3 deletions runner/sidecar/source/kafka/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ import (
)

type handler struct {
f source.Func
i int
f source.Func
i int
manualCommit bool
}

func (handler) Setup(_ sarama.ConsumerGroupSession) error {
Expand All @@ -29,7 +30,7 @@ func (h handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.Con
sess.MarkMessage(msg, "")
}
h.i++
if h.i%dfv1.CommitN == 0 {
if h.manualCommit && h.i%dfv1.CommitN == 0 {
sess.Commit()
}
}
Expand Down
14 changes: 9 additions & 5 deletions runner/sidecar/source/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,22 @@ func New(ctx context.Context, secretInterface corev1.SecretInterface, consumerGr
if x.Kafka.MaxMessageBytes > 0 {
config.Consumer.Fetch.Max = x.Kafka.MaxMessageBytes
}
logger.Info("Kafka config Consumer.Fetch.Max", "value", config.Consumer.Fetch.Max)
config.Consumer.Offsets.AutoCommit.Enable = false
config.Consumer.Offsets.AutoCommit.Enable = x.AutoCommit.Enable
if x.StartOffset == "First" {
config.Consumer.Offsets.Initial = sarama.OffsetOldest
}

logger.Info("Kafka consumer group ID", "consumerGroupID", consumerGroupID)
logger.Info("Kafka config",
"consumerGroupID", consumerGroupID,
"maxProcessingTime", config.Consumer.MaxProcessingTime.String(),
"fetchMax", config.Consumer.Fetch.Max,
"autoCommitEnable", config.Consumer.Offsets.AutoCommit.Enable,
"offsetsInitial", config.Consumer.Offsets.Initial,
)
consumerGroup, err := sarama.NewConsumerGroup(x.Brokers, consumerGroupID, config)
if err != nil {
return nil, err
}
h := handler{f, 0}
h := handler{f, 0, !x.AutoCommit.Enable}
go wait.JitterUntil(func() {
defer runtime.HandleCrash()
ctx := context.Background()
Expand Down
31 changes: 31 additions & 0 deletions test/kafka-e2e/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,37 @@ func TestKafka(t *testing.T) {
WaitForPodsToBeDeleted()
}

func TestKafkaAutoCommit(t *testing.T) {
defer Setup(t)()

topic := CreateKafkaTopic()
sinkTopic := CreateKafkaTopic()

CreatePipeline(Pipeline{
ObjectMeta: metav1.ObjectMeta{Name: "kafka"},
Spec: PipelineSpec{
Steps: []StepSpec{{
Name: "main",
Cat: &Cat{},
Sources: []Source{{Kafka: &KafkaSource{Kafka: Kafka{Topic: topic}, AutoCommit: KafkaAutoCommit{Enable: true}}}},
Sinks: []Sink{{Kafka: &KafkaSink{Kafka: Kafka{Topic: sinkTopic}}}},
}},
},
})
WaitForPipeline()
WaitForPod()

PumpKafkaTopic(topic, 17)

WaitForPipeline(UntilMessagesSunk)

WaitForStep(TotalSourceMessages(17))
WaitForStep(TotalSunkMessages(17))

DeletePipelines()
WaitForPodsToBeDeleted()
}

func TestKafkaAsync(t *testing.T) {
defer Setup(t)()

Expand Down

0 comments on commit 45f7847

Please sign in to comment.