From d1e48ae0dff01ff47dab738e297f71dc6b4d5b6e Mon Sep 17 00:00:00 2001 From: Josh Wills Date: Wed, 29 Nov 2023 10:43:18 -0800 Subject: [PATCH 1/6] Add support for retrying certain types of exceptions we see when running models in dbt-duckdb --- dbt/adapters/duckdb/credentials.py | 10 ++++ dbt/adapters/duckdb/environments/__init__.py | 43 ++++++++++++++++ tests/functional/plugins/test_plugins.py | 1 + tests/unit/test_retries.py | 54 ++++++++++++++++++++ 4 files changed, 108 insertions(+) create mode 100644 tests/unit/test_retries.py diff --git a/dbt/adapters/duckdb/credentials.py b/dbt/adapters/duckdb/credentials.py index 16e80772..99e4134a 100644 --- a/dbt/adapters/duckdb/credentials.py +++ b/dbt/adapters/duckdb/credentials.py @@ -70,6 +70,11 @@ class Remote(dbtClassMixin): password: Optional[str] = None +@dataclass +class Retries(dbtClassMixin): + max_attempts: int + + @dataclass class DuckDBCredentials(Credentials): database: str = "main" @@ -135,6 +140,11 @@ class DuckDBCredentials(Credentials): # provide helper functions for dbt Python models. module_paths: Optional[List[str]] = None + # An optional strategy for allowing retries when certain types of + # exceptions occur on a model run (e.g., IOExceptions that were caused + # by networking issues) + retries: Optional[Retries] = None + @classmethod def __pre_deserialize__(cls, data: Dict[Any, Any]) -> Dict[Any, Any]: data = super().__pre_deserialize__(data) diff --git a/dbt/adapters/duckdb/environments/__init__.py b/dbt/adapters/duckdb/environments/__init__.py index 130a10fd..e6cbe4da 100644 --- a/dbt/adapters/duckdb/environments/__init__.py +++ b/dbt/adapters/duckdb/environments/__init__.py @@ -3,12 +3,14 @@ import os import sys import tempfile +import time from typing import Dict from typing import Optional import duckdb from ..credentials import DuckDBCredentials +from ..credentials import Retries from ..plugins import BasePlugin from ..utils import SourceConfig from ..utils import TargetConfig @@ -31,6 +33,44 @@ def _ensure_event_loop(): asyncio.set_event_loop(loop) +class RetryableCursor: + _RETRYABLE_EXCEPTIONS = {duckdb.duckdb.IOException} + + def __init__(self, cursor, retries: Retries): + self._cursor = cursor + self._retries = retries + + def execute(self, sql: str, bindings=None): + attempt, success, exc = 0, False, None + while not success and attempt < self._retries.max_attempts: + try: + if bindings is None: + self._cursor.execute(sql) + else: + self._cursor.execute(sql, bindings) + success = True + except Exception as e: + if type(e) not in self._RETRYABLE_EXCEPTIONS: + raise e + else: + # TODO: this is crude and should be made smarter + time.sleep(2**attempt) + exc = e + attempt += 1 + if not success: + if exc: + raise exc + else: + raise RuntimeError( + "execute call failed, but no exceptions raised- this should be impossible" + ) + return self + + # forward along all non-execute() methods/attribute look-ups + def __getattr__(self, name): + return getattr(self._cursor, name) + + class Environment(abc.ABC): """An Environment is an abstraction to describe *where* the code you execute in your dbt-duckdb project actually runs. This could be the local Python process that runs dbt (which is the default), @@ -127,6 +167,9 @@ def initialize_cursor( for df_name, df in registered_df.items(): cursor.register(df_name, df) + if creds.retries: + cursor = RetryableCursor(cursor, creds.retries) + return cursor @classmethod diff --git a/tests/functional/plugins/test_plugins.py b/tests/functional/plugins/test_plugins.py index 4dfecedd..d4273a67 100644 --- a/tests/functional/plugins/test_plugins.py +++ b/tests/functional/plugins/test_plugins.py @@ -91,6 +91,7 @@ def profiles_config_update(self, dbt_profile_target, sqlite_test_db): "type": "duckdb", "path": dbt_profile_target.get("path", ":memory:"), "plugins": plugins, + "retries": {"max_attempts": 2}, } }, "target": "dev", diff --git a/tests/unit/test_retries.py b/tests/unit/test_retries.py new file mode 100644 index 00000000..d9a5129d --- /dev/null +++ b/tests/unit/test_retries.py @@ -0,0 +1,54 @@ +import pytest +from unittest.mock import MagicMock +from unittest.mock import patch + +import duckdb + +from dbt.adapters.duckdb.credentials import Retries +from dbt.adapters.duckdb.environments import RetryableCursor + +class TestRetryableCursor: + + @pytest.fixture + def mock_cursor(self): + return MagicMock() + + @pytest.fixture + def mock_retries(self): + return Retries(max_attempts=3) + + def test_successful_execute(self, mock_cursor, mock_retries): + """ Test that execute successfully runs the SQL query. """ + retry_cursor = RetryableCursor(mock_cursor, mock_retries) + sql_query = "SELECT * FROM table" + retry_cursor.execute(sql_query) + mock_cursor.execute.assert_called_once_with(sql_query) + + def test_retry_on_failure(self, mock_cursor, mock_retries): + """ Test that execute retries the SQL query on failure. """ + mock_cursor.execute.side_effect = [duckdb.duckdb.IOException, None] + retry_cursor = RetryableCursor(mock_cursor, mock_retries) + sql_query = "SELECT * FROM table" + retry_cursor.execute(sql_query) + assert mock_cursor.execute.call_count == 2 + + def test_no_retry_on_non_retryable_exception(self, mock_cursor, mock_retries): + """ Test that a non-retryable exception is not retried. """ + mock_cursor.execute.side_effect = ValueError + retry_cursor = RetryableCursor(mock_cursor, mock_retries) + sql_query = "SELECT * FROM table" + with pytest.raises(ValueError): + retry_cursor.execute(sql_query) + mock_cursor.execute.assert_called_once_with(sql_query) + + def test_exponential_backoff(self, mock_cursor, mock_retries): + """ Test that exponential backoff is applied between retries. """ + mock_cursor.execute.side_effect = [duckdb.duckdb.IOException, duckdb.duckdb.IOException, None] + retry_cursor = RetryableCursor(mock_cursor, mock_retries) + sql_query = "SELECT * FROM table" + + with patch("time.sleep") as mock_sleep: + retry_cursor.execute(sql_query) + assert mock_sleep.call_count == 2 + mock_sleep.assert_any_call(1) + mock_sleep.assert_any_call(2) From 4e5c4b9122e97ba04d259d11c122767b76d31c74 Mon Sep 17 00:00:00 2001 From: Josh Wills Date: Wed, 6 Dec 2023 07:47:39 -0800 Subject: [PATCH 2/6] Generalize this a bit while we figure out what exactly is getting thrown --- dbt/adapters/duckdb/environments/__init__.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/dbt/adapters/duckdb/environments/__init__.py b/dbt/adapters/duckdb/environments/__init__.py index e6cbe4da..31657029 100644 --- a/dbt/adapters/duckdb/environments/__init__.py +++ b/dbt/adapters/duckdb/environments/__init__.py @@ -50,13 +50,10 @@ def execute(self, sql: str, bindings=None): self._cursor.execute(sql, bindings) success = True except Exception as e: - if type(e) not in self._RETRYABLE_EXCEPTIONS: - raise e - else: - # TODO: this is crude and should be made smarter - time.sleep(2**attempt) - exc = e - attempt += 1 + print(f"Retry cursor caught exception of type {type(e)}: {e}") + time.sleep(2**attempt) + exc = e + attempt += 1 if not success: if exc: raise exc From 1903227dc56ea119e46a2ec2430e62c65636ac95 Mon Sep 17 00:00:00 2001 From: Josh Wills Date: Fri, 22 Dec 2023 08:34:18 -0800 Subject: [PATCH 3/6] generalize this for connect retries as well as query retries --- dbt/adapters/duckdb/credentials.py | 11 +++- dbt/adapters/duckdb/environments/__init__.py | 56 +++++++++++++++----- 2 files changed, 53 insertions(+), 14 deletions(-) diff --git a/dbt/adapters/duckdb/credentials.py b/dbt/adapters/duckdb/credentials.py index 99e4134a..a6d99806 100644 --- a/dbt/adapters/duckdb/credentials.py +++ b/dbt/adapters/duckdb/credentials.py @@ -72,7 +72,16 @@ class Remote(dbtClassMixin): @dataclass class Retries(dbtClassMixin): - max_attempts: int + # The number of times to attempt the initial duckdb.connect call + # (to wait for another process to free the lock on the DB file) + connect_attempts: int = 1 + + # The number of times to attempt to execute a DuckDB query that throws + # one of the retryable exceptions + query_attempts: Optional[int] = None + + # The list of exceptions that we are willing to retry on + retryable_exceptions: List[str] = ["IOException"] @dataclass diff --git a/dbt/adapters/duckdb/environments/__init__.py b/dbt/adapters/duckdb/environments/__init__.py index 31657029..273b3122 100644 --- a/dbt/adapters/duckdb/environments/__init__.py +++ b/dbt/adapters/duckdb/environments/__init__.py @@ -5,12 +5,12 @@ import tempfile import time from typing import Dict +from typing import List from typing import Optional import duckdb from ..credentials import DuckDBCredentials -from ..credentials import Retries from ..plugins import BasePlugin from ..utils import SourceConfig from ..utils import TargetConfig @@ -34,15 +34,14 @@ def _ensure_event_loop(): class RetryableCursor: - _RETRYABLE_EXCEPTIONS = {duckdb.duckdb.IOException} - - def __init__(self, cursor, retries: Retries): + def __init__(self, cursor, retry_attempts: int, retryable_exceptions: List[str]): self._cursor = cursor - self._retries = retries + self._retry_attempts = retry_attempts + self._retryable_exceptions = retryable_exceptions def execute(self, sql: str, bindings=None): attempt, success, exc = 0, False, None - while not success and attempt < self._retries.max_attempts: + while not success and attempt < self._retry_attempts: try: if bindings is None: self._cursor.execute(sql) @@ -50,10 +49,14 @@ def execute(self, sql: str, bindings=None): self._cursor.execute(sql, bindings) success = True except Exception as e: - print(f"Retry cursor caught exception of type {type(e)}: {e}") - time.sleep(2**attempt) - exc = e - attempt += 1 + exception_name = type(e).__name__ + if exception_name in self._retryable_exceptions: + time.sleep(2**attempt) + exc = e + attempt += 1 + else: + print(f"Did not retry exception named '{exception_name}'") + raise e if not success: if exc: raise exc @@ -111,7 +114,32 @@ def initialize_db( cls, creds: DuckDBCredentials, plugins: Optional[Dict[str, BasePlugin]] = None ): config = creds.config_options or {} - conn = duckdb.connect(creds.path, read_only=False, config=config) + + if creds.retries: + success, attempt, exc = False, 0, None + while not success and attempt < creds.retries.connect_attempts: + try: + conn = duckdb.connect(creds.path, read_only=False, config=config) + success = True + except Exception as e: + exception_name = type(e).__name__ + if exception_name in creds.retries.retryable_exceptions: + time.sleep(2**attempt) + exc = e + attempt += 1 + else: + print(f"Did not retry exception named '{exception_name}'") + raise e + if not success: + if exc: + raise exc + else: + raise RuntimeError( + "connect call failed, but no exceptions raised- this should be impossible" + ) + + else: + conn = duckdb.connect(creds.path, read_only=False, config=config) # install any extensions on the connection if creds.extensions is not None: @@ -164,8 +192,10 @@ def initialize_cursor( for df_name, df in registered_df.items(): cursor.register(df_name, df) - if creds.retries: - cursor = RetryableCursor(cursor, creds.retries) + if creds.retries and creds.retries.query_attempts: + cursor = RetryableCursor( + cursor, creds.retries.query_attempts, creds.retries.retryable_exceptions + ) return cursor From 1c8017ddac78cb09605e8363854408c95f0ecf1c Mon Sep 17 00:00:00 2001 From: Josh Wills Date: Fri, 22 Dec 2023 08:43:27 -0800 Subject: [PATCH 4/6] Make the query tests pass again --- dbt/adapters/duckdb/credentials.py | 3 ++- tests/unit/test_retries.py | 21 ++++++++++++--------- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/dbt/adapters/duckdb/credentials.py b/dbt/adapters/duckdb/credentials.py index a6d99806..bafe4a00 100644 --- a/dbt/adapters/duckdb/credentials.py +++ b/dbt/adapters/duckdb/credentials.py @@ -1,6 +1,7 @@ import os import time from dataclasses import dataclass +from dataclasses import field from functools import lru_cache from typing import Any from typing import Dict @@ -81,7 +82,7 @@ class Retries(dbtClassMixin): query_attempts: Optional[int] = None # The list of exceptions that we are willing to retry on - retryable_exceptions: List[str] = ["IOException"] + retryable_exceptions: List[str] = field(default_factory=lambda: ["IOException"]) @dataclass diff --git a/tests/unit/test_retries.py b/tests/unit/test_retries.py index d9a5129d..39c50fc7 100644 --- a/tests/unit/test_retries.py +++ b/tests/unit/test_retries.py @@ -15,36 +15,39 @@ def mock_cursor(self): @pytest.fixture def mock_retries(self): - return Retries(max_attempts=3) + return Retries(query_attempts=3) - def test_successful_execute(self, mock_cursor, mock_retries): + @pytest.fixture + def retry_cursor(self, mock_cursor, mock_retries): + return RetryableCursor( + mock_cursor, + mock_retries.query_attempts, + mock_retries.retryable_exceptions) + + def test_successful_execute(self, mock_cursor, retry_cursor): """ Test that execute successfully runs the SQL query. """ - retry_cursor = RetryableCursor(mock_cursor, mock_retries) sql_query = "SELECT * FROM table" retry_cursor.execute(sql_query) mock_cursor.execute.assert_called_once_with(sql_query) - def test_retry_on_failure(self, mock_cursor, mock_retries): + def test_retry_on_failure(self, mock_cursor, retry_cursor): """ Test that execute retries the SQL query on failure. """ mock_cursor.execute.side_effect = [duckdb.duckdb.IOException, None] - retry_cursor = RetryableCursor(mock_cursor, mock_retries) sql_query = "SELECT * FROM table" retry_cursor.execute(sql_query) assert mock_cursor.execute.call_count == 2 - def test_no_retry_on_non_retryable_exception(self, mock_cursor, mock_retries): + def test_no_retry_on_non_retryable_exception(self, mock_cursor, retry_cursor): """ Test that a non-retryable exception is not retried. """ mock_cursor.execute.side_effect = ValueError - retry_cursor = RetryableCursor(mock_cursor, mock_retries) sql_query = "SELECT * FROM table" with pytest.raises(ValueError): retry_cursor.execute(sql_query) mock_cursor.execute.assert_called_once_with(sql_query) - def test_exponential_backoff(self, mock_cursor, mock_retries): + def test_exponential_backoff(self, mock_cursor, retry_cursor): """ Test that exponential backoff is applied between retries. """ mock_cursor.execute.side_effect = [duckdb.duckdb.IOException, duckdb.duckdb.IOException, None] - retry_cursor = RetryableCursor(mock_cursor, mock_retries) sql_query = "SELECT * FROM table" with patch("time.sleep") as mock_sleep: From 8d61a069c85cbcc082d973085167e8bb332904dd Mon Sep 17 00:00:00 2001 From: Josh Wills Date: Fri, 22 Dec 2023 09:05:35 -0800 Subject: [PATCH 5/6] Test updates for the new retry settings/config --- tests/functional/plugins/test_plugins.py | 2 +- tests/unit/test_retries_connect.py | 36 +++++++++++++++++++ ...{test_retries.py => test_retries_query.py} | 0 3 files changed, 37 insertions(+), 1 deletion(-) create mode 100644 tests/unit/test_retries_connect.py rename tests/unit/{test_retries.py => test_retries_query.py} (100%) diff --git a/tests/functional/plugins/test_plugins.py b/tests/functional/plugins/test_plugins.py index d4273a67..a577ca7d 100644 --- a/tests/functional/plugins/test_plugins.py +++ b/tests/functional/plugins/test_plugins.py @@ -91,7 +91,7 @@ def profiles_config_update(self, dbt_profile_target, sqlite_test_db): "type": "duckdb", "path": dbt_profile_target.get("path", ":memory:"), "plugins": plugins, - "retries": {"max_attempts": 2}, + "retries": {"query_attempts": 2}, } }, "target": "dev", diff --git a/tests/unit/test_retries_connect.py b/tests/unit/test_retries_connect.py new file mode 100644 index 00000000..5a3153f5 --- /dev/null +++ b/tests/unit/test_retries_connect.py @@ -0,0 +1,36 @@ +import pytest +from unittest.mock import patch + +from duckdb.duckdb import IOException + +from dbt.adapters.duckdb.credentials import DuckDBCredentials +from dbt.adapters.duckdb.credentials import Retries +from dbt.adapters.duckdb.environments import Environment + +class TestConnectRetries: + + @pytest.fixture + def creds(self): + # Create a mock credentials object + return DuckDBCredentials( + path="foo.db", + retries=Retries(connect_attempts=2) + ) + + @pytest.mark.parametrize("exception", [None, IOException, ValueError]) + def test_initialize_db(self, creds, exception): + # Mocking the duckdb.connect method + with patch('duckdb.connect') as mock_connect: + if exception: + mock_connect.side_effect = [exception, None] + + if exception == ValueError: + with pytest.raises(ValueError) as excinfo: + Environment.initialize_db(creds) + else: + # Call the initialize_db method + Environment.initialize_db(creds) + if exception == IOException: + assert mock_connect.call_count == creds.retries.connect_attempts + else: + mock_connect.assert_called_once_with(creds.path, read_only=False, config={}) diff --git a/tests/unit/test_retries.py b/tests/unit/test_retries_query.py similarity index 100% rename from tests/unit/test_retries.py rename to tests/unit/test_retries_query.py From 6c9dffed811bc21690c77885356f5af24af28faf Mon Sep 17 00:00:00 2001 From: Josh Wills Date: Sat, 23 Dec 2023 13:55:41 -0800 Subject: [PATCH 6/6] Testing other types of exceptions being retryable --- tests/unit/test_retries_connect.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/unit/test_retries_connect.py b/tests/unit/test_retries_connect.py index 5a3153f5..dbe5b160 100644 --- a/tests/unit/test_retries_connect.py +++ b/tests/unit/test_retries_connect.py @@ -14,10 +14,10 @@ def creds(self): # Create a mock credentials object return DuckDBCredentials( path="foo.db", - retries=Retries(connect_attempts=2) + retries=Retries(connect_attempts=2, retryable_exceptions=["IOException", "ArithmeticError"]) ) - @pytest.mark.parametrize("exception", [None, IOException, ValueError]) + @pytest.mark.parametrize("exception", [None, IOException, ArithmeticError, ValueError]) def test_initialize_db(self, creds, exception): # Mocking the duckdb.connect method with patch('duckdb.connect') as mock_connect: @@ -30,7 +30,7 @@ def test_initialize_db(self, creds, exception): else: # Call the initialize_db method Environment.initialize_db(creds) - if exception == IOException: + if exception in {IOException, ArithmeticError}: assert mock_connect.call_count == creds.retries.connect_attempts else: mock_connect.assert_called_once_with(creds.path, read_only=False, config={})