Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add compatibility test for python data tools, fix some compatibility bugs (#209) #248

Merged
merged 1 commit into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .github/workflows/clients-compatibility.yml
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ jobs:
# curl -L -o ./java/postgresql-42.7.4.jar https://jdbc.postgresql.org/download/postgresql-42.7.4.jar
npm install pg
sudo cpanm --notest DBD::Pg
pip3 install psycopg2
pip3 install psycopg pandas pyarrow polars
# sudo R -e "install.packages('RPostgres', repos='http://cran.r-project.org')"
sudo gem install pg

Expand All @@ -123,3 +123,7 @@ jobs:
- name: Run the Compatibility Test for PostgreSQL Client
run: |
bats ./compatibility/pg/test.bats

- name: Run the Compatibility Test for Python Data Tools
run: |
bats ./compatibility/pg-pytools/test.bats
1 change: 0 additions & 1 deletion compatibility/mysql/python/mysql_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ def run_tests(self):
for test in self.tests:
cursor = None
try:
self.conn.autocommit = False
cursor = self.conn.cursor()
if not test.run(cursor):
return False
Expand Down
54 changes: 54 additions & 0 deletions compatibility/pg-pytools/polars_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
"""Round-trip a pandas DataFrame through the server with Arrow-format COPY,
then verify the result using polars."""
import io

import pandas as pd
import pyarrow as pa
import polars as pl
import psycopg

# Source data as a pandas DataFrame.
frame = pd.DataFrame({
    'id': [1, 2, 3],
    'num': [100, 200, 300],
    'data': ['aaa', 'bbb', 'ccc'],
})

# Arrow representation of the same data.
arrow_table = pa.Table.from_pandas(frame)

with psycopg.connect("dbname=postgres user=postgres host=127.0.0.1 port=5432", autocommit=True) as conn:
    with conn.cursor() as cur:
        # Start from a clean schema.
        cur.execute("DROP SCHEMA IF EXISTS test CASCADE")
        cur.execute("CREATE SCHEMA test")

        # Fresh target table.
        cur.execute("""
            CREATE TABLE test.tb1 (
                id integer PRIMARY KEY,
                num integer,
                data text)
        """)

        # Serialize the Arrow table to an IPC stream and COPY it into the server.
        buf = io.BytesIO()
        with pa.ipc.RecordBatchStreamWriter(buf, arrow_table.schema) as writer:
            writer.write_table(arrow_table)
        with cur.copy("COPY test.tb1 FROM STDIN (FORMAT arrow)") as copy:
            copy.write(buf.getvalue())

        # COPY the rows back out of the server as an Arrow IPC stream.
        received = io.BytesIO()
        with cur.copy("COPY test.tb1 TO STDOUT (FORMAT arrow)") as copy:
            for chunk in copy:
                received.write(chunk)

        # Decode the stream into a polars DataFrame.
        with pa.ipc.open_stream(received.getvalue()) as reader:
            polars_df = pl.from_arrow(reader.read_all())

        # The round-tripped data must match the original.
        assert polars_df.equals(pl.from_pandas(frame)), "DataFrames are not equal"
53 changes: 53 additions & 0 deletions compatibility/pg-pytools/psycopg_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
"""Basic psycopg compatibility checks: a parameterized INSERT/SELECT
round-trip and COPY in both directions against a PostgreSQL-compatible
server on 127.0.0.1:5432."""
# NOTE: the previous `from psycopg import sql` import was unused and removed.
import psycopg

# Fixture rows: (id, num, data).
rows = [
    (1, 100, "aaa"),
    (2, 200, "bbb"),
    (3, 300, "ccc"),
    (4, 400, "ddd"),
    (5, 500, "eee"),
]

# Connect to an existing database; autocommit avoids explicit transaction handling.
with psycopg.connect("dbname=postgres user=postgres host=127.0.0.1 port=5432", autocommit=True) as conn:
    # Open a cursor to perform database operations.
    with conn.cursor() as cur:
        # Start from a clean schema.
        cur.execute("DROP SCHEMA IF EXISTS test CASCADE")
        cur.execute("CREATE SCHEMA test")

        cur.execute("""
            CREATE TABLE test.tb1 (
                id integer PRIMARY KEY,
                num integer,
                data text)
        """)

        # Pass data to fill the query placeholders and let psycopg perform
        # the correct conversion.
        cur.execute(
            "INSERT INTO test.tb1 (id, num, data) VALUES (%s, %s, %s)",
            rows[0])

        # Query the database and obtain the row back as Python objects.
        cur.execute("SELECT * FROM test.tb1")
        row = cur.fetchone()
        assert row == rows[0], "Row is not equal"

        # COPY in: raw tab-separated text for two rows, write_row for the rest.
        print("Copy data from a file-like object to a table")
        with cur.copy("COPY test.tb1 (id, num, data) FROM STDIN") as copy:
            for row in rows[1:3]:
                copy.write(f"{row[0]}\t{row[1]}\t{row[2]}\n".encode())
            for row in rows[3:]:
                copy.write_row(row)

        # COPY out: read typed rows and compare against the fixtures.
        print("Copy data from a table to a file-like object")
        with cur.copy(
            "COPY (SELECT * FROM test.tb1 LIMIT %s) TO STDOUT",
            (4,)
        ) as copy:
            copy.set_types(["int4", "int4", "text"])
            for i, row in enumerate(copy.rows()):
                assert row == rows[i], f"Row {i} is not equal"
50 changes: 50 additions & 0 deletions compatibility/pg-pytools/pyarrow_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""Round-trip a pandas DataFrame through the server using Arrow-format COPY
and compare the result in pandas."""
import io

import pandas as pd
import pyarrow as pa
import psycopg

# Build the source DataFrame.
frame = pd.DataFrame({
    'id': [1, 2, 3],
    'num': [100, 200, 300],
    'data': ['aaa', 'bbb', 'ccc'],
})

# Arrow representation of the same data.
arrow_table = pa.Table.from_pandas(frame)

with psycopg.connect("dbname=postgres user=postgres host=127.0.0.1 port=5432", autocommit=True) as conn:
    with conn.cursor() as cur:
        # Start from a clean schema.
        cur.execute("DROP SCHEMA IF EXISTS test CASCADE")
        cur.execute("CREATE SCHEMA test")

        # Fresh target table.
        cur.execute("""
            CREATE TABLE test.tb1 (
                id integer PRIMARY KEY,
                num integer,
                data text)
        """)

        # Serialize the Arrow table to an IPC stream and COPY it into the server.
        buf = io.BytesIO()
        with pa.ipc.RecordBatchStreamWriter(buf, arrow_table.schema) as writer:
            writer.write_table(arrow_table)
        with cur.copy("COPY test.tb1 FROM STDIN (FORMAT arrow)") as copy:
            copy.write(buf.getvalue())

        # COPY the rows back out as an Arrow IPC stream.
        received = io.BytesIO()
        with cur.copy("COPY test.tb1 TO STDOUT (FORMAT arrow)") as copy:
            for chunk in copy:
                received.write(chunk)

        # Decode to pandas and normalize integer dtypes before comparing.
        with pa.ipc.open_stream(received.getvalue()) as reader:
            df_from_pg = reader.read_pandas()
        expected = frame.astype({'id': 'int64', 'num': 'int64'})
        df_from_pg = df_from_pg.astype({'id': 'int64', 'num': 'int64'})
        # Compare the original DataFrame with the one read back from the server.
        assert expected.equals(df_from_pg), "DataFrames are not equal"
49 changes: 49 additions & 0 deletions compatibility/pg-pytools/test.bats
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#!/usr/bin/env bats

# Bats suite that runs the Python data-tool compatibility scripts
# (psycopg, pyarrow, polars) against a PostgreSQL-compatible server
# expected to be listening on 127.0.0.1:5432.

setup() {
    # Start each test from a clean slate: drop the `test` schema that the
    # Python scripts create, and ensure the PID bookkeeping file exists.
    psql -h 127.0.0.1 -p 5432 -U postgres -c "DROP SCHEMA IF EXISTS test CASCADE;"
    touch /tmp/test_pids
}

# Optional extra cleanup snippet, run once by teardown() if set.
custom_teardown=""

set_custom_teardown() {
    # Register a shell snippet to be eval'd during teardown().
    custom_teardown="$1"
}

teardown() {
    # Run the registered custom cleanup first, then clear it.
    if [ -n "$custom_teardown" ]; then
        eval "$custom_teardown"
        custom_teardown=""
    fi

    # Kill any still-running processes recorded in the PID file.
    while read -r pid; do
        if kill -0 "$pid" 2>/dev/null; then
            kill "$pid"
            wait "$pid" 2>/dev/null
        fi
    done < /tmp/test_pids
    rm /tmp/test_pids
}

start_process() {
    # Run the command with a 2-minute cap; bats' `run` captures $status/$output.
    run timeout 2m "$@"
    # NOTE(review): `run` executes synchronously, so `$!` here is NOT the PID
    # of the command above — it is the last *background* job (often none).
    # The recorded PID may be stale or empty; confirm the intent.
    echo $! >> /tmp/test_pids
    if [ "$status" -ne 0 ]; then
        echo "$output"
        # NOTE(review): stock bats does not set a `$stderr` variable for plain
        # `run`; this likely expands to an empty string — verify.
        echo "$stderr"
    fi
    [ "$status" -eq 0 ]
}

@test "pg-psycopg" {
    start_process python3 $BATS_TEST_DIRNAME/psycopg_test.py
}

@test "pg-pyarrow" {
    start_process python3 $BATS_TEST_DIRNAME/pyarrow_test.py
}

@test "pg-polars" {
    start_process python3 $BATS_TEST_DIRNAME/polars_test.py
}
8 changes: 4 additions & 4 deletions compatibility/pg/python/pg_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import psycopg2
import psycopg

class PGTest:
class Test:
Expand Down Expand Up @@ -39,12 +39,13 @@ def __init__(self):

def connect(self, ip, port, user, password):
try:
self.conn = psycopg2.connect(
self.conn = psycopg.connect(
host=ip,
port=port,
dbname="postgres",
user=user,
password=password
password=password,
autocommit=True
)
except Exception as e:
raise RuntimeError(e)
Expand All @@ -59,7 +60,6 @@ def run_tests(self):
for test in self.tests:
cursor = None
try:
self.conn.autocommit = False
cursor = self.conn.cursor()
if not test.run(cursor):
return False
Expand Down
6 changes: 3 additions & 3 deletions compatibility/pg/test.bats
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ start_process() {
# start_process php $BATS_TEST_DIRNAME/php/pg_test.php 127.0.0.1 5432 postgres "" $BATS_TEST_DIRNAME/test.data
# }

# @test "pg-python" {
# start_process python3 $BATS_TEST_DIRNAME/python/pg_test.py 127.0.0.1 5432 postgres "" $BATS_TEST_DIRNAME/test.data
# }
@test "pg-python" {
start_process python3 $BATS_TEST_DIRNAME/python/pg_test.py 127.0.0.1 5432 postgres "" $BATS_TEST_DIRNAME/test.data
}

# @test "pg-r" {
# start_process Rscript $BATS_TEST_DIRNAME/r/PGTest.R 127.0.0.1 5432 postgres "" $BATS_TEST_DIRNAME/test.data
Expand Down
114 changes: 114 additions & 0 deletions docs/tutorial/pg-python-data-tools.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# Tutorial: Accessing MyDuck Server with PostgreSQL using psycopg, pyarrow, and polars

## 0. Connecting to MyDuck Server using psycopg

`psycopg` is a popular PostgreSQL adapter for Python. Here is how you can connect to MyDuck Server using `psycopg`:

```python
import psycopg
with psycopg.connect("dbname=postgres user=postgres host=127.0.0.1 port=5432", autocommit=True) as conn:
with conn.cursor() as cur:
...
```

## 1. Using COPY Operation for Direct Interaction

The `COPY` command in PostgreSQL is a powerful tool for bulk data transfer. Here is how you can use it with the `psycopg` library to interact directly with MyDuck Server:

### Writing Data Directly

```python
with cur.copy("COPY test.tb1 (id, num, data) FROM STDIN") as copy:
copy.write(b"1\t100\taaa\n")
```

### Writing Data Row by Row

```python
with cur.copy("COPY test.tb1 (id, num, data) FROM STDIN") as copy:
copy.write_row((1, 100, "aaa"))
```

### Reading Data Directly

```python
with cur.copy("COPY test.tb1 TO STDOUT") as copy:
for block in copy:
print(block)
```

### Reading Data Row by Row

```python
with cur.copy("COPY test.tb1 TO STDOUT") as copy:
for row in copy.rows():
print(row)
```

## 2. Importing and Exporting Data in pyarrow Format

`pyarrow` allows efficient data interchange between pandas DataFrames and MyDuck Server. Here is how to import and export data in `pyarrow` format:

### Creating a pandas DataFrame and Converting to Arrow Table

```python
data = {
'id': [1, 2, 3],
'num': [100, 200, 300],
'data': ['aaa', 'bbb', 'ccc']
}
df = pd.DataFrame(data)
table = pa.Table.from_pandas(df)
```

### Writing Data to MyDuck Server in Arrow Format

```python
output_stream = io.BytesIO()
with pa.ipc.RecordBatchStreamWriter(output_stream, table.schema) as writer:
writer.write_table(table)
with cur.copy("COPY test.tb1 FROM STDIN (FORMAT arrow)") as copy:
copy.write(output_stream.getvalue())
```

### Reading Data from MyDuck Server in Arrow Format

```python
arrow_data = io.BytesIO()
with cur.copy("COPY test.tb1 TO STDOUT (FORMAT arrow)") as copy:
for block in copy:
arrow_data.write(block)
print(arrow_data.getvalue())
```

### Converting Arrow Data to a PyArrow Table

```python
with pa.ipc.open_stream(arrow_data.getvalue()) as reader:
arrow_df = reader.read_all()
print(arrow_df)
```

### Converting Arrow Data to Pandas DataFrame

```python
with pa.ipc.open_stream(arrow_data.getvalue()) as reader:
pandas_df = reader.read_pandas()
print(pandas_df)
```

## 3. Using polars to Convert pyarrow Format Data

`polars` is a fast DataFrame library that can work with `pyarrow` data. Here is how to use `polars` to convert `pyarrow` format data:

### Converting Pandas DataFrame to polars DataFrame

```python
polars_df = pl.from_pandas(pandas_df)
```

### Converting a PyArrow Table to a polars DataFrame

```python
polars_df = pl.from_arrow(arrow_df)
```
Loading
Loading