Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1848371 adding connection.is_valid to perform connection validation on TCP/IP and Session levels #2117

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions DESCRIPTION.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Source code is also available at: https://github.com/snowflakedb/snowflake-conne

- v3.12.5(TBD)
- Added a feature to limit the sizes of IO-bound ThreadPoolExecutors during PUT and GET commands.
- Added a feature to verify if the connection is still good enough to send queries over
sfc-gh-dszmolka marked this conversation as resolved.
Show resolved Hide resolved

- v3.12.4(December 3,2024)
- Fixed a bug where multipart uploads to Azure would be missing their MD5 hashes.
Expand Down
24 changes: 22 additions & 2 deletions src/snowflake/connector/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -1668,7 +1668,7 @@ def _log_telemetry(self, telemetry_data) -> None:
self._telemetry.try_add_log_to_batch(telemetry_data)

def _add_heartbeat(self) -> None:
"""Add an hourly heartbeat query in order to keep connection alive."""
"""Add a periodic heartbeat query in order to keep connection alive."""
if not self.heartbeat_thread:
self._validate_client_session_keep_alive_heartbeat_frequency()
heartbeat_wref = weakref.WeakMethod(self._heartbeat_tick)
Expand All @@ -1694,7 +1694,7 @@ def _cancel_heartbeat(self) -> None:
logger.debug("stopped heartbeat")

def _heartbeat_tick(self) -> None:
"""Execute a hearbeat if connection isn't closed yet."""
"""Execute a heartbeat if connection isn't closed yet."""
if not self.is_closed():
logger.debug("heartbeating!")
self.rest._heartbeat()
Expand Down Expand Up @@ -1981,3 +1981,23 @@ def _log_telemetry_imported_packages(self) -> None:
connection=self,
)
)

def is_valid(self) -> bool:
"""This function tries to answer the question: Is this connection still good for sending queries?
Attempts to validate the connections both on the TCP/IP and Session levels."""
logger.debug("validating connection and session")
if self.is_closed():
logger.debug("connection is already closed and not valid")
return False
else:
sfc-gh-dszmolka marked this conversation as resolved.
Show resolved Hide resolved
try:
logger.debug("trying to heartbeat into the session to validate")
hb_result = self.rest._heartbeat()
session_valid = hb_result.get("success")
logger.debug("session still valid? %s", str(session_valid))
return session_valid
except Exception as e:
logger.debug(
"session could not be validated due to exception: %s", str(e)
sfc-gh-dszmolka marked this conversation as resolved.
Show resolved Hide resolved
)
return False
10 changes: 10 additions & 0 deletions test/integ/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -1462,3 +1462,13 @@ def test_disable_telemetry(conn_cnx, caplog):
cur.execute("select 1").fetchall()
assert not conn.telemetry_enabled
assert "POST /telemetry/send" not in caplog.text


@pytest.mark.skipolddriver
def test_is_valid(conn_cnx):
"""Tests whether connection and session validation happens."""
with conn_cnx() as conn:
assert conn
assert conn.is_valid() is True
conn.close()
assert conn.is_valid() is False
sfc-gh-dszmolka marked this conversation as resolved.
Show resolved Hide resolved
Loading