Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: skip index creation for replicated table #274

Merged
merged 7 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 28 additions & 21 deletions catalog/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"sync"

"github.com/apecloud/myduckserver/adapter"
"github.com/apecloud/myduckserver/configuration"
"github.com/apecloud/myduckserver/mycontext"
"github.com/dolthub/go-mysql-server/sql"
)

Expand Down Expand Up @@ -72,7 +74,7 @@ func (d *Database) tablesInsensitive(ctx *sql.Context, pattern string) ([]*Table
return nil, err
}
for _, t := range tables {
t.WithSchema(ctx)
t.withSchema(ctx)
}
return tables, nil
}
Expand All @@ -91,7 +93,7 @@ func (d *Database) findTables(ctx *sql.Context, pattern string) ([]*Table, error
if err := rows.Scan(&tblName, &comment); err != nil {
return nil, ErrDuckDB.New(err)
}
t := NewTable(tblName, d).WithComment(DecodeComment[any](comment.String))
t := NewTable(tblName, d).withComment(DecodeComment[ExtraTableInfo](comment.String))
tbls = append(tbls, t)
}
if err := rows.Err(); err != nil {
Expand All @@ -106,13 +108,13 @@ func (d *Database) Name() string {
return d.name
}

func (d *Database) CreateAllTable(ctx *sql.Context, name string, schema sql.PrimaryKeySchema, collation sql.CollationID, comment string, is_temp bool) error {
func (d *Database) createAllTable(ctx *sql.Context, name string, schema sql.PrimaryKeySchema, collation sql.CollationID, comment string, temporary bool) error {

var columns []string
var columnCommentSQLs []string
var fullTableName string

if is_temp {
if temporary {
fullTableName = FullTableName("temp", "main", name)
} else {
fullTableName = FullTableName(d.catalog, d.name, name)
Expand Down Expand Up @@ -142,7 +144,7 @@ func (d *Database) CreateAllTable(ctx *sql.Context, name string, schema sql.Prim

var fullColumnName string

if is_temp {
if temporary {
fullColumnName = FullColumnName("temp", "main", name, col.Name)
} else {
fullColumnName = FullColumnName(d.catalog, d.name, name, col.Name)
Expand All @@ -151,41 +153,46 @@ func (d *Database) CreateAllTable(ctx *sql.Context, name string, schema sql.Prim
if col.Comment != "" || typ.mysql.Name != "" || col.Default != nil {
columnCommentSQLs = append(columnCommentSQLs,
fmt.Sprintf(`COMMENT ON COLUMN %s IS '%s'`, fullColumnName,
NewCommentWithMeta[MySQLType](col.Comment, typ.mysql).Encode()))
NewCommentWithMeta(col.Comment, typ.mysql).Encode()))
}
}

var sqlsBuild strings.Builder
var b strings.Builder

if is_temp {
sqlsBuild.WriteString(fmt.Sprintf(`CREATE TEMP TABLE %s (%s`, name, strings.Join(columns, ", ")))
if temporary {
b.WriteString(fmt.Sprintf(`CREATE TEMP TABLE %s (%s`, name, strings.Join(columns, ", ")))
} else {
sqlsBuild.WriteString(fmt.Sprintf(`CREATE TABLE %s (%s`, fullTableName, strings.Join(columns, ", ")))
b.WriteString(fmt.Sprintf(`CREATE TABLE %s (%s`, fullTableName, strings.Join(columns, ", ")))
}

var primaryKeys []string
for _, pkord := range schema.PkOrdinals {
primaryKeys = append(primaryKeys, schema.Schema[pkord].Name)
}

if len(primaryKeys) > 0 {
sqlsBuild.WriteString(fmt.Sprintf(", PRIMARY KEY (%s)", strings.Join(primaryKeys, ", ")))
// https://github.com/apecloud/myduckserver/issues/272
if !(mycontext.IsReplicationQuery(ctx) && configuration.IsReplicationWithoutIndex()) {
if len(primaryKeys) > 0 {
b.WriteString(fmt.Sprintf(", PRIMARY KEY (%s)", strings.Join(primaryKeys, ", ")))
}
}

sqlsBuild.WriteString(")")
b.WriteString(")")

// Add comment to the table
if comment != "" {
sqlsBuild.WriteString(fmt.Sprintf("; COMMENT ON TABLE %s IS '%s'", fullTableName, NewComment[any](comment).Encode()))
}
b.WriteString(fmt.Sprintf(
"; COMMENT ON TABLE %s IS '%s'",
fullTableName,
NewCommentWithMeta(comment, ExtraTableInfo{schema.PkOrdinals}).Encode(),
))

// Add column comments
for _, s := range columnCommentSQLs {
sqlsBuild.WriteString(";")
sqlsBuild.WriteString(s)
b.WriteString(";")
b.WriteString(s)
}

_, err := adapter.Exec(ctx, sqlsBuild.String())
_, err := adapter.Exec(ctx, b.String())
if err != nil {
if IsDuckDBTableAlreadyExistsError(err) {
return sql.ErrTableAlreadyExists.New(name)
Expand All @@ -202,14 +209,14 @@ func (d *Database) CreateAllTable(ctx *sql.Context, name string, schema sql.Prim
func (d *Database) CreateTable(ctx *sql.Context, name string, schema sql.PrimaryKeySchema, collation sql.CollationID, comment string) error {
d.mu.Lock()
defer d.mu.Unlock()
return d.CreateAllTable(ctx, name, schema, collation, comment, false)
return d.createAllTable(ctx, name, schema, collation, comment, false)
}

// CreateTemporaryTable implements sql.CreateTemporaryTable.
func (d *Database) CreateTemporaryTable(ctx *sql.Context, name string, schema sql.PrimaryKeySchema, collation sql.CollationID) error {
d.mu.Lock()
defer d.mu.Unlock()
return d.CreateAllTable(ctx, name, schema, collation, "", true)
return d.createAllTable(ctx, name, schema, collation, "", true)
}

// DropTable implements sql.TableDropper.
Expand Down
34 changes: 28 additions & 6 deletions catalog/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"sync"

"github.com/apecloud/myduckserver/adapter"
"github.com/apecloud/myduckserver/configuration"
"github.com/apecloud/myduckserver/mycontext"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/go-mysql-server/sql/expression"
"github.com/marcboeker/go-duckdb"
Expand All @@ -17,10 +19,14 @@ type Table struct {
mu *sync.RWMutex
name string
db *Database
comment *Comment[any] // save the comment to avoid querying duckdb everytime
comment *Comment[ExtraTableInfo] // save the comment to avoid querying duckdb everytime
schema sql.PrimaryKeySchema
}

type ExtraTableInfo struct {
PkOrdinals []int
}

type ColumnInfo struct {
ColumnName string
ColumnIndex int
Expand Down Expand Up @@ -53,19 +59,30 @@ func NewTable(name string, db *Database) *Table {
}
}

func (t *Table) WithComment(comment *Comment[any]) *Table {
func (t *Table) withComment(comment *Comment[ExtraTableInfo]) *Table {
t.comment = comment
return t
}

func (t *Table) WithSchema(ctx *sql.Context) *Table {
t.mu.Lock()
defer t.mu.Unlock()

func (t *Table) withSchema(ctx *sql.Context) *Table {
t.schema = getPKSchema(ctx, t.db.catalog, t.db.name, t.name)

// https://github.com/apecloud/myduckserver/issues/272
if len(t.schema.PkOrdinals) == 0 && configuration.IsReplicationWithoutIndex() {
// Pretend that the primary key exists
for _, idx := range t.comment.Meta.PkOrdinals {
t.schema.Schema[idx].PrimaryKey = true
}
t.schema = sql.NewPrimaryKeySchema(t.schema.Schema, t.comment.Meta.PkOrdinals...)
}

return t
}

func (t *Table) ExtraTableInfo() ExtraTableInfo {
return t.comment.Meta
}

// Collation implements sql.Table.
func (t *Table) Collation() sql.CollationID {
return sql.Collation_Default
Expand Down Expand Up @@ -333,6 +350,11 @@ func (t *Table) CreateIndex(ctx *sql.Context, indexDef sql.IndexDef) error {
t.mu.Lock()
defer t.mu.Unlock()

// https://github.com/apecloud/myduckserver/issues/272
if mycontext.IsReplicationQuery(ctx) && configuration.IsReplicationWithoutIndex() {
return nil
}

if indexDef.IsPrimary() {
return fmt.Errorf("primary key cannot be created with CreateIndex, use ALTER TABLE ... ADD PRIMARY KEY instead")
}
Expand Down
18 changes: 18 additions & 0 deletions configuration/env.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package configuration

import (
"os"
"strings"
)

const (
replicationWithoutIndex = "REPLICATION_WITHOUT_INDEX"
)

func IsReplicationWithoutIndex() bool {
switch strings.ToLower(os.Getenv(replicationWithoutIndex)) {
case "", "t", "1", "true":
return true
}
return false
}
Loading
Loading