
Fix wait schema agreement #461

Open · wants to merge 1 commit into base: master
30 changes: 21 additions & 9 deletions cassandra/cluster.py
@@ -4220,15 +4220,14 @@ def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wai
         if self._is_shutdown:
             return

-        if not connection:
-            connection = self._connection
+        current_connection = connection or self._connection

         if preloaded_results:
             log.debug("[control connection] Attempting to use preloaded results for schema agreement")

             peers_result = preloaded_results[0]
             local_result = preloaded_results[1]
-            schema_mismatches = self._get_schema_mismatches(peers_result, local_result, connection.endpoint)
+            schema_mismatches = self._get_schema_mismatches(peers_result, local_result, current_connection.endpoint)
             if schema_mismatches is None:
                 return True

@@ -4237,16 +4236,27 @@ def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wai
         elapsed = 0
         cl = ConsistencyLevel.ONE
         schema_mismatches = None
-        select_peers_query = self._get_peers_query(self.PeersQueryType.PEERS_SCHEMA, connection)
+        select_peers_query = self._get_peers_query(self.PeersQueryType.PEERS_SCHEMA, current_connection)
+        error_signaled = False

         while elapsed < total_timeout:
+            if current_connection != (connection or self._connection):
+                current_connection = connection or self._connection
+                error_signaled = False
+
+            if current_connection.is_defunct or current_connection.is_closed:
+                log.debug("[control connection] connection is closed, waiting and trying again")
+                self._time.sleep(0.2)
+                elapsed = self._time.time() - start
+                continue
+
             peers_query = QueryMessage(query=maybe_add_timeout_to_query(select_peers_query, self._metadata_request_timeout),
                                        consistency_level=cl)
             local_query = QueryMessage(query=maybe_add_timeout_to_query(self._SELECT_SCHEMA_LOCAL, self._metadata_request_timeout),
                                        consistency_level=cl)
             try:
                 timeout = min(self._timeout, total_timeout - elapsed)
-                peers_result, local_result = connection.wait_for_responses(
+                peers_result, local_result = current_connection.wait_for_responses(
                     peers_query, local_query, timeout=timeout)
             except OperationTimedOut as timeout:
                 log.debug("[control connection] Timed out waiting for "
@@ -4257,10 +4267,12 @@ def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wai
                 if self._is_shutdown:
                     log.debug("[control connection] Aborting wait for schema match due to shutdown")
                     return None
-                else:
-                    raise
+                elif not error_signaled:
+                    self._signal_error()
+                    error_signaled = True
+                continue

-            schema_mismatches = self._get_schema_mismatches(peers_result, local_result, connection.endpoint)
+            schema_mismatches = self._get_schema_mismatches(peers_result, local_result, current_connection.endpoint)
             if schema_mismatches is None:
                 return True

Comment on lines 4268 to 4278


I see some problems with that approach.

DDL Requests

Schema agreement is not always done on the control connection. After a SCHEMA_CHANGE response to a request we perform the schema agreement wait on the connection that the request was sent to.
If this connection becomes broken during that (see the sketch at the end of this comment):

  • old code would raise an error immediately, which is perfectly reasonable in this case
  • new code would trigger a control connection reconnection (for no reason - the control connection may be perfectly fine), and then keep trying to send requests on the defunct connection until the timeout. This seems suboptimal.

Multiple failures

This fix only guards us from a single failure, because of the error_signaled guard. If node X (with the control connection) goes down, we signal it and connect the CC to node Y; if node Y then goes down, we will not call _signal_error again and will just keep retrying the defunct connection.

OTOH getting rid of error_signaled is not a good idea - it could result in a reconnection storm / loop.

I am not sure how to address this. Could you check how the Java Driver approaches this? I'm asking about Java and not Rust because the Rust driver handles schema agreement very differently, so it is not applicable.
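
To make the DDL-request case concrete, here is a minimal sketch of the two call paths; the class and helper names are purely illustrative, not the actual cluster.py internals:

```python
# Minimal sketch, hypothetical names only -- not the driver's actual internals.
# It contrasts the two call paths: schema agreement on the control connection
# vs. on the connection a DDL request was sent on.

class FakeConnection:
    def __init__(self, endpoint):
        self.endpoint = endpoint
        self.is_defunct = False

class FakeControlConnection:
    def __init__(self, connection):
        self._connection = connection          # dedicated control connection

    def wait_for_schema_agreement(self, connection=None):
        # After a SCHEMA_CHANGE response, `connection` is the connection the
        # DDL was sent on; otherwise we fall back to the control connection.
        conn = connection or self._connection
        if conn.is_defunct:
            # Old behavior: fail fast with an error.  The patch instead keeps
            # retrying and signals the control connection, even though only
            # the DDL connection may be broken.
            raise ConnectionError("connection to %s is defunct" % conn.endpoint)
        return True                            # pretend agreement was reached

cc = FakeControlConnection(FakeConnection("10.0.0.1"))
ddl_conn = FakeConnection("10.0.0.2")          # connection that served the DDL
ddl_conn.is_defunct = True                     # it breaks during the wait
try:
    cc.wait_for_schema_agreement(connection=ddl_conn)
except ConnectionError as exc:
    print("old behavior:", exc)
```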

Collaborator Author

> I see some problems with that approach.
>
> DDL Requests
>
> Schema agreement is not always done on the control connection. After a SCHEMA_CHANGE response to a request we perform the schema agreement wait on the connection that the request was sent to. If this connection becomes broken during that:
>
> • old code would raise an error immediately, which is perfectly reasonable in this case

I disagree here, it is not reasonable: the statement was executed and the driver received a response, so the fact that the connection became dead should not impact the process.
If you throw the same error that the schema agreement logic uses, the API user will not be able to distinguish between a schema agreement exception and a statement exception and therefore will not be able to handle it properly.
The best behavior here would be to keep trying to check schema agreement on any live connection available.

> • new code would trigger a control connection reconnection (for no reason - the control connection may be perfectly fine), and then keep trying to send requests on the defunct connection until the timeout. This seems suboptimal.

True, we'd better fix that.

> Multiple failures
>
> This fix only guards us from a single failure, because of the error_signaled guard. If node X (with the control connection) goes down, we signal it and connect the CC to node Y; if node Y then goes down, we will not call _signal_error again and will just keep retrying the defunct connection.
>
> OTOH getting rid of error_signaled is not a good idea - it could result in a reconnection storm / loop.
>
> I am not sure how to address this. Could you check how the Java Driver approaches this? I'm asking about Java and not Rust because the Rust driver handles schema agreement very differently, so it is not applicable.

In my book, the only proper way to address these issues is to make the code iterate over available connections.
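
Roughly what I have in mind, as a sketch only; get_candidate_connections and check_agreement are hypothetical helpers, not a concrete patch to cluster.py:

```python
# Sketch only: retry the agreement check across whatever live connections are
# available, instead of pinning the whole wait to one connection.
import time

def wait_for_schema_agreement_any(get_candidate_connections, check_agreement,
                                  total_timeout=10.0, retry_interval=0.2):
    start = time.monotonic()
    while time.monotonic() - start < total_timeout:
        for conn in get_candidate_connections():
            if conn.is_defunct or conn.is_closed:
                continue                      # skip dead connections
            if check_agreement(conn):
                return True                   # agreement seen on a live connection
        time.sleep(retry_interval)
    return False                              # timed out without agreement
```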


> I disagree here, it is not reasonable: the statement was executed and the driver received a response, so the fact that the connection became dead should not impact the process.
> If you throw the same error that the schema agreement logic uses, the API user will not be able to distinguish between a schema agreement exception and a statement exception and therefore will not be able to handle it properly.
> The best behavior here would be to keep trying to check schema agreement on any live connection available.

The point is to provide the following:

  1. User executes a DDL request (let's say it creates a table)
  2. It completes successfully (= no exception is thrown by the driver)
  3. If so, the user can execute a request using this new table, and it won't return an error about the table being unknown.

To guarantee this we have to await schema agreement after issuing DDL.
More specifically: we have to await schema agreement against the same node we issued the DDL against. Why? If we check schema agreement on another node, it is possible that it does not know the new schema yet, so the schema agreement check would be falsely successful, violating the guarantee.

This means we can try other connections, but they have to be against the same node.
If we can't complete schema agreement against this node, we have to throw an exception.
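
As a sketch of that constraint; pool_for_host and open_connection_to are hypothetical helpers assumed to look up or open connections to a given endpoint:

```python
# Sketch of the rule above: if the DDL connection dies we may reuse or open
# another connection, but only to the same endpoint; otherwise raise.

def connection_for_agreement(ddl_conn, pool_for_host, open_connection_to):
    if not (ddl_conn.is_defunct or ddl_conn.is_closed):
        return ddl_conn                          # original connection still usable
    endpoint = ddl_conn.endpoint
    for conn in pool_for_host(endpoint):
        if not (conn.is_defunct or conn.is_closed):
            return conn                          # another live connection, same node
    try:
        return open_connection_to(endpoint)      # last resort: new connection, same node
    except Exception as exc:
        # Cannot reach the node the DDL was issued against: the guarantee cannot
        # be verified, so surface an error instead of checking a different node.
        raise ConnectionError("cannot verify schema agreement against %s" % endpoint) from exc
```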

Collaborator Author

@Lorak-mmk, please correct me if I am wrong: schema changes are done with QUORUM consistency level. Which means that if the driver checks a QUORUM number of nodes for schema agreement and succeeds, that would be enough to ensure that the whole cluster has the same schema in the given circumstances.
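
To illustrate the idea only (purely hypothetical, not what the driver currently does):

```python
# Hypothetical sketch of the quorum idea: declare agreement once a quorum of
# reachable nodes report the same schema_version, instead of requiring all.

def quorum_schema_agreement(schema_versions_by_node, cluster_size):
    # schema_versions_by_node: {endpoint: schema_version, or None if unreachable}
    quorum = cluster_size // 2 + 1
    counts = {}
    for version in schema_versions_by_node.values():
        if version is not None:
            counts[version] = counts.get(version, 0) + 1
    return any(count >= quorum for count in counts.values())

# Example: 2 of 3 nodes agree, the third is unreachable -> quorum reached.
print(quorum_schema_agreement(
    {"10.0.0.1": "v1", "10.0.0.2": "v1", "10.0.0.3": None}, cluster_size=3))
```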

@@ -4269,7 +4281,7 @@ def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wai
             elapsed = self._time.time() - start

         log.warning("Node %s is reporting a schema disagreement: %s",
-                    connection.endpoint, schema_mismatches)
+                    current_connection.endpoint, schema_mismatches)
         return False

     def _get_schema_mismatches(self, peers_result, local_result, local_address):