Skip to content

Commit

Permalink
feat: support COPY ... TO STDOUT (FORMAT parquet|json)
Browse files Browse the repository at this point in the history
  • Loading branch information
fanyang01 committed Nov 21, 2024
1 parent 9d087d6 commit 7839a3b
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 16 deletions.
50 changes: 39 additions & 11 deletions pgserver/connection_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/cockroachdb/cockroachdb-parser/pkg/sql/sem/tree"
gms "github.com/dolthub/go-mysql-server"
"github.com/dolthub/go-mysql-server/server"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/vitess/go/mysql"
"github.com/jackc/pgx/v5/pgproto3"
"github.com/jackc/pgx/v5/pgtype"
Expand Down Expand Up @@ -602,8 +603,15 @@ func (h *ConnectionHandler) handleQueryOutsideEngine(query ConvertedQuery) (hand
return true, false, h.handleCopyFromStdinQuery(query, stmt, h.Conn())
}
case *tree.CopyTo:
return true, true, h.handleCopyToStdout(query, stmt)
return true, true, h.handleCopyToStdout(query, stmt, "", tree.CopyFormatBinary /* unused */)

Check failure on line 606 in pgserver/connection_handler.go

View workflow job for this annotation

GitHub Actions / build

not enough arguments in call to h.handleCopyToStdout

Check failure on line 606 in pgserver/connection_handler.go

View workflow job for this annotation

GitHub Actions / build

not enough arguments in call to h.handleCopyToStdout

Check failure on line 606 in pgserver/connection_handler.go

View workflow job for this annotation

GitHub Actions / build (true)

not enough arguments in call to h.handleCopyToStdout

Check failure on line 606 in pgserver/connection_handler.go

View workflow job for this annotation

GitHub Actions / build

not enough arguments in call to h.handleCopyToStdout
}

if query.StatementTag == "COPY" {
if subquery, format, options, ok := ParseCopy(query.String); ok {
return true, true, h.handleCopyToStdout(query, nil, subquery, format, options)
}
}

return false, true, nil
}

Expand Down Expand Up @@ -1293,7 +1301,7 @@ func returnsRow(tag string) bool {
}
}

func (h *ConnectionHandler) handleCopyToStdout(query ConvertedQuery, copyTo *tree.CopyTo) error {
func (h *ConnectionHandler) handleCopyToStdout(query ConvertedQuery, copyTo *tree.CopyTo, subquery string, format tree.CopyFormat, rawOptions string) error {
ctx, err := h.duckHandler.NewContext(context.Background(), h.mysqlConn, query.String)
if err != nil {
return err
Expand All @@ -1305,20 +1313,40 @@ func (h *ConnectionHandler) handleCopyToStdout(query ConvertedQuery, copyTo *tre
defer cancel()
ctx = ctx.WithContext(childCtx)

table, err := ValidateCopyTo(copyTo, ctx)
if err != nil {
return err
}
var (
schema string
table sql.Table
columns tree.NameList
stmt string
options *tree.CopyOptions
)

var stmt string
if copyTo.Statement != nil {
stmt = copyTo.Statement.String()
if copyTo != nil {
// PG-parsable COPY TO
table, err = ValidateCopyTo(copyTo, ctx)
if err != nil {
return err
}
if copyTo.Statement != nil {
stmt = `(` + copyTo.Statement.String() + `)`
}
schema = copyTo.Table.Schema()
columns = copyTo.Columns
options = &copyTo.Options
} else {
// Non-PG-parsable COPY TO, which is parsed via regex.
stmt = subquery
options = &tree.CopyOptions{
CopyFormat: format,
HasFormat: true,
}
}

writer, err := NewDataWriter(
ctx, h.duckHandler,
copyTo.Table.Schema(), table, copyTo.Columns,
schema, table, columns,
stmt,
&copyTo.Options,
options, rawOptions,
)
if err != nil {
return err
Expand Down
53 changes: 53 additions & 0 deletions pgserver/copy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package pgserver

import (
"regexp"
"strings"

"github.com/cockroachdb/cockroachdb-parser/pkg/sql/sem/tree"
)

const (
CopyFormatParquet = tree.CopyFormatCSV + 1
CopyFormatJSON = tree.CopyFormatCSV + 2
)

var (
// We are supporting the parquet/... formats for COPY TO, but
// COPY ... TO STDOUT [WITH] (FORMAT PARQUET, OPT1 v1, OPT2, OPT3 v3, ...)
// Let's match them with regex and extract the ... part.
// Update regex to capture FORMAT and other options
reCopyToFormat = regexp.MustCompile(`(?i)^COPY\s+(.*?)\s+TO\s+STDOUT(?:\s+(?:WITH\s*)?\(\s*(?:FORMAT\s+(\w+)\s*,?\s*)?(.*?)\s*\))?$`)
)

func ParseCopy(stmt string) (query string, format tree.CopyFormat, options string, ok bool) {
m := reCopyToFormat.FindStringSubmatch(stmt)
if m == nil {
return "", 0, "", false
}
query = strings.TrimSpace(m[1])

var formatStr string
if m[2] != "" {
formatStr = strings.ToUpper(m[2])
} else {
formatStr = "TEXT"
}

options = strings.TrimSpace(m[3])

switch formatStr {
case "PARQUET":
format = CopyFormatParquet
case "JSON":
format = CopyFormatJSON
case "CSV":
format = tree.CopyFormatCSV
case "BINARY":
format = tree.CopyFormatBinary
case "", "TEXT":
format = tree.CopyFormatText
}

return query, format, options, true
}
32 changes: 27 additions & 5 deletions pgserver/datawriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func NewDataWriter(
handler *DuckHandler,
schema string, table sql.Table, columns tree.NameList,
query string,
options *tree.CopyOptions,
options *tree.CopyOptions, rawOptions string,
) (*DataWriter, error) {
// Create the FIFO pipe
db := handler.e.Analyzer.ExecBuilder.(*backend.DuckBuilder)
Expand All @@ -51,18 +51,41 @@ func NewDataWriter(
builder.WriteString(")")
}
} else {
builder.WriteString("(")
// the parentheses have already been added
builder.WriteString(query)
builder.WriteString(")")
}

builder.WriteString(" TO '")
builder.WriteString(pipePath)

switch options.CopyFormat {
case CopyFormatParquet:
builder.WriteString("' (FORMAT PARQUET")
if rawOptions != "" {
builder.WriteString(", ")
builder.WriteString(rawOptions)
}
builder.WriteString(")")

case CopyFormatJSON:
builder.WriteString("' (FORMAT JSON")
if rawOptions != "" {
builder.WriteString(", ")
builder.WriteString(rawOptions)
}
builder.WriteString(")")

case tree.CopyFormatText, tree.CopyFormatCSV:
builder.WriteString("' (FORMAT CSV")

if rawOptions != "" {
// TODO(fan): For TEXT format, we should add some default options if not specified.
builder.WriteString(", ")
builder.WriteString(rawOptions)
builder.WriteString(")")
break
}

builder.WriteString(", HEADER ")
if options.HasHeader && options.Header {
builder.WriteString("true")
Expand Down Expand Up @@ -98,13 +121,12 @@ func NewDataWriter(
} else if options.CopyFormat == tree.CopyFormatText {
builder.WriteString(`, NULLSTR '\N'`)
}
builder.WriteString(")")

case tree.CopyFormatBinary:
return nil, fmt.Errorf("BINARY format is not supported for COPY TO")
}

builder.WriteString(")")

return &DataWriter{
ctx: ctx,
duckSQL: builder.String(),
Expand Down

0 comments on commit 7839a3b

Please sign in to comment.