Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 40 additions & 1 deletion database/090-access-token.sql
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,46 @@ $$;


CREATE FUNCTION cleanup_expired_token() RETURNS VOID AS $$
DECLARE
deleted_row RECORD;
rows_deleted INTEGER := 0;
soon_expiring RECORD;
soon_expiring_count INTEGER := 0;
BEGIN
DELETE FROM user_access_token WHERE expires_at <= CURRENT_DATE;
FOR deleted_row IN
DELETE FROM user_access_token
WHERE expires_at <= CURRENT_TIMESTAMP
RETURNING *
LOOP
-- Send notification for each deleted row
PERFORM pg_notify(
'access_token_deleted_now',
json_build_object(
'id', deleted_row.id,
'account', deleted_row.account,
'display_name', deleted_row.display_name
)::text
);
rows_deleted := rows_deleted + 1;
END LOOP;
RAISE NOTICE 'Deleted % access tokens and sent notifications', rows_deleted;

FOR soon_expiring IN
SELECT * FROM user_access_token
WHERE expires_at::date = CURRENT_DATE + 7
LOOP
-- Send notification for each token expiring in 7 days
PERFORM pg_notify(
'access_token_expiring_7_days',
json_build_object(
'id', soon_expiring.id,
'account', soon_expiring.account,
'display_name', soon_expiring.display_name
)::text
);
soon_expiring_count := soon_expiring_count + 1;
END LOOP;
RAISE NOTICE '% access tokens expiring in 7 days, sent notifications', soon_expiring_count;

END;
$$ LANGUAGE plpgsql;
4 changes: 4 additions & 0 deletions publisher/app/listeners/channels/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# SPDX-FileCopyrightText: 2025 Helmholtz Centre Potsdam - GFZ German Research Centre for Geosciences
# SPDX-FileCopyrightText: 2025 Paula Stock (GFZ) <[email protected]>
#
# SPDX-License-Identifier: Apache-2.0
24 changes: 24 additions & 0 deletions publisher/app/listeners/channels/access_token_deleted.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# SPDX-FileCopyrightText: 2025 Helmholtz Centre Potsdam - GFZ German Research Centre for Geosciences
# SPDX-FileCopyrightText: 2025 Paula Stock (GFZ) <[email protected]>
#
# SPDX-License-Identifier: Apache-2.0

import os

from app.templates.common import render_template
from .base import ChannelHandler
from app.services.postgrest_helpers import get_mail_for_account
from app import utils

class AccessTokenDeletedHandler(ChannelHandler):
def __init__(self):
super().__init__("access_token_deleted_now")

def preprocess(self, payload):
recipient = get_mail_for_account(payload["account"])
return dict(
subject="RSD: Your API access token expired",
recipients=[recipient],
html_content=render_template("access_token_expired_now.html", {"DISPLAY_NAME": payload["display_name"]}),
plain_content=None
)
24 changes: 24 additions & 0 deletions publisher/app/listeners/channels/access_token_expiring.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# SPDX-FileCopyrightText: 2025 Helmholtz Centre Potsdam - GFZ German Research Centre for Geosciences
# SPDX-FileCopyrightText: 2025 Paula Stock (GFZ) <[email protected]>
#
# SPDX-License-Identifier: Apache-2.0

import os

from app.templates.common import render_template
from .base import ChannelHandler
from app.services.postgrest_helpers import get_mail_for_account
from app import utils

class AccessTokenExpiringHandler(ChannelHandler):
def __init__(self):
super().__init__("access_token_expiring_7_days")

def preprocess(self, payload):
recipient = get_mail_for_account(payload["account"])
return dict(
subject="RSD: Your API access token will expire in 7 days",
recipients=[recipient],
html_content=render_template("access_token_expiring_soon.html", {"DISPLAY_NAME": payload["display_name"]}),
plain_content=None
)
18 changes: 18 additions & 0 deletions publisher/app/listeners/channels/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# SPDX-FileCopyrightText: 2025 Helmholtz Centre Potsdam - GFZ German Research Centre for Geosciences
# SPDX-FileCopyrightText: 2025 Paula Stock (GFZ) <[email protected]>
#
# SPDX-License-Identifier: Apache-2.0

import os
from app import utils

class ChannelHandler:
def __init__(self, name):
self.name = name

def preprocess(self, payload):
"""Must override and return mail body dict"""
raise NotImplementedError("Process method must be implemented.")

def process(self, mail_body):
return utils.publish_to_queue(os.environ.get("MAIL_QUEUE", "mailq"), mail_body)
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# SPDX-FileCopyrightText: 2025 Helmholtz Centre Potsdam - GFZ German Research Centre for Geosciences
# SPDX-FileCopyrightText: 2025 Paula Stock (GFZ) <[email protected]>
#
# SPDX-License-Identifier: Apache-2.0

import os

from app.templates.common import render_template
from .base import ChannelHandler
from app.services.postgrest_helpers import get_software_name, get_maintainer_emails_for_community, get_community_info
from app import utils

class SoftwareCommunityJoinRequestHandler(ChannelHandler):
def __init__(self):
super().__init__("software_for_community_join_request")

def preprocess(self, payload):
software_name, software_slug = get_software_name(payload["software"])
recipients = get_maintainer_emails_for_community(payload["community"])
community_name, community_slug = get_community_info(payload["community"])
software_page_url = utils.create_software_page_url(software_slug)
community_settings_url = utils.create_community_requests_url(community_slug)
return dict(
subject=f"RSD: Community join request for {community_name}",
recipients=recipients,
html_content=render_template("community_join_request.html", {"SOFTWARE_NAME": software_name, "COMMUNITY_NAME": community_name, "SOFTWARE_PAGE_URL": software_page_url, "COMMUNITY_REQUESTS_URL": community_settings_url, "RSD_URL": os.getenv("HOST_URL")}),
plain_content=None
)
92 changes: 25 additions & 67 deletions publisher/app/listeners/postgres_notifications_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,22 @@
import psycopg
import select
import json
import time
import os
import requests
import app.utils as utils
from app.templates.common import render_template
from app.auth import JwtProvider
from channels.software_for_community_join_request import SoftwareCommunityJoinRequestHandler
from channels.access_token_deleted import AccessTokenDeletedHandler
from channels.access_token_expiring import AccessTokenExpiringHandler

BASE_URL=os.getenv("POSTGREST_URL")
JWT_PROVIDER = JwtProvider()

CHANNEL_HANDLERS = [
SoftwareCommunityJoinRequestHandler(),
AccessTokenDeletedHandler(),
AccessTokenExpiringHandler()
]

def connect_to_postgres():
for i in range(5):
try:
Expand All @@ -30,85 +36,37 @@ def connect_to_postgres():
except psycopg.OperationalError as e:
print(f"Connecting attempt {i+1} Publisher to Postgres database failed: {e}")

def listen_to_channel(cursor, channel_name):
cursor.execute(f"LISTEN {channel_name};")
def listen_to_channels(conn, channel_handlers):
cursor = conn.cursor()
for handler in channel_handlers:
cursor.execute(f"LISTEN {handler.name};")

def process_notifications(connection):
print("Listening to channels...")
while True:
try:
if select.select([connection], [], [], 5) == ([], [], []):
if select.select([conn], [], [], 5) == ([], [], []):
continue

for notify in connection.notifies():
payload = json.loads(notify.payload)
if notify.channel == "software_for_community_join_request":
software_name, software_slug = get_software_name(payload["software"])
recipients = get_maintainer_emails_for_community(payload["community"])
community_name, community_slug = get_community_info(payload["community"])
if community_name:
send_community_join_request_mail(recipients, software_name, community_name, utils.create_software_page_url(software_slug), utils.create_community_requests_url(community_slug))

for notify in conn.notifies():
process_notifications(channel_handlers, notify.channel, json.loads(notify.payload))
except (Exception, psycopg.DatabaseError) as error:
utils.log_to_backend(
service_name="Postgres Notification Listener",
table_name="",
message=f"Exception while listening to Postgres (software_for_community_join_request): {error}",
message=f"Exception while listening to Postgres: {error}",
)
print(error)
break


def get_maintainer_emails_for_community(community_id):
response = requests.post(
f"{BASE_URL}/rpc/maintainers_of_community",
headers={
"Authorization": f"Bearer {JWT_PROVIDER.get_admin_jwt()}",
"Content-Type": "application/json",
},
json={
'community_id': community_id
}
)
return [maintainer['email'][0] for maintainer in response.json()]

def get_community_info(community_id):
response = requests.get(
f"{BASE_URL}/community?id=eq.{community_id}&select=name, slug",
headers={
"Authorization": f"Bearer {JWT_PROVIDER.get_admin_jwt()}",
"Content-Type": "application/json",
}
)
return response.json()[0]["name"], response.json()[0]["slug"]

def get_software_name(software_id):
response = requests.get(
f"{BASE_URL}/software?id=eq.{software_id}&select=brand_name, slug",
headers={
"Authorization": f"Bearer {JWT_PROVIDER.get_admin_jwt()}",
"Content-Type": "application/json",
}
)
return response.json()[0]["brand_name"], response.json()[0]["slug"]
def process_notifications(handlers, channel_name, payload):
for handler in handlers:
if handler.name == channel_name:
preprocessed = handler.preprocess(payload)
handler.process(preprocessed)
break

def send_community_join_request_mail(recipients, software_name, community_name, software_page_url, community_settings_url):
subject = f"RSD: Community join request for {community_name}"
html_content = render_template("community_join_request.html", {"SOFTWARE_NAME": software_name, "COMMUNITY_NAME": community_name, "SOFTWARE_PAGE_URL": software_page_url, "COMMUNITY_REQUESTS_URL": community_settings_url, "RSD_URL": os.getenv("HOST_URL")})
body = dict(
subject=subject,
recipients=recipients,
html_content=html_content,
plain_content=None
)
utils.publish_to_queue(os.environ.get("MAIL_QUEUE", "mailq"), body)

if __name__ == "__main__":
connection = connect_to_postgres()
connection.autocommit = True

cursor = connection.cursor()

# listen for community join requests
listen_to_channel(cursor, "software_for_community_join_request")
process_notifications(connection)

listen_to_channels(connection, CHANNEL_HANDLERS)
54 changes: 54 additions & 0 deletions publisher/app/services/postgrest_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# SPDX-FileCopyrightText: 2025 Helmholtz Centre Potsdam - GFZ German Research Centre for Geosciences
# SPDX-FileCopyrightText: 2025 Paula Stock (GFZ) <[email protected]>
#
# SPDX-License-Identifier: Apache-2.0

import os
import requests
from app.auth import JwtProvider

BASE_URL=os.getenv("POSTGREST_URL")
JWT_PROVIDER = JwtProvider()

def get_maintainer_emails_for_community(community_id):
response = requests.post(
f"{BASE_URL}/rpc/maintainers_of_community",
headers={
"Authorization": f"Bearer {JWT_PROVIDER.get_admin_jwt()}",
"Content-Type": "application/json",
},
json={
'community_id': community_id
}
)
return [maintainer['email'][0] for maintainer in response.json()]

def get_community_info(community_id):
response = requests.get(
f"{BASE_URL}/community?id=eq.{community_id}&select=name,slug",
headers={
"Authorization": f"Bearer {JWT_PROVIDER.get_admin_jwt()}",
"Content-Type": "application/json",
}
)
return response.json()[0]["name"], response.json()[0]["slug"]

def get_software_name(software_id):
response = requests.get(
f"{BASE_URL}/software?id=eq.{software_id}&select=brand_name, slug",
headers={
"Authorization": f"Bearer {JWT_PROVIDER.get_admin_jwt()}",
"Content-Type": "application/json",
}
)
return response.json()[0]["brand_name"], response.json()[0]["slug"]

def get_mail_for_account(account_id):
response = requests.get(
f"{BASE_URL}/user_profile?account=eq.{account_id}&select=email_address",
headers={
"Authorization": f"Bearer {JWT_PROVIDER.get_admin_jwt()}",
"Content-Type": "application/json",
}
)
return response.json()[0]["email_address"]
12 changes: 12 additions & 0 deletions publisher/app/templates/access_token_expired_now.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<!doctype html>
<html>

<head>
<title>API Access Token expired</title>
</head>

<body>
Your API access token "{{DISPLAY_NAME}}" has expired and was deleted. If you are still using this token, you need to generate a new one.
</body>

</html>
4 changes: 4 additions & 0 deletions publisher/app/templates/access_token_expired_now.html.license
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
SPDX-FileCopyrightText: 2025 Helmholtz Centre Potsdam - GFZ German Research Centre for Geosciences
SPDX-FileCopyrightText: 2025 Paula Stock (GFZ) <[email protected]>

SPDX-License-Identifier: Apache-2.0
13 changes: 13 additions & 0 deletions publisher/app/templates/access_token_expiring_soon.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<!doctype html>
<html>

<head>
<title>API Access Token expiring soon</title>
</head>

<body>
Your API access token "{{DISPLAY_NAME}}" will expire in 7 days. If you are still using this token, you need to
generate a new one.
</body>

</html>
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
SPDX-FileCopyrightText: 2025 Helmholtz Centre Potsdam - GFZ German Research Centre for Geosciences
SPDX-FileCopyrightText: 2025 Paula Stock (GFZ) <[email protected]>

SPDX-License-Identifier: Apache-2.0
2 changes: 1 addition & 1 deletion publisher/app/templates/community_join_request.html
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@
The software <a href="{{SOFTWARE_PAGE_URL}}"><b>{{SOFTWARE_NAME}}</b></a> has requested to join your community <a href="{{COMMUNITY_REQUESTS_URL}}"><b>{{COMMUNITY_NAME}}</b></a>.
Log into RSD (<a href="{{RSD_URL}}"><b>{{RSD_URL}}</b></a>) to approve or reject the request.
</body>
</html>
</html>
Loading