Skip to content

Commit

Permalink
to #246 feat: refactor the subscription.
Browse files Browse the repository at this point in the history
  • Loading branch information
TianyuZhang1214 committed Dec 5, 2024
1 parent 9e0b9ff commit 75a9cd7
Show file tree
Hide file tree
Showing 8 changed files with 188 additions and 207 deletions.
2 changes: 1 addition & 1 deletion binlogreplication/binlog_replica_applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -1432,7 +1432,7 @@ func convertSqlTypesValue(ctx *sql.Context, engine *gms.Engine, value sqltypes.V
default:
convertedValue, _, err = column.Type.Convert(value.ToString())

// logrus.WithField("column", column.Name).WithField("type", column.Type).Infof(
// logrus.WithField("column", column.Subscription).WithField("type", column.Type).Infof(
// "Converting value[%s %v %s] to %v %T",
// value.Type(), value.Raw(), value.ToString(), convertedValue, convertedValue,
// )
Expand Down
62 changes: 50 additions & 12 deletions catalog/internal_tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,29 @@ func (it *InternalTable) QualifiedName() string {
return it.Schema + "." + it.Name
}

func (it *InternalTable) UpdateStmt(keyColumns []string, valueColumns []string) string {
var b strings.Builder
b.Grow(128)
b.WriteString("UPDATE ")
b.WriteString(it.QualifiedName())
b.WriteString(" SET " + valueColumns[0] + " = ?")

for _, valueColumn := range valueColumns[1:] {
b.WriteString(", ")
b.WriteString(valueColumn)
b.WriteString(" = ?")
}

b.WriteString(" WHERE " + keyColumns[0] + " = ?")
for _, keyColumn := range keyColumns[1:] {
b.WriteString(", ")
b.WriteString(keyColumn)
b.WriteString(" = ?")
}

return b.String()
}

func (it *InternalTable) UpsertStmt() string {
var b strings.Builder
b.Grow(128)
Expand Down Expand Up @@ -84,6 +107,30 @@ func (it *InternalTable) SelectStmt() string {
return b.String()
}

func (it *InternalTable) SelectColumnsStmt(valueColumns []string) string {
var b strings.Builder
b.Grow(128)
b.WriteString("SELECT ")
b.WriteString(valueColumns[0])
for _, c := range valueColumns[1:] {
b.WriteString(", ")
b.WriteString(c)
}
b.WriteString(" FROM ")
b.WriteString(it.Schema)
b.WriteByte('.')
b.WriteString(it.Name)
b.WriteString(" WHERE ")
b.WriteString(it.KeyColumns[0])
b.WriteString(" = ?")
for _, c := range it.KeyColumns[1:] {
b.WriteString(" AND ")
b.WriteString(c)
b.WriteString(" = ?")
}
return b.String()
}

func (it *InternalTable) SelectAllStmt() string {
var b strings.Builder
b.Grow(128)
Expand All @@ -108,7 +155,6 @@ func (it *InternalTable) CountAllStmt() string {
var InternalTables = struct {
PersistentVariable InternalTable
BinlogPosition InternalTable
PgReplicationLSN InternalTable
PgSubscription InternalTable
GlobalStatus InternalTable
// TODO(sean): This is a temporary work around for clients that query the 'pg_catalog.pg_stat_replication'.
Expand All @@ -130,19 +176,12 @@ var InternalTables = struct {
ValueColumns: []string{"position"},
DDL: "channel TEXT PRIMARY KEY, position TEXT",
},
PgReplicationLSN: InternalTable{
Schema: "__sys__",
Name: "pg_replication_lsn",
KeyColumns: []string{"slot_name"},
ValueColumns: []string{"lsn"},
DDL: "slot_name TEXT PRIMARY KEY, lsn TEXT",
},
PgSubscription: InternalTable{
Schema: "__sys__",
Name: "pg_subscription",
KeyColumns: []string{"name"},
ValueColumns: []string{"connection", "publication"},
DDL: "name TEXT PRIMARY KEY, connection TEXT, publication TEXT",
KeyColumns: []string{"subname"},
ValueColumns: []string{"subconninfo", "subpublication", "subenabled", "subskiplsn"},
DDL: "subname TEXT PRIMARY KEY, subconninfo TEXT, subpublication TEXT, subenabled BOOLEAN, subskiplsn TEXT",
},
GlobalStatus: InternalTable{
Schema: "performance_schema",
Expand Down Expand Up @@ -237,7 +276,6 @@ var InternalTables = struct {
var internalTables = []InternalTable{
InternalTables.PersistentVariable,
InternalTables.BinlogPosition,
InternalTables.PgReplicationLSN,
InternalTables.PgSubscription,
InternalTables.GlobalStatus,
InternalTables.PGStatReplication,
Expand Down
8 changes: 2 additions & 6 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,13 +182,9 @@ func main() {
}

// Check if there is a replication subscription and start replication if there is.
subscriptions, err := logrepl.GetAllSubscriptions(pgServer.NewInternalCtx())
err = logrepl.UpdateSubscriptions(pgServer.NewInternalCtx())
if err != nil {
logrus.WithError(err).Warnln("Failed to find replication")
} else {
for _, subscription := range subscriptions {
go subscription.Replicator.StartReplication(pgServer.NewInternalCtx(), subscription.Publication)
}
logrus.WithError(err).Warnln("Failed to update subscriptions")
}

// Load the configuration for the Postgres server.
Expand Down
16 changes: 8 additions & 8 deletions pgserver/connection_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1059,15 +1059,15 @@ func (h *ConnectionHandler) handledPSQLCommands(statement string) (bool, error)
statement = strings.ToLower(statement)
// Command: \l
if statement == "select d.datname as \"name\",\n pg_catalog.pg_get_userbyid(d.datdba) as \"owner\",\n pg_catalog.pg_encoding_to_char(d.encoding) as \"encoding\",\n d.datcollate as \"collate\",\n d.datctype as \"ctype\",\n d.daticulocale as \"icu locale\",\n case d.datlocprovider when 'c' then 'libc' when 'i' then 'icu' end as \"locale provider\",\n pg_catalog.array_to_string(d.datacl, e'\\n') as \"access privileges\"\nfrom pg_catalog.pg_database d\norder by 1;" {
query, err := h.convertQuery(`select d.datname as "Name", 'postgres' as "Owner", 'UTF8' as "Encoding", 'en_US.UTF-8' as "Collate", 'en_US.UTF-8' as "Ctype", 'en-US' as "ICU Locale", case d.datlocprovider when 'c' then 'libc' when 'i' then 'icu' end as "locale provider", '' as "access privileges" from pg_catalog.pg_database d order by 1;`)
query, err := h.convertQuery(`select d.datname as "Subscription", 'postgres' as "Owner", 'UTF8' as "Encoding", 'en_US.UTF-8' as "Collate", 'en_US.UTF-8' as "Ctype", 'en-US' as "ICU Locale", case d.datlocprovider when 'c' then 'libc' when 'i' then 'icu' end as "locale provider", '' as "access privileges" from pg_catalog.pg_database d order by 1;`)
if err != nil {
return false, err
}
return true, h.query(query)
}
// Command: \l on psql 16
if statement == "select\n d.datname as \"name\",\n pg_catalog.pg_get_userbyid(d.datdba) as \"owner\",\n pg_catalog.pg_encoding_to_char(d.encoding) as \"encoding\",\n case d.datlocprovider when 'c' then 'libc' when 'i' then 'icu' end as \"locale provider\",\n d.datcollate as \"collate\",\n d.datctype as \"ctype\",\n d.daticulocale as \"icu locale\",\n null as \"icu rules\",\n pg_catalog.array_to_string(d.datacl, e'\\n') as \"access privileges\"\nfrom pg_catalog.pg_database d\norder by 1;" {
query, err := h.convertQuery(`select d.datname as "Name", 'postgres' as "Owner", 'UTF8' as "Encoding", 'en_US.UTF-8' as "Collate", 'en_US.UTF-8' as "Ctype", 'en-US' as "ICU Locale", case d.datlocprovider when 'c' then 'libc' when 'i' then 'icu' end as "locale provider", '' as "access privileges" from pg_catalog.pg_database d order by 1;`)
query, err := h.convertQuery(`select d.datname as "Subscription", 'postgres' as "Owner", 'UTF8' as "Encoding", 'en_US.UTF-8' as "Collate", 'en_US.UTF-8' as "Ctype", 'en-US' as "ICU Locale", case d.datlocprovider when 'c' then 'libc' when 'i' then 'icu' end as "locale provider", '' as "access privileges" from pg_catalog.pg_database d order by 1;`)
if err != nil {
return false, err
}
Expand All @@ -1076,21 +1076,21 @@ func (h *ConnectionHandler) handledPSQLCommands(statement string) (bool, error)
// Command: \dt
if statement == "select n.nspname as \"schema\",\n c.relname as \"name\",\n case c.relkind when 'r' then 'table' when 'v' then 'view' when 'm' then 'materialized view' when 'i' then 'index' when 's' then 'sequence' when 't' then 'toast table' when 'f' then 'foreign table' when 'p' then 'partitioned table' when 'i' then 'partitioned index' end as \"type\",\n pg_catalog.pg_get_userbyid(c.relowner) as \"owner\"\nfrom pg_catalog.pg_class c\n left join pg_catalog.pg_namespace n on n.oid = c.relnamespace\n left join pg_catalog.pg_am am on am.oid = c.relam\nwhere c.relkind in ('r','p','')\n and n.nspname <> 'pg_catalog'\n and n.nspname !~ '^pg_toast'\n and n.nspname <> 'information_schema'\n and pg_catalog.pg_table_is_visible(c.oid)\norder by 1,2;" {
return true, h.query(ConvertedQuery{
String: `SELECT table_schema AS "Schema", TABLE_NAME AS "Name", 'table' AS "Type", 'postgres' AS "Owner" FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA <> 'pg_catalog' AND TABLE_SCHEMA <> 'information_schema' AND TABLE_TYPE = 'BASE TABLE' ORDER BY 2;`,
String: `SELECT table_schema AS "Schema", TABLE_NAME AS "Subscription", 'table' AS "Type", 'postgres' AS "Owner" FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA <> 'pg_catalog' AND TABLE_SCHEMA <> 'information_schema' AND TABLE_TYPE = 'BASE TABLE' ORDER BY 2;`,
StatementTag: "SELECT",
})
}
// Command: \d
if statement == "select n.nspname as \"schema\",\n c.relname as \"name\",\n case c.relkind when 'r' then 'table' when 'v' then 'view' when 'm' then 'materialized view' when 'i' then 'index' when 's' then 'sequence' when 't' then 'toast table' when 'f' then 'foreign table' when 'p' then 'partitioned table' when 'i' then 'partitioned index' end as \"type\",\n pg_catalog.pg_get_userbyid(c.relowner) as \"owner\"\nfrom pg_catalog.pg_class c\n left join pg_catalog.pg_namespace n on n.oid = c.relnamespace\n left join pg_catalog.pg_am am on am.oid = c.relam\nwhere c.relkind in ('r','p','v','m','s','f','')\n and n.nspname <> 'pg_catalog'\n and n.nspname !~ '^pg_toast'\n and n.nspname <> 'information_schema'\n and pg_catalog.pg_table_is_visible(c.oid)\norder by 1,2;" {
return true, h.query(ConvertedQuery{
String: `SELECT table_schema AS "Schema", TABLE_NAME AS "Name", IF(TABLE_TYPE = 'VIEW', 'view', 'table') AS "Type", 'postgres' AS "Owner" FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA <> 'pg_catalog' AND TABLE_SCHEMA <> 'information_schema' AND TABLE_TYPE = 'BASE TABLE' OR TABLE_TYPE = 'VIEW' ORDER BY 2;`,
String: `SELECT table_schema AS "Schema", TABLE_NAME AS "Subscription", IF(TABLE_TYPE = 'VIEW', 'view', 'table') AS "Type", 'postgres' AS "Owner" FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA <> 'pg_catalog' AND TABLE_SCHEMA <> 'information_schema' AND TABLE_TYPE = 'BASE TABLE' OR TABLE_TYPE = 'VIEW' ORDER BY 2;`,
StatementTag: "SELECT",
})
}
// Alternate \d for psql 14
if statement == "select n.nspname as \"schema\",\n c.relname as \"name\",\n case c.relkind when 'r' then 'table' when 'v' then 'view' when 'm' then 'materialized view' when 'i' then 'index' when 's' then 'sequence' when 's' then 'special' when 't' then 'toast table' when 'f' then 'foreign table' when 'p' then 'partitioned table' when 'i' then 'partitioned index' end as \"type\",\n pg_catalog.pg_get_userbyid(c.relowner) as \"owner\"\nfrom pg_catalog.pg_class c\n left join pg_catalog.pg_namespace n on n.oid = c.relnamespace\n left join pg_catalog.pg_am am on am.oid = c.relam\nwhere c.relkind in ('r','p','v','m','s','f','')\n and n.nspname <> 'pg_catalog'\n and n.nspname !~ '^pg_toast'\n and n.nspname <> 'information_schema'\n and pg_catalog.pg_table_is_visible(c.oid)\norder by 1,2;" {
return true, h.query(ConvertedQuery{
String: `SELECT table_schema AS "Schema", TABLE_NAME AS "Name", IF(TABLE_TYPE = 'VIEW', 'view', 'table') AS "Type", 'postgres' AS "Owner" FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA <> 'pg_catalog' AND TABLE_SCHEMA <> 'information_schema' AND TABLE_TYPE = 'BASE TABLE' OR TABLE_TYPE = 'VIEW' ORDER BY 2;`,
String: `SELECT table_schema AS "Schema", TABLE_NAME AS "Subscription", IF(TABLE_TYPE = 'VIEW', 'view', 'table') AS "Type", 'postgres' AS "Owner" FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA <> 'pg_catalog' AND TABLE_SCHEMA <> 'information_schema' AND TABLE_TYPE = 'BASE TABLE' OR TABLE_TYPE = 'VIEW' ORDER BY 2;`,
StatementTag: "SELECT",
})
}
Expand All @@ -1103,21 +1103,21 @@ func (h *ConnectionHandler) handledPSQLCommands(statement string) (bool, error)
// Command: \dn
if statement == "select n.nspname as \"name\",\n pg_catalog.pg_get_userbyid(n.nspowner) as \"owner\"\nfrom pg_catalog.pg_namespace n\nwhere n.nspname !~ '^pg_' and n.nspname <> 'information_schema'\norder by 1;" {
return true, h.query(ConvertedQuery{
String: `SELECT 'public' AS "Name", 'pg_database_owner' AS "Owner";`,
String: `SELECT 'public' AS "Subscription", 'pg_database_owner' AS "Owner";`,
StatementTag: "SELECT",
})
}
// Command: \df
if statement == "select n.nspname as \"schema\",\n p.proname as \"name\",\n pg_catalog.pg_get_function_result(p.oid) as \"result data type\",\n pg_catalog.pg_get_function_arguments(p.oid) as \"argument data types\",\n case p.prokind\n when 'a' then 'agg'\n when 'w' then 'window'\n when 'p' then 'proc'\n else 'func'\n end as \"type\"\nfrom pg_catalog.pg_proc p\n left join pg_catalog.pg_namespace n on n.oid = p.pronamespace\nwhere pg_catalog.pg_function_is_visible(p.oid)\n and n.nspname <> 'pg_catalog'\n and n.nspname <> 'information_schema'\norder by 1, 2, 4;" {
return true, h.query(ConvertedQuery{
String: `SELECT '' AS "Schema", '' AS "Name", '' AS "Result data type", '' AS "Argument data types", '' AS "Type" LIMIT 0;`,
String: `SELECT '' AS "Schema", '' AS "Subscription", '' AS "Result data type", '' AS "Argument data types", '' AS "Type" LIMIT 0;`,
StatementTag: "SELECT",
})
}
// Command: \dv
if statement == "select n.nspname as \"schema\",\n c.relname as \"name\",\n case c.relkind when 'r' then 'table' when 'v' then 'view' when 'm' then 'materialized view' when 'i' then 'index' when 's' then 'sequence' when 't' then 'toast table' when 'f' then 'foreign table' when 'p' then 'partitioned table' when 'i' then 'partitioned index' end as \"type\",\n pg_catalog.pg_get_userbyid(c.relowner) as \"owner\"\nfrom pg_catalog.pg_class c\n left join pg_catalog.pg_namespace n on n.oid = c.relnamespace\nwhere c.relkind in ('v','')\n and n.nspname <> 'pg_catalog'\n and n.nspname !~ '^pg_toast'\n and n.nspname <> 'information_schema'\n and pg_catalog.pg_table_is_visible(c.oid)\norder by 1,2;" {
return true, h.query(ConvertedQuery{
String: `SELECT table_schema AS "Schema", TABLE_NAME AS "Name", 'view' AS "Type", 'postgres' AS "Owner" FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA <> 'pg_catalog' AND TABLE_SCHEMA <> 'information_schema' AND TABLE_TYPE = 'VIEW' ORDER BY 2;`,
String: `SELECT table_schema AS "Schema", TABLE_NAME AS "Subscription", 'view' AS "Type", 'postgres' AS "Owner" FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA <> 'pg_catalog' AND TABLE_SCHEMA <> 'information_schema' AND TABLE_TYPE = 'VIEW' ORDER BY 2;`,
StatementTag: "SELECT",
})
}
Expand Down
25 changes: 2 additions & 23 deletions pgserver/logrepl/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (

"github.com/apecloud/myduckserver/adapter"
"github.com/apecloud/myduckserver/binlog"
"github.com/apecloud/myduckserver/catalog"
"github.com/apecloud/myduckserver/delta"
"github.com/apecloud/myduckserver/pgtypes"
"github.com/dolthub/go-mysql-server/sql"
Expand Down Expand Up @@ -222,7 +221,7 @@ func (r *LogicalReplicator) StartReplication(sqlCtx *sql.Context, slotName strin
standbyMessageTimeout := 10 * time.Second
nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout)

lastWrittenLsn, err := r.readWALPosition(sqlCtx, slotName)
lastWrittenLsn, err := SelectSubscriptionLsn(sqlCtx, slotName)
if err != nil {
return err
}
Expand Down Expand Up @@ -881,26 +880,6 @@ func (r *LogicalReplicator) processMessage(
return false, nil
}

// readWALPosition reads the recorded WAL position from the WAL position table
func (r *LogicalReplicator) readWALPosition(ctx *sql.Context, slotName string) (pglogrepl.LSN, error) {
var lsn string
if err := adapter.QueryRowCatalog(ctx, catalog.InternalTables.PgReplicationLSN.SelectStmt(), slotName).Scan(&lsn); err != nil {
if errors.Is(err, stdsql.ErrNoRows) {
// if the LSN doesn't exist, consider this a cold start and return 0
return pglogrepl.LSN(0), nil
}
return 0, err
}

return pglogrepl.ParseLSN(lsn)
}

// WriteWALPosition writes the recorded WAL position to the WAL position table
func (r *LogicalReplicator) WriteWALPosition(ctx *sql.Context, slotName string, lsn pglogrepl.LSN) error {
_, err := adapter.ExecCatalogInTxn(ctx, catalog.InternalTables.PgReplicationLSN.UpsertStmt(), slotName, lsn.String())
return err
}

// whereClause returns a WHERE clause string with the contents of the builder if it's non-empty, or the empty
// string otherwise
func whereClause(str strings.Builder) string {
Expand Down Expand Up @@ -1000,7 +979,7 @@ func (r *LogicalReplicator) commitOngoingTxn(state *replicationState, flushReaso
}

r.logger.Debugf("Writing LSN %s\n", state.lastCommitLSN)
if err = r.WriteWALPosition(state.replicaCtx, state.slotName, state.lastCommitLSN); err != nil {
if err = UpdateSubscriptionLsn(state.replicaCtx, state.slotName, state.lastCommitLSN.String()); err != nil {
return err
}

Expand Down
Loading

0 comments on commit 75a9cd7

Please sign in to comment.