diff --git a/binlogreplication/binlog_replica_applier.go b/binlogreplication/binlog_replica_applier.go index e6b74647..db6c516e 100644 --- a/binlogreplication/binlog_replica_applier.go +++ b/binlogreplication/binlog_replica_applier.go @@ -57,6 +57,10 @@ const ( // Match any strings starting with "ON" (case insensitive) var gtidModeIsOnRegex = regexp.MustCompile(`(?i)^ON$`) +type tableIdentifier struct { + dbName, tableName string +} + // binlogReplicaApplier represents the process that applies updates from a binlog connection. // // This type is NOT used concurrently – there is currently only one single applier process running to process binlog @@ -64,6 +68,7 @@ var gtidModeIsOnRegex = regexp.MustCompile(`(?i)^ON$`) type binlogReplicaApplier struct { format *mysql.BinlogFormat tableMapsById map[uint64]*mysql.TableMap + tablesByName map[tableIdentifier]sql.Table stopReplicationChan chan struct{} currentGtid replication.GTID replicationSourceUuid string @@ -86,6 +91,7 @@ type binlogReplicaApplier struct { func newBinlogReplicaApplier(filters *filterConfiguration) *binlogReplicaApplier { return &binlogReplicaApplier{ tableMapsById: make(map[uint64]*mysql.TableMap), + tablesByName: make(map[tableIdentifier]sql.Table), stopReplicationChan: make(chan struct{}), filters: filters, } @@ -895,6 +901,7 @@ func (a *binlogReplicaApplier) executeQueryWithEngine(ctx *sql.Context, engine * } if mysqlutil.CauseSchemaChange(node) { flushReason = delta.DDLStmtFlushReason + a.tablesByName = make(map[tableIdentifier]sql.Table) } } @@ -1013,7 +1020,7 @@ func (a *binlogReplicaApplier) processRowEvent(ctx *sql.Context, event mysql.Bin ctx.GetLogger().Errorf(msg) MyBinlogReplicaController.setSqlError(sqlerror.ERUnknownError, msg) } - pkSchema, tableName, err := getTableSchema(ctx, engine, tableMap.Name, tableMap.Database) + pkSchema, tableName, err := a.getTableSchema(ctx, engine, tableMap.Name, tableMap.Database) if err != nil { return err } @@ -1199,6 +1206,7 @@ func (a *binlogReplicaApplier) appendRowFormatChanges( } a.deltaBufSize.Add(uint64(pos)) } + appender.IncDeleteEventCount() } // Insert the after image @@ -1229,6 +1237,7 @@ func (a *binlogReplicaApplier) appendRowFormatChanges( } a.deltaBufSize.Add(uint64(pos)) } + appender.IncInsertEventCount() } return nil @@ -1291,17 +1300,25 @@ varsLoop: // getTableSchema returns a sql.Schema for the case-insensitive |tableName| in the database named // |databaseName|, along with the exact, case-sensitive table name. -func getTableSchema(ctx *sql.Context, engine *gms.Engine, tableName, databaseName string) (sql.PrimaryKeySchema, string, error) { - database, err := engine.Analyzer.Catalog.Database(ctx, databaseName) - if err != nil { - return sql.PrimaryKeySchema{}, "", err - } - table, ok, err := database.GetTableInsensitive(ctx, tableName) - if err != nil { - return sql.PrimaryKeySchema{}, "", err - } - if !ok { - return sql.PrimaryKeySchema{}, "", fmt.Errorf("unable to find table %q", tableName) +func (a *binlogReplicaApplier) getTableSchema(ctx *sql.Context, engine *gms.Engine, tableName, databaseName string) (sql.PrimaryKeySchema, string, error) { + key := tableIdentifier{dbName: strings.ToLower(databaseName), tableName: strings.ToLower(tableName)} + table, found := a.tablesByName[key] + + if !found { + database, err := engine.Analyzer.Catalog.Database(ctx, databaseName) + if err != nil { + return sql.PrimaryKeySchema{}, "", err + } + var ok bool + table, ok, err = database.GetTableInsensitive(ctx, tableName) + if err != nil { + return sql.PrimaryKeySchema{}, "", err + } + if !ok { + return sql.PrimaryKeySchema{}, "", fmt.Errorf("unable to find table %q", tableName) + } + + a.tablesByName[key] = table } if pkTable, ok := table.(sql.PrimaryKeyTable); ok { diff --git a/binlogreplication/writer.go b/binlogreplication/writer.go index 19f4f8a9..1697c1ec 100644 --- a/binlogreplication/writer.go +++ b/binlogreplication/writer.go @@ -25,6 +25,11 @@ type DeltaAppender interface { TxnGroup() *array.BinaryDictionaryBuilder TxnSeqNumber() *array.Uint64Builder TxnStmtOrdinal() *array.Uint64Builder + IncInsertEventCount() + IncDeleteEventCount() + GetInsertEventCount() int + GetDeleteEventCount() int + ResetEventCounts() } type TableWriterProvider interface { diff --git a/delta/controller.go b/delta/controller.go index 073463f6..649092cc 100644 --- a/delta/controller.go +++ b/delta/controller.go @@ -147,7 +147,10 @@ func (c *DeltaController) updateTable( schema := appender.BaseSchema() // schema of the base table record := appender.Build() - defer record.Release() + defer func() { + record.Release() + appender.ResetEventCounts() + }() // fmt.Println("record:", record) @@ -197,26 +200,39 @@ func (c *DeltaController) updateTable( augmentedSchema := appender.Schema() var builder strings.Builder builder.Grow(512) - builder.WriteString("SELECT ") - builder.WriteString("r[1] AS ") - builder.WriteString(catalog.QuoteIdentifierANSI(augmentedSchema[0].Name)) - for i, col := range augmentedSchema[1:] { - builder.WriteString(", r[") - builder.WriteString(strconv.Itoa(i + 2)) - builder.WriteString("]") - if types.IsTimestampType(col.Type) { - builder.WriteString("::TIMESTAMP") + if appender.GetDeleteEventCount() > 0 { + builder.WriteString("SELECT ") + builder.WriteString("r[1] AS ") + builder.WriteString(catalog.QuoteIdentifierANSI(augmentedSchema[0].Name)) + for i, col := range augmentedSchema[1:] { + builder.WriteString(", r[") + builder.WriteString(strconv.Itoa(i + 2)) + builder.WriteString("]") + if types.IsTimestampType(col.Type) { + builder.WriteString("::TIMESTAMP") + } + builder.WriteString(" AS ") + builder.WriteString(catalog.QuoteIdentifierANSI(col.Name)) + } + builder.WriteString(" FROM (SELECT ") + builder.WriteString(pkList) + builder.WriteString(", LAST(ROW(*COLUMNS(*)) ORDER BY txn_group, txn_seq, txn_stmt, action) AS r") + builder.WriteString(ipcSQL) + builder.WriteString(" GROUP BY ") + builder.WriteString(pkList) + builder.WriteString(")") + } else { + builder.WriteString("SELECT ") + builder.WriteString(catalog.QuoteIdentifierANSI(augmentedSchema[0].Name)) + for _, col := range augmentedSchema[1:] { + builder.WriteString(", ") + builder.WriteString(catalog.QuoteIdentifierANSI(col.Name)) + if types.IsTimestampType(col.Type) { + builder.WriteString("::TIMESTAMP") + } } - builder.WriteString(" AS ") - builder.WriteString(catalog.QuoteIdentifierANSI(col.Name)) + builder.WriteString(ipcSQL) } - builder.WriteString(" FROM (SELECT ") - builder.WriteString(pkList) - builder.WriteString(", LAST(ROW(*COLUMNS(*)) ORDER BY txn_group, txn_seq, txn_stmt, action) AS r") - builder.WriteString(ipcSQL) - builder.WriteString(" GROUP BY ") - builder.WriteString(pkList) - builder.WriteString(")") condenseDeltaSQL := builder.String() var ( @@ -244,7 +260,11 @@ func (c *DeltaController) updateTable( } // Insert or replace new rows (action = INSERT) into the base table. - insertSQL := "INSERT OR REPLACE INTO " + + insertSQL := "INSERT " + if appender.GetDeleteEventCount() > 0 { + insertSQL += "OR REPLACE " + } + insertSQL += "INTO " + qualifiedTableName + " SELECT * EXCLUDE (" + AugmentedColumnList + ") FROM temp.main.delta WHERE action = " + strconv.Itoa(int(binlog.InsertRowEvent)) diff --git a/delta/delta.go b/delta/delta.go index b914773a..f8d25d98 100644 --- a/delta/delta.go +++ b/delta/delta.go @@ -17,8 +17,10 @@ type tableIdentifier struct { } type DeltaAppender struct { - schema sql.Schema - appender myarrow.ArrowAppender + schema sql.Schema + appender myarrow.ArrowAppender + insertEventCount int + deleteEventCount int } // Create a new appender. @@ -115,3 +117,24 @@ func (a *DeltaAppender) Grow(n int) { func (a *DeltaAppender) Release() { a.appender.Release() } + +func (a *DeltaAppender) IncInsertEventCount() { + a.insertEventCount++ +} + +func (a *DeltaAppender) IncDeleteEventCount() { + a.deleteEventCount++ +} + +func (a *DeltaAppender) GetInsertEventCount() int { + return a.insertEventCount +} + +func (a *DeltaAppender) GetDeleteEventCount() int { + return a.deleteEventCount +} + +func (a *DeltaAppender) ResetEventCounts() { + a.insertEventCount = 0 + a.deleteEventCount = 0 +}