6
6
import psycopg
7
7
import select
8
8
import json
9
- import time
10
9
import os
11
- import requests
12
10
import app .utils as utils
13
- from app .templates .common import render_template
14
11
from app .auth import JwtProvider
12
+ from channels .software_for_community_join_request import SoftwareCommunityJoinRequestHandler
13
+ from channels .access_token_deleted import AccessTokenDeletedHandler
14
+ from channels .access_token_expiring import AccessTokenExpiringHandler
15
15
16
16
BASE_URL = os .getenv ("POSTGREST_URL" )
17
17
JWT_PROVIDER = JwtProvider ()
18
18
19
+ CHANNEL_HANDLERS = [
20
+ SoftwareCommunityJoinRequestHandler (),
21
+ AccessTokenDeletedHandler (),
22
+ AccessTokenExpiringHandler ()
23
+ ]
24
+
19
25
def connect_to_postgres ():
20
26
for i in range (5 ):
21
27
try :
@@ -30,85 +36,37 @@ def connect_to_postgres():
30
36
except psycopg .OperationalError as e :
31
37
print (f"Connecting attempt { i + 1 } Publisher to Postgres database failed: { e } " )
32
38
33
- def listen_to_channel (cursor , channel_name ):
34
- cursor .execute (f"LISTEN { channel_name } ;" )
39
+ def listen_to_channels (conn , channel_handlers ):
40
+ cursor = conn .cursor ()
41
+ for handler in channel_handlers :
42
+ cursor .execute (f"LISTEN { handler .name } ;" )
35
43
36
- def process_notifications ( connection ):
44
+ print ( "Listening to channels..." )
37
45
while True :
38
46
try :
39
- if select .select ([connection ], [], [], 5 ) == ([], [], []):
47
+ if select .select ([conn ], [], [], 5 ) == ([], [], []):
40
48
continue
41
-
42
- for notify in connection .notifies ():
43
- payload = json .loads (notify .payload )
44
- if notify .channel == "software_for_community_join_request" :
45
- software_name , software_slug = get_software_name (payload ["software" ])
46
- recipients = get_maintainer_emails_for_community (payload ["community" ])
47
- community_name , community_slug = get_community_info (payload ["community" ])
48
- if community_name :
49
- send_community_join_request_mail (recipients , software_name , community_name , utils .create_software_page_url (software_slug ), utils .create_community_requests_url (community_slug ))
50
-
49
+ for notify in conn .notifies ():
50
+ process_notifications (channel_handlers , notify .channel , json .loads (notify .payload ))
51
51
except (Exception , psycopg .DatabaseError ) as error :
52
52
utils .log_to_backend (
53
53
service_name = "Postgres Notification Listener" ,
54
54
table_name = "" ,
55
- message = f"Exception while listening to Postgres (software_for_community_join_request) : { error } " ,
55
+ message = f"Exception while listening to Postgres: { error } " ,
56
56
)
57
57
print (error )
58
58
break
59
59
60
60
61
- def get_maintainer_emails_for_community (community_id ):
62
- response = requests .post (
63
- f"{ BASE_URL } /rpc/maintainers_of_community" ,
64
- headers = {
65
- "Authorization" : f"Bearer { JWT_PROVIDER .get_admin_jwt ()} " ,
66
- "Content-Type" : "application/json" ,
67
- },
68
- json = {
69
- 'community_id' : community_id
70
- }
71
- )
72
- return [maintainer ['email' ][0 ] for maintainer in response .json ()]
73
-
74
- def get_community_info (community_id ):
75
- response = requests .get (
76
- f"{ BASE_URL } /community?id=eq.{ community_id } &select=name, slug" ,
77
- headers = {
78
- "Authorization" : f"Bearer { JWT_PROVIDER .get_admin_jwt ()} " ,
79
- "Content-Type" : "application/json" ,
80
- }
81
- )
82
- return response .json ()[0 ]["name" ], response .json ()[0 ]["slug" ]
83
-
84
- def get_software_name (software_id ):
85
- response = requests .get (
86
- f"{ BASE_URL } /software?id=eq.{ software_id } &select=brand_name, slug" ,
87
- headers = {
88
- "Authorization" : f"Bearer { JWT_PROVIDER .get_admin_jwt ()} " ,
89
- "Content-Type" : "application/json" ,
90
- }
91
- )
92
- return response .json ()[0 ]["brand_name" ], response .json ()[0 ]["slug" ]
61
+ def process_notifications (handlers , channel_name , payload ):
62
+ for handler in handlers :
63
+ if handler .name == channel_name :
64
+ preprocessed = handler .preprocess (payload )
65
+ handler .process (preprocessed )
66
+ break
93
67
94
- def send_community_join_request_mail (recipients , software_name , community_name , software_page_url , community_settings_url ):
95
- subject = f"RSD: Community join request for { community_name } "
96
- html_content = render_template ("community_join_request.html" , {"SOFTWARE_NAME" : software_name , "COMMUNITY_NAME" : community_name , "SOFTWARE_PAGE_URL" : software_page_url , "COMMUNITY_REQUESTS_URL" : community_settings_url , "RSD_URL" : os .getenv ("HOST_URL" )})
97
- body = dict (
98
- subject = subject ,
99
- recipients = recipients ,
100
- html_content = html_content ,
101
- plain_content = None
102
- )
103
- utils .publish_to_queue (os .environ .get ("MAIL_QUEUE" , "mailq" ), body )
104
68
105
69
if __name__ == "__main__" :
106
70
connection = connect_to_postgres ()
107
71
connection .autocommit = True
108
-
109
- cursor = connection .cursor ()
110
-
111
- # listen for community join requests
112
- listen_to_channel (cursor , "software_for_community_join_request" )
113
- process_notifications (connection )
114
-
72
+ listen_to_channels (connection , CHANNEL_HANDLERS )
0 commit comments