Skip to content

Commit

Permalink
feat: add compatibility test for python data tools, fix some compatib…
Browse files Browse the repository at this point in the history
…ility bugs (apecloud#209)
  • Loading branch information
NoyException committed Dec 3, 2024
1 parent c2372a8 commit 6a052cb
Show file tree
Hide file tree
Showing 10 changed files with 378 additions and 32 deletions.
6 changes: 5 additions & 1 deletion .github/workflows/clients-compatibility.yml
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ jobs:
# curl -L -o ./java/postgresql-42.7.4.jar https://jdbc.postgresql.org/download/postgresql-42.7.4.jar
npm install pg
sudo cpanm --notest DBD::Pg
pip3 install psycopg2
pip3 install psycopg pandas pyarrow polars
# sudo R -e "install.packages('RPostgres', repos='http://cran.r-project.org')"
sudo gem install pg
Expand All @@ -123,3 +123,7 @@ jobs:
- name: Run the Compatibility Test for PostgreSQL Client
run: |
bats ./compatibility/pg/test.bats
- name: Run the Compatibility Test for Python Data Tools
run: |
bats ./compatibility/pg-pytools/test.bats
1 change: 0 additions & 1 deletion compatibility/mysql/python/mysql_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ def run_tests(self):
for test in self.tests:
cursor = None
try:
self.conn.autocommit = False
cursor = self.conn.cursor()
if not test.run(cursor):
return False
Expand Down
54 changes: 54 additions & 0 deletions compatibility/pg-pytools/polars_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import io

import pandas as pd
import pyarrow as pa
import polars as pl
import psycopg

# Reference data, built first as a pandas DataFrame.
records = {
    'id': [1, 2, 3],
    'num': [100, 200, 300],
    'data': ['aaa', 'bbb', 'ccc']
}
source_df = pd.DataFrame(records)

# Pre-serialize the DataFrame into an Arrow table for the COPY below.
arrow_table = pa.Table.from_pandas(source_df)

with psycopg.connect("dbname=postgres user=postgres host=127.0.0.1 port=5432", autocommit=True) as conn:
    with conn.cursor() as cur:
        # Start every run from a clean scratch schema.
        cur.execute("DROP SCHEMA IF EXISTS test CASCADE")
        cur.execute("CREATE SCHEMA test")

        # Target table mirroring the DataFrame's columns.
        cur.execute("""
            CREATE TABLE test.tb1 (
                id integer PRIMARY KEY,
                num integer,
                data text)
            """)

        # Encode the table as an Arrow IPC stream and push it via COPY.
        ipc_buffer = io.BytesIO()
        with pa.ipc.RecordBatchStreamWriter(ipc_buffer, arrow_table.schema) as writer:
            writer.write_table(arrow_table)
        with cur.copy("COPY test.tb1 FROM STDIN (FORMAT arrow)") as copy:
            copy.write(ipc_buffer.getvalue())

        # Pull the rows back out of the server as an Arrow IPC stream.
        returned = io.BytesIO()
        with cur.copy("COPY test.tb1 TO STDOUT (FORMAT arrow)") as copy:
            for block in copy:
                returned.write(block)

        # Decode the stream and hand the resulting Arrow table to Polars.
        with pa.ipc.open_stream(returned.getvalue()) as reader:
            round_tripped = reader.read_all()
            polars_df = pl.from_arrow(round_tripped)

            # Same data converted directly pandas -> Polars, for comparison.
            expected_df = pl.from_pandas(source_df)

            # The round trip through the server must preserve the data exactly.
            assert polars_df.equals(expected_df), "DataFrames are not equal"
53 changes: 53 additions & 0 deletions compatibility/pg-pytools/psycopg_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from psycopg import sql
import psycopg

# Fixture rows: (id, num, data) tuples inserted by different mechanisms below.
sample_rows = [
    (1, 100, "aaa"),
    (2, 200, "bbb"),
    (3, 300, "ccc"),
    (4, 400, "ddd"),
    (5, 500, "eee"),
]

# Connect to an existing database
with psycopg.connect("dbname=postgres user=postgres host=127.0.0.1 port=5432", autocommit=True) as conn:
    # Open a cursor to perform database operations
    with conn.cursor() as cur:
        # Recreate the scratch schema so reruns start clean.
        cur.execute("DROP SCHEMA IF EXISTS test CASCADE")
        cur.execute("CREATE SCHEMA test")

        cur.execute("""
            CREATE TABLE test.tb1 (
                id integer PRIMARY KEY,
                num integer,
                data text)
            """)

        # Plain parameterized INSERT; psycopg adapts the Python values.
        cur.execute(
            "INSERT INTO test.tb1 (id, num, data) VALUES (%s, %s, %s)",
            sample_rows[0])

        # Read the row back and verify it survives the round trip.
        cur.execute("SELECT * FROM test.tb1")
        fetched = cur.fetchone()
        assert fetched == sample_rows[0], "Row is not equal"

        # Copy data from a file-like object to a table
        print("Copy data from a file-like object to a table")
        with cur.copy("COPY test.tb1 (id, num, data) FROM STDIN") as copy:
            # Two rows written as raw tab-separated text...
            for text_row in sample_rows[1:3]:
                copy.write(f"{text_row[0]}\t{text_row[1]}\t{text_row[2]}\n".encode())
            # ...and two more written as typed tuples via write_row().
            for typed_row in sample_rows[3:]:
                copy.write_row(typed_row)

        # Copy data from a table to a file-like object
        print("Copy data from a table to a file-like object")
        with cur.copy(
                "COPY (SELECT * FROM test.tb1 LIMIT %s) TO STDOUT",
                (4,)
        ) as copy:
            # Declare column types so rows() yields Python objects, not strings.
            copy.set_types(["int4", "int4", "text"])
            for i, got in enumerate(copy.rows()):
                assert got == sample_rows[i], f"Row {i} is not equal"
50 changes: 50 additions & 0 deletions compatibility/pg-pytools/pyarrow_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import io

import pandas as pd
import pyarrow as pa
import psycopg

# Reference data as a pandas DataFrame.
records = {
    'id': [1, 2, 3],
    'num': [100, 200, 300],
    'data': ['aaa', 'bbb', 'ccc']
}
df = pd.DataFrame(records)

# Same data as an Arrow table, ready for IPC serialization.
arrow_table = pa.Table.from_pandas(df)

with psycopg.connect("dbname=postgres user=postgres host=127.0.0.1 port=5432", autocommit=True) as conn:
    with conn.cursor() as cur:
        # Rebuild the scratch schema so the test is repeatable.
        cur.execute("DROP SCHEMA IF EXISTS test CASCADE")
        cur.execute("CREATE SCHEMA test")

        # Target table mirroring the DataFrame's columns.
        cur.execute("""
            CREATE TABLE test.tb1 (
                id integer PRIMARY KEY,
                num integer,
                data text)
            """)

        # Serialize the table into an Arrow IPC stream and COPY it in.
        ipc_buffer = io.BytesIO()
        with pa.ipc.RecordBatchStreamWriter(ipc_buffer, arrow_table.schema) as writer:
            writer.write_table(arrow_table)
        with cur.copy("COPY test.tb1 FROM STDIN (FORMAT arrow)") as copy:
            copy.write(ipc_buffer.getvalue())

        # COPY the rows back out of the server as an Arrow IPC stream.
        returned = io.BytesIO()
        with cur.copy("COPY test.tb1 TO STDOUT (FORMAT arrow)") as copy:
            for block in copy:
                returned.write(block)

        # Decode the stream into pandas and compare with the original.
        with pa.ipc.open_stream(returned.getvalue()) as reader:
            df_from_pg = reader.read_pandas()
            # Normalize integer dtypes (they can differ by platform/decoder).
            df = df.astype({'id': 'int64', 'num': 'int64'})
            df_from_pg = df_from_pg.astype({'id': 'int64', 'num': 'int64'})
            assert df.equals(df_from_pg), "DataFrames are not equal"
49 changes: 49 additions & 0 deletions compatibility/pg-pytools/test.bats
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#!/usr/bin/env bats

# Bats suite that runs each Python data-tools compatibility script
# (psycopg, pyarrow, polars) against the PostgreSQL-compatible server
# expected to be listening on 127.0.0.1:5432.

setup() {
    # Drop the scratch schema so every test starts clean, and make sure the
    # PID ledger file exists for teardown to read.
    psql -h 127.0.0.1 -p 5432 -U postgres -c "DROP SCHEMA IF EXISTS test CASCADE;"
    touch /tmp/test_pids
}

# Optional per-test cleanup command, registered via set_custom_teardown.
custom_teardown=""

set_custom_teardown() {
    custom_teardown="$1"
}

teardown() {
    # Run the registered cleanup hook (if any), then clear it.
    if [ -n "$custom_teardown" ]; then
        eval "$custom_teardown"
        custom_teardown=""
    fi

    # Kill any recorded processes that are still alive, then remove the ledger.
    while read -r pid; do
        if kill -0 "$pid" 2>/dev/null; then
            kill "$pid"
            wait "$pid" 2>/dev/null
        fi
    done < /tmp/test_pids
    rm /tmp/test_pids
}

start_process() {
    # Run the command under a 2-minute timeout; bats' `run` captures $status
    # and $output.
    run timeout 2m "$@"
    # NOTE(review): `run` executes synchronously, so $! here is the PID of the
    # last *background* job (often empty), not the command above — confirm
    # whether recording a PID is actually intended.
    echo $! >> /tmp/test_pids
    if [ "$status" -ne 0 ]; then
        echo "$output"
        # NOTE(review): plain `run` does not populate $stderr (that needs
        # `run --separate-stderr` on bats >= 1.5); this likely prints nothing.
        echo "$stderr"
    fi
    [ "$status" -eq 0 ]
}

@test "pg-psycopg" {
    start_process python3 $BATS_TEST_DIRNAME/psycopg_test.py
}

@test "pg-pyarrow" {
    start_process python3 $BATS_TEST_DIRNAME/pyarrow_test.py
}

@test "pg-polars" {
    start_process python3 $BATS_TEST_DIRNAME/polars_test.py
}
8 changes: 4 additions & 4 deletions compatibility/pg/python/pg_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import psycopg2
import psycopg

class PGTest:
class Test:
Expand Down Expand Up @@ -39,12 +39,13 @@ def __init__(self):

def connect(self, ip, port, user, password):
try:
self.conn = psycopg2.connect(
self.conn = psycopg.connect(
host=ip,
port=port,
dbname="postgres",
user=user,
password=password
password=password,
autocommit=True
)
except Exception as e:
raise RuntimeError(e)
Expand All @@ -59,7 +60,6 @@ def run_tests(self):
for test in self.tests:
cursor = None
try:
self.conn.autocommit = False
cursor = self.conn.cursor()
if not test.run(cursor):
return False
Expand Down
6 changes: 3 additions & 3 deletions compatibility/pg/test.bats
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ start_process() {
# start_process php $BATS_TEST_DIRNAME/php/pg_test.php 127.0.0.1 5432 postgres "" $BATS_TEST_DIRNAME/test.data
# }

# @test "pg-python" {
# start_process python3 $BATS_TEST_DIRNAME/python/pg_test.py 127.0.0.1 5432 postgres "" $BATS_TEST_DIRNAME/test.data
# }
@test "pg-python" {
start_process python3 $BATS_TEST_DIRNAME/python/pg_test.py 127.0.0.1 5432 postgres "" $BATS_TEST_DIRNAME/test.data
}

# @test "pg-r" {
# start_process Rscript $BATS_TEST_DIRNAME/r/PGTest.R 127.0.0.1 5432 postgres "" $BATS_TEST_DIRNAME/test.data
Expand Down
114 changes: 114 additions & 0 deletions docs/tutorial/pg-python-data-tools.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# Tutorial: Accessing MyDuck Server with PostgreSQL using psycopg, pyarrow, and polars

## 0. Connecting to MyDuck Server using psycopg

`psycopg` is a popular PostgreSQL adapter for Python. Here is how you can connect to MyDuck Server using `psycopg`:

```python
import psycopg
with psycopg.connect("dbname=postgres user=postgres host=127.0.0.1 port=5432", autocommit=True) as conn:
with conn.cursor() as cur:
...
```

## 1. Using COPY Operation for Direct Interaction

The `COPY` command in PostgreSQL is a powerful tool for bulk data transfer. Here is how you can use it with the `psycopg` library to interact directly with MyDuck Server:

### Writing Data Directly

```python
with cur.copy("COPY test.tb1 (id, num, data) FROM STDIN") as copy:
copy.write(b"1\t100\taaa\n")
```

### Writing Data Row by Row

```python
with cur.copy("COPY test.tb1 (id, num, data) FROM STDIN") as copy:
copy.write_row((1, 100, "aaa"))
```

### Reading Data Directly

```python
with cur.copy("COPY test.tb1 TO STDOUT") as copy:
for block in copy:
print(block)
```

### Reading Data Row by Row

```python
with cur.copy("COPY test.tb1 TO STDOUT") as copy:
for row in copy.rows():
print(row)
```

## 2. Importing and Exporting Data in pyarrow Format

`pyarrow` allows efficient data interchange between pandas DataFrames and MyDuck Server. Here is how to import and export data in `pyarrow` format:

### Creating a pandas DataFrame and Converting to Arrow Table

```python
data = {
'id': [1, 2, 3],
'num': [100, 200, 300],
'data': ['aaa', 'bbb', 'ccc']
}
df = pd.DataFrame(data)
table = pa.Table.from_pandas(df)
```

### Writing Data to MyDuck Server in Arrow Format

```python
output_stream = io.BytesIO()
with pa.ipc.RecordBatchStreamWriter(output_stream, table.schema) as writer:
writer.write_table(table)
with cur.copy("COPY test.tb1 FROM STDIN (FORMAT arrow)") as copy:
copy.write(output_stream.getvalue())
```

### Reading Data from MyDuck Server in Arrow Format

```python
arrow_data = io.BytesIO()
with cur.copy("COPY test.tb1 TO STDOUT (FORMAT arrow)") as copy:
for block in copy:
arrow_data.write(block)
print(arrow_data.getvalue())
```

### Converting Arrow Data to an Arrow Table

```python
with pa.ipc.open_stream(arrow_data.getvalue()) as reader:
arrow_df = reader.read_all()
print(arrow_df)
```

### Converting Arrow Data to Pandas DataFrame

```python
with pa.ipc.open_stream(arrow_data.getvalue()) as reader:
pandas_df = reader.read_pandas()
print(pandas_df)
```

## 3. Using polars to Convert pyarrow Format Data

`polars` is a fast DataFrame library that can work with `pyarrow` data. Here is how to use `polars` to convert `pyarrow` format data:

### Converting Pandas DataFrame to polars DataFrame

```python
polars_df = pl.from_pandas(pandas_df)
```

### Converting an Arrow Table to a polars DataFrame

```python
polars_df = pl.from_arrow(arrow_df)
```
Loading

0 comments on commit 6a052cb

Please sign in to comment.