From 95af164ffa7119c0c89cc140a43eb55d1bbf3e3a Mon Sep 17 00:00:00 2001 From: TianyuZhang1214 Date: Tue, 26 Nov 2024 10:52:14 +0800 Subject: [PATCH] feat: Zero-ETL for PG (#199) --- README.md | 19 +- catalog/provider.go | 2 + devtools/replica-setup-mysql/checker.sh | 20 +- devtools/replica-setup-mysql/replica_setup.sh | 16 +- devtools/replica-setup-mysql/snapshot.sh | 2 +- .../replica-setup-mysql/start_replication.sh | 19 +- .../replica-setup-postgres/replica_setup.sh | 92 ++++++++ docker/Dockerfile | 1 + docker/entrypoint.sh | 124 +++++++--- pgserver/connection_data.go | 9 +- pgserver/connection_handler.go | 24 +- pgserver/logrepl/replication.go | 142 ++++++++---- pgserver/logrepl/replication_test.go | 8 +- pgserver/subscription_handler.go | 217 ++++++++++++++++++ 14 files changed, 566 insertions(+), 129 deletions(-) create mode 100644 devtools/replica-setup-postgres/replica_setup.sh create mode 100644 pgserver/subscription_handler.go diff --git a/README.md b/README.md index 1ff4f556..19356617 100644 --- a/README.md +++ b/README.md @@ -108,27 +108,24 @@ psql -h 127.0.0.1 -p 15432 -U postgres We have integrated a setup tool in the Docker image that helps replicate data from your primary (MySQL|Postgres) server to MyDuck Server. The tool is available via the `SETUP_MODE` environment variable. In `REPLICA` mode, the container will start MyDuck Server, dump a snapshot of your primary (MySQL|Postgres) server, and start replicating data in real-time. -#### MySQL Replica Setup - ```bash docker run \ - --network=host \ + -p 13306:3306 \ + -p 15432:5432 \ --privileged \ --workdir=/home/admin \ --env=SETUP_MODE=REPLICA \ - --env=MYSQL_HOST= \ - --env=MYSQL_PORT= \ - --env=MYSQL_USER= \ - --env=MYSQL_PASSWORD= \ + --env=SOURCE_DSN="<postgres|mysql>://<user>:<password>@<host>:<port>/<dbname>" \ --detach=true \ apecloud/myduckserver:latest ``` +`SOURCE_DSN` specifies the connection string to the primary database server, which can be either MySQL or PostgreSQL.
-#### PostgreSQL Replica Setup +- **MySQL Primary:** Use the MySQL URI scheme, e.g., + `--env=SOURCE_DSN=mysql://root:password@example.com:3306` -```bash -TODO -``` +- **PostgreSQL Primary:** Use the PostgreSQL URI scheme, e.g., + `--env=SOURCE_DSN=postgres://postgres:password@example.com:5432` ### Connecting to Cloud MySQL & Postgres diff --git a/catalog/provider.go b/catalog/provider.go index 47b73df1..618b7be0 100644 --- a/catalog/provider.go +++ b/catalog/provider.go @@ -62,6 +62,8 @@ func NewDBProvider(dataDir, dbFile string) (*DatabaseProvider, error) { bootQueries := []string{ "INSTALL arrow", "LOAD arrow", + "INSTALL postgres_scanner", + "LOAD postgres_scanner", } for _, q := range bootQueries { if _, err := storage.ExecContext(context.Background(), q); err != nil { diff --git a/devtools/replica-setup-mysql/checker.sh b/devtools/replica-setup-mysql/checker.sh index d1cb91fc..0208855a 100644 --- a/devtools/replica-setup-mysql/checker.sh +++ b/devtools/replica-setup-mysql/checker.sh @@ -13,7 +13,7 @@ check_server_params() { echo "Checking MySQL server parameters..." # Retrieve the required MySQL server variables using mysqlsh - result=$(mysqlsh --host="$MYSQL_HOST" --port="$MYSQL_PORT" --user="$MYSQL_USER" --password="$MYSQL_PASSWORD" --sql -e " + result=$(mysqlsh --uri="$SOURCE_DSN" --sql -e " SHOW VARIABLES WHERE variable_name IN ('binlog_format', 'enforce_gtid_consistency', 'gtid_mode', 'gtid_strict_mode', 'log_bin'); ") @@ -62,10 +62,10 @@ check_server_params() { # Function to check MySQL current user privileges check_user_privileges() { - echo "Checking privileges for the current user '$MYSQL_USER'..." + echo "Checking privileges for the current user '$SOURCE_USER'..." 
# Check the user grants for the currently authenticated user using mysqlsh - result=$(mysqlsh --host="$MYSQL_HOST" --port="$MYSQL_PORT" --user="$MYSQL_USER" --password="$MYSQL_PASSWORD" --sql -e " + result=$(mysqlsh --host="$SOURCE_HOST" --port="$SOURCE_PORT" --user="$SOURCE_USER" --password="$SOURCE_PASSWORD" --sql -e " SHOW GRANTS FOR CURRENT_USER(); ") @@ -73,11 +73,11 @@ check_user_privileges() { # Check if the required privileges are granted or if GRANT ALL is present if echo "$result" | grep -q -E "GRANT (SELECT|RELOAD|REPLICATION CLIENT|REPLICATION SLAVE|SHOW VIEW|EVENT)"; then - echo "Current user '$MYSQL_USER' has all required privileges." + echo "Current user '$SOURCE_USER' has all required privileges." elif echo "$result" | grep -q "GRANT ALL"; then - echo "Current user '$MYSQL_USER' has 'GRANT ALL' privileges." + echo "Current user '$SOURCE_USER' has 'GRANT ALL' privileges." else - echo "Error: Current user '$MYSQL_USER' is missing some required privileges." + echo "Error: Current user '$SOURCE_USER' is missing some required privileges." return 1 fi @@ -98,7 +98,7 @@ check_mysql_config() { # Function to check if source MySQL server is empty check_if_source_mysql_is_empty() { # Run the query using mysqlsh and capture the output - OUTPUT=$(mysqlsh --uri "$MYSQL_USER:$MYSQL_PASSWORD@$MYSQL_HOST:$MYSQL_PORT" --sql -e "SHOW DATABASES;" 2>/dev/null) + OUTPUT=$(mysqlsh --uri "$SOURCE_DSN" --sql -e "SHOW DATABASES;" 2>/dev/null) check_command "retrieving database list" @@ -117,11 +117,11 @@ check_if_myduck_has_replica() { REPLICA_STATUS=$(mysqlsh --sql --host=$MYDUCK_HOST --port=$MYDUCK_PORT --user=root --password='' -e "SHOW REPLICA STATUS\G") check_command "retrieving replica status" - SOURCE_HOST=$(echo "$REPLICA_STATUS" | awk '/Source_Host/ {print $2}') + SOURCE_HOST_EXISTS=$(echo "$REPLICA_STATUS" | awk '/Source_Host/ {print $2}') # Check if Source_Host is not null or empty - if [[ -n "$SOURCE_HOST" ]]; then - echo "Replication has already been started. 
Source Host: $SOURCE_HOST" + if [[ -n "$SOURCE_HOST_EXISTS" ]]; then + echo "Replication has already been started. Source Host: $SOURCE_HOST_EXISTS" return 1 else return 0 diff --git a/devtools/replica-setup-mysql/replica_setup.sh b/devtools/replica-setup-mysql/replica_setup.sh index 21ede034..1dba31b8 100644 --- a/devtools/replica-setup-mysql/replica_setup.sh +++ b/devtools/replica-setup-mysql/replica_setup.sh @@ -16,19 +16,19 @@ GTID_MODE="ON" while [[ $# -gt 0 ]]; do case $1 in --mysql_host) - MYSQL_HOST="$2" + SOURCE_HOST="$2" shift 2 ;; --mysql_port) - MYSQL_PORT="$2" + SOURCE_PORT="$2" shift 2 ;; --mysql_user) - MYSQL_USER="$2" + SOURCE_USER="$2" shift 2 ;; --mysql_password) - MYSQL_PASSWORD="$2" + SOURCE_PASSWORD="$2" shift 2 ;; --myduck_host) @@ -62,14 +62,14 @@ while [[ $# -gt 0 ]]; do esac done -source checker.sh - # Check if all parameters are set -if [[ -z "$MYSQL_HOST" || -z "$MYSQL_PORT" || -z "$MYSQL_USER" ]]; then - echo "Error: Missing required MySQL connection variables: MYSQL_HOST, MYSQL_PORT, MYSQL_USER." +if [[ -z "$SOURCE_HOST" || -z "$SOURCE_PORT" || -z "$SOURCE_USER" ]]; then + echo "Error: Missing required MySQL connection variables: SOURCE_HOST, SOURCE_PORT, SOURCE_USER." usage fi +source checker.sh + # Step 1: Check if mysqlsh exists, if not, install it if ! command -v mysqlsh &> /dev/null; then echo "mysqlsh not found, attempting to install..." diff --git a/devtools/replica-setup-mysql/snapshot.sh b/devtools/replica-setup-mysql/snapshot.sh index 2e214dc6..cd738428 100644 --- a/devtools/replica-setup-mysql/snapshot.sh +++ b/devtools/replica-setup-mysql/snapshot.sh @@ -36,7 +36,7 @@ echo "Thread count set to: $THREAD_COUNT" echo "Copying data from MySQL to MyDuck..." 
# Run mysqlsh command and capture the output -output=$(mysqlsh --host=${MYSQL_HOST} --port=${MYSQL_PORT} --user=${MYSQL_USER} --password=${MYSQL_PASSWORD} -- util copy-instance "mysql://${MYDUCK_USER}:${MYDUCK_PASSWORD}@${MYDUCK_HOST}:${MYDUCK_PORT}" --users false --consistent false --ignore-existing-objects true --handle-grant-errors ignore --threads $THREAD_COUNT --bytesPerChunk 256M --ignore-version true) +output=$(mysqlsh --host=${SOURCE_HOST} --port=${SOURCE_PORT} --user=${SOURCE_USER} --password=${SOURCE_PASSWORD} -- util copy-instance "mysql://${MYDUCK_USER}:${MYDUCK_PASSWORD}@${MYDUCK_HOST}:${MYDUCK_PORT}" --users false --consistent false --ignore-existing-objects true --handle-grant-errors ignore --threads $THREAD_COUNT --bytesPerChunk 256M --ignore-version true) if [[ $GTID_MODE == "ON" ]]; then # Extract the EXECUTED_GTID_SET from this output: diff --git a/devtools/replica-setup-mysql/start_replication.sh b/devtools/replica-setup-mysql/start_replication.sh index 8e291b88..cfe650af 100644 --- a/devtools/replica-setup-mysql/start_replication.sh +++ b/devtools/replica-setup-mysql/start_replication.sh @@ -4,17 +4,18 @@ OS=$(uname -s) # if [[ $SOURCE_IS_EMPTY -eq 0 ]]; then -# EXECUTED_GTID_SET=$(mysqlsh --host="$MYSQL_HOST" --user="$MYSQL_USER" --password="$MYSQL_PASSWORD" --sql -e "SHOW BINARY LOG STATUS\G" | grep -i "Executed_Gtid_Set" | awk -F': ' '{print $2}') +# EXECUTED_GTID_SET=$(mysqlsh --host="$SOURCE_HOST" --user="$SOURCE_USER" --password="$SOURCE_PASSWORD" --sql -e "SHOW BINARY LOG STATUS\G" | grep -i "Executed_Gtid_Set" | awk -F': ' '{print $2}') # if [[ -z "$EXECUTED_GTID_SET" ]]; then # echo "Failed to get executed GTID set by statement 'SHOW BINARY LOG STATUS\G'. Trying to get it by statement 'SHOW MASTER STATUS\G'..." 
-# EXECUTED_GTID_SET=$(mysqlsh --host="$MYSQL_HOST" --user="$MYSQL_USER" --password="$MYSQL_PASSWORD" --sql -e "SHOW MASTER STATUS\G" | grep -i "Executed_Gtid_Set" | awk -F': ' '{print $2}') +# EXECUTED_GTID_SET=$(mysqlsh --host="$SOURCE_HOST" --user="$SOURCE_USER" --password="$SOURCE_PASSWORD" --sql -e "SHOW MASTER STATUS\G" | grep -i "Executed_Gtid_Set" | awk -F': ' '{print $2}') # fi # fi -if [[ "${MYDUCK_IN_DOCKER}" =~ "true" ]] && [[ "$OS" == "Darwin" ]] && ([[ "${MYSQL_HOST}" == "127.0.0.1" ]] || [[ "${MYSQL_HOST}" == "localhost" ]] || [[ "${MYSQL_HOST}" == "0.0.0.0" ]]); then - MYSQL_HOST_FOR_REPLICA="host.docker.internal" +if [[ "${MYDUCK_IN_DOCKER}" == "true" && "$OS" == "Darwin" && + ("${SOURCE_HOST}" == "127.0.0.1" || "${SOURCE_HOST}" == "localhost" || "${SOURCE_HOST}" == "0.0.0.0") ]]; then + SOURCE_HOST_FOR_REPLICA="host.docker.internal" else - MYSQL_HOST_FOR_REPLICA="${MYSQL_HOST}" + SOURCE_HOST_FOR_REPLICA="${SOURCE_HOST}" fi # Use the EXECUTED_GTID_SET variable from the previous steps @@ -26,10 +27,10 @@ fi # Connect to MySQL and execute the replication configuration commands REPLICATION_CMD="CHANGE REPLICATION SOURCE TO \ - SOURCE_HOST='${MYSQL_HOST_FOR_REPLICA}', \ - SOURCE_PORT=${MYSQL_PORT}, \ - SOURCE_USER='${MYSQL_USER}', \ - SOURCE_PASSWORD='${MYSQL_PASSWORD}'" + SOURCE_HOST='${SOURCE_HOST_FOR_REPLICA}', \ + SOURCE_PORT=${SOURCE_PORT}, \ + SOURCE_USER='${SOURCE_USER}', \ + SOURCE_PASSWORD='${SOURCE_PASSWORD}'" if [ $GTID_MODE == "OFF" ]; then REPLICATION_CMD="${REPLICATION_CMD}, \ diff --git a/devtools/replica-setup-postgres/replica_setup.sh b/devtools/replica-setup-postgres/replica_setup.sh new file mode 100644 index 00000000..5d48c495 --- /dev/null +++ b/devtools/replica-setup-postgres/replica_setup.sh @@ -0,0 +1,92 @@ +#!/bin/bash + +usage() { + echo "Usage: $0 --postgres_host --postgres_port --postgres_user --postgres_password [--myduck_host ] [--myduck_port ] [--myduck_user ] [--myduck_password ] [--myduck_in_docker ]" + exit 1 +} + 
+MYDUCK_HOST=${MYDUCK_HOST:-127.0.0.1} +MYDUCK_PORT=${MYDUCK_PORT:-5432} +MYDUCK_USER=${MYDUCK_USER:-mysql} +MYDUCK_PASSWORD=${MYDUCK_PASSWORD:-} +MYDUCK_SERVER_ID=${MYDUCK_SERVER_ID:-2} +MYDUCK_IN_DOCKER=${MYDUCK_IN_DOCKER:-false} + +while [[ $# -gt 0 ]]; do + case $1 in + --postgres_host) + SOURCE_HOST="$2" + shift 2 + ;; + --postgres_port) + SOURCE_PORT="$2" + shift 2 + ;; + --postgres_user) + SOURCE_USER="$2" + shift 2 + ;; + --postgres_password) + SOURCE_PASSWORD="$2" + shift 2 + ;; + --myduck_host) + MYDUCK_HOST="$2" + shift 2 + ;; + --myduck_port) + MYDUCK_PORT="$2" + shift 2 + ;; + --myduck_user) + MYDUCK_USER="$2" + shift 2 + ;; + --myduck_password) + MYDUCK_PASSWORD="$2" + shift 2 + ;; + --myduck_server_id) + MYDUCK_SERVER_ID="$2" + shift 2 + ;; + --myduck_in_docker) + MYDUCK_IN_DOCKER="$2" + shift 2 + ;; + *) + echo "Unknown parameter: $1" + usage + ;; + esac +done + +# Check if all parameters are set +if [[ -z "$SOURCE_HOST" || -z "$SOURCE_PORT" || -z "$SOURCE_USER" ]]; then + echo "Error: Missing required Postgres connection variables: SOURCE_HOST, SOURCE_PORT, SOURCE_USER." + usage +fi + +# Step 1: Check Postgres configuration +echo "Checking Postgres configuration..." +# TODO(neo.zty): add check for Postgres configuration + +# Step 2: Establish replication +echo "Starting replication..." +export PUBLICATION_NAME="myduck_publication" +export SUBSCRIPTION_NAME="myduck_subscription" + +CREATE_SUBSCRIPTION_SQL="CREATE SUBSCRIPTION ${SUBSCRIPTION_NAME} \ + CONNECTION 'dbname=${SOURCE_DATABASE} host=${SOURCE_HOST} port=${SOURCE_PORT} user=${SOURCE_USER} password=${SOURCE_PASSWORD}' \ + PUBLICATION ${PUBLICATION_NAME};" + +psql -h $MYDUCK_HOST -p $MYDUCK_PORT -U $MYDUCK_USER <> "${LOG_PATH}"/server.log 2>&1 & + nohup myduckserver $LOG_LEVEL >> "${LOG_PATH}"/server.log 2>&1 & echo "$!" 
> "${PID_FILE}" } run_server_in_foreground() { cd "$DATA_PATH" || { echo "Error: Could not change directory to ${DATA_PATH}"; exit 1; } - myduckserver $PGSQL_PRIMARY_DSN_ARG $PGSQL_SLOT_NAME_ARG $LOG_LEVEL + myduckserver $LOG_LEVEL } wait_for_my_duck_server_ready() { @@ -93,28 +151,24 @@ check_process_alive() { # Handle the setup_mode setup() { - mkdir -p "${DATA_PATH}" - mkdir -p "${LOG_PATH}" + if [ -n "$LOG_LEVEL" ]; then + export LOG_LEVEL="-loglevel $LOG_LEVEL" + fi + parse_dsn + # Ensure required directories exist + mkdir -p "${DATA_PATH}" "${LOG_PATH}" + case "$SETUP_MODE" in "" | "SERVER") echo "Starting MyDuck Server in SERVER mode..." run_server_in_foreground ;; - - "MYSQL_REPLICA") - echo "Starting MyDuck Server and running replica setup in MySQL REPLICA mode..." - run_server_in_background - wait_for_my_duck_server_ready - run_mysql_replica_setup - ;; - - "PGSQL_REPLICA") - echo "Starting MyDuck Server and running replica setup in PGSQL REPLICA mode..." + "REPLICA") + echo "Starting MyDuck Server and running replica setup in REPLICA mode..." run_server_in_background wait_for_my_duck_server_ready - # TODO: run pgsql replica setup + run_replica_setup ;; - *) echo "Error: Invalid SETUP_MODE value. Valid options are: SERVER, REPLICA." exit 1 @@ -124,7 +178,7 @@ setup() { setup -while [[ "$SETUP_MODE" == "MYSQL_REPLICA" ]]; do +while [[ "$SETUP_MODE" == "REPLICA" ]]; do # Check if the processes have started check_process_alive "$PID_FILE" "MyDuck Server" MY_DUCK_SERVER_STATUS=$? diff --git a/pgserver/connection_data.go b/pgserver/connection_data.go index c293b417..eda6a774 100644 --- a/pgserver/connection_data.go +++ b/pgserver/connection_data.go @@ -53,10 +53,11 @@ const ( // version of the converted query, and is the recommended form to use. If AST is nil, then use the String version, // otherwise always prefer to AST. 
type ConvertedQuery struct { - String string - AST tree.Statement - StatementTag string - PgParsable bool + String string + AST tree.Statement + StatementTag string + PgParsable bool + SubscriptionConfig *SubscriptionConfig } // copyFromStdinState tracks the metadata for an import of data into a table using a COPY FROM STDIN statement. When diff --git a/pgserver/connection_handler.go b/pgserver/connection_handler.go index 322d8579..f832c84f 100644 --- a/pgserver/connection_handler.go +++ b/pgserver/connection_handler.go @@ -1045,9 +1045,16 @@ func (h *ConnectionHandler) query(query ConvertedQuery) error { } callback := h.spoolRowsCallback(query.StatementTag, &rowsAffected, false) - err := h.duckHandler.ComQuery(context.Background(), h.mysqlConn, query.String, query.AST, callback) - if err != nil { - return err + if query.SubscriptionConfig != nil { + return executeCreateSubscriptionSQL(h, query.SubscriptionConfig) + } else if err := h.duckHandler.ComQuery( + context.Background(), + h.mysqlConn, + query.String, + query.AST, + callback, + ); err != nil { + return fmt.Errorf("fallback query execution failed: %w", err) } return h.send(makeCommandComplete(query.StatementTag, rowsAffected)) @@ -1245,6 +1252,17 @@ func (h *ConnectionHandler) convertQuery(query string, modifiers ...QueryModifie } parsable := true + + // Check if the query is a subscription query, and if so, parse it as a subscription query. + subscriptionConfig, err := parseSubscriptionSQL(query) + if subscriptionConfig != nil && err == nil { + return ConvertedQuery{ + String: query, + PgParsable: true, + SubscriptionConfig: subscriptionConfig, + }, nil + } + stmts, err := parser.Parse(query) if err != nil { // DuckDB syntax is not fully compatible with PostgreSQL, so we need to handle some queries differently. 
diff --git a/pgserver/logrepl/replication.go b/pgserver/logrepl/replication.go index 2d18e7a5..ea4a9037 100644 --- a/pgserver/logrepl/replication.go +++ b/pgserver/logrepl/replication.go @@ -544,76 +544,130 @@ func DropPublication(primaryDns, slotName string) error { return err } -// CreatePublication creates a publication with the given name if it does not already exist. Mostly useful for testing. +// CreatePublicationIfNotExists creates a publication with the given name if it does not already exist. Mostly useful for testing. // Customers should run the CREATE PUBLICATION command on their primary server manually, specifying whichever tables // they want to replicate. -func CreatePublication(primaryDns, slotName string) error { - conn, err := pgconn.Connect(context.Background(), primaryDns) +func CreatePublicationIfNotExists(primaryDns, publicationName string) error { + // Connect to the primary PostgreSQL server + conn, err := pgx.Connect(context.Background(), primaryDns) if err != nil { - return err + return fmt.Errorf("failed to connect to primary database: %w", err) } defer conn.Close(context.Background()) - result := conn.Exec(context.Background(), fmt.Sprintf("CREATE PUBLICATION %s FOR ALL TABLES;", slotName)) - _, err = result.ReadAll() - return err + // Check if the publication exists + query := `SELECT EXISTS(SELECT 1 FROM pg_publication WHERE pubname = $1)` + var exists bool + if err := conn.QueryRow(context.Background(), query, publicationName).Scan(&exists); err != nil { + return fmt.Errorf("failed to check publication existence: %w", err) + } + + // Create the publication if it does not exist + if !exists { + createQuery := fmt.Sprintf("CREATE PUBLICATION %s FOR ALL TABLES", publicationName) + if _, err := conn.Exec(context.Background(), createQuery); err != nil { + if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code == "42710" { + // Ignore "publication already exists" error + return nil + } + return fmt.Errorf("failed to create publication: %w", 
err) + } + } + + return nil } -// DropReplicationSlot drops the replication slot with the given name. Any error from the slot not existing is ignored. -func (r *LogicalReplicator) DropReplicationSlot(slotName string) error { +// DropReplicationSlotIfExists drops the replication slot with the given name. Any error from the slot not existing is ignored. +func (r *LogicalReplicator) DropReplicationSlotIfExists(slotName string) error { + // Check if the replication slot exists + exists, err := r.replicationSlotExists(slotName) + if err != nil { + return fmt.Errorf("failed to check replication slot existence: %w", err) + } + + if !exists { + r.logger.Infof("Replication slot '%s' does not exist.", slotName) + return nil + } + + // Connect to the replication database conn, err := pgconn.Connect(context.Background(), r.ReplicationDns()) if err != nil { - return err + return fmt.Errorf("failed to connect to replication database: %w", err) + } + defer conn.Close(context.Background()) + + if err := pglogrepl.DropReplicationSlot(context.Background(), conn, slotName, pglogrepl.DropReplicationSlotOptions{}); err != nil { + return fmt.Errorf("failed to drop replication slot '%s': %w", slotName, err) } - _ = pglogrepl.DropReplicationSlot(context.Background(), conn, slotName, pglogrepl.DropReplicationSlotOptions{}) + r.logger.Infof("Replication slot '%s' successfully dropped.", slotName) return nil } -// CreateReplicationSlotIfNecessary creates the replication slot named if it doesn't already exist. -func (r *LogicalReplicator) CreateReplicationSlotIfNecessary(slotName string) error { - conn, err := pgx.Connect(context.Background(), r.PrimaryDns()) +// CreateReplicationSlotIfNotExists creates the replication slot named if it doesn't already exist. 
+func (r *LogicalReplicator) CreateReplicationSlotIfNotExists(slotName string) error { + // Check if the replication slot already exists + exists, err := r.replicationSlotExists(slotName) if err != nil { - return err + return fmt.Errorf("error checking replication slot existence: %w", err) } - rows, err := conn.Query(context.Background(), "select * from pg_replication_slots where slot_name = $1", slotName) + // If the slot already exists, no further action is needed + if exists { + r.logger.Infof("Replication slot '%s' already exists.", slotName) + return nil + } + + // Create the replication slot + err = r.createReplicationSlot(slotName) if err != nil { - return err + return fmt.Errorf("error creating replication slot '%s': %w", slotName, err) } - slotExists := false - defer rows.Close() - for rows.Next() { - _, err := rows.Values() - if err != nil { - return err - } - slotExists = true + r.logger.Infof("Replication slot '%s' created successfully.", slotName) + return nil +} + +// Helper method to check if a replication slot exists +func (r *LogicalReplicator) replicationSlotExists(slotName string) (bool, error) { + conn, err := pgx.Connect(context.Background(), r.PrimaryDns()) + if err != nil { + return false, fmt.Errorf("failed to connect to primary database: %w", err) } + defer conn.Close(context.Background()) - if rows.Err() != nil { - return rows.Err() + var exists bool + query := `SELECT EXISTS(SELECT 1 FROM pg_replication_slots WHERE slot_name = $1)` + err = conn.QueryRow(context.Background(), query, slotName).Scan(&exists) + if err != nil { + return false, fmt.Errorf("error querying replication slots: %w", err) } - // We need a different connection to create the replication slot - conn, err = pgx.Connect(context.Background(), r.ReplicationDns()) + return exists, nil +} + +// Helper method to create a replication slot +func (r *LogicalReplicator) createReplicationSlot(slotName string) error { + conn, err := pgx.Connect(context.Background(), 
r.ReplicationDns()) if err != nil { - return err + return fmt.Errorf("failed to connect to replication database: %w", err) } + defer conn.Close(context.Background()) - if !slotExists { - _, err = pglogrepl.CreateReplicationSlot(context.Background(), conn.PgConn(), slotName, outputPlugin, pglogrepl.CreateReplicationSlotOptions{}) - if err != nil { - pgErr, ok := err.(*pgconn.PgError) - if ok && pgErr.Code == "42710" { - // replication slot already exists, we can ignore this error - } else { - return err - } + _, err = pglogrepl.CreateReplicationSlot( + context.Background(), + conn.PgConn(), + slotName, + outputPlugin, + pglogrepl.CreateReplicationSlotOptions{}, + ) + if err != nil { + if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code == "42710" { + // Replication slot already exists; ignore this error + return nil } - - r.logger.Infoln("Created replication slot:", slotName) + return fmt.Errorf("error creating replication slot: %w", err) } return nil @@ -841,8 +895,8 @@ func (r *LogicalReplicator) readWALPosition(ctx *sql.Context, slotName string) ( return pglogrepl.ParseLSN(lsn) } -// writeWALPosition writes the recorded WAL position to the WAL position table -func (r *LogicalReplicator) writeWALPosition(ctx *sql.Context, slotName string, lsn pglogrepl.LSN) error { +// WriteWALPosition writes the recorded WAL position to the WAL position table +func (r *LogicalReplicator) WriteWALPosition(ctx *sql.Context, slotName string, lsn pglogrepl.LSN) error { _, err := adapter.ExecCatalogInTxn(ctx, catalog.InternalTables.PgReplicationLSN.UpsertStmt(), slotName, lsn.String()) return err } @@ -942,7 +996,7 @@ func (r *LogicalReplicator) commitOngoingTxn(state *replicationState, flushReaso } r.logger.Debugf("Writing LSN %s\n", state.lastCommitLSN) - if err = r.writeWALPosition(state.replicaCtx, state.slotName, state.lastCommitLSN); err != nil { + if err = r.WriteWALPosition(state.replicaCtx, state.slotName, state.lastCommitLSN); err != nil { return err } diff --git 
a/pgserver/logrepl/replication_test.go b/pgserver/logrepl/replication_test.go index 10ff2ed7..13293eaa 100644 --- a/pgserver/logrepl/replication_test.go +++ b/pgserver/logrepl/replication_test.go @@ -559,12 +559,12 @@ func RunReplicationScripts(t *testing.T, scripts []ReplicationTest) { require.NoError(t, err) }() - primaryDns := dsn + "?sslmode=disable&replication=database" + primaryDns := dsn + "?sslmode=disable" // We drop and recreate the replication slot once at the beginning of the test suite. Postgres seems to do a little // work in the background with a publication, so we need to wait a little bit before running any test scripts. require.NoError(t, logrepl.DropPublication(primaryDns, slotName)) - require.NoError(t, logrepl.CreatePublication(primaryDns, slotName)) + require.NoError(t, logrepl.CreatePublicationIfNotExists(primaryDns, slotName)) time.Sleep(500 * time.Millisecond) // for i, script := range scripts { @@ -730,10 +730,10 @@ func connectionForQuery(t *testing.T, query string, connections map[string]*pgx. 
func handlePseudoQuery(t *testing.T, server *pgserver.Server, query string, r *logrepl.LogicalReplicator) bool { switch query { case createReplicationSlot: - require.NoError(t, r.CreateReplicationSlotIfNecessary(slotName)) + require.NoError(t, r.CreateReplicationSlotIfNotExists(slotName)) return true case dropReplicationSlot: - require.NoError(t, r.DropReplicationSlot(slotName)) + require.NoError(t, r.DropReplicationSlotIfExists(slotName)) return true case startReplication: go func() { diff --git a/pgserver/subscription_handler.go b/pgserver/subscription_handler.go new file mode 100644 index 00000000..22848b31 --- /dev/null +++ b/pgserver/subscription_handler.go @@ -0,0 +1,217 @@ +package pgserver + +import ( + "context" + stdsql "database/sql" + "fmt" + "github.com/apecloud/myduckserver/adapter" + "github.com/apecloud/myduckserver/pgserver/logrepl" + "github.com/jackc/pglogrepl" + "regexp" + "strings" +) + +// This file implements the logic for handling CREATE SUBSCRIPTION SQL statements. +// Example usage of CREATE SUBSCRIPTION SQL: +// +// CREATE SUBSCRIPTION mysub +// CONNECTION 'dbname= host=127.0.0.1 port=15432 user=postgres password=root' +// PUBLICATION mypub; +// +// The statement creates a subscription named 'mysub' that connects to a PostgreSQL +// database and subscribes to changes published under the 'mypub' publication. 
+ +type SubscriptionConfig struct { + SubscriptionName string + PublicationName string + DBName string + Host string + Port string + User string + Password string + LSN pglogrepl.LSN +} + +var lsnQueryIndex = 1 + +var subscriptionRegex = regexp.MustCompile(`(?i)CREATE SUBSCRIPTION\s+(\w+)\s+CONNECTION\s+'([^']+)'\s+PUBLICATION\s+(\w+);`) +var connectionRegex = regexp.MustCompile(`(\b\w+)=([\w\.\d]*)`) + +// ToConnectionInfo formats the SubscriptionConfig as a keyword/value connection string. +func (config *SubscriptionConfig) ToConnectionInfo() string { + return fmt.Sprintf("dbname=%s user=%s password=%s host=%s port=%s", + config.DBName, config.User, config.Password, config.Host, config.Port) +} + +// ToDNS formats the SubscriptionConfig as a postgres:// connection URI (DSN). +func (config *SubscriptionConfig) ToDNS() string { + return fmt.Sprintf("postgres://%s:%s@%s:%s/%s", + config.User, config.Password, config.Host, config.Port, config.DBName) +} + +func (config *SubscriptionConfig) ToDuckDBQuery() []string { + return []string{ + fmt.Sprintf("ATTACH '%s' AS pg_postgres (TYPE POSTGRES);", config.ToConnectionInfo()), + "SELECT * FROM postgres_query('pg_postgres', 'SELECT pg_current_wal_lsn()');", + "BEGIN;", + "COPY FROM DATABASE pg_postgres TO mysql;", + "COMMIT;", + "DETACH pg_postgres;", + } +} + +func parseSubscriptionSQL(sql string) (*SubscriptionConfig, error) { + subscriptionMatch := subscriptionRegex.FindStringSubmatch(sql) + if len(subscriptionMatch) < 4 { + return nil, fmt.Errorf("invalid CREATE SUBSCRIPTION SQL format") + } + + subscriptionName := subscriptionMatch[1] + connectionString := subscriptionMatch[2] + publicationName := subscriptionMatch[3] + + // Parse the connection string into key-value pairs + matches := connectionRegex.FindAllStringSubmatch(connectionString, -1) + if matches == nil { + return nil, fmt.Errorf("no valid key-value pairs found in connection string") + } + + // Initialize SubscriptionConfig struct + config := &SubscriptionConfig{ + SubscriptionName: subscriptionName, +
PublicationName: publicationName, + } + + // Map the matches to struct fields + for _, match := range matches { + key := strings.ToLower(match[1]) + switch key { + case "dbname": + config.DBName = match[2] + case "host": + config.Host = match[2] + case "port": + config.Port = match[2] + case "user": + config.User = match[2] + case "password": + config.Password = match[2] + } + } + + // Handle default values + if config.DBName == "" { + config.DBName = "postgres" + } + if config.Port == "" { + config.Port = "5432" + } + + return config, nil +} + +func executeCreateSubscriptionSQL(h *ConnectionHandler, subscriptionConfig *SubscriptionConfig) error { + err := doSnapshot(h, subscriptionConfig) + if err != nil { + return fmt.Errorf("failed to execute snapshot in CREATE SUBSCRIPTION: %w", err) + } + + err = doCreateSubscription(h, subscriptionConfig) + if err != nil { + return fmt.Errorf("failed to execute CREATE SUBSCRIPTION: %w", err) + } + + return nil +} + +func doSnapshot(h *ConnectionHandler, subscriptionConfig *SubscriptionConfig) error { + + duckDBQueries := subscriptionConfig.ToDuckDBQuery() + + for index, duckDBQuery := range duckDBQueries { + // Create a new SQL context for the DuckDB query + sqlCtx, err := h.duckHandler.sm.NewContextWithQuery(context.Background(), h.mysqlConn, duckDBQuery) + if err != nil { + return fmt.Errorf("failed to create context for query at index %d: %w", index, err) + } + + // Execute the query + rows, err := adapter.Query(sqlCtx, duckDBQuery) + if err != nil { + return fmt.Errorf("query execution failed at index %d: %w", index, err) + } + defer func() { + closeErr := rows.Close() + if closeErr != nil { + err = fmt.Errorf("failed to close rows at index %d: %w", index, closeErr) + } + }() + + // Process LSN query only for the specific index + if index == lsnQueryIndex { + if err := processLSN(rows, subscriptionConfig); err != nil { + return fmt.Errorf("failed to process LSN query at index %d: %w", index, err) + } + } + } + + return 
nil +} + +func doCreateSubscription(h *ConnectionHandler, subscriptionConfig *SubscriptionConfig) error { + replicator, err := logrepl.NewLogicalReplicator(subscriptionConfig.ToDNS()) + if err != nil { + return fmt.Errorf("failed to create logical replicator: %w", err) + } + + err = logrepl.CreatePublicationIfNotExists(subscriptionConfig.ToDNS(), subscriptionConfig.PublicationName) + if err != nil { + return fmt.Errorf("failed to create publication: %w", err) + } + + err = replicator.CreateReplicationSlotIfNotExists(subscriptionConfig.PublicationName) + if err != nil { + return fmt.Errorf("failed to create replication slot: %w", err) + } + + sqlCtx, err := h.duckHandler.sm.NewContextWithQuery(context.Background(), h.mysqlConn, "") + if err != nil { + return fmt.Errorf("failed to create context for query: %w", err) + } + + err = replicator.WriteWALPosition(sqlCtx, subscriptionConfig.PublicationName, subscriptionConfig.LSN) + if err != nil { + return fmt.Errorf("failed to write WAL position: %w", err) + } + + sqlCtx, err = h.duckHandler.sm.NewContextWithQuery(context.Background(), h.mysqlConn, "") + if err != nil { + return fmt.Errorf("failed to create context for query: %w", err) + } + + go replicator.StartReplication(sqlCtx, subscriptionConfig.PublicationName) + + return nil +} + +// processLSN scans the rows for the LSN value and updates the subscriptionConfig. +func processLSN(rows *stdsql.Rows, subscriptionConfig *SubscriptionConfig) error { + for rows.Next() { + var lsnStr string + if err := rows.Scan(&lsnStr); err != nil { + return fmt.Errorf("failed to scan LSN: %w", err) + } + lsn, err := pglogrepl.ParseLSN(lsnStr) + if err != nil { + return fmt.Errorf("failed to parse LSN: %w", err) + } + subscriptionConfig.LSN = lsn + } + + // Check for iteration errors + if err := rows.Err(); err != nil { + return fmt.Errorf("error encountered during rows iteration: %w", err) + } + + return nil +}