Skip to content

Commit 125c029

Browse files
author
Lyubo Kamenov
authored
Add support for reading replication slot (#171)
* Read the state of the replication slot when subscription is created. * Ensure the starting position in CDC mode is consistent with what is available in the slot. * Refactor connection management to use pgxpool and allow for acquired connections to be upgraded for replication * Add subscription teardown method which stops and waits for wind down.
1 parent 247dd1b commit 125c029

File tree

16 files changed

+357
-213
lines changed

16 files changed

+357
-213
lines changed

source.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222

2323
"github.com/conduitio/conduit-commons/csync"
2424
"github.com/conduitio/conduit-connector-postgres/source"
25+
"github.com/conduitio/conduit-connector-postgres/source/cpool"
2526
"github.com/conduitio/conduit-connector-postgres/source/logrepl"
2627
sdk "github.com/conduitio/conduit-connector-sdk"
2728
"github.com/jackc/pgx/v5"
@@ -63,7 +64,7 @@ func (s *Source) Configure(_ context.Context, cfg map[string]string) error {
6364
}
6465

6566
func (s *Source) Open(ctx context.Context, pos sdk.Position) error {
66-
pool, err := pgxpool.New(ctx, s.config.URL)
67+
pool, err := cpool.New(ctx, s.config.URL)
6768
if err != nil {
6869
return fmt.Errorf("failed to create a connection pool to database: %w", err)
6970
}

source/cpool/cpool.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
// Copyright © 2024 Meroxa, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package cpool
16+
17+
import (
18+
"context"
19+
"fmt"
20+
21+
"github.com/jackc/pgx/v5"
22+
"github.com/jackc/pgx/v5/pgxpool"
23+
)
24+
25+
type replicationCtxKey struct{}
26+
27+
func WithReplication(ctx context.Context) context.Context {
28+
return context.WithValue(ctx, replicationCtxKey{}, true)
29+
}
30+
31+
// New returns new pgxpool.Pool with added hooks.
32+
func New(ctx context.Context, conninfo string) (*pgxpool.Pool, error) {
33+
config, err := pgxpool.ParseConfig(conninfo)
34+
if err != nil {
35+
return nil, fmt.Errorf("failed to parse pool config: %w", err)
36+
}
37+
38+
config.BeforeAcquire = beforeAcquireHook
39+
config.BeforeConnect = beforeConnectHook
40+
config.AfterRelease = afterReleaseHook
41+
42+
pool, err := pgxpool.NewWithConfig(ctx, config)
43+
if err != nil {
44+
return nil, err
45+
}
46+
47+
return pool, nil
48+
}
49+
50+
// beforeAcquireHook ensures purpose specific connections are returned:
51+
// * If a replication connection is requested, ensure the connection has replication enabled.
52+
// * If a regular connection is requested, return non-replication connections.
53+
func beforeAcquireHook(ctx context.Context, conn *pgx.Conn) bool {
54+
replReq := ctx.Value(replicationCtxKey{}) != nil
55+
replOn := conn.Config().RuntimeParams["replication"] != ""
56+
57+
return replReq == replOn
58+
}
59+
60+
// beforeConnectHook customizes the configuration of the new connection.
61+
func beforeConnectHook(ctx context.Context, config *pgx.ConnConfig) error {
62+
if v := ctx.Value(replicationCtxKey{}); v != nil {
63+
config.RuntimeParams["replication"] = "database"
64+
}
65+
66+
return nil
67+
}
68+
69+
// afterReleaseHook marks all replication connections for disposal.
70+
func afterReleaseHook(conn *pgx.Conn) bool {
71+
return conn.Config().RuntimeParams["replication"] == ""
72+
}

source/logrepl/cdc.go

Lines changed: 7 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,12 @@ import (
1818
"context"
1919
"errors"
2020
"fmt"
21-
"time"
2221

2322
"github.com/conduitio/conduit-connector-postgres/source/logrepl/internal"
2423
"github.com/conduitio/conduit-connector-postgres/source/position"
2524
sdk "github.com/conduitio/conduit-connector-sdk"
2625
"github.com/jackc/pglogrepl"
27-
"github.com/jackc/pgx/v5/pgconn"
28-
)
29-
30-
const (
31-
subscriberDoneTimeout = time.Second * 2
26+
"github.com/jackc/pgx/v5/pgxpool"
3227
)
3328

3429
// Config holds configuration values for CDCIterator.
@@ -45,21 +40,15 @@ type CDCConfig struct {
4540
type CDCIterator struct {
4641
config CDCConfig
4742
records chan sdk.Record
48-
pgconn *pgconn.PgConn
4943

5044
sub *internal.Subscription
5145
}
5246

5347
// NewCDCIterator initializes logical replication by creating the publication and subscription manager.
54-
func NewCDCIterator(ctx context.Context, pgconf *pgconn.Config, c CDCConfig) (*CDCIterator, error) {
55-
conn, err := pgconn.ConnectConfig(ctx, withReplication(pgconf))
56-
if err != nil {
57-
return nil, fmt.Errorf("could not establish replication connection: %w", err)
58-
}
59-
48+
func NewCDCIterator(ctx context.Context, pool *pgxpool.Pool, c CDCConfig) (*CDCIterator, error) {
6049
if err := internal.CreatePublication(
6150
ctx,
62-
conn,
51+
pool,
6352
c.PublicationName,
6453
internal.CreatePublicationOptions{Tables: c.Tables},
6554
); err != nil {
@@ -77,7 +66,7 @@ func NewCDCIterator(ctx context.Context, pgconf *pgconn.Config, c CDCConfig) (*C
7766

7867
sub, err := internal.CreateSubscription(
7968
ctx,
80-
conn,
69+
pool,
8170
c.SlotName,
8271
c.PublicationName,
8372
c.Tables,
@@ -91,7 +80,6 @@ func NewCDCIterator(ctx context.Context, pgconf *pgconn.Config, c CDCConfig) (*C
9180
return &CDCIterator{
9281
config: c,
9382
records: records,
94-
pgconn: conn,
9583
sub: sub,
9684
}, nil
9785
}
@@ -181,14 +169,11 @@ func (i *CDCIterator) Ack(_ context.Context, sdkPos sdk.Position) error {
181169
// or the context gets canceled. If the subscription stopped with an unexpected
182170
// error, the error is returned.
183171
func (i *CDCIterator) Teardown(ctx context.Context) error {
184-
defer i.pgconn.Close(ctx)
185-
186-
if !i.subscriberReady() {
187-
return nil
172+
if i.sub != nil {
173+
return i.sub.Teardown(ctx)
188174
}
189175

190-
i.sub.Stop()
191-
return i.sub.Wait(ctx, subscriberDoneTimeout)
176+
return nil
192177
}
193178

194179
// subscriberReady returns true when the subscriber is running.
@@ -207,16 +192,3 @@ func (i *CDCIterator) subscriberReady() bool {
207192
func (i *CDCIterator) TXSnapshotID() string {
208193
return i.sub.TXSnapshotID
209194
}
210-
211-
// withReplication adds the `replication` parameter to the connection config.
212-
// This will uprgade a regular command connection to accept replication commands.
213-
func withReplication(pgconf *pgconn.Config) *pgconn.Config {
214-
c := pgconf.Copy()
215-
if c.RuntimeParams == nil {
216-
c.RuntimeParams = make(map[string]string)
217-
}
218-
// enable replication on connection
219-
c.RuntimeParams["replication"] = "database"
220-
221-
return c
222-
}

source/logrepl/cdc_test.go

Lines changed: 8 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import (
2828
"github.com/google/go-cmp/cmp"
2929
"github.com/google/go-cmp/cmp/cmpopts"
3030
"github.com/jackc/pglogrepl"
31-
"github.com/jackc/pgx/v5/pgconn"
3231
"github.com/jackc/pgx/v5/pgxpool"
3332
"github.com/matryer/is"
3433
)
@@ -40,12 +39,10 @@ func TestCDCIterator_New(t *testing.T) {
4039
tests := []struct {
4140
name string
4241
setup func(t *testing.T) CDCConfig
43-
pgconf *pgconn.Config
4442
wantErr error
4543
}{
4644
{
47-
name: "publication already exists",
48-
pgconf: &pool.Config().ConnConfig.Config,
45+
name: "publication already exists",
4946
setup: func(t *testing.T) CDCConfig {
5047
is := is.New(t)
5148
table := test.SetupTestTable(ctx, t, pool)
@@ -66,21 +63,7 @@ func TestCDCIterator_New(t *testing.T) {
6663
},
6764
},
6865
{
69-
name: "fails to connect",
70-
pgconf: func() *pgconn.Config {
71-
c := pool.Config().ConnConfig.Config
72-
c.Port = 31337
73-
74-
return &c
75-
}(),
76-
setup: func(*testing.T) CDCConfig {
77-
return CDCConfig{}
78-
},
79-
wantErr: errors.New("could not establish replication connection"),
80-
},
81-
{
82-
name: "fails to create publication",
83-
pgconf: &pool.Config().ConnConfig.Config,
66+
name: "fails to create publication",
8467
setup: func(*testing.T) CDCConfig {
8568
return CDCConfig{
8669
PublicationName: "foobar",
@@ -89,8 +72,7 @@ func TestCDCIterator_New(t *testing.T) {
8972
wantErr: errors.New("requires at least one table"),
9073
},
9174
{
92-
name: "fails to create subscription",
93-
pgconf: &pool.Config().ConnConfig.Config,
75+
name: "fails to create subscription",
9476
setup: func(t *testing.T) CDCConfig {
9577
is := is.New(t)
9678
table := test.SetupTestTable(ctx, t, pool)
@@ -118,7 +100,7 @@ func TestCDCIterator_New(t *testing.T) {
118100

119101
config := tt.setup(t)
120102

121-
_, err := NewCDCIterator(ctx, tt.pgconf, config)
103+
i, err := NewCDCIterator(ctx, pool, config)
122104
if tt.wantErr != nil {
123105
if match := strings.Contains(err.Error(), tt.wantErr.Error()); !match {
124106
t.Logf("%s != %s", err.Error(), tt.wantErr.Error())
@@ -127,6 +109,9 @@ func TestCDCIterator_New(t *testing.T) {
127109
} else {
128110
is.NoErr(err)
129111
}
112+
if i != nil {
113+
is.NoErr(i.Teardown(ctx))
114+
}
130115
})
131116
}
132117
}
@@ -276,8 +261,6 @@ func TestCDCIterator_Next_Fail(t *testing.T) {
276261
}
277262

278263
func TestCDCIterator_EnsureLSN(t *testing.T) {
279-
// t.Skip()
280-
281264
ctx := context.Background()
282265
is := is.New(t)
283266

@@ -370,13 +353,6 @@ func TestCDCIterator_Ack(t *testing.T) {
370353
}
371354
}
372355

373-
func Test_withReplication(t *testing.T) {
374-
is := is.New(t)
375-
376-
c := withReplication(&pgconn.Config{})
377-
is.Equal(c.RuntimeParams["replication"], "database")
378-
}
379-
380356
func testCDCIterator(ctx context.Context, t *testing.T, pool *pgxpool.Pool, table string, start bool) *CDCIterator {
381357
is := is.New(t)
382358
config := CDCConfig{
@@ -386,7 +362,7 @@ func testCDCIterator(ctx context.Context, t *testing.T, pool *pgxpool.Pool, tabl
386362
SlotName: table, // table is random, reuse for slot name
387363
}
388364

389-
i, err := NewCDCIterator(ctx, &pool.Config().ConnConfig.Config, config)
365+
i, err := NewCDCIterator(ctx, pool, config)
390366
is.NoErr(err)
391367

392368
i.sub.StatusTimeout = 1 * time.Second

source/logrepl/cleaner.go

Lines changed: 11 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,7 @@ import (
2121

2222
"github.com/conduitio/conduit-connector-postgres/source/logrepl/internal"
2323
sdk "github.com/conduitio/conduit-connector-sdk"
24-
"github.com/jackc/pglogrepl"
25-
"github.com/jackc/pgx/v5/pgconn"
24+
"github.com/jackc/pgx/v5/pgxpool"
2625
)
2726

2827
type CleanupConfig struct {
@@ -36,21 +35,11 @@ type CleanupConfig struct {
3635
func Cleanup(ctx context.Context, c CleanupConfig) error {
3736
logger := sdk.Logger(ctx)
3837

39-
pgconfig, err := pgconn.ParseConfig(c.URL)
38+
pool, err := pgxpool.New(ctx, c.URL)
4039
if err != nil {
41-
return fmt.Errorf("failed to parse config URL: %w", err)
40+
return fmt.Errorf("failed to connect to database: %w", err)
4241
}
43-
44-
if pgconfig.RuntimeParams == nil {
45-
pgconfig.RuntimeParams = make(map[string]string)
46-
}
47-
pgconfig.RuntimeParams["replication"] = "database"
48-
49-
conn, err := pgconn.ConnectConfig(ctx, pgconfig)
50-
if err != nil {
51-
return fmt.Errorf("could not establish replication connection: %w", err)
52-
}
53-
defer conn.Close(ctx)
42+
defer pool.Close()
5443

5544
var errs []error
5645

@@ -61,18 +50,16 @@ func Cleanup(ctx context.Context, c CleanupConfig) error {
6150

6251
if c.SlotName != "" {
6352
// Terminate any outstanding backends which are consuming the slot before deleting it.
64-
mrr := conn.Exec(ctx, fmt.Sprintf(
65-
"SELECT pg_terminate_backend(active_pid) FROM pg_replication_slots WHERE slot_name='%s' AND active=true", c.SlotName,
66-
))
67-
if err := mrr.Close(); err != nil {
53+
if _, err := pool.Exec(
54+
ctx,
55+
"SELECT pg_terminate_backend(active_pid) FROM pg_replication_slots WHERE slot_name=$1 AND active=true", c.SlotName,
56+
); err != nil {
6857
errs = append(errs, fmt.Errorf("failed to terminate active backends on slot: %w", err))
6958
}
7059

71-
if err := pglogrepl.DropReplicationSlot(
60+
if _, err := pool.Exec(
7261
ctx,
73-
conn,
74-
c.SlotName,
75-
pglogrepl.DropReplicationSlotOptions{},
62+
"SELECT pg_drop_replication_slot($1)", c.SlotName,
7663
); err != nil {
7764
errs = append(errs, fmt.Errorf("failed to clean up replication slot %q: %w", c.SlotName, err))
7865
}
@@ -83,7 +70,7 @@ func Cleanup(ctx context.Context, c CleanupConfig) error {
8370
if c.PublicationName != "" {
8471
if err := internal.DropPublication(
8572
ctx,
86-
conn,
73+
pool,
8774
c.PublicationName,
8875
internal.DropPublicationOptions{IfExists: true},
8976
); err != nil {

source/logrepl/combined.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ func (c *CombinedIterator) initCDCIterator(ctx context.Context, pos position.Pos
171171
return fmt.Errorf("failed to parse LSN in position: %w", err)
172172
}
173173

174-
cdcIterator, err := NewCDCIterator(ctx, &c.pool.Config().ConnConfig.Config, CDCConfig{
174+
cdcIterator, err := NewCDCIterator(ctx, c.pool, CDCConfig{
175175
LSN: lsn,
176176
SlotName: c.conf.SlotName,
177177
PublicationName: c.conf.PublicationName,

source/logrepl/combined_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ func TestCombinedIterator_New(t *testing.T) {
137137
}
138138

139139
func TestCombinedIterator_Next(t *testing.T) {
140-
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
140+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
141141
defer cancel()
142142

143143
is := is.New(t)
@@ -220,6 +220,7 @@ func TestCombinedIterator_Next(t *testing.T) {
220220
WithSnapshot: true,
221221
})
222222
is.NoErr(err)
223+
223224
_, err = pool.Exec(ctx, fmt.Sprintf(
224225
`INSERT INTO %s (id, column1, column2, column3, column4, column5)
225226
VALUES (7, 'buzz', 10101, true, 121.9, 51)`,

0 commit comments

Comments
 (0)