Skip to content

Commit a52fbf8

Browse files
author
ffffwh
committed
fix repeated kafka run
a bug introduced in 7406c29.
1 parent 564ecae commit a52fbf8

File tree

2 files changed

+2
-1
lines changed

2 files changed

+2
-1
lines changed

drivers/mysql/driver.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -495,7 +495,6 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive
495495
d.logger.Debug("found kafka", "KafkaConfig", driverConfig.KafkaConfig)
496496
h.runner = kafka.NewKafkaRunner(ctx, driverConfig.KafkaConfig, d.logger,
497497
d.storeManager, d.config.NatsAdvertise, h.waitCh)
498-
go h.runner.Run()
499498
} else {
500499
h.runner, err = mysql.NewApplier(ctx, driverConfig, d.logger, d.storeManager,
501500
d.config.NatsAdvertise, h.waitCh,d.eventer, h.taskConfig)

drivers/mysql/kafka/kafka3.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,7 @@ func (kr *KafkaRunner) handleFullCopy() {
352352
}
353353
}
354354
func (kr *KafkaRunner) handleIncr() {
355+
kr.logger.Debug("handleIncr")
355356
var err error
356357
bigEntries := &common.BinlogEntries{}
357358
for !kr.shutdown {
@@ -696,6 +697,7 @@ func (kr *KafkaRunner) kafkaTransformSnapshotData(
696697
}
697698

698699
func (kr *KafkaRunner) kafkaTransformDMLEventQuery(dmlEvent *common.BinlogEntry) (err error) {
700+
kr.logger.Debug("kafkaTransformDMLEventQuery", "gno", dmlEvent.Coordinates.GNO)
699701
txSid := dmlEvent.Coordinates.GetSid()
700702

701703
for i, _ := range dmlEvent.Events {

0 commit comments

Comments
 (0)