diff --git a/.github/workflows/pgrepl.yml b/.github/workflows/pgrepl.yml new file mode 100644 index 00000000..ee79fd17 --- /dev/null +++ b/.github/workflows/pgrepl.yml @@ -0,0 +1,123 @@ +name: PostgreSQL Replication E2E Test + +on: + push: + branches: [ "main" ] + pull_request: + branches: [ "main" ] + +jobs: + replication-test: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Set up Go + uses: actions/setup-go@v5 + with: + go-version: '1.23' + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.10' + + - name: Install dependencies + run: | + go get . + pip3 install "sqlglot[rs]" pyarrow pandas + sudo apt-get update + sudo apt-get install --yes --no-install-recommends postgresql-client + + # Install DuckDB CLI and extensions + curl -LJO https://github.com/duckdb/duckdb/releases/download/v1.1.3/duckdb_cli-linux-amd64.zip + unzip duckdb_cli-linux-amd64.zip + chmod +x duckdb + sudo mv duckdb /usr/local/bin + duckdb -c 'INSTALL json from core' + + - name: Build MyDuck + run: go build -v + + - name: Start Primary PostgreSQL + run: | + docker run -d --name pg-primary \ + -e POSTGRES_USER=postgres \ + -e POSTGRES_PASSWORD=password \ + -e POSTGRES_DB=testdb \ + -p 15432:5432 \ + postgres:latest \ + -c wal_level=logical \ + -c max_wal_senders=30 + + # Wait for PostgreSQL to be ready + for i in {1..30}; do + if psql "postgres://postgres:password@localhost:15432/testdb" -c "SELECT 1" >/dev/null 2>&1; then + break + fi + sleep 1 + done + + - name: Configure Primary for Replication + run: | + # Create test data + psql "postgres://postgres:password@localhost:15432/testdb" <<-EOSQL + CREATE TABLE test_table (id int primary key, name text); + INSERT INTO test_table VALUES (1, 'initial data 1'), (2, 'initial data 2'); + + CREATE PUBLICATION testpub FOR TABLE test_table; + EOSQL + + - name: Start MyDuck and Test Initial Replication + run: | + # Start MyDuck + ./myduckserver & + sleep 5 + + # Create subscription + psql -h 127.0.0.1 -p 5432 -U postgres <<-EOSQL + CREATE SUBSCRIPTION testsub + CONNECTION 'dbname=testdb host=localhost port=15432 user=postgres password=password' + PUBLICATION testpub; + EOSQL + sleep 5 + + # Verify initial data + psql -h 127.0.0.1 -p 5432 -U postgres -c "SELECT 1 FROM test_table WHERE id = 1 AND name = 'initial data 1';" | grep -q 1 + psql -h 127.0.0.1 -p 5432 -U postgres -c "SELECT 1 FROM test_table WHERE id = 2 AND name = 'initial data 2';" | grep -q 1 + + - name: Test Ongoing Replication + run: | + # Insert new data in primary + psql "postgres://postgres:password@localhost:15432/testdb" \ + -c "INSERT INTO test_table VALUES (3, 'new data 3');" + sleep 2 + + # Verify replication of new data + psql -h 127.0.0.1 -p 5432 -U postgres -c "SELECT 1 FROM test_table WHERE id = 3 AND name = 'new data 3';" | grep -q 1 + + # Kill MyDuck + pkill myduckserver + sleep 2 + + - name: Test Replication Recovery + run: | + # Insert more data in primary while MyDuck is down + psql "postgres://postgres:password@localhost:15432/testdb" \ + -c "INSERT INTO test_table VALUES (4, 'offline data 4');" + + # Restart MyDuck + ./myduckserver & + sleep 5 + + # Verify replication catches up + psql -h 127.0.0.1 -p 5432 -U postgres -c "SELECT 1 FROM test_table WHERE id = 4 AND name = 'offline data 4';" | grep -q 1 + + # Kill MyDuck + pkill myduckserver + + - name: Cleanup + if: always() + run: | + pkill myduckserver || true + docker rm -f pg-primary || true \ No newline at end of file diff --git a/catalog/internal_tables.go b/catalog/internal_tables.go index 0dabd5dd..c6498ae6 100644 --- a/catalog/internal_tables.go +++ b/catalog/internal_tables.go @@ -246,6 +246,7 @@ var internalTables = []InternalTable{ InternalTables.PersistentVariable, InternalTables.BinlogPosition, InternalTables.PgReplicationLSN, + InternalTables.PgSubscription, InternalTables.GlobalStatus, InternalTables.PGStatReplication, InternalTables.PGCurrentSetting, diff --git a/main.go b/main.go index bdd071d4..6514b5c8 100644 --- a/main.go +++ b/main.go @@ -120,6 +120,10 @@ func main() { pool := backend.NewConnectionPool(provider.CatalogName(), provider.Connector(), provider.Storage()) + if _, err := pool.ExecContext(context.Background(), "PRAGMA enable_checkpoint_on_shutdown"); err != nil { + logrus.WithError(err).Fatalln("Failed to enable checkpoint on shutdown") + } + if defaultTimeZone != "" { _, err := pool.ExecContext(context.Background(), fmt.Sprintf(`SET TimeZone = '%s'`, defaultTimeZone)) if err != nil { @@ -182,7 +186,7 @@ func main() { if err != nil { logrus.WithError(err).Fatalln("Failed to create logical replicator") } - replicator.StartReplication(pgServer.NewInternalCtx(), pub) + go replicator.StartReplication(pgServer.NewInternalCtx(), pub) } go pgServer.Start() diff --git a/pgserver/connection_handler.go b/pgserver/connection_handler.go index 0561e8e7..78959c3c 100644 --- a/pgserver/connection_handler.go +++ b/pgserver/connection_handler.go @@ -1047,10 +1047,12 @@ func (h *ConnectionHandler) query(query ConvertedQuery) error { query.StatementTag = tag } - callback := h.spoolRowsCallback(query.StatementTag, &rowsAffected, false) if query.SubscriptionConfig != nil { - return executeCreateSubscriptionSQL(h, query.SubscriptionConfig) - } else if err := h.duckHandler.ComQuery( + return h.executeCreateSubscriptionSQL(query.SubscriptionConfig) + } + + callback := h.spoolRowsCallback(query.StatementTag, &rowsAffected, false) + if err := h.duckHandler.ComQuery( context.Background(), h.mysqlConn, query.String, diff --git a/pgserver/subscription_handler.go b/pgserver/subscription_handler.go index 874350c8..4b44f13f 100644 --- a/pgserver/subscription_handler.go +++ b/pgserver/subscription_handler.go @@ -2,13 +2,14 @@ package pgserver import ( "context" - stdsql "database/sql" "fmt" "regexp" "strings" "github.com/apecloud/myduckserver/adapter" + "github.com/apecloud/myduckserver/catalog" "github.com/apecloud/myduckserver/pgserver/logrepl" + "github.com/dolthub/go-mysql-server/sql" "github.com/jackc/pglogrepl" ) @@ -16,7 +17,7 @@ import ( // Example usage of CREATE SUBSCRIPTION SQL: // // CREATE SUBSCRIPTION mysub -// CONNECTION 'dbname= host=127.0.0.1 port=15432 user=postgres password=root' +// CONNECTION 'dbname= host=127.0.0.1 port=5432 user=postgres password=root' // PUBLICATION mypub; // // The statement creates a subscription named 'mysub' that connects to a PostgreSQL @@ -30,11 +31,8 @@ type SubscriptionConfig struct { Port string User string Password string - LSN pglogrepl.LSN } -var lsnQueryIndex = 1 - var subscriptionRegex = regexp.MustCompile(`(?i)CREATE SUBSCRIPTION\s+(\w+)\s+CONNECTION\s+'([^']+)'\s+PUBLICATION\s+(\w+);`) var connectionRegex = regexp.MustCompile(`(\b\w+)=([\w\.\d]*)`) @@ -50,17 +48,6 @@ func (config *SubscriptionConfig) ToDNS() string { config.User, config.Password, config.Host, config.Port, config.DBName) } -func (config *SubscriptionConfig) ToDuckDBQuery() []string { - return []string{ - fmt.Sprintf("ATTACH '%s' AS pg_postgres (TYPE POSTGRES);", config.ToConnectionInfo()), - "SELECT * FROM postgres_query('pg_postgres', 'SELECT pg_current_wal_lsn()');", - "BEGIN;", - "COPY FROM DATABASE pg_postgres TO mysql;", - "COMMIT;", - "DETACH pg_postgres;", - } -} - func parseSubscriptionSQL(sql string) (*SubscriptionConfig, error) { subscriptionMatch := subscriptionRegex.FindStringSubmatch(sql) if len(subscriptionMatch) < 4 { @@ -111,13 +98,26 @@ func parseSubscriptionSQL(sql string) (*SubscriptionConfig, error) { return config, nil } -func executeCreateSubscriptionSQL(h *ConnectionHandler, subscriptionConfig *SubscriptionConfig) error { - err := doSnapshot(h, subscriptionConfig) +func (h *ConnectionHandler) executeCreateSubscriptionSQL(subscriptionConfig *SubscriptionConfig) error { + sqlCtx, err := h.duckHandler.sm.NewContextWithQuery(context.Background(), h.mysqlConn, "") if err != nil { - return fmt.Errorf("failed to execute snapshot in CREATE SUBSCRIPTION: %w", err) + return fmt.Errorf("failed to create context for query: %w", err) } - replicator, err := doCreateSubscription(h, subscriptionConfig) + lsn, err := h.doSnapshot(sqlCtx, subscriptionConfig) + if err != nil { + return fmt.Errorf("failed to create snapshot for CREATE SUBSCRIPTION: %w", err) + } + + // Do a checkpoint here to merge the WAL logs + // if _, err := adapter.ExecCatalog(sqlCtx, "FORCE CHECKPOINT"); err != nil { + // return fmt.Errorf("failed to execute FORCE CHECKPOINT: %w", err) + // } + // if _, err := adapter.ExecCatalog(sqlCtx, "PRAGMA force_checkpoint;"); err != nil { + // return fmt.Errorf("failed to execute PRAGMA force_checkpoint: %w", err) + // } + + replicator, err := h.doCreateSubscription(sqlCtx, subscriptionConfig, lsn) if err != nil { return fmt.Errorf("failed to execute CREATE SUBSCRIPTION: %w", err) } @@ -127,41 +127,104 @@ func executeCreateSubscriptionSQL(h *ConnectionHandler, subscriptionConfig *Subs return nil } -func doSnapshot(h *ConnectionHandler, subscriptionConfig *SubscriptionConfig) error { +func (h *ConnectionHandler) doSnapshot(sqlCtx *sql.Context, subscriptionConfig *SubscriptionConfig) (pglogrepl.LSN, error) { + // If there is ongoing transcation, commit it + if txn := adapter.TryGetTxn(sqlCtx); txn != nil { + if err := func() error { + defer txn.Rollback() + defer adapter.CloseTxn(sqlCtx) + return txn.Commit() + }(); err != nil { + return 0, fmt.Errorf("failed to commit current transaction: %w", err) + } + } - duckDBQueries := subscriptionConfig.ToDuckDBQuery() + connInfo := subscriptionConfig.ToConnectionInfo() + attachName := fmt.Sprintf("__pg_src_%d__", sqlCtx.ID()) + if _, err := adapter.ExecCatalog(sqlCtx, fmt.Sprintf("ATTACH '%s' AS %s (TYPE POSTGRES, READ_ONLY)", connInfo, attachName)); err != nil { + return 0, fmt.Errorf("failed to attach connection: %w", err) + } - for index, duckDBQuery := range duckDBQueries { - // Create a new SQL context for the DuckDB query - sqlCtx, err := h.duckHandler.sm.NewContextWithQuery(context.Background(), h.mysqlConn, duckDBQuery) - if err != nil { - return fmt.Errorf("failed to create context for query at index %d: %w", index, err) + defer func() { + if _, err := adapter.ExecCatalog(sqlCtx, fmt.Sprintf("DETACH %s", attachName)); err != nil { + h.logger.Warnf("failed to detach connection: %v", err) } + }() - // Execute the query - rows, err := adapter.Query(sqlCtx, duckDBQuery) + var currentLSN string + err := adapter.QueryRowCatalog( + sqlCtx, + fmt.Sprintf("SELECT * FROM postgres_query('%s', 'SELECT pg_current_wal_lsn()')", attachName), + ).Scan(¤tLSN) + if err != nil { + return 0, fmt.Errorf("failed to query WAL LSN: %w", err) + } + + lsn, err := pglogrepl.ParseLSN(currentLSN) + if err != nil { + return 0, fmt.Errorf("failed to parse LSN: %w", err) + } + + // COPY DATABASE is buggy - it corrupts the WAL so the server cannot be restarted. + // So we need to copy tables one by one. + // if _, err := adapter.ExecCatalogInTxn(sqlCtx, fmt.Sprintf("COPY FROM DATABASE %s TO mysql", attachName)); err != nil { + // return 0, fmt.Errorf("failed to copy from database: %w", err) + // } + + type table struct { + schema string + name string + } + var tables []table + + // Get all tables from the source database + if err := func() error { + rows, err := adapter.QueryCatalog(sqlCtx, `SELECT database, schema, name FROM (SHOW ALL TABLES) WHERE database = '`+attachName+`'`) if err != nil { - return fmt.Errorf("query execution failed at index %d: %w", index, err) + return fmt.Errorf("failed to query tables: %w", err) } - defer func() { - closeErr := rows.Close() - if closeErr != nil { - err = fmt.Errorf("failed to close rows at index %d: %w", index, closeErr) - } - }() + defer rows.Close() - // Process LSN query only for the specific index - if index == lsnQueryIndex { - if err := processLSN(rows, subscriptionConfig); err != nil { - return fmt.Errorf("failed to process LSN query at index %d: %w", index, err) + for rows.Next() { + var database, schema, tableName string + if err := rows.Scan(&database, &schema, &tableName); err != nil { + return fmt.Errorf("failed to scan table: %w", err) } + tables = append(tables, table{schema: schema, name: tableName}) } + + return nil + }(); err != nil { + return 0, err } - return nil + // Create all schemas in the target database + for _, t := range tables { + if _, err := adapter.ExecCatalogInTxn(sqlCtx, `CREATE SCHEMA IF NOT EXISTS `+catalog.QuoteIdentifierANSI(t.schema)); err != nil { + return 0, fmt.Errorf("failed to create schema: %w", err) + } + } + + txn, err := adapter.GetCatalogTxn(sqlCtx, nil) + if err != nil { + return 0, fmt.Errorf("failed to get transaction: %w", err) + } + defer txn.Rollback() + defer adapter.CloseTxn(sqlCtx) + + for _, t := range tables { + if _, err := adapter.ExecCatalogInTxn( + sqlCtx, + `CREATE TABLE `+catalog.ConnectIdentifiersANSI(t.schema, t.name)+` AS FROM `+catalog.ConnectIdentifiersANSI(attachName, t.schema, t.name), + ); err != nil { + return 0, fmt.Errorf("failed to create table: %w", err) + } + } + + return lsn, txn.Commit() } -func doCreateSubscription(h *ConnectionHandler, subscriptionConfig *SubscriptionConfig) (*logrepl.LogicalReplicator, error) { +func (h *ConnectionHandler) doCreateSubscription(sqlCtx *sql.Context, subscriptionConfig *SubscriptionConfig, lsn pglogrepl.LSN) (*logrepl.LogicalReplicator, error) { replicator, err := logrepl.NewLogicalReplicator(subscriptionConfig.ToDNS()) if err != nil { return nil, fmt.Errorf("failed to create logical replicator: %w", err) @@ -177,11 +240,6 @@ func doCreateSubscription(h *ConnectionHandler, subscriptionConfig *Subscription return nil, fmt.Errorf("failed to create replication slot: %w", err) } - sqlCtx, err := h.duckHandler.sm.NewContextWithQuery(context.Background(), h.mysqlConn, "") - if err != nil { - return nil, fmt.Errorf("failed to create context for query: %w", err) - } - // `WriteWALPosition` and `WriteSubscription` execute in a transaction internally, // so we start a transaction here and commit it after writing the WAL position. tx, err := adapter.GetCatalogTxn(sqlCtx, nil) @@ -191,7 +249,7 @@ func doCreateSubscription(h *ConnectionHandler, subscriptionConfig *Subscription defer tx.Rollback() defer adapter.CloseTxn(sqlCtx) - err = replicator.WriteWALPosition(sqlCtx, subscriptionConfig.PublicationName, subscriptionConfig.LSN) + err = replicator.WriteWALPosition(sqlCtx, subscriptionConfig.PublicationName, lsn) if err != nil { return nil, fmt.Errorf("failed to write WAL position: %w", err) } @@ -203,25 +261,3 @@ func doCreateSubscription(h *ConnectionHandler, subscriptionConfig *Subscription return replicator, tx.Commit() } - -// processLSN scans the rows for the LSN value and updates the subscriptionConfig. -func processLSN(rows *stdsql.Rows, subscriptionConfig *SubscriptionConfig) error { - for rows.Next() { - var lsnStr string - if err := rows.Scan(&lsnStr); err != nil { - return fmt.Errorf("failed to scan LSN: %w", err) - } - lsn, err := pglogrepl.ParseLSN(lsnStr) - if err != nil { - return fmt.Errorf("failed to parse LSN: %w", err) - } - subscriptionConfig.LSN = lsn - } - - // Check for iteration errors - if err := rows.Err(); err != nil { - return fmt.Errorf("error encountered during rows iteration: %w", err) - } - - return nil -}