Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import socket
import subprocess
from time import sleep
from typing import Optional

import ops
from charms.data_platform_libs.v0.data_models import TypedCharmBase
Expand Down Expand Up @@ -400,7 +399,7 @@ def _on_database_storage_detaching(self, _) -> None:
if not self._mysql.is_instance_in_cluster(self.unit_label):
return

def _get_leader_unit() -> Optional[Unit]:
def _get_leader_unit() -> Unit | None:
"""Get the leader unit."""
for unit in self.peers.units:
if self.peers.data[unit]["leader"] == "true":
Expand Down Expand Up @@ -630,7 +629,7 @@ def _on_cos_agent_relation_broken(self, _: RelationBrokenEvent) -> None:
# =======================

@property
def tracing_endpoint(self) -> Optional[str]:
def tracing_endpoint(self) -> str | None:
"""Otlp http endpoint for charm instrumentation."""
return self.tracing_endpoint_config

Expand Down Expand Up @@ -665,15 +664,15 @@ def unit_fqdn(self) -> str:
return socket.getfqdn()

@property
def restart_peers(self) -> Optional[ops.Relation]:
def restart_peers(self) -> ops.Relation | None:
"""Retrieve the peer relation."""
return self.model.get_relation("restart")

def is_unit_busy(self) -> bool:
"""Returns whether the unit is in blocked state and should not run any operations."""
return self.unit_peer_data.get("member-state") == "waiting"

def get_unit_hostname(self, unit_name: Optional[str] = None) -> str:
def get_unit_hostname(self, unit_name: str | None = None) -> str:
"""Get the hostname of the unit."""
if unit_name:
unit = self.model.get_unit(unit_name)
Expand Down Expand Up @@ -876,7 +875,7 @@ def _is_unit_waiting_to_join_cluster(self) -> bool:
and self.cluster_initialized
)

def _get_primary_from_online_peer(self) -> Optional[str]:
def _get_primary_from_online_peer(self) -> str | None:
"""Get the primary address from an online peer."""
for unit in self.peers.units:
if self.peers.data[unit].get("member-state") == "online":
Expand Down
27 changes: 13 additions & 14 deletions src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import logging
import os
import re
from typing import Optional

from charms.data_platform_libs.v0.data_models import BaseConfigModel
from charms.mysql.v0.mysql import MAX_CONNECTIONS_FLOOR
Expand Down Expand Up @@ -44,7 +43,7 @@ def filter_static_keys(self, keys: set) -> set:
return keys - self.static_config

@property
def custom_config(self) -> Optional[dict]:
def custom_config(self) -> dict | None:
"""Return current custom config dict."""
if not os.path.exists(self.config_file_path):
return None
Expand All @@ -61,12 +60,12 @@ class CharmConfig(BaseConfigModel):
"""Manager for the structured configuration."""

profile: str
cluster_name: Optional[str]
cluster_set_name: Optional[str]
profile_limit_memory: Optional[int]
mysql_interface_user: Optional[str]
mysql_interface_database: Optional[str]
experimental_max_connections: Optional[int]
cluster_name: str | None
cluster_set_name: str | None
profile_limit_memory: int | None
mysql_interface_user: str | None
mysql_interface_database: str | None
experimental_max_connections: int | None
binlog_retention_days: int
plugin_audit_enabled: bool
plugin_audit_strategy: str
Expand All @@ -75,7 +74,7 @@ class CharmConfig(BaseConfigModel):

@validator("profile")
@classmethod
def profile_values(cls, value: str) -> Optional[str]:
def profile_values(cls, value: str) -> str | None:
"""Check profile config option is one of `testing` or `production`."""
if value not in ["testing", "production"]:
raise ValueError("Value not one of 'testing' or 'production'")
Expand All @@ -84,7 +83,7 @@ def profile_values(cls, value: str) -> Optional[str]:

@validator("cluster_name", "cluster_set_name")
@classmethod
def cluster_name_validator(cls, value: str) -> Optional[str]:
def cluster_name_validator(cls, value: str) -> str | None:
"""Check for valid cluster, cluster-set name.

Limited to 63 characters, and must start with a letter and
Expand All @@ -106,7 +105,7 @@ def cluster_name_validator(cls, value: str) -> Optional[str]:

@validator("profile_limit_memory")
@classmethod
def profile_limit_memory_validator(cls, value: int) -> Optional[int]:
def profile_limit_memory_validator(cls, value: int) -> int | None:
"""Check profile limit memory."""
if value < 600:
raise ValueError("MySQL Charm requires at least 600MB for bootstrapping")
Expand All @@ -117,7 +116,7 @@ def profile_limit_memory_validator(cls, value: int) -> Optional[int]:

@validator("experimental_max_connections")
@classmethod
def experimental_max_connections_validator(cls, value: int) -> Optional[int]:
def experimental_max_connections_validator(cls, value: int) -> int | None:
"""Check experimental max connections."""
if value < MAX_CONNECTIONS_FLOOR:
raise ValueError(
Expand All @@ -138,7 +137,7 @@ def binlog_retention_days_validator(cls, value: int) -> int:

@validator("plugin_audit_strategy")
@classmethod
def plugin_audit_strategy_validator(cls, value: str) -> Optional[str]:
def plugin_audit_strategy_validator(cls, value: str) -> str | None:
"""Check profile config option is one of `testing` or `production`."""
if value not in ["async", "semi-async"]:
raise ValueError("Value not one of 'async' or 'semi-async'")
Expand All @@ -147,7 +146,7 @@ def plugin_audit_strategy_validator(cls, value: str) -> Optional[str]:

@validator("logs_audit_policy")
@classmethod
def logs_audit_policy_validator(cls, value: str) -> Optional[str]:
def logs_audit_policy_validator(cls, value: str) -> str | None:
"""Check values for audit log policy."""
valid_values = ["all", "logins", "queries"]
if value not in valid_values:
Expand Down
30 changes: 15 additions & 15 deletions src/mysql_vm_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import subprocess
import tempfile
import typing
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
from typing import Any, Iterable

import jinja2
import pexpect
Expand Down Expand Up @@ -436,8 +436,8 @@ def wait_until_mysql_connection(self, check_port: bool = True) -> None:
def execute_backup_commands( # type: ignore
self,
s3_directory: str,
s3_parameters: Dict[str, str],
) -> Tuple[str, str]:
s3_parameters: dict[str, str],
) -> tuple[str, str]:
"""Executes commands to create a backup."""
return super().execute_backup_commands(
s3_directory,
Expand Down Expand Up @@ -465,13 +465,13 @@ def delete_temp_backup_directory( # type: ignore
def retrieve_backup_with_xbcloud( # type: ignore
self,
backup_id: str,
s3_parameters: Dict[str, str],
s3_parameters: dict[str, str],
temp_restore_directory: str = CHARMED_MYSQL_COMMON_DIRECTORY,
xbcloud_location: str = CHARMED_MYSQL_XBCLOUD_LOCATION,
xbstream_location: str = CHARMED_MYSQL_XBSTREAM_LOCATION,
user=ROOT_SYSTEM_USER,
group=ROOT_SYSTEM_USER,
) -> Tuple[str, str, str]:
) -> tuple[str, str, str]:
"""Retrieve the provided backup with xbcloud."""
return super().retrieve_backup_with_xbcloud(
backup_id,
Expand All @@ -483,7 +483,7 @@ def retrieve_backup_with_xbcloud( # type: ignore
group,
)

def prepare_backup_for_restore(self, backup_location: str) -> Tuple[str, str]:
def prepare_backup_for_restore(self, backup_location: str) -> tuple[str, str]:
"""Prepare the download backup for restore with xtrabackup --prepare."""
return super().prepare_backup_for_restore(
backup_location,
Expand All @@ -504,7 +504,7 @@ def empty_data_files(self) -> None:
def restore_backup(
self,
backup_location: str,
) -> Tuple[str, str]:
) -> tuple[str, str]:
"""Restore the provided prepared backup."""
# TODO: remove workaround for changing permissions and ownership of data
# files once restore backup commands can be run with snap_daemon user
Expand Down Expand Up @@ -572,13 +572,13 @@ def delete_temp_restore_directory(self) -> None:

def _execute_commands(
self,
commands: List[str],
commands: list[str],
bash: bool = False,
user: str = None,
group: str = None,
env_extra: Dict = {},
stream_output: Optional[str] = None,
) -> Tuple[str, str]:
env_extra: dict = {},
stream_output: str | None = None,
) -> tuple[str, str]:
"""Execute commands on the server where mysql is running.

Args:
Expand Down Expand Up @@ -802,10 +802,10 @@ def _run_mysqlsh_script(

def _run_mysqlcli_script(
self,
script: Union[Tuple[Any, ...], List[Any]],
script: tuple[Any, ...] | list[Any],
user: str = "root",
password: Optional[str] = None,
timeout: Optional[int] = None,
password: str | None = None,
timeout: int | None = None,
exception_as_warning: bool = False,
log_errors: bool = True,
) -> list:
Expand Down Expand Up @@ -1001,7 +1001,7 @@ def write_content_to_file(
os.chmod(path, mode=permission)

@staticmethod
def fetch_error_log() -> Optional[str]:
def fetch_error_log() -> str | None:
"""Fetch the mysqld error log."""
if os.path.exists(f"{CHARMED_MYSQL_COMMON_DIRECTORY}/var/log/mysql/error.log"):
# can be empty if just rotated
Expand Down
7 changes: 3 additions & 4 deletions src/relations/db_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import logging
import typing
from collections import namedtuple
from typing import Dict, List, Set, Tuple

from charms.mysql.v0.mysql import (
MySQLCheckUserExistenceError,
Expand Down Expand Up @@ -70,7 +69,7 @@ def _get_or_set_password_in_peer_databag(self, username: str) -> str:

def _get_requested_users_from_relation_databag(
self, db_router_databag: RelationDataContent
) -> List[RequestedUser]:
) -> list[RequestedUser]:
"""Retrieve requested user information from the db-router relation databag.

Args:
Expand Down Expand Up @@ -109,8 +108,8 @@ def _get_requested_users_from_relation_databag(
return requested_users

def _create_requested_users(
self, requested_users: List[RequestedUser], user_unit_name: str
) -> Tuple[Dict[str, str], Set[str]]:
self, requested_users: list[RequestedUser], user_unit_name: str
) -> tuple[dict[str, str], set[str]]:
"""Create the requested users and said user scoped databases.

Args:
Expand Down
Loading