Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 31 additions & 8 deletions lib/charms/mysql/v0/mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def wait_until_mysql_connection(self) -> None:
# Increment this major API version when introducing breaking changes
LIBAPI = 0

LIBPATCH = 91
LIBPATCH = 92

UNIT_TEARDOWN_LOCKNAME = "unit-teardown"
UNIT_ADD_LOCKNAME = "unit-add"
Expand Down Expand Up @@ -2025,26 +2025,47 @@ def add_instance_to_cluster(
# always release the lock
self._release_lock(local_lock_instance, instance_unit_label, UNIT_ADD_LOCKNAME)

def rejoin_instance_to_cluster(self, *, unit_label: str, from_instance: str) -> None:
"""Rejoin an instance to the InnoDB cluster."""
def rejoin_instance_to_cluster(
self, *, unit_address: str, unit_label: str, from_instance: str
) -> None:
"""Rejoin an instance to the InnoDB cluster.

Args:
unit_address: The address of the unit to rejoin.
unit_label: The label of the unit to rejoin.
from_instance: The instance from which to rejoin the cluster.
"""
options = {"password": self.server_config_password}
commands = (
f"cluster = dba.get_cluster('{self.cluster_name}')",
f"cluster.rejoin_instance('{unit_label}')",
f"cluster.rejoin_instance('{self.instance_def(self.server_config_user, unit_address)}',"
f"{options})",
)

from_instance = from_instance or self.instance_address
if not self._acquire_lock(
from_instance,
unit_label,
UNIT_ADD_LOCKNAME,
):
raise MySQLLockAcquisitionError("Lock not acquired")

try:
logger.debug(f"Rejoining instance {unit_label} to cluster {self.cluster_name}")
logger.debug(f"Rejoining instance {unit_address} to cluster {self.cluster_name}")
self._run_mysqlsh_script(
"\n".join(commands),
user=self.server_config_user,
password=self.server_config_password,
host=self.instance_def(self.server_config_user, from_instance),
)
except MySQLClientError as e:
logger.error(f"Failed to rejoin instance {unit_label} to cluster {self.cluster_name}")
logger.error(
f"Failed to rejoin instance {unit_address} to cluster {self.cluster_name}"
)
raise MySQLRejoinInstanceToClusterError from e
finally:
# always release the lock
self._release_lock(from_instance, unit_label, UNIT_ADD_LOCKNAME)

def is_instance_configured_for_innodb(
self, instance_address: str, instance_unit_label: str
Expand Down Expand Up @@ -2892,17 +2913,19 @@ def force_quorum_from_instance(self) -> None:

Recovery for cases where majority loss put the cluster in defunct state.
"""
instance_definition = self.instance_def(self.server_config_user)
force_quorum_command = (
f"cluster = dba.get_cluster('{self.cluster_name}')",
"cluster.force_quorum_using_partition_of()",
f"cluster.force_quorum_using_partition_of('{self.server_config_user}@"
f"{instance_definition}','{self.server_config_password}')",
)

try:
self._run_mysqlsh_script(
"\n".join(force_quorum_command),
user=self.server_config_user,
password=self.server_config_password,
host=self.instance_def(self.server_config_user),
host=instance_definition,
)
except MySQLClientError as e:
logger.error("Failed to force quorum from instance")
Expand Down
21 changes: 15 additions & 6 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,9 +507,18 @@ def _execute_manual_rejoin(self) -> None:
logger.warning("Instance does not have ONLINE peers. Cannot perform manual rejoin")
return

if self._mysql.are_locks_acquired(from_instance=cluster_primary):
logger.info("waiting: cluster lock is held")
return
# add random delay to mitigate collisions when multiple units are rejoining
# due to the difference between the time we test for locks and acquire them
# Not used for cryptographic purpose
sleep(random.uniform(0, 1.5)) # noqa: S311
try:
self._mysql.rejoin_instance_to_cluster(
unit_label=self.unit_label, from_instance=cluster_primary
unit_address=self.unit_fqdn,
unit_label=self.unit_label,
from_instance=cluster_primary,
)
return
except MySQLRejoinInstanceToClusterError:
Expand Down Expand Up @@ -585,15 +594,15 @@ def _on_update_status(self, _) -> None: # noqa: C901
if not self._handle_non_online_instance_status(state):
return

if self.unit.is_leader():
if self.unit.is_leader() and state == "online":
try:
primary_address = self._mysql.get_cluster_primary_address()
except MySQLGetClusterPrimaryAddressError:
self.unit.status = MaintenanceStatus("Unable to query cluster primary")
return
primary_address = None

if not primary_address:
self.unit.status = MaintenanceStatus("Unable to find cluster primary")
logger.error("Cluster has no primary. Check cluster status on online units.")
self.app.status = MaintenanceStatus("Cluster has no primary.")
return

if "s3-block-message" in self.app_peer_data:
Expand Down Expand Up @@ -927,7 +936,7 @@ def join_unit_to_cluster(self) -> None:

# add random delay to mitigate collisions when multiple units are joining
# due the difference between the time we test for locks and acquire them
# Not used for cryptographic puropse
# Not used for cryptographic purpose
sleep(random.uniform(0, 1.5)) # noqa: S311

if self._mysql.are_locks_acquired(from_instance=lock_instance or cluster_primary):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ async def deploy_and_scale_application(ops_test: OpsTest) -> str:
num_units=1,
channel="latest/edge",
base="[email protected]",
config={"sleep_interval": "500"},
)

await ops_test.model.wait_for_idle(
Expand Down
29 changes: 24 additions & 5 deletions tests/integration/high_availability/test_primary_switchover.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

from ..markers import juju3

logging.getLogger("jubilant.wait").setLevel(logging.WARNING)


@juju3
@pytest.mark.abort_on_fail
Expand Down Expand Up @@ -56,13 +58,30 @@ def test_cluster_failover_after_majority_loss(juju: Juju, highly_available_clust

logging.info(f"Unit selected for promotion: {unit_to_promote}")

logging.info("Rebooting all but one unit to simulate majority loss...")
for unit in [non_primary_units.pop(), primary_unit]:
machine_name = get_unit_machine(juju, app_name, unit)
run(["lxc", "restart", machine_name], check=True)
logging.info("Kill all but one unit to simulate majority loss...")
units_to_kill = [non_primary_units.pop(), primary_unit]
machine_name = []
for unit in units_to_kill:
machine_name.append(get_unit_machine(juju, app_name, unit))

run(["lxc", "restart", "--force", machine_name[0], machine_name[1]], check=True)

juju.model_config({"update-status-hook-interval": "45s"})
logging.info("Waiting to settle in error state")
juju.wait(
lambda status: status.apps[app_name].units[unit_to_promote].workload_status.current
== "active"
and status.apps[app_name].units[units_to_kill[0]].workload_status.message == "offline"
and status.apps[app_name].units[units_to_kill[1]].workload_status.message == "offline",
timeout=60 * 15,
delay=15,
)

failover_task = juju.run(
unit_to_promote, "promote-to-primary", {"scope": "unit", "force": True}
unit_to_promote,
"promote-to-primary",
{"scope": "unit", "force": True},
wait=600,
)

juju.model_config({"update-status-hook-interval": "15s"})
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/test_charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ def test_on_update(
_get_member_state.assert_called_once()
_reboot_from_complete_outage.assert_called_once()
_snap_service_operation.assert_called()
_get_cluster_primary_address.assert_called_once()
_get_cluster_primary_address.assert_not_called()

self.assertTrue(isinstance(self.harness.model.unit.status, MaintenanceStatus))
# test instance state = unreachable
Expand Down
Loading