Skip to content

Commit

Permalink
perf: avoid unneeded steps when flushing delta
Browse files Browse the repository at this point in the history
  • Loading branch information
fanyang01 committed Dec 3, 2024
1 parent c2372a8 commit 07f655a
Show file tree
Hide file tree
Showing 5 changed files with 331 additions and 123 deletions.
6 changes: 5 additions & 1 deletion binlogreplication/binlog_replica_applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -1245,14 +1245,18 @@ func (a *binlogReplicaApplier) appendRowFormatChanges(
}

func (a *binlogReplicaApplier) flushDeltaBuffer(ctx *sql.Context, reason delta.FlushReason) error {
conn, err := adapter.GetConn(ctx)
if err != nil {
return err
}
tx, err := adapter.GetCatalogTxn(ctx, nil)
if err != nil {
return err
}

defer a.deltaBufSize.Store(0)

if err = a.tableWriterProvider.FlushDeltaBuffer(ctx, tx, reason); err != nil {
if err = a.tableWriterProvider.FlushDeltaBuffer(ctx, conn, tx, reason); err != nil {
ctx.GetLogger().Errorf("Failed to flush changelog: %v", err.Error())
MyBinlogReplicaController.setSqlError(sqlerror.ERUnknownError, err.Error())
}
Expand Down
2 changes: 1 addition & 1 deletion binlogreplication/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type TableWriterProvider interface {
) (DeltaAppender, error)

// FlushDelta writes the accumulated changes to the database.
FlushDeltaBuffer(ctx *sql.Context, tx *stdsql.Tx, reason delta.FlushReason) error
FlushDeltaBuffer(ctx *sql.Context, conn *stdsql.Conn, tx *stdsql.Tx, reason delta.FlushReason) error

// DiscardDeltaBuffer discards the accumulated changes.
DiscardDeltaBuffer(ctx *sql.Context)
Expand Down
Loading

0 comments on commit 07f655a

Please sign in to comment.