diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 066f8c7a..6356876e 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -19,6 +19,8 @@ // Set *default* container specific settings.json values on container create. "settings": { "python.defaultInterpreterPath": "/usr/local/bin/python", + "python.testing.pytestEnabled": true, + "python.testing.unittestEnabled": false, "python.linting.enabled": true, "python.linting.flake8Enabled": true, "python.linting.mypyEnabled": true, @@ -49,11 +51,21 @@ ] } }, + // "features": { + // // Allow the devcontainer to run host docker commands, see https://github.com/devcontainers/templates/tree/main/src/docker-outside-of-docker + // "ghcr.io/devcontainers/features/docker-outside-of-docker:1": { + // "enableNonRootDocker": true + // } + // }, + // "mounts": [ + // "source=/var/run/docker.sock,target=/var/run/docker.sock,type=bind" + // ], // Use 'forwardPorts' to make a list of ports inside the container available locally. // "forwardPorts": [], // Use 'postCreateCommand' to run commands after the container is created. // "postCreateCommand": "pip3 install --user -r requirements.txt", // Comment out to connect as root instead. More info: https://aka.ms/vscode-remote/containers/non-root. "remoteUser": "vscode", + "workspaceFolder": "/workspaces/dbt-duckdb", "postCreateCommand": "pip install -e . && pip install -r dev-requirements.txt" } diff --git a/.gitignore b/.gitignore index 75c102c1..144946ac 100644 --- a/.gitignore +++ b/.gitignore @@ -80,3 +80,5 @@ target/ .idea/ .vscode/ .env + +.venv diff --git a/dbt/adapters/duckdb/plugins/postgres.py b/dbt/adapters/duckdb/plugins/postgres.py index 2b14f5fb..95132926 100644 --- a/dbt/adapters/duckdb/plugins/postgres.py +++ b/dbt/adapters/duckdb/plugins/postgres.py @@ -1,35 +1,132 @@ from typing import Any from typing import Dict +from typing import List +from typing import Optional +from typing import Tuple from duckdb import DuckDBPyConnection from . import BasePlugin +from dbt.adapters.events.logging import AdapterLogger + +PG_EXT = "postgres" class Plugin(BasePlugin): + logger = AdapterLogger("DuckDB_PostgresPlugin") + + def __init__(self, name: str, plugin_config: Dict[str, Any]): + """ + Initialize the Plugin with a name and configuration. + """ + super().__init__(name, plugin_config) + self.logger.debug( + "Plugin __init__ called with name: %s and config: %s", name, plugin_config + ) + self.initialize(plugin_config) + def initialize(self, config: Dict[str, Any]): - self._dsn = config.get("dsn") - if self._dsn is None: - raise Exception("'dsn' is a required argument for the postgres plugin!") - self._source_schema = config.get("source_schema", "public") - self._sink_schema = config.get("sink_schema", "main") - self._overwrite = config.get("overwrite", False) - self._filter_pushdown = config.get("filter_pushdown", False) + """ + Initialize the plugin with the provided configuration. + """ + self.logger.debug("Initializing PostgreSQL plugin with config: %s", config) + + self._dsn: str = config["dsn"] + if not self._dsn: + self.logger.error( + "Initialization failed: 'dsn' is a required argument for the postgres plugin!" + ) + raise ValueError("'dsn' is a required argument for the postgres plugin!") + + self._pg_schema: Optional[str] = config.get("pg_schema") # Can be None + self._duckdb_alias: str = config.get("duckdb_alias", "postgres_db") + self._read_only: bool = config.get("read_only", False) + self._secret: Optional[str] = config.get("secret") + self._attach_options: Dict[str, Any] = config.get( + "attach_options", {} + ) # Additional ATTACH options + self._settings: Dict[str, Any] = config.get( + "settings", {} + ) # Extension settings via SET commands + + self.logger.info( + "PostgreSQL plugin initialized with dsn='%s', pg_schema='%s', " + "duckdb_alias='%s', read_only=%s, secret='%s'", + self._dsn, + self._pg_schema, + self._duckdb_alias, + self._read_only, + self._secret, + ) def configure_connection(self, conn: DuckDBPyConnection): - conn.install_extension("postgres") - conn.load_extension("postgres") - - if self._sink_schema: - conn.execute(f"CREATE SCHEMA IF NOT EXISTS {self._sink_schema}") - - attach_args = [ - ("source_schema", f"'{self._source_schema}'"), - ("sink_schema", f"'{self._sink_schema}'"), - ("overwrite", str(self._overwrite).lower()), - ("filter_pushdown", str(self._filter_pushdown).lower()), - ] - attach_stmt = ( - f"CALL postgres_attach('{self._dsn}', {', '.join(f'{k}={v}' for k, v in attach_args)})" + """ + Configure the DuckDB connection to attach the PostgreSQL database. + """ + self.logger.debug("Configuring DuckDB connection for PostgreSQL plugin.") + + conn.install_extension(PG_EXT) + conn.load_extension(PG_EXT) + self.logger.info("PostgreSQL extension installed and loaded.") + + # Set any extension settings provided + self._set_extension_settings(conn) + + # Build and execute the ATTACH command + attach_stmt = self._build_attach_statement() + self.logger.debug("Executing ATTACH statement: %s", attach_stmt) + try: + conn.execute(attach_stmt) + self.logger.info("Successfully attached PostgreSQL database with DSN: %s", self._dsn) + except Exception as e: + self.logger.error("Failed to attach PostgreSQL database: %s", e) + raise + + def _set_extension_settings(self, conn: DuckDBPyConnection): + """ + Set extension settings via SET commands. + """ + for setting, value in self._settings.items(): + # Quote string values + if isinstance(value, str): + value = f"'{value}'" + elif isinstance(value, bool): + value = "true" if value else "false" + set_stmt = f"SET {setting} = {value};" + self.logger.debug("Setting extension option: %s", set_stmt) + try: + conn.execute(set_stmt) + except Exception as e: + self.logger.error("Failed to set option %s: %s", setting, e) + raise + + def _build_attach_statement(self) -> str: + """ + Build the ATTACH statement for connecting to the PostgreSQL database. + """ + attach_options: List[Tuple[str, Optional[str]]] = [("TYPE", "POSTGRES")] + + if self._pg_schema: + attach_options.append(("SCHEMA", f"'{self._pg_schema}'")) + + if self._secret: + attach_options.append(("SECRET", f"'{self._secret}'")) + + # Additional attach options + for k, v in self._attach_options.items(): + if isinstance(v, bool): + v = "true" if v else "false" + elif isinstance(v, str): + v = f"'{v}'" + attach_options.append((k.upper(), v)) + + if self._read_only: + attach_options.append(("READ_ONLY", None)) # No value assigned + + # Convert options to string + attach_options_str = ", ".join( + f"{k} {v}" if v is not None else k for k, v in attach_options ) - conn.execute(attach_stmt) + + attach_stmt = f"ATTACH '{self._dsn}' AS {self._duckdb_alias} ({attach_options_str});" + return attach_stmt diff --git a/dev-requirements.txt b/dev-requirements.txt index bc44c6ca..dcc07d1f 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -20,6 +20,7 @@ openpyxl pip-tools pre-commit psycopg2-binary +psycopg[binary] pyiceberg pytest pytest-dotenv @@ -27,6 +28,7 @@ pytest-logbook pytest-csv pytest-xdist pytest-mock +testcontainers[postgres] pytz ruff sqlalchemy diff --git a/tests/functional/plugins/test_postgres.py b/tests/functional/plugins/test_postgres.py index 5a250250..04ef3ee6 100644 --- a/tests/functional/plugins/test_postgres.py +++ b/tests/functional/plugins/test_postgres.py @@ -1,43 +1,372 @@ +import os + import pytest +from testcontainers.postgres import PostgresContainer + +from dbt.adapters.duckdb.plugins.postgres import Plugin +from dbt.adapters.events.logging import AdapterLogger +from dbt.tests.util import run_dbt, run_sql_with_adapter + +logger = AdapterLogger("DuckDB_PostgresPlugin") + -from dbt.tests.util import ( - run_dbt, +@pytest.mark.skip( + reason=""" +Skipping this b/c it requires running a properly setup Postgres server +when testing it locally and also b/c I think there is something +wrong with profiles_config_update since it can't be used in multiple +tests in the same pytest session + +Exercise locally with: pytest --profile=file tests/functional/plugins/test_postgres.py +there are networking issues testing this in a devcontainer, more here https://github.com/testcontainers/testcontainers-python/issues/538 +the test works outside the devcontainer though""" ) +@pytest.mark.usefixtures("postgres_container") +class TestPostgresPluginWithDbt: + @pytest.fixture(scope="class") + def postgres_container(self): + """ + Fixture to set up a PostgreSQL container using Testcontainers. + """ + postgres = PostgresContainer("postgres:16-alpine") + postgres.start() + + # Set environment variables for the PostgreSQL connection + os.environ["DB_HOST"] = postgres.get_container_host_ip() + os.environ["DB_PORT"] = postgres.get_exposed_port(5432) + os.environ["DB_USERNAME"] = postgres.username + os.environ["DB_PASSWORD"] = postgres.password + os.environ["DB_NAME"] = postgres.dbname + + # Log environment variables for debugging + logger.info(f"DB_HOST={os.getenv('DB_HOST')}") + logger.info(f"DB_PORT={os.getenv('DB_PORT')}") + logger.info(f"DB_USERNAME={os.getenv('DB_USERNAME')}") + logger.info(f"DB_PASSWORD={os.getenv('DB_PASSWORD')}") + logger.info(f"DB_NAME={os.getenv('DB_NAME')}") + + yield postgres + + # Cleanup after tests + postgres.stop() + + @pytest.fixture(scope="class", autouse=True) + def attach_postgres_db(self, project, postgres_container): + import time + + from psycopg import OperationalError, connect + + db_host = os.getenv("DB_HOST") + db_port = os.getenv("DB_PORT") + db_username = os.getenv("DB_USERNAME") + db_password = os.getenv("DB_PASSWORD") + db_name = os.getenv("DB_NAME") + # Wait for the PostgreSQL container to be ready + retries = 5 + while retries > 0: + try: + with connect( + host=db_host, + port=db_port, + user=db_username, + password=db_password, + dbname=db_name, + ) as conn: + logger.info("PostgreSQL container is ready.") + break + except OperationalError as e: + logger.warning(f"PostgreSQL not ready, retrying... ({e})") + retries -= 1 + time.sleep(2) + else: + raise RuntimeError("PostgreSQL container not ready after retries.") + + # Load DuckDB's postgres extension + run_sql_with_adapter(project.adapter, "INSTALL postgres;") + run_sql_with_adapter(project.adapter, "LOAD postgres;") + + # Build the DSN + dsn = ( + f"host={db_host} " + f"port={db_port} " + f"dbname={db_name} " + f"user={db_username} " + f"password={db_password}" + ) + + # Attach the Postgres database + attach_sql = f""" + ATTACH '{dsn}' AS postgres_db (TYPE POSTGRES); + """ + run_sql_with_adapter(project.adapter, attach_sql) + + # Create the 'foo' table in PostgreSQL via DuckDB connection + create_table_sql = """ + CREATE TABLE postgres_db.public.foo ( + id INTEGER PRIMARY KEY, + i INTEGER + ); + """ + run_sql_with_adapter(project.adapter, create_table_sql) + + # Insert data into 'foo' table + insert_sql = """ + INSERT INTO postgres_db.public.foo (id, i) VALUES (1, 2), (2, 2), (3, 2); + """ + run_sql_with_adapter(project.adapter, insert_sql) -# Skipping this b/c it requires running a properly setup Postgres server -# when testing it locally and also b/c I think there is something -# wrong with profiles_config_update since it can't be used in multiple -# tests in the same pytest session -# -# Exercise locally with: pytest --profile=file tests/functional/plugins/test_postgres.py -@pytest.mark.skip -class TestPostgresPlugin: @pytest.fixture(scope="class") - def profiles_config_update(self, dbt_profile_target): - config = {"dsn": "dbname=postgres", "sink_schema": "plugins", "overwrite": True} - if "path" not in dbt_profile_target: - return {} + def models(self): return { - "test": { - "outputs": { - "dev": { - "type": "duckdb", - "path": dbt_profile_target["path"], - "plugins": [{"module": "postgres", "config": config}], - } - }, - "target": "dev", - } + "my_model.sql": """ + select * from postgres_db.public.foo + """ } - @pytest.fixture(scope="class") - def models(self): - return {"pg_model.sql": "SELECT * FROM plugins.foo"} + def test_postgres_plugin_attach(self, project): + """ + Test to verify that the PostgreSQL database is attached correctly. + """ + # Run SQL to show databases + result = run_sql_with_adapter(project.adapter, "SHOW DATABASES;", fetch="all") + aliases = [row[0] for row in result] + assert "postgres_db" in aliases, "Database 'postgres_db' should be attached." + + def test_postgres_plugin_read(self, project): + """ + Test to verify reading data from the PostgreSQL table. + """ + # Run dbt models + results = run_dbt(["run"]) + assert len(results) > 0, "Expected at least one model to run successfully." + + # Get the test schema + test_schema = project.test_schema + + # Verify the data using the fully qualified table name + sum_i = run_sql_with_adapter( + project.adapter, f"SELECT SUM(i) FROM {test_schema}.my_model;", fetch="one" + )[0] + assert sum_i == 6, "The sum of 'i' should be 6." + + def test_postgres_plugin_write(self, project): + """ + Test to verify writing data to the PostgreSQL database via DuckDB. + """ + # Insert a record into 'foo' via DuckDB + insert_sql = """ + INSERT INTO postgres_db.public.foo (id, i) VALUES (4, 4); + """ + run_sql_with_adapter(project.adapter, insert_sql) + + # Verify the inserted record + res = run_sql_with_adapter( + project.adapter, + "SELECT i FROM postgres_db.public.foo WHERE id = 4;", + fetch="one", + ) + assert res[0] == 4, "The value of 'i' should be 4." + + def test_postgres_plugin_read_only(self, project): + """ + Test to verify that write operations fail when read_only is True. + """ + # Detach the existing postgres_db + run_sql_with_adapter(project.adapter, "DETACH DATABASE postgres_db;") + + # Re-attach with read_only=True + db_host = os.getenv("DB_HOST") + db_port = os.getenv("DB_PORT") + db_username = os.getenv("DB_USERNAME") + db_password = os.getenv("DB_PASSWORD") + db_name = os.getenv("DB_NAME") + + dsn = ( + f"host={db_host} " + f"port={db_port} " + f"dbname={db_name} " + f"user={db_username} " + f"password={db_password}" + ) + + sql = f""" + ATTACH '{dsn}' AS postgres_db_ro (TYPE POSTGRES, READ_ONLY); + """ + run_sql_with_adapter(project.adapter, sql) + + # Try to perform a write operation and expect it to fail + with pytest.raises(Exception) as exc_info: + run_sql_with_adapter( + project.adapter, + """ + CREATE TABLE postgres_db_ro.public.bar ( + id INTEGER PRIMARY KEY, + value TEXT + ); + """, + ) + assert ( + "read-only" in str(exc_info.value).lower() + ), "Write operation should fail in read-only mode." + + def test_postgres_plugin_extension_settings(self, project): + """ + Test to verify that extension settings are applied correctly. + """ + # Set the extension setting + run_sql_with_adapter(project.adapter, "SET pg_use_binary_copy = true;") + + # Verify the current setting of 'pg_use_binary_copy' + res = run_sql_with_adapter( + project.adapter, + "SELECT current_setting('pg_use_binary_copy');", + fetch="one", + ) + assert res[0] == True, "The setting 'pg_use_binary_copy' should be 'true'." + + def test_postgres_plugin_attach_options(self, project): + """ + Test to verify that additional ATTACH options are applied correctly. + """ + # Detach any existing databases + run_sql_with_adapter(project.adapter, "DETACH DATABASE IF EXISTS postgres_db;") + run_sql_with_adapter( + project.adapter, "DETACH DATABASE IF EXISTS postgres_db_custom;" + ) + + # Re-attach with a custom alias and additional settings + db_host = os.getenv("DB_HOST") + db_port = os.getenv("DB_PORT") + db_username = os.getenv("DB_USERNAME") + db_password = os.getenv("DB_PASSWORD") + db_name = os.getenv("DB_NAME") + + # Construct the DSN + dsn = ( + f"host={db_host} " + f"port={db_port} " + f"dbname={db_name} " + f"user={db_username} " + f"password={db_password}" + ) + + # Attach the Postgres database with a custom alias + attach_sql = f""" + ATTACH '{dsn}' AS postgres_db_custom (TYPE POSTGRES); + """ + run_sql_with_adapter(project.adapter, attach_sql) + + # Set the additional setting for pg_null_byte_replacement + set_sql = "SET pg_null_byte_replacement = '?';" + + # Verify that the setting was applied + validate_sql = "SELECT current_setting('pg_null_byte_replacement');" + res = run_sql_with_adapter(project.adapter, set_sql + validate_sql, fetch="one") + logger.info(f"pg_null_byte_replacement setting: {res[0]}") + assert res[0] == "?", "The 'pg_null_byte_replacement' should be '?'." + + # Verify that the database is attached with the custom alias + result = run_sql_with_adapter(project.adapter, "SHOW DATABASES;", fetch="all") + aliases = [row[0] for row in result] + assert ( + "postgres_db_custom" in aliases + ), "Database 'postgres_db_custom' should be attached." + + # Detach the database after validation + run_sql_with_adapter(project.adapter, "DETACH DATABASE postgres_db_custom;") + + def test_postgres_plugin_pg_array_as_varchar(self, project): + """ + Test to verify that 'pg_array_as_varchar' setting works correctly. + """ + # Detach the existing postgres_db + run_sql_with_adapter(project.adapter, "DETACH DATABASE IF EXISTS postgres_db;") + + # Re-attach without SETTINGS + db_host = os.getenv("DB_HOST") + db_port = os.getenv("DB_PORT") + db_username = os.getenv("DB_USERNAME") + db_password = os.getenv("DB_PASSWORD") + db_name = os.getenv("DB_NAME") + + dsn = ( + f"host={db_host} " + f"port={db_port} " + f"dbname={db_name} " + f"user={db_username} " + f"password={db_password}" + ) + + sql = f""" + ATTACH '{dsn}' AS postgres_db_array (TYPE POSTGRES); + """ + run_sql_with_adapter(project.adapter, sql) + + # Set the 'pg_array_as_varchar' setting + set_sql = "SET pg_array_as_varchar = true;" + run_sql_with_adapter(project.adapter, set_sql) + + # Create a table with an array column + create_table_sql = """ + CREATE TABLE postgres_db_array.public.array_table ( + id INTEGER PRIMARY KEY, + arr INTEGER[] + ); + """ + run_sql_with_adapter(project.adapter, create_table_sql) + + # Insert data into the array_table + insert_sql = """ + INSERT INTO postgres_db_array.public.array_table (id, arr) VALUES (1, ARRAY[1,2,3]); + """ + run_sql_with_adapter(project.adapter, insert_sql) + + # Fetch the array and expect a list + res = run_sql_with_adapter( + project.adapter, + "SELECT arr FROM postgres_db_array.public.array_table WHERE id = 1;", + fetch="one", + ) + assert res[0] == [1, 2, 3], "The array should be fetched as a list [1, 2, 3]." + + def test_postgres_plugin_error_handling(self, project): + """ + Test to verify that invalid configurations raise appropriate errors. + """ + # Try to attach without a DSN + with pytest.raises(Exception) as exc_info: + run_sql_with_adapter( + project.adapter, "ATTACH '' AS postgres_db_error (TYPE POSTGRES);" + ) + assert ( + "unable to connect to postgres" in str(exc_info.value).lower() + ), "Should raise exception for missing 'dsn'" + + # Provide a valid DSN for the next test + db_host = os.getenv("DB_HOST") + db_port = os.getenv("DB_PORT") + db_username = os.getenv("DB_USERNAME") + db_password = os.getenv("DB_PASSWORD") + db_name = os.getenv("DB_NAME") - def test_postgres_plugin(self, project): - results = run_dbt() - assert len(results) == 1 + dsn = ( + f"host={db_host} " + f"port={db_port} " + f"dbname={db_name} " + f"user={db_username} " + f"password={db_password}" + ) - res = project.run_sql("SELECT SUM(i) FROM pg_model", fetch="one") - assert res[0] == 6 \ No newline at end of file + # Try to set an invalid setting + with pytest.raises(Exception) as exc_info: + run_sql_with_adapter( + project.adapter, + f""" + ATTACH '{dsn}' AS postgres_db_error (TYPE POSTGRES); + SET invalid_setting = 'true'; + """, + ) + assert ( + "unrecognized configuration parameter" in str(exc_info.value).lower() + ), "Should raise syntax error for invalid setting"