Skip to content

Commit a4d6546

Browse files
committed
Make schema agreement waiting code renew connection on each iteration
When schema agreement is started it could happen that control connection is getting disconnected/reconnected, when it happens schema agreement code used to use disconnected connection to run all the queries. As result, it could lead to schema agreement timeout, even if all nodes got schema updated long time ago. This commit updates connection on every iteration and makes it iterate when underlying connection is closed
1 parent 2354c82 commit a4d6546

File tree

1 file changed

+13
-9
lines changed

1 file changed

+13
-9
lines changed

cassandra/cluster.py

+13-9
Original file line numberDiff line numberDiff line change
@@ -4220,15 +4220,14 @@ def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wai
42204220
if self._is_shutdown:
42214221
return
42224222

4223-
if not connection:
4224-
connection = self._connection
4223+
current_connection = connection or self._connection
42254224

42264225
if preloaded_results:
42274226
log.debug("[control connection] Attempting to use preloaded results for schema agreement")
42284227

42294228
peers_result = preloaded_results[0]
42304229
local_result = preloaded_results[1]
4231-
schema_mismatches = self._get_schema_mismatches(peers_result, local_result, connection.endpoint)
4230+
schema_mismatches = self._get_schema_mismatches(peers_result, local_result, current_connection.endpoint)
42324231
if schema_mismatches is None:
42334232
return True
42344233

@@ -4237,16 +4236,19 @@ def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wai
42374236
elapsed = 0
42384237
cl = ConsistencyLevel.ONE
42394238
schema_mismatches = None
4240-
select_peers_query = self._get_peers_query(self.PeersQueryType.PEERS_SCHEMA, connection)
4239+
select_peers_query = self._get_peers_query(self.PeersQueryType.PEERS_SCHEMA, current_connection)
4240+
error_signaled = False
42414241

42424242
while elapsed < total_timeout:
4243+
current_connection = connection or self._connection
4244+
42434245
peers_query = QueryMessage(query=maybe_add_timeout_to_query(select_peers_query, self._metadata_request_timeout),
42444246
consistency_level=cl)
42454247
local_query = QueryMessage(query=maybe_add_timeout_to_query(self._SELECT_SCHEMA_LOCAL, self._metadata_request_timeout),
42464248
consistency_level=cl)
42474249
try:
42484250
timeout = min(self._timeout, total_timeout - elapsed)
4249-
peers_result, local_result = connection.wait_for_responses(
4251+
peers_result, local_result = current_connection.wait_for_responses(
42504252
peers_query, local_query, timeout=timeout)
42514253
except OperationTimedOut as timeout:
42524254
log.debug("[control connection] Timed out waiting for "
@@ -4257,10 +4259,12 @@ def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wai
42574259
if self._is_shutdown:
42584260
log.debug("[control connection] Aborting wait for schema match due to shutdown")
42594261
return None
4260-
else:
4261-
raise
4262+
elif not error_signaled:
4263+
self._signal_error()
4264+
error_signaled = True
4265+
continue
42624266

4263-
schema_mismatches = self._get_schema_mismatches(peers_result, local_result, connection.endpoint)
4267+
schema_mismatches = self._get_schema_mismatches(peers_result, local_result, current_connection.endpoint)
42644268
if schema_mismatches is None:
42654269
return True
42664270

@@ -4269,7 +4273,7 @@ def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wai
42694273
elapsed = self._time.time() - start
42704274

42714275
log.warning("Node %s is reporting a schema disagreement: %s",
4272-
connection.endpoint, schema_mismatches)
4276+
current_connection.endpoint, schema_mismatches)
42734277
return False
42744278

42754279
def _get_schema_mismatches(self, peers_result, local_result, local_address):

0 commit comments

Comments
 (0)