feat(pg): transaction batching and columnar buffer for replication #160

Merged · 8 commits · Nov 18, 2024

Changes from all commits
12 changes: 0 additions & 12 deletions .github/workflows/go.yml
@@ -53,15 +53,3 @@ jobs:
           go test -v -cover --timeout 600s . | tee query.log
           cat query.log | grep -e "^--- " | sed 's/--- //g' | awk 'BEGIN {count=1} {printf "%d. %s\n", count++, $0}'
           cat query.log | grep -q "FAIL" && exit 1 || exit 0
-
-      - name: Test Binlog Replication With GTID Enabled
-        run: |
-          GTID_ENABLED=true go test -v -p 1 --timeout 600s ./binlogreplication | tee replication.log
-          cat replication.log | grep -e "^--- " | sed 's/--- //g' | awk 'BEGIN {count=1} {printf "%d. %s\n", count++, $0}'
-          cat replication.log | grep -q "FAIL" && exit 1 || exit 0
-
-      - name: Test Binlog Replication With GTID Disabled
-        run: |
-          GTID_ENABLED=false go test -v -p 1 --timeout 600s ./binlogreplication | tee replication.log
-          cat replication.log | grep -e "^--- " | sed 's/--- //g' | awk 'BEGIN {count=1} {printf "%d. %s\n", count++, $0}'
-          cat replication.log | grep -q "FAIL" && exit 1 || exit 0
17 changes: 6 additions & 11 deletions .github/workflows/mysql-replication.yml
@@ -7,9 +7,12 @@ on:
     branches: [ "main" ]
 
 jobs:
 
   build:
     runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        GTID_ENABLED: [true, false]
 
     steps:
       - uses: actions/checkout@v4
@@ -26,9 +29,7 @@ jobs:
       - name: Install dependencies
         run: |
           go get .
-
           pip3 install "sqlglot[rs]"
-
           curl -LJO https://github.com/duckdb/duckdb/releases/download/v1.1.3/duckdb_cli-linux-amd64.zip
           unzip duckdb_cli-linux-amd64.zip
           chmod +x duckdb
@@ -39,14 +40,8 @@
       - name: Build
         run: go build -v
 
-      - name: Test Binlog Replication With GTID Enabled
-        run: |
-          GTID_ENABLED=true go test -v -p 1 --timeout 600s ./binlogreplication | tee replication.log
-          cat replication.log | grep -e "^--- " | sed 's/--- //g' | awk 'BEGIN {count=1} {printf "%d. %s\n", count++, $0}'
-          cat replication.log | grep -q "FAIL" && exit 1 || exit 0
-
-      - name: Test Binlog Replication With GTID Disabled
+      - name: Test Binlog Replication With GTID ${{ matrix.GTID_ENABLED }}
         run: |
-          GTID_ENABLED=false go test -v -p 1 --timeout 600s ./binlogreplication | tee replication.log
+          GTID_ENABLED=${{ matrix.GTID_ENABLED }} go test -v -p 1 --timeout 600s ./binlogreplication | tee replication.log
           cat replication.log | grep -e "^--- " | sed 's/--- //g' | awk 'BEGIN {count=1} {printf "%d. %s\n", count++, $0}'
           cat replication.log | grep -q "FAIL" && exit 1 || exit 0
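
Note: the matrix entry above replaces the two hand-written test steps. GitHub Actions expands the build job into one run per value of GTID_ENABLED (true and false), and the single remaining test step picks up the current value through ${{ matrix.GTID_ENABLED }}.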
28 changes: 24 additions & 4 deletions delta/controller.go
@@ -11,7 +11,6 @@ import (
 	"unsafe"
 
 	"github.com/apache/arrow-go/v18/arrow/ipc"
-	"github.com/apecloud/myduckserver/backend"
 	"github.com/apecloud/myduckserver/binlog"
 	"github.com/apecloud/myduckserver/catalog"
 	"github.com/dolthub/go-mysql-server/sql"
@@ -28,12 +27,10 @@ type FlushStats struct {
 type DeltaController struct {
 	mutex  sync.Mutex
 	tables map[tableIdentifier]*DeltaAppender
-	pool   *backend.ConnectionPool
 }
 
-func NewController(pool *backend.ConnectionPool) *DeltaController {
+func NewController() *DeltaController {
 	return &DeltaController{
-		pool:   pool,
 		tables: make(map[tableIdentifier]*DeltaAppender),
 	}
 }
@@ -142,6 +139,10 @@ func (c *DeltaController) updateTable(
 	buf *bytes.Buffer,
 	stats *FlushStats,
 ) error {
+	if tx == nil {
+		return fmt.Errorf("no active transaction")
+	}
+
 	buf.Reset()
 
 	schema := appender.BaseSchema() // schema of the base table
@@ -284,6 +285,25 @@ func (c *DeltaController) updateTable(
 	}
 	stats.Deletions += affected
 
+	// For debugging:
+	//
+	// rows, err := tx.QueryContext(ctx, "SELECT * FROM "+qualifiedTableName)
+	// if err != nil {
+	// 	return err
+	// }
+	// defer rows.Close()
+	// row := make([]any, len(schema))
+	// pointers := make([]any, len(row))
+	// for i := range row {
+	// 	pointers[i] = &row[i]
+	// }
+	// for rows.Next() {
+	// 	if err := rows.Scan(pointers...); err != nil {
+	// 		return err
+	// 	}
+	// 	fmt.Printf("row:%+v\n", row)
+	// }
+
 	if log := ctx.GetLogger(); log.Logger.IsLevelEnabled(logrus.TraceLevel) {
 		log.WithFields(logrus.Fields{
 			"table": qualifiedTableName,
4 changes: 2 additions & 2 deletions delta/delta.go
@@ -27,7 +27,7 @@ type DeltaAppender struct {
 // https://mariadb.com/kb/en/gtid/
 // https://dev.mysql.com/doc/refman/9.0/en/replication-gtids-concepts.html
 func newDeltaAppender(schema sql.Schema) (*DeltaAppender, error) {
-	augmented := make(sql.Schema, 0, len(schema)+5)
+	augmented := make(sql.Schema, 0, len(schema)+6)
 	augmented = append(augmented, &sql.Column{
 		Name: "action", // delete = 0, update = 1, insert = 2
 		Type: types.Int8,
@@ -73,7 +73,7 @@ func (a *DeltaAppender) Schema() sql.Schema {
 }
 
 func (a *DeltaAppender) BaseSchema() sql.Schema {
-	return a.schema[5:]
+	return a.schema[6:]
 }
 
 func (a *DeltaAppender) Action() *array.Int8Builder {
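
The +5 → +6 and [5:] → [6:] edits above must move together: the delta schema is the base table schema prefixed by a fixed number of metadata columns (the "action" column plus the GTID-related fields), and BaseSchema recovers the base part by slicing that prefix off. A minimal sketch of the invariant; numMetaColumns and the helper names are illustrative, only "action" appears in the diff:

package delta // sketch only

import "github.com/dolthub/go-mysql-server/sql"

const numMetaColumns = 6 // was 5 before this PR added one more metadata column

func augment(base sql.Schema) sql.Schema {
	augmented := make(sql.Schema, 0, len(base)+numMetaColumns)
	// ... append the numMetaColumns metadata columns here ...
	return append(augmented, base...)
}

func baseOf(augmented sql.Schema) sql.Schema {
	return augmented[numMetaColumns:] // must slice off exactly the metadata prefix
}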
4 changes: 4 additions & 0 deletions myarrow/schema.go
@@ -2,6 +2,7 @@ package myarrow
 
 import (
 	"github.com/apache/arrow-go/v18/arrow"
+	"github.com/apecloud/myduckserver/pgtypes"
 	"github.com/dolthub/go-mysql-server/sql"
 	"github.com/dolthub/vitess/go/vt/proto/query"
 )
@@ -38,6 +39,9 @@ func ToArrowType(t sql.Type) (arrow.DataType, error) {
 }
 
 func toArrowType(t sql.Type) arrow.DataType {
+	if pgType, ok := t.(pgtypes.PostgresType); ok {
+		return pgtypes.PostgresTypeToArrowType(pgType.PG.OID)
+	}
 	switch t.Type() {
 	case query.Type_UINT8:
 		return arrow.PrimitiveTypes.Uint8
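
The new branch short-circuits Arrow type resolution for Postgres-typed columns by dispatching on the column's OID instead of the MySQL wire type. pgtypes.PostgresTypeToArrowType itself is not shown in this diff; a hypothetical sketch of such an OID-based mapping, with OID constants from jackc/pgx and type singletons from arrow-go (the real function likely covers many more types):

package pgarrow // sketch only

import (
	"github.com/apache/arrow-go/v18/arrow"
	"github.com/jackc/pgx/v5/pgtype"
)

func oidToArrowType(oid uint32) arrow.DataType {
	switch oid {
	case pgtype.BoolOID:
		return arrow.FixedWidthTypes.Boolean
	case pgtype.Int4OID:
		return arrow.PrimitiveTypes.Int32
	case pgtype.Int8OID:
		return arrow.PrimitiveTypes.Int64
	case pgtype.Float8OID:
		return arrow.PrimitiveTypes.Float64
	default:
		return arrow.BinaryTypes.String // fall back to a string representation
	}
}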
1 change: 1 addition & 0 deletions pgserver/connection_data.go
@@ -81,6 +81,7 @@ type PortalData struct {
 	IsEmptyQuery bool
 	Fields       []pgproto3.FieldDescription
 	Stmt         *duckdb.Stmt
+	Vars         []any
 }
 
 type PreparedStatementData struct {
21 changes: 14 additions & 7 deletions pgserver/connection_handler.go
@@ -24,6 +24,7 @@ import (
 	"io"
 	"net"
 	"os"
+	"runtime/debug"
 	"strings"
 	"unicode"
 
@@ -106,7 +107,7 @@ func (h *ConnectionHandler) HandleConnection() {
 	if HandlePanics {
 		defer func() {
 			if r := recover(); r != nil {
-				fmt.Printf("Listener recovered panic: %v", r)
+				fmt.Printf("Listener recovered panic: %v\n%s\n", r, string(debug.Stack()))
 
 				var eomErr error
 				if returnErr != nil {
@@ -120,7 +121,7 @@ func (h *ConnectionHandler) HandleConnection() {
 				// Sending eom can panic, which means we must recover again
 				defer func() {
 					if r := recover(); r != nil {
-						fmt.Printf("Listener recovered panic: %v", r)
+						fmt.Printf("Listener recovered panic: %v\n%s\n", r, string(debug.Stack()))
 					}
 				}()
 				h.endOfMessages(eomErr)
@@ -288,7 +289,7 @@ func (h *ConnectionHandler) receiveMessage() (bool, error) {
 	if HandlePanics {
 		defer func() {
 			if r := recover(); r != nil {
-				fmt.Printf("Listener recovered panic: %v", r)
+				fmt.Printf("Listener recovered panic: %v\n%s\n", r, string(debug.Stack()))
 
 				var eomErr error
 				if rErr, ok := r.(error); ok {
@@ -553,6 +554,7 @@ func (h *ConnectionHandler) handleBind(message *pgproto3.Bind) error {
 		Query:        preparedData.Query,
 		Fields:       fields,
 		Stmt:         preparedData.Stmt,
+		Vars:         bindVars,
 	}
 	return h.send(&pgproto3.BindComplete{})
 }
@@ -668,7 +670,7 @@ func (h *ConnectionHandler) handleCopyDataHelper(message *pgproto3.CopyData) (st
 		}
 		fallthrough
 	case tree.CopyFormatCSV:
-		dataLoader, err = NewCsvDataLoader(sqlCtx, h.duckHandler, &schemaName, insertableTable, copyFrom.Columns, &copyFrom.Options)
+		dataLoader, err = NewCsvDataLoader(sqlCtx, h.duckHandler, schemaName, insertableTable, copyFrom.Columns, &copyFrom.Options)
 	case tree.CopyFormatBinary:
 		err = fmt.Errorf("BINARY format is not supported for COPY FROM")
 	default:
@@ -787,19 +789,24 @@ func (h *ConnectionHandler) deletePortal(name string) {
 }
 
 // convertBindParameters handles the conversion from bind parameters to variable values.
-func (h *ConnectionHandler) convertBindParameters(types []uint32, formatCodes []int16, values [][]byte) ([]string, error) {
+func (h *ConnectionHandler) convertBindParameters(types []uint32, formatCodes []int16, values [][]byte) ([]any, error) {
 	if len(types) != len(values) {
 		return nil, fmt.Errorf("number of values does not match number of parameters")
 	}
-	bindings := make([]string, len(values))
+	bindings := make([]pgtype.Text, len(values))
 	for i := range values {
 		typ := types[i]
 		// We'll rely on a library to decode each format, which will deal with text and binary representations for us
 		if err := h.pgTypeMap.Scan(typ, formatCodes[i], values[i], &bindings[i]); err != nil {
 			return nil, err
 		}
 	}
-	return bindings, nil
+
+	vars := make([]any, len(bindings))
+	for i, b := range bindings {
+		vars[i] = b.String
+	}
+	return vars, nil
 }
 
 // query runs the given query and sends a CommandComplete message to the client
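
With this change, convertBindParameters decodes every parameter into pgtype.Text regardless of its wire format and hands the resulting strings to the portal as []any. A minimal self-contained example of the decoding step, assuming pgTypeMap is a *pgtype.Map from jackc/pgx/v5 (which the pgtype.Text usage suggests):

package main

import (
	"fmt"

	"github.com/jackc/pgx/v5/pgtype"
)

func main() {
	m := pgtype.NewMap()
	var out pgtype.Text
	// Decode one text-format parameter (OID 25 = text), as the handler does
	// for each Bind value; binary-format values arrive with format code 1.
	if err := m.Scan(pgtype.TextOID, pgtype.TextFormatCode, []byte("hello"), &out); err != nil {
		panic(err)
	}
	fmt.Println(out.String) // "hello"; stored in PortalData.Vars as a plain string
}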
24 changes: 18 additions & 6 deletions pgserver/dataloader.go
@@ -3,6 +3,7 @@ package pgserver
 import (
 	"bufio"
 	"context"
+	"errors"
 	"fmt"
 	"io"
 	"os"
@@ -49,19 +50,20 @@ var ErrCopyAborted = fmt.Errorf("COPY operation aborted")
 type CsvDataLoader struct {
 	ctx      *sql.Context
 	cancel   context.CancelFunc
-	schema   *string
+	schema   string
 	table    sql.InsertableTable
 	columns  tree.NameList
 	options  *tree.CopyOptions
 	pipePath string
 	pipe     *os.File
+	errPipe  *os.File // for error handling
 	rowCount chan int64
 	err      atomic.Pointer[error]
 }
 
 var _ DataLoader = (*CsvDataLoader)(nil)
 
-func NewCsvDataLoader(sqlCtx *sql.Context, handler *DuckHandler, schema *string, table sql.InsertableTable, columns tree.NameList, options *tree.CopyOptions) (DataLoader, error) {
+func NewCsvDataLoader(sqlCtx *sql.Context, handler *DuckHandler, schema string, table sql.InsertableTable, columns tree.NameList, options *tree.CopyOptions) (DataLoader, error) {
 	duckBuilder := handler.e.Analyzer.ExecBuilder.(*backend.DuckBuilder)
 	dataDir := duckBuilder.Provider().DataDir()
 
@@ -95,14 +97,22 @@ func NewCsvDataLoader(sqlCtx *sql.Context, handler *DuckHandler, schema *string,
 	// Execute the DuckDB COPY statement in a goroutine.
 	sql := loader.buildSQL()
 	loader.ctx.GetLogger().Trace(sql)
-	go loader.executeCopy(sql)
+	go loader.executeCopy(sql, pipePath)
 
+	// TODO(fan): If the reader fails to open the pipe, the writer will block forever.
+
 	// Open the pipe for writing.
 	// This operation will block until the reader opens the pipe for reading.
 	pipe, err := os.OpenFile(pipePath, os.O_WRONLY, 0600)
 	if err != nil {
 		return nil, err
 	}
 
+	// If the COPY operation failed to start, close the pipe and return the error.
+	if loader.errPipe != nil {
+		return nil, errors.Join(*loader.err.Load(), pipe.Close(), loader.errPipe.Close())
+	}
+
 	loader.pipe = pipe
 
 	return loader, nil
@@ -114,8 +124,8 @@ func (loader *CsvDataLoader) buildSQL() string {
 	b.Grow(256)
 
 	b.WriteString("COPY ")
-	if loader.schema != nil {
-		b.WriteString(*loader.schema)
+	if loader.schema != "" {
+		b.WriteString(loader.schema)
 		b.WriteString(".")
 	}
 	b.WriteString(loader.table.Name())
@@ -161,12 +171,14 @@
 	return b.String()
 }
 
-func (loader *CsvDataLoader) executeCopy(sql string) {
+func (loader *CsvDataLoader) executeCopy(sql string, pipePath string) {
 	defer close(loader.rowCount)
 	result, err := adapter.Exec(loader.ctx, sql)
 	if err != nil {
 		loader.ctx.GetLogger().Error(err)
 		loader.err.Store(&err)
+		// Open the pipe once to unblock the writer
+		loader.errPipe, _ = os.OpenFile(pipePath, os.O_RDONLY, 0600)
 		return
 	}
 
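
The error path above hinges on named-pipe semantics: opening a FIFO write-only blocks until some process opens the read end, which is normally DuckDB's COPY ... FROM reading the pipe. If the COPY fails before it ever opens the pipe, the writer in NewCsvDataLoader would block forever (the TODO notes the remaining gap), so executeCopy now opens the read end itself (errPipe) just to unblock it, and NewCsvDataLoader turns that into an error return. A minimal sketch of the handshake on Linux/macOS; the path and the stripped-down error handling are illustrative:

package main

import (
	"io"
	"os"
	"syscall"
)

func main() {
	path := "/tmp/demo.fifo" // illustrative path
	if err := syscall.Mkfifo(path, 0600); err != nil {
		panic(err)
	}
	defer os.Remove(path)

	done := make(chan struct{})
	go func() {
		defer close(done)
		// Reader side: played by DuckDB's COPY in the real loader, or by
		// errPipe when the COPY fails to start.
		r, _ := os.OpenFile(path, os.O_RDONLY, 0600)
		defer r.Close()
		_, _ = io.Copy(io.Discard, r) // drain until the writer closes
	}()

	// Writer side: blocks here until a reader opens the other end.
	w, err := os.OpenFile(path, os.O_WRONLY, 0600)
	if err != nil {
		panic(err)
	}
	w.WriteString("1,alice\n") // e.g. CSV rows, as the loader streams them
	w.Close()
	<-done
}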