Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a guard so that only one reconnect can be pending on the control connection #462

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 34 additions & 57 deletions cassandra/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -3546,30 +3546,6 @@ class UserTypeDoesNotExist(Exception):
pass


class _ControlReconnectionHandler(_ReconnectionHandler):
    """
    Internal: drives scheduled reconnection attempts for the control
    connection. Holds only a weak proxy to the ControlConnection so this
    handler never keeps it alive on its own.
    """

    def __init__(self, control_connection, *args, **kwargs):
        super(_ControlReconnectionHandler, self).__init__(*args, **kwargs)
        # weak proxy: the handler must not extend the ControlConnection's lifetime
        self.control_connection = weakref.proxy(control_connection)

    def try_reconnect(self):
        return self.control_connection._reconnect_internal()

    def on_reconnection(self, connection):
        self.control_connection._set_new_connection(connection)

    def on_exception(self, exc, next_delay):
        # TODO only overridden to add logging, so add logging
        if isinstance(exc, AuthenticationFailed):
            # auth failures will not fix themselves by retrying
            return False
        log.debug("Error trying to reconnect control connection: %r", exc)
        return True


def _watch_callback(obj_weakref, method_name, *args, **kwargs):
"""
A callback handler for the ControlConnection that tolerates
Expand Down Expand Up @@ -3662,6 +3638,7 @@ def __init__(self, cluster, timeout,

self._reconnection_handler = None
self._reconnection_lock = RLock()
self._reconnection_pending = False

self._event_schedule_times = {}

Expand Down Expand Up @@ -3695,6 +3672,8 @@ def _connect_host_in_lbp(self):
)

for host in lbp.make_query_plan():
if self._is_shutdown:
break
try:
return (self._try_connect(host), None)
except ConnectionException as exc:
Expand Down Expand Up @@ -3818,44 +3797,47 @@ def reconnect(self):
if self._is_shutdown:
return

with self._reconnection_lock:
if self._reconnection_pending:
return
self._reconnection_pending = True

self._submit(self._reconnect)

def _reconnect(self):
def _reconnect(self, schedule = None):
log.debug("[control connection] Attempting to reconnect")
if self._is_shutdown:
return

try:
self._set_new_connection(self._reconnect_internal())
self._reconnection_pending = False
return
except NoHostAvailable:
# make a retry schedule (which includes backoff)
schedule = self._cluster.reconnection_policy.new_schedule()
log.debug("[control connection] Reconnection plan is exhausted, scheduling new reconnection attempt")
except Exception as ex:
log.debug("[control connection] Unexpected exception during reconnect, scheduling new reconnection attempt: %s", ex)

with self._reconnection_lock:
if schedule is None:
schedule = self._cluster.reconnection_policy.new_schedule()

# cancel existing reconnection attempts
if self._reconnection_handler:
self._reconnection_handler.cancel()
try:
next_delay = next(schedule)
except StopIteration:
# the schedule has been exhausted
schedule = self._cluster.reconnection_policy.new_schedule()
try:
next_delay = next(schedule)
except StopIteration:
next_delay = 0

# when a connection is successfully made, _set_new_connection
# will be called with the new connection and then our
# _reconnection_handler will be cleared out
self._reconnection_handler = _ControlReconnectionHandler(
self, self._cluster.scheduler, schedule,
self._get_and_set_reconnection_handler,
new_handler=None)
self._reconnection_handler.start()
except Exception:
log.debug("[control connection] error reconnecting", exc_info=True)
raise
if self._is_shutdown:
return

def _get_and_set_reconnection_handler(self, new_handler):
"""
Called by the _ControlReconnectionHandler when a new connection
is successfully created. Clears out the _reconnection_handler on
this ControlConnection.
"""
with self._reconnection_lock:
old = self._reconnection_handler
self._reconnection_handler = new_handler
return old
if next_delay == 0:
self._submit(self._reconnect)
else:
self._cluster.scheduler.schedule(next_delay, partial(self._reconnect, schedule))

def _submit(self, *args, **kwargs):
try:
Expand All @@ -3866,11 +3848,6 @@ def _submit(self, *args, **kwargs):
return None

def shutdown(self):
# stop trying to reconnect (if we are)
with self._reconnection_lock:
if self._reconnection_handler:
self._reconnection_handler.cancel()

with self._lock:
if self._is_shutdown:
return
Expand Down
55 changes: 55 additions & 0 deletions tests/integration/standard/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,61 @@ def test_invalid_protocol_negotation(self):
cluster.connect()
cluster.shutdown()

def test_control_connection_reconnect(self):
    """
    Ensure that several overlapping reconnect() requests on the control
    connection are coalesced by the pending-reconnect guard into a
    single underlying reconnection attempt.
    """
    cassandra.cluster.log.setLevel(logging.DEBUG)

    cluster = TestCluster()
    # always release cluster resources, even if an assertion fails
    self.addCleanup(cluster.shutdown)
    _ = cluster.connect()

    cluster.control_connection._reconnect_internal = Mock(wraps=cluster.control_connection._reconnect_internal)

    # fire several requests; the guard should collapse them into one attempt
    for _ in range(4):
        cluster.control_connection.reconnect()

    # wait (bounded) for the pending reconnection to finish so the test
    # cannot hang the whole suite if reconnection never completes
    deadline = time.time() + 30
    while cluster.control_connection._reconnection_pending:
        self.assertLess(time.time(), deadline, "control connection reconnection did not complete in time")
        time.sleep(0.1)

    self.assertFalse(cluster.control_connection._connection.is_closed)
    self.assertFalse(cluster.control_connection._connection.is_defunct)
    self.assertTrue(cluster.control_connection.refresh_schema())
    # exactly one real reconnect despite four requests
    cluster.control_connection._reconnect_internal.assert_called_once()

def test_control_connection_reconnect_rescheduled(self):
    """
    Ensure that when the first reconnection attempt fails with
    NoHostAvailable, exactly one retry is rescheduled (rather than one
    per reconnect() request) and that the retry succeeds.
    """
    cassandra.cluster.log.setLevel(logging.DEBUG)

    cluster = TestCluster()
    # always release cluster resources, even if an assertion fails
    self.addCleanup(cluster.shutdown)
    _ = cluster.connect()

    original_reconnect_internal = cluster.control_connection._reconnect_internal

    def _throw(*args):
        # fail the first attempt, then restore a (wrapped) working
        # implementation so the rescheduled attempt can succeed and be counted
        cluster.control_connection._reconnect_internal = Mock(wraps=original_reconnect_internal)
        raise NoHostAvailable("Unable to connect to any servers")

    cluster.scheduler.schedule = Mock(wraps=cluster.scheduler.schedule)
    cluster.control_connection._reconnect_internal = _throw

    # fire several requests; only one attempt (and one reschedule) should result
    for _ in range(4):
        cluster.control_connection.reconnect()

    # wait (bounded) for the pending reconnection to finish so the test
    # cannot hang the whole suite if reconnection never completes
    deadline = time.time() + 30
    while cluster.control_connection._reconnection_pending:
        self.assertLess(time.time(), deadline, "control connection reconnection did not complete in time")
        time.sleep(0.1)

    self.assertFalse(cluster.control_connection._connection.is_closed)
    self.assertFalse(cluster.control_connection._connection.is_defunct)
    self.assertTrue(cluster.control_connection.refresh_schema())
    # the retry ran exactly once, via exactly one scheduler call
    cluster.control_connection._reconnect_internal.assert_called_once()
    cluster.scheduler.schedule.assert_called_once()

def test_connect_on_keyspace(self):
"""
Ensure clusters that connect on a keyspace, do
Expand Down
Loading