Skip to content

Commit

Permalink
[close #394] Fix disorder of messages to Kafka (#397)
Browse files Browse the repository at this point in the history
* collect pprof heap

Signed-off-by: Ping Yu <[email protected]>

* unlimit retry for pd connection

Signed-off-by: Ping Yu <[email protected]>

* reduce record size

Signed-off-by: Ping Yu <[email protected]>

* log level: info

Signed-off-by: Ping Yu <[email protected]>

* reduce data size; add grafana panel

Signed-off-by: Ping Yu <[email protected]>

* batch

Signed-off-by: Ping Yu <[email protected]>

* fix

Signed-off-by: Ping Yu <[email protected]>

* try debug

Signed-off-by: Ping Yu <[email protected]>

* fix encoder size

Signed-off-by: Ping Yu <[email protected]>

* fix

Signed-off-by: Ping Yu <[email protected]>

* MQMessage pool

Signed-off-by: Ping Yu <[email protected]>

* fix release

Signed-off-by: Ping Yu <[email protected]>

* wip

Signed-off-by: Ping Yu <[email protected]>

* fix flaky ut

Signed-off-by: Ping Yu <[email protected]>

* logging

Signed-off-by: Ping Yu <[email protected]>

* fix ut

Signed-off-by: Ping Yu <[email protected]>

* wip

Signed-off-by: Ping Yu <[email protected]>

* adjust memory release parameter

Signed-off-by: Ping Yu <[email protected]>

* polish

Signed-off-by: Ping Yu <[email protected]>

* polish

Signed-off-by: Ping Yu <[email protected]>

* polish

Signed-off-by: Ping Yu <[email protected]>

* polish

Signed-off-by: Ping Yu <[email protected]>

* fix ut

Signed-off-by: Ping Yu <[email protected]>

* idempotent = true

Signed-off-by: Ping Yu <[email protected]>

* fix ut

Signed-off-by: Ping Yu <[email protected]>

* fix ut

Signed-off-by: Ping Yu <[email protected]>

* revert time count

Signed-off-by: Ping Yu <[email protected]>

* polish

Signed-off-by: Ping Yu <[email protected]>

---------

Signed-off-by: Ping Yu <[email protected]>
  • Loading branch information
pingyu authored Mar 5, 2024
1 parent 50e26d5 commit 1b33b2a
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 0 deletions.
13 changes: 13 additions & 0 deletions cdc/cdc/sink/mq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,17 @@ func (s mqSinkSuite) TestKafkaSink(c *check.C) {
opts := map[string]string{}
errCh := make(chan error, 1)

newSaramaConfigImplBak := kafkap.NewSaramaConfigImpl
kafkap.NewSaramaConfigImpl = func(ctx context.Context, config *kafkap.Config) (*sarama.Config, error) {
// Idempotent requires Kafka version >= 0.11.0.0
config.Idempotent = false
cfg, err := newSaramaConfigImplBak(ctx, config)
c.Assert(err, check.IsNil)
return cfg, err
}
defer func() {
kafkap.NewSaramaConfigImpl = newSaramaConfigImplBak
}()
kafkap.NewAdminClientImpl = kafka.NewMockAdminClient
defer func() {
kafkap.NewAdminClientImpl = kafka.NewSaramaAdminClient
Expand Down Expand Up @@ -146,6 +157,8 @@ func (s mqSinkSuite) TestFlushChangedEvents(c *check.C) {

newSaramaConfigImplBak := kafkap.NewSaramaConfigImpl
kafkap.NewSaramaConfigImpl = func(ctx context.Context, config *kafkap.Config) (*sarama.Config, error) {
// Idempotent requires Kafka version >= 0.11.0.0
config.Idempotent = false
cfg, err := newSaramaConfigImplBak(ctx, config)
c.Assert(err, check.IsNil)
cfg.Producer.Flush.MaxMessages = 1
Expand Down
11 changes: 11 additions & 0 deletions cdc/cdc/sink/producer/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type Config struct {
SaslScram *security.SaslScram
// control whether to create topic
AutoCreate bool
// Whether to enable idempotent producer
Idempotent bool
}

// NewConfig returns a default Kafka configuration
Expand All @@ -63,6 +65,7 @@ func NewConfig() *Config {
Credential: &security.Credential{},
SaslScram: &security.SaslScram{},
AutoCreate: true,
Idempotent: true,
}
}

Expand Down Expand Up @@ -231,6 +234,14 @@ func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {
// and https://github.com/tikv/migration/cdc/issues/3352.
config.Metadata.Timeout = 1 * time.Minute

// See: https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#enable-idempotence
config.Producer.Idempotent = c.Idempotent
if c.Idempotent {
config.Net.MaxOpenRequests = 1
} else {
log.Warn("The idempotent producer is disabled, which may cause data reordering")
}

config.Producer.Partitioner = sarama.NewManualPartitioner
config.Producer.MaxMessageBytes = c.MaxMessageBytes
config.Producer.Return.Successes = true
Expand Down
3 changes: 3 additions & 0 deletions cdc/cdc/sink/producer/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func (s *kafkaSuite) TestNewSaramaProducer(c *check.C) {
config.Version = "0.9.0.0"
config.PartitionNum = int32(2)
config.AutoCreate = false
config.Idempotent = false
config.BrokerEndpoints = strings.Split(leader.Addr(), ",")

newSaramaConfigImplBak := NewSaramaConfigImpl
Expand Down Expand Up @@ -339,6 +340,7 @@ func (s *kafkaSuite) TestProducerSendMessageFailed(c *check.C) {
config.Version = "0.9.0.0"
config.PartitionNum = int32(2)
config.AutoCreate = false
config.Idempotent = false
config.BrokerEndpoints = strings.Split(leader.Addr(), ",")

NewAdminClientImpl = kafka.NewMockAdminClient
Expand Down Expand Up @@ -421,6 +423,7 @@ func (s *kafkaSuite) TestProducerDoubleClose(c *check.C) {
config.Version = "0.9.0.0"
config.PartitionNum = int32(2)
config.AutoCreate = false
config.Idempotent = false
config.BrokerEndpoints = strings.Split(leader.Addr(), ",")

NewAdminClientImpl = kafka.NewMockAdminClient
Expand Down

0 comments on commit 1b33b2a

Please sign in to comment.