Skip to content

Commit

Permalink
fix: make PG replication robust to the insert-only optimization (#210)
Browse files Browse the repository at this point in the history
* fix: make PG replication robust to the insert-only optimization
* fix: conn is already closed
* Refine delta stats collection
  • Loading branch information
fanyang01 authored Nov 25, 2024
1 parent fe90074 commit 6806cbf
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 42 deletions.
2 changes: 1 addition & 1 deletion backend/connpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (p *ConnectionPool) Close() error {
return true
})
for _, conn := range conns {
if err := conn.Close(); err != nil {
if err := conn.Close(); err != nil && !errors.Is(err, stdsql.ErrConnDone) {
logrus.WithError(err).Warn("Failed to close connection")
lastErr = err
}
Expand Down
5 changes: 3 additions & 2 deletions binlogreplication/binlog_replica_applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -1206,7 +1206,7 @@ func (a *binlogReplicaApplier) appendRowFormatChanges(
}
a.deltaBufSize.Add(uint64(pos))
}
appender.IncDeleteEventCount()
appender.UpdateActionStats(binlog.DeleteRowEvent, len(rows.Rows))
}

// Insert the after image
Expand Down Expand Up @@ -1237,9 +1237,10 @@ func (a *binlogReplicaApplier) appendRowFormatChanges(
}
a.deltaBufSize.Add(uint64(pos))
}
appender.IncInsertEventCount()
appender.UpdateActionStats(binlog.InsertRowEvent, len(rows.Rows))
}

appender.ObserveEvents(event, len(rows.Rows))
return nil
}

Expand Down
8 changes: 3 additions & 5 deletions binlogreplication/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,9 @@ type DeltaAppender interface {
TxnGroup() *array.BinaryDictionaryBuilder
TxnSeqNumber() *array.Uint64Builder
TxnStmtOrdinal() *array.Uint64Builder
IncInsertEventCount()
IncDeleteEventCount()
GetInsertEventCount() int
GetDeleteEventCount() int
ResetEventCounts()

UpdateActionStats(action binlog.RowEventType, count int)
ObserveEvents(event binlog.RowEventType, count int)
}

type TableWriterProvider interface {
Expand Down
17 changes: 12 additions & 5 deletions delta/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ func (c *DeltaController) Close() {
c.mutex.Lock()
defer c.mutex.Unlock()

for k, da := range c.tables {
for _, da := range c.tables {
da.appender.Release()
delete(c.tables, k)
}
clear(c.tables)
}

// Flush writes the accumulated changes to the database.
Expand Down Expand Up @@ -149,7 +149,7 @@ func (c *DeltaController) updateTable(
record := appender.Build()
defer func() {
record.Release()
appender.ResetEventCounts()
appender.ResetCounters()
}()

// fmt.Println("record:", record)
Expand Down Expand Up @@ -200,7 +200,7 @@ func (c *DeltaController) updateTable(
augmentedSchema := appender.Schema()
var builder strings.Builder
builder.Grow(512)
if appender.GetDeleteEventCount() > 0 {
if appender.counters.event.delete > 0 || appender.counters.event.update > 0 { // sometimes UPDATE does not DELETE pre-image
builder.WriteString("SELECT ")
builder.WriteString("r[1] AS ")
builder.WriteString(catalog.QuoteIdentifierANSI(augmentedSchema[0].Name))
Expand All @@ -222,6 +222,8 @@ func (c *DeltaController) updateTable(
builder.WriteString(pkList)
builder.WriteString(")")
} else {
// For pure INSERTs, since the source has confirmed that there are no duplicates,
// we can skip the deduplication step.
builder.WriteString("SELECT ")
builder.WriteString(catalog.QuoteIdentifierANSI(augmentedSchema[0].Name))
for _, col := range augmentedSchema[1:] {
Expand Down Expand Up @@ -261,7 +263,7 @@ func (c *DeltaController) updateTable(

// Insert or replace new rows (action = INSERT) into the base table.
insertSQL := "INSERT "
if appender.GetDeleteEventCount() > 0 {
if appender.counters.event.delete > 0 || appender.counters.event.update > 0 { // sometimes UPDATE does not DELETE pre-image
insertSQL += "OR REPLACE "
}
insertSQL += "INTO " +
Expand All @@ -284,6 +286,11 @@ func (c *DeltaController) updateTable(
}).Debug("Inserted")
}

// If there are no rows to delete, we can skip the DELETE step.
if appender.counters.action.delete == 0 {
return nil
}

// Delete rows that have been deleted.
// The plan for `IN` is optimized to a SEMI JOIN,
// which is more efficient than ordinary INNER JOIN.
Expand Down
49 changes: 30 additions & 19 deletions delta/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package delta
import (
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apecloud/myduckserver/binlog"
"github.com/apecloud/myduckserver/myarrow"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/go-mysql-server/sql/types"
Expand All @@ -17,10 +18,13 @@ type tableIdentifier struct {
}

type DeltaAppender struct {
schema sql.Schema
appender myarrow.ArrowAppender
insertEventCount int
deleteEventCount int
schema sql.Schema
appender myarrow.ArrowAppender

counters struct {
event struct{ delete, insert, update int }
action struct{ delete, insert int }
}
}

// Create a new appender.
Expand Down Expand Up @@ -118,23 +122,30 @@ func (a *DeltaAppender) Release() {
a.appender.Release()
}

func (a *DeltaAppender) IncInsertEventCount() {
a.insertEventCount++
}

func (a *DeltaAppender) IncDeleteEventCount() {
a.deleteEventCount++
}

func (a *DeltaAppender) GetInsertEventCount() int {
return a.insertEventCount
// UpdateActionStats adds count to the per-action tally kept in
// a.counters.action. A binlog.DeleteRowEvent action increases the delete
// tally and a binlog.InsertRowEvent action increases the insert tally; any
// other action value leaves the counters untouched.
func (a *DeltaAppender) UpdateActionStats(action binlog.RowEventType, count int) {
	tally := &a.counters.action
	if action == binlog.DeleteRowEvent {
		tally.delete += count
	} else if action == binlog.InsertRowEvent {
		tally.insert += count
	}
}

func (a *DeltaAppender) GetDeleteEventCount() int {
return a.deleteEventCount
// ObserveEvents records that one source event of the given type was seen by
// bumping the matching counter in a.counters.event by exactly one. Delete,
// insert and update events are tracked; other event types are ignored.
//
// The count argument is accepted but not read here: each call advances the
// counter by one regardless of its value.
func (a *DeltaAppender) ObserveEvents(event binlog.RowEventType, count int) {
	seen := &a.counters.event
	switch {
	case event == binlog.DeleteRowEvent:
		seen.delete += 1
	case event == binlog.InsertRowEvent:
		seen.insert += 1
	case event == binlog.UpdateRowEvent:
		seen.update += 1
	}
}

func (a *DeltaAppender) ResetEventCounts() {
a.insertEventCount = 0
a.deleteEventCount = 0
// ResetCounters zeroes every event counter (delete, insert, update) and
// every action counter (delete, insert) held in a.counters.
func (a *DeltaAppender) ResetCounters() {
	events, actions := &a.counters.event, &a.counters.action
	events.delete, events.insert, events.update = 0, 0, 0
	actions.delete, actions.insert = 0, 0
}
36 changes: 27 additions & 9 deletions pgserver/logrepl/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,7 +723,7 @@ func (r *LogicalReplicator) processMessage(
return false, nil
}

err = r.append(state, logicalMsg.RelationID, logicalMsg.Tuple.Columns, binlog.InsertRowEvent, false)
err = r.append(state, logicalMsg.RelationID, logicalMsg.Tuple.Columns, binlog.InsertRowEvent, binlog.InsertRowEvent, false)
if err != nil {
return false, err
}
Expand All @@ -741,18 +741,23 @@ func (r *LogicalReplicator) processMessage(
// Delete the old tuple
switch logicalMsg.OldTupleType {
case pglogrepl.UpdateMessageTupleTypeKey:
err = r.append(state, logicalMsg.RelationID, logicalMsg.OldTuple.Columns, binlog.DeleteRowEvent, true)
err = r.append(state, logicalMsg.RelationID, logicalMsg.OldTuple.Columns, binlog.DeleteRowEvent, binlog.UpdateRowEvent, true)
case pglogrepl.UpdateMessageTupleTypeOld:
err = r.append(state, logicalMsg.RelationID, logicalMsg.OldTuple.Columns, binlog.DeleteRowEvent, false)
err = r.append(state, logicalMsg.RelationID, logicalMsg.OldTuple.Columns, binlog.DeleteRowEvent, binlog.UpdateRowEvent, false)
default:
// No old tuple provided; it means the key columns are unchanged
// No old tuple provided; it means the key columns are unchanged.
// It's fine not to append a delete event to the delta in this case.
// However, the delta appender implements an optimization that
// uses INSERT instead of UPSERT+DELETE when there is no deletion in a batch.
// We need to enforce the use of UPSERT here because the deletion count is zero.
err = r.append(state, logicalMsg.RelationID, nil, binlog.DeleteRowEvent, binlog.UpdateRowEvent, true)
}
if err != nil {
return false, err
}

// Insert the new tuple
err = r.append(state, logicalMsg.RelationID, logicalMsg.NewTuple.Columns, binlog.InsertRowEvent, false)
err = r.append(state, logicalMsg.RelationID, logicalMsg.NewTuple.Columns, binlog.InsertRowEvent, binlog.UpdateRowEvent, false)
if err != nil {
return false, err
}
Expand All @@ -770,9 +775,9 @@ func (r *LogicalReplicator) processMessage(

switch logicalMsg.OldTupleType {
case pglogrepl.UpdateMessageTupleTypeKey:
err = r.append(state, logicalMsg.RelationID, logicalMsg.OldTuple.Columns, binlog.DeleteRowEvent, true)
err = r.append(state, logicalMsg.RelationID, logicalMsg.OldTuple.Columns, binlog.DeleteRowEvent, binlog.DeleteRowEvent, true)
case pglogrepl.UpdateMessageTupleTypeOld:
err = r.append(state, logicalMsg.RelationID, logicalMsg.OldTuple.Columns, binlog.DeleteRowEvent, false)
err = r.append(state, logicalMsg.RelationID, logicalMsg.OldTuple.Columns, binlog.DeleteRowEvent, binlog.DeleteRowEvent, false)
default:
// No old tuple provided; cannot perform delete
err = fmt.Errorf("DeleteMessage without OldTuple")
Expand Down Expand Up @@ -957,7 +962,7 @@ func (r *LogicalReplicator) flushDeltaBuffer(state *replicationState, tx *stdsql
return err
}

func (r *LogicalReplicator) append(state *replicationState, relationID uint32, tuple []*pglogrepl.TupleDataColumn, eventType binlog.RowEventType, onlyKeys bool) error {
func (r *LogicalReplicator) append(state *replicationState, relationID uint32, tuple []*pglogrepl.TupleDataColumn, actionType, eventType binlog.RowEventType, onlyKeys bool) error {
rel, ok := state.relations[relationID]
if !ok {
return fmt.Errorf("unknown relation ID %d", relationID)
Expand All @@ -967,6 +972,16 @@ func (r *LogicalReplicator) append(state *replicationState, relationID uint32, t
return err
}

if len(tuple) == 0 {
// The only case where we can have an empty tuple is when
// we're deleting+inserting a row and the key columns are unchanged.
if eventType == binlog.UpdateRowEvent && actionType == binlog.DeleteRowEvent {
appender.ObserveEvents(binlog.UpdateRowEvent, 1)
return nil
}
return fmt.Errorf("empty tuple data")
}

fields := appender.Fields()
actions := appender.Action()
txnTags := appender.TxnTag()
Expand All @@ -975,7 +990,7 @@ func (r *LogicalReplicator) append(state *replicationState, relationID uint32, t
txnSeqNumbers := appender.TxnSeqNumber()
txnStmtOrdinals := appender.TxnStmtOrdinal()

actions.Append(int8(eventType))
actions.Append(int8(actionType))
txnTags.AppendNull()
txnServers.Append([]byte(""))
txnGroups.AppendNull()
Expand Down Expand Up @@ -1012,6 +1027,9 @@ func (r *LogicalReplicator) append(state *replicationState, relationID uint32, t
}
}

appender.UpdateActionStats(actionType, 1)
appender.ObserveEvents(eventType, 1)

state.deltaBufSize += uint64(size)
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pgserver/logrepl/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -805,7 +805,7 @@ func waitForCaughtUp(r *logrepl.LogicalReplicator) error {
}

log.Println("replication not caught up, waiting")
if time.Since(start) >= 5*time.Second {
if time.Since(start) >= 10*time.Second {
return errors.New("Replication did not catch up")
}
time.Sleep(1 * time.Second)
Expand Down

0 comments on commit 6806cbf

Please sign in to comment.