diff --git a/src/charm.py b/src/charm.py index ebdcbae34f..291fe87681 100755 --- a/src/charm.py +++ b/src/charm.py @@ -15,7 +15,6 @@ import socket import subprocess from time import sleep -from typing import Optional import ops from charms.data_platform_libs.v0.data_models import TypedCharmBase @@ -400,7 +399,7 @@ def _on_database_storage_detaching(self, _) -> None: if not self._mysql.is_instance_in_cluster(self.unit_label): return - def _get_leader_unit() -> Optional[Unit]: + def _get_leader_unit() -> Unit | None: """Get the leader unit.""" for unit in self.peers.units: if self.peers.data[unit]["leader"] == "true": @@ -630,7 +629,7 @@ def _on_cos_agent_relation_broken(self, _: RelationBrokenEvent) -> None: # ======================= @property - def tracing_endpoint(self) -> Optional[str]: + def tracing_endpoint(self) -> str | None: """Otlp http endpoint for charm instrumentation.""" return self.tracing_endpoint_config @@ -665,7 +664,7 @@ def unit_fqdn(self) -> str: return socket.getfqdn() @property - def restart_peers(self) -> Optional[ops.Relation]: + def restart_peers(self) -> ops.Relation | None: """Retrieve the peer relation.""" return self.model.get_relation("restart") @@ -673,7 +672,7 @@ def is_unit_busy(self) -> bool: """Returns whether the unit is in blocked state and should not run any operations.""" return self.unit_peer_data.get("member-state") == "waiting" - def get_unit_hostname(self, unit_name: Optional[str] = None) -> str: + def get_unit_hostname(self, unit_name: str | None = None) -> str: """Get the hostname of the unit.""" if unit_name: unit = self.model.get_unit(unit_name) @@ -876,7 +875,7 @@ def _is_unit_waiting_to_join_cluster(self) -> bool: and self.cluster_initialized ) - def _get_primary_from_online_peer(self) -> Optional[str]: + def _get_primary_from_online_peer(self) -> str | None: """Get the primary address from an online peer.""" for unit in self.peers.units: if self.peers.data[unit].get("member-state") == "online": diff --git a/src/config.py b/src/config.py index 8177419a72..0f1a66ab7c 100644 --- a/src/config.py +++ b/src/config.py @@ -8,7 +8,6 @@ import logging import os import re -from typing import Optional from charms.data_platform_libs.v0.data_models import BaseConfigModel from charms.mysql.v0.mysql import MAX_CONNECTIONS_FLOOR @@ -44,7 +43,7 @@ def filter_static_keys(self, keys: set) -> set: return keys - self.static_config @property - def custom_config(self) -> Optional[dict]: + def custom_config(self) -> dict | None: """Return current custom config dict.""" if not os.path.exists(self.config_file_path): return None @@ -61,12 +60,12 @@ class CharmConfig(BaseConfigModel): """Manager for the structured configuration.""" profile: str - cluster_name: Optional[str] - cluster_set_name: Optional[str] - profile_limit_memory: Optional[int] - mysql_interface_user: Optional[str] - mysql_interface_database: Optional[str] - experimental_max_connections: Optional[int] + cluster_name: str | None + cluster_set_name: str | None + profile_limit_memory: int | None + mysql_interface_user: str | None + mysql_interface_database: str | None + experimental_max_connections: int | None binlog_retention_days: int plugin_audit_enabled: bool plugin_audit_strategy: str @@ -75,7 +74,7 @@ class CharmConfig(BaseConfigModel): @validator("profile") @classmethod - def profile_values(cls, value: str) -> Optional[str]: + def profile_values(cls, value: str) -> str | None: """Check profile config option is one of `testing` or `production`.""" if value not in ["testing", "production"]: raise ValueError("Value not one of 'testing' or 'production'") @@ -84,7 +83,7 @@ def profile_values(cls, value: str) -> Optional[str]: @validator("cluster_name", "cluster_set_name") @classmethod - def cluster_name_validator(cls, value: str) -> Optional[str]: + def cluster_name_validator(cls, value: str) -> str | None: """Check for valid cluster, cluster-set name. Limited to 63 characters, and must start with a letter and @@ -106,7 +105,7 @@ def cluster_name_validator(cls, value: str) -> Optional[str]: @validator("profile_limit_memory") @classmethod - def profile_limit_memory_validator(cls, value: int) -> Optional[int]: + def profile_limit_memory_validator(cls, value: int) -> int | None: """Check profile limit memory.""" if value < 600: raise ValueError("MySQL Charm requires at least 600MB for bootstrapping") @@ -117,7 +116,7 @@ def profile_limit_memory_validator(cls, value: int) -> Optional[int]: @validator("experimental_max_connections") @classmethod - def experimental_max_connections_validator(cls, value: int) -> Optional[int]: + def experimental_max_connections_validator(cls, value: int) -> int | None: """Check experimental max connections.""" if value < MAX_CONNECTIONS_FLOOR: raise ValueError( @@ -138,7 +137,7 @@ def binlog_retention_days_validator(cls, value: int) -> int: @validator("plugin_audit_strategy") @classmethod - def plugin_audit_strategy_validator(cls, value: str) -> Optional[str]: + def plugin_audit_strategy_validator(cls, value: str) -> str | None: """Check profile config option is one of `testing` or `production`.""" if value not in ["async", "semi-async"]: raise ValueError("Value not one of 'async' or 'semi-async'") @@ -147,7 +146,7 @@ def plugin_audit_strategy_validator(cls, value: str) -> Optional[str]: @validator("logs_audit_policy") @classmethod - def logs_audit_policy_validator(cls, value: str) -> Optional[str]: + def logs_audit_policy_validator(cls, value: str) -> str | None: """Check values for audit log policy.""" valid_values = ["all", "logins", "queries"] if value not in valid_values: diff --git a/src/mysql_vm_helpers.py b/src/mysql_vm_helpers.py index 9d2d90be4c..f4c652adf8 100644 --- a/src/mysql_vm_helpers.py +++ b/src/mysql_vm_helpers.py @@ -12,7 +12,7 @@ import subprocess import tempfile import typing -from typing import Any, Dict, Iterable, List, Optional, Tuple, Union +from typing import Any, Iterable import jinja2 import pexpect @@ -436,8 +436,8 @@ def wait_until_mysql_connection(self, check_port: bool = True) -> None: def execute_backup_commands( # type: ignore self, s3_directory: str, - s3_parameters: Dict[str, str], - ) -> Tuple[str, str]: + s3_parameters: dict[str, str], + ) -> tuple[str, str]: """Executes commands to create a backup.""" return super().execute_backup_commands( s3_directory, @@ -465,13 +465,13 @@ def delete_temp_backup_directory( # type: ignore def retrieve_backup_with_xbcloud( # type: ignore self, backup_id: str, - s3_parameters: Dict[str, str], + s3_parameters: dict[str, str], temp_restore_directory: str = CHARMED_MYSQL_COMMON_DIRECTORY, xbcloud_location: str = CHARMED_MYSQL_XBCLOUD_LOCATION, xbstream_location: str = CHARMED_MYSQL_XBSTREAM_LOCATION, user=ROOT_SYSTEM_USER, group=ROOT_SYSTEM_USER, - ) -> Tuple[str, str, str]: + ) -> tuple[str, str, str]: """Retrieve the provided backup with xbcloud.""" return super().retrieve_backup_with_xbcloud( backup_id, @@ -483,7 +483,7 @@ def retrieve_backup_with_xbcloud( # type: ignore group, ) - def prepare_backup_for_restore(self, backup_location: str) -> Tuple[str, str]: + def prepare_backup_for_restore(self, backup_location: str) -> tuple[str, str]: """Prepare the download backup for restore with xtrabackup --prepare.""" return super().prepare_backup_for_restore( backup_location, @@ -504,7 +504,7 @@ def empty_data_files(self) -> None: def restore_backup( self, backup_location: str, - ) -> Tuple[str, str]: + ) -> tuple[str, str]: """Restore the provided prepared backup.""" # TODO: remove workaround for changing permissions and ownership of data # files once restore backup commands can be run with snap_daemon user @@ -572,13 +572,13 @@ def delete_temp_restore_directory(self) -> None: def _execute_commands( self, - commands: List[str], + commands: list[str], bash: bool = False, user: str = None, group: str = None, - env_extra: Dict = {}, - stream_output: Optional[str] = None, - ) -> Tuple[str, str]: + env_extra: dict = {}, + stream_output: str | None = None, + ) -> tuple[str, str]: """Execute commands on the server where mysql is running. Args: @@ -802,10 +802,10 @@ def _run_mysqlsh_script( def _run_mysqlcli_script( self, - script: Union[Tuple[Any, ...], List[Any]], + script: tuple[Any, ...] | list[Any], user: str = "root", - password: Optional[str] = None, - timeout: Optional[int] = None, + password: str | None = None, + timeout: int | None = None, exception_as_warning: bool = False, log_errors: bool = True, ) -> list: @@ -1001,7 +1001,7 @@ def write_content_to_file( os.chmod(path, mode=permission) @staticmethod - def fetch_error_log() -> Optional[str]: + def fetch_error_log() -> str | None: """Fetch the mysqld error log.""" if os.path.exists(f"{CHARMED_MYSQL_COMMON_DIRECTORY}/var/log/mysql/error.log"): # can be empty if just rotated diff --git a/src/relations/db_router.py b/src/relations/db_router.py index 5ad28f4e15..8a0d9389b7 100644 --- a/src/relations/db_router.py +++ b/src/relations/db_router.py @@ -7,7 +7,6 @@ import logging import typing from collections import namedtuple -from typing import Dict, List, Set, Tuple from charms.mysql.v0.mysql import ( MySQLCheckUserExistenceError, @@ -70,7 +69,7 @@ def _get_or_set_password_in_peer_databag(self, username: str) -> str: def _get_requested_users_from_relation_databag( self, db_router_databag: RelationDataContent - ) -> List[RequestedUser]: + ) -> list[RequestedUser]: """Retrieve requested user information from the db-router relation databag. Args: @@ -109,8 +108,8 @@ def _get_requested_users_from_relation_databag( return requested_users def _create_requested_users( - self, requested_users: List[RequestedUser], user_unit_name: str - ) -> Tuple[Dict[str, str], Set[str]]: + self, requested_users: list[RequestedUser], user_unit_name: str + ) -> tuple[dict[str, str], set[str]]: """Create the requested users and said user scoped databases. Args: diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index 17c18fd7d8..adb29f9cdd 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -8,7 +8,6 @@ import string import subprocess import tempfile -from typing import Dict, List, Optional, Set import juju.unit import yaml @@ -133,7 +132,7 @@ async def get_primary_unit( return primary_unit -async def get_server_config_credentials(unit: Unit) -> Dict: +async def get_server_config_credentials(unit: Unit) -> dict: """Helper to run an action to retrieve server config credentials. Args: @@ -145,7 +144,7 @@ async def get_server_config_credentials(unit: Unit) -> Dict: return await juju_.run_action(unit, "get-password", username=SERVER_CONFIG_USERNAME) -async def fetch_credentials(unit: Unit, username: str = None) -> Dict: +async def fetch_credentials(unit: Unit, username: str = None) -> dict: """Helper to run an action to fetch credentials. Args: @@ -159,7 +158,7 @@ async def fetch_credentials(unit: Unit, username: str = None) -> Dict: return await juju_.run_action(unit, "get-password", username=username) -async def rotate_credentials(unit: Unit, username: str = None, password: str = None) -> Dict: +async def rotate_credentials(unit: Unit, username: str = None, password: str = None) -> dict: """Helper to run an action to rotate credentials. Args: @@ -176,7 +175,7 @@ async def rotate_credentials(unit: Unit, username: str = None, password: str = N return await juju_.run_action(unit, "set-password", username=username, password=password) -async def get_legacy_mysql_credentials(unit: Unit) -> Dict: +async def get_legacy_mysql_credentials(unit: Unit) -> dict: """Helper to run an action to retrieve legacy mysql config credentials. Args: @@ -189,7 +188,7 @@ async def get_legacy_mysql_credentials(unit: Unit) -> Dict: @retry(stop=stop_after_attempt(20), wait=wait_fixed(5), reraise=True) -async def get_system_user_password(unit: Unit, user: str) -> Dict: +async def get_system_user_password(unit: Unit, user: str) -> dict: """Helper to run an action to retrieve system user password. Args: @@ -206,10 +205,10 @@ async def execute_queries_on_unit( unit_address: str, username: str, password: str, - queries: List[str], + queries: list[str], commit: bool = False, raw: bool = False, -) -> List: +) -> list: """Execute given MySQL queries on a unit. Args: @@ -271,7 +270,7 @@ def is_relation_broken(ops_test: OpsTest, endpoint_one: str, endpoint_two: str) @retry(stop=stop_after_attempt(30), wait=wait_fixed(5), reraise=True) def is_connection_possible( - credentials: Dict, *, retry_if_not_possible=False, **extra_opts + credentials: dict, *, retry_if_not_possible=False, **extra_opts ) -> bool: """Test a connection to a MySQL server. @@ -544,7 +543,7 @@ async def get_relation_data( return relation_data -def get_read_only_endpoints(relation_data: list) -> Set[str]: +def get_read_only_endpoints(relation_data: list) -> set[str]: """Returns the read-only-endpoints from the relation data. Args: @@ -573,8 +572,8 @@ def get_read_only_endpoints(relation_data: list) -> Set[str]: async def get_leader_unit( - ops_test: Optional[OpsTest], app_name: str, model: Optional[Model] = None -) -> Optional[Unit]: + ops_test: OpsTest | None, app_name: str, model: Model | None = None +) -> Unit | None: """Get the leader unit of a given application. Args: @@ -593,7 +592,7 @@ async def get_leader_unit( return leader_unit -def get_read_only_endpoint_ips(relation_data: list) -> List[str]: +def get_read_only_endpoint_ips(relation_data: list) -> list[str]: """Returns the read-only-endpoint hostnames from the relation data. Args: @@ -640,7 +639,7 @@ async def remove_leader_unit(ops_test: OpsTest, application_name: str): ) -async def get_units_ip_addresses(ops_test: OpsTest, app_name: str) -> List[str]: +async def get_units_ip_addresses(ops_test: OpsTest, app_name: str) -> list[str]: """Retrieves hostnames of given application units. Args: @@ -810,12 +809,12 @@ async def unit_file_md5(ops_test: OpsTest, unit_name: str, file_path: str) -> st return None -async def get_cluster_status(unit: Unit, cluster_set: Optional[bool] = False) -> Dict: +async def get_cluster_status(unit: Unit, cluster_set: bool | None = False) -> dict: """Get the cluster status by running the get-cluster-status action. Args: - ops_test: The ops test framework unit: The unit on which to execute the action on + cluster_set: Whether to get the cluster-set instead Returns: A dictionary representing the cluster status @@ -978,9 +977,7 @@ def get_unit_by_index(app_name: str, units: list, index: int): return unit -async def get_status_log( - ops_test: OpsTest, unit_name: str, num_logs: Optional[int] = None -) -> list: +async def get_status_log(ops_test: OpsTest, unit_name: str, num_logs: int | None = None) -> list: """Get the status log for a unit. Args: diff --git a/tests/integration/high_availability/high_availability_helpers.py b/tests/integration/high_availability/high_availability_helpers.py index 440a4f8341..530becc78c 100644 --- a/tests/integration/high_availability/high_availability_helpers.py +++ b/tests/integration/high_availability/high_availability_helpers.py @@ -3,7 +3,6 @@ import logging from pathlib import Path -from typing import List, Optional import yaml from juju.unit import Unit @@ -76,7 +75,7 @@ def get_application_name(ops_test: OpsTest, application_name_substring: str) -> async def ensure_n_online_mysql_members( - ops_test: OpsTest, number_online_members: int, mysql_units: Optional[List[Unit]] = None + ops_test: OpsTest, number_online_members: int, mysql_units: list[Unit] | None = None ) -> bool: """Waits until N mysql cluster members are online. @@ -241,8 +240,8 @@ async def insert_data_into_mysql_and_validate_replication( ops_test: OpsTest, database_name: str, table_name: str, - mysql_application_substring: Optional[str] = "mysql", - mysql_units: Optional[List[Unit]] = None, + mysql_application_substring: str | None = "mysql", + mysql_units: list[Unit] | None = None, ) -> str: """Inserts data into the mysql cluster and validates its replication. @@ -332,7 +331,7 @@ async def clean_up_database_and_table( async def ensure_all_units_continuous_writes_incrementing( - ops_test: OpsTest, mysql_units: Optional[List[Unit]] = None + ops_test: OpsTest, mysql_units: list[Unit] | None = None ) -> None: """Ensure that continuous writes is incrementing on all units. diff --git a/tests/integration/high_availability/test_async_replication.py b/tests/integration/high_availability/test_async_replication.py index 1d555d4ed0..61f0423146 100644 --- a/tests/integration/high_availability/test_async_replication.py +++ b/tests/integration/high_availability/test_async_replication.py @@ -8,7 +8,6 @@ from asyncio import gather from pathlib import Path from time import sleep -from typing import Optional import pytest import yaml @@ -34,7 +33,7 @@ @pytest.fixture(scope="module") -def first_model(ops_test: OpsTest) -> Optional[Model]: +def first_model(ops_test: OpsTest) -> Model | None: """Return the first model.""" first_model = ops_test.model return first_model diff --git a/tests/integration/high_availability/test_primary_switchover.py b/tests/integration/high_availability/test_primary_switchover.py index 9d2c97d6db..dc3f4f33b4 100644 --- a/tests/integration/high_availability/test_primary_switchover.py +++ b/tests/integration/high_availability/test_primary_switchover.py @@ -3,7 +3,6 @@ import logging from subprocess import run -from typing import Optional import pytest from jubilant import Juju, all_active @@ -75,7 +74,7 @@ def test_cluster_failover_after_majority_loss(juju: Juju, highly_available_clust assert get_primary_unit_name(juju, primary_unit) == unit_to_promote, "Failover failed" -def get_primary_unit_name(juju: Juju, mysql_unit) -> Optional[str]: +def get_primary_unit_name(juju: Juju, mysql_unit) -> str | None: """Get the current primary node of the cluster.""" cluster_status_task = juju.run(mysql_unit, "get-cluster-status") assert cluster_status_task.status == "completed", "Failed to retrieve cluster status" @@ -86,7 +85,7 @@ def get_primary_unit_name(juju: Juju, mysql_unit) -> Optional[str]: return label.replace("-", "/") -def get_app_name(juju: Juju, charm_name: str) -> Optional[str]: +def get_app_name(juju: Juju, charm_name: str) -> str | None: """Get the application name for the given charm.""" status = juju.status() for app, value in status.apps.items(): diff --git a/tests/integration/relations/test_db_router.py b/tests/integration/relations/test_db_router.py index 4375e4b374..1c5f9062a1 100644 --- a/tests/integration/relations/test_db_router.py +++ b/tests/integration/relations/test_db_router.py @@ -5,7 +5,6 @@ import asyncio import logging from pathlib import Path -from typing import Dict, List import pytest import yaml @@ -27,7 +26,7 @@ async def check_successful_keystone_migration( - ops_test: OpsTest, server_config_credentials: Dict + ops_test: OpsTest, server_config_credentials: dict ) -> None: """Checks that the keystone application is successfully migrated in mysql. @@ -66,9 +65,9 @@ async def check_successful_keystone_migration( async def check_keystone_users_existence( ops_test: OpsTest, - server_config_credentials: Dict[str, str], - users_that_should_exist: List[str], - users_that_should_not_exist: List[str], + server_config_credentials: dict[str, str], + users_that_should_exist: list[str], + users_that_should_not_exist: list[str], ) -> None: """Checks that keystone users exist in the database. diff --git a/tests/integration/relations/test_shared_db.py b/tests/integration/relations/test_shared_db.py index 601a865299..989ba81ca3 100644 --- a/tests/integration/relations/test_shared_db.py +++ b/tests/integration/relations/test_shared_db.py @@ -4,7 +4,6 @@ import logging from pathlib import Path -from typing import Dict, List import pytest import yaml @@ -66,7 +65,7 @@ async def deploy_and_relate_keystone_with_mysql( async def check_successful_keystone_migration( - ops_test: OpsTest, server_config_credentials: Dict + ops_test: OpsTest, server_config_credentials: dict ) -> None: """Checks that the keystone application is successfully migrated in mysql. @@ -106,9 +105,9 @@ async def check_successful_keystone_migration( async def check_keystone_users_existence( ops_test: OpsTest, - server_config_credentials: Dict[str, str], - users_that_should_exist: List[str], - users_that_should_not_exist: List[str], + server_config_credentials: dict[str, str], + users_that_should_exist: list[str], + users_that_should_not_exist: list[str], ) -> None: """Checks that keystone users exist in the database.