Skip to content

Commit a45e3b4

Browse files
draft
1 parent 9e90c84 commit a45e3b4

File tree

7 files changed

+803
-36
lines changed

7 files changed

+803
-36
lines changed

poetry.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
#!/usr/bin/env python3
2+
# Copyright 2025 Canonical Ltd.
3+
# See LICENSE file for licensing details.
4+
5+
from collections.abc import Callable
6+
7+
import jubilant
8+
from jubilant import Juju
9+
from jubilant.statustypes import Status, UnitStatus
10+
11+
from constants import SERVER_CONFIG_USERNAME
12+
13+
from ..helpers import execute_queries_on_unit
14+
15+
MINUTE_SECS = 60
16+
17+
JujuModelStatusFunc = Callable[[Status], bool]
18+
JujuAppsStatusFunc = Callable[[Status, str], bool]
19+
20+
21+
def get_app_leader(juju: Juju, app_name: str) -> str:
22+
"""Get the leader unit for the given application."""
23+
model_status = juju.status()
24+
app_status = model_status.apps[app_name]
25+
for name, status in app_status.units.items():
26+
if status.leader:
27+
return name
28+
29+
raise Exception("No leader unit found")
30+
31+
32+
def get_app_name(juju: Juju, charm_name: str) -> str | None:
33+
"""Get the application name for the given charm."""
34+
model_status = juju.status()
35+
app_statuses = model_status.apps
36+
for name, status in app_statuses.items():
37+
if status.charm_name == charm_name:
38+
return name
39+
40+
raise Exception("No application name found")
41+
42+
43+
def get_app_units(juju: Juju, app_name: str) -> dict[str, UnitStatus]:
44+
"""Get the units for the given application."""
45+
model_status = juju.status()
46+
app_status = model_status.apps[app_name]
47+
return app_status.units
48+
49+
50+
def get_unit_ip(juju: Juju, app_name: str, unit_name: str) -> str:
51+
"""Get the application unit IP."""
52+
model_status = juju.status()
53+
app_status = model_status.apps[app_name]
54+
for name, status in app_status.units.items():
55+
if name == unit_name:
56+
return status.public_address
57+
58+
raise Exception("No application unit found")
59+
60+
61+
def get_mysql_cluster_status(juju: Juju, unit: str, cluster_set: bool | None = False) -> dict:
62+
"""Get the cluster status by running the get-cluster-status action.
63+
64+
Args:
65+
juju: The juju instance to use.
66+
unit: The unit on which to execute the action on
67+
cluster_set: Whether to get the cluster-set instead
68+
69+
Returns:
70+
A dictionary representing the cluster status
71+
"""
72+
task = juju.run(
73+
unit=unit,
74+
action="get-cluster-status",
75+
params={"cluster-set": bool(cluster_set)},
76+
wait=5 * MINUTE_SECS,
77+
)
78+
task.raise_on_failure()
79+
80+
return task.results.get("status", {})
81+
82+
83+
def get_mysql_primary_unit(juju: Juju, app_name: str) -> str:
84+
"""Get the current primary node of the cluster."""
85+
mysql_primary = get_app_leader(juju, app_name)
86+
mysql_cluster_status = get_mysql_cluster_status(juju, mysql_primary)
87+
mysql_cluster_topology = mysql_cluster_status["defaultreplicaset"]["topology"]
88+
89+
for label, value in mysql_cluster_topology.items():
90+
if value["memberrole"] == "primary":
91+
return label.replace("-", "/")
92+
93+
raise Exception("No MySQL primary node found")
94+
95+
96+
async def get_mysql_max_written_value(juju: Juju, app_name: str, unit_name: str) -> int:
97+
"""Retrieve the max written value in the MySQL database.
98+
99+
Args:
100+
juju: The Juju model.
101+
app_name: The application name.
102+
unit_name: The unit name.
103+
"""
104+
credentials_task = juju.run(
105+
unit=unit_name,
106+
action="get-password",
107+
params={"username": SERVER_CONFIG_USERNAME},
108+
)
109+
credentials_task.raise_on_failure()
110+
111+
output = await execute_queries_on_unit(
112+
get_unit_ip(juju, app_name, unit_name),
113+
credentials_task.results["username"],
114+
credentials_task.results["password"],
115+
["SELECT MAX(number) FROM `continuous_writes`.`data`;"],
116+
)
117+
return output[0]
118+
119+
120+
def wait_for_status(jubilant_status_func: JujuAppsStatusFunc, *apps: str) -> JujuModelStatusFunc:
121+
"""Waits for Juju agents to be idle, and for applications to reach a certain status.
122+
123+
Args:
124+
jubilant_status_func: The Juju apps status function to wait for.
125+
apps: The applications to wait for.
126+
127+
Returns:
128+
Juju model status function.
129+
"""
130+
return lambda status: all((
131+
jubilant.all_agents_idle(status, *apps),
132+
jubilant_status_func(status, *apps),
133+
))

0 commit comments

Comments
 (0)