Skip to content

Commit

Permalink
Merge branch 'main' into 119-zero-etl-pg
Browse files Browse the repository at this point in the history
  • Loading branch information
TianyuZhang1214 authored Nov 26, 2024
2 parents 750e392 + 0b03960 commit 495e86a
Showing 1 changed file with 14 additions and 3 deletions.
17 changes: 14 additions & 3 deletions pgserver/logrepl/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func (r *LogicalReplicator) StartReplication(sqlCtx *sql.Context, slotName strin
state.reset(sqlCtx, slotName, lastWrittenLsn)

// Switch to the `public` schema.
if _, err := adapter.Exec(sqlCtx, "USE public"); err != nil {
if _, err := adapter.ExecCatalog(sqlCtx, "USE public"); err != nil {
return err
}
sqlCtx.SetCurrentDatabase("public")
Expand Down Expand Up @@ -297,6 +297,7 @@ func (r *LogicalReplicator) StartReplication(sqlCtx *sql.Context, slotName strin
defer ticker.Stop()

for {
var msgReceiverExited chan struct{}
err := func() error {
// Shutdown if requested
select {
Expand Down Expand Up @@ -336,9 +337,14 @@ func (r *LogicalReplicator) StartReplication(sqlCtx *sql.Context, slotName strin

ctx, cancel := context.WithDeadline(context.Background(), nextStandbyMessageDeadline)
receiveMsgChan := make(chan rcvMsg)
msgReceiverExited = make(chan struct{})
go func() {
defer close(msgReceiverExited)
rawMsg, err := primaryConn.ReceiveMessage(ctx)
receiveMsgChan <- rcvMsg{msg: rawMsg, err: err}
select {
case <-ctx.Done():
case receiveMsgChan <- rcvMsg{msg: rawMsg, err: err}:
}
}()

var msgAndErr rcvMsg
Expand Down Expand Up @@ -426,6 +432,11 @@ func (r *LogicalReplicator) StartReplication(sqlCtx *sql.Context, slotName strin
return nil
}()

// Wait for the message receiver goroutine to finish.
if msgReceiverExited != nil {
<-msgReceiverExited
}

if err != nil {
if errors.Is(err, errShutdownRequested) {
return nil
Expand Down Expand Up @@ -873,7 +884,7 @@ func (r *LogicalReplicator) processMessage(
// readWALPosition reads the recorded WAL position from the WAL position table
func (r *LogicalReplicator) readWALPosition(ctx *sql.Context, slotName string) (pglogrepl.LSN, error) {
var lsn string
if err := adapter.QueryRow(ctx, catalog.InternalTables.PgReplicationLSN.SelectStmt(), slotName).Scan(&lsn); err != nil {
if err := adapter.QueryRowCatalog(ctx, catalog.InternalTables.PgReplicationLSN.SelectStmt(), slotName).Scan(&lsn); err != nil {
if errors.Is(err, stdsql.ErrNoRows) {
// if the LSN doesn't exist, consider this a cold start and return 0
return pglogrepl.LSN(0), nil
Expand Down

0 comments on commit 495e86a

Please sign in to comment.