Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Postgres Plugin for dbt-duckdb to Use ATTACH Syntax and Improve Compatibility Gexar/update pgsql plugin #478 #480

Merged
merged 3 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
// Set *default* container specific settings.json values on container create.
"settings": {
"python.defaultInterpreterPath": "/usr/local/bin/python",
"python.testing.pytestEnabled": true,
"python.testing.unittestEnabled": false,
"python.linting.enabled": true,
"python.linting.flake8Enabled": true,
"python.linting.mypyEnabled": true,
Expand Down Expand Up @@ -49,11 +51,21 @@
]
}
},
// "features": {
// // Allow the devcontainer to run host docker commands, see https://github.com/devcontainers/templates/tree/main/src/docker-outside-of-docker
// "ghcr.io/devcontainers/features/docker-outside-of-docker:1": {
// "enableNonRootDocker": true
// }
// },
// "mounts": [
// "source=/var/run/docker.sock,target=/var/run/docker.sock,type=bind"
// ],
// Use 'forwardPorts' to make a list of ports inside the container available locally.
// "forwardPorts": [],
// Use 'postCreateCommand' to run commands after the container is created.
// "postCreateCommand": "pip3 install --user -r requirements.txt",
// Comment out to connect as root instead. More info: https://aka.ms/vscode-remote/containers/non-root.
"remoteUser": "vscode",
"workspaceFolder": "/workspaces/dbt-duckdb",
"postCreateCommand": "pip install -e . && pip install -r dev-requirements.txt"
}
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,5 @@ target/
.idea/
.vscode/
.env

.venv
141 changes: 119 additions & 22 deletions dbt/adapters/duckdb/plugins/postgres.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,132 @@
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple

from duckdb import DuckDBPyConnection

from . import BasePlugin
from dbt.adapters.events.logging import AdapterLogger

PG_EXT = "postgres"


class Plugin(BasePlugin):
    """
    dbt-duckdb plugin that attaches a PostgreSQL database to a DuckDB connection.

    Configuration keys read by ``initialize``:

    * ``dsn`` (required): connection string placed in the ATTACH statement.
    * ``pg_schema``: optional; emitted as the ``SCHEMA`` attach option.
    * ``duckdb_alias``: alias of the attached database (default ``postgres_db``).
    * ``read_only``: when truthy, adds the ``READ_ONLY`` attach option.
    * ``secret``: optional; emitted as the ``SECRET`` attach option.
    * ``attach_options``: extra ATTACH options, rendered as ``KEY value``
      (or just ``KEY`` when the value is ``None``).
    * ``settings``: mapping applied with ``SET name = value`` before ATTACH.
    """

    # NOTE(review): every log message below is fully formatted before it is
    # handed to the logger. AdapterLogger's placeholder convention is not
    # visible in this file (dbt adapters conventionally use str.format-style
    # "{}" rather than "%s") -- pre-formatting is correct under either.
    logger = AdapterLogger("DuckDB_PostgresPlugin")

    def __init__(self, name: str, plugin_config: Dict[str, Any]):
        """
        Initialize the Plugin with a name and configuration.
        """
        super().__init__(name, plugin_config)
        # Only option names are logged: the "dsn" value may embed a password.
        self.logger.debug(
            f"Plugin __init__ called with name: {name} and config keys: {list(plugin_config)}"
        )
        # NOTE(review): BasePlugin.__init__ is not visible here; if it already
        # calls initialize(), this second call is redundant -- confirm.
        self.initialize(plugin_config)

    def initialize(self, config: Dict[str, Any]):
        """
        Initialize the plugin with the provided configuration.

        Raises:
            ValueError: if ``dsn`` is missing or empty.
        """
        self.logger.debug(f"Initializing PostgreSQL plugin with config keys: {list(config)}")

        # .get() rather than config["dsn"]: a missing key must surface as the
        # ValueError below, not as a bare KeyError.
        dsn = config.get("dsn")
        if not dsn:
            self.logger.error(
                "Initialization failed: 'dsn' is a required argument for the postgres plugin!"
            )
            raise ValueError("'dsn' is a required argument for the postgres plugin!")
        self._dsn: str = dsn

        self._pg_schema: Optional[str] = config.get("pg_schema")  # Can be None
        self._duckdb_alias: str = config.get("duckdb_alias", "postgres_db")
        self._read_only: bool = config.get("read_only", False)
        self._secret: Optional[str] = config.get("secret")
        # "or {}" also covers a key that is present but explicitly null.
        self._attach_options: Dict[str, Any] = config.get("attach_options") or {}
        self._settings: Dict[str, Any] = config.get("settings") or {}

        # The DSN is deliberately left out of the log line (it may carry credentials).
        self.logger.info(
            f"PostgreSQL plugin initialized with pg_schema='{self._pg_schema}', "
            f"duckdb_alias='{self._duckdb_alias}', read_only={self._read_only}, "
            f"secret='{self._secret}'"
        )

    def configure_connection(self, conn: DuckDBPyConnection):
        """
        Configure the DuckDB connection to attach the PostgreSQL database.

        Installs and loads the postgres extension, applies the configured
        settings, then executes the ATTACH statement. Any exception raised by
        the connection is logged and re-raised unchanged.
        """
        self.logger.debug("Configuring DuckDB connection for PostgreSQL plugin.")

        conn.install_extension(PG_EXT)
        conn.load_extension(PG_EXT)
        self.logger.info("PostgreSQL extension installed and loaded.")

        # Settings go first so they are in effect when the database is attached.
        self._set_extension_settings(conn)

        attach_stmt = self._build_attach_statement()
        # Log the statement shape with the DSN masked.
        self.logger.debug(
            "Executing ATTACH statement: "
            f"ATTACH '<redacted>' AS {self._duckdb_alias} ({self._attach_options_sql()});"
        )
        try:
            conn.execute(attach_stmt)
        except Exception as e:
            self.logger.error(f"Failed to attach PostgreSQL database: {e}")
            raise
        else:
            self.logger.info(
                f"Successfully attached PostgreSQL database as '{self._duckdb_alias}'"
            )

    def _set_extension_settings(self, conn: DuckDBPyConnection):
        """
        Set extension settings via SET commands.

        Each entry of the ``settings`` mapping becomes ``SET name = value;``.
        A failure is logged and re-raised.
        """
        for setting, value in self._settings.items():
            set_stmt = f"SET {setting} = {self._sql_literal(value)};"
            self.logger.debug(f"Setting extension option: {set_stmt}")
            try:
                conn.execute(set_stmt)
            except Exception as e:
                self.logger.error(f"Failed to set option {setting}: {e}")
                raise

    @staticmethod
    def _sql_literal(value: Any) -> str:
        """
        Render a Python value as a SQL literal.

        Booleans become ``true``/``false``; strings are single-quoted with any
        embedded single quote doubled so the value cannot terminate the
        literal early; everything else is rendered with ``str()``.
        """
        # bool is tested first for clarity: it is also an int, never a str.
        if isinstance(value, bool):
            return "true" if value else "false"
        if isinstance(value, str):
            return "'" + value.replace("'", "''") + "'"
        return str(value)

    def _attach_options_sql(self) -> str:
        """
        Render the parenthesised option list of the ATTACH statement.

        Order: TYPE, SCHEMA, SECRET, user-supplied options, READ_ONLY.
        """
        options: List[Tuple[str, Optional[str]]] = [("TYPE", "POSTGRES")]

        if self._pg_schema:
            options.append(("SCHEMA", self._sql_literal(str(self._pg_schema))))

        if self._secret:
            options.append(("SECRET", self._sql_literal(str(self._secret))))

        # Additional attach options; a None value yields a bare flag.
        for key, value in self._attach_options.items():
            rendered = None if value is None else self._sql_literal(value)
            options.append((key.upper(), rendered))

        if self._read_only:
            options.append(("READ_ONLY", None))  # No value assigned

        return ", ".join(k if v is None else f"{k} {v}" for k, v in options)

    def _build_attach_statement(self) -> str:
        """
        Build the ATTACH statement for connecting to the PostgreSQL database.
        """
        dsn_literal = self._sql_literal(str(self._dsn))
        return f"ATTACH {dsn_literal} AS {self._duckdb_alias} ({self._attach_options_sql()});"
2 changes: 2 additions & 0 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ openpyxl
pip-tools
pre-commit
psycopg2-binary
psycopg[binary]
pyiceberg
pytest
pytest-dotenv
pytest-logbook
pytest-csv
pytest-xdist
pytest-mock
testcontainers[postgres]
pytz
ruff
sqlalchemy
Expand Down
Loading
Loading