Skip to content

Commit 564ecae

Browse files
author
ffffwh
committed
fix repeated ack to dtle.src in dtle.dest.kafka
1 parent 4c2e115 commit 564ecae

File tree

1 file changed

+2
-7
lines changed

1 file changed

+2
-7
lines changed

drivers/mysql/kafka/kafka3.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -476,12 +476,7 @@ func (kr *KafkaRunner) initiateStreaming() error {
476476
}
477477

478478
_, err = kr.natsConn.Subscribe(fmt.Sprintf("%s_incr_hete", kr.subject), func(m *gonats.Msg) {
479-
kr.logger.Debug("recv a incr_hete msg")
480-
if err := kr.natsConn.Publish(m.Reply, nil); err != nil {
481-
kr.onError(TaskStateDead, errors.Wrap(err, "Publish"))
482-
return
483-
}
484-
kr.logger.Debug("ack a incr_hete msg")
479+
kr.logger.Debug("recv an incr_hete msg")
485480

486481
var binlogEntries common.BinlogEntries
487482
if err := common.Decode(m.Data, &binlogEntries); err != nil {
@@ -491,12 +486,12 @@ func (kr *KafkaRunner) initiateStreaming() error {
491486
t := time.NewTimer(common.DefaultConnectWait / 2)
492487
select {
493488
case kr.chBinlogEntries <-&binlogEntries:
494-
atomic.AddInt64(kr.memory2, int64(binlogEntries.Size()))
495489
if err := kr.natsConn.Publish(m.Reply, nil); err != nil {
496490
kr.onError(TaskStateDead, errors.Wrap(err, "Publish"))
497491
return
498492
}
499493
kr.logger.Debug("ack an incr_hete msg")
494+
atomic.AddInt64(kr.memory2, int64(binlogEntries.Size()))
500495
case <-t.C:
501496
kr.logger.Debug("discard an incr_hete msg")
502497
//kr.natsConn.Publish(m.Reply, "wait")

0 commit comments

Comments
 (0)