Skip to content

Commit

Permalink
apply black
Browse files Browse the repository at this point in the history
  • Loading branch information
Salamandar committed Dec 9, 2024
1 parent db61115 commit 0a5dd05
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 16 deletions.
47 changes: 34 additions & 13 deletions src/firewall.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
logger = getLogger("yunohost.firewall")


class YunoFirewall():
class YunoFirewall:
FIREWALL_FILE = Path("/etc/yunohost/firewall.yml")

def __init__(self) -> None:
Expand Down Expand Up @@ -70,11 +70,17 @@ def _validate_port(protocol: str, port: int | str) -> tuple[str, int | str]:
raise ValueError(f"protocol should be tcp or udp, not {protocol}")
return protocol, port

def open_port(self, protocol: str, port: int | str, comment: str, upnp: bool = False) -> None:
def open_port(
self, protocol: str, port: int | str, comment: str, upnp: bool = False
) -> None:
protocol, port = self._validate_port(protocol, port)

if port not in self.config[protocol]:
self.config[protocol][port] = {"open": False, "upnp": False, "comment": comment}
self.config[protocol][port] = {
"open": False,
"upnp": False,
"comment": comment,
}

if not self.config[protocol][port]["open"]:
self.config[protocol][port]["open"] = True
Expand All @@ -85,7 +91,9 @@ def open_port(self, protocol: str, port: int | str, comment: str, upnp: bool = F
self.need_reload = True
self.write()

def close_port(self, protocol: str, port: int | str, upnp_only: bool = False) -> None:
def close_port(
self, protocol: str, port: int | str, upnp_only: bool = False
) -> None:
protocol, port = self._validate_port(protocol, port)

if port not in self.config[protocol]:
Expand Down Expand Up @@ -131,7 +139,7 @@ def clear(self) -> None:
os.system("systemctl stop nftables")


class YunoUPnP():
class YunoUPnP:
UPNP_PORT = 55354 # Picked at random, this port has no real meaning
UPNP_PORT_COMMENT = "YunoHost UPnP firewall configurator"
UPNP_CRON_JOB = Path("/etc/cron.d/yunohost-firewall-upnp")
Expand Down Expand Up @@ -191,7 +199,9 @@ def open_port(self, protocol: str, port: int | str, comment: str) -> bool:
# Add new port mapping
desc = f"yunohost firewall: port {port} {comment}"
try:
self.upnpc.addportmapping( port, protocol, self.upnpc.lanaddr, port, desc, "")
self.upnpc.addportmapping(
port, protocol, self.upnpc.lanaddr, port, desc, ""
)
except Exception:
logger.debug("unable to add port %d using UPnP", port, exc_info=1)
return False
Expand Down Expand Up @@ -229,8 +239,9 @@ def refresh(self, firewall: "YunoFirewall") -> bool:
def enable(self) -> None:
if not self.enabled():
# Add cron job
cron = "*/50 * * * * root /usr/bin/yunohost firewall upnp status >>/dev/null\n"
self.UPNP_CRON_JOB.write_text(cron)
self.UPNP_CRON_JOB.write_text(
"*/50 * * * * root /usr/bin/yunohost firewall upnp status >>/dev/null\n"
)
self.enabled(True)

def disable(self) -> None:
Expand Down Expand Up @@ -264,7 +275,9 @@ def firewall_open(
if not reload_if_changed and not firewall.need_reload:
logger.warning(m18n.n("port_already_opened", port=port))

if (firewall.need_reload and reload_if_changed) or (not no_reload and not reload_if_changed):
if (firewall.need_reload and reload_if_changed) or (
not no_reload and not reload_if_changed
):
firewall.apply()


Expand All @@ -290,7 +303,9 @@ def firewall_close(
if not firewall.need_reload and not reload_if_changed:
logger.warning(m18n.n("port_already_closed", port=port))

if (firewall.need_reload and reload_if_changed) or (not no_reload and not reload_if_changed):
if (firewall.need_reload and reload_if_changed) or (
not no_reload and not reload_if_changed
):
firewall.apply()


Expand All @@ -314,11 +329,15 @@ def firewall_delete(
if not firewall.need_reload and not reload_if_changed:
logger.warning(m18n.n("port_already_closed", port=port))

if (firewall.need_reload and reload_if_changed) or (not no_reload and not reload_if_changed):
if (firewall.need_reload and reload_if_changed) or (
not no_reload and not reload_if_changed
):
firewall.apply()


def firewall_list(raw: bool = False, protocol: str = "tcp", forwarded: bool = False) -> dict[str, Any] | list[int]:
def firewall_list(
raw: bool = False, protocol: str = "tcp", forwarded: bool = False
) -> dict[str, Any] | list[int]:
"""
List all firewall rules
Expand Down Expand Up @@ -384,7 +403,9 @@ def firewall_upnp(action: str = "status", no_refresh: bool = False) -> dict[str,

if upnp.refresh(firewall):
# Display success message if needed
logger.success(m18n.n("upnp_enabled" if upnp.enabled() else "upnp_disabled"))
logger.success(
m18n.n("upnp_enabled") if upnp.enabled() else m18n.n("upnp_disabled")
)
else:
# FIXME: Do not update the config file to let a refresh handle the failure?
raise YunohostError("upnp_port_open_failed")
Expand Down
4 changes: 1 addition & 3 deletions src/migrations/0032_firewall_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,13 @@ class MyMigration(Migration):

mode = "auto"


def firewall_file_migrate(self) -> None:
old_data = yaml.safe_load(FIREWALL_FILE.open("r", encoding="utf-8"))

new_data: dict[str, Any] = {
"router_forwarding_upnp": old_data["uPnP"]["enabled"],
"tcp": {},
"udp": {}
"udp": {},
}
for proto in ["TCP", "UDP"]:
new_data[proto.lower()] = {
Expand All @@ -54,6 +53,5 @@ def firewall_file_migrate(self) -> None:
}
yaml.dump(new_data, FIREWALL_FILE.open("w", encoding="utf-8"))


def run(self):
self.firewall_file_migrate()

0 comments on commit 0a5dd05

Please sign in to comment.