feat: arrow in, arrow out (#205)
fanyang01 authored Nov 22, 2024
1 parent 771eb68 commit 2943d53
Showing 11 changed files with 1,033 additions and 135 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/psql.yml
@@ -27,7 +27,7 @@ jobs:
        run: |
          go get .
-         pip3 install "sqlglot[rs]"
+         pip3 install "sqlglot[rs]" pyarrow pandas
          curl -LJO https://github.com/duckdb/duckdb/releases/download/v1.1.3/duckdb_cli-linux-amd64.zip
          unzip duckdb_cli-linux-amd64.zip
1 change: 1 addition & 0 deletions .gitignore
@@ -14,3 +14,4 @@ __debug_*
 .DS_Store
 *.csv
 *.parquet
+*.arrow
147 changes: 147 additions & 0 deletions pgserver/arrowloader.go
@@ -0,0 +1,147 @@
package pgserver

import (
	"context"
	"os"
	"strconv"
	"strings"

	"github.com/apache/arrow-go/v18/arrow/ipc"
	"github.com/apecloud/myduckserver/adapter"
	"github.com/apecloud/myduckserver/backend"
	"github.com/cockroachdb/cockroachdb-parser/pkg/sql/sem/tree"
	"github.com/dolthub/go-mysql-server/sql"
	"github.com/marcboeker/go-duckdb"
)

// ArrowDataLoader streams Arrow IPC data arriving over a FIFO pipe into a DuckDB table.
type ArrowDataLoader struct {
	PipeDataLoader
	arrowName string
	options   string
}

var _ DataLoader = (*ArrowDataLoader)(nil)

func NewArrowDataLoader(ctx *sql.Context, handler *DuckHandler, schema string, table sql.InsertableTable, columns tree.NameList, options string) (DataLoader, error) {
	// Create the FIFO pipe
	duckBuilder := handler.e.Analyzer.ExecBuilder.(*backend.DuckBuilder)
	pipePath, err := duckBuilder.CreatePipe(ctx, "pg-from-arrow")
	if err != nil {
		return nil, err
	}
	arrowName := "__sys_copy_from_arrow_" + strconv.Itoa(int(ctx.ID())) + "__"

	// Create cancelable context
	childCtx, cancel := context.WithCancel(ctx)
	ctx.Context = childCtx

	loader := &ArrowDataLoader{
		PipeDataLoader: PipeDataLoader{
			ctx:      ctx,
			cancel:   cancel,
			schema:   schema,
			table:    table,
			columns:  columns,
			pipePath: pipePath,
			rowCount: make(chan int64, 1),
			logger:   ctx.GetLogger(),
		},
		arrowName: arrowName,
		options:   options,
	}
	loader.read = func() {
		loader.executeInsert(loader.buildSQL(), pipePath)
	}

	return loader, nil
}

// buildSQL builds the DuckDB INSERT statement.
func (loader *ArrowDataLoader) buildSQL() string {
	var b strings.Builder
	b.Grow(256)

	b.WriteString("INSERT INTO ")
	if loader.schema != "" {
		b.WriteString(loader.schema)
		b.WriteString(".")
	}
	b.WriteString(loader.table.Name())

	if len(loader.columns) > 0 {
		b.WriteString(" (")
		b.WriteString(loader.columns.String())
		b.WriteString(")")
	}

	b.WriteString(" FROM ")
	b.WriteString(loader.arrowName)

	return b.String()
}
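The statement built here relies on DuckDB's FROM-first syntax, which lets an INSERT pull rows directly from a named source without an explicit SELECT. As a purely illustrative example (the session ID 42 is hypothetical), a COPY into db.t (a, b) yields: INSERT INTO db.t (a, b) FROM __sys_copy_from_arrow_42__.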

func (loader *ArrowDataLoader) executeInsert(sql string, pipePath string) {
	defer close(loader.rowCount)

	// Open the pipe for reading.
	loader.logger.Debugf("Opening pipe for reading: %s", pipePath)
	pipe, err := os.OpenFile(pipePath, os.O_RDONLY, os.ModeNamedPipe)
	if err != nil {
		loader.err.Store(&err)
		// Open the pipe once to unblock the writer
		pipe, _ = os.OpenFile(pipePath, os.O_RDONLY, os.ModeNamedPipe)
		loader.errPipe.Store(pipe)
		return
	}

	// Create an Arrow IPC reader from the pipe.
	loader.logger.Debugf("Creating Arrow IPC reader from pipe: %s", pipePath)
	arrowReader, err := ipc.NewReader(pipe)
	if err != nil {
		loader.err.Store(&err)
		return
	}
	defer arrowReader.Release()

	conn, err := adapter.GetConn(loader.ctx)
	if err != nil {
		loader.err.Store(&err)
		return
	}

	// Register the Arrow IPC reader with DuckDB.
	loader.logger.Debugf("Registering Arrow IPC reader into DuckDB: %s", loader.arrowName)
	var release func()
	if err := conn.Raw(func(driverConn any) error {
		conn := driverConn.(*duckdb.Conn)
		arrow, err := duckdb.NewArrowFromConn(conn)
		if err != nil {
			return err
		}

		release, err = arrow.RegisterView(arrowReader, loader.arrowName)
		return err
	}); err != nil {
		loader.err.Store(&err)
		return
	}
	defer release()

	// Execute the INSERT statement.
	// This will block until the reader has finished reading the data.
	loader.logger.Debugln("Executing SQL:", sql)
	result, err := conn.ExecContext(loader.ctx, sql)
	if err != nil {
		loader.err.Store(&err)
		return
	}

	rows, err := result.RowsAffected()
	if err != nil {
		loader.err.Store(&err)
		return
	}

	loader.logger.Debugf("Inserted %d rows", rows)
	loader.rowCount <- rows
}
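What flows through this pipe is a plain Arrow IPC stream, written by the COPY handler as client data arrives. As a point of reference, here is a minimal standalone producer sketch using the same arrow-go package; the two-column schema, the sample values, and the /tmp/pg-from-arrow path are assumptions for illustration, not code from this commit.

package main

import (
	"os"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/ipc"
	"github.com/apache/arrow-go/v18/arrow/memory"
)

func main() {
	// Hypothetical schema; the loader accepts whatever schema the stream declares.
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "id", Type: arrow.PrimitiveTypes.Int64},
		{Name: "name", Type: arrow.BinaryTypes.String},
	}, nil)

	builder := array.NewRecordBuilder(memory.DefaultAllocator, schema)
	defer builder.Release()
	builder.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2}, nil)
	builder.Field(1).(*array.StringBuilder).AppendValues([]string{"a", "b"}, nil)
	record := builder.NewRecord()
	defer record.Release()

	// Opening the FIFO for writing blocks until executeInsert opens it for reading.
	pipe, err := os.OpenFile("/tmp/pg-from-arrow", os.O_WRONLY, os.ModeNamedPipe)
	if err != nil {
		panic(err)
	}
	defer pipe.Close()

	// The IPC writer emits the schema message first, then one message per record batch.
	writer := ipc.NewWriter(pipe, ipc.WithSchema(schema))
	defer writer.Close()
	if err := writer.Write(record); err != nil {
		panic(err)
	}
}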
129 changes: 129 additions & 0 deletions pgserver/arrowwriter.go
@@ -0,0 +1,129 @@
package pgserver

import (
	"os"
	"strings"

	"github.com/apache/arrow-go/v18/arrow/ipc"
	"github.com/apecloud/myduckserver/adapter"
	"github.com/apecloud/myduckserver/backend"
	"github.com/apecloud/myduckserver/catalog"
	"github.com/cockroachdb/cockroachdb-parser/pkg/sql/sem/tree"
	"github.com/dolthub/go-mysql-server/sql"
	"github.com/marcboeker/go-duckdb"
)

// ArrowWriter streams query results out of DuckDB as an Arrow IPC stream over a FIFO pipe.
type ArrowWriter struct {
	ctx        *sql.Context
	duckSQL    string
	pipePath   string
	rawOptions string
}

func NewArrowWriter(
	ctx *sql.Context,
	handler *DuckHandler,
	schema string, table sql.Table, columns tree.NameList,
	query string,
	rawOptions string,
) (*ArrowWriter, error) {
	// Create the FIFO pipe
	db := handler.e.Analyzer.ExecBuilder.(*backend.DuckBuilder)
	pipePath, err := db.CreatePipe(ctx, "pg-to-arrow")
	if err != nil {
		return nil, err
	}

	var builder strings.Builder
	builder.Grow(128)

	if table != nil {
		// https://duckdb.org/docs/sql/query_syntax/from.html#from-first-syntax
		// FROM table_name [ SELECT column_list ]
		builder.WriteString("FROM ")
		if schema != "" {
			builder.WriteString(catalog.QuoteIdentifierANSI(schema))
			builder.WriteString(".")
		}
		builder.WriteString(catalog.QuoteIdentifierANSI(table.Name()))
		if columns != nil {
			builder.WriteString(" SELECT ")
			builder.WriteString(columns.String())
		}
	} else {
		builder.WriteString(query)
	}

	return &ArrowWriter{
		ctx:        ctx,
		duckSQL:    builder.String(),
		pipePath:   pipePath,
		rawOptions: rawOptions, // TODO(fan): parse rawOptions
	}, nil
}
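For illustration: given schema db, table t, and columns (a, b), the builder above emits the FROM-first query FROM "db"."t" SELECT a, b (identifiers ANSI-quoted); when a raw query is copied instead, its text is passed through verbatim.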

func (dw *ArrowWriter) Start() (string, chan CopyToResult, error) {
	// Execute the statement in a separate goroutine.
	ch := make(chan CopyToResult, 1)
	go func() {
		defer os.Remove(dw.pipePath)
		defer close(ch)

		dw.ctx.GetLogger().Tracef("Executing statement via Arrow interface: %s", dw.duckSQL)
		conn, err := adapter.GetConn(dw.ctx)
		if err != nil {
			ch <- CopyToResult{Err: err}
			return
		}

		// Open the pipe for writing.
		// This operation will block until the reader opens the pipe for reading.
		pipe, err := os.OpenFile(dw.pipePath, os.O_WRONLY, os.ModeNamedPipe)
		if err != nil {
			ch <- CopyToResult{Err: err}
			return
		}
		defer pipe.Close()

		rowCount := int64(0)

		if err := conn.Raw(func(driverConn any) error {
			conn := driverConn.(*duckdb.Conn)
			arrow, err := duckdb.NewArrowFromConn(conn)
			if err != nil {
				return err
			}

			// TODO(fan): Currently, this API materializes the entire result set in memory.
			// We should consider modifying the API to allow streaming the result set.
			recordReader, err := arrow.QueryContext(dw.ctx, dw.duckSQL)
			if err != nil {
				return err
			}
			defer recordReader.Release()

			writer := ipc.NewWriter(pipe, ipc.WithSchema(recordReader.Schema()))
			defer writer.Close()

			for recordReader.Next() {
				record := recordReader.Record()
				rowCount += record.NumRows()
				if err := writer.Write(record); err != nil {
					return err
				}
			}
			return recordReader.Err()
		}); err != nil {
			ch <- CopyToResult{Err: err}
			return
		}

		ch <- CopyToResult{RowCount: rowCount}
	}()

	return dw.pipePath, ch, nil
}

func (dw *ArrowWriter) Close() {
	os.Remove(dw.pipePath)
}
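On the reading end of the pipe returned by Start(), a consumer (presumably the COPY TO handler relaying the stream to the client) can decode it with the same IPC package. A minimal reader sketch, with an assumed pipe path standing in for whatever Start() returned:

package main

import (
	"fmt"
	"os"

	"github.com/apache/arrow-go/v18/arrow/ipc"
)

func main() {
	// Opening the FIFO for reading unblocks ArrowWriter's open-for-writing call.
	pipe, err := os.Open("/tmp/pg-to-arrow") // hypothetical path from Start()
	if err != nil {
		panic(err)
	}
	defer pipe.Close()

	// The IPC reader consumes the schema message, then iterates record batches.
	reader, err := ipc.NewReader(pipe)
	if err != nil {
		panic(err)
	}
	defer reader.Release()

	var rows int64
	for reader.Next() {
		rows += reader.Record().NumRows()
	}
	if err := reader.Err(); err != nil {
		panic(err)
	}
	fmt.Printf("read %d rows\n", rows)
}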
4 changes: 4 additions & 0 deletions pgserver/connection_data.go
@@ -69,6 +69,10 @@ type copyFromStdinState struct {
	copyFromStdinNode *tree.CopyFrom
	// targetTable stores the targetTable that the data is being loaded into.
	targetTable sql.InsertableTable
+
+	// For non-PG-parsable COPY FROM
+	rawOptions string
+
	// dataLoader is the implementation of DataLoader that is used to load each individual CopyData chunk into the
	// target table.
	dataLoader DataLoader