Skip to content

Commit

Permalink
fix: data replicated from PG becomes invisible after restarting MyDuck Server (#230)
Browse files Browse the repository at this point in the history

* Add tests

---------

Co-authored-by: Fan Yang <[email protected]>
  • Loading branch information
TianyuZhang1214 and fanyang01 authored Nov 28, 2024
1 parent 0cd1d5b commit e536c63
Show file tree
Hide file tree
Showing 5 changed files with 240 additions and 74 deletions.
123 changes: 123 additions & 0 deletions .github/workflows/pgrepl.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
# End-to-end test for PostgreSQL logical replication into MyDuck Server.
#
# Flow: start a primary PostgreSQL container with logical WAL enabled, publish
# a table, subscribe to it from MyDuck, then verify (1) the initial snapshot,
# (2) ongoing changes, and (3) that changes made while MyDuck was down are
# visible after it is restarted.
name: PostgreSQL Replication E2E Test

on:
  push:
    branches: [ "main" ]
  pull_request:
    branches: [ "main" ]

jobs:
  replication-test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Set up Go
        uses: actions/setup-go@v5
        with:
          go-version: '1.23'

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.10'

      - name: Install dependencies
        run: |
          go get .
          pip3 install "sqlglot[rs]" pyarrow pandas
          sudo apt-get update
          sudo apt-get install --yes --no-install-recommends postgresql-client
          # Install DuckDB CLI and extensions
          curl -LJO https://github.com/duckdb/duckdb/releases/download/v1.1.3/duckdb_cli-linux-amd64.zip
          unzip duckdb_cli-linux-amd64.zip
          chmod +x duckdb
          sudo mv duckdb /usr/local/bin
          duckdb -c 'INSTALL json from core'

      - name: Build MyDuck
        run: go build -v

      - name: Start Primary PostgreSQL
        run: |
          docker run -d --name pg-primary \
            -e POSTGRES_USER=postgres \
            -e POSTGRES_PASSWORD=password \
            -e POSTGRES_DB=testdb \
            -p 15432:5432 \
            postgres:latest \
            -c wal_level=logical \
            -c max_wal_senders=30
          # Wait for PostgreSQL to be ready (up to ~30 seconds).
          ready=false
          for i in {1..30}; do
            if psql "postgres://postgres:password@localhost:15432/testdb" -c "SELECT 1" >/dev/null 2>&1; then
              ready=true
              break
            fi
            sleep 1
          done
          # Fail here with a clear message instead of letting a later step
          # fail with an unrelated-looking connection error.
          if [ "$ready" != true ]; then
            echo "PostgreSQL did not become ready in time" >&2
            docker logs pg-primary || true
            exit 1
          fi

      - name: Configure Primary for Replication
        run: |
          # Create test data
          psql "postgres://postgres:password@localhost:15432/testdb" <<-EOSQL
          CREATE TABLE test_table (id int primary key, name text);
          INSERT INTO test_table VALUES (1, 'initial data 1'), (2, 'initial data 2');
          CREATE PUBLICATION testpub FOR TABLE test_table;
          EOSQL

      - name: Start MyDuck and Test Initial Replication
        run: |
          # Start MyDuck
          ./myduckserver &
          sleep 5
          # Create subscription
          psql -h 127.0.0.1 -p 5432 -U postgres <<-EOSQL
          CREATE SUBSCRIPTION testsub
          CONNECTION 'dbname=testdb host=localhost port=15432 user=postgres password=password'
          PUBLICATION testpub;
          EOSQL
          sleep 5
          # Verify initial data.
          # -tA prints only the result rows (no header, no row-count footer),
          # and grep -x requires a whole line equal to "1", so only an actual
          # returned row can satisfy the check.
          psql -h 127.0.0.1 -p 5432 -U postgres -tA -c "SELECT 1 FROM test_table WHERE id = 1 AND name = 'initial data 1';" | grep -qx 1
          psql -h 127.0.0.1 -p 5432 -U postgres -tA -c "SELECT 1 FROM test_table WHERE id = 2 AND name = 'initial data 2';" | grep -qx 1

      - name: Test Ongoing Replication
        run: |
          # Insert new data in primary
          psql "postgres://postgres:password@localhost:15432/testdb" \
            -c "INSERT INTO test_table VALUES (3, 'new data 3');"
          sleep 2
          # Verify replication of new data
          psql -h 127.0.0.1 -p 5432 -U postgres -tA -c "SELECT 1 FROM test_table WHERE id = 3 AND name = 'new data 3';" | grep -qx 1
          # Kill MyDuck
          pkill myduckserver
          sleep 2

      - name: Test Replication Recovery
        run: |
          # Insert more data in primary while MyDuck is down
          psql "postgres://postgres:password@localhost:15432/testdb" \
            -c "INSERT INTO test_table VALUES (4, 'offline data 4');"
          # Restart MyDuck
          ./myduckserver &
          sleep 5
          # Verify replication catches up
          psql -h 127.0.0.1 -p 5432 -U postgres -tA -c "SELECT 1 FROM test_table WHERE id = 4 AND name = 'offline data 4';" | grep -qx 1
          # Kill MyDuck
          pkill myduckserver

      - name: Cleanup
        # Run even when an earlier step failed so the server process and the
        # container are always removed.
        if: always()
        run: |
          pkill myduckserver || true
          docker rm -f pg-primary || true
1 change: 1 addition & 0 deletions catalog/internal_tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ var internalTables = []InternalTable{
InternalTables.PersistentVariable,
InternalTables.BinlogPosition,
InternalTables.PgReplicationLSN,
InternalTables.PgSubscription,
InternalTables.GlobalStatus,
InternalTables.PGStatReplication,
InternalTables.PGCurrentSetting,
Expand Down
6 changes: 5 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ func main() {

pool := backend.NewConnectionPool(provider.CatalogName(), provider.Connector(), provider.Storage())

if _, err := pool.ExecContext(context.Background(), "PRAGMA enable_checkpoint_on_shutdown"); err != nil {
logrus.WithError(err).Fatalln("Failed to enable checkpoint on shutdown")
}

if defaultTimeZone != "" {
_, err := pool.ExecContext(context.Background(), fmt.Sprintf(`SET TimeZone = '%s'`, defaultTimeZone))
if err != nil {
Expand Down Expand Up @@ -182,7 +186,7 @@ func main() {
if err != nil {
logrus.WithError(err).Fatalln("Failed to create logical replicator")
}
replicator.StartReplication(pgServer.NewInternalCtx(), pub)
go replicator.StartReplication(pgServer.NewInternalCtx(), pub)
}

go pgServer.Start()
Expand Down
8 changes: 5 additions & 3 deletions pgserver/connection_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1047,10 +1047,12 @@ func (h *ConnectionHandler) query(query ConvertedQuery) error {
query.StatementTag = tag
}

callback := h.spoolRowsCallback(query.StatementTag, &rowsAffected, false)
if query.SubscriptionConfig != nil {
return executeCreateSubscriptionSQL(h, query.SubscriptionConfig)
} else if err := h.duckHandler.ComQuery(
return h.executeCreateSubscriptionSQL(query.SubscriptionConfig)
}

callback := h.spoolRowsCallback(query.StatementTag, &rowsAffected, false)
if err := h.duckHandler.ComQuery(
context.Background(),
h.mysqlConn,
query.String,
Expand Down
Loading

0 comments on commit e536c63

Please sign in to comment.