From b21e3bbb9be1fc9b4ae816e23717de079497e120 Mon Sep 17 00:00:00 2001 From: Fan Yang Date: Wed, 20 Nov 2024 19:55:14 +0800 Subject: [PATCH 1/8] Implement COPY TO STDOUT --- backend/pipe.go | 0 pgserver/connection_data.go | 3 + pgserver/connection_handler.go | 123 +++++++++++++++++++++++++++------ pgserver/dataloader.go | 16 ++--- pgserver/datawriter.go | 118 +++++++++++++++++++++++++++++++ pgserver/validate.go | 46 ++++++------ 6 files changed, 256 insertions(+), 50 deletions(-) create mode 100644 backend/pipe.go create mode 100644 pgserver/datawriter.go diff --git a/backend/pipe.go b/backend/pipe.go new file mode 100644 index 00000000..e69de29b diff --git a/pgserver/connection_data.go b/pgserver/connection_data.go index 18b6d98c..4d001579 100644 --- a/pgserver/connection_data.go +++ b/pgserver/connection_data.go @@ -18,6 +18,7 @@ import ( "fmt" "github.com/cockroachdb/cockroachdb-parser/pkg/sql/sem/tree" + "github.com/dolthub/go-mysql-server/sql" "github.com/dolthub/vitess/go/vt/proto/query" "github.com/jackc/pgx/v5/pgproto3" "github.com/lib/pq/oid" @@ -66,6 +67,8 @@ type copyFromStdinState struct { // node is used to look at what parameters were specified, such as which table to load data into, file format, // delimiters, etc. copyFromStdinNode *tree.CopyFrom + // targetTable stores the targetTable that the data is being loaded into. + targetTable sql.InsertableTable // dataLoader is the implementation of DataLoader that is used to load each individual CopyData chunk into the // target table. 
dataLoader DataLoader diff --git a/pgserver/connection_handler.go b/pgserver/connection_handler.go index 0f8b425d..ad8e337d 100644 --- a/pgserver/connection_handler.go +++ b/pgserver/connection_handler.go @@ -27,16 +27,17 @@ import ( "runtime/debug" "slices" "strings" + "sync/atomic" "github.com/cockroachdb/cockroachdb-parser/pkg/sql/parser" "github.com/cockroachdb/cockroachdb-parser/pkg/sql/sem/tree" gms "github.com/dolthub/go-mysql-server" "github.com/dolthub/go-mysql-server/server" - "github.com/dolthub/go-mysql-server/sql" "github.com/dolthub/vitess/go/mysql" "github.com/jackc/pgx/v5/pgproto3" "github.com/jackc/pgx/v5/pgtype" "github.com/sirupsen/logrus" + // Import the new datawriter package ) // ConnectionHandler is responsible for the entire lifecycle of a user connection: receiving messages they send, @@ -348,7 +349,7 @@ func (h *ConnectionHandler) receiveMessage() (bool, error) { // handleMessages processes the message provided and returns status flags indicating what the connection should do next. // If the |stop| response parameter is true, it indicates that the connection should be closed by the caller. If the // |endOfMessages| response parameter is true, it indicates that no more messages are expected for the current operation -// and a READY FOR QUERY message should be sent back to the client, so it can send the next query. +// and a READY FOR QUERY message should be sent back to the client so it can send the next query. 
func (h *ConnectionHandler) handleMessage(msg pgproto3.Message) (stop, endOfMessages bool, err error) { logrus.Tracef("Handling message: %T", msg) switch message := msg.(type) { @@ -448,11 +449,12 @@ func (h *ConnectionHandler) handleQueryOutsideEngine(query ConvertedQuery) (hand if stmt.Stdin { return true, false, h.handleCopyFromStdinQuery(query, stmt, h.Conn()) } + case *tree.CopyTo: + return true, true, h.handleCopyToStdout(query, stmt) } return false, true, nil } -// handleParse handles a parse message, returning any error that occurs func (h *ConnectionHandler) handleParse(message *pgproto3.Parse) error { h.waitForSync = true @@ -662,20 +664,9 @@ func (h *ConnectionHandler) handleCopyDataHelper(message *pgproto3.CopyData) (st if copyFrom == nil { return false, false, fmt.Errorf("no COPY FROM STDIN node found") } - - // TODO: It would be better to get the table from the copyFromStdinNode – not by calling core.GetSqlTableFromContext - schemaName := copyFrom.Table.Schema() - tableName := copyFrom.Table.Table() - table, err := GetSqlTableFromContext(sqlCtx, schemaName, tableName) - if err != nil { - return false, true, err - } + table := h.copyFromStdinState.targetTable if table == nil { - return false, true, fmt.Errorf(`relation "%s" does not exist`, tableName) - } - insertableTable, ok := table.(sql.InsertableTable) - if !ok { - return false, true, fmt.Errorf(`table "%s" is read-only`, tableName) + return false, true, fmt.Errorf("no target table found") } switch copyFrom.Options.CopyFormat { @@ -693,12 +684,15 @@ func (h *ConnectionHandler) handleCopyDataHelper(message *pgproto3.CopyData) (st } fallthrough case tree.CopyFormatCSV: - dataLoader, err = NewCsvDataLoader(sqlCtx, h.duckHandler, schemaName, insertableTable, copyFrom.Columns, &copyFrom.Options) + dataLoader, err = NewCsvDataLoader( + sqlCtx, h.duckHandler, + copyFrom.Table.Schema(), table, copyFrom.Columns, + &copyFrom.Options, + ) case tree.CopyFormatBinary: err = fmt.Errorf("BINARY format is not supported 
for COPY FROM") default: - err = fmt.Errorf("unknown format specified for COPY FROM: %v", - copyFrom.Options.CopyFormat) + err = fmt.Errorf("unknown format specified for COPY FROM: %v", copyFrom.Options.CopyFormat) } if err != nil { @@ -1097,12 +1091,14 @@ func (h *ConnectionHandler) handleCopyFromStdinQuery(query ConvertedQuery, copyF } sqlCtx.SetLogger(sqlCtx.GetLogger().WithField("query", query.String)) - if err := ValidateCopyFrom(copyFrom, sqlCtx); err != nil { + table, err := ValidateCopyFrom(copyFrom, sqlCtx) + if err != nil { return err } h.copyFromStdinState = &copyFromStdinState{ copyFromStdinNode: copyFrom, + targetTable: table, } return h.send(&pgproto3.CopyInResponse{ @@ -1140,3 +1136,90 @@ func returnsRow(tag string) bool { return false } } + +func (h *ConnectionHandler) handleCopyToStdout(query ConvertedQuery, copyTo *tree.CopyTo) error { + ctx, err := h.duckHandler.NewContext(context.Background(), h.mysqlConn, query.String) + if err != nil { + return err + } + ctx.SetLogger(ctx.GetLogger().WithField("query", query.String)) + + table, err := ValidateCopyTo(copyTo, ctx) + if err != nil { + return err + } + + var stmt string + if copyTo.Statement != nil { + stmt = copyTo.Statement.String() + } + dataWriter, err := NewDataWriter(ctx, h.duckHandler, table, copyTo.Table, copyTo.Columns, stmt, &copyTo.Options) + if err != nil { + return err + } + defer dataWriter.Cancel() + + // Send CopyOutResponse to the client + copyOutResponse := &pgproto3.CopyOutResponse{ + OverallFormat: 0, // 0 for text format + } + if err := h.send(copyOutResponse); err != nil { + return err + } + + // Create a channel to receive the result from the goroutine + type copyResult struct { + rowCount int + err error + } + done := make(chan copyResult, 1) + + pipe, ch, err := dataWriter.Start() + if err != nil { + return err + } + var sendErr atomic.Value + go func() { + defer pipe.Close() + defer close(done) + buf := make([]byte, 1<<20) // 1MB buffer + for { + n, err := pipe.Read(buf) + if n > 
0 { + copyData := &pgproto3.CopyData{ + Data: buf[:n], + } + if err := h.send(copyData); err != nil { + sendErr.Store(err) + return + } + } + if err != nil { + if err == io.EOF { + break + } + sendErr.Store(err) + return + } + } + }() + + result := <-ch + if result.Err != nil { + return fmt.Errorf("failed to copy data: %w", result.Err) + } + + if err := sendErr.Load(); err != nil { + return err.(error) + } + + // After data is sent and CopyToPipe is finished without errors, send CopyDone + if err := h.send(&pgproto3.CopyDone{}); err != nil { + return err + } + + // Send CommandComplete with the number of rows copied + return h.send(&pgproto3.CommandComplete{ + CommandTag: []byte(fmt.Sprintf("COPY %d", result.RowCount)), + }) +} diff --git a/pgserver/dataloader.go b/pgserver/dataloader.go index fef7750b..5202c375 100644 --- a/pgserver/dataloader.go +++ b/pgserver/dataloader.go @@ -63,28 +63,28 @@ type CsvDataLoader struct { var _ DataLoader = (*CsvDataLoader)(nil) -func NewCsvDataLoader(sqlCtx *sql.Context, handler *DuckHandler, schema string, table sql.InsertableTable, columns tree.NameList, options *tree.CopyOptions) (DataLoader, error) { +func NewCsvDataLoader(ctx *sql.Context, handler *DuckHandler, schema string, table sql.InsertableTable, columns tree.NameList, options *tree.CopyOptions) (DataLoader, error) { duckBuilder := handler.e.Analyzer.ExecBuilder.(*backend.DuckBuilder) dataDir := duckBuilder.Provider().DataDir() // Create the FIFO pipe - pipeDir := filepath.Join(dataDir, "pipes", "load-data") + pipeDir := filepath.Join(dataDir, "pipes", "pg-copy-from") if err := os.MkdirAll(pipeDir, 0755); err != nil { return nil, err } - pipeName := strconv.Itoa(int(sqlCtx.ID())) + ".pipe" + pipeName := strconv.Itoa(int(ctx.ID())) + ".pipe" pipePath := filepath.Join(pipeDir, pipeName) - sqlCtx.GetLogger().Traceln("Creating FIFO pipe for COPY operation:", pipePath) + ctx.GetLogger().Traceln("Creating FIFO pipe for COPY FROM operation:", pipePath) if err := 
syscall.Mkfifo(pipePath, 0600); err != nil { return nil, err } // Create cancelable context - childCtx, cancel := context.WithCancel(sqlCtx) - sqlCtx.Context = childCtx + childCtx, cancel := context.WithCancel(ctx) + ctx.Context = childCtx loader := &CsvDataLoader{ - ctx: sqlCtx, + ctx: ctx, cancel: cancel, schema: schema, table: table, @@ -103,7 +103,7 @@ func NewCsvDataLoader(sqlCtx *sql.Context, handler *DuckHandler, schema string, // Open the pipe for writing. // This operation will block until the reader opens the pipe for reading. - pipe, err := os.OpenFile(pipePath, os.O_WRONLY, 0600) + pipe, err := os.OpenFile(pipePath, os.O_WRONLY, os.ModeNamedPipe) if err != nil { return nil, err } diff --git a/pgserver/datawriter.go b/pgserver/datawriter.go new file mode 100644 index 00000000..bee656f9 --- /dev/null +++ b/pgserver/datawriter.go @@ -0,0 +1,118 @@ +package pgserver + +import ( + "context" + "fmt" + "os" + "path/filepath" + "strconv" + "syscall" + + "github.com/apecloud/myduckserver/adapter" + "github.com/apecloud/myduckserver/backend" + "github.com/cockroachdb/cockroachdb-parser/pkg/sql/sem/tree" + "github.com/dolthub/go-mysql-server/sql" +) + +type DataWriter struct { + ctx *sql.Context + cancel context.CancelFunc + duckSQL string + options *tree.CopyOptions + pipePath string +} + +func NewDataWriter( + ctx *sql.Context, + handler *DuckHandler, + table sql.Table, tableName tree.TableName, columns tree.NameList, + query string, + options *tree.CopyOptions, +) (*DataWriter, error) { + // https://www.postgresql.org/docs/current/sql-copy.html + // https://duckdb.org/docs/sql/statements/copy.html#csv-options + var format string + switch options.CopyFormat { + case tree.CopyFormatText: + format = `FORMAT CSV, DELIMITER '\t', QUOTE '', ESCAPE '', NULLSTR '\N'` + case tree.CopyFormatCSV: + format = `FORMAT CSV` + case tree.CopyFormatBinary: + return nil, fmt.Errorf("BINARY format is not supported for COPY TO") + } + + var source string + if table != nil { + 
source = tableName.FQString() + if columns != nil { + source += "(" + columns.String() + ")" + } + } else { + source = "(" + query + ")" + } + + duckBuilder := handler.e.Analyzer.ExecBuilder.(*backend.DuckBuilder) + dataDir := duckBuilder.Provider().DataDir() + + // Create the FIFO pipe + pipeDir := filepath.Join(dataDir, "pipes", "pg-copy-to") + if err := os.MkdirAll(pipeDir, 0755); err != nil { + return nil, err + } + pipeName := strconv.Itoa(int(ctx.ID())) + ".pipe" + pipePath := filepath.Join(pipeDir, pipeName) + ctx.GetLogger().Traceln("Creating FIFO pipe for COPY TO operation:", pipePath) + if err := syscall.Mkfifo(pipePath, 0600); err != nil { + return nil, err + } + + // Create cancelable context + childCtx, cancel := context.WithCancel(ctx) + ctx.Context = childCtx + + // Initialize DataWriter + writer := &DataWriter{ + ctx: ctx, + cancel: cancel, + duckSQL: fmt.Sprintf("COPY %s TO '%s' (%s)", source, pipePath, format), + options: options, + pipePath: pipePath, + rowCount: make(chan int64, 1), + } + + return writer, nil +} + +type copyToResult struct { + RowCount int64 + Err error +} + +func (dw *DataWriter) Start() (*os.File, chan int64, error) { + // Open the pipe for reading. + pipe, err := os.OpenFile(dw.pipePath, os.O_RDONLY, os.ModeNamedPipe) + if err != nil { + return nil, nil, fmt.Errorf("failed to open pipe for reading: %w", err) + } + + go func() { + defer dw.cancel() + defer pipe.Close() + defer os.Remove(dw.pipePath) + defer close(dw.rowCount) + // This operation will block until the reader opens the pipe for reading. 
+ result, err := adapter.ExecCatalog(dw.ctx, dw.duckSQL) + if err != nil { + dw.err.Store(&err) + return + } + affected, _ := result.RowsAffected() + dw.rowCount <- affected + }() + + return pipe, dw.rowCount, nil +} + +func (dw *DataWriter) Cancel() { + dw.cancel() +} diff --git a/pgserver/validate.go b/pgserver/validate.go index 5e8202f4..e61c4e4d 100644 --- a/pgserver/validate.go +++ b/pgserver/validate.go @@ -16,40 +16,42 @@ package pgserver import ( "fmt" - "strings" "github.com/cockroachdb/cockroachdb-parser/pkg/sql/sem/tree" "github.com/dolthub/go-mysql-server/sql" ) -// Validate returns an error if the CopyFrom node is invalid, for example if it contains columns that -// are not in the table schema. -func ValidateCopyFrom(cf *tree.CopyFrom, ctx *sql.Context) error { +// ValidateCopyFrom returns an error if the CopyFrom node is invalid. +func ValidateCopyFrom(cf *tree.CopyFrom, ctx *sql.Context) (sql.InsertableTable, error) { table, err := GetSqlTableFromContext(ctx, cf.Table.Schema(), cf.Table.Table()) if err != nil { - return err + return nil, err } if table == nil { - return fmt.Errorf(`relation "%s" does not exist`, cf.Table.Table()) + return nil, fmt.Errorf(`relation "%s" does not exist`, cf.Table.Table()) } - if _, ok := table.(sql.InsertableTable); !ok { - return fmt.Errorf(`table "%s" is read-only`, cf.Table.Table()) + if it, ok := table.(sql.InsertableTable); !ok { + return nil, fmt.Errorf(`table "%s" is read-only`, cf.Table.Table()) + } else { + return it, nil } +} - // If a set of columns was explicitly specified, validate them - if len(cf.Columns) > 0 { - if len(table.Schema()) != len(cf.Columns) { - return fmt.Errorf("invalid column name list for table %s: %v", table.Name(), cf.Columns) - } - - for i, col := range table.Schema() { - name := cf.Columns[i] - nameString := strings.Trim(name.String(), `"`) - if nameString != col.Name { - return fmt.Errorf("invalid column name list for table %s: %v", table.Name(), cf.Columns) - } +// ValidateCopyTo 
returns an error if the CopyTo node is invalid, for example if it contains columns that +// are not in the table schema. +func ValidateCopyTo(ct *tree.CopyTo, ctx *sql.Context) (sql.Table, error) { + if ct.Table.Table() == "" { + if ct.Statement == nil { + return nil, fmt.Errorf("no table specified") } + return nil, nil } - - return nil + table, err := GetSqlTableFromContext(ctx, ct.Table.Schema(), ct.Table.Table()) + if err != nil { + return nil, err + } + if table == nil { + return nil, fmt.Errorf(`relation "%s" does not exist`, ct.Table.Table()) + } + return table, nil } From f793ec8e4a271b7e012ad0fe25cd7b4988f8c3c1 Mon Sep 17 00:00:00 2001 From: Fan Yang Date: Wed, 20 Nov 2024 22:44:54 +0800 Subject: [PATCH 2/8] Make it work --- .gitignore | 1 + backend/loaddata.go | 13 ++---- backend/pipe.go | 25 +++++++++++ pgserver/connection_handler.go | 80 ++++++++++++++++++++++------------ pgserver/dataloader.go | 16 ++----- pgserver/datawriter.go | 60 +++++++++---------------- 6 files changed, 106 insertions(+), 89 deletions(-) diff --git a/.gitignore b/.gitignore index 9b4ebe9c..8ff8ac38 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,4 @@ pipes/ *.sock __debug_* .DS_Store +*.csv diff --git a/backend/loaddata.go b/backend/loaddata.go index 891db0ad..50127793 100644 --- a/backend/loaddata.go +++ b/backend/loaddata.go @@ -8,7 +8,6 @@ import ( "runtime" "strconv" "strings" - "syscall" "github.com/apecloud/myduckserver/adapter" "github.com/apecloud/myduckserver/catalog" @@ -80,21 +79,15 @@ func (db *DuckBuilder) buildClientSideLoadData(ctx *sql.Context, insert *plan.In } defer reader.Close() - // Create the FIFO pipe - pipeDir := filepath.Join(db.provider.DataDir(), "pipes", "load-data") - if err := os.MkdirAll(pipeDir, 0755); err != nil { - return nil, err - } - pipeName := strconv.Itoa(int(ctx.ID())) + ".pipe" - pipePath := filepath.Join(pipeDir, pipeName) - if err := syscall.Mkfifo(pipePath, 0600); err != nil { + pipePath, err := db.CreatePipe(ctx, "load-data") + 
if err != nil { return nil, err } defer os.Remove(pipePath) // Write the data to the FIFO pipe. go func() { - pipe, err := os.OpenFile(pipePath, os.O_WRONLY, 0600) + pipe, err := os.OpenFile(pipePath, os.O_WRONLY, os.ModeNamedPipe) if err != nil { return } diff --git a/backend/pipe.go b/backend/pipe.go index e69de29b..f8ea1d7f 100644 --- a/backend/pipe.go +++ b/backend/pipe.go @@ -0,0 +1,25 @@ +package backend + +import ( + "os" + "path/filepath" + "strconv" + "syscall" + + "github.com/dolthub/go-mysql-server/sql" +) + +func (h *DuckBuilder) CreatePipe(ctx *sql.Context, subdir string) (string, error) { + // Create the FIFO pipe + pipeDir := filepath.Join(h.provider.DataDir(), "pipes", subdir) + if err := os.MkdirAll(pipeDir, 0755); err != nil { + return "", err + } + pipeName := strconv.Itoa(int(ctx.ID())) + ".pipe" + pipePath := filepath.Join(pipeDir, pipeName) + ctx.GetLogger().Debugln("Creating FIFO pipe for COPY FROM operation:", pipePath) + if err := syscall.Mkfifo(pipePath, 0600); err != nil { + return "", err + } + return pipePath, nil +} diff --git a/pgserver/connection_handler.go b/pgserver/connection_handler.go index ad8e337d..8d87e7f6 100644 --- a/pgserver/connection_handler.go +++ b/pgserver/connection_handler.go @@ -20,6 +20,7 @@ import ( "context" "crypto/tls" "encoding/json" + "errors" "fmt" "io" "net" @@ -1144,6 +1145,11 @@ func (h *ConnectionHandler) handleCopyToStdout(query ConvertedQuery, copyTo *tre } ctx.SetLogger(ctx.GetLogger().WithField("query", query.String)) + // Create cancelable context + childCtx, cancel := context.WithCancel(ctx) + defer cancel() + ctx = ctx.WithContext(childCtx) + table, err := ValidateCopyTo(copyTo, ctx) if err != nil { return err @@ -1153,35 +1159,45 @@ func (h *ConnectionHandler) handleCopyToStdout(query ConvertedQuery, copyTo *tre if copyTo.Statement != nil { stmt = copyTo.Statement.String() } - dataWriter, err := NewDataWriter(ctx, h.duckHandler, table, copyTo.Table, copyTo.Columns, stmt, &copyTo.Options) + writer, 
err := NewDataWriter( + ctx, h.duckHandler, + copyTo.Table.Schema(), table, copyTo.Columns, + stmt, + &copyTo.Options, + ) if err != nil { return err } - defer dataWriter.Cancel() + defer writer.Close() // Send CopyOutResponse to the client + ctx.GetLogger().Debug("sending CopyOutResponse to the client") copyOutResponse := &pgproto3.CopyOutResponse{ - OverallFormat: 0, // 0 for text format + OverallFormat: 0, // 0 for text format + ColumnFormatCodes: []uint16{0}, // 0 for text format } if err := h.send(copyOutResponse); err != nil { return err } - // Create a channel to receive the result from the goroutine - type copyResult struct { - rowCount int - err error - } - done := make(chan copyResult, 1) - - pipe, ch, err := dataWriter.Start() + pipePath, ch, err := writer.Start() if err != nil { return err } + var sendErr atomic.Value go func() { + // Open the pipe for reading. + ctx.GetLogger().Tracef("Opening FIFO pipe for reading: %s", pipePath) + pipe, err := os.OpenFile(pipePath, os.O_RDONLY, os.ModeNamedPipe) + if err != nil { + sendErr.Store(fmt.Errorf("failed to open pipe for reading: %w", err)) + cancel() + return + } defer pipe.Close() - defer close(done) + + ctx.GetLogger().Debug("Copying data from the pipe to the client") buf := make([]byte, 1<<20) // 1MB buffer for { n, err := pipe.Read(buf) @@ -1189,8 +1205,10 @@ func (h *ConnectionHandler) handleCopyToStdout(query ConvertedQuery, copyTo *tre copyData := &pgproto3.CopyData{ Data: buf[:n], } + ctx.GetLogger().Debugf("sending CopyData (%d bytes) to the client", n) if err := h.send(copyData); err != nil { sendErr.Store(err) + cancel() return } } @@ -1199,27 +1217,35 @@ func (h *ConnectionHandler) handleCopyToStdout(query ConvertedQuery, copyTo *tre break } sendErr.Store(err) + cancel() return } } }() - result := <-ch - if result.Err != nil { - return fmt.Errorf("failed to copy data: %w", result.Err) - } + select { + case <-ctx.Done(): // Context is canceled + err, _ := sendErr.Load().(error) + return 
errors.Join(ctx.Err(), err) + case result := <-ch: + if result.Err != nil { + return fmt.Errorf("failed to copy data: %w", result.Err) + } - if err := sendErr.Load(); err != nil { - return err.(error) - } + if err, ok := sendErr.Load().(error); ok { + return err + } - // After data is sent and CopyToPipe is finished without errors, send CopyDone - if err := h.send(&pgproto3.CopyDone{}); err != nil { - return err - } + // After data is sent and the producer side is finished without errors, send CopyDone + ctx.GetLogger().Debug("sending CopyDone to the client") + if err := h.send(&pgproto3.CopyDone{}); err != nil { + return err + } - // Send CommandComplete with the number of rows copied - return h.send(&pgproto3.CommandComplete{ - CommandTag: []byte(fmt.Sprintf("COPY %d", result.RowCount)), - }) + // Send CommandComplete with the number of rows copied + ctx.GetLogger().Debugf("sending CommandComplete to the client") + return h.send(&pgproto3.CommandComplete{ + CommandTag: []byte(fmt.Sprintf("COPY %d", result.RowCount)), + }) + } } diff --git a/pgserver/dataloader.go b/pgserver/dataloader.go index 5202c375..2208975d 100644 --- a/pgserver/dataloader.go +++ b/pgserver/dataloader.go @@ -7,11 +7,9 @@ import ( "fmt" "io" "os" - "path/filepath" "strconv" "strings" "sync/atomic" - "syscall" "github.com/apecloud/myduckserver/adapter" "github.com/apecloud/myduckserver/backend" @@ -64,18 +62,10 @@ type CsvDataLoader struct { var _ DataLoader = (*CsvDataLoader)(nil) func NewCsvDataLoader(ctx *sql.Context, handler *DuckHandler, schema string, table sql.InsertableTable, columns tree.NameList, options *tree.CopyOptions) (DataLoader, error) { - duckBuilder := handler.e.Analyzer.ExecBuilder.(*backend.DuckBuilder) - dataDir := duckBuilder.Provider().DataDir() - // Create the FIFO pipe - pipeDir := filepath.Join(dataDir, "pipes", "pg-copy-from") - if err := os.MkdirAll(pipeDir, 0755); err != nil { - return nil, err - } - pipeName := strconv.Itoa(int(ctx.ID())) + ".pipe" - pipePath := 
filepath.Join(pipeDir, pipeName) - ctx.GetLogger().Traceln("Creating FIFO pipe for COPY FROM operation:", pipePath) - if err := syscall.Mkfifo(pipePath, 0600); err != nil { + duckBuilder := handler.e.Analyzer.ExecBuilder.(*backend.DuckBuilder) + pipePath, err := duckBuilder.CreatePipe(ctx, "pg-copy-from") + if err != nil { return nil, err } diff --git a/pgserver/datawriter.go b/pgserver/datawriter.go index bee656f9..bbc3ed2d 100644 --- a/pgserver/datawriter.go +++ b/pgserver/datawriter.go @@ -1,22 +1,18 @@ package pgserver import ( - "context" "fmt" "os" - "path/filepath" - "strconv" - "syscall" "github.com/apecloud/myduckserver/adapter" "github.com/apecloud/myduckserver/backend" + "github.com/apecloud/myduckserver/catalog" "github.com/cockroachdb/cockroachdb-parser/pkg/sql/sem/tree" "github.com/dolthub/go-mysql-server/sql" ) type DataWriter struct { ctx *sql.Context - cancel context.CancelFunc duckSQL string options *tree.CopyOptions pipePath string @@ -25,7 +21,7 @@ type DataWriter struct { func NewDataWriter( ctx *sql.Context, handler *DuckHandler, - table sql.Table, tableName tree.TableName, columns tree.NameList, + schema string, table sql.Table, columns tree.NameList, query string, options *tree.CopyOptions, ) (*DataWriter, error) { @@ -43,7 +39,10 @@ func NewDataWriter( var source string if table != nil { - source = tableName.FQString() + if schema != "" { + source += catalog.QuoteIdentifierANSI(schema) + "." 
+ } + source += catalog.QuoteIdentifierANSI(table.Name()) if columns != nil { source += "(" + columns.String() + ")" } @@ -51,33 +50,19 @@ func NewDataWriter( source = "(" + query + ")" } - duckBuilder := handler.e.Analyzer.ExecBuilder.(*backend.DuckBuilder) - dataDir := duckBuilder.Provider().DataDir() - // Create the FIFO pipe - pipeDir := filepath.Join(dataDir, "pipes", "pg-copy-to") - if err := os.MkdirAll(pipeDir, 0755); err != nil { - return nil, err - } - pipeName := strconv.Itoa(int(ctx.ID())) + ".pipe" - pipePath := filepath.Join(pipeDir, pipeName) - ctx.GetLogger().Traceln("Creating FIFO pipe for COPY TO operation:", pipePath) - if err := syscall.Mkfifo(pipePath, 0600); err != nil { + db := handler.e.Analyzer.ExecBuilder.(*backend.DuckBuilder) + pipePath, err := db.CreatePipe(ctx, "pg-copy-to") + if err != nil { return nil, err } - // Create cancelable context - childCtx, cancel := context.WithCancel(ctx) - ctx.Context = childCtx - // Initialize DataWriter writer := &DataWriter{ ctx: ctx, - cancel: cancel, duckSQL: fmt.Sprintf("COPY %s TO '%s' (%s)", source, pipePath, format), options: options, pipePath: pipePath, - rowCount: make(chan int64, 1), } return writer, nil @@ -88,31 +73,28 @@ type copyToResult struct { Err error } -func (dw *DataWriter) Start() (*os.File, chan int64, error) { - // Open the pipe for reading. - pipe, err := os.OpenFile(dw.pipePath, os.O_RDONLY, os.ModeNamedPipe) - if err != nil { - return nil, nil, fmt.Errorf("failed to open pipe for reading: %w", err) - } - +func (dw *DataWriter) Start() (string, chan copyToResult, error) { + // Execute the COPY TO statement in a separate goroutine. + ch := make(chan copyToResult, 1) go func() { - defer dw.cancel() - defer pipe.Close() defer os.Remove(dw.pipePath) - defer close(dw.rowCount) + defer close(ch) + + dw.ctx.GetLogger().Tracef("Executing COPY TO statement: %s", dw.duckSQL) + // This operation will block until the reader opens the pipe for reading. 
result, err := adapter.ExecCatalog(dw.ctx, dw.duckSQL) if err != nil { - dw.err.Store(&err) + ch <- copyToResult{Err: err} return } affected, _ := result.RowsAffected() - dw.rowCount <- affected + ch <- copyToResult{RowCount: affected} }() - return pipe, dw.rowCount, nil + return dw.pipePath, ch, nil } -func (dw *DataWriter) Cancel() { - dw.cancel() +func (dw *DataWriter) Close() { + os.Remove(dw.pipePath) } From fbd6d2c441cb67cc9b42b14d773523ab6ddb11c6 Mon Sep 17 00:00:00 2001 From: Fan Yang Date: Wed, 20 Nov 2024 23:03:50 +0800 Subject: [PATCH 3/8] Update debug log message in pipe.go --- backend/pipe.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/pipe.go b/backend/pipe.go index f8ea1d7f..8daf3fbc 100644 --- a/backend/pipe.go +++ b/backend/pipe.go @@ -17,7 +17,7 @@ func (h *DuckBuilder) CreatePipe(ctx *sql.Context, subdir string) (string, error } pipeName := strconv.Itoa(int(ctx.ID())) + ".pipe" pipePath := filepath.Join(pipeDir, pipeName) - ctx.GetLogger().Debugln("Creating FIFO pipe for COPY FROM operation:", pipePath) + ctx.GetLogger().Debugln("Creating FIFO pipe for COPY operation:", pipePath) if err := syscall.Mkfifo(pipePath, 0600); err != nil { return "", err } From ac075bfa3f329557add509170933561831a75784 Mon Sep 17 00:00:00 2001 From: Fan Yang Date: Thu, 21 Nov 2024 13:55:00 +0800 Subject: [PATCH 4/8] Handle options & add tests --- pgserver/connection_handler.go | 1 - pgserver/dataloader.go | 55 ++++++++++++--------- pgserver/datawriter.go | 90 ++++++++++++++++++++++++---------- pgtest/psql/copy/basic.sql | 4 +- pgtest/psql/copy/stdout.sql | 15 ++++++ 5 files changed, 115 insertions(+), 50 deletions(-) create mode 100644 pgtest/psql/copy/stdout.sql diff --git a/pgserver/connection_handler.go b/pgserver/connection_handler.go index f12237e9..681f20ae 100644 --- a/pgserver/connection_handler.go +++ b/pgserver/connection_handler.go @@ -672,7 +672,6 @@ func (h *ConnectionHandler) handleCopyDataHelper(message *pgproto3.CopyData) 
(st switch copyFrom.Options.CopyFormat { case tree.CopyFormatText: - copyFrom.Options.Delimiter = tree.NewStrVal("\t") // Remove trailing backslash, comma and newline characters from the data if bytes.HasSuffix(message.Data, []byte{'\n'}) { message.Data = message.Data[:len(message.Data)-1] diff --git a/pgserver/dataloader.go b/pgserver/dataloader.go index 2208975d..5be82aaa 100644 --- a/pgserver/dataloader.go +++ b/pgserver/dataloader.go @@ -61,7 +61,7 @@ type CsvDataLoader struct { var _ DataLoader = (*CsvDataLoader)(nil) -func NewCsvDataLoader(ctx *sql.Context, handler *DuckHandler, schema string, table sql.InsertableTable, columns tree.NameList, options *tree.CopyOptions) (DataLoader, error) { +func NewCsvDataxLoader(ctx *sql.Context, handler *DuckHandler, schema string, table sql.InsertableTable, columns tree.NameList, options *tree.CopyOptions) (DataLoader, error) { // Create the FIFO pipe duckBuilder := handler.e.Analyzer.ExecBuilder.(*backend.DuckBuilder) pipePath, err := duckBuilder.CreatePipe(ctx, "pg-copy-from") @@ -128,32 +128,43 @@ func (loader *CsvDataLoader) buildSQL() string { b.WriteString(" FROM '") b.WriteString(loader.pipePath) - b.WriteString("' (AUTO_DETECT false") + b.WriteString("' (FORMAT CSV") options := loader.options - if options.HasHeader && options.Header { - b.WriteString(", HEADER") - } - - if options.Delimiter != nil { - b.WriteString(", SEP ") - b.WriteString(options.Delimiter.String()) - } - - if options.Quote != nil { - b.WriteString(", QUOTE ") - b.WriteString(singleQuotedDuckChar(options.Quote.RawString())) - } - - if options.Escape != nil { - b.WriteString(", ESCAPE ") - b.WriteString(singleQuotedDuckChar(options.Escape.RawString())) + switch options.CopyFormat { + case tree.CopyFormatText, tree.CopyFormatCSV: + if options.Delimiter != nil { + b.WriteString(", SEP ") + b.WriteString(options.Delimiter.String()) + } else if options.CopyFormat == tree.CopyFormatText { + b.WriteString(`, SEP '\t'`) + } + + if options.Quote != nil { 
+ b.WriteString(", QUOTE ") + b.WriteString(singleQuotedDuckChar(options.Quote.RawString())) + } else if options.CopyFormat == tree.CopyFormatText { + b.WriteString(`, QUOTE ''`) + } + + if options.Escape != nil { + b.WriteString(", ESCAPE ") + b.WriteString(singleQuotedDuckChar(options.Escape.RawString())) + } else if options.CopyFormat == tree.CopyFormatText { + b.WriteString(`, ESCAPE ''`) + } + + if options.Null != nil { + b.WriteString(", NULLSTR ") + b.WriteString(options.Null.String()) + } else if options.CopyFormat == tree.CopyFormatText { + b.WriteString(`, NULLSTR '\N'`) + } } - if options.Null != nil { - b.WriteString(", NULLSTR ") - b.WriteString(loader.options.Null.String()) + if options.HasHeader && options.Header { + b.WriteString(", HEADER") } b.WriteString(")") diff --git a/pgserver/datawriter.go b/pgserver/datawriter.go index bbc3ed2d..972c7efe 100644 --- a/pgserver/datawriter.go +++ b/pgserver/datawriter.go @@ -3,6 +3,7 @@ package pgserver import ( "fmt" "os" + "strings" "github.com/apecloud/myduckserver/adapter" "github.com/apecloud/myduckserver/backend" @@ -25,47 +26,86 @@ func NewDataWriter( query string, options *tree.CopyOptions, ) (*DataWriter, error) { + // Create the FIFO pipe + db := handler.e.Analyzer.ExecBuilder.(*backend.DuckBuilder) + pipePath, err := db.CreatePipe(ctx, "pg-copy-to") + if err != nil { + return nil, err + } + // https://www.postgresql.org/docs/current/sql-copy.html // https://duckdb.org/docs/sql/statements/copy.html#csv-options - var format string - switch options.CopyFormat { - case tree.CopyFormatText: - format = `FORMAT CSV, DELIMITER '\t', QUOTE '', ESCAPE '', NULLSTR '\N'` - case tree.CopyFormatCSV: - format = `FORMAT CSV` - case tree.CopyFormatBinary: - return nil, fmt.Errorf("BINARY format is not supported for COPY TO") - } + var builder strings.Builder + builder.Grow(128) - var source string + builder.WriteString("COPY ") if table != nil { if schema != "" { - source += catalog.QuoteIdentifierANSI(schema) + "." 
+ builder.WriteString(catalog.QuoteIdentifierANSI(schema)) + builder.WriteString(".") } - source += catalog.QuoteIdentifierANSI(table.Name()) + builder.WriteString(catalog.QuoteIdentifierANSI(table.Name())) if columns != nil { - source += "(" + columns.String() + ")" + builder.WriteString("(") + builder.WriteString(columns.String()) + builder.WriteString(")") } } else { - source = "(" + query + ")" + builder.WriteString("(") + builder.WriteString(query) + builder.WriteString(")") } - // Create the FIFO pipe - db := handler.e.Analyzer.ExecBuilder.(*backend.DuckBuilder) - pipePath, err := db.CreatePipe(ctx, "pg-copy-to") - if err != nil { - return nil, err + builder.WriteString(" TO '") + builder.WriteString(pipePath) + builder.WriteString("' (FORMAT CSV") + + switch options.CopyFormat { + case tree.CopyFormatText, tree.CopyFormatCSV: + if options.Delimiter != nil { + builder.WriteString(", DELIMITER ") + builder.WriteString(options.Delimiter.String()) + } else if options.CopyFormat == tree.CopyFormatText { + builder.WriteString(`, DELIMITER '\t'`) + } + + if options.Quote != nil { + builder.WriteString(", QUOTE ") + builder.WriteString(singleQuotedDuckChar(options.Quote.RawString())) + } else if options.CopyFormat == tree.CopyFormatText { + builder.WriteString(`, QUOTE ''`) + } + + if options.Escape != nil { + builder.WriteString(", ESCAPE ") + builder.WriteString(singleQuotedDuckChar(options.Escape.RawString())) + } else if options.CopyFormat == tree.CopyFormatText { + builder.WriteString(`, ESCAPE ''`) + } + + if options.Null != nil { + builder.WriteString(`, NULL '`) + builder.WriteString(options.Null.String()) + builder.WriteString(`'`) + } else if options.CopyFormat == tree.CopyFormatText { + builder.WriteString(`, NULL '\N'`) + } + + if options.HasHeader && options.Header { + builder.WriteString(", HEADER") + } + case tree.CopyFormatBinary: + return nil, fmt.Errorf("BINARY format is not supported for COPY TO") } - // Initialize DataWriter - writer := 
&DataWriter{ + builder.WriteString(")") + + return &DataWriter{ ctx: ctx, - duckSQL: fmt.Sprintf("COPY %s TO '%s' (%s)", source, pipePath, format), + duckSQL: builder.String(), options: options, pipePath: pipePath, - } - - return writer, nil + }, nil } type copyToResult struct { diff --git a/pgtest/psql/copy/basic.sql b/pgtest/psql/copy/basic.sql index 2f37dd75..5cef336c 100644 --- a/pgtest/psql/copy/basic.sql +++ b/pgtest/psql/copy/basic.sql @@ -1,6 +1,6 @@ -CREATE SCHEMA IF NOT EXISTS test_psql_copy; +CREATE SCHEMA IF NOT EXISTS test_psql_copy_from; -USE test_psql_copy; +USE test_psql_copy_from; CREATE TABLE t (a int, b text); diff --git a/pgtest/psql/copy/stdout.sql b/pgtest/psql/copy/stdout.sql new file mode 100644 index 00000000..2abf41f4 --- /dev/null +++ b/pgtest/psql/copy/stdout.sql @@ -0,0 +1,15 @@ +CREATE SCHEMA IF NOT EXISTS test_psql_copy_to; + +USE test_psql_copy_to; + +CREATE TABLE t (a int, b text, c float); + +\o 'stdout.csv' + +COPY t TO STDOUT; + +\copy t (a, b) TO STDOUT (FORMAT CSV); + +COPY t TO STDOUT (FORMAT CSV, HEADER false, DELIMITER '|'); + +\copy (SELECT a * a, b, c + a FROM t) TO STDOUT (FORMAT CSV, HEADER false, DELIMITER '|'); \ No newline at end of file From bf18e146785150d986ef21a95ef51d082738b1c6 Mon Sep 17 00:00:00 2001 From: Fan Yang Date: Thu, 21 Nov 2024 14:00:42 +0800 Subject: [PATCH 5/8] Typo --- pgserver/dataloader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pgserver/dataloader.go b/pgserver/dataloader.go index 5be82aaa..e18a06a2 100644 --- a/pgserver/dataloader.go +++ b/pgserver/dataloader.go @@ -61,7 +61,7 @@ type CsvDataLoader struct { var _ DataLoader = (*CsvDataLoader)(nil) -func NewCsvDataxLoader(ctx *sql.Context, handler *DuckHandler, schema string, table sql.InsertableTable, columns tree.NameList, options *tree.CopyOptions) (DataLoader, error) { +func NewCsvDataLoader(ctx *sql.Context, handler *DuckHandler, schema string, table sql.InsertableTable, columns tree.NameList, options 
*tree.CopyOptions) (DataLoader, error) { // Create the FIFO pipe duckBuilder := handler.e.Analyzer.ExecBuilder.(*backend.DuckBuilder) pipePath, err := duckBuilder.CreatePipe(ctx, "pg-copy-from") From 435110fecb277fa1144fb6f5c6ef92de6b328485 Mon Sep 17 00:00:00 2001 From: Fan Yang Date: Thu, 21 Nov 2024 14:36:02 +0800 Subject: [PATCH 6/8] Fix COPY FROM STDIN --- backend/pipe.go | 2 +- pgserver/connection_handler.go | 8 ++--- pgserver/dataloader.go | 63 ++++++++++++++++------------------ pgserver/datawriter.go | 11 +++--- pgtest/psql/copy/basic.sql | 6 +++- 5 files changed, 46 insertions(+), 44 deletions(-) diff --git a/backend/pipe.go b/backend/pipe.go index 8daf3fbc..4a23263a 100644 --- a/backend/pipe.go +++ b/backend/pipe.go @@ -17,7 +17,7 @@ func (h *DuckBuilder) CreatePipe(ctx *sql.Context, subdir string) (string, error } pipeName := strconv.Itoa(int(ctx.ID())) + ".pipe" pipePath := filepath.Join(pipeDir, pipeName) - ctx.GetLogger().Debugln("Creating FIFO pipe for COPY operation:", pipePath) + ctx.GetLogger().Debugln("Creating FIFO pipe for LOAD/COPY operation:", pipePath) if err := syscall.Mkfifo(pipePath, 0600); err != nil { return "", err } diff --git a/pgserver/connection_handler.go b/pgserver/connection_handler.go index 681f20ae..dad9cc16 100644 --- a/pgserver/connection_handler.go +++ b/pgserver/connection_handler.go @@ -38,7 +38,6 @@ import ( "github.com/jackc/pgx/v5/pgproto3" "github.com/jackc/pgx/v5/pgtype" "github.com/sirupsen/logrus" - // Import the new datawriter package ) // ConnectionHandler is responsible for the entire lifecycle of a user connection: receiving messages they send, @@ -297,7 +296,7 @@ func (h *ConnectionHandler) receiveMessage() (bool, error) { if HandlePanics { defer func() { if r := recover(); r != nil { - fmt.Printf("Listener recovered panic: %v\n%s\n", r, string(debug.Stack())) + h.logger.Debugf("Listener recovered panic: %v\n%s\n", r, string(debug.Stack())) var eomErr error if rErr, ok := r.(error); ok { @@ -308,7 +307,7 @@ 
func (h *ConnectionHandler) receiveMessage() (bool, error) { if !endOfMessages && h.waitForSync { if syncErr := h.discardToSync(); syncErr != nil { - fmt.Println(syncErr.Error()) + h.logger.Error(syncErr.Error()) } } h.endOfMessages(eomErr) @@ -350,7 +349,7 @@ func (h *ConnectionHandler) receiveMessage() (bool, error) { // handleMessages processes the message provided and returns status flags indicating what the connection should do next. // If the |stop| response parameter is true, it indicates that the connection should be closed by the caller. If the // |endOfMessages| response parameter is true, it indicates that no more messages are expected for the current operation -// and a READY FOR QUERY message should be sent back to the client so it can send the next query. +// and a READY FOR QUERY message should be sent back to the client, so it can send the next query. func (h *ConnectionHandler) handleMessage(msg pgproto3.Message) (stop, endOfMessages bool, err error) { logrus.Tracef("Handling message: %T", msg) switch message := msg.(type) { @@ -456,6 +455,7 @@ func (h *ConnectionHandler) handleQueryOutsideEngine(query ConvertedQuery) (hand return false, true, nil } +// handleParse handles a parse message, returning any error that occurs func (h *ConnectionHandler) handleParse(message *pgproto3.Parse) error { h.waitForSync = true diff --git a/pgserver/dataloader.go b/pgserver/dataloader.go index e18a06a2..606d3a03 100644 --- a/pgserver/dataloader.go +++ b/pgserver/dataloader.go @@ -128,45 +128,42 @@ func (loader *CsvDataLoader) buildSQL() string { b.WriteString(" FROM '") b.WriteString(loader.pipePath) - b.WriteString("' (FORMAT CSV") + b.WriteString("' (FORMAT CSV, AUTO_DETECT false") options := loader.options - switch options.CopyFormat { - case tree.CopyFormatText, tree.CopyFormatCSV: - if options.Delimiter != nil { - b.WriteString(", SEP ") - b.WriteString(options.Delimiter.String()) - } else if options.CopyFormat == tree.CopyFormatText { - b.WriteString(`, SEP 
'\t'`) - } - - if options.Quote != nil { - b.WriteString(", QUOTE ") - b.WriteString(singleQuotedDuckChar(options.Quote.RawString())) - } else if options.CopyFormat == tree.CopyFormatText { - b.WriteString(`, QUOTE ''`) - } - - if options.Escape != nil { - b.WriteString(", ESCAPE ") - b.WriteString(singleQuotedDuckChar(options.Escape.RawString())) - } else if options.CopyFormat == tree.CopyFormatText { - b.WriteString(`, ESCAPE ''`) - } - - if options.Null != nil { - b.WriteString(", NULLSTR ") - b.WriteString(options.Null.String()) - } else if options.CopyFormat == tree.CopyFormatText { - b.WriteString(`, NULLSTR '\N'`) - } - } - if options.HasHeader && options.Header { b.WriteString(", HEADER") } + if options.Delimiter != nil { + b.WriteString(", SEP ") + b.WriteString(options.Delimiter.String()) + } else if options.CopyFormat == tree.CopyFormatText { + b.WriteString(`, SEP '\t'`) + } + + if options.Quote != nil { + b.WriteString(", QUOTE ") + b.WriteString(singleQuotedDuckChar(options.Quote.RawString())) + } else if options.CopyFormat == tree.CopyFormatText { + b.WriteString(`, QUOTE ''`) + } + + if options.Escape != nil { + b.WriteString(", ESCAPE ") + b.WriteString(singleQuotedDuckChar(options.Escape.RawString())) + } else if options.CopyFormat == tree.CopyFormatText { + b.WriteString(`, ESCAPE ''`) + } + + if options.Null != nil { + b.WriteString(", NULLSTR ") + b.WriteString(options.Null.String()) + } else if options.CopyFormat == tree.CopyFormatText { + b.WriteString(`, NULLSTR '\N'`) + } + b.WriteString(")") return b.String() @@ -179,7 +176,7 @@ func (loader *CsvDataLoader) executeCopy(sql string, pipePath string) { loader.ctx.GetLogger().Error(err) loader.err.Store(&err) // Open the pipe once to unblock the writer - loader.errPipe, _ = os.OpenFile(pipePath, os.O_RDONLY, 0600) + loader.errPipe, _ = os.OpenFile(pipePath, os.O_RDONLY, os.ModeNamedPipe) return } diff --git a/pgserver/datawriter.go b/pgserver/datawriter.go index 972c7efe..ddfbdc81 100644 --- 
a/pgserver/datawriter.go +++ b/pgserver/datawriter.go @@ -60,6 +60,10 @@ func NewDataWriter( builder.WriteString(pipePath) builder.WriteString("' (FORMAT CSV") + if options.HasHeader && options.Header { + builder.WriteString(", HEADER") + } + switch options.CopyFormat { case tree.CopyFormatText, tree.CopyFormatCSV: if options.Delimiter != nil { @@ -84,16 +88,13 @@ func NewDataWriter( } if options.Null != nil { - builder.WriteString(`, NULL '`) + builder.WriteString(`, NULLSTR '`) builder.WriteString(options.Null.String()) builder.WriteString(`'`) } else if options.CopyFormat == tree.CopyFormatText { - builder.WriteString(`, NULL '\N'`) + builder.WriteString(`, NULLSTR '\N'`) } - if options.HasHeader && options.Header { - builder.WriteString(", HEADER") - } case tree.CopyFormatBinary: return nil, fmt.Errorf("BINARY format is not supported for COPY TO") } diff --git a/pgtest/psql/copy/basic.sql b/pgtest/psql/copy/basic.sql index 5cef336c..79e6e00d 100644 --- a/pgtest/psql/copy/basic.sql +++ b/pgtest/psql/copy/basic.sql @@ -4,4 +4,8 @@ USE test_psql_copy_from; CREATE TABLE t (a int, b text); -\copy t FROM 'pgtest/testdata/basic.csv' WITH DELIMITER ',' CSV HEADER; \ No newline at end of file +\copy t FROM 'pgtest/testdata/basic.csv' (FORMAT CSV, DELIMITER ',', HEADER); + +\copy t FROM 'pgtest/testdata/basic.csv' WITH DELIMITER ',' CSV HEADER; + +SELECT * FROM t; \ No newline at end of file From 1858ae20814f3fe8bd8907deb202c7424af884af Mon Sep 17 00:00:00 2001 From: Fan Yang Date: Thu, 21 Nov 2024 15:20:09 +0800 Subject: [PATCH 7/8] Fix deadlock in COPY FROM --- pgserver/dataloader.go | 5 ++++- pgserver/datawriter.go | 14 +++++++++----- pgtest/psql/copy/stdout.sql | 6 +++++- 3 files changed, 18 insertions(+), 7 deletions(-) diff --git a/pgserver/dataloader.go b/pgserver/dataloader.go index 606d3a03..8bc6056f 100644 --- a/pgserver/dataloader.go +++ b/pgserver/dataloader.go @@ -132,8 +132,11 @@ func (loader *CsvDataLoader) buildSQL() string { options := loader.options + 
b.WriteString(", HEADER ") if options.HasHeader && options.Header { - b.WriteString(", HEADER") + b.WriteString("true") + } else { + b.WriteString("false") } if options.Delimiter != nil { diff --git a/pgserver/datawriter.go b/pgserver/datawriter.go index ddfbdc81..fc15e893 100644 --- a/pgserver/datawriter.go +++ b/pgserver/datawriter.go @@ -58,14 +58,18 @@ func NewDataWriter( builder.WriteString(" TO '") builder.WriteString(pipePath) - builder.WriteString("' (FORMAT CSV") - - if options.HasHeader && options.Header { - builder.WriteString(", HEADER") - } switch options.CopyFormat { case tree.CopyFormatText, tree.CopyFormatCSV: + builder.WriteString("' (FORMAT CSV") + + builder.WriteString(", HEADER ") + if options.HasHeader && options.Header { + builder.WriteString("true") + } else { + builder.WriteString("false") + } + if options.Delimiter != nil { builder.WriteString(", DELIMITER ") builder.WriteString(options.Delimiter.String()) diff --git a/pgtest/psql/copy/stdout.sql b/pgtest/psql/copy/stdout.sql index 2abf41f4..6993e655 100644 --- a/pgtest/psql/copy/stdout.sql +++ b/pgtest/psql/copy/stdout.sql @@ -4,6 +4,8 @@ USE test_psql_copy_to; CREATE TABLE t (a int, b text, c float); +INSERT INTO t VALUES (1, 'one', 1.1), (2, 'two', 2.2), (3, 'three', 3.3), (4, 'four', 4.4), (5, 'five', 5.5); + \o 'stdout.csv' COPY t TO STDOUT; @@ -12,4 +14,6 @@ COPY t TO STDOUT; COPY t TO STDOUT (FORMAT CSV, HEADER false, DELIMITER '|'); -\copy (SELECT a * a, b, c + a FROM t) TO STDOUT (FORMAT CSV, HEADER false, DELIMITER '|'); \ No newline at end of file +\copy (SELECT a * a, b, c + a FROM t) TO STDOUT (FORMAT CSV, HEADER false, DELIMITER '|'); + +\echo `cat stdout.csv` \ No newline at end of file From 458403cec19ad73f28da2238b2c95e0067019312 Mon Sep 17 00:00:00 2001 From: Fan Yang Date: Thu, 21 Nov 2024 15:44:31 +0800 Subject: [PATCH 8/8] Try to eliminate extra message --- pgserver/connection_handler.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git 
a/pgserver/connection_handler.go b/pgserver/connection_handler.go index 0a82b02e..26d9f1df 100644 --- a/pgserver/connection_handler.go +++ b/pgserver/connection_handler.go @@ -1340,8 +1340,11 @@ func (h *ConnectionHandler) handleCopyToStdout(query ConvertedQuery, copyTo *tre return err } + done := make(chan struct{}) var sendErr atomic.Value go func() { + defer close(done) + // Open the pipe for reading. ctx.GetLogger().Tracef("Opening FIFO pipe for reading: %s", pipePath) pipe, err := os.OpenFile(pipePath, os.O_RDONLY, os.ModeNamedPipe) @@ -1353,6 +1356,10 @@ func (h *ConnectionHandler) handleCopyToStdout(query ConvertedQuery, copyTo *tre defer pipe.Close() ctx.GetLogger().Debug("Copying data from the pipe to the client") + defer func() { + ctx.GetLogger().Debug("Finished copying data from the pipe to the client") + }() + buf := make([]byte, 1<<20) // 1MB buffer for { n, err := pipe.Read(buf) @@ -1380,9 +1387,12 @@ func (h *ConnectionHandler) handleCopyToStdout(query ConvertedQuery, copyTo *tre select { case <-ctx.Done(): // Context is canceled + <-done err, _ := sendErr.Load().(error) return errors.Join(ctx.Err(), err) case result := <-ch: + <-done + if result.Err != nil { return fmt.Errorf("failed to copy data: %w", result.Err) }