40 changes: 23 additions & 17 deletions backend/apps/common/utils.py
@@ -5,6 +5,7 @@
import json
import re
from datetime import UTC, datetime
from typing import TYPE_CHECKING
from urllib.parse import urlparse

from django.conf import settings
@@ -13,6 +14,9 @@
from django.utils.text import slugify as django_slugify
from humanize import intword, naturaltime

if TYPE_CHECKING:
from django.http import HttpRequest


def convert_to_camel_case(text: str) -> str:
"""Convert a string to camelCase.
@@ -28,7 +32,7 @@ def convert_to_camel_case(text: str) -> str:
offset = 1 if text.startswith("_") else 0
head = parts[offset : offset + 1] or [text]

segments = [f"_{head[0]}" if offset else head[0]]
segments: list[str] = [f"_{head[0]}" if offset else head[0]]
segments.extend(word.capitalize() for word in parts[offset + 1 :])

return "".join(segments)
@@ -47,11 +51,11 @@ def convert_to_snake_case(text: str) -> str:
return re.sub(r"(?<!^)(?=[A-Z])", "_", text).lower()
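Reviewer note: a standalone sketch of how the snake_case regex above behaves (stdlib only; the function name here is illustrative, not part of this PR):

```python
# Minimal behavior sketch of the regex used in convert_to_snake_case (illustrative).
import re


def to_snake_case(text: str) -> str:
    # Insert "_" before every capital letter that is not at the start, then lowercase.
    return re.sub(r"(?<!^)(?=[A-Z])", "_", text).lower()


print(to_snake_case("createdAt"))     # created_at
print(to_snake_case("HTTPResponse"))  # h_t_t_p_response (each capital is split)
```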


def clean_url(url: str) -> str | None:
def clean_url(url: str | None) -> str | None:
"""Clean a URL by removing whitespace and trailing punctuation.

Args:
url (str): Raw URL string.
url (str | None): Raw URL string.

Returns:
str | None: Cleaned URL string or None if empty.
@@ -79,14 +83,14 @@ def get_absolute_url(path: str) -> str:
def get_nest_user_agent() -> str:
"""Return the user agent string for the Nest application.

Returns
Returns:
str: The user agent string.

"""
return settings.APP_NAME.replace(" ", "-").lower()


def get_user_ip_address(request) -> str:
def get_user_ip_address(request: HttpRequest) -> str:
"""Retrieve the user's IP address from the request.

Args:
@@ -99,8 +103,10 @@
if settings.IS_LOCAL_ENVIRONMENT:
return settings.PUBLIC_IP_ADDRESS

x_forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR")
return x_forwarded_for.split(",")[0] if x_forwarded_for else request.META.get("REMOTE_ADDR")
x_forwarded_for: str | None = request.META.get("HTTP_X_FORWARDED_FOR")
if x_forwarded_for:
return x_forwarded_for.split(",")[0]
return request.META.get("REMOTE_ADDR", "")
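Reviewer note: a small sketch of the X-Forwarded-For branch introduced above, using a plain dict in place of `request.META` (illustrative only; addresses are made up):

```python
# Stand-in for the header handling above; not the actual Django request object.
def client_ip(meta: dict[str, str]) -> str:
    x_forwarded_for = meta.get("HTTP_X_FORWARDED_FOR")
    if x_forwarded_for:
        # The left-most entry is the original client; proxies append their own addresses.
        return x_forwarded_for.split(",")[0]
    return meta.get("REMOTE_ADDR", "")


print(client_ip({"HTTP_X_FORWARDED_FOR": "203.0.113.7, 10.0.0.2"}))  # 203.0.113.7
print(client_ip({"REMOTE_ADDR": "198.51.100.4"}))                    # 198.51.100.4
print(client_ip({}))                                                 # "" (new empty-string default)
```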


def is_valid_json(content: str) -> bool:
@@ -134,18 +140,18 @@ def join_values(fields: list, delimiter: str = " ") -> str:
return delimiter.join(field for field in fields if field)


def natural_date(value: int | str) -> str:
def natural_date(value: int | str | datetime) -> str:
"""Convert a date or timestamp into a human-readable format.

Args:
value (str or int or datetime): The date or timestamp to convert.
value (str | int | datetime): The date or timestamp to convert.

Returns:
str: The humanized date string.

"""
if isinstance(value, str):
dt = datetime.strptime(value, "%Y-%m-%d").replace(tzinfo=UTC)
dt: datetime = datetime.strptime(value, "%Y-%m-%d").replace(tzinfo=UTC)
elif isinstance(value, int):
dt = datetime.fromtimestamp(value, tz=UTC)
else:
@@ -154,27 +160,27 @@ def natural_date(value: int | str) -> str:
return naturaltime(dt)
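Reviewer note: the widened `int | str | datetime` signature accepts three input shapes; a hedged sketch (assuming `humanize` is installed and that the `else:` branch, which is outside this hunk, passes the datetime through unchanged):

```python
# Sketch of the three accepted input types for natural_date (illustrative).
from datetime import UTC, datetime

from humanize import naturaltime


def to_natural(value: int | str | datetime) -> str:
    if isinstance(value, str):
        dt = datetime.strptime(value, "%Y-%m-%d").replace(tzinfo=UTC)
    elif isinstance(value, int):
        dt = datetime.fromtimestamp(value, tz=UTC)
    else:
        dt = value  # assumed passthrough; the else branch is not shown in this diff
    return naturaltime(dt)


print(to_natural("2024-01-01"))          # e.g. "1 year ago"
print(to_natural(1704067200))            # Unix timestamp
print(to_natural(datetime.now(tz=UTC)))  # "now"
```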


def natural_number(value: int, unit=None) -> str:
def natural_number(value: int, unit: str | None = None) -> str:
"""Convert a number into a human-readable format.

Args:
value (int): The number to convert.
unit (str, optional): The unit to append.
unit (str | None, optional): The unit to append.

Returns:
str: The humanized number string.

"""
number = intword(value)
number: str = intword(value)
return f"{number} {unit}{pluralize(value)}" if unit else number


def round_down(value: int, base: int) -> int:
"""Round down the stats to the nearest base.

Args:
value: The value to round down.
base: The base to round down to.
value (int): The value to round down.
base (int): The base to round down to.

Returns:
int: The rounded down value.
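Reviewer note: the body of `round_down` is outside this hunk; a minimal sketch of the documented behavior, assuming it floors to the nearest multiple of `base`:

```python
# Hedged sketch only; the real implementation is not shown in this diff.
def round_down(value: int, base: int) -> int:
    return value - (value % base)


print(round_down(1234, 100))  # 1200
```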
@@ -211,11 +217,11 @@ def truncate(text: str, limit: int, truncate: str = "...") -> str:
return Truncator(text).chars(limit, truncate=truncate)


def validate_url(url: str) -> bool:
def validate_url(url: str | None) -> bool:
"""Validate that a URL has proper scheme and netloc.

Args:
url (str): URL string to validate.
url (str | None): URL string to validate.

Returns:
bool: True if URL is valid, False otherwise.
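Reviewer note: the bodies of `clean_url` and `validate_url` are outside this hunk; a sketch of the documented contract for the widened `str | None` input (stdlib only, illustrative, not the PR's implementation):

```python
# Sketch of the documented contract; not the actual implementation from this PR.
from urllib.parse import urlparse


def validate_url(url: str | None) -> bool:
    if not url:
        return False
    parsed = urlparse(url)
    # "Proper scheme and netloc" per the docstring above.
    return bool(parsed.scheme and parsed.netloc)


print(validate_url("https://owasp.org"))  # True
print(validate_url("owasp.org"))          # False (no scheme)
print(validate_url(None))                 # False
```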
27 changes: 15 additions & 12 deletions backend/apps/github/utils.py
@@ -33,12 +33,12 @@ def check_owasp_site_repository(key: str) -> bool:
)


def check_funding_policy_compliance(platform: str, target: str) -> bool:
def check_funding_policy_compliance(platform: str, target: str | None) -> bool:
"""Check OWASP funding policy compliance.

Args:
platform (str): The funding platform (e.g., 'github', 'custom').
target (str): The funding target.
target (str, optional): The funding target.

Returns:
bool: True if the funding policy is compliant, False otherwise.
@@ -50,8 +50,8 @@ def check_funding_policy_compliance(platform: str, target: str) -> bool:
if platform == "github":
return target.lower() == "owasp"
if platform == "custom":
location = urlparse(target).netloc.lower()
owasp_org = "owasp.org"
location: str = urlparse(target).netloc.lower()
owasp_org: str = "owasp.org"
return location == owasp_org or location.endswith(f".{owasp_org}")

return False
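Reviewer note: a quick behavior check of the `custom` platform branch above (stdlib only; URLs are illustrative):

```python
# Mirrors the netloc comparison in check_funding_policy_compliance (illustrative).
from urllib.parse import urlparse


def is_owasp_target(target: str) -> bool:
    location = urlparse(target).netloc.lower()
    owasp_org = "owasp.org"
    return location == owasp_org or location.endswith(f".{owasp_org}")


print(is_owasp_target("https://owasp.org/donate"))      # True
print(is_owasp_target("https://www.owasp.org/donate"))  # True
print(is_owasp_target("https://notowasp.org/donate"))   # False (suffix check requires a dot)
```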
@@ -66,17 +66,19 @@ def get_repository_file_content(

Args:
url (str): The URL of the file.
timeout (int, optional): The request timeout in seconds.
timeout (float, optional): The request timeout in seconds.

Returns:
str: The content of the file, or None if the request fails.
str: The content of the file, or empty string if the request fails.

"""
try:
return requests.get(url, timeout=timeout).text
response: requests.Response = requests.get(url, timeout=timeout)
except RequestException as e:
logger.exception("Failed to fetch file", extra={"URL": url, "error": str(e)})
return ""
else:
return response.text
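Reviewer note: the refactor above moves the happy path into an `else` block so only the network call sits inside `try`; a standalone sketch of that shape (assumes `requests` is installed; the logger and `RequestException` import live at module level in the real file):

```python
# Minimal sketch of the try/except/else pattern used in get_repository_file_content.
import logging

import requests
from requests.exceptions import RequestException

logger = logging.getLogger(__name__)


def fetch_text(url: str, timeout: float = 5) -> str:
    try:
        response = requests.get(url, timeout=timeout)
    except RequestException:
        logger.exception("Failed to fetch file", extra={"URL": url})
        return ""
    else:
        # Runs only when no exception was raised; keeps response handling out of the try body.
        return response.text
```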


def get_repository_path(url: str) -> str | None:
@@ -86,7 +88,8 @@
url (str): The repository URL.

Returns:
str: The repository path in the format 'owner/repository_name', or None if parsing fails.
str | None: The repository path in the format 'owner/repository_name',
or None if parsing fails.

"""
match = GITHUB_REPOSITORY_RE.search(url.split("#")[0])
@@ -101,19 +104,19 @@ def normalize_url(url: str, *, check_path: bool = False) -> str | None:
check_path (bool, optional): Whether to check if the URL has a path.

Returns:
str: The normalized URL, or None if the URL is invalid.
str | None: The normalized URL, or None if the URL is invalid.

"""
parsed_url = urlparse(url)
if not parsed_url.netloc or (check_path and not parsed_url.path):
return None

http_prefix = "http://" # NOSONAR
https_prefix = "https://"
http_prefix: str = "http://" # NOSONAR
https_prefix: str = "https://"
if not parsed_url.scheme:
url = f"{https_prefix}{url}"

normalized_url = (
normalized_url: str = (
f"{https_prefix}{url[len(http_prefix) :]}" if url.startswith(http_prefix) else url
)
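Reviewer note: the trailing return of `normalize_url` is outside this hunk; a sketch of the scheme-upgrade logic shown above (illustrative URLs):

```python
# Mirrors the visible part of normalize_url: require a netloc, then force https.
from urllib.parse import urlparse


def upgrade_scheme(url: str) -> str | None:
    parsed_url = urlparse(url)
    if not parsed_url.netloc:
        return None

    http_prefix = "http://"
    https_prefix = "https://"
    if not parsed_url.scheme:
        url = f"{https_prefix}{url}"

    return f"{https_prefix}{url[len(http_prefix):]}" if url.startswith(http_prefix) else url


print(upgrade_scheme("http://owasp.org/www-project-nest"))  # https://owasp.org/www-project-nest
print(upgrade_scheme("https://owasp.org"))                  # https://owasp.org (unchanged)
```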

33 changes: 17 additions & 16 deletions backend/apps/slack/utils.py
@@ -6,7 +6,7 @@
import re
from functools import lru_cache
from html import escape as escape_html
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any
from urllib.parse import urljoin

if TYPE_CHECKING: # pragma: no cover
@@ -22,7 +22,7 @@
logger: logging.Logger = logging.getLogger(__name__)


def escape(content) -> str:
def escape(content: str) -> str:
"""Escape HTML content.

Args:
@@ -53,7 +53,7 @@ def format_links_for_slack(text: str) -> str:


@lru_cache
def get_gsoc_projects(year: int) -> list:
def get_gsoc_projects(year: int) -> list[dict[str, Any]]:
"""Get GSoC projects.

Args:
@@ -83,18 +83,19 @@ def get_news_data(limit: int = 10, timeout: float | None = 30) -> list[dict[str,

Args:
limit (int, optional): The maximum number of news items to fetch.

timeout (float, optional): The request timeout in seconds.

Returns:
list: A list of dictionaries containing news data (author, title, and URL).

"""
response = requests.get(OWASP_NEWS_URL, timeout=timeout)
response: requests.Response = requests.get(OWASP_NEWS_URL, timeout=timeout)
tree = html.fromstring(response.content)
h2_tags = tree.xpath("//h2")

items_total = 0
items = []
items_total: int = 0
items: list[dict[str, str]] = []
for h2 in h2_tags:
if anchor := h2.xpath(".//a[@href]"):
author_tag = h2.xpath("./following-sibling::p[@class='author']")
@@ -114,7 +115,7 @@


@lru_cache
def get_staff_data(timeout: float | None = 30) -> list | None:
def get_staff_data(timeout: float | None = 30) -> list[dict[str, Any]] | None:
"""Get staff data.

Args:
@@ -124,7 +125,7 @@ def get_staff_data(timeout: float | None = 30) -> list | None:
list or None: A sorted list of staff data dictionaries, or None if an error occurs.

"""
file_path = "https://raw.githubusercontent.com/OWASP/owasp.github.io/main/_data/staff.yml"
file_path: str = "https://raw.githubusercontent.com/OWASP/owasp.github.io/main/_data/staff.yml"
try:
return sorted(
yaml.safe_load(
@@ -144,10 +145,10 @@ def get_sponsors_data(limit: int = 10) -> QuerySet | None:
"""Get sponsors data.

Args:
limit (int, optional): The maximum number of sponsors to fetch.
limit (int, optional): The maximum number of sponsors to fetch.

Returns:
QuerySet or None: A queryset of sponsors, or None if an error occurs.
QuerySet or None: A queryset of sponsors, or None if an error occurs.

"""
from apps.owasp.models.sponsor import Sponsor
@@ -167,7 +168,7 @@ def get_posts_data(limit: int = 5) -> QuerySet | None:
limit (int, optional): The maximum number of posts to fetch.

Returns:
QuerySet or None: A queryset of recent posts, or None if an error occurs.
QuerySet or None: A queryset of recent posts, or None if an error occurs.

"""
from apps.owasp.models.post import Post
@@ -179,7 +180,7 @@ def get_posts_data(limit: int = 5) -> QuerySet | None:
return None


def get_text(blocks: tuple) -> str:
def get_text(blocks: tuple[dict[str, Any], ...]) -> str:
"""Convert blocks to plain text.

Args:
@@ -189,7 +190,7 @@ def get_text(blocks: tuple) -> str:
str: The plain text representation of the blocks.

"""
text = []
text: list[str] = []

for block in blocks:
match block.get("type"):
@@ -236,11 +237,11 @@ def strip_markdown(text: str) -> str:
"""Strip markdown formatting.

Args:
text (str): The text with markdown formatting.
text (str): The text with markdown formatting.

Returns:
str: The text with markdown formatting removed.
str: The text with markdown formatting removed.

"""
slack_link_pattern = re.compile(r"<(https?://[^|]+)\|([^>]+)>")
slack_link_pattern: re.Pattern[str] = re.compile(r"<(https?://[^|]+)\|([^>]+)>")
return slack_link_pattern.sub(r"\2 (\1)", text).replace("*", "")
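Reviewer note: a quick check of the Slack-link rewrite performed by the pattern above (input is made up):

```python
# Demonstrates the <url|label> -> "label (url)" rewrite plus asterisk stripping.
import re

slack_link_pattern = re.compile(r"<(https?://[^|]+)\|([^>]+)>")
text = "*Nest* docs: <https://nest.owasp.org|OWASP Nest>"
print(slack_link_pattern.sub(r"\2 (\1)", text).replace("*", ""))
# Nest docs: OWASP Nest (https://nest.owasp.org)
```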