From ee071fdbf29e404d825ed73ae907dfc54b968e88 Mon Sep 17 00:00:00 2001
From: Sergei Glushchenko
Date: Sun, 24 Nov 2024 17:52:15 +0300
Subject: [PATCH] perf: speed up replication (#207)

This commit improves replication in two ways:

1. Cache the table schema in the binary log applier instead of querying
   DuckDB for it on every row event.
2. Ingest the delta using a simple INSERT statement when the batch contains
   no deletes or updates.
---
 binlogreplication/binlog_replica_applier.go | 41 +++++++++-----
 binlogreplication/writer.go                 |  5 ++
 delta/controller.go                         | 60 ++++++++++++++-------
 delta/delta.go                              | 27 +++++++++-
 4 files changed, 99 insertions(+), 34 deletions(-)

diff --git a/binlogreplication/binlog_replica_applier.go b/binlogreplication/binlog_replica_applier.go
index e6b74647..db6c516e 100644
--- a/binlogreplication/binlog_replica_applier.go
+++ b/binlogreplication/binlog_replica_applier.go
@@ -57,6 +57,10 @@ const (
 // Match any strings starting with "ON" (case insensitive)
 var gtidModeIsOnRegex = regexp.MustCompile(`(?i)^ON$`)
 
+type tableIdentifier struct {
+    dbName, tableName string
+}
+
 // binlogReplicaApplier represents the process that applies updates from a binlog connection.
 //
 // This type is NOT used concurrently – there is currently only one single applier process running to process binlog
@@ -64,6 +68,7 @@ var gtidModeIsOnRegex = regexp.MustCompile(`(?i)^ON$`)
 type binlogReplicaApplier struct {
     format                *mysql.BinlogFormat
     tableMapsById         map[uint64]*mysql.TableMap
+    tablesByName          map[tableIdentifier]sql.Table
     stopReplicationChan   chan struct{}
     currentGtid           replication.GTID
     replicationSourceUuid string
@@ -86,6 +91,7 @@ type binlogReplicaApplier struct {
 func newBinlogReplicaApplier(filters *filterConfiguration) *binlogReplicaApplier {
     return &binlogReplicaApplier{
         tableMapsById:       make(map[uint64]*mysql.TableMap),
+        tablesByName:        make(map[tableIdentifier]sql.Table),
         stopReplicationChan: make(chan struct{}),
         filters:             filters,
     }
@@ -895,6 +901,7 @@ func (a *binlogReplicaApplier) executeQueryWithEngine(ctx *sql.Context, engine *
         }
         if mysqlutil.CauseSchemaChange(node) {
             flushReason = delta.DDLStmtFlushReason
+            a.tablesByName = make(map[tableIdentifier]sql.Table)
         }
     }
 
@@ -1013,7 +1020,7 @@ func (a *binlogReplicaApplier) processRowEvent(ctx *sql.Context, event mysql.Bin
         ctx.GetLogger().Errorf(msg)
         MyBinlogReplicaController.setSqlError(sqlerror.ERUnknownError, msg)
     }
-    pkSchema, tableName, err := getTableSchema(ctx, engine, tableMap.Name, tableMap.Database)
+    pkSchema, tableName, err := a.getTableSchema(ctx, engine, tableMap.Name, tableMap.Database)
     if err != nil {
         return err
     }
@@ -1199,6 +1206,7 @@ func (a *binlogReplicaApplier) appendRowFormatChanges(
             }
             a.deltaBufSize.Add(uint64(pos))
         }
+        appender.IncDeleteEventCount()
     }
 
     // Insert the after image
@@ -1229,6 +1237,7 @@ func (a *binlogReplicaApplier) appendRowFormatChanges(
             }
             a.deltaBufSize.Add(uint64(pos))
         }
+        appender.IncInsertEventCount()
     }
 
     return nil
@@ -1291,17 +1300,25 @@ varsLoop:
 
 // getTableSchema returns a sql.Schema for the case-insensitive |tableName| in the database named
 // |databaseName|, along with the exact, case-sensitive table name.
-func getTableSchema(ctx *sql.Context, engine *gms.Engine, tableName, databaseName string) (sql.PrimaryKeySchema, string, error) {
-    database, err := engine.Analyzer.Catalog.Database(ctx, databaseName)
-    if err != nil {
-        return sql.PrimaryKeySchema{}, "", err
-    }
-    table, ok, err := database.GetTableInsensitive(ctx, tableName)
-    if err != nil {
-        return sql.PrimaryKeySchema{}, "", err
-    }
-    if !ok {
-        return sql.PrimaryKeySchema{}, "", fmt.Errorf("unable to find table %q", tableName)
+func (a *binlogReplicaApplier) getTableSchema(ctx *sql.Context, engine *gms.Engine, tableName, databaseName string) (sql.PrimaryKeySchema, string, error) {
+    key := tableIdentifier{dbName: strings.ToLower(databaseName), tableName: strings.ToLower(tableName)}
+    table, found := a.tablesByName[key]
+
+    if !found {
+        database, err := engine.Analyzer.Catalog.Database(ctx, databaseName)
+        if err != nil {
+            return sql.PrimaryKeySchema{}, "", err
+        }
+        var ok bool
+        table, ok, err = database.GetTableInsensitive(ctx, tableName)
+        if err != nil {
+            return sql.PrimaryKeySchema{}, "", err
+        }
+        if !ok {
+            return sql.PrimaryKeySchema{}, "", fmt.Errorf("unable to find table %q", tableName)
+        }
+
+        a.tablesByName[key] = table
     }
 
     if pkTable, ok := table.(sql.PrimaryKeyTable); ok {
diff --git a/binlogreplication/writer.go b/binlogreplication/writer.go
index 19f4f8a9..1697c1ec 100644
--- a/binlogreplication/writer.go
+++ b/binlogreplication/writer.go
@@ -25,6 +25,11 @@ type DeltaAppender interface {
     TxnGroup() *array.BinaryDictionaryBuilder
     TxnSeqNumber() *array.Uint64Builder
     TxnStmtOrdinal() *array.Uint64Builder
+    IncInsertEventCount()
+    IncDeleteEventCount()
+    GetInsertEventCount() int
+    GetDeleteEventCount() int
+    ResetEventCounts()
 }
 
 type TableWriterProvider interface {
diff --git a/delta/controller.go b/delta/controller.go
index 073463f6..649092cc 100644
--- a/delta/controller.go
+++ b/delta/controller.go
@@ -147,7 +147,10 @@ func (c *DeltaController) updateTable(
     schema := appender.BaseSchema() // schema of the base table
 
     record := appender.Build()
-    defer record.Release()
+    defer func() {
+        record.Release()
+        appender.ResetEventCounts()
+    }()
 
     // fmt.Println("record:", record)
 
@@ -197,26 +200,39 @@ func (c *DeltaController) updateTable(
     augmentedSchema := appender.Schema()
     var builder strings.Builder
     builder.Grow(512)
-    builder.WriteString("SELECT ")
-    builder.WriteString("r[1] AS ")
-    builder.WriteString(catalog.QuoteIdentifierANSI(augmentedSchema[0].Name))
-    for i, col := range augmentedSchema[1:] {
-        builder.WriteString(", r[")
-        builder.WriteString(strconv.Itoa(i + 2))
-        builder.WriteString("]")
-        if types.IsTimestampType(col.Type) {
-            builder.WriteString("::TIMESTAMP")
+    if appender.GetDeleteEventCount() > 0 {
+        builder.WriteString("SELECT ")
+        builder.WriteString("r[1] AS ")
+        builder.WriteString(catalog.QuoteIdentifierANSI(augmentedSchema[0].Name))
+        for i, col := range augmentedSchema[1:] {
+            builder.WriteString(", r[")
+            builder.WriteString(strconv.Itoa(i + 2))
+            builder.WriteString("]")
+            if types.IsTimestampType(col.Type) {
+                builder.WriteString("::TIMESTAMP")
+            }
+            builder.WriteString(" AS ")
+            builder.WriteString(catalog.QuoteIdentifierANSI(col.Name))
+        }
+        builder.WriteString(" FROM (SELECT ")
+        builder.WriteString(pkList)
+        builder.WriteString(", LAST(ROW(*COLUMNS(*)) ORDER BY txn_group, txn_seq, txn_stmt, action) AS r")
+        builder.WriteString(ipcSQL)
+        builder.WriteString(" GROUP BY ")
+        builder.WriteString(pkList)
+        builder.WriteString(")")
+    } else {
+        builder.WriteString("SELECT ")
+        builder.WriteString(catalog.QuoteIdentifierANSI(augmentedSchema[0].Name))
+        for _, col := range augmentedSchema[1:] {
+            builder.WriteString(", ")
+            builder.WriteString(catalog.QuoteIdentifierANSI(col.Name))
+            if types.IsTimestampType(col.Type) {
+                builder.WriteString("::TIMESTAMP")
+            }
         }
-        builder.WriteString(" AS ")
-        builder.WriteString(catalog.QuoteIdentifierANSI(col.Name))
+        builder.WriteString(ipcSQL)
     }
-    builder.WriteString(" FROM (SELECT ")
-    builder.WriteString(pkList)
-    builder.WriteString(", LAST(ROW(*COLUMNS(*)) ORDER BY txn_group, txn_seq, txn_stmt, action) AS r")
-    builder.WriteString(ipcSQL)
-    builder.WriteString(" GROUP BY ")
-    builder.WriteString(pkList)
-    builder.WriteString(")")
     condenseDeltaSQL := builder.String()
 
     var (
@@ -244,7 +260,11 @@ func (c *DeltaController) updateTable(
     }
 
     // Insert or replace new rows (action = INSERT) into the base table.
-    insertSQL := "INSERT OR REPLACE INTO " +
+    insertSQL := "INSERT "
+    if appender.GetDeleteEventCount() > 0 {
+        insertSQL += "OR REPLACE "
+    }
+    insertSQL += "INTO " +
         qualifiedTableName +
         " SELECT * EXCLUDE (" + AugmentedColumnList + ") FROM temp.main.delta WHERE action = " +
         strconv.Itoa(int(binlog.InsertRowEvent))
diff --git a/delta/delta.go b/delta/delta.go
index b914773a..f8d25d98 100644
--- a/delta/delta.go
+++ b/delta/delta.go
@@ -17,8 +17,10 @@ type tableIdentifier struct {
 }
 
 type DeltaAppender struct {
-    schema   sql.Schema
-    appender myarrow.ArrowAppender
+    schema           sql.Schema
+    appender         myarrow.ArrowAppender
+    insertEventCount int
+    deleteEventCount int
 }
 
 // Create a new appender.
@@ -115,3 +117,24 @@ func (a *DeltaAppender) Grow(n int) {
 func (a *DeltaAppender) Release() {
     a.appender.Release()
 }
+
+func (a *DeltaAppender) IncInsertEventCount() {
+    a.insertEventCount++
+}
+
+func (a *DeltaAppender) IncDeleteEventCount() {
+    a.deleteEventCount++
+}
+
+func (a *DeltaAppender) GetInsertEventCount() int {
+    return a.insertEventCount
+}
+
+func (a *DeltaAppender) GetDeleteEventCount() int {
+    return a.deleteEventCount
+}
+
+func (a *DeltaAppender) ResetEventCounts() {
+    a.insertEventCount = 0
+    a.deleteEventCount = 0
+}
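
To make item 1 of the commit message easier to follow, here is a minimal, self-contained sketch of the schema-caching idea. The names (schemaCache, tableKey, load) and the string value type are hypothetical; in the patch the cache is the tablesByName map on binlogReplicaApplier, it stores sql.Table values keyed by lower-cased database and table names, and it is cleared whenever a schema-changing statement is applied.

package main

import (
	"fmt"
	"strings"
)

// tableKey mirrors the patch's tableIdentifier: lower-cased names make the
// lookup case-insensitive, like the catalog's GetTableInsensitive.
type tableKey struct{ db, table string }

// schemaCache is a hypothetical stand-in for the applier's tablesByName map.
type schemaCache struct {
	tables map[tableKey]string // the real cache stores sql.Table, not string
}

func newSchemaCache() *schemaCache {
	return &schemaCache{tables: make(map[tableKey]string)}
}

// get returns the cached entry and falls back to load (think: a catalog
// lookup) only on a miss, so per-row binlog events stop hitting the catalog.
func (c *schemaCache) get(db, table string, load func() (string, error)) (string, error) {
	key := tableKey{db: strings.ToLower(db), table: strings.ToLower(table)}
	if t, ok := c.tables[key]; ok {
		return t, nil
	}
	t, err := load()
	if err != nil {
		return "", err
	}
	c.tables[key] = t
	return t, nil
}

// invalidate drops every entry; the applier does the equivalent after DDL so
// stale schemas are never reused.
func (c *schemaCache) invalidate() {
	c.tables = make(map[tableKey]string)
}

func main() {
	cache := newSchemaCache()
	lookups := 0
	load := func() (string, error) { lookups++; return "schema(id INT, name TEXT)", nil }

	for i := 0; i < 3; i++ {
		if _, err := cache.get("mydb", "T1", load); err != nil {
			panic(err)
		}
	}
	fmt.Println("catalog lookups:", lookups) // 1, not 3

	cache.invalidate() // e.g. after a DDL statement
	_, _ = cache.get("mydb", "t1", load)
	fmt.Println("catalog lookups after DDL:", lookups) // 2
}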
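Item 2 boils down to the decision sketched below. buildInsertPrefix is a hypothetical helper; in the patch the equivalent logic is inlined in DeltaController.updateTable, which also skips the LAST(...) GROUP BY de-duplication subquery for such batches. Note that the check uses the delete-event count only: the applier increments it when it appends a before image, so a batch with updates is never treated as insert-only.

package main

import "fmt"

// buildInsertPrefix mirrors the choice made in updateTable: a plain INSERT is
// enough for an insert-only batch, while a batch containing delete or update
// images needs INSERT OR REPLACE so the latest row image wins by primary key.
func buildInsertPrefix(qualifiedTable string, deleteEvents int) string {
	stmt := "INSERT "
	if deleteEvents > 0 {
		stmt += "OR REPLACE "
	}
	return stmt + "INTO " + qualifiedTable
}

func main() {
	// Insert-only batch: cheap append path.
	fmt.Println(buildInsertPrefix(`"db"."t"`, 0)) // INSERT INTO "db"."t"
	// Batch with deletes or updates: upsert path.
	fmt.Println(buildInsertPrefix(`"db"."t"`, 3)) // INSERT OR REPLACE INTO "db"."t"
}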
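For reference, this is roughly the shape of the two SELECTs the string builder now emits, shown for a hypothetical table t(id, name) with primary key id. The projection is abbreviated (the real query covers the whole augmented schema and casts timestamp columns), and <ipc-from-clause> is a placeholder for ipcSQL, the FROM clause over the Arrow IPC delta stream.

package main

import "fmt"

// Approximate shape of the condensation query used when the batch contains
// deletes or updates: the last row image per primary key wins, ordered by
// transaction group, sequence, statement ordinal, and action.
const condenseSQL = `
SELECT r[1] AS "id", r[2] AS "name"
FROM (
  SELECT "id",
         LAST(ROW(*COLUMNS(*)) ORDER BY txn_group, txn_seq, txn_stmt, action) AS r
  <ipc-from-clause>
  GROUP BY "id"
)`

// Approximate shape of the insert-only variant: no grouping and no upsert,
// the delta is streamed through as-is.
const passThroughSQL = `
SELECT "id", "name"
<ipc-from-clause>`

func main() {
	fmt.Println(condenseSQL)
	fmt.Println(passThroughSQL)
}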