Skip to content

Commit

Permalink
Make mysqlsh copy-instance/load-dump work
Browse files Browse the repository at this point in the history
  • Loading branch information
fanyang01 committed Oct 16, 2024
1 parent 0f0271a commit 34a4221
Show file tree
Hide file tree
Showing 10 changed files with 132 additions and 15 deletions.
4 changes: 3 additions & 1 deletion backend/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,10 @@ func IsPureDataQuery(n sql.Node) bool {
hasDataTable := false
for _, tn := range c.tables {
switch tn.Database().Name() {
case "mysql", "information_schema", "performance_schema", "sys":
case "mysql", "information_schema", "sys":
return false
case "performance_schema":
// performance_schema is materialized in DuckDB, so it's fine to query it.
}
switch tn.UnderlyingTable().(type) {
case *catalog.Table, *catalog.IndexedTable:
Expand Down
14 changes: 14 additions & 0 deletions backend/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package backend
import (
"context"
"fmt"
"strings"

"github.com/dolthub/go-mysql-server/server"
"github.com/dolthub/vitess/go/mysql"
Expand All @@ -40,6 +41,19 @@ func (h *MyHandler) ComInitDB(c *mysql.Conn, schemaName string) error {
return h.Handler.ComInitDB(c, schemaName)
}

// Naive query rewriting. This is just a temporary solution
// and should be replaced with a more robust implementation.
func (h *MyHandler) ComQuery(
ctx context.Context,
c *mysql.Conn,
query string,
callback mysql.ResultSpoolFn,
) error {
// https://github.com/dolthub/dolt/issues/8455
query = strings.ReplaceAll(query, "CHARACTER SET 'utf8mb4'", "CHARACTER SET utf8mb4")
return h.Handler.ComQuery(ctx, c, query, callback)
}

func WrapHandler(pool *ConnectionPool) server.HandlerWrapper {
return func(h mysql.Handler) (mysql.Handler, error) {
handler, ok := h.(*server.Handler)
Expand Down
36 changes: 30 additions & 6 deletions catalog/inserter.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,28 @@ import (
)

type rowInserter struct {
db string
table string
schema sql.Schema
db string
table string
schema sql.Schema
replace bool

once sync.Once
conn *stdsql.Conn
tmpTable string
stmt *stdsql.Stmt
err error
flushSQL string
}

var _ sql.RowInserter = &rowInserter{}
var _ sql.RowReplacer = &rowInserter{}

// Delete implements sql.RowReplacer.
// Since REPLACE is handled in the same way as INSERT,
// we don't need to implement it and it should never be called.
func (ri *rowInserter) Delete(ctx *sql.Context, row sql.Row) error {
return errors.New("unexpected call to Delete")
}

func (ri *rowInserter) init(ctx *sql.Context) {
ri.tmpTable = fmt.Sprintf("%s_%s_%d", ri.db, ri.table, ctx.ID())
Expand All @@ -43,7 +53,8 @@ func (ri *rowInserter) init(ctx *sql.Context) {

// TODO(fan): Appender is faster, but it requires strict type alignment.
var insert strings.Builder
insert.WriteString("INSERT INTO ")
insert.Grow(64)
insert.WriteString("INSERT INTO ") // the temp table is keyless, so REPLACE is not needed
insert.WriteString(QuoteIdentifierANSI(ri.tmpTable))
insert.WriteString(" VALUES (")
insert.WriteByte('?')
Expand All @@ -52,6 +63,20 @@ func (ri *rowInserter) init(ctx *sql.Context) {
}
insert.WriteByte(')')
ri.stmt, ri.err = ri.conn.PrepareContext(ctx, insert.String())
if ri.err != nil {
return
}

insert.Reset()
insert.WriteString("INSERT ")
if ri.replace {
insert.WriteString(" OR REPLACE")
}
insert.WriteString(" INTO ")
insert.WriteString(ConnectIdentifiersANSI(ri.db, ri.table))
insert.WriteString(" SELECT * FROM ")
insert.WriteString(QuoteIdentifierANSI(ri.tmpTable))
ri.flushSQL = insert.String()
}

func (ri *rowInserter) StatementBegin(ctx *sql.Context) {
Expand All @@ -71,8 +96,7 @@ func (ri *rowInserter) StatementComplete(ctx *sql.Context) error {
func (ri *rowInserter) Close(ctx *sql.Context) error {
defer ri.clear(ctx)
if ri.err == nil {
sql := fmt.Sprintf("INSERT INTO %s SELECT * FROM temp.main.%s", ConnectIdentifiersANSI(ri.db, ri.table), QuoteIdentifierANSI(ri.tmpTable))
_, ri.err = ri.conn.ExecContext(ctx, sql)
_, ri.err = ri.conn.ExecContext(ctx, ri.flushSQL)
}
return ri.err
}
Expand Down
30 changes: 26 additions & 4 deletions catalog/internal_tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,24 @@ package catalog
import "strings"

type InternalTable struct {
Schema string
Name string
KeyColumns []string
ValueColumns []string
DDL string
InitialData [][]any
}

func (it *InternalTable) QualifiedName() string {
return "main." + it.Name
return it.Schema + "." + it.Name
}

func (it *InternalTable) UpsertStmt() string {
var b strings.Builder
b.Grow(128)
b.WriteString("INSERT OR REPLACE INTO main.")
b.WriteString("INSERT OR REPLACE INTO ")
b.WriteString(it.Schema)
b.WriteByte('.')
b.WriteString(it.Name)
b.WriteString(" VALUES (?")
for range it.KeyColumns[1:] {
Expand All @@ -32,7 +36,9 @@ func (it *InternalTable) UpsertStmt() string {
func (it *InternalTable) DeleteStmt() string {
var b strings.Builder
b.Grow(128)
b.WriteString("DELETE FROM main.")
b.WriteString("DELETE FROM ")
b.WriteString(it.Schema)
b.WriteByte('.')
b.WriteString(it.Name)
b.WriteString(" WHERE ")
b.WriteString(it.KeyColumns[0])
Expand All @@ -53,7 +59,9 @@ func (it *InternalTable) SelectStmt() string {
b.WriteString(", ")
b.WriteString(c)
}
b.WriteString(" FROM main.")
b.WriteString(" FROM ")
b.WriteString(it.Schema)
b.WriteByte('.')
b.WriteString(it.Name)
b.WriteString(" WHERE ")
b.WriteString(it.KeyColumns[0])
Expand All @@ -69,22 +77,36 @@ func (it *InternalTable) SelectStmt() string {
var InternalTables = struct {
PersistentVariable InternalTable
BinlogPosition InternalTable
GlobalStatus InternalTable
}{
PersistentVariable: InternalTable{
Schema: "main",
Name: "persistent_variable",
KeyColumns: []string{"name"},
ValueColumns: []string{"value", "vtype"},
DDL: "name TEXT PRIMARY KEY, value TEXT, vtype TEXT",
},
BinlogPosition: InternalTable{
Schema: "main",
Name: "binlog_position",
KeyColumns: []string{"channel"},
ValueColumns: []string{"position"},
DDL: "channel TEXT PRIMARY KEY, position TEXT",
},
GlobalStatus: InternalTable{
Schema: "performance_schema",
Name: "global_status",
KeyColumns: []string{"VARIABLE_NAME"},
ValueColumns: []string{"VARIABLE_VALUE"},
DDL: "VARIABLE_NAME TEXT PRIMARY KEY, VARIABLE_VALUE TEXT",
InitialData: [][]any{
{"Innodb_redo_log_enabled", "OFF"},
},
},
}

var internalTables = []InternalTable{
InternalTables.PersistentVariable,
InternalTables.BinlogPosition,
InternalTables.GlobalStatus,
}
15 changes: 15 additions & 0 deletions catalog/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,27 @@ func NewDBProvider(dataDir, dbFile string) (*DatabaseProvider, error) {
}

for _, t := range internalTables {
if _, err := storage.ExecContext(
context.Background(),
"CREATE SCHEMA IF NOT EXISTS "+t.Schema,
); err != nil {
return nil, fmt.Errorf("failed to create internal schema %q: %w", t.Schema, err)
}
if _, err := storage.ExecContext(
context.Background(),
"CREATE TABLE IF NOT EXISTS "+t.QualifiedName()+"("+t.DDL+")",
); err != nil {
return nil, fmt.Errorf("failed to create internal table %q: %w", t.Name, err)
}
for _, row := range t.InitialData {
if _, err := storage.ExecContext(
context.Background(),
t.UpsertStmt(),
row...,
); err != nil {
return nil, fmt.Errorf("failed to insert initial data into internal table %q: %w", t.Name, err)
}
}
}

return &DatabaseProvider{
Expand Down
12 changes: 12 additions & 0 deletions catalog/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ var _ sql.IndexAddressableTable = (*Table)(nil)
var _ sql.InsertableTable = (*Table)(nil)
var _ sql.UpdatableTable = (*Table)(nil)
var _ sql.DeletableTable = (*Table)(nil)
var _ sql.ReplaceableTable = (*Table)(nil)
var _ sql.CommentedTable = (*Table)(nil)

func NewTable(name string, db *Database) *Table {
Expand Down Expand Up @@ -315,6 +316,17 @@ func (t *Table) Deleter(*sql.Context) sql.RowDeleter {
return nil
}

// Replacer implements sql.ReplaceableTable.
func (t *Table) Replacer(*sql.Context) sql.RowReplacer {
hasKey := len(t.schema.PkOrdinals) > 0 || !sql.IsKeyless(t.schema.Schema)
return &rowInserter{
db: t.db.Name(),
table: t.name,
schema: t.schema.Schema,
replace: hasKey,
}
}

// CreateIndex implements sql.IndexAlterableTable.
func (t *Table) CreateIndex(ctx *sql.Context, indexDef sql.IndexDef) error {
// Lock the table to ensure thread-safety during index creation
Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ require (
github.com/Shopify/toxiproxy/v2 v2.9.0
github.com/apache/arrow/go/v17 v17.0.0
github.com/cockroachdb/apd/v3 v3.2.1
github.com/dolthub/go-mysql-server v0.18.2-0.20240926171723-77ed13c03196
github.com/dolthub/vitess v0.0.0-20240919225659-2ad81685e772
github.com/dolthub/go-mysql-server v0.18.2-0.20241015190154-54bd6d6e1ce8
github.com/dolthub/vitess v0.0.0-20241010201417-9d4f54b29ccc
github.com/go-sql-driver/mysql v1.8.1
github.com/jmoiron/sqlx v1.4.0
github.com/marcboeker/go-duckdb v1.8.2-0.20241002112231-62d5fa8c0697
Expand All @@ -22,8 +22,8 @@ require (
)

replace (
github.com/dolthub/go-mysql-server v0.18.2-0.20240926171723-77ed13c03196 => github.com/fanyang01/go-mysql-server v0.0.0-20240927093603-e7d5b2c91bf7
github.com/dolthub/vitess v0.0.0-20240919225659-2ad81685e772 => github.com/apecloud/dolt-vitess v0.0.0-20240927100428-4ba1490cf5da
github.com/dolthub/go-mysql-server v0.18.2-0.20241015190154-54bd6d6e1ce8 => github.com/fanyang01/go-mysql-server v0.0.0-20241016052333-7fc0e18f41bb
github.com/dolthub/vitess v0.0.0-20241010201417-9d4f54b29ccc => github.com/apecloud/dolt-vitess v0.0.0-20241016030916-464ec1ba8a1a
)

require (
Expand Down
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ github.com/apecloud/dolt-vitess v0.0.0-20240919225659-2ad81685e772 h1:OgHbYQJXAE
github.com/apecloud/dolt-vitess v0.0.0-20240919225659-2ad81685e772/go.mod h1:uBvlRluuL+SbEWTCZ68o0xvsdYZER3CEG/35INdzfJM=
github.com/apecloud/dolt-vitess v0.0.0-20240927100428-4ba1490cf5da h1:+sOwYwbN/kZZd0Ggsz+ozKa6gdAUYz/bgVMJkoDmuMc=
github.com/apecloud/dolt-vitess v0.0.0-20240927100428-4ba1490cf5da/go.mod h1:uBvlRluuL+SbEWTCZ68o0xvsdYZER3CEG/35INdzfJM=
github.com/apecloud/dolt-vitess v0.0.0-20241016030916-464ec1ba8a1a h1:ARRxRvo7HUKFvqKhXh+nRVEV5K+AuyVP6QlntDzQPww=
github.com/apecloud/dolt-vitess v0.0.0-20241016030916-464ec1ba8a1a/go.mod h1:uBvlRluuL+SbEWTCZ68o0xvsdYZER3CEG/35INdzfJM=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
Expand Down Expand Up @@ -75,12 +77,16 @@ github.com/dolthub/go-mysql-server v0.18.2-0.20240923181307-5aacdb13e45a h1:rpCm
github.com/dolthub/go-mysql-server v0.18.2-0.20240923181307-5aacdb13e45a/go.mod h1:lGbU2bK+QNnlETdUjOOaE+UnlEUu31VaQOFKAFGyZN4=
github.com/dolthub/go-mysql-server v0.18.2-0.20240926171723-77ed13c03196 h1:H4bKFiOdjmhBrdjrNvYAuhfplpHM3aVFcbLXlGoD/Fc=
github.com/dolthub/go-mysql-server v0.18.2-0.20240926171723-77ed13c03196/go.mod h1:lGbU2bK+QNnlETdUjOOaE+UnlEUu31VaQOFKAFGyZN4=
github.com/dolthub/go-mysql-server v0.18.2-0.20241015190154-54bd6d6e1ce8 h1:opC/9GtHMpPf5v0eRdngp166LcJTTyQ+YZfyjAchHaY=
github.com/dolthub/go-mysql-server v0.18.2-0.20241015190154-54bd6d6e1ce8/go.mod h1:Z8tket+3sYcU3d4yW90Ggld2d+C2DUgnpB8cBP0+GvI=
github.com/dolthub/jsonpath v0.0.2-0.20240227200619-19675ab05c71 h1:bMGS25NWAGTEtT5tOBsCuCrlYnLRKpbJVJkDbrTRhwQ=
github.com/dolthub/jsonpath v0.0.2-0.20240227200619-19675ab05c71/go.mod h1:2/2zjLQ/JOOSbbSboojeg+cAwcRV0fDLzIiWch/lhqI=
github.com/dolthub/sqllogictest/go v0.0.0-20240618184124-ca47f9354216 h1:JWkKRE4EHUcEVQCMRBej8DYxjYjRz/9MdF/NNQh0o70=
github.com/dolthub/sqllogictest/go v0.0.0-20240618184124-ca47f9354216/go.mod h1:e/FIZVvT2IR53HBCAo41NjqgtEnjMJGKca3Y/dAmZaA=
github.com/dolthub/vitess v0.0.0-20240919225659-2ad81685e772 h1:vDwBX7Lc8DnA8Zk0iRIu6slCw0GIUfYfFlYDYJQw8GQ=
github.com/dolthub/vitess v0.0.0-20240919225659-2ad81685e772/go.mod h1:uBvlRluuL+SbEWTCZ68o0xvsdYZER3CEG/35INdzfJM=
github.com/dolthub/vitess v0.0.0-20241010201417-9d4f54b29ccc h1:ZZgTRuxEwd3X67njtK30buHeZScLAd4W0rbRV8CORhE=
github.com/dolthub/vitess v0.0.0-20241010201417-9d4f54b29ccc/go.mod h1:uBvlRluuL+SbEWTCZ68o0xvsdYZER3CEG/35INdzfJM=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
Expand All @@ -91,6 +97,10 @@ github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.m
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/fanyang01/go-mysql-server v0.0.0-20240927093603-e7d5b2c91bf7 h1:QSqlxTk6pjF/11KV2JsBNGLmIlMccdzpUkJmNp1lsTs=
github.com/fanyang01/go-mysql-server v0.0.0-20240927093603-e7d5b2c91bf7/go.mod h1:GYFkohqx2Nr8NNjcEwPV1XsALRl3/l0eKhMTVqUiPmM=
github.com/fanyang01/go-mysql-server v0.0.0-20241016030019-a1d92d867df4 h1:l+EazqTiTuD54N0lXpadkv7LoSxSNJQw/pLK82OKmIU=
github.com/fanyang01/go-mysql-server v0.0.0-20241016030019-a1d92d867df4/go.mod h1:muA0iXUB7NjOojVRfJBKE2dL4OAABq3ZSbISlTGvmQY=
github.com/fanyang01/go-mysql-server v0.0.0-20241016052333-7fc0e18f41bb h1:FjAhczeEu2P/F2+YbKjR/Q4lgnD/0nCWPmaKHul797o=
github.com/fanyang01/go-mysql-server v0.0.0-20241016052333-7fc0e18f41bb/go.mod h1:muA0iXUB7NjOojVRfJBKE2dL4OAABq3ZSbISlTGvmQY=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4=
github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20=
Expand Down
2 changes: 2 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/apecloud/myduckserver/backend"
"github.com/apecloud/myduckserver/catalog"
"github.com/apecloud/myduckserver/myfunc"
"github.com/apecloud/myduckserver/plugin"
"github.com/apecloud/myduckserver/replica"
"github.com/apecloud/myduckserver/transpiler"
sqle "github.com/dolthub/go-mysql-server"
Expand Down Expand Up @@ -98,6 +99,7 @@ func main() {
builder := backend.NewDuckBuilder(engine.Analyzer.ExecBuilder, pool)
engine.Analyzer.ExecBuilder = builder
engine.Analyzer.Catalog.RegisterFunction(sql.NewContext(context.Background()), myfunc.ExtraBuiltIns...)
engine.Analyzer.Catalog.MySQLDb.SetPlugins(plugin.AuthPlugins)

if err := setPersister(provider, engine); err != nil {
logrus.Fatalln("Failed to set the persister:", err)
Expand Down
16 changes: 16 additions & 0 deletions plugin/auth.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package plugin

import "github.com/dolthub/go-mysql-server/sql/mysql_db"

var AuthPlugins = map[string]mysql_db.PlaintextAuthPlugin{
"caching_sha2_password": &NoopPlaintextPlugin{},
}

// NoopPlaintextPlugin is used to authenticate plaintext user plugins
type NoopPlaintextPlugin struct{}

var _ mysql_db.PlaintextAuthPlugin = &NoopPlaintextPlugin{}

func (p *NoopPlaintextPlugin) Authenticate(db *mysql_db.MySQLDb, user string, userEntry *mysql_db.User, pass string) (bool, error) {
return true, nil
}

0 comments on commit 34a4221

Please sign in to comment.