fix: handle "conn busy" error
fanyang01 committed Nov 20, 2024
1 parent 65bb037 commit c1f86ab
Showing 4 changed files with 64 additions and 43 deletions.
6 changes: 6 additions & 0 deletions delta/flush_reason.go
@@ -17,6 +17,8 @@ const (
TimeTickFlushReason
// QueryFlushReason means that the changes have to be flushed because some tables are queried.
QueryFlushReason
// InitFlushReason means that the changes have to be flushed because the controller is initialized.
InitFlushReason
// OnCloseFlushReason means that the changes have to be flushed because the controller is closed.
OnCloseFlushReason
)
@@ -35,6 +37,10 @@ func (r FlushReason) String() string {
return "TimeTick"
case QueryFlushReason:
return "Query"
case InitFlushReason:
return "Init"
case OnCloseFlushReason:
return "OnClose"
default:
return "Unknown"
}
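Note on the new constant: if these FlushReason values are assigned via iota (the hunk does not show explicit values), inserting InitFlushReason ahead of OnCloseFlushReason shifts the latter's numeric value, which only matters if reason codes are ever persisted or compared numerically. A minimal sketch of the stringer in use; the import path is assumed from the repository layout:

```go
package main

import (
	"fmt"

	"github.com/apecloud/myduckserver/delta" // assumed import path for delta/flush_reason.go
)

func main() {
	// FlushReason implements fmt.Stringer, so this prints "Init OnClose".
	fmt.Println(delta.InitFlushReason, delta.OnCloseFlushReason)
}
```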
2 changes: 1 addition & 1 deletion pgserver/logrepl/common_test.go
@@ -52,7 +52,7 @@ func StartPostgresServer() (containerName string, dsn string, port int, err error)
"postgres:latest", // Use the latest Postgres image
"-c", "wal_level=logical", // Enable logical replication
"-c", "max_wal_senders=30", // Set the maximum number of WAL senders
"-c", "wal_sender_timeout=10", // Set the WAL sender timeout
// "-c", "wal_sender_timeout=10000", // Set the WAL sender timeout (in milliseconds)
)

// Execute the Docker command
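Context for the change above: Postgres interprets wal_sender_timeout without a unit as milliseconds, so the previous value of 10 meant 10 ms and caused the primary to drop WAL senders almost immediately; with the flag commented out, the 60-second server default applies. A sketch of just the server flags the test relies on (the surrounding docker plumbing from common_test.go is deliberately omitted):

```go
package logreplsketch // illustrative only, not part of the commit

import "os/exec"

// startPostgresSketch shows the -c server flags used by the replication tests.
func startPostgresSketch() *exec.Cmd {
	return exec.Command("docker", "run", "-d",
		"postgres:latest",
		"-c", "wal_level=logical", // logical decoding is required for replication
		"-c", "max_wal_senders=30",
		// "-c", "wal_sender_timeout=10000", // milliseconds; leave unset to keep the 60 s default
	)
}
```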
82 changes: 49 additions & 33 deletions pgserver/logrepl/replication.go
@@ -22,7 +22,6 @@ import (
"math"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/apecloud/myduckserver/adapter"
@@ -184,12 +183,13 @@ type replicationState struct {
keys map[uint32][]uint16 // relationID -> slice of key column indices
deltas *delta.DeltaController

deltaBufSize atomic.Uint64 // size of the delta buffer in bytes
lastCommitTime time.Time // time of last commit
ongoingBatchTxn atomic.Bool // true if we're in a batched transaction
dirtyTxn atomic.Bool // true if we have uncommitted changes
dirtyStream atomic.Bool // true if the binlog stream does not end with a commit
inTxnStmtID atomic.Uint64 // statement ID within transaction
deltaBufSize uint64 // size of the delta buffer in bytes
lastCommitTime time.Time // time of last commit
commitCount uint64 // number of commits
ongoingBatchTxn bool // true if we're in a batched transaction
dirtyTxn bool // true if we have uncommitted changes
dirtyStream bool // true if the binlog stream does not end with a commit
inTxnStmtID uint64 // statement ID within transaction
}

func (state *replicationState) reset(ctx *sql.Context, slotName string, lsn pglogrepl.LSN) {
@@ -354,14 +354,23 @@ func (r *LogicalReplicator) StartReplication(sqlCtx *sql.Context, slotName string
case <-ticker.C:
cancel()
if time.Since(state.lastCommitTime) > r.flushInterval {
return r.commitOngoingTxnIfClean(state, delta.TimeTickFlushReason)
err := r.commitOngoingTxnIfClean(state, delta.TimeTickFlushReason)
if err != nil {
return err
}
return sendStandbyStatusUpdate(state)
}
return nil
}

if msgAndErr.err != nil {
if pgconn.Timeout(msgAndErr.err) {
return nil
} else if msgAndErr.err.Error() == "conn busy" || primaryConn.IsBusy() {
// This is a transient error, so we'll just wait a bit and try again
r.logger.Warnf("Received 'conn busy' error, waiting and retrying")
time.Sleep(100 * time.Millisecond)
return nil
} else {
return handleErrWithRetry(msgAndErr.err, true)
}
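The new branch treats pgconn's "conn busy" as transient: the loop logs a warning, backs off for 100 ms, and returns nil so the caller simply polls again instead of tearing down the stream. A self-contained sketch of the same pattern, with the helper names (isTransientConnBusy, handleReceiveErr) invented for illustration:

```go
package logreplsketch // illustrative only, not part of the commit

import (
	"time"

	"github.com/jackc/pgx/v5/pgconn"
)

// isTransientConnBusy mirrors the check added above: pgconn returns a plain
// "conn busy" error while another operation still owns the connection, and
// PgConn.IsBusy reports the same condition.
func isTransientConnBusy(conn *pgconn.PgConn, err error) bool {
	return err != nil && (err.Error() == "conn busy" || conn.IsBusy())
}

// handleReceiveErr shows the three-way split used in the receive loop:
// quiet return on deadline expiry, short backoff on "conn busy",
// and the retry handler for everything else.
func handleReceiveErr(conn *pgconn.PgConn, err error, retry func(error) error) error {
	switch {
	case pgconn.Timeout(err):
		return nil // receive deadline expired; the outer loop polls again
	case isTransientConnBusy(conn, err):
		time.Sleep(100 * time.Millisecond) // brief backoff, then retry
		return nil
	default:
		return retry(err)
	}
}
```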
@@ -401,13 +410,14 @@ func (r *LogicalReplicator) StartReplication(sqlCtx *sql.Context, slotName string
return err
}

_, err = r.processMessage(xld, state)
commit, err := r.processMessage(xld, state)
if err != nil {
// TODO: do we need more than one handler, one for each connection?
return handleErrWithRetry(err, true)
}

// return sendStandbyStatusUpdate(state)
if commit {
return sendStandbyStatusUpdate(state)
}
return nil
default:
r.logger.Debugf("Received unexpected message: %T\n", rawMsg)
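processMessage now reports whether a commit was applied, and the loop acknowledges progress to the primary only at those commit boundaries (and after the timed flush in the ticker branch above) instead of leaving the status update commented out. sendStandbyStatusUpdate is this repository's own helper; the sketch below only shows what such an acknowledgement typically looks like with pglogrepl, and wiring it to the last written LSN is an assumption:

```go
package logreplsketch // illustrative only, not part of the commit

import (
	"context"

	"github.com/jackc/pglogrepl"
	"github.com/jackc/pgx/v5/pgconn"
)

// ackLSN reports the highest locally applied LSN back to the primary so it
// can advance the replication slot and trim WAL.
func ackLSN(ctx context.Context, conn *pgconn.PgConn, lsn pglogrepl.LSN) error {
	return pglogrepl.SendStandbyStatusUpdate(ctx, conn, pglogrepl.StandbyStatusUpdate{
		WALWritePosition: lsn, // flush/apply positions default to this value
	})
}
```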
@@ -685,13 +695,14 @@ func (r *LogicalReplicator) processMessage(
if err != nil {
return false, err
}
state.ongoingBatchTxn.Store(true)
state.ongoingBatchTxn = true
}

case *pglogrepl.CommitMessage:
r.logger.Debugf("CommitMessage: %v", logicalMsg)

state.lastCommitLSN = logicalMsg.CommitLSN
state.commitCount += 1

extend, reason := r.mayExtendBatchTxn(state)
if !extend {
@@ -700,8 +711,8 @@
return false, err
}
}
state.dirtyStream.Store(false)
state.inTxnStmtID.Store(0)
state.dirtyStream = false
state.inTxnStmtID = 0

state.processMessages = false

@@ -717,9 +728,9 @@
return false, err
}

state.dirtyTxn.Store(true)
state.dirtyStream.Store(true)
state.inTxnStmtID.Add(1)
state.dirtyTxn = true
state.dirtyStream = true
state.inTxnStmtID += 1

case *pglogrepl.UpdateMessageV2:
if !state.processMessages {
@@ -746,9 +757,9 @@
return false, err
}

state.dirtyTxn.Store(true)
state.dirtyStream.Store(true)
state.inTxnStmtID.Add(1)
state.dirtyTxn = true
state.dirtyStream = true
state.inTxnStmtID += 1

case *pglogrepl.DeleteMessageV2:
if !state.processMessages {
@@ -771,9 +782,9 @@
return false, err
}

state.dirtyTxn.Store(true)
state.dirtyStream.Store(true)
state.inTxnStmtID.Add(1)
state.dirtyTxn = true
state.dirtyStream = true
state.inTxnStmtID += 1

case *pglogrepl.TruncateMessageV2:
r.logger.Debugf("truncate for xid %d\n", logicalMsg.Xid)
@@ -876,20 +887,23 @@ func encodeColumnData(mi *pgtype.Map, data interface{}, dataType uint32) (string
// mayExtendBatchTxn checks if we should extend the current batch transaction
func (r *LogicalReplicator) mayExtendBatchTxn(state *replicationState) (bool, delta.FlushReason) {
extend, reason := false, delta.UnknownFlushReason
if state.ongoingBatchTxn.Load() {
if state.ongoingBatchTxn {
extend = true
switch {
case state.commitCount == 1:
// This is the first commit message we've received, we commit it immediately to avoid the keepalive flood
extend, reason = false, delta.InitFlushReason
case time.Since(state.lastCommitTime) >= 200*time.Millisecond:
extend, reason = false, delta.TimeTickFlushReason
case state.deltaBufSize.Load() >= (128 << 20): // 128MB
case state.deltaBufSize >= (128 << 20): // 128MB
extend, reason = false, delta.MemoryLimitFlushReason
}
}
return extend, reason
}
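Taken together, the batching policy keeps a batch transaction open unless one of three triggers fires: the very first commit since the controller started (flushed immediately as Init so the primary stops flooding keepalives while waiting for an acknowledgement), a batch older than 200 ms, or a delta buffer that has reached 128 MB. The constants below are a sketch mirroring those thresholds; they are not exported names from this package:

```go
package logreplsketch // illustrative only, not part of the commit

import "time"

// Thresholds mirrored from mayExtendBatchTxn.
const (
	batchTimeBudget   = 200 * time.Millisecond // flush once the batch is older than this
	batchMemoryBudget = 128 << 20              // bytes: flush once the delta buffer reaches 128 MiB
)

// Flush triggers, in the order they are checked:
//   commitCount == 1                     -> delta.InitFlushReason
//   since(lastCommitTime) >= time budget -> delta.TimeTickFlushReason
//   deltaBufSize >= memory budget        -> delta.MemoryLimitFlushReason
```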

func (r *LogicalReplicator) commitOngoingTxnIfClean(state *replicationState, reason delta.FlushReason) error {
if state.dirtyTxn.Load() && !state.dirtyStream.Load() {
if state.dirtyTxn && !state.dirtyStream {
return r.commitOngoingTxn(state, reason)
}
return nil
@@ -922,10 +936,10 @@ func (r *LogicalReplicator) commitOngoingTxn(state *replicationState, flushReason
}

// Reset transaction state
state.ongoingBatchTxn.Store(false)
state.dirtyTxn.Store(false)
state.dirtyStream.Store(false)
state.inTxnStmtID.Store(0)
state.ongoingBatchTxn = false
state.dirtyTxn = false
state.dirtyStream = false
state.inTxnStmtID = 0
state.lastCommitTime = time.Now()

state.lastWrittenLSN = state.lastCommitLSN
@@ -935,7 +949,9 @@

// flushDeltaBuffer flushes the accumulated changes in the delta buffer
func (r *LogicalReplicator) flushDeltaBuffer(state *replicationState, tx *stdsql.Tx, reason delta.FlushReason) error {
defer state.deltaBufSize.Store(0)
defer func() {
state.deltaBufSize = 0
}()

_, err := state.deltas.Flush(state.replicaCtx, tx, reason)
return err
@@ -964,7 +980,7 @@ func (r *LogicalReplicator) append(state *replicationState, relationID uint32, t
txnServers.Append([]byte(""))
txnGroups.AppendNull()
txnSeqNumbers.Append(uint64(state.currentTransactionLSN))
txnStmtOrdinals.Append(state.inTxnStmtID.Load())
txnStmtOrdinals.Append(state.inTxnStmtID)

size := 0
idx := 0
@@ -996,7 +1012,7 @@
}
}

state.deltaBufSize.Add(uint64(size))
state.deltaBufSize += uint64(size)
return nil
}

17 changes: 8 additions & 9 deletions pgserver/logrepl/replication_test.go
@@ -30,7 +30,6 @@ import (
"github.com/cockroachdb/errors"
"github.com/dolthub/go-mysql-server/sql"
"github.com/jackc/pgx/v5"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@@ -529,7 +528,7 @@ var replicationTests = []ReplicationTest{
}

func TestReplication(t *testing.T) {
logrus.SetLevel(logrus.DebugLevel)
// logrus.SetLevel(logrus.DebugLevel)
RunReplicationScripts(t, replicationTests)
}

@@ -569,14 +568,14 @@ func RunReplicationScripts(t *testing.T, scripts []ReplicationTest) {
require.NoError(t, logrepl.CreatePublication(primaryDns, slotName))
time.Sleep(500 * time.Millisecond)

for i, script := range scripts {
if i == 0 {
RunReplicationScript(t, dsn, script)
}
}
// for _, script := range scripts {
// RunReplicationScript(t, dsn, script)
// for i, script := range scripts {
// if i == 0 {
// RunReplicationScript(t, dsn, script)
// }
// }
for _, script := range scripts {
RunReplicationScript(t, dsn, script)
}
}

const slotName = "myduck_slot"
