diff --git a/.github/workflows/clients-compatibility.yml b/.github/workflows/clients-compatibility.yml index d05440da..87ac20ad 100644 --- a/.github/workflows/clients-compatibility.yml +++ b/.github/workflows/clients-compatibility.yml @@ -108,7 +108,7 @@ jobs: # curl -L -o ./java/postgresql-42.7.4.jar https://jdbc.postgresql.org/download/postgresql-42.7.4.jar npm install pg sudo cpanm --notest DBD::Pg - pip3 install psycopg2 + pip3 install psycopg pandas pyarrow polars # sudo R -e "install.packages('RPostgres', repos='http://cran.r-project.org')" sudo gem install pg @@ -123,3 +123,7 @@ jobs: - name: Run the Compatibility Test for PostgreSQL Client run: | bats ./compatibility/pg/test.bats + + - name: Run the Compatibility Test for Python Data Tools + run: | + bats ./compatibility/pg-pytools/test.bats \ No newline at end of file diff --git a/compatibility/mysql/python/mysql_test.py b/compatibility/mysql/python/mysql_test.py index 25873171..d39eb7d2 100644 --- a/compatibility/mysql/python/mysql_test.py +++ b/compatibility/mysql/python/mysql_test.py @@ -58,7 +58,6 @@ def run_tests(self): for test in self.tests: cursor = None try: - self.conn.autocommit = False cursor = self.conn.cursor() if not test.run(cursor): return False diff --git a/compatibility/pg-pytools/polars_test.py b/compatibility/pg-pytools/polars_test.py new file mode 100644 index 00000000..e6553c7d --- /dev/null +++ b/compatibility/pg-pytools/polars_test.py @@ -0,0 +1,54 @@ +import io + +import pandas as pd +import pyarrow as pa +import polars as pl +import psycopg + +# Create a pandas DataFrame +data = { + 'id': [1, 2, 3], + 'num': [100, 200, 300], + 'data': ['aaa', 'bbb', 'ccc'] +} +df = pd.DataFrame(data) + +# Convert the DataFrame to an Arrow Table +table = pa.Table.from_pandas(df) + +with psycopg.connect("dbname=postgres user=postgres host=127.0.0.1 port=5432", autocommit=True) as conn: + with conn.cursor() as cur: + cur.execute("DROP SCHEMA IF EXISTS test CASCADE") + cur.execute("CREATE SCHEMA test") + + # Create a new table + cur.execute(""" + CREATE TABLE test.tb1 ( + id integer PRIMARY KEY, + num integer, + data text) + """) + + # Use psycopg to write the DataFrame to MyDuck Server + output_stream = io.BytesIO() + with pa.ipc.RecordBatchStreamWriter(output_stream, table.schema) as writer: + writer.write_table(table) + with cur.copy("COPY test.tb1 FROM STDIN (FORMAT arrow)") as copy: + copy.write(output_stream.getvalue()) + + # Copy the data from MyDuck Server back into a pandas DataFrame using Arrow format + arrow_data = io.BytesIO() + with cur.copy("COPY test.tb1 TO STDOUT (FORMAT arrow)") as copy: + for block in copy: + arrow_data.write(block) + + # Read the Arrow data into a Polars DataFrame + with pa.ipc.open_stream(arrow_data.getvalue()) as reader: + arrow_df = reader.read_all() + polars_df = pl.from_arrow(arrow_df) + + # Convert the original pandas DataFrame to Polars DataFrame for comparison + polars_df_original = pl.from_pandas(df) + + # Compare the original Polars DataFrame with the DataFrame from PostgreSQL + assert polars_df.equals(polars_df_original), "DataFrames are not equal" diff --git a/compatibility/pg-pytools/psycopg_test.py b/compatibility/pg-pytools/psycopg_test.py new file mode 100644 index 00000000..0578c912 --- /dev/null +++ b/compatibility/pg-pytools/psycopg_test.py @@ -0,0 +1,53 @@ +from psycopg import sql +import psycopg + +rows = [ + (1, 100, "aaa"), + (2, 200, "bbb"), + (3, 300, "ccc"), + (4, 400, "ddd"), + (5, 500, "eee"), +] + +# Connect to an existing database +with psycopg.connect("dbname=postgres user=postgres host=127.0.0.1 port=5432", autocommit=True) as conn: + # Open a cursor to perform database operations + with conn.cursor() as cur: + cur.execute("DROP SCHEMA IF EXISTS test CASCADE") + cur.execute("CREATE SCHEMA test") + + cur.execute(""" + CREATE TABLE test.tb1 ( + id integer PRIMARY KEY, + num integer, + data text) + """) + + + # Pass data to fill a query placeholders and let Psycopg perform the correct conversion + cur.execute( + "INSERT INTO test.tb1 (id, num, data) VALUES (%s, %s, %s)", + rows[0]) + + # Query the database and obtain data as Python objects + cur.execute("SELECT * FROM test.tb1") + row = cur.fetchone() + assert row == rows[0], "Row is not equal" + + # Copy data from a file-like object to a table + print("Copy data from a file-like object to a table") + with cur.copy("COPY test.tb1 (id, num, data) FROM STDIN") as copy: + for row in rows[1:3]: + copy.write(f"{row[0]}\t{row[1]}\t{row[2]}\n".encode()) + for row in rows[3:]: + copy.write_row(row) + + # Copy data from a table to a file-like object + print("Copy data from a table to a file-like object") + with cur.copy( + "COPY (SELECT * FROM test.tb1 LIMIT %s) TO STDOUT", + (4,) + ) as copy: + copy.set_types(["int4", "int4", "text"]) + for i, row in enumerate(copy.rows()): + assert row == rows[i], f"Row {i} is not equal" diff --git a/compatibility/pg-pytools/pyarrow_test.py b/compatibility/pg-pytools/pyarrow_test.py new file mode 100644 index 00000000..186f8232 --- /dev/null +++ b/compatibility/pg-pytools/pyarrow_test.py @@ -0,0 +1,50 @@ +import io + +import pandas as pd +import pyarrow as pa +import psycopg + +# Create a pandas DataFrame +data = { + 'id': [1, 2, 3], + 'num': [100, 200, 300], + 'data': ['aaa', 'bbb', 'ccc'] +} +df = pd.DataFrame(data) + +# Convert the DataFrame to an Arrow Table +table = pa.Table.from_pandas(df) + +with psycopg.connect("dbname=postgres user=postgres host=127.0.0.1 port=5432", autocommit=True) as conn: + with conn.cursor() as cur: + cur.execute("DROP SCHEMA IF EXISTS test CASCADE") + cur.execute("CREATE SCHEMA test") + + # Create a new table + cur.execute(""" + CREATE TABLE test.tb1 ( + id integer PRIMARY KEY, + num integer, + data text) + """) + + # Use psycopg to write the DataFrame to MyDuck Server + output_stream = io.BytesIO() + with pa.ipc.RecordBatchStreamWriter(output_stream, table.schema) as writer: + writer.write_table(table) + with cur.copy("COPY test.tb1 FROM STDIN (FORMAT arrow)") as copy: + copy.write(output_stream.getvalue()) + + # Copy the data from MyDuck Server back into a pandas DataFrame using Arrow format + arrow_data = io.BytesIO() + with cur.copy("COPY test.tb1 TO STDOUT (FORMAT arrow)") as copy: + for block in copy: + arrow_data.write(block) + + # Read the Arrow data into a pandas DataFrame + with pa.ipc.open_stream(arrow_data.getvalue()) as reader: + df_from_pg = reader.read_pandas() + df = df.astype({'id': 'int64', 'num': 'int64'}) + df_from_pg = df_from_pg.astype({'id': 'int64', 'num': 'int64'}) + # Compare the original DataFrame with the DataFrame from PostgreSQL + assert df.equals(df_from_pg), "DataFrames are not equal" diff --git a/compatibility/pg-pytools/test.bats b/compatibility/pg-pytools/test.bats new file mode 100644 index 00000000..94ce5c98 --- /dev/null +++ b/compatibility/pg-pytools/test.bats @@ -0,0 +1,49 @@ +#!/usr/bin/env bats + +setup() { + psql -h 127.0.0.1 -p 5432 -U postgres -c "DROP SCHEMA IF EXISTS test CASCADE;" + touch /tmp/test_pids +} + +custom_teardown="" + +set_custom_teardown() { + custom_teardown="$1" +} + +teardown() { + if [ -n "$custom_teardown" ]; then + eval "$custom_teardown" + custom_teardown="" + fi + + while read -r pid; do + if kill -0 "$pid" 2>/dev/null; then + kill "$pid" + wait "$pid" 2>/dev/null + fi + done < /tmp/test_pids + rm /tmp/test_pids +} + +start_process() { + run timeout 2m "$@" + echo $! >> /tmp/test_pids + if [ "$status" -ne 0 ]; then + echo "$output" + echo "$stderr" + fi + [ "$status" -eq 0 ] +} + +@test "pg-psycopg" { + start_process python3 $BATS_TEST_DIRNAME/psycopg_test.py +} + +@test "pg-pyarrow" { + start_process python3 $BATS_TEST_DIRNAME/pyarrow_test.py +} + +@test "pg-polars" { + start_process python3 $BATS_TEST_DIRNAME/polars_test.py +} \ No newline at end of file diff --git a/compatibility/pg/python/pg_test.py b/compatibility/pg/python/pg_test.py index b1bebd75..92e36840 100644 --- a/compatibility/pg/python/pg_test.py +++ b/compatibility/pg/python/pg_test.py @@ -1,4 +1,4 @@ -import psycopg2 +import psycopg class PGTest: class Test: @@ -39,12 +39,13 @@ def __init__(self): def connect(self, ip, port, user, password): try: - self.conn = psycopg2.connect( + self.conn = psycopg.connect( host=ip, port=port, dbname="postgres", user=user, - password=password + password=password, + autocommit=True ) except Exception as e: raise RuntimeError(e) @@ -59,7 +60,6 @@ def run_tests(self): for test in self.tests: cursor = None try: - self.conn.autocommit = False cursor = self.conn.cursor() if not test.run(cursor): return False diff --git a/compatibility/pg/test.bats b/compatibility/pg/test.bats index 611ec329..de60fd48 100644 --- a/compatibility/pg/test.bats +++ b/compatibility/pg/test.bats @@ -71,9 +71,9 @@ start_process() { # start_process php $BATS_TEST_DIRNAME/php/pg_test.php 127.0.0.1 5432 postgres "" $BATS_TEST_DIRNAME/test.data # } -# @test "pg-python" { -# start_process python3 $BATS_TEST_DIRNAME/python/pg_test.py 127.0.0.1 5432 postgres "" $BATS_TEST_DIRNAME/test.data -# } +@test "pg-python" { + start_process python3 $BATS_TEST_DIRNAME/python/pg_test.py 127.0.0.1 5432 postgres "" $BATS_TEST_DIRNAME/test.data +} # @test "pg-r" { # start_process Rscript $BATS_TEST_DIRNAME/r/PGTest.R 127.0.0.1 5432 postgres "" $BATS_TEST_DIRNAME/test.data diff --git a/docs/tutorial/pg-python-data-tools.md b/docs/tutorial/pg-python-data-tools.md new file mode 100644 index 00000000..a4532a57 --- /dev/null +++ b/docs/tutorial/pg-python-data-tools.md @@ -0,0 +1,114 @@ +# Tutorial: Accessing MyDuck Server with PostgreSQL using psycopg, pyarrow, and polars + +## 0. Connecting to MyDuck Server using psycopg + +`psycopg` is a popular PostgreSQL adapter for Python. Here is how you can connect to MyDuck Server using `psycopg`: + +```python +import psycopg +with psycopg.connect("dbname=postgres user=postgres host=127.0.0.1 port=5432", autocommit=True) as conn: + with conn.cursor() as cur: + ... +``` + +## 1. Using COPY Operation for Direct Interaction + +The `COPY` command in PostgreSQL is a powerful tool for bulk data transfer. Here is how you can use it with the `psycopg` library to interact directly with MyDuck Server: + +### Writing Data Directly + +```python +with cur.copy("COPY test.tb1 (id, num, data) FROM STDIN") as copy: + copy.write(b"1\t100\taaa\n") +``` + +### Writing Data Row by Row + +```python +with cur.copy("COPY test.tb1 (id, num, data) FROM STDIN") as copy: + copy.write_row((1, 100, "aaa")) +``` + +### Reading Data Directly + +```python +with cur.copy("COPY test.tb1 TO STDOUT") as copy: + for block in copy: + print(block) +``` + +### Reading Data Row by Row + +```python +with cur.copy("COPY test.tb1 TO STDOUT") as copy: + for row in copy.rows(): + print(row) +``` + +## 2. Importing and Exporting Data in pyarrow Format + +`pyarrow` allows efficient data interchange between pandas DataFrames and MyDuck Server. Here is how to import and export data in `pyarrow` format: + +### Creating a pandas DataFrame and Converting to Arrow Table + +```python +data = { + 'id': [1, 2, 3], + 'num': [100, 200, 300], + 'data': ['aaa', 'bbb', 'ccc'] +} +df = pd.DataFrame(data) +table = pa.Table.from_pandas(df) +``` + +### Writing Data to MyDuck Server in Arrow Format + +```python +output_stream = io.BytesIO() +with pa.ipc.RecordBatchStreamWriter(output_stream, table.schema) as writer: + writer.write_table(table) +with cur.copy("COPY test.tb1 FROM STDIN (FORMAT arrow)") as copy: + copy.write(output_stream.getvalue()) +``` + +### Reading Data from MyDuck Server in Arrow Format + +```python +arrow_data = io.BytesIO() +with cur.copy("COPY test.tb1 TO STDOUT (FORMAT arrow)") as copy: + for block in copy: + arrow_data.write(block) + print(arrow_data.getvalue()) +``` + +### Converting Arrow Data to Arrow DataFrame + +```python +with pa.ipc.open_stream(arrow_data.getvalue()) as reader: + arrow_df = reader.read_all() + print(arrow_df) +``` + +### Converting Arrow Data to Pandas DataFrame + +```python +with pa.ipc.open_stream(arrow_data.getvalue()) as reader: + pandas_df = reader.read_pandas() + print(pandas_df) +``` + +## 3. Using polars to Convert pyarrow Format Data + +`polars` is a fast DataFrame library that can work with `pyarrow` data. Here is how to use `polars` to convert `pyarrow` format data: + +### Converting Pandas DataFrame to polars DataFrame + +```python +polars_df = pl.from_pandas(pandas_df) +``` + +### Converting Arrow DataFrame to polars DataFrame + +```python +polars_df = pl.from_arrow(arrow_df) +``` \ No newline at end of file diff --git a/pgserver/connection_handler.go b/pgserver/connection_handler.go index fc5ca957..23e38f5f 100644 --- a/pgserver/connection_handler.go +++ b/pgserver/connection_handler.go @@ -15,6 +15,7 @@ package pgserver import ( + "bufio" "bytes" "context" "crypto/tls" @@ -800,15 +801,12 @@ func (h *ConnectionHandler) handleCopyDataHelper(message *pgproto3.CopyData) (st rawOptions, ) case tree.CopyFormatText: - // Remove trailing backslash, comma and newline characters from the data - if bytes.HasSuffix(message.Data, []byte{'\n'}) { - message.Data = message.Data[:len(message.Data)-1] + // Remove `\.` from the end of the message data, if it exists + if bytes.HasSuffix(message.Data, []byte{'\\', '.', '\n'}) { + message.Data = message.Data[:len(message.Data)-3] } - if bytes.HasSuffix(message.Data, []byte{'\r'}) { - message.Data = message.Data[:len(message.Data)-1] - } - if bytes.HasSuffix(message.Data, []byte{'\\', '.'}) { - message.Data = message.Data[:len(message.Data)-2] + if bytes.HasSuffix(message.Data, []byte{'\\', '.', '\r', '\n'}) { + message.Data = message.Data[:len(message.Data)-4] } fallthrough case tree.CopyFormatCSV: @@ -1405,27 +1403,52 @@ func (h *ConnectionHandler) handleCopyToStdout(query ConvertedQuery, copyTo *tre ctx.GetLogger().Debug("Finished copying data from the pipe to the client") }() - buf := make([]byte, 1<<20) // 1MB buffer - for { - n, err := pipe.Read(buf) - if n > 0 { - copyData := &pgproto3.CopyData{ - Data: buf[:n], - } - ctx.GetLogger().Debugf("sending CopyData (%d bytes) to the client", n) - if err := h.send(copyData); err != nil { + sendCopyData := func(copyData *pgproto3.CopyData) { + ctx.GetLogger().Debugf("sending CopyData (%d bytes) to the client", len(copyData.Data)) + if err := h.send(copyData); err != nil { + sendErr.Store(err) + cancel() + return + } + } + + switch format { + case tree.CopyFormatText: + reader := bufio.NewReader(pipe) + for { + line, err := reader.ReadSlice('\n') + if err != nil { + if err == io.EOF { + break + } sendErr.Store(err) cancel() return } + copyData := &pgproto3.CopyData{ + Data: line, + } + logrus.Warnf("line: %s", line) + sendCopyData(copyData) } - if err != nil { - if err == io.EOF { - break + default: + buf := make([]byte, 1<<20) // 1MB buffer + for { + n, err := pipe.Read(buf) + if n > 0 { + copyData := &pgproto3.CopyData{ + Data: buf[:n], + } + sendCopyData(copyData) + } + if err != nil { + if err == io.EOF { + break + } + sendErr.Store(err) + cancel() + return } - sendErr.Store(err) - cancel() - return } } }()