Skip to content

Commit

Permalink
feature(raft): Rolling restart raft topology coordinator node
Browse files Browse the repository at this point in the history
When current topology coordinator is not available, new round
of election of new coordinator should be started and new
raft topology coordinator node will be elected.

Added new function to search current coordinator node
Added new nemesis to Rolling restart of elected coordinator node
  • Loading branch information
aleksbykov authored and soyacz committed Nov 27, 2024
1 parent 1546416 commit 5aaf574
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 6 deletions.
4 changes: 4 additions & 0 deletions sdcm/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,7 @@ class SstablesNotFound(Exception):

class CapacityReservationError(Exception):
pass


class RaftTopologyCoordinatorNotFound(Exception):
"""Raise exception if no host id for raft topology was not found in group0 history"""
51 changes: 50 additions & 1 deletion sdcm/nemesis.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@
from sdcm.utils.toppartition_util import NewApiTopPartitionCmd, OldApiTopPartitionCmd
from sdcm.utils.version_utils import MethodVersionNotFound, scylla_versions, ComparableScyllaVersion
from sdcm.utils.raft import Group0MembersNotConsistentWithTokenRingMembersException, TopologyOperations
from sdcm.utils.raft.common import NodeBootstrapAbortManager
from sdcm.utils.raft.common import NodeBootstrapAbortManager, get_topology_coordinator_node
from sdcm.utils.issues import SkipPerIssues
from sdcm.wait import wait_for, wait_for_log_lines
from sdcm.exceptions import (
Expand Down Expand Up @@ -380,6 +380,13 @@ def publish_event(self, disrupt, status=True, data=None):
severity = Severity.NORMAL if status else Severity.ERROR
DisruptionEvent(nemesis_name=disrupt, severity=severity, **data).publish()

def switch_target_node(self, node: BaseNode):
with NEMESIS_TARGET_SELECTION_LOCK:
self.target_node.running_nemesis = None
self.target_node = None
node.running_nemesis = self.current_disruption
self.target_node = node

def set_current_running_nemesis(self, node):
with NEMESIS_TARGET_SELECTION_LOCK:
node.running_nemesis = self.current_disruption
Expand Down Expand Up @@ -5282,6 +5289,39 @@ def disrupt_grow_shrink_zero_nodes(self):
znode = random.choice([node for node in self.cluster.zero_nodes if node.dc_idx == self.target_node.dc_idx])
self.decommission_nodes(nodes=[znode])

@target_all_nodes
def disrupt_serial_restart_elected_topology_coordinator(self):
""" Serial restart of elected topology coordinator node,
should trigger new coordinator node election
"""
if not self.target_node.raft.is_consistent_topology_changes_enabled:
raise UnsupportedNemesis("Consistent topology changes feature is disabled")

self.use_nemesis_seed()
num_of_restarts = random.randint(1, len(self.cluster.nodes))
self.log.debug("Number of serial restart of topology coordinator: %s", num_of_restarts)
election_wait_timeout = random.choice([1, 5, 10, 15])
self.log.debug("Wait new topology coordinator election timeout: %s", election_wait_timeout)
for num_of_restart in range(num_of_restarts):
with self.run_nemesis(node_list=self.cluster.nodes, nemesis_label="search coordinator") as verification_node:
coordinator_node = get_topology_coordinator_node(verification_node)
if coordinator_node != self.target_node and coordinator_node.running_nemesis:
raise UnsupportedNemesis(
f"Coordinator node is busy with {coordinator_node.running_nemesis}, Coordinator node was restarted: {num_of_restart}")
elif coordinator_node != self.target_node:
self.switch_target_node(coordinator_node)
self.log.debug("Coordinator node: %s, %s", coordinator_node, coordinator_node.name)
self.target_node.stop_scylla()
self.log.debug("Wait random timeout %s to new coordinator will be elected", election_wait_timeout)
time.sleep(election_wait_timeout)
with self.run_nemesis(node_list=self.cluster.nodes,
nemesis_label="search coordinator") as verification_node:
new_coordinator_node = get_topology_coordinator_node(verification_node)
self.log.debug("New coordinator node: %s, %s", new_coordinator_node, new_coordinator_node.name)
self.target_node.start_scylla()
assert self.target_node != new_coordinator_node, \
f"New coordinator node was not elected while old one {coordinator_node.name} was stopped"


def disrupt_method_wrapper(method, is_exclusive=False): # pylint: disable=too-many-statements # noqa: PLR0915
"""
Expand Down Expand Up @@ -6828,3 +6868,12 @@ def __init__(self, *args, **kwargs):
self.use_all_nodes_as_target = True
self.build_list_of_disruptions_to_execute(nemesis_selector=['zero_node_changes'])
self.shuffle_list_of_disruptions()


class SerialRestartOfElectedTopologyCoordinatorNemesis(Nemesis):

disruptive = True
topology_changes = True

def disrupt(self):
self.disrupt_serial_restart_elected_topology_coordinator()
7 changes: 5 additions & 2 deletions sdcm/rest/storage_service_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@

from fabric.runners import Result

from sdcm.cluster import BaseNode
from sdcm.rest.remote_curl_client import RemoteCurlClient


class StorageServiceClient(RemoteCurlClient):
def __init__(self, node: BaseNode):
def __init__(self, node: "BaseNode"): # noqa F821
super().__init__(host="localhost:10000", endpoint="storage_service", node=node)

def compact_ks_cf(self, keyspace: str, cf: Optional[str] = None) -> Result:
Expand Down Expand Up @@ -50,3 +49,7 @@ def upgrade_sstables(self, keyspace: str = "ks", cf: Optional[str] = None):
path = f"keyspace_upgrade_sstables/{keyspace}"

return self.run_remoter_curl(method="GET", path=path, params=params)

def get_local_hostid(self):
path = "hostid/local"
return self.run_remoter_curl(method="GET", path=path, params=None, retry=3)
3 changes: 2 additions & 1 deletion sdcm/utils/raft/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from sdcm.utils.health_checker import HealthEventsGenerator
from sdcm.wait import wait_for


LOGGER = logging.getLogger(__name__)
RAFT_DEFAULT_SCYLLA_VERSION = "5.5.0-dev"

Expand Down Expand Up @@ -443,4 +444,4 @@ def get_node_status_from_system_by(verification_node: "BaseNode", *, ip_address:
__all__ = ["get_raft_mode",
"get_node_status_from_system_by",
"Group0MembersNotConsistentWithTokenRingMembersException",
]
"TopologyOperations"]
27 changes: 27 additions & 0 deletions sdcm/utils/raft/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
import contextlib
import time
import traceback
import re

from typing import Iterable, Callable
from functools import partial
from json import loads

from sdcm.sct_events.decorators import raise_event_on_failure
from sdcm.exceptions import BootstrapStreamErrorFailure, ExitByEventError
Expand All @@ -15,7 +17,11 @@
from sdcm.utils.common import ParallelObject
from sdcm.utils.raft import get_node_status_from_system_by
from sdcm.cluster import BaseMonitorSet, NodeSetupFailed, BaseScyllaCluster, BaseNode
from sdcm.exceptions import RaftTopologyCoordinatorNotFound
from sdcm.rest.storage_service_client import StorageServiceClient

LOGGER = logging.getLogger(__name__)
UUID_REGEX = re.compile(r"([0-9a-fA-F]{8}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{12})")


class RaftException(Exception):
Expand Down Expand Up @@ -44,6 +50,27 @@ def validate_raft_on_nodes(nodes: list[BaseNode]) -> None:
LOGGER.debug("Raft is ready!")


def get_topology_coordinator_node(node: BaseNode) -> BaseNode:
active_nodes: list[BaseNode] = node.parent_cluster.get_nodes_up_and_normal(node)
stm = "select description from system.group0_history where key = 'history' and \
description LIKE 'Starting new topology coordinator%' ALLOW FILTERING;"
with node.parent_cluster.cql_connection_patient(node) as session:
result = list(session.execute(stm))
coordinators_ids = []
for row in result:
if match := UUID_REGEX.search(row.description):
coordinators_ids.append(match.group(1))
if not coordinators_ids:
raise RaftTopologyCoordinatorNotFound("No host ids were found in raft group0 history")
LOGGER.debug("All coordinators history ids: %s", coordinators_ids)
for active_node in active_nodes:
node_hostid = loads(StorageServiceClient(active_node).get_local_hostid().stdout)
LOGGER.debug("Node %s host id is %s", active_node.name, node_hostid)
if node_hostid == coordinators_ids[0]:
return active_node
raise RaftTopologyCoordinatorNotFound(f"The node with host id {coordinators_ids[0]} was not found")


class NodeBootstrapAbortManager:
INSTANCE_START_TIMEOUT = 600
SUCCESS_BOOTSTRAP_TIMEOUT = 3600
Expand Down
3 changes: 2 additions & 1 deletion unit_tests/test_nemesis.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,8 @@ def expected_topology_changes_methods(self):
"disrupt_terminate_and_replace_node",
"disrupt_decommission_streaming_err",
"disrupt_remove_node_then_add_node",
"disrupt_bootstrap_streaming_error"]
"disrupt_bootstrap_streaming_error",
"disrupt_serial_restart_elected_topology_coordinator"]

@pytest.fixture(autouse=True)
def expected_schema_changes_methods(self):
Expand Down
2 changes: 1 addition & 1 deletion unit_tests/test_nemesis_sisyphus.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def test_list_all_available_nemesis(generate_file=True):
disruption_list, disruptions_dict, disruption_classes = sisyphus.get_list_of_disrupt_methods(
subclasses_list=subclasses, export_properties=True)

assert len(disruption_list) == 89
assert len(disruption_list) == 90

if generate_file:
with open(sct_abs_path('data_dir/nemesis.yml'), 'w', encoding="utf-8") as outfile1:
Expand Down

0 comments on commit 5aaf574

Please sign in to comment.