Skip to content

Commit

Permalink
fix(oracle): avoid double cursor closing by removing unnecessary `clo…
Browse files Browse the repository at this point in the history
…se` in `_fetch_from_cursor` (#9913)

Previously we were trying to close a cursor after an exception was
raised in `raw_sql`, which already closes the cursor in the case of an
exception. This is not allowed by the oracledb driver, so just close the
cursor on success.
  • Loading branch information
cpcloud authored and ncclementi committed Aug 26, 2024
1 parent 8b0fb66 commit 85c1cb6
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 42 deletions.
16 changes: 9 additions & 7 deletions ibis/backends/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import keyword
import re
import urllib.parse
import weakref
from pathlib import Path
from typing import TYPE_CHECKING, Any, ClassVar

Expand Down Expand Up @@ -34,6 +35,12 @@
class TablesAccessor(collections.abc.Mapping):
"""A mapping-like object for accessing tables off a backend.
::: {.callout-note}
## The `tables` accessor is tied to the lifetime of the backend.
If the backend goes out of scope, the `tables` accessor is no longer valid.
:::
Tables may be accessed by name using either index or attribute access:
Examples
Expand Down Expand Up @@ -804,12 +811,7 @@ def __init__(self, *args, **kwargs):
self._con_args: tuple[Any] = args
self._con_kwargs: dict[str, Any] = kwargs
self._can_reconnect: bool = True
# expression cache
self._query_cache = RefCountedCache(
populate=self._load_into_cache,
lookup=lambda name: self.table(name).op(),
finalize=self._clean_up_cached_table,
)
self._query_cache = RefCountedCache(weakref.proxy(self))

@property
@abc.abstractmethod
Expand Down Expand Up @@ -1017,7 +1019,7 @@ def tables(self):
>>> people = con.tables.people # access via attribute
"""
return TablesAccessor(self)
return TablesAccessor(weakref.proxy(self))

@property
@abc.abstractmethod
Expand Down
21 changes: 21 additions & 0 deletions ibis/backends/duckdb/tests/test_client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import gc
import os
import subprocess
import sys
Expand Down Expand Up @@ -403,3 +404,23 @@ def test_read_csv_with_types(tmp_path, input, all_varchar):
path.write_bytes(data)
t = con.read_csv(path, all_varchar=all_varchar, **input)
assert t.schema()["geom"].is_geospatial()


def test_tables_accessor_no_reference_cycle():
"""Test that a single reference to a connection has the desired lifetime semantics."""
con = ibis.duckdb.connect()

before = len(gc.get_referrers(con))
tables = con.tables
after = len(gc.get_referrers(con))

assert after == before

# valid call, and there are no tables in the database
assert not list(tables)

del con

# no longer valid because the backend has been manually decref'd
with pytest.raises(ReferenceError):
list(tables)
15 changes: 2 additions & 13 deletions ibis/backends/oracle/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -623,19 +623,8 @@ def _fetch_from_cursor(self, cursor, schema: sch.Schema) -> pd.DataFrame:

from ibis.backends.oracle.converter import OraclePandasData

try:
df = pd.DataFrame.from_records(
cursor, columns=schema.names, coerce_float=True
)
except Exception:
# clean up the cursor if we fail to create the DataFrame
#
# in the sqlite case failing to close the cursor results in
# artificially locked tables
cursor.close()
raise
df = OraclePandasData.convert_table(df, schema)
return df
df = pd.DataFrame.from_records(cursor, columns=schema.names, coerce_float=True)
return OraclePandasData.convert_table(df, schema)

def _clean_up_tmp_table(self, name: str) -> None:
with self.begin() as bind:
Expand Down
12 changes: 12 additions & 0 deletions ibis/backends/tests/test_api.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import annotations

import gc

import pytest
from pytest import param

Expand Down Expand Up @@ -115,6 +117,16 @@ def test_tables_accessor_repr(con):
assert f"- {name}" in result


def test_tables_accessor_no_reference_cycle(con):
before = len(gc.get_referrers(con))
_ = con.tables
after = len(gc.get_referrers(con))

# assert that creating a `tables` accessor object doesn't increase the
# number of strong references
assert after == before


@pytest.mark.parametrize(
"expr_fn",
[
Expand Down
9 changes: 4 additions & 5 deletions ibis/backends/tests/test_temporal.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
MySQLOperationalError,
MySQLProgrammingError,
OracleDatabaseError,
OracleInterfaceError,
PolarsInvalidOperationError,
PolarsPanicException,
PsycoPg2InternalError,
Expand Down Expand Up @@ -505,8 +504,8 @@ def test_date_truncate(backend, alltypes, df, unit):
),
pytest.mark.notyet(
["oracle"],
raises=OracleInterfaceError,
reason="cursor not open, probably a bug in the sql generated",
raises=OracleDatabaseError,
reason="ORA-01839: date not valid for month specified",
),
sqlite_without_ymd_intervals,
],
Expand Down Expand Up @@ -633,8 +632,8 @@ def convert_to_offset(offset, displacement_type=displacement_type):
),
pytest.mark.notyet(
["oracle"],
raises=OracleInterfaceError,
reason="cursor not open, probably a bug in the sql generated",
raises=OracleDatabaseError,
reason="ORA-01839: date not valid for month specified",
),
sqlite_without_ymd_intervals,
],
Expand Down
27 changes: 10 additions & 17 deletions ibis/common/caching.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

import functools
import sys
import weakref
from collections import namedtuple
from typing import TYPE_CHECKING, Any
from weakref import finalize, ref

if TYPE_CHECKING:
from collections.abc import Callable
Expand Down Expand Up @@ -39,17 +39,8 @@ class RefCountedCache:
We can implement that interface if and when we need to.
"""

def __init__(
self,
*,
populate: Callable[[str, Any], None],
lookup: Callable[[str], Any],
finalize: Callable[[Any], None],
) -> None:
self.populate = populate
self.lookup = lookup
self.finalize = finalize

def __init__(self, backend: weakref.proxy) -> None:
self.backend = backend
self.cache: dict[Any, CacheEntry] = dict()

def get(self, key, default=None):
Expand All @@ -70,11 +61,13 @@ def store(self, input):

key = input.op()
name = gen_name("cache")
self.populate(name, input)
cached = self.lookup(name)
finalizer = finalize(cached, self._release, key)

self.cache[key] = CacheEntry(name, ref(cached), finalizer)
self.backend._load_into_cache(name, input)

cached = self.backend.table(name).op()
finalizer = weakref.finalize(cached, self._release, key)

self.cache[key] = CacheEntry(name, weakref.ref(cached), finalizer)

return cached

Expand All @@ -88,7 +81,7 @@ def release(self, name: str) -> None:
def _release(self, key) -> None:
entry = self.cache.pop(key)
try:
self.finalize(entry.name)
self.backend._clean_up_cached_table(entry.name)
except Exception:
# suppress exceptions during interpreter shutdown
if not sys.is_finalizing():
Expand Down

0 comments on commit 85c1cb6

Please sign in to comment.