diff --git a/binlogreplication/binlog_replica_applier.go b/binlogreplication/binlog_replica_applier.go
index 2283617e..054dc885 100644
--- a/binlogreplication/binlog_replica_applier.go
+++ b/binlogreplication/binlog_replica_applier.go
@@ -1245,6 +1245,10 @@ func (a *binlogReplicaApplier) appendRowFormatChanges(
 }
 
 func (a *binlogReplicaApplier) flushDeltaBuffer(ctx *sql.Context, reason delta.FlushReason) error {
+	conn, err := adapter.GetConn(ctx)
+	if err != nil {
+		return err
+	}
 	tx, err := adapter.GetCatalogTxn(ctx, nil)
 	if err != nil {
 		return err
@@ -1252,7 +1256,7 @@ func (a *binlogReplicaApplier) flushDeltaBuffer(ctx *sql.Context, reason delta.F
 
 	defer a.deltaBufSize.Store(0)
 
-	if err = a.tableWriterProvider.FlushDeltaBuffer(ctx, tx, reason); err != nil {
+	if err = a.tableWriterProvider.FlushDeltaBuffer(ctx, conn, tx, reason); err != nil {
 		ctx.GetLogger().Errorf("Failed to flush changelog: %v", err.Error())
 		MyBinlogReplicaController.setSqlError(sqlerror.ERUnknownError, err.Error())
 	}
diff --git a/binlogreplication/writer.go b/binlogreplication/writer.go
index 1a107d41..3a6ec088 100644
--- a/binlogreplication/writer.go
+++ b/binlogreplication/writer.go
@@ -51,7 +51,7 @@ type TableWriterProvider interface {
 	) (DeltaAppender, error)
 
 	// FlushDelta writes the accumulated changes to the database.
-	FlushDeltaBuffer(ctx *sql.Context, tx *stdsql.Tx, reason delta.FlushReason) error
+	FlushDeltaBuffer(ctx *sql.Context, conn *stdsql.Conn, tx *stdsql.Tx, reason delta.FlushReason) error
 
 	// DiscardDeltaBuffer discards the accumulated changes.
 	DiscardDeltaBuffer(ctx *sql.Context)
diff --git a/delta/controller.go b/delta/controller.go
index 687fb70f..228810eb 100644
--- a/delta/controller.go
+++ b/delta/controller.go
@@ -1,22 +1,23 @@
 package delta
 
 import (
-	"bytes"
 	stdsql "database/sql"
 	"fmt"
+	"hash/maphash"
 	"math/bits"
 	"strconv"
 	"strings"
 	"sync"
-	"unsafe"
 
-	"github.com/apache/arrow-go/v18/arrow/ipc"
+	"github.com/apache/arrow-go/v18/arrow"
+	"github.com/apache/arrow-go/v18/arrow/array"
 	"github.com/apecloud/myduckserver/binlog"
 	"github.com/apecloud/myduckserver/catalog"
 	"github.com/apecloud/myduckserver/pgtypes"
 	"github.com/dolthub/go-mysql-server/sql"
 	"github.com/dolthub/go-mysql-server/sql/types"
 	"github.com/jackc/pgx/v5/pgtype"
+	"github.com/marcboeker/go-duckdb"
 	"github.com/sirupsen/logrus"
 )
 
@@ -29,11 +30,13 @@ type FlushStats struct {
 type DeltaController struct {
 	mutex  sync.Mutex
 	tables map[tableIdentifier]*DeltaAppender
+	seed   maphash.Seed
 }
 
 func NewController() *DeltaController {
 	return &DeltaController{
 		tables: make(map[tableIdentifier]*DeltaAppender),
+		seed:   maphash.MakeSeed(),
 	}
 }
 
@@ -68,7 +71,7 @@ func (c *DeltaController) Close() {
 }
 
 // Flush writes the accumulated changes to the database.
-func (c *DeltaController) Flush(ctx *sql.Context, tx *stdsql.Tx, reason FlushReason) (FlushStats, error) {
+func (c *DeltaController) Flush(ctx *sql.Context, conn *stdsql.Conn, tx *stdsql.Tx, reason FlushReason) (FlushStats, error) {
 	c.mutex.Lock()
 	defer c.mutex.Unlock()
 
@@ -93,16 +96,13 @@ func (c *DeltaController) Flush(ctx *sql.Context, tx *stdsql.Tx, reason FlushRea
 	// See:
 	//  https://duckdb.org/docs/sql/indexes.html#limitations-of-art-indexes
 	//  https://github.com/duckdb/duckdb/issues/14133
-	var (
-		// Share the buffer among all tables.
-		buf   bytes.Buffer
-		stats FlushStats
-	)
+
+	var stats FlushStats
 
 	for table, appender := range c.tables {
 		deltaRowCount := appender.RowCount()
 		if deltaRowCount > 0 {
-			if err := c.updateTable(ctx, tx, table, appender, &buf, &stats); err != nil {
+			if err := c.updateTable(ctx, conn, tx, table, appender, &stats); err != nil {
 				return stats, err
 			}
 		}
@@ -135,121 +135,247 @@
 
 func (c *DeltaController) updateTable(
 	ctx *sql.Context,
+	conn *stdsql.Conn,
 	tx *stdsql.Tx,
 	table tableIdentifier,
 	appender *DeltaAppender,
-	buf *bytes.Buffer,
 	stats *FlushStats,
 ) error {
 	if tx == nil {
 		return fmt.Errorf("no active transaction")
 	}
+	defer appender.ResetCounters()
+
+	// We consider the following cases:
+	// 1. INSERT only - no DELETE or UPDATE. In this case, we can do a simple INSERT INTO in an optimized way,
+	//    without the deduplication step (as the source has confirmed that there are no duplicates) and the DELETE step.
+	//    The data can go directly from the delta view to the base table.
+	// 2. DELETE only - no INSERT or UPDATE. In this case, we can do a simple DELETE FROM in an optimized way,
+	//    without the INSERT step and the deduplication step (as the source has confirmed that there are no duplicates).
+	//    The delta view can be directly used to delete rows from the base table, without the need for a temporary table.
+	// 3. INSERT + non-primary-key UPDATE - no DELETE. In this case, we can skip the DELETE step.
+	//    Therefore, the temporary table is not needed as the delta view will be read only once.
+	// 4. The general case - INSERT, DELETE, and UPDATE. In this case, we need to create a temporary table
+	//    to store the deduplicated delta and then do the INSERT and DELETE steps.
+
+	// Identify the types of changes in the delta
+	hasInserts := appender.counters.event.insert > 0
+	hasDeletes := appender.counters.event.delete > 0
+	hasUpdates := appender.counters.event.update > 0
+
+	switch {
+	case hasInserts && !hasDeletes && !hasUpdates:
+		// Case 1: INSERT only
+		return c.handleInsertOnly(ctx, conn, tx, table, appender, stats)
+	case hasDeletes && !hasInserts && !hasUpdates:
+		// Case 2: DELETE only
+		return c.handleDeleteOnly(ctx, conn, tx, table, appender, stats)
+	case appender.counters.action.delete == 0:
+		// Case 3: INSERT + non-primary-key UPDATE
+		return c.handleZeroDelete(ctx, conn, tx, table, appender, stats)
+	default:
+		// Case 4: General case
+		return c.handleGeneralCase(ctx, conn, tx, table, appender, stats)
+	}
+}
 
-	buf.Reset()
-
-	schema := appender.BaseSchema() // schema of the base table
+// Helper function to build the Arrow record and register the view
+func (c *DeltaController) prepareArrowView(
+	conn *stdsql.Conn,
+	table tableIdentifier,
+	appender *DeltaAppender,
+) (viewName string, close func(), err error) {
 	record := appender.Build()
-	defer func() {
+
+	var ar *duckdb.Arrow
+	err = conn.Raw(func(driverConn any) error {
+		var err error
+		ar, err = duckdb.NewArrowFromConn(driverConn.(*duckdb.Conn))
+		return err
+	})
+	if err != nil {
 		record.Release()
-		appender.ResetCounters()
-	}()
+		return "", nil, err
+	}
 
-	// fmt.Println("record:", record)
+	reader, err := array.NewRecordReader(record.Schema(), []arrow.Record{record})
+	if err != nil {
+		record.Release()
+		return "", nil, err
+	}
+
+	// Register the Arrow view
+	hash := maphash.String(c.seed, table.dbName+"\x00"+table.tableName)
+	viewName = "__sys_view_arrow_delta_" + strconv.FormatUint(hash, 16) + "__"
+	release, err := ar.RegisterView(reader, viewName)
+	if err != nil {
+		reader.Release()
+		record.Release()
+		return "", nil, err
+	}
 
-	// TODO(fan): Switch to zero-copy Arrow ingestion once this PR is merged:
-	//  https://github.com/marcboeker/go-duckdb/pull/283
-	w := ipc.NewWriter(buf, ipc.WithSchema(record.Schema()))
-	if err := w.Write(record); err != nil {
+	close = func() {
+		release()
+		reader.Release()
+		record.Release()
+	}
+	return viewName, close, nil
+}
+
+func (c *DeltaController) handleInsertOnly(
+	ctx *sql.Context,
+	conn *stdsql.Conn,
+	tx *stdsql.Tx,
+	table tableIdentifier,
+	appender *DeltaAppender,
+	stats *FlushStats,
+) error {
+	viewName, release, err := c.prepareArrowView(conn, table, appender)
+	if err != nil {
+		return err
+	}
+	defer release()
+
+	// Perform direct INSERT without deduplication
+	var b strings.Builder
+	b.Grow(128)
+
+	b.WriteString("INSERT INTO ")
+	b.WriteString(catalog.ConnectIdentifiersANSI(table.dbName, table.tableName))
+	b.WriteString(" SELECT ")
+	buildColumnList(&b, appender.BaseSchema())
+	b.WriteString(" FROM ")
+	b.WriteString(viewName)
+
+	result, err := tx.ExecContext(ctx, b.String())
+	if err != nil {
 		return err
 	}
-	if err := w.Close(); err != nil {
+
+	affected, err := result.RowsAffected()
+	if err != nil {
+		return err
+	}
+	stats.Insertions += affected
+	stats.DeltaSize += affected
+
+	if log := ctx.GetLogger(); log.Logger.IsLevelEnabled(logrus.DebugLevel) {
+		log.WithFields(logrus.Fields{
+			"db":    table.dbName,
+			"table": table.tableName,
+			"rows":  affected,
+		}).Debug("Inserted")
+	}
+
+	return nil
+}
+
+func (c *DeltaController) handleDeleteOnly(
+	ctx *sql.Context,
+	conn *stdsql.Conn,
+	tx *stdsql.Tx,
+	table tableIdentifier,
+	appender *DeltaAppender,
+	stats *FlushStats,
+) error {
+	viewName, release, err := c.prepareArrowView(conn, table, appender)
+	if err != nil {
 		return err
 	}
-	bytes := buf.Bytes()
-	size := len(bytes)
-	ptr := unsafe.Pointer(&bytes[0])
-	ipcSQL := fmt.Sprintf(
-		" FROM scan_arrow_ipc([{ptr: %d::ubigint, size: %d::ubigint}])",
-		uintptr(ptr), size,
-	)
+	defer release()
 
 	qualifiedTableName := catalog.ConnectIdentifiersANSI(table.dbName, table.tableName)
+	pk := getPrimaryKeyStruct(appender.BaseSchema())
 
-	pkColumns := make([]int, 0, 1) // Most tables have a single-column primary key
-	for i, col := range schema {
-		if col.PrimaryKey {
-			pkColumns = append(pkColumns, i)
-		}
+	// Perform direct DELETE without deduplication
+	deleteSQL := "DELETE FROM " + qualifiedTableName +
+		" WHERE " + pk + " IN (SELECT " + pk + " FROM " + viewName + ")"
+	result, err := tx.ExecContext(ctx, deleteSQL)
+	if err != nil {
+		return err
 	}
-	pkList := catalog.QuoteIdentifierANSI(schema[pkColumns[0]].Name)
-	for _, i := range pkColumns[1:] {
-		pkList += ", " + catalog.QuoteIdentifierANSI(schema[i].Name)
+
+	affected, err := result.RowsAffected()
+	if err != nil {
+		return err
 	}
+	stats.Deletions += affected
+	stats.DeltaSize += affected
 
-	// Use the following SQL to get the latest view of the rows being updated.
-	//
-	//   SELECT r[0] as action, ...
-	//   FROM (
-	//     SELECT
-	//       pk1, pk2, ...,
-	//       LAST(ROW(*COLUMNS(*)) ORDER BY txn_group, txn_seq, txn_stmt, action) AS r
-	//     FROM delta
-	//     GROUP BY pk1, pk2, ...
-	//   )
-	//
-	// Note that an update generates two rows: one for DELETE and one for INSERT.
-	// So the numeric value of DELETE action MUST be smaller than that of INSERT.
-	augmentedSchema := appender.Schema()
-	var builder strings.Builder
-	builder.Grow(512)
-	if appender.counters.event.delete > 0 || appender.counters.event.update > 0 { // sometimes UPDATE does not DELETE pre-image
-		builder.WriteString("SELECT ")
-		builder.WriteString("r[1] AS ")
-		builder.WriteString(catalog.QuoteIdentifierANSI(augmentedSchema[0].Name))
-		for i, col := range augmentedSchema[1:] {
-			builder.WriteString(", r[")
-			builder.WriteString(strconv.Itoa(i + 2))
-			builder.WriteString("]")
-			if isTimestampType(col.Type) {
-				builder.WriteString("::TIMESTAMP")
-			}
-			builder.WriteString(" AS ")
-			builder.WriteString(catalog.QuoteIdentifierANSI(col.Name))
-		}
-		builder.WriteString(" FROM (SELECT ")
-		builder.WriteString(pkList)
-		builder.WriteString(", LAST(ROW(*COLUMNS(*)) ORDER BY txn_group, txn_seq, txn_stmt, action) AS r")
-		builder.WriteString(ipcSQL)
-		builder.WriteString(" GROUP BY ")
-		builder.WriteString(pkList)
-		builder.WriteString(")")
-	} else {
-		// For pure INSERTs, since the source has confirmed that there are no duplicates,
-		// we can skip the deduplication step.
-		builder.WriteString("SELECT ")
-		builder.WriteString(catalog.QuoteIdentifierANSI(augmentedSchema[0].Name))
-		for _, col := range augmentedSchema[1:] {
-			builder.WriteString(", ")
-			builder.WriteString(catalog.QuoteIdentifierANSI(col.Name))
-			if types.IsTimestampType(col.Type) {
-				builder.WriteString("::TIMESTAMP")
-			}
-		}
-		builder.WriteString(ipcSQL)
+	if log := ctx.GetLogger(); log.Logger.IsLevelEnabled(logrus.DebugLevel) {
+		log.WithFields(logrus.Fields{
+			"db":    table.dbName,
+			"table": table.tableName,
+			"rows":  affected,
+		}).Debug("Deleted")
 	}
-	condenseDeltaSQL := builder.String()
 
-	var (
-		result   stdsql.Result
-		affected int64
-		err      error
-	)
+	return nil
+}
 
-	// Create a temporary table to store the latest delta view.
-	result, err = tx.ExecContext(ctx, "CREATE OR REPLACE TEMP TABLE delta AS "+condenseDeltaSQL)
-	if err == nil {
-		affected, err = result.RowsAffected()
+func (c *DeltaController) handleZeroDelete(
+	ctx *sql.Context,
+	conn *stdsql.Conn,
+	tx *stdsql.Tx,
+	table tableIdentifier,
+	appender *DeltaAppender,
+	stats *FlushStats,
+) error {
+	viewName, release, err := c.prepareArrowView(conn, table, appender)
+	if err != nil {
+		return err
+	}
+	defer release()
+
+	condenseDeltaSQL := buildCondenseDeltaSQL(viewName, appender.Schema(), getPrimaryKeyList(appender.BaseSchema()))
+
+	insertSQL := "INSERT OR REPLACE INTO " +
+		catalog.ConnectIdentifiersANSI(table.dbName, table.tableName) +
+		" SELECT * EXCLUDE (" + AugmentedColumnList + ") FROM (" + condenseDeltaSQL + ")"
+	result, err := tx.ExecContext(ctx, insertSQL)
+	if err != nil {
+		return err
+	}
+
+	affected, err := result.RowsAffected()
+	if err != nil {
+		return err
+	}
+	stats.Insertions += affected
+	stats.DeltaSize += affected
+
+	if log := ctx.GetLogger(); log.Logger.IsLevelEnabled(logrus.DebugLevel) {
+		log.WithFields(logrus.Fields{
+			"db":    table.dbName,
+			"table": table.tableName,
+			"rows":  affected,
+		}).Debug("Upserted")
+	}
+
+	return nil
+}
+
+func (c *DeltaController) handleGeneralCase(
+	ctx *sql.Context,
+	conn *stdsql.Conn,
+	tx *stdsql.Tx,
+	table tableIdentifier,
+	appender *DeltaAppender,
+	stats *FlushStats,
+) error {
+	viewName, release, err := c.prepareArrowView(conn, table, appender)
+	if err != nil {
+		return err
+	}
+
+	// Create a temporary table to store the latest delta view
+	condenseDeltaSQL := buildCondenseDeltaSQL(viewName, appender.Schema(), getPrimaryKeyList(appender.BaseSchema()))
+	result, err := tx.ExecContext(ctx, "CREATE OR REPLACE TEMP TABLE delta AS "+condenseDeltaSQL)
+	release() // release the Arrow view immediately
+	if err != nil {
+		return err
 	}
+
+	affected, err := result.RowsAffected()
 	if err != nil {
 		return err
 	}
@@ -258,17 +384,16 @@ func (c *DeltaController) updateTable(
 
 	if log := ctx.GetLogger(); log.Logger.IsLevelEnabled(logrus.DebugLevel) {
 		log.WithFields(logrus.Fields{
-			"table": qualifiedTableName,
+			"db":    table.dbName,
+			"table": table.tableName,
 			"rows":  affected,
 		}).Debug("Delta created")
 	}
 
+	qualifiedTableName := catalog.ConnectIdentifiersANSI(table.dbName, table.tableName)
+
 	// Insert or replace new rows (action = INSERT) into the base table.
-	insertSQL := "INSERT "
-	if appender.counters.event.delete > 0 || appender.counters.event.update > 0 { // sometimes UPDATE does not DELETE pre-image
-		insertSQL += "OR REPLACE "
-	}
-	insertSQL += "INTO " +
+	insertSQL := "INSERT OR REPLACE INTO " +
 		qualifiedTableName +
 		" SELECT * EXCLUDE (" + AugmentedColumnList + ") FROM temp.main.delta WHERE action = " +
 		strconv.Itoa(int(binlog.InsertRowEvent))
@@ -283,14 +408,10 @@ func (c *DeltaController) updateTable(
 
 	if log := ctx.GetLogger(); log.Logger.IsLevelEnabled(logrus.DebugLevel) {
 		log.WithFields(logrus.Fields{
-			"table": qualifiedTableName,
+			"db":    table.dbName,
+			"table": table.tableName,
 			"rows":  affected,
-		}).Debug("Inserted")
-	}
-
-	// If there are no rows to delete, we can skip the DELETE step.
-	if appender.counters.action.delete == 0 {
-		return nil
+		}).Debug("Upserted")
 	}
 
 	// Delete rows that have been deleted.
@@ -298,10 +419,7 @@ func (c *DeltaController) updateTable(
 	// which is more efficient than ordinary INNER JOIN.
 	// DuckDB does not support multiple columns in `IN` clauses,
 	// so we need to handle this case separately using the `row()` function.
-	inTuple := pkList
-	if len(pkColumns) > 1 {
-		inTuple = "row(" + pkList + ")"
-	}
+	inTuple := getPrimaryKeyStruct(appender.BaseSchema())
 	deleteSQL := "DELETE FROM " + qualifiedTableName +
 		" WHERE " + inTuple + " IN (SELECT " + inTuple +
 		" FROM temp.main.delta WHERE action = " + strconv.Itoa(int(binlog.DeleteRowEvent)) + ")"
@@ -335,7 +453,8 @@ func (c *DeltaController) updateTable(
 
 	if log := ctx.GetLogger(); log.Logger.IsLevelEnabled(logrus.DebugLevel) {
 		log.WithFields(logrus.Fields{
-			"table": qualifiedTableName,
+			"db":    table.dbName,
+			"table": table.tableName,
 			"rows":  affected,
 		}).Debug("Deleted")
 	}
@@ -343,6 +462,87 @@ func (c *DeltaController) updateTable(
 	return nil
 }
 
+// Helper function to build column list with timestamp handling
+func buildColumnList(b *strings.Builder, schema sql.Schema) {
+	for i, col := range schema {
+		if i > 0 {
+			b.WriteString(", ")
+		}
+		b.WriteString(catalog.QuoteIdentifierANSI(col.Name))
+		if isTimestampType(col.Type) {
+			b.WriteString("::TIMESTAMP")
+		}
+	}
+}
+
+// Helper function to get the primary key. For composite primary keys, `row()` is used.
+func getPrimaryKeyStruct(schema sql.Schema) string {
+	pks := make([]string, 0, 1)
+	for _, col := range schema {
+		if col.PrimaryKey {
+			pks = append(pks, catalog.QuoteIdentifierANSI(col.Name))
+		}
+	}
+	if len(pks) == 0 {
+		return ""
+	} else if len(pks) == 1 {
+		return pks[0]
+	}
+	return "row(" + strings.Join(pks, ", ") + ")"
+}
+
+// Helper function to get the primary key list.
+func getPrimaryKeyList(schema sql.Schema) string {
+	pks := make([]string, 0, 1)
+	for _, col := range schema {
+		if col.PrimaryKey {
+			pks = append(pks, catalog.QuoteIdentifierANSI(col.Name))
+		}
+	}
+	return strings.Join(pks, ", ")
+}
+
+func buildCondenseDeltaSQL(viewName string, augmentedSchema sql.Schema, pkList string) string {
+	var builder strings.Builder
+	builder.Grow(512)
+	// Use the following SQL to get the latest view of the rows being updated.
+	//
+	//   SELECT r[1] as action, ...
+	//   FROM (
+	//     SELECT
+	//       pk1, pk2, ...,
+	//       LAST(ROW(*COLUMNS(*)) ORDER BY txn_group, txn_seq, txn_stmt, action) AS r
+	//     FROM delta
+	//     GROUP BY pk1, pk2, ...
+	//   )
+	//
+	// Note that an update generates two rows: one for DELETE and one for INSERT.
+	// So the numeric value of DELETE action MUST be smaller than that of INSERT.
+	builder.WriteString("SELECT ")
+	builder.WriteString("r[1] AS ")
+	builder.WriteString(catalog.QuoteIdentifierANSI(augmentedSchema[0].Name))
+	for i, col := range augmentedSchema[1:] {
+		builder.WriteString(", r[")
+		builder.WriteString(strconv.Itoa(i + 2))
+		builder.WriteString("]")
+		if isTimestampType(col.Type) {
+			builder.WriteString("::TIMESTAMP")
+		}
+		builder.WriteString(" AS ")
+		builder.WriteString(catalog.QuoteIdentifierANSI(col.Name))
+	}
+	builder.WriteString(" FROM (SELECT ")
+	builder.WriteString(pkList)
+	builder.WriteString(", LAST(ROW(*COLUMNS(*)) ORDER BY txn_group, txn_seq, txn_stmt, action) AS r")
+	builder.WriteString(" FROM ")
+	builder.WriteString(viewName)
+	builder.WriteString(" GROUP BY ")
+	builder.WriteString(pkList)
+	builder.WriteString(")")
+	return builder.String()
+}
+
 func isTimestampType(t sql.Type) bool {
 	if types.IsTimestampType(t) {
 		return true
diff --git a/pgserver/logrepl/replication.go b/pgserver/logrepl/replication.go
index ea4a9037..c76c9a46 100644
--- a/pgserver/logrepl/replication.go
+++ b/pgserver/logrepl/replication.go
@@ -981,6 +981,10 @@ func (r *LogicalReplicator) commitOngoingTxnIfClean(state *replicationState, rea
 
 // commitOngoingTxn commits the current transaction
 func (r *LogicalReplicator) commitOngoingTxn(state *replicationState, flushReason delta.FlushReason) error {
+	conn, err := adapter.GetConn(state.replicaCtx)
+	if err != nil {
+		return err
+	}
 	tx := adapter.TryGetTxn(state.replicaCtx)
 	if tx == nil {
 		return nil
@@ -990,7 +994,7 @@ func (r *LogicalReplicator) commitOngoingTxn(state *replicationState, flushReaso
 	defer adapter.CloseTxn(state.replicaCtx)
 
 	// Flush the delta buffer if too large
-	err := r.flushDeltaBuffer(state, tx, flushReason)
+	err = r.flushDeltaBuffer(state, conn, tx, flushReason)
 	if err != nil {
 		return err
 	}
@@ -1018,12 +1022,12 @@ func (r *LogicalReplicator) commitOngoingTxn(state *replicationState, flushReaso
 }
 
 // flushDeltaBuffer flushes the accumulated changes in the delta buffer
-func (r *LogicalReplicator) flushDeltaBuffer(state *replicationState, tx *stdsql.Tx, reason delta.FlushReason) error {
+func (r *LogicalReplicator) flushDeltaBuffer(state *replicationState, conn *stdsql.Conn, tx *stdsql.Tx, reason delta.FlushReason) error {
 	defer func() {
 		state.deltaBufSize = 0
 	}()
 
-	_, err := state.deltas.Flush(state.replicaCtx, tx, reason)
+	_, err := state.deltas.Flush(state.replicaCtx, conn, tx, reason)
 	return err
 }
diff --git a/replica/replication.go b/replica/replication.go
index 5b2f07fc..64f07f57 100644
--- a/replica/replication.go
+++ b/replica/replication.go
@@ -83,8 +83,8 @@ func (twp *tableWriterProvider) GetDeltaAppender(
 	return twp.controller.GetDeltaAppender(databaseName, tableName, schema)
 }
 
-func (twp *tableWriterProvider) FlushDeltaBuffer(ctx *sql.Context, tx *stdsql.Tx, reason delta.FlushReason) error {
-	_, err := twp.controller.Flush(ctx, tx, reason)
+func (twp *tableWriterProvider) FlushDeltaBuffer(ctx *sql.Context, conn *stdsql.Conn, tx *stdsql.Tx, reason delta.FlushReason) error {
+	_, err := twp.controller.Flush(ctx, conn, tx, reason)
 	return err
 }
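
Note on the new ingestion path: prepareArrowView uses go-duckdb's Arrow interface (NewArrowFromConn + RegisterView) to expose the in-memory delta record as a named DuckDB view, replacing the scan_arrow_ipc() pointer-passing workaround that the deleted TODO pointed at. A minimal standalone sketch of that pattern follows. It is illustrative only: the view and column names are made up, error handling is elided, and depending on the go-duckdb version the Arrow interface may require the duckdb_arrow build tag.

	package main

	import (
		"context"
		stdsql "database/sql"
		"fmt"

		"github.com/apache/arrow-go/v18/arrow"
		"github.com/apache/arrow-go/v18/arrow/array"
		"github.com/apache/arrow-go/v18/arrow/memory"
		"github.com/marcboeker/go-duckdb"
	)

	func main() {
		db, _ := stdsql.Open("duckdb", "") // empty DSN = in-memory database
		defer db.Close()
		ctx := context.Background()

		// The view registration is tied to one physical connection, which is
		// why the patch threads *stdsql.Conn alongside *stdsql.Tx everywhere.
		conn, _ := db.Conn(ctx)
		defer conn.Close()

		// Build a one-column Arrow record: id = [1, 2, 3].
		schema := arrow.NewSchema([]arrow.Field{{Name: "id", Type: arrow.PrimitiveTypes.Int64}}, nil)
		rb := array.NewRecordBuilder(memory.DefaultAllocator, schema)
		defer rb.Release()
		rb.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
		record := rb.NewRecord()
		defer record.Release()
		reader, _ := array.NewRecordReader(schema, []arrow.Record{record})
		defer reader.Release()

		// Reach the driver connection and register the reader as a view.
		var ar *duckdb.Arrow
		_ = conn.Raw(func(driverConn any) error {
			var err error
			ar, err = duckdb.NewArrowFromConn(driverConn.(*duckdb.Conn))
			return err
		})
		release, _ := ar.RegisterView(reader, "arrow_ids")
		defer release()

		// The view is queryable like a table until release() is called.
		var n int64
		_ = conn.QueryRowContext(ctx, "SELECT count(*) FROM arrow_ids").Scan(&n)
		fmt.Println(n) // 3
	}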
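
For reference, the deduplication query assembled by buildCondenseDeltaSQL expands to roughly the following shape. This is a sketch for a hypothetical table with primary key "id" and one value column "v"; the exact number and order of the augmented txn_* columns come from the appender's schema, so the r[...] indices below are illustrative:

	SELECT r[1] AS "action", r[2] AS "txn_group", r[3] AS "txn_seq",
	       r[4] AS "txn_stmt", r[5] AS "id", r[6] AS "v"
	FROM (
	    SELECT "id",
	           LAST(ROW(*COLUMNS(*)) ORDER BY txn_group, txn_seq, txn_stmt, action) AS r
	    FROM __sys_view_arrow_delta_<hash>__
	    GROUP BY "id"
	)

Per primary key, LAST(...) keeps only the final row image in transaction order. Because an UPDATE arrives as a DELETE row plus an INSERT row, the trailing action sort key (DELETE is numerically smaller than INSERT) guarantees the INSERT image wins; the outer projection then unpacks the ROW(...) struct, whose fields are 1-indexed in DuckDB, and re-casts any timestamp columns.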