feat: arrow in, arrow out #205

Merged · 2 commits · Nov 22, 2024
Changes from all commits
2 changes: 1 addition & 1 deletion .github/workflows/psql.yml
@@ -27,7 +27,7 @@ jobs:
       run: |
         go get .

-        pip3 install "sqlglot[rs]"
+        pip3 install "sqlglot[rs]" pyarrow pandas

         curl -LJO https://github.com/duckdb/duckdb/releases/download/v1.1.3/duckdb_cli-linux-amd64.zip
         unzip duckdb_cli-linux-amd64.zip
1 change: 1 addition & 0 deletions .gitignore
@@ -14,3 +14,4 @@ __debug_*
 .DS_Store
 *.csv
 *.parquet
+*.arrow
147 changes: 147 additions & 0 deletions pgserver/arrowloader.go
@@ -0,0 +1,147 @@
package pgserver

import (
    "context"
    "os"
    "strconv"
    "strings"

    "github.com/apache/arrow-go/v18/arrow/ipc"
    "github.com/apecloud/myduckserver/adapter"
    "github.com/apecloud/myduckserver/backend"
    "github.com/cockroachdb/cockroachdb-parser/pkg/sql/sem/tree"
    "github.com/dolthub/go-mysql-server/sql"
    "github.com/marcboeker/go-duckdb"
)

type ArrowDataLoader struct {
    PipeDataLoader
    arrowName string
    options   string
}

var _ DataLoader = (*ArrowDataLoader)(nil)

func NewArrowDataLoader(ctx *sql.Context, handler *DuckHandler, schema string, table sql.InsertableTable, columns tree.NameList, options string) (DataLoader, error) {
    // Create the FIFO pipe
    duckBuilder := handler.e.Analyzer.ExecBuilder.(*backend.DuckBuilder)
    pipePath, err := duckBuilder.CreatePipe(ctx, "pg-from-arrow")
    if err != nil {
        return nil, err
    }
    arrowName := "__sys_copy_from_arrow_" + strconv.Itoa(int(ctx.ID())) + "__"

    // Create cancelable context
    childCtx, cancel := context.WithCancel(ctx)
    ctx.Context = childCtx

    loader := &ArrowDataLoader{
        PipeDataLoader: PipeDataLoader{
            ctx:      ctx,
            cancel:   cancel,
            schema:   schema,
            table:    table,
            columns:  columns,
            pipePath: pipePath,
            rowCount: make(chan int64, 1),
            logger:   ctx.GetLogger(),
        },
        arrowName: arrowName,
        options:   options,
    }
    loader.read = func() {
        loader.executeInsert(loader.buildSQL(), pipePath)
    }

    return loader, nil
}

// buildSQL builds the DuckDB INSERT statement.
func (loader *ArrowDataLoader) buildSQL() string {
    var b strings.Builder
    b.Grow(256)

    b.WriteString("INSERT INTO ")
    if loader.schema != "" {
        b.WriteString(loader.schema)
        b.WriteString(".")
    }
    b.WriteString(loader.table.Name())

    if len(loader.columns) > 0 {
        b.WriteString(" (")
        b.WriteString(loader.columns.String())
        b.WriteString(")")
    }

    b.WriteString(" FROM ")
    b.WriteString(loader.arrowName)

    return b.String()
}
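
// For illustration (not part of this diff): with ctx.ID() == 42, schema "db",
// table "t", and columns (a, b), buildSQL yields the DuckDB statement
//
//     INSERT INTO db.t (a, b) FROM __sys_copy_from_arrow_42__
//
// where the FROM source is the Arrow view registered in executeInsert below.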

func (loader *ArrowDataLoader) executeInsert(sql string, pipePath string) {
    defer close(loader.rowCount)

    // Open the pipe for reading.
    loader.logger.Debugf("Opening pipe for reading: %s", pipePath)
    pipe, err := os.OpenFile(pipePath, os.O_RDONLY, os.ModeNamedPipe)
    if err != nil {
        loader.err.Store(&err)
        // Open the pipe once to unblock the writer
        pipe, _ = os.OpenFile(pipePath, os.O_RDONLY, os.ModeNamedPipe)
        loader.errPipe.Store(pipe)
        return
    }

    // Create an Arrow IPC reader from the pipe.
    loader.logger.Debugf("Creating Arrow IPC reader from pipe: %s", pipePath)
    arrowReader, err := ipc.NewReader(pipe)
    if err != nil {
        loader.err.Store(&err)
        return
    }
    defer arrowReader.Release()

    conn, err := adapter.GetConn(loader.ctx)
    if err != nil {
        loader.err.Store(&err)
        return
    }

    // Register the Arrow IPC reader with DuckDB.
    loader.logger.Debugf("Registering Arrow IPC reader into DuckDB: %s", loader.arrowName)
    var release func()
    if err := conn.Raw(func(driverConn any) error {
        conn := driverConn.(*duckdb.Conn)
        arrow, err := duckdb.NewArrowFromConn(conn)
        if err != nil {
            return err
        }

        release, err = arrow.RegisterView(arrowReader, loader.arrowName)
        return err
    }); err != nil {
        loader.err.Store(&err)
        return
    }
    defer release()

    // Execute the INSERT statement.
    // This will block until the reader has finished reading the data.
    loader.logger.Debugln("Executing SQL:", sql)
    result, err := conn.ExecContext(loader.ctx, sql)
    if err != nil {
        loader.err.Store(&err)
        return
    }

    rows, err := result.RowsAffected()
    if err != nil {
        loader.err.Store(&err)
        return
    }

    loader.logger.Debugf("Inserted %d rows", rows)
    loader.rowCount <- rows
}
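
For reference, a minimal sketch (not part of this diff; the schema and values are illustrative) of producing the Arrow IPC stream that executeInsert consumes from the far end of the pipe, using the same arrow-go package:

package main

import (
    "os"

    "github.com/apache/arrow-go/v18/arrow"
    "github.com/apache/arrow-go/v18/arrow/array"
    "github.com/apache/arrow-go/v18/arrow/ipc"
    "github.com/apache/arrow-go/v18/arrow/memory"
)

func main() {
    // Schema matching a hypothetical target table t(id BIGINT, name TEXT).
    schema := arrow.NewSchema([]arrow.Field{
        {Name: "id", Type: arrow.PrimitiveTypes.Int64},
        {Name: "name", Type: arrow.BinaryTypes.String},
    }, nil)

    // Build one record batch in memory.
    b := array.NewRecordBuilder(memory.DefaultAllocator, schema)
    defer b.Release()
    b.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
    b.Field(1).(*array.StringBuilder).AppendValues([]string{"a", "b", "c"}, nil)
    rec := b.NewRecord()
    defer rec.Release()

    // Write an IPC stream; ipc.NewReader on the loader side understands this framing.
    w := ipc.NewWriter(os.Stdout, ipc.WithSchema(schema))
    defer w.Close()
    if err := w.Write(rec); err != nil {
        panic(err)
    }
}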
129 changes: 129 additions & 0 deletions pgserver/arrowwriter.go
@@ -0,0 +1,129 @@
package pgserver

import (
    "os"
    "strings"

    "github.com/apache/arrow-go/v18/arrow/ipc"
    "github.com/apecloud/myduckserver/adapter"
    "github.com/apecloud/myduckserver/backend"
    "github.com/apecloud/myduckserver/catalog"
    "github.com/cockroachdb/cockroachdb-parser/pkg/sql/sem/tree"
    "github.com/dolthub/go-mysql-server/sql"
    "github.com/marcboeker/go-duckdb"
)

type ArrowWriter struct {
    ctx        *sql.Context
    duckSQL    string
    pipePath   string
    rawOptions string
}

func NewArrowWriter(
    ctx *sql.Context,
    handler *DuckHandler,
    schema string, table sql.Table, columns tree.NameList,
    query string,
    rawOptions string,
) (*ArrowWriter, error) {
    // Create the FIFO pipe
    db := handler.e.Analyzer.ExecBuilder.(*backend.DuckBuilder)
    pipePath, err := db.CreatePipe(ctx, "pg-to-arrow")
    if err != nil {
        return nil, err
    }

    var builder strings.Builder
    builder.Grow(128)

    if table != nil {
        // https://duckdb.org/docs/sql/query_syntax/from.html#from-first-syntax
        // FROM table_name [ SELECT column_list ]
        builder.WriteString("FROM ")
        if schema != "" {
            builder.WriteString(catalog.QuoteIdentifierANSI(schema))
            builder.WriteString(".")
        }
        builder.WriteString(catalog.QuoteIdentifierANSI(table.Name()))
        if columns != nil {
            builder.WriteString(" SELECT ")
            builder.WriteString(columns.String())
        }
    } else {
        builder.WriteString(query)
    }

    return &ArrowWriter{
        ctx:        ctx,
        duckSQL:    builder.String(),
        pipePath:   pipePath,
        rawOptions: rawOptions, // TODO(fan): parse rawOptions
    }, nil
}
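
// For illustration (not part of this diff): with schema "db", table "t", and
// columns (a, b), NewArrowWriter builds the FROM-first query
//
//     FROM "db"."t" SELECT a, b
//
// which DuckDB evaluates like SELECT a, b FROM "db"."t".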

func (dw *ArrowWriter) Start() (string, chan CopyToResult, error) {
    // Execute the statement in a separate goroutine.
    ch := make(chan CopyToResult, 1)
    go func() {
        defer os.Remove(dw.pipePath)
        defer close(ch)

        dw.ctx.GetLogger().Tracef("Executing statement via Arrow interface: %s", dw.duckSQL)
        conn, err := adapter.GetConn(dw.ctx)
        if err != nil {
            ch <- CopyToResult{Err: err}
            return
        }

        // Open the pipe for writing.
        // This operation will block until the reader opens the pipe for reading.
        pipe, err := os.OpenFile(dw.pipePath, os.O_WRONLY, os.ModeNamedPipe)
        if err != nil {
            ch <- CopyToResult{Err: err}
            return
        }
        defer pipe.Close()

        rowCount := int64(0)

        if err := conn.Raw(func(driverConn any) error {
            conn := driverConn.(*duckdb.Conn)
            arrow, err := duckdb.NewArrowFromConn(conn)
            if err != nil {
                return err
            }

            // TODO(fan): Currently, this API materializes the entire result set in memory.
            // We should consider modifying the API to allow streaming the result set.
            recordReader, err := arrow.QueryContext(dw.ctx, dw.duckSQL)
            if err != nil {
                return err
            }
            defer recordReader.Release()

            writer := ipc.NewWriter(pipe, ipc.WithSchema(recordReader.Schema()))
            defer writer.Close()

            for recordReader.Next() {
                record := recordReader.Record()
                rowCount += record.NumRows()
                if err := writer.Write(record); err != nil {
                    return err
                }
            }
            return recordReader.Err()
        }); err != nil {
            ch <- CopyToResult{Err: err}
            return
        }

        ch <- CopyToResult{RowCount: rowCount}
    }()

    return dw.pipePath, ch, nil
}

func (dw *ArrowWriter) Close() {
    os.Remove(dw.pipePath)
}
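
Correspondingly, a minimal sketch (not part of this diff; the pipe path is illustrative) of consuming the stream that Start writes into the pipe:

package main

import (
    "fmt"
    "os"

    "github.com/apache/arrow-go/v18/arrow/ipc"
)

func main() {
    // Opening the FIFO for reading unblocks the writer side in Start.
    pipe, err := os.Open("/tmp/pg-to-arrow-0.pipe") // hypothetical path returned by Start
    if err != nil {
        panic(err)
    }
    defer pipe.Close()

    r, err := ipc.NewReader(pipe)
    if err != nil {
        panic(err)
    }
    defer r.Release()

    var rows int64
    for r.Next() {
        rows += r.Record().NumRows()
    }
    if err := r.Err(); err != nil {
        panic(err)
    }
    fmt.Println("rows:", rows)
}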
4 changes: 4 additions & 0 deletions pgserver/connection_data.go
@@ -69,6 +69,10 @@ type copyFromStdinState struct {
 	copyFromStdinNode *tree.CopyFrom
 	// targetTable stores the targetTable that the data is being loaded into.
 	targetTable sql.InsertableTable
+
+	// For non-PG-parsable COPY FROM
+	rawOptions string
+
 	// dataLoader is the implementation of DataLoader that is used to load each individual CopyData chunk into the
 	// target table.
 	dataLoader DataLoader