Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support mysqlsh -- load-dump/copy-instance #96

Merged
merged 4 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion backend/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,10 @@ func IsPureDataQuery(n sql.Node) bool {
hasDataTable := false
for _, tn := range c.tables {
switch tn.Database().Name() {
case "mysql", "information_schema", "performance_schema", "sys":
case "mysql", "information_schema", "sys":
return false
case "performance_schema":
// performance_schema is materialized in DuckDB, so it's fine to query it.
}
switch tn.UnderlyingTable().(type) {
case *catalog.Table, *catalog.IndexedTable:
Expand Down
14 changes: 14 additions & 0 deletions backend/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package backend
import (
"context"
"fmt"
"strings"

"github.com/dolthub/go-mysql-server/server"
"github.com/dolthub/vitess/go/mysql"
Expand All @@ -40,6 +41,19 @@ func (h *MyHandler) ComInitDB(c *mysql.Conn, schemaName string) error {
return h.Handler.ComInitDB(c, schemaName)
}

// Naive query rewriting. This is just a temporary solution
// and should be replaced with a more robust implementation.
func (h *MyHandler) ComQuery(
ctx context.Context,
c *mysql.Conn,
query string,
callback mysql.ResultSpoolFn,
) error {
// https://github.com/dolthub/dolt/issues/8455
query = strings.ReplaceAll(query, "CHARACTER SET 'utf8mb4'", "CHARACTER SET utf8mb4")
return h.Handler.ComQuery(ctx, c, query, callback)
}

func WrapHandler(pool *ConnectionPool) server.HandlerWrapper {
return func(h mysql.Handler) (mysql.Handler, error) {
handler, ok := h.(*server.Handler)
Expand Down
2 changes: 2 additions & 0 deletions backend/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ func (sess Session) PersistGlobal(sysVarName string, value interface{}) error {
if _, _, ok := sql.SystemVariables.GetGlobal(sysVarName); !ok {
return sql.ErrUnknownSystemVariable.New(sysVarName)
}
sess.GetLogger().Tracef("Persisting global variable %s = %v", sysVarName, value)
_, err := sess.ExecContext(
context.Background(),
catalog.InternalTables.PersistentVariable.UpsertStmt(),
Expand Down Expand Up @@ -162,6 +163,7 @@ func (sess Session) GetPersistedValue(k string) (interface{}, error) {
catalog.InternalTables.PersistentVariable.SelectStmt(),
k,
).Scan(&value, &vtype)
sess.GetLogger().Tracef("Getting persisted global variable %s = %s [%s]", k, value, vtype)
switch {
case err == stdsql.ErrNoRows:
return nil, nil
Expand Down
36 changes: 30 additions & 6 deletions catalog/inserter.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,28 @@ import (
)

type rowInserter struct {
db string
table string
schema sql.Schema
db string
table string
schema sql.Schema
replace bool

once sync.Once
conn *stdsql.Conn
tmpTable string
stmt *stdsql.Stmt
err error
flushSQL string
}

var _ sql.RowInserter = &rowInserter{}
var _ sql.RowReplacer = &rowInserter{}

// Delete implements sql.RowReplacer.
// Since REPLACE is handled in the same way as INSERT,
// we don't need to implement it and it should never be called.
func (ri *rowInserter) Delete(ctx *sql.Context, row sql.Row) error {
return errors.New("unexpected call to Delete")
}

func (ri *rowInserter) init(ctx *sql.Context) {
ri.tmpTable = fmt.Sprintf("%s_%s_%d", ri.db, ri.table, ctx.ID())
Expand All @@ -43,7 +53,8 @@ func (ri *rowInserter) init(ctx *sql.Context) {

// TODO(fan): Appender is faster, but it requires strict type alignment.
var insert strings.Builder
insert.WriteString("INSERT INTO ")
insert.Grow(64)
insert.WriteString("INSERT INTO ") // the temp table is keyless, so REPLACE is not needed
insert.WriteString(QuoteIdentifierANSI(ri.tmpTable))
insert.WriteString(" VALUES (")
insert.WriteByte('?')
Expand All @@ -52,6 +63,20 @@ func (ri *rowInserter) init(ctx *sql.Context) {
}
insert.WriteByte(')')
ri.stmt, ri.err = ri.conn.PrepareContext(ctx, insert.String())
if ri.err != nil {
return
}

insert.Reset()
insert.WriteString("INSERT ")
if ri.replace {
insert.WriteString(" OR REPLACE")
}
insert.WriteString(" INTO ")
insert.WriteString(ConnectIdentifiersANSI(ri.db, ri.table))
insert.WriteString(" SELECT * FROM ")
insert.WriteString(QuoteIdentifierANSI(ri.tmpTable))
ri.flushSQL = insert.String()
}

func (ri *rowInserter) StatementBegin(ctx *sql.Context) {
Expand All @@ -71,8 +96,7 @@ func (ri *rowInserter) StatementComplete(ctx *sql.Context) error {
func (ri *rowInserter) Close(ctx *sql.Context) error {
defer ri.clear(ctx)
if ri.err == nil {
sql := fmt.Sprintf("INSERT INTO %s SELECT * FROM temp.main.%s", ConnectIdentifiersANSI(ri.db, ri.table), QuoteIdentifierANSI(ri.tmpTable))
_, ri.err = ri.conn.ExecContext(ctx, sql)
_, ri.err = ri.conn.ExecContext(ctx, ri.flushSQL)
}
return ri.err
}
Expand Down
30 changes: 26 additions & 4 deletions catalog/internal_tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,24 @@ package catalog
import "strings"

type InternalTable struct {
Schema string
Name string
KeyColumns []string
ValueColumns []string
DDL string
InitialData [][]any
}

func (it *InternalTable) QualifiedName() string {
return "main." + it.Name
return it.Schema + "." + it.Name
}

func (it *InternalTable) UpsertStmt() string {
var b strings.Builder
b.Grow(128)
b.WriteString("INSERT OR REPLACE INTO main.")
b.WriteString("INSERT OR REPLACE INTO ")
b.WriteString(it.Schema)
b.WriteByte('.')
b.WriteString(it.Name)
b.WriteString(" VALUES (?")
for range it.KeyColumns[1:] {
Expand All @@ -32,7 +36,9 @@ func (it *InternalTable) UpsertStmt() string {
func (it *InternalTable) DeleteStmt() string {
var b strings.Builder
b.Grow(128)
b.WriteString("DELETE FROM main.")
b.WriteString("DELETE FROM ")
b.WriteString(it.Schema)
b.WriteByte('.')
b.WriteString(it.Name)
b.WriteString(" WHERE ")
b.WriteString(it.KeyColumns[0])
Expand All @@ -53,7 +59,9 @@ func (it *InternalTable) SelectStmt() string {
b.WriteString(", ")
b.WriteString(c)
}
b.WriteString(" FROM main.")
b.WriteString(" FROM ")
b.WriteString(it.Schema)
b.WriteByte('.')
b.WriteString(it.Name)
b.WriteString(" WHERE ")
b.WriteString(it.KeyColumns[0])
Expand All @@ -69,22 +77,36 @@ func (it *InternalTable) SelectStmt() string {
var InternalTables = struct {
PersistentVariable InternalTable
BinlogPosition InternalTable
GlobalStatus InternalTable
}{
PersistentVariable: InternalTable{
Schema: "main",
Name: "persistent_variable",
KeyColumns: []string{"name"},
ValueColumns: []string{"value", "vtype"},
DDL: "name TEXT PRIMARY KEY, value TEXT, vtype TEXT",
},
BinlogPosition: InternalTable{
Schema: "main",
Name: "binlog_position",
KeyColumns: []string{"channel"},
ValueColumns: []string{"position"},
DDL: "channel TEXT PRIMARY KEY, position TEXT",
},
GlobalStatus: InternalTable{
Schema: "performance_schema",
Name: "global_status",
KeyColumns: []string{"VARIABLE_NAME"},
ValueColumns: []string{"VARIABLE_VALUE"},
DDL: "VARIABLE_NAME TEXT PRIMARY KEY, VARIABLE_VALUE TEXT",
InitialData: [][]any{
{"Innodb_redo_log_enabled", "OFF"}, // Queried by MySQL Shell
},
},
}

var internalTables = []InternalTable{
InternalTables.PersistentVariable,
InternalTables.BinlogPosition,
InternalTables.GlobalStatus,
}
15 changes: 15 additions & 0 deletions catalog/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,27 @@ func NewDBProvider(dataDir, dbFile string) (*DatabaseProvider, error) {
}

for _, t := range internalTables {
if _, err := storage.ExecContext(
context.Background(),
"CREATE SCHEMA IF NOT EXISTS "+t.Schema,
); err != nil {
return nil, fmt.Errorf("failed to create internal schema %q: %w", t.Schema, err)
}
if _, err := storage.ExecContext(
context.Background(),
"CREATE TABLE IF NOT EXISTS "+t.QualifiedName()+"("+t.DDL+")",
); err != nil {
return nil, fmt.Errorf("failed to create internal table %q: %w", t.Name, err)
}
for _, row := range t.InitialData {
if _, err := storage.ExecContext(
context.Background(),
t.UpsertStmt(),
row...,
); err != nil {
return nil, fmt.Errorf("failed to insert initial data into internal table %q: %w", t.Name, err)
}
}
}

return &DatabaseProvider{
Expand Down
12 changes: 12 additions & 0 deletions catalog/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ var _ sql.IndexAddressableTable = (*Table)(nil)
var _ sql.InsertableTable = (*Table)(nil)
var _ sql.UpdatableTable = (*Table)(nil)
var _ sql.DeletableTable = (*Table)(nil)
var _ sql.ReplaceableTable = (*Table)(nil)
var _ sql.CommentedTable = (*Table)(nil)

func NewTable(name string, db *Database) *Table {
Expand Down Expand Up @@ -315,6 +316,17 @@ func (t *Table) Deleter(*sql.Context) sql.RowDeleter {
return nil
}

// Replacer implements sql.ReplaceableTable.
func (t *Table) Replacer(*sql.Context) sql.RowReplacer {
hasKey := len(t.schema.PkOrdinals) > 0 || !sql.IsKeyless(t.schema.Schema)
return &rowInserter{
db: t.db.Name(),
table: t.name,
schema: t.schema.Schema,
replace: hasKey,
}
}

// CreateIndex implements sql.IndexAlterableTable.
func (t *Table) CreateIndex(ctx *sql.Context, indexDef sql.IndexDef) error {
// Lock the table to ensure thread-safety during index creation
Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ require (
github.com/Shopify/toxiproxy/v2 v2.9.0
github.com/apache/arrow/go/v17 v17.0.0
github.com/cockroachdb/apd/v3 v3.2.1
github.com/dolthub/go-mysql-server v0.18.2-0.20240926171723-77ed13c03196
github.com/dolthub/vitess v0.0.0-20240919225659-2ad81685e772
github.com/dolthub/go-mysql-server v0.18.2-0.20241015190154-54bd6d6e1ce8
github.com/dolthub/vitess v0.0.0-20241010201417-9d4f54b29ccc
github.com/go-sql-driver/mysql v1.8.1
github.com/jmoiron/sqlx v1.4.0
github.com/marcboeker/go-duckdb v1.8.2-0.20241002112231-62d5fa8c0697
Expand All @@ -22,8 +22,8 @@ require (
)

replace (
github.com/dolthub/go-mysql-server v0.18.2-0.20240926171723-77ed13c03196 => github.com/fanyang01/go-mysql-server v0.0.0-20240927093603-e7d5b2c91bf7
github.com/dolthub/vitess v0.0.0-20240919225659-2ad81685e772 => github.com/apecloud/dolt-vitess v0.0.0-20240927100428-4ba1490cf5da
github.com/dolthub/go-mysql-server v0.18.2-0.20241015190154-54bd6d6e1ce8 => github.com/fanyang01/go-mysql-server v0.0.0-20241016052333-7fc0e18f41bb
github.com/dolthub/vitess v0.0.0-20241010201417-9d4f54b29ccc => github.com/apecloud/dolt-vitess v0.0.0-20241016030916-464ec1ba8a1a
)

require (
Expand Down
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ github.com/apecloud/dolt-vitess v0.0.0-20240919225659-2ad81685e772 h1:OgHbYQJXAE
github.com/apecloud/dolt-vitess v0.0.0-20240919225659-2ad81685e772/go.mod h1:uBvlRluuL+SbEWTCZ68o0xvsdYZER3CEG/35INdzfJM=
github.com/apecloud/dolt-vitess v0.0.0-20240927100428-4ba1490cf5da h1:+sOwYwbN/kZZd0Ggsz+ozKa6gdAUYz/bgVMJkoDmuMc=
github.com/apecloud/dolt-vitess v0.0.0-20240927100428-4ba1490cf5da/go.mod h1:uBvlRluuL+SbEWTCZ68o0xvsdYZER3CEG/35INdzfJM=
github.com/apecloud/dolt-vitess v0.0.0-20241016030916-464ec1ba8a1a h1:ARRxRvo7HUKFvqKhXh+nRVEV5K+AuyVP6QlntDzQPww=
github.com/apecloud/dolt-vitess v0.0.0-20241016030916-464ec1ba8a1a/go.mod h1:uBvlRluuL+SbEWTCZ68o0xvsdYZER3CEG/35INdzfJM=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
Expand Down Expand Up @@ -75,12 +77,16 @@ github.com/dolthub/go-mysql-server v0.18.2-0.20240923181307-5aacdb13e45a h1:rpCm
github.com/dolthub/go-mysql-server v0.18.2-0.20240923181307-5aacdb13e45a/go.mod h1:lGbU2bK+QNnlETdUjOOaE+UnlEUu31VaQOFKAFGyZN4=
github.com/dolthub/go-mysql-server v0.18.2-0.20240926171723-77ed13c03196 h1:H4bKFiOdjmhBrdjrNvYAuhfplpHM3aVFcbLXlGoD/Fc=
github.com/dolthub/go-mysql-server v0.18.2-0.20240926171723-77ed13c03196/go.mod h1:lGbU2bK+QNnlETdUjOOaE+UnlEUu31VaQOFKAFGyZN4=
github.com/dolthub/go-mysql-server v0.18.2-0.20241015190154-54bd6d6e1ce8 h1:opC/9GtHMpPf5v0eRdngp166LcJTTyQ+YZfyjAchHaY=
github.com/dolthub/go-mysql-server v0.18.2-0.20241015190154-54bd6d6e1ce8/go.mod h1:Z8tket+3sYcU3d4yW90Ggld2d+C2DUgnpB8cBP0+GvI=
github.com/dolthub/jsonpath v0.0.2-0.20240227200619-19675ab05c71 h1:bMGS25NWAGTEtT5tOBsCuCrlYnLRKpbJVJkDbrTRhwQ=
github.com/dolthub/jsonpath v0.0.2-0.20240227200619-19675ab05c71/go.mod h1:2/2zjLQ/JOOSbbSboojeg+cAwcRV0fDLzIiWch/lhqI=
github.com/dolthub/sqllogictest/go v0.0.0-20240618184124-ca47f9354216 h1:JWkKRE4EHUcEVQCMRBej8DYxjYjRz/9MdF/NNQh0o70=
github.com/dolthub/sqllogictest/go v0.0.0-20240618184124-ca47f9354216/go.mod h1:e/FIZVvT2IR53HBCAo41NjqgtEnjMJGKca3Y/dAmZaA=
github.com/dolthub/vitess v0.0.0-20240919225659-2ad81685e772 h1:vDwBX7Lc8DnA8Zk0iRIu6slCw0GIUfYfFlYDYJQw8GQ=
github.com/dolthub/vitess v0.0.0-20240919225659-2ad81685e772/go.mod h1:uBvlRluuL+SbEWTCZ68o0xvsdYZER3CEG/35INdzfJM=
github.com/dolthub/vitess v0.0.0-20241010201417-9d4f54b29ccc h1:ZZgTRuxEwd3X67njtK30buHeZScLAd4W0rbRV8CORhE=
github.com/dolthub/vitess v0.0.0-20241010201417-9d4f54b29ccc/go.mod h1:uBvlRluuL+SbEWTCZ68o0xvsdYZER3CEG/35INdzfJM=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
Expand All @@ -91,6 +97,10 @@ github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.m
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/fanyang01/go-mysql-server v0.0.0-20240927093603-e7d5b2c91bf7 h1:QSqlxTk6pjF/11KV2JsBNGLmIlMccdzpUkJmNp1lsTs=
github.com/fanyang01/go-mysql-server v0.0.0-20240927093603-e7d5b2c91bf7/go.mod h1:GYFkohqx2Nr8NNjcEwPV1XsALRl3/l0eKhMTVqUiPmM=
github.com/fanyang01/go-mysql-server v0.0.0-20241016030019-a1d92d867df4 h1:l+EazqTiTuD54N0lXpadkv7LoSxSNJQw/pLK82OKmIU=
github.com/fanyang01/go-mysql-server v0.0.0-20241016030019-a1d92d867df4/go.mod h1:muA0iXUB7NjOojVRfJBKE2dL4OAABq3ZSbISlTGvmQY=
github.com/fanyang01/go-mysql-server v0.0.0-20241016052333-7fc0e18f41bb h1:FjAhczeEu2P/F2+YbKjR/Q4lgnD/0nCWPmaKHul797o=
github.com/fanyang01/go-mysql-server v0.0.0-20241016052333-7fc0e18f41bb/go.mod h1:muA0iXUB7NjOojVRfJBKE2dL4OAABq3ZSbISlTGvmQY=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4=
github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20=
Expand Down
Loading