Skip to content

Commit bba24ca

Browse files
to #119 feat: make CREATE SUBSCRIPTION work
1 parent 3e26aae commit bba24ca

File tree

3 files changed

+106
-50
lines changed

3 files changed

+106
-50
lines changed

pgserver/logrepl/replication.go

+95-41
Original file line numberDiff line numberDiff line change
@@ -533,76 +533,130 @@ func DropPublication(primaryDns, slotName string) error {
533533
return err
534534
}
535535

536-
// CreatePublication creates a publication with the given name if it does not already exist. Mostly useful for testing.
536+
// CreatePublicationIfNotExists creates a publication FOR ALL TABLES with the given name if it does not already exist. It is used by tests and by CREATE SUBSCRIPTION handling.
537537
// Customers should run the CREATE PUBLICATION command on their primary server manually, specifying whichever tables
538538
// they want to replicate.
539-
func CreatePublication(primaryDns, slotName string) error {
540-
conn, err := pgconn.Connect(context.Background(), primaryDns)
539+
func CreatePublicationIfNotExists(primaryDns, publicationName string) error {
540+
// Connect to the primary PostgreSQL server
541+
conn, err := pgx.Connect(context.Background(), primaryDns)
541542
if err != nil {
542-
return err
543+
return fmt.Errorf("failed to connect to primary database: %w", err)
543544
}
544545
defer conn.Close(context.Background())
545546

546-
result := conn.Exec(context.Background(), fmt.Sprintf("CREATE PUBLICATION %s FOR ALL TABLES;", slotName))
547-
_, err = result.ReadAll()
548-
return err
547+
// Check if the publication exists
548+
query := `SELECT EXISTS(SELECT 1 FROM pg_publication WHERE pubname = $1)`
549+
var exists bool
550+
if err := conn.QueryRow(context.Background(), query, publicationName).Scan(&exists); err != nil {
551+
return fmt.Errorf("failed to check publication existence: %w", err)
552+
}
553+
554+
// Create the publication if it does not exist
555+
if !exists {
556+
createQuery := fmt.Sprintf("CREATE PUBLICATION %s FOR ALL TABLES", publicationName)
557+
if _, err := conn.Exec(context.Background(), createQuery); err != nil {
558+
if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code == "42710" {
559+
// Ignore "publication already exists" error
560+
return nil
561+
}
562+
return fmt.Errorf("failed to create publication: %w", err)
563+
}
564+
}
565+
566+
return nil
549567
}
550568

551-
// DropReplicationSlot drops the replication slot with the given name. Any error from the slot not existing is ignored.
552-
func (r *LogicalReplicator) DropReplicationSlot(slotName string) error {
569+
// DropReplicationSlotIfExists drops the replication slot with the given name. If the slot does not exist, nothing is dropped and nil is returned; any other failure is returned as an error.
570+
func (r *LogicalReplicator) DropReplicationSlotIfExists(slotName string) error {
571+
// Check if the replication slot exists
572+
exists, err := r.replicationSlotExists(slotName)
573+
if err != nil {
574+
return fmt.Errorf("failed to check replication slot existence: %w", err)
575+
}
576+
577+
if !exists {
578+
r.logger.Infof("Replication slot '%s' does not exist.", slotName)
579+
return nil
580+
}
581+
582+
// Connect to the replication database
553583
conn, err := pgconn.Connect(context.Background(), r.ReplicationDns())
554584
if err != nil {
555-
return err
585+
return fmt.Errorf("failed to connect to replication database: %w", err)
586+
}
587+
defer conn.Close(context.Background())
588+
589+
if err := pglogrepl.DropReplicationSlot(context.Background(), conn, slotName, pglogrepl.DropReplicationSlotOptions{}); err != nil {
590+
return fmt.Errorf("failed to drop replication slot '%s': %w", slotName, err)
556591
}
557592

558-
_ = pglogrepl.DropReplicationSlot(context.Background(), conn, slotName, pglogrepl.DropReplicationSlotOptions{})
593+
r.logger.Infof("Replication slot '%s' successfully dropped.", slotName)
559594
return nil
560595
}
561596

562-
// CreateReplicationSlotIfNecessary creates the replication slot named if it doesn't already exist.
563-
func (r *LogicalReplicator) CreateReplicationSlotIfNecessary(slotName string) error {
564-
conn, err := pgx.Connect(context.Background(), r.PrimaryDns())
597+
// CreateReplicationSlotIfNotExists creates the replication slot with the given name if it doesn't already exist.
598+
func (r *LogicalReplicator) CreateReplicationSlotIfNotExists(slotName string) error {
599+
// Check if the replication slot already exists
600+
exists, err := r.replicationSlotExists(slotName)
565601
if err != nil {
566-
return err
602+
return fmt.Errorf("error checking replication slot existence: %w", err)
567603
}
568604

569-
rows, err := conn.Query(context.Background(), "select * from pg_replication_slots where slot_name = $1", slotName)
605+
// If the slot already exists, no further action is needed
606+
if exists {
607+
r.logger.Infof("Replication slot '%s' already exists.", slotName)
608+
return nil
609+
}
610+
611+
// Create the replication slot
612+
err = r.createReplicationSlot(slotName)
570613
if err != nil {
571-
return err
614+
return fmt.Errorf("error creating replication slot '%s': %w", slotName, err)
572615
}
573616

574-
slotExists := false
575-
defer rows.Close()
576-
for rows.Next() {
577-
_, err := rows.Values()
578-
if err != nil {
579-
return err
580-
}
581-
slotExists = true
617+
r.logger.Infof("Replication slot '%s' created successfully.", slotName)
618+
return nil
619+
}
620+
621+
// replicationSlotExists reports whether a replication slot with the given name exists on the primary server.
622+
func (r *LogicalReplicator) replicationSlotExists(slotName string) (bool, error) {
623+
conn, err := pgx.Connect(context.Background(), r.PrimaryDns())
624+
if err != nil {
625+
return false, fmt.Errorf("failed to connect to primary database: %w", err)
582626
}
627+
defer conn.Close(context.Background())
583628

584-
if rows.Err() != nil {
585-
return rows.Err()
629+
var exists bool
630+
query := `SELECT EXISTS(SELECT 1 FROM pg_replication_slots WHERE slot_name = $1)`
631+
err = conn.QueryRow(context.Background(), query, slotName).Scan(&exists)
632+
if err != nil {
633+
return false, fmt.Errorf("error querying replication slots: %w", err)
586634
}
587635

588-
// We need a different connection to create the replication slot
589-
conn, err = pgx.Connect(context.Background(), r.ReplicationDns())
636+
return exists, nil
637+
}
638+
639+
// createReplicationSlot creates a replication slot with the given name; an "already exists" error (SQLSTATE 42710) is treated as success.
640+
func (r *LogicalReplicator) createReplicationSlot(slotName string) error {
641+
conn, err := pgx.Connect(context.Background(), r.ReplicationDns())
590642
if err != nil {
591-
return err
643+
return fmt.Errorf("failed to connect to replication database: %w", err)
592644
}
645+
defer conn.Close(context.Background())
593646

594-
if !slotExists {
595-
_, err = pglogrepl.CreateReplicationSlot(context.Background(), conn.PgConn(), slotName, outputPlugin, pglogrepl.CreateReplicationSlotOptions{})
596-
if err != nil {
597-
pgErr, ok := err.(*pgconn.PgError)
598-
if ok && pgErr.Code == "42710" {
599-
// replication slot already exists, we can ignore this error
600-
} else {
601-
return err
602-
}
647+
_, err = pglogrepl.CreateReplicationSlot(
648+
context.Background(),
649+
conn.PgConn(),
650+
slotName,
651+
outputPlugin,
652+
pglogrepl.CreateReplicationSlotOptions{},
653+
)
654+
if err != nil {
655+
if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code == "42710" {
656+
// Replication slot already exists; ignore this error
657+
return nil
603658
}
604-
605-
r.logger.Infoln("Created replication slot:", slotName)
659+
return fmt.Errorf("error creating replication slot: %w", err)
606660
}
607661

608662
return nil

pgserver/logrepl/replication_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -565,7 +565,7 @@ func RunReplicationScripts(t *testing.T, scripts []ReplicationTest) {
565565
// We drop and recreate the publication once at the beginning of the test suite. Postgres seems to do a little
566566
// work in the background with a publication, so we need to wait a little bit before running any test scripts.
567567
require.NoError(t, logrepl.DropPublication(primaryDns, slotName))
568-
require.NoError(t, logrepl.CreatePublication(primaryDns, slotName))
568+
require.NoError(t, logrepl.CreatePublicationIfNotExists(primaryDns, slotName))
569569
time.Sleep(500 * time.Millisecond)
570570

571571
// for i, script := range scripts {
@@ -731,10 +731,10 @@ func connectionForQuery(t *testing.T, query string, connections map[string]*pgx.
731731
func handlePseudoQuery(t *testing.T, server *pgserver.Server, query string, r *logrepl.LogicalReplicator) bool {
732732
switch query {
733733
case createReplicationSlot:
734-
require.NoError(t, r.CreateReplicationSlotIfNecessary(slotName))
734+
require.NoError(t, r.CreateReplicationSlotIfNotExists(slotName))
735735
return true
736736
case dropReplicationSlot:
737-
require.NoError(t, r.DropReplicationSlot(slotName))
737+
require.NoError(t, r.DropReplicationSlotIfExists(slotName))
738738
return true
739739
case startReplication:
740740
go func() {

pgserver/subscription_handler.go

+8-6
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,12 @@ func doCreateSubscription(h *ConnectionHandler, subscriptionConfig *Subscription
163163
return fmt.Errorf("failed to create logical replicator: %w", err)
164164
}
165165

166-
err = replicator.CreateReplicationSlotIfNecessary(subscriptionConfig.SubscriptionName)
166+
err = logrepl.CreatePublicationIfNotExists(subscriptionConfig.ToDNS(), subscriptionConfig.PublicationName)
167+
if err != nil {
168+
return fmt.Errorf("failed to create publication: %w", err)
169+
}
170+
171+
err = replicator.CreateReplicationSlotIfNotExists(subscriptionConfig.PublicationName)
167172
if err != nil {
168173
return fmt.Errorf("failed to create replication slot: %w", err)
169174
}
@@ -173,7 +178,7 @@ func doCreateSubscription(h *ConnectionHandler, subscriptionConfig *Subscription
173178
return fmt.Errorf("failed to create context for query: %w", err)
174179
}
175180

176-
err = replicator.WriteWALPosition(sqlCtx, subscriptionConfig.SubscriptionName, subscriptionConfig.LSN)
181+
err = replicator.WriteWALPosition(sqlCtx, subscriptionConfig.PublicationName, subscriptionConfig.LSN)
177182
if err != nil {
178183
return fmt.Errorf("failed to write WAL position: %w", err)
179184
}
@@ -183,10 +188,7 @@ func doCreateSubscription(h *ConnectionHandler, subscriptionConfig *Subscription
183188
return fmt.Errorf("failed to create context for query: %w", err)
184189
}
185190

186-
// TODO(neo.zty): This process has error message with:
187-
// Received 'conn busy' error, waiting and retrying component=replicator protocol=pg
188-
// Deal with it.
189-
go replicator.StartReplication(sqlCtx, subscriptionConfig.SubscriptionName)
191+
go replicator.StartReplication(sqlCtx, subscriptionConfig.PublicationName)
190192

191193
return nil
192194
}

0 commit comments

Comments
 (0)