Skip to content

Commit

Permalink
perf: speedup replication (apecloud#207)
Browse files Browse the repository at this point in the history
This commit improves replication in two ways:

1. Cache table schema in binary log applier instead of querying it from
   duckdb for each row event

2. Ingest delta using simple INSERT statement when there are no deletes
   or updates in the batch
  • Loading branch information
gl-sergei authored Nov 24, 2024
1 parent 2943d53 commit ee071fd
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 34 deletions.
41 changes: 29 additions & 12 deletions binlogreplication/binlog_replica_applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,18 @@ const (
// Match any strings starting with "ON" (case insensitive)
var gtidModeIsOnRegex = regexp.MustCompile(`(?i)^ON$`)

type tableIdentifier struct {
dbName, tableName string
}

// binlogReplicaApplier represents the process that applies updates from a binlog connection.
//
// This type is NOT used concurrently – there is currently only one single applier process running to process binlog
// events, so the state in this type is NOT protected with a mutex.
type binlogReplicaApplier struct {
format *mysql.BinlogFormat
tableMapsById map[uint64]*mysql.TableMap
tablesByName map[tableIdentifier]sql.Table
stopReplicationChan chan struct{}
currentGtid replication.GTID
replicationSourceUuid string
Expand All @@ -86,6 +91,7 @@ type binlogReplicaApplier struct {
func newBinlogReplicaApplier(filters *filterConfiguration) *binlogReplicaApplier {
return &binlogReplicaApplier{
tableMapsById: make(map[uint64]*mysql.TableMap),
tablesByName: make(map[tableIdentifier]sql.Table),
stopReplicationChan: make(chan struct{}),
filters: filters,
}
Expand Down Expand Up @@ -895,6 +901,7 @@ func (a *binlogReplicaApplier) executeQueryWithEngine(ctx *sql.Context, engine *
}
if mysqlutil.CauseSchemaChange(node) {
flushReason = delta.DDLStmtFlushReason
a.tablesByName = make(map[tableIdentifier]sql.Table)
}
}

Expand Down Expand Up @@ -1013,7 +1020,7 @@ func (a *binlogReplicaApplier) processRowEvent(ctx *sql.Context, event mysql.Bin
ctx.GetLogger().Errorf(msg)
MyBinlogReplicaController.setSqlError(sqlerror.ERUnknownError, msg)
}
pkSchema, tableName, err := getTableSchema(ctx, engine, tableMap.Name, tableMap.Database)
pkSchema, tableName, err := a.getTableSchema(ctx, engine, tableMap.Name, tableMap.Database)
if err != nil {
return err
}
Expand Down Expand Up @@ -1199,6 +1206,7 @@ func (a *binlogReplicaApplier) appendRowFormatChanges(
}
a.deltaBufSize.Add(uint64(pos))
}
appender.IncDeleteEventCount()
}

// Insert the after image
Expand Down Expand Up @@ -1229,6 +1237,7 @@ func (a *binlogReplicaApplier) appendRowFormatChanges(
}
a.deltaBufSize.Add(uint64(pos))
}
appender.IncInsertEventCount()
}

return nil
Expand Down Expand Up @@ -1291,17 +1300,25 @@ varsLoop:

// getTableSchema returns a sql.Schema for the case-insensitive |tableName| in the database named
// |databaseName|, along with the exact, case-sensitive table name.
func getTableSchema(ctx *sql.Context, engine *gms.Engine, tableName, databaseName string) (sql.PrimaryKeySchema, string, error) {
database, err := engine.Analyzer.Catalog.Database(ctx, databaseName)
if err != nil {
return sql.PrimaryKeySchema{}, "", err
}
table, ok, err := database.GetTableInsensitive(ctx, tableName)
if err != nil {
return sql.PrimaryKeySchema{}, "", err
}
if !ok {
return sql.PrimaryKeySchema{}, "", fmt.Errorf("unable to find table %q", tableName)
func (a *binlogReplicaApplier) getTableSchema(ctx *sql.Context, engine *gms.Engine, tableName, databaseName string) (sql.PrimaryKeySchema, string, error) {
key := tableIdentifier{dbName: strings.ToLower(databaseName), tableName: strings.ToLower(tableName)}
table, found := a.tablesByName[key]

if !found {
database, err := engine.Analyzer.Catalog.Database(ctx, databaseName)
if err != nil {
return sql.PrimaryKeySchema{}, "", err
}
var ok bool
table, ok, err = database.GetTableInsensitive(ctx, tableName)
if err != nil {
return sql.PrimaryKeySchema{}, "", err
}
if !ok {
return sql.PrimaryKeySchema{}, "", fmt.Errorf("unable to find table %q", tableName)
}

a.tablesByName[key] = table
}

if pkTable, ok := table.(sql.PrimaryKeyTable); ok {
Expand Down
5 changes: 5 additions & 0 deletions binlogreplication/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ type DeltaAppender interface {
TxnGroup() *array.BinaryDictionaryBuilder
TxnSeqNumber() *array.Uint64Builder
TxnStmtOrdinal() *array.Uint64Builder
IncInsertEventCount()
IncDeleteEventCount()
GetInsertEventCount() int
GetDeleteEventCount() int
ResetEventCounts()
}

type TableWriterProvider interface {
Expand Down
60 changes: 40 additions & 20 deletions delta/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,10 @@ func (c *DeltaController) updateTable(

schema := appender.BaseSchema() // schema of the base table
record := appender.Build()
defer record.Release()
defer func() {
record.Release()
appender.ResetEventCounts()
}()

// fmt.Println("record:", record)

Expand Down Expand Up @@ -197,26 +200,39 @@ func (c *DeltaController) updateTable(
augmentedSchema := appender.Schema()
var builder strings.Builder
builder.Grow(512)
builder.WriteString("SELECT ")
builder.WriteString("r[1] AS ")
builder.WriteString(catalog.QuoteIdentifierANSI(augmentedSchema[0].Name))
for i, col := range augmentedSchema[1:] {
builder.WriteString(", r[")
builder.WriteString(strconv.Itoa(i + 2))
builder.WriteString("]")
if types.IsTimestampType(col.Type) {
builder.WriteString("::TIMESTAMP")
if appender.GetDeleteEventCount() > 0 {
builder.WriteString("SELECT ")
builder.WriteString("r[1] AS ")
builder.WriteString(catalog.QuoteIdentifierANSI(augmentedSchema[0].Name))
for i, col := range augmentedSchema[1:] {
builder.WriteString(", r[")
builder.WriteString(strconv.Itoa(i + 2))
builder.WriteString("]")
if types.IsTimestampType(col.Type) {
builder.WriteString("::TIMESTAMP")
}
builder.WriteString(" AS ")
builder.WriteString(catalog.QuoteIdentifierANSI(col.Name))
}
builder.WriteString(" FROM (SELECT ")
builder.WriteString(pkList)
builder.WriteString(", LAST(ROW(*COLUMNS(*)) ORDER BY txn_group, txn_seq, txn_stmt, action) AS r")
builder.WriteString(ipcSQL)
builder.WriteString(" GROUP BY ")
builder.WriteString(pkList)
builder.WriteString(")")
} else {
builder.WriteString("SELECT ")
builder.WriteString(catalog.QuoteIdentifierANSI(augmentedSchema[0].Name))
for _, col := range augmentedSchema[1:] {
builder.WriteString(", ")
builder.WriteString(catalog.QuoteIdentifierANSI(col.Name))
if types.IsTimestampType(col.Type) {
builder.WriteString("::TIMESTAMP")
}
}
builder.WriteString(" AS ")
builder.WriteString(catalog.QuoteIdentifierANSI(col.Name))
builder.WriteString(ipcSQL)
}
builder.WriteString(" FROM (SELECT ")
builder.WriteString(pkList)
builder.WriteString(", LAST(ROW(*COLUMNS(*)) ORDER BY txn_group, txn_seq, txn_stmt, action) AS r")
builder.WriteString(ipcSQL)
builder.WriteString(" GROUP BY ")
builder.WriteString(pkList)
builder.WriteString(")")
condenseDeltaSQL := builder.String()

var (
Expand Down Expand Up @@ -244,7 +260,11 @@ func (c *DeltaController) updateTable(
}

// Insert or replace new rows (action = INSERT) into the base table.
insertSQL := "INSERT OR REPLACE INTO " +
insertSQL := "INSERT "
if appender.GetDeleteEventCount() > 0 {
insertSQL += "OR REPLACE "
}
insertSQL += "INTO " +
qualifiedTableName +
" SELECT * EXCLUDE (" + AugmentedColumnList + ") FROM temp.main.delta WHERE action = " +
strconv.Itoa(int(binlog.InsertRowEvent))
Expand Down
27 changes: 25 additions & 2 deletions delta/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ type tableIdentifier struct {
}

type DeltaAppender struct {
schema sql.Schema
appender myarrow.ArrowAppender
schema sql.Schema
appender myarrow.ArrowAppender
insertEventCount int
deleteEventCount int
}

// Create a new appender.
Expand Down Expand Up @@ -115,3 +117,24 @@ func (a *DeltaAppender) Grow(n int) {
func (a *DeltaAppender) Release() {
a.appender.Release()
}

func (a *DeltaAppender) IncInsertEventCount() {
a.insertEventCount++
}

func (a *DeltaAppender) IncDeleteEventCount() {
a.deleteEventCount++
}

func (a *DeltaAppender) GetInsertEventCount() int {
return a.insertEventCount
}

func (a *DeltaAppender) GetDeleteEventCount() int {
return a.deleteEventCount
}

func (a *DeltaAppender) ResetEventCounts() {
a.insertEventCount = 0
a.deleteEventCount = 0
}

0 comments on commit ee071fd

Please sign in to comment.