Skip to content

Commit

Permalink
Merge branch 'main' into 119-zero-etl-pg
Browse files Browse the repository at this point in the history
  • Loading branch information
TianyuZhang1214 committed Nov 22, 2024
2 parents 9b19ba1 + 34d91fd commit 1de37f2
Show file tree
Hide file tree
Showing 8 changed files with 440 additions and 34 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ pipes/
__debug_*
.DS_Store
*.csv
*.parquet
Binary file added logo/myduck-logo.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
50 changes: 39 additions & 11 deletions pgserver/connection_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/cockroachdb/cockroachdb-parser/pkg/sql/sem/tree"
gms "github.com/dolthub/go-mysql-server"
"github.com/dolthub/go-mysql-server/server"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/vitess/go/mysql"
"github.com/jackc/pgx/v5/pgproto3"
"github.com/jackc/pgx/v5/pgtype"
Expand Down Expand Up @@ -602,8 +603,15 @@ func (h *ConnectionHandler) handleQueryOutsideEngine(query ConvertedQuery) (hand
return true, false, h.handleCopyFromStdinQuery(query, stmt, h.Conn())
}
case *tree.CopyTo:
return true, true, h.handleCopyToStdout(query, stmt)
return true, true, h.handleCopyToStdout(query, stmt, "" /* unused */, tree.CopyFormatBinary, "")
}

if query.StatementTag == "COPY" {
if subquery, format, options, ok := ParseCopy(query.String); ok {
return true, true, h.handleCopyToStdout(query, nil, subquery, format, options)
}
}

return false, true, nil
}

Expand Down Expand Up @@ -1307,7 +1315,7 @@ func returnsRow(tag string) bool {
}
}

func (h *ConnectionHandler) handleCopyToStdout(query ConvertedQuery, copyTo *tree.CopyTo) error {
func (h *ConnectionHandler) handleCopyToStdout(query ConvertedQuery, copyTo *tree.CopyTo, subquery string, format tree.CopyFormat, rawOptions string) error {
ctx, err := h.duckHandler.NewContext(context.Background(), h.mysqlConn, query.String)
if err != nil {
return err
Expand All @@ -1319,20 +1327,40 @@ func (h *ConnectionHandler) handleCopyToStdout(query ConvertedQuery, copyTo *tre
defer cancel()
ctx = ctx.WithContext(childCtx)

table, err := ValidateCopyTo(copyTo, ctx)
if err != nil {
return err
}
var (
schema string
table sql.Table
columns tree.NameList
stmt string
options *tree.CopyOptions
)

var stmt string
if copyTo.Statement != nil {
stmt = copyTo.Statement.String()
if copyTo != nil {
// PG-parsable COPY TO
table, err = ValidateCopyTo(copyTo, ctx)
if err != nil {
return err
}
if copyTo.Statement != nil {
stmt = `(` + copyTo.Statement.String() + `)`
}
schema = copyTo.Table.Schema()
columns = copyTo.Columns
options = &copyTo.Options
} else {
// Non-PG-parsable COPY TO, which is parsed via regex.
stmt = subquery
options = &tree.CopyOptions{
CopyFormat: format,
HasFormat: true,
}
}

writer, err := NewDataWriter(
ctx, h.duckHandler,
copyTo.Table.Schema(), table, copyTo.Columns,
schema, table, columns,
stmt,
&copyTo.Options,
options, rawOptions,
)
if err != nil {
return err
Expand Down
56 changes: 56 additions & 0 deletions pgserver/copy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package pgserver

import (
"regexp"
"strings"

"github.com/cockroachdb/cockroachdb-parser/pkg/sql/sem/tree"
"github.com/dolthub/go-mysql-server/sql"
)

const (
CopyFormatParquet = tree.CopyFormatCSV + 1
CopyFormatJSON = tree.CopyFormatCSV + 2
)

var (
// We are supporting the parquet/... formats for COPY TO, but
// COPY ... TO STDOUT [WITH] (FORMAT PARQUET, OPT1 v1, OPT2, OPT3 v3, ...)
// Let's match them with regex and extract the ... part.
// Update regex to capture FORMAT and other options
reCopyToFormat = regexp.MustCompile(`(?i)^COPY\s+(.*?)\s+TO\s+STDOUT(?:\s+(?:WITH\s*)?\(\s*(?:FORMAT\s+(\w+)\s*,?\s*)?(.*?)\s*\))?$`)
)

func ParseCopy(stmt string) (query string, format tree.CopyFormat, options string, ok bool) {
stmt = RemoveComments(stmt)
stmt = sql.RemoveSpaceAndDelimiter(stmt, ';')
m := reCopyToFormat.FindStringSubmatch(stmt)
if m == nil {
return "", 0, "", false
}
query = strings.TrimSpace(m[1])

var formatStr string
if m[2] != "" {
formatStr = strings.ToUpper(m[2])
} else {
formatStr = "TEXT"
}

options = strings.TrimSpace(m[3])

switch formatStr {
case "PARQUET":
format = CopyFormatParquet
case "JSON":
format = CopyFormatJSON
case "CSV":
format = tree.CopyFormatCSV
case "BINARY":
format = tree.CopyFormatBinary
case "", "TEXT":
format = tree.CopyFormatText
}

return query, format, options, true
}
32 changes: 27 additions & 5 deletions pgserver/datawriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func NewDataWriter(
handler *DuckHandler,
schema string, table sql.Table, columns tree.NameList,
query string,
options *tree.CopyOptions,
options *tree.CopyOptions, rawOptions string,
) (*DataWriter, error) {
// Create the FIFO pipe
db := handler.e.Analyzer.ExecBuilder.(*backend.DuckBuilder)
Expand All @@ -51,18 +51,41 @@ func NewDataWriter(
builder.WriteString(")")
}
} else {
builder.WriteString("(")
// the parentheses have already been added
builder.WriteString(query)
builder.WriteString(")")
}

builder.WriteString(" TO '")
builder.WriteString(pipePath)

switch options.CopyFormat {
case CopyFormatParquet:
builder.WriteString("' (FORMAT PARQUET")
if rawOptions != "" {
builder.WriteString(", ")
builder.WriteString(rawOptions)
}
builder.WriteString(")")

case CopyFormatJSON:
builder.WriteString("' (FORMAT JSON")
if rawOptions != "" {
builder.WriteString(", ")
builder.WriteString(rawOptions)
}
builder.WriteString(")")

case tree.CopyFormatText, tree.CopyFormatCSV:
builder.WriteString("' (FORMAT CSV")

if rawOptions != "" {
// TODO(fan): For TEXT format, we should add some default options if not specified.
builder.WriteString(", ")
builder.WriteString(rawOptions)
builder.WriteString(")")
break
}

builder.WriteString(", HEADER ")
if options.HasHeader && options.Header {
builder.WriteString("true")
Expand Down Expand Up @@ -98,13 +121,12 @@ func NewDataWriter(
} else if options.CopyFormat == tree.CopyFormatText {
builder.WriteString(`, NULLSTR '\N'`)
}
builder.WriteString(")")

case tree.CopyFormatBinary:
return nil, fmt.Errorf("BINARY format is not supported for COPY TO")
}

builder.WriteString(")")

return &DataWriter{
ctx: ctx,
duckSQL: builder.String(),
Expand Down
151 changes: 143 additions & 8 deletions pgserver/stmt.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pgserver

import (
"bytes"
"strings"
"unicode"

Expand Down Expand Up @@ -82,18 +83,18 @@ func GetStatementTag(stmt *duckdb.Stmt) string {
}

func GuessStatementTag(query string) string {
// Remove leading line and block comments
// Remove leading comments
query = RemoveLeadingComments(query)
// Remove trailing semicolon
query = sql.RemoveSpaceAndDelimiter(query, ';')

// Guess the statement tag by looking for the first space in the query.
// Guess the statement tag by looking for the first non-identifier character
for i, c := range query {
if unicode.IsSpace(c) {
if !unicode.IsLetter(c) && c != '_' {
return strings.ToUpper(query[:i])
}
}
return ""
return strings.ToUpper(query)
}

func RemoveLeadingComments(query string) string {
Expand All @@ -109,12 +110,28 @@ func RemoveLeadingComments(query string) string {
}
i += end + 1
} else if strings.HasPrefix(query[i:], "/*") {
// Skip block comment
end := strings.Index(query[i+2:], "*/")
if end == -1 {
// Skip block comment with nesting support
nestLevel := 1
pos := i + 2
for pos < n && nestLevel > 0 {
if pos+1 < n {
if query[pos] == '/' && query[pos+1] == '*' {
nestLevel++
pos += 2
continue
}
if query[pos] == '*' && query[pos+1] == '/' {
nestLevel--
pos += 2
continue
}
}
pos++
}
if nestLevel > 0 {
return ""
}
i += end + 4
i = pos
} else if unicode.IsSpace(rune(query[i])) {
// Skip whitespace
i++
Expand All @@ -124,3 +141,121 @@ func RemoveLeadingComments(query string) string {
}
return query[i:]
}

// RemoveComments removes comments from a query string.
// It supports line comments (--), block comments (/* ... */), and quoted strings.
// Author: Claude Sonnet 3.5
func RemoveComments(query string) string {
var buf bytes.Buffer
runes := []rune(query)
length := len(runes)
pos := 0

for pos < length {
// Handle line comments
if pos+1 < length && runes[pos] == '-' && runes[pos+1] == '-' {
pos += 2
for pos < length && runes[pos] != '\n' {
pos++
}
if pos < length {
buf.WriteRune('\n')
pos++
}
continue
}

// Handle block comments
if pos+1 < length && runes[pos] == '/' && runes[pos+1] == '*' {
nestLevel := 1
pos += 2
for pos < length && nestLevel > 0 {
if pos+1 < length {
if runes[pos] == '/' && runes[pos+1] == '*' {
nestLevel++
pos += 2
continue
}
if runes[pos] == '*' && runes[pos+1] == '/' {
nestLevel--
pos += 2
continue
}
}
pos++
}
continue
}

// Handle string literals
if runes[pos] == '\'' || (pos+1 < length && runes[pos] == 'E' && runes[pos+1] == '\'') {
if runes[pos] == 'E' {
buf.WriteRune('E')
pos++
}
buf.WriteRune('\'')
pos++
for pos < length {
if runes[pos] == '\'' {
buf.WriteRune('\'')
pos++
break
}
if pos+1 < length && runes[pos] == '\\' {
buf.WriteRune('\\')
buf.WriteRune(runes[pos+1])
pos += 2
continue
}
buf.WriteRune(runes[pos])
pos++
}
continue
}

// Handle dollar-quoted strings
if runes[pos] == '$' {
start := pos
tagEnd := pos + 1
for tagEnd < length && (unicode.IsLetter(runes[tagEnd]) || unicode.IsDigit(runes[tagEnd]) || runes[tagEnd] == '_') {
tagEnd++
}
if tagEnd < length && runes[tagEnd] == '$' {
tag := string(runes[start : tagEnd+1])
buf.WriteString(tag)
pos = tagEnd + 1
for pos < length {
if pos+len(tag) <= length && string(runes[pos:pos+len(tag)]) == tag {
buf.WriteString(tag)
pos += len(tag)
break
}
buf.WriteRune(runes[pos])
pos++
}
continue
}
}

// Handle quoted identifiers
if runes[pos] == '"' {
buf.WriteRune('"')
pos++
for pos < length {
if runes[pos] == '"' {
buf.WriteRune('"')
pos++
break
}
buf.WriteRune(runes[pos])
pos++
}
continue
}

buf.WriteRune(runes[pos])
pos++
}

return buf.String()
}
Loading

0 comments on commit 1de37f2

Please sign in to comment.