forked from impstation/imp-station-14
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathServerDbPostgres.Notifications.cs
132 lines (111 loc) · 4.81 KB
/
ServerDbPostgres.Notifications.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
using System.Data;
using System.Threading;
using System.Threading.Tasks;
using Content.Server.Administration.Managers;
using Microsoft.EntityFrameworkCore;
using Npgsql;
namespace Content.Server.Database;
/// <summary>
/// Postgres LISTEN/NOTIFY support for <see cref="ServerDbPostgres"/>: subscribes to every channel in
/// <see cref="NotificationChannels"/> on a dedicated connection and forwards each received notification
/// through <c>NotificationReceived</c>.
/// </summary>
/// <remarks>
/// One of the channels is the ban notification channel: its notifications contain the player id and the
/// banning server id, and players a notification was received for get banned, except when the current
/// server id and the one in the notification payload match. That handling is done by the subscriber of
/// the notification, not by the code in this file.
/// </remarks>
public sealed partial class ServerDbPostgres
{
    /// <summary>
    /// The list of notify channels to subscribe to.
    /// </summary>
    private static readonly string[] NotificationChannels =
    [
        BanManager.BanNotificationChannel,
        MultiServerKickManager.NotificationChannel,
    ];

    /// <summary>
    /// Amount added to <see cref="_reconnectWaitTime"/> after every failed listener iteration.
    /// </summary>
    private static readonly TimeSpan ReconnectWaitIncrease = TimeSpan.FromSeconds(10);

    /// <summary>
    /// Cancelled by <see cref="Shutdown"/> to stop the listener task.
    /// </summary>
    private readonly CancellationTokenSource _notificationTokenSource = new();

    /// <summary>
    /// Dedicated connection used only for LISTEN; null until <see cref="InitNotificationListener"/> ran.
    /// </summary>
    private NpgsqlConnection? _notificationConnection;

    /// <summary>
    /// Current delay applied before the listener retries; zero while the listener is healthy.
    /// </summary>
    private TimeSpan _reconnectWaitTime = TimeSpan.Zero;

    /// <summary>
    /// Sets up the database connection and the notification handler, then starts the listener task.
    /// </summary>
    /// <param name="connectionString">Connection string for the dedicated notification connection.</param>
    private void InitNotificationListener(string connectionString)
    {
        _notificationConnection = new NpgsqlConnection(connectionString);
        _notificationConnection.Notification += OnNotification;

        var cancellationToken = _notificationTokenSource.Token;

        // The token is deliberately NOT passed to Task.Run: the listener must always get to run so that
        // its finally block disposes the connection, even if shutdown was requested before it started.
        _ = Task.Run(() => NotificationListener(cancellationToken));
    }

    /// <summary>
    /// Listens to the notification channels with basic error handling and reopens the connection if it
    /// got closed. Runs until <paramref name="cancellationToken"/> is cancelled and disposes the
    /// connection on every exit path.
    /// </summary>
    /// <param name="cancellationToken">Token that stops the listener when cancelled.</param>
    private async Task NotificationListener(CancellationToken cancellationToken)
    {
        var connection = _notificationConnection;
        if (connection == null)
            return;

        _notifyLog.Verbose("Starting notification listener");

        try
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                try
                {
                    // Back off after a failure regardless of the connection state, so an error that
                    // leaves the connection open (e.g. a failing LISTEN) cannot turn into a hot loop.
                    if (_reconnectWaitTime != TimeSpan.Zero)
                    {
                        _notifyLog.Verbose($"_reconnectWaitTime is {_reconnectWaitTime}");
                        await Task.Delay(_reconnectWaitTime, cancellationToken);
                    }

                    if (connection.State == ConnectionState.Broken)
                    {
                        _notifyLog.Debug("Notification listener entered broken state, closing...");
                        await connection.CloseAsync();
                    }

                    if (connection.State == ConnectionState.Closed)
                    {
                        _notifyLog.Debug("Opening notification listener connection...");
                        await connection.OpenAsync(cancellationToken);
                        _notifyLog.Verbose("Notification connection opened...");
                    }

                    // LISTEN takes an identifier and cannot be parameterized; the channel names come
                    // from the static NotificationChannels list, never from external input.
                    foreach (var channel in NotificationChannels)
                    {
                        _notifyLog.Verbose($"Listening on channel {channel}");
                        await using var cmd = new NpgsqlCommand($"LISTEN {channel}", connection);
                        await cmd.ExecuteNonQueryAsync(cancellationToken);
                    }

                    // Connection is open and subscribed to everything: the listener is healthy again.
                    _reconnectWaitTime = TimeSpan.Zero;

                    while (!cancellationToken.IsCancellationRequested)
                    {
                        _notifyLog.Verbose("Waiting on notifications...");
                        await connection.WaitAsync(cancellationToken);
                    }
                }
                catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
                {
                    // Abort loop on cancel. A cancellation that did not come from our token falls
                    // through to the generic handler below and is retried instead.
                    _notifyLog.Verbose("Shutting down notification listener due to cancellation");
                    return;
                }
                catch (Exception e)
                {
                    _reconnectWaitTime += ReconnectWaitIncrease;
                    _notifyLog.Error($"Error in notification listener: {e}");
                }
            }
        }
        finally
        {
            // Reached on every exit path, including the cancellation return above.
            connection.Dispose();
        }
    }

    /// <summary>
    /// Npgsql notification callback; wraps the notification into a <see cref="DatabaseNotification"/>
    /// and passes it to <c>NotificationReceived</c>.
    /// </summary>
    private void OnNotification(object _, NpgsqlNotificationEventArgs notification)
    {
        _notifyLog.Verbose($"Received notification on channel {notification.Channel}");
        NotificationReceived(new DatabaseNotification
        {
            Channel = notification.Channel,
            Payload = notification.Payload,
        });
    }

    /// <summary>
    /// Sends <paramref name="notification"/> to other listeners by calling <c>pg_notify</c>.
    /// </summary>
    /// <param name="notification">Channel and payload to publish.</param>
    public override async Task SendNotification(DatabaseNotification notification)
    {
        await using var db = await GetDbImpl();

        // ExecuteSqlAsync receives the interpolated string as a FormattableString, so channel and
        // payload are sent as SQL parameters rather than being concatenated into the statement.
        await db.PgDbContext.Database.ExecuteSqlAsync(
            $"SELECT pg_notify({notification.Channel}, {notification.Payload})");
    }

    /// <summary>
    /// Requests the listener task to stop and detaches the notification handler. The connection
    /// itself is disposed by the listener task when it exits.
    /// </summary>
    public override void Shutdown()
    {
        _notificationTokenSource.Cancel();
        if (_notificationConnection == null)
            return;

        _notificationConnection.Notification -= OnNotification;
    }
}