diff --git a/README.md b/README.md index afe4daee..8e85e361 100644 --- a/README.md +++ b/README.md @@ -108,8 +108,6 @@ psql -h 127.0.0.1 -p 15432 -U postgres We have integrated a setup tool in the Docker image that helps replicate data from your primary (MySQL|Postgres) server to MyDuck Server. The tool is available via the `SETUP_MODE` environment variable. In `REPLICA` mode, the container will start MyDuck Server, dump a snapshot of your primary (MySQL|Postgres) server, and start replicating data in real-time. -#### MySQL or Postgres Replica Setup - ```bash docker run \ -p 13306:3306 \ @@ -122,8 +120,10 @@ docker run \ apecloud/myduckserver:latest ``` -The only difference between postgres and mysql is the header in `SOURCE_DSN` parameter. -For MySQL, use `mysql` as the header, and for Postgres, use `postgres`. +`SOURCE_DSN` is the connection string of the primary (MySQL|Postgres) server. +Use the mysql URI scheme to connect to a MySQL primary and use the postgres URI scheme to connect to a Postgres primary. 
+For example, +`--env=SOURCE_DSN=mysql://root:password@example.com:3306` or `--env=SOURCE_DSN=postgres://postgres:password@example.com:5432`. ### Connecting to Cloud MySQL & Postgres diff --git a/catalog/provider.go b/catalog/provider.go index 47b73df1..618b7be0 100644 --- a/catalog/provider.go +++ b/catalog/provider.go @@ -62,6 +62,8 @@ func NewDBProvider(dataDir, dbFile string) (*DatabaseProvider, error) { bootQueries := []string{ "INSTALL arrow", "LOAD arrow", + "INSTALL postgres_scanner", + "LOAD postgres_scanner", } for _, q := range bootQueries { if _, err := storage.ExecContext(context.Background(), q); err != nil { diff --git a/devtools/replica-setup-mysql/checker.sh b/devtools/replica-setup-mysql/checker.sh index d71f691d..0208855a 100644 --- a/devtools/replica-setup-mysql/checker.sh +++ b/devtools/replica-setup-mysql/checker.sh @@ -117,11 +117,11 @@ check_if_myduck_has_replica() { REPLICA_STATUS=$(mysqlsh --sql --host=$MYDUCK_HOST --port=$MYDUCK_PORT --user=root --password='' -e "SHOW REPLICA STATUS\G") check_command "retrieving replica status" - EXISTED_SOURCE_HOST=$(echo "$REPLICA_STATUS" | awk '/Source_Host/ {print $2}') + SOURCE_HOST_EXISTS=$(echo "$REPLICA_STATUS" | awk '/Source_Host/ {print $2}') # Check if Source_Host is not null or empty - if [[ -n "$EXISTED_SOURCE_HOST" ]]; then - echo "Replication has already been started. Source Host: $EXISTED_SOURCE_HOST" + if [[ -n "$SOURCE_HOST_EXISTS" ]]; then + echo "Replication has already been started. 
Source Host: $SOURCE_HOST_EXISTS" return 1 else return 0 diff --git a/pgserver/connection_handler.go b/pgserver/connection_handler.go index dd40065a..f832c84f 100644 --- a/pgserver/connection_handler.go +++ b/pgserver/connection_handler.go @@ -1047,10 +1047,14 @@ func (h *ConnectionHandler) query(query ConvertedQuery) error { callback := h.spoolRowsCallback(query.StatementTag, &rowsAffected, false) if query.SubscriptionConfig != nil { return executeCreateSubscriptionSQL(h, query.SubscriptionConfig) - } else { - if err := h.duckHandler.ComQuery(context.Background(), h.mysqlConn, query.String, query.AST, callback); err != nil { - return fmt.Errorf("fallback query execution failed: %w", err) - } + } else if err := h.duckHandler.ComQuery( + context.Background(), + h.mysqlConn, + query.String, + query.AST, + callback, + ); err != nil { + return fmt.Errorf("fallback query execution failed: %w", err) } return h.send(makeCommandComplete(query.StatementTag, rowsAffected)) diff --git a/pgserver/subscription_handler.go b/pgserver/subscription_handler.go index 591a7714..8fdb29b3 100644 --- a/pgserver/subscription_handler.go +++ b/pgserver/subscription_handler.go @@ -8,6 +8,7 @@ import ( "github.com/apecloud/myduckserver/pgserver/logrepl" "github.com/jackc/pglogrepl" "regexp" + "strings" ) // This file implements the logic for handling CREATE SUBSCRIPTION SQL statements. 
@@ -33,7 +34,7 @@ type SubscriptionConfig struct { var lsnQueryIndex = 5 -var subscriptionRegex = regexp.MustCompile(`CREATE SUBSCRIPTION\s+(\w+)\s+CONNECTION\s+'([^']+)'\s+PUBLICATION\s+(\w+);`) +var subscriptionRegex = regexp.MustCompile(`(?i)CREATE SUBSCRIPTION\s+(\w+)\s+CONNECTION\s+'([^']+)'\s+PUBLICATION\s+(\w+);`) var connectionRegex = regexp.MustCompile(`(\b\w+)=([\w\.\d]*)`) // ToConnectionInfo Format SubscriptionConfig into a ConnectionInfo @@ -50,10 +51,8 @@ func (config *SubscriptionConfig) ToDNS() string { func (config *SubscriptionConfig) ToDuckDBQuery() []string { return []string{ - "INSTALL postgres_scanner;", - "LOAD postgres_scanner;", fmt.Sprintf("ATTACH '%s' AS pg_postgres (TYPE POSTGRES);", config.ToConnectionInfo()), - "BEGIN;", + "BEGIN ISOLATION LEVEL REPEATABLE READ;", "COPY FROM DATABASE pg_postgres TO mysql;", "SELECT * FROM postgres_query('pg_postgres', 'SELECT pg_current_wal_lsn()');", "COMMIT;", @@ -84,7 +83,8 @@ func parseSubscriptionSQL(sql string) (*SubscriptionConfig, error) { // Map the matches to struct fields for _, match := range matches { - switch match[1] { + key := strings.ToLower(match[1]) + switch key { case "dbname": config.DBName = match[2] case "host":