Skip to content

Commit

Permalink
fix(postgres-plugin #478): Correct attribute access, plugin initializ…
Browse files Browse the repository at this point in the history
…ation, and test assertions
  • Loading branch information
Gexar committed Nov 25, 2024
1 parent 1fbecd5 commit 5b953b3
Show file tree
Hide file tree
Showing 5 changed files with 493 additions and 54 deletions.
9 changes: 9 additions & 0 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@
]
}
},
// "features": {
// // Allow the devcontainer to run host docker commands, see https://github.com/devcontainers/templates/tree/main/src/docker-outside-of-docker
// "ghcr.io/devcontainers/features/docker-outside-of-docker:1": {
// "enableNonRootDocker": true
// }
// },
// "mounts": [
// "source=/var/run/docker.sock,target=/var/run/docker.sock,type=bind"
// ],
// Use 'forwardPorts' to make a list of ports inside the container available locally.
// "forwardPorts": [],
// Use 'postCreateCommand' to run commands after the container is created.
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,5 @@ target/
.idea/
.vscode/
.env

.venv
141 changes: 119 additions & 22 deletions dbt/adapters/duckdb/plugins/postgres.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,132 @@
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple

from duckdb import DuckDBPyConnection

from . import BasePlugin
from dbt.adapters.events.logging import AdapterLogger

PG_EXT = "postgres"


class Plugin(BasePlugin):
logger = AdapterLogger("DuckDB_PostgresPlugin")

def __init__(self, name: str, plugin_config: Dict[str, Any]):
"""
Initialize the Plugin with a name and configuration.
"""
super().__init__(name, plugin_config)
self.logger.debug(
"Plugin __init__ called with name: %s and config: %s", name, plugin_config
)
self.initialize(plugin_config)

def initialize(self, config: Dict[str, Any]):
self._dsn = config.get("dsn")
if self._dsn is None:
raise Exception("'dsn' is a required argument for the postgres plugin!")
self._source_schema = config.get("source_schema", "public")
self._sink_schema = config.get("sink_schema", "main")
self._overwrite = config.get("overwrite", False)
self._filter_pushdown = config.get("filter_pushdown", False)
"""
Initialize the plugin with the provided configuration.
"""
self.logger.debug("Initializing PostgreSQL plugin with config: %s", config)

self._dsn: str = config["dsn"]
if not self._dsn:
self.logger.error(
"Initialization failed: 'dsn' is a required argument for the postgres plugin!"
)
raise ValueError("'dsn' is a required argument for the postgres plugin!")

self._pg_schema: Optional[str] = config.get("pg_schema") # Can be None
self._duckdb_alias: str = config.get("duckdb_alias", "postgres_db")
self._read_only: bool = config.get("read_only", False)
self._secret: Optional[str] = config.get("secret")
self._attach_options: Dict[str, Any] = config.get(
"attach_options", {}
) # Additional ATTACH options
self._settings: Dict[str, Any] = config.get(
"settings", {}
) # Extension settings via SET commands

self.logger.info(
"PostgreSQL plugin initialized with dsn='%s', pg_schema='%s', "
"duckdb_alias='%s', read_only=%s, secret='%s'",
self._dsn,
self._pg_schema,
self._duckdb_alias,
self._read_only,
self._secret,
)

def configure_connection(self, conn: DuckDBPyConnection):
conn.install_extension("postgres")
conn.load_extension("postgres")

if self._sink_schema:
conn.execute(f"CREATE SCHEMA IF NOT EXISTS {self._sink_schema}")

attach_args = [
("source_schema", f"'{self._source_schema}'"),
("sink_schema", f"'{self._sink_schema}'"),
("overwrite", str(self._overwrite).lower()),
("filter_pushdown", str(self._filter_pushdown).lower()),
]
attach_stmt = (
f"CALL postgres_attach('{self._dsn}', {', '.join(f'{k}={v}' for k, v in attach_args)})"
"""
Configure the DuckDB connection to attach the PostgreSQL database.
"""
self.logger.debug("Configuring DuckDB connection for PostgreSQL plugin.")

conn.install_extension(PG_EXT)
conn.load_extension(PG_EXT)
self.logger.info("PostgreSQL extension installed and loaded.")

# Set any extension settings provided
self._set_extension_settings(conn)

# Build and execute the ATTACH command
attach_stmt = self._build_attach_statement()
self.logger.debug("Executing ATTACH statement: %s", attach_stmt)
try:
conn.execute(attach_stmt)
self.logger.info("Successfully attached PostgreSQL database with DSN: %s", self._dsn)
except Exception as e:
self.logger.error("Failed to attach PostgreSQL database: %s", e)
raise

def _set_extension_settings(self, conn: DuckDBPyConnection):
"""
Set extension settings via SET commands.
"""
for setting, value in self._settings.items():
# Quote string values
if isinstance(value, str):
value = f"'{value}'"
elif isinstance(value, bool):
value = "true" if value else "false"
set_stmt = f"SET {setting} = {value};"
self.logger.debug("Setting extension option: %s", set_stmt)
try:
conn.execute(set_stmt)
except Exception as e:
self.logger.error("Failed to set option %s: %s", setting, e)
raise

def _build_attach_statement(self) -> str:
"""
Build the ATTACH statement for connecting to the PostgreSQL database.
"""
attach_options: List[Tuple[str, Optional[str]]] = [("TYPE", "POSTGRES")]

if self._pg_schema:
attach_options.append(("SCHEMA", f"'{self._pg_schema}'"))

if self._secret:
attach_options.append(("SECRET", f"'{self._secret}'"))

# Additional attach options
for k, v in self._attach_options.items():
if isinstance(v, bool):
v = "true" if v else "false"
elif isinstance(v, str):
v = f"'{v}'"
attach_options.append((k.upper(), v))

if self._read_only:
attach_options.append(("READ_ONLY", None)) # No value assigned

# Convert options to string
attach_options_str = ", ".join(
f"{k} {v}" if v is not None else k for k, v in attach_options
)
conn.execute(attach_stmt)

attach_stmt = (f"ATTACH '{self._dsn}' AS {self._duckdb_alias} ({attach_options_str});")
return attach_stmt
2 changes: 2 additions & 0 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ openpyxl
pip-tools
pre-commit
psycopg2-binary
psycopg[binary]
pyiceberg
pytest
pytest-dotenv
pytest-logbook
pytest-csv
pytest-xdist
pytest-mock
testcontainers[postgres]
pytz
ruff
sqlalchemy
Expand Down
Loading

0 comments on commit 5b953b3

Please sign in to comment.