From 7b287a81c6eefd0819fc2cd8594d3167e1e7d7ee Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Karol=20Bary=C5=82a?=
Date: Tue, 26 Sep 2023 21:03:20 +0200
Subject: [PATCH 1/2] Fix wait_for_schema_agreement deadlock

Fixes https://github.com/scylladb/python-driver/issues/168

The fix works by extracting the part of on_down that marks the host as
down out of the executor, so it does not need to wait for a free thread.
Once the host is marked down, wait_for_schema_agreement can finish,
which in turn allows the rest of on_down (the part that still runs on
the executor) to be executed.
---
 cassandra/cluster.py | 22 ++++++++++++----------
 1 file changed, 12 insertions(+), 10 deletions(-)

diff --git a/cassandra/cluster.py b/cassandra/cluster.py
index 31ecd15b6f..4476bbb0e3 100644
--- a/cassandra/cluster.py
+++ b/cassandra/cluster.py
@@ -2003,6 +2003,17 @@ def _start_reconnector(self, host, is_host_addition):
         reconnector.start()
 
     @run_in_executor
+    def on_down_potentially_blocking(self, host, is_host_addition):
+        self.profile_manager.on_down(host)
+        self.control_connection.on_down(host)
+        for session in tuple(self.sessions):
+            session.on_down(host)
+
+        for listener in self.listeners:
+            listener.on_down(host)
+
+        self._start_reconnector(host, is_host_addition)
+
     def on_down(self, host, is_host_addition, expect_host_to_be_down=False):
         """
         Intended for internal use only.
@@ -2028,18 +2039,9 @@ def on_down(self, host, is_host_addition, expect_host_to_be_down=False):
             host.set_down()
             if (not was_up and not expect_host_to_be_down) or host.is_currently_reconnecting():
                 return
 
         log.warning("Host %s has been marked down", host)
-
-        self.profile_manager.on_down(host)
-        self.control_connection.on_down(host)
-        for session in tuple(self.sessions):
-            session.on_down(host)
-
-        for listener in self.listeners:
-            listener.on_down(host)
-
-        self._start_reconnector(host, is_host_addition)
+        self.on_down_potentially_blocking(host, is_host_addition)
 
     def on_add(self, host, refresh_nodes=True):
         if self.is_shutdown:

From 01383bc7f1e725ae0a087616cb3cdf0e6c69004d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Karol=20Bary=C5=82a?=
Date: Wed, 27 Sep 2023 14:29:29 +0200
Subject: [PATCH 2/2] Add regression test for schema deadlock

Regression test for the deadlock when performing a schema change right
after killing a node:
https://github.com/scylladb/python-driver/issues/168
---
 ..._concurrent_schema_change_and_node_kill.py | 36 +++++++++++++++++++
 1 file changed, 36 insertions(+)
 create mode 100644 tests/integration/standard/test_concurrent_schema_change_and_node_kill.py

diff --git a/tests/integration/standard/test_concurrent_schema_change_and_node_kill.py b/tests/integration/standard/test_concurrent_schema_change_and_node_kill.py
new file mode 100644
index 0000000000..aeda381c0d
--- /dev/null
+++ b/tests/integration/standard/test_concurrent_schema_change_and_node_kill.py
@@ -0,0 +1,36 @@
+import os
+import logging
+import unittest
+
+from tests.integration import use_cluster, get_node, local, TestCluster
+
+LOGGER = logging.getLogger(__name__)
+
+
+def setup_module():
+    use_cluster('test_concurrent_schema_change_and_node_kill', [3], start=True)
+
+@local
+class TestConcurrentSchemaChangeAndNodeKill(unittest.TestCase):
+    @classmethod
+    def setup_class(cls):
+        cls.cluster = TestCluster(max_schema_agreement_wait=120)
+        cls.session = cls.cluster.connect()
+
+    @classmethod
+    def teardown_class(cls):
+        cls.cluster.shutdown()
+
+    def test_schema_change_after_node_kill(self):
+        node2 = get_node(2)
+        self.session.execute(
+            "DROP KEYSPACE IF EXISTS ks_deadlock;")
+        self.session.execute(
+            "CREATE KEYSPACE IF NOT EXISTS ks_deadlock "
+            "WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '2' };")
+        self.session.set_keyspace('ks_deadlock')
+        self.session.execute("CREATE TABLE IF NOT EXISTS some_table(k int, c int, v int, PRIMARY KEY (k, v));")
+        self.session.execute("INSERT INTO some_table (k, c, v) VALUES (1, 2, 3);")
+        node2.stop(wait=False, gently=False)
+        self.session.execute("ALTER TABLE some_table ADD v2 int;", timeout=180)
+        print(self.session.execute("SELECT * FROM some_table WHERE k = 1;").all())
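
Note for reviewers unfamiliar with the failure mode described in the first commit
message: the sketch below is illustrative only and is not driver code. The names
executor, host_marked_down, wait_for_schema_agreement, on_down_before_fix and
on_down_after_fix are simplified stand-ins for cassandra.cluster internals. It
mimics the shape of the bug: a single-worker executor is occupied by a
schema-agreement wait that can only complete once the host is marked down, so
queuing the mark-down step on that same executor stalls, while performing it on
the calling thread (as on_down does after patch 1) does not.

# Minimal, self-contained sketch of the deadlock pattern (stand-in names, not driver code).
import threading
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=1)  # stands in for the cluster's small shared executor
host_marked_down = threading.Event()


def wait_for_schema_agreement():
    # Occupies an executor thread until the dead host is marked down
    # (bounded here, much like max_schema_agreement_wait bounds the real wait).
    return host_marked_down.wait(timeout=10)


def on_down_before_fix():
    # Pre-fix shape: the whole on_down is queued on the executor. With the only
    # worker stuck in wait_for_schema_agreement, the mark-down step cannot run
    # until that wait gives up.
    executor.submit(host_marked_down.set)


def on_down_after_fix():
    # Post-fix shape: mark the host down on the calling thread, and only defer
    # the potentially blocking notifications (sessions, listeners, reconnector).
    host_marked_down.set()
    executor.submit(lambda: None)  # placeholder for the deferred notification work


agreement = executor.submit(wait_for_schema_agreement)  # ties up the single worker
on_down_after_fix()  # swap in on_down_before_fix() to see the wait stall instead
print("schema agreement reached:", agreement.result())

The patch applies the same split inside Cluster.on_down: host.set_down() and the
early-return checks stay on the calling thread, and only
on_down_potentially_blocking, which now carries the @run_in_executor decorator,
is deferred to the executor.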