diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 17e113e7a..6077fc270 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -3546,30 +3546,6 @@ class UserTypeDoesNotExist(Exception): pass -class _ControlReconnectionHandler(_ReconnectionHandler): - """ - Internal - """ - - def __init__(self, control_connection, *args, **kwargs): - _ReconnectionHandler.__init__(self, *args, **kwargs) - self.control_connection = weakref.proxy(control_connection) - - def try_reconnect(self): - return self.control_connection._reconnect_internal() - - def on_reconnection(self, connection): - self.control_connection._set_new_connection(connection) - - def on_exception(self, exc, next_delay): - # TODO only overridden to add logging, so add logging - if isinstance(exc, AuthenticationFailed): - return False - else: - log.debug("Error trying to reconnect control connection: %r", exc) - return True - - def _watch_callback(obj_weakref, method_name, *args, **kwargs): """ A callback handler for the ControlConnection that tolerates @@ -3662,6 +3638,7 @@ def __init__(self, cluster, timeout, self._reconnection_handler = None self._reconnection_lock = RLock() + self._reconnection_pending = False self._event_schedule_times = {} @@ -3695,6 +3672,8 @@ def _connect_host_in_lbp(self): ) for host in lbp.make_query_plan(): + if self._is_shutdown: + break try: return (self._try_connect(host), None) except ConnectionException as exc: @@ -3818,44 +3797,47 @@ def reconnect(self): if self._is_shutdown: return + with self._reconnection_lock: + if self._reconnection_pending: + return + self._reconnection_pending = True + self._submit(self._reconnect) - def _reconnect(self): + def _reconnect(self, schedule = None): log.debug("[control connection] Attempting to reconnect") + if self._is_shutdown: + return + try: self._set_new_connection(self._reconnect_internal()) + self._reconnection_pending = False + return except NoHostAvailable: - # make a retry schedule (which includes backoff) - schedule = self._cluster.reconnection_policy.new_schedule() + log.debug("[control connection] Reconnection plan is exhausted, scheduling new reconnection attempt") + except Exception as ex: + log.debug("[control connection] Unexpected exception during reconnect, scheduling new reconnection attempt: %s", ex) - with self._reconnection_lock: + if schedule is None: + schedule = self._cluster.reconnection_policy.new_schedule() - # cancel existing reconnection attempts - if self._reconnection_handler: - self._reconnection_handler.cancel() + try: + next_delay = next(schedule) + except StopIteration: + # the schedule has been exhausted + schedule = self._cluster.reconnection_policy.new_schedule() + try: + next_delay = next(schedule) + except StopIteration: + next_delay = 0 - # when a connection is successfully made, _set_new_connection - # will be called with the new connection and then our - # _reconnection_handler will be cleared out - self._reconnection_handler = _ControlReconnectionHandler( - self, self._cluster.scheduler, schedule, - self._get_and_set_reconnection_handler, - new_handler=None) - self._reconnection_handler.start() - except Exception: - log.debug("[control connection] error reconnecting", exc_info=True) - raise + if self._is_shutdown: + return - def _get_and_set_reconnection_handler(self, new_handler): - """ - Called by the _ControlReconnectionHandler when a new connection - is successfully created. Clears out the _reconnection_handler on - this ControlConnection. - """ - with self._reconnection_lock: - old = self._reconnection_handler - self._reconnection_handler = new_handler - return old + if next_delay == 0: + self._submit(self._reconnect) + else: + self._cluster.scheduler.schedule(next_delay, partial(self._reconnect, schedule)) def _submit(self, *args, **kwargs): try: @@ -3866,11 +3848,6 @@ def _submit(self, *args, **kwargs): return None def shutdown(self): - # stop trying to reconnect (if we are) - with self._reconnection_lock: - if self._reconnection_handler: - self._reconnection_handler.cancel() - with self._lock: if self._is_shutdown: return diff --git a/tests/integration/standard/test_cluster.py b/tests/integration/standard/test_cluster.py index cdfc7c1b8..e57b5387f 100644 --- a/tests/integration/standard/test_cluster.py +++ b/tests/integration/standard/test_cluster.py @@ -326,6 +326,61 @@ def test_invalid_protocol_negotation(self): cluster.connect() cluster.shutdown() + def test_control_connection_reconnect(self): + """ + Ensure clusters that connect on a keyspace, do + """ + cassandra.cluster.log.setLevel(logging.DEBUG) + + cluster = TestCluster() + _ = cluster.connect() + + cluster.control_connection._reconnect_internal = Mock(wraps=cluster.control_connection._reconnect_internal) + + cluster.control_connection.reconnect() + cluster.control_connection.reconnect() + cluster.control_connection.reconnect() + cluster.control_connection.reconnect() + + while cluster.control_connection._reconnection_pending: + time.sleep(0.1) + + self.assertFalse(cluster.control_connection._connection.is_closed) + self.assertFalse(cluster.control_connection._connection.is_defunct) + self.assertTrue(cluster.control_connection.refresh_schema()) + cluster.control_connection._reconnect_internal.assert_called_once() + + def test_control_connection_reconnect_rescheduled(self): + """ + Ensure clusters that connect on a keyspace, do + """ + cassandra.cluster.log.setLevel(logging.DEBUG) + + cluster = TestCluster() + _ = cluster.connect() + + original_reconnect_internal = cluster.control_connection._reconnect_internal + def _throw(*args): + cluster.control_connection._reconnect_internal = Mock(wraps=original_reconnect_internal) + raise NoHostAvailable("Unable to connect to any servers") + + cluster.scheduler.schedule = Mock(wraps=cluster.scheduler.schedule) + cluster.control_connection._reconnect_internal = _throw + + cluster.control_connection.reconnect() + cluster.control_connection.reconnect() + cluster.control_connection.reconnect() + cluster.control_connection.reconnect() + + while cluster.control_connection._reconnection_pending: + time.sleep(0.1) + + self.assertFalse(cluster.control_connection._connection.is_closed) + self.assertFalse(cluster.control_connection._connection.is_defunct) + self.assertTrue(cluster.control_connection.refresh_schema()) + cluster.control_connection._reconnect_internal.assert_called_once() + cluster.scheduler.schedule.assert_called_once() + def test_connect_on_keyspace(self): """ Ensure clusters that connect on a keyspace, do