Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed postgres subscription issues v14 #7164

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
using System.Data;
using System.Threading.Channels;
using HotChocolate.Subscriptions.Diagnostics;
using Npgsql;
using NpgsqlTypes;
using static HotChocolate.Subscriptions.Postgres.PostgresResources;

namespace HotChocolate.Subscriptions.Postgres;
Expand Down Expand Up @@ -69,6 +69,7 @@
private async Task HandleMessage(NpgsqlConnection connection, CancellationToken ct)
{
PostgresMessageEnvelope firstItem;

while (!_channel.Reader.TryRead(out firstItem))
{
if (ct.IsCancellationRequested)
Expand All @@ -79,7 +80,7 @@
await _channel.Reader.WaitToReadAsync(ct);
}

var messages = new List<PostgresMessageEnvelope> { firstItem, };
var messages = new List<PostgresMessageEnvelope> { firstItem };
while (!ct.IsCancellationRequested &&
_maxSendBatchSize > messages.Count &&
_channel.Reader.TryRead(out var item))
Expand All @@ -93,40 +94,60 @@
// firstMessage that was already read from the channel
ct.ThrowIfCancellationRequested();

await using var batch = connection.CreateBatch();
var payloads = new string[messages.Count];

foreach (var message in messages)
for (var i = 0; i < messages.Count; i++)
{
var command = batch.CreateBatchCommand();
payloads[i] = messages[i].FormattedPayload;
}

command.CommandText = "SELECT pg_notify(@channel, @message);";
const string sql =
"""
SELECT pg_notify(t.channel, t.message)
FROM (SELECT @channel AS channel, unnest(@messages) AS message ) AS t;
""";

var channel = new NpgsqlParameter("channel", DbType.String)
{
Value = _channelName,
};
var msg = new NpgsqlParameter("message", DbType.String)
await using var command = connection.CreateCommand();

command.CommandText = sql;

command.Parameters.Add(
new NpgsqlParameter("channel", NpgsqlDbType.Text)
{
Value = message.FormattedPayload,
};
command.Parameters.Add(channel);
command.Parameters.Add(msg);
Value = _channelName
});

batch.BatchCommands.Add(command);
}
command.Parameters.Add(
new NpgsqlParameter("messages", NpgsqlDbType.Array | NpgsqlDbType.Varchar)
{
Value = payloads
});

await batch.PrepareAsync(ct);
await batch.ExecuteNonQueryAsync(ct);
await command.PrepareAsync(ct);
await command.ExecuteNonQueryAsync(ct);
}
catch (Exception ex)
{
var msg = string.Format(ChannelWriter_FailedToSend, messages.Count, ex.Message);
_diagnosticEvents.ProviderInfo(msg);

// if we cannot send the message we put it back into the channel
// however as the channel is bounded, we might not able to requeue the message and will
// be forced to drop them if they can't be written
var failedCount = 0;

Check warning on line 137 in src/HotChocolate/Core/src/Subscriptions.Postgres/PostgresChannelWriter.cs

View check run for this annotation

Codecov / codecov/patch

src/HotChocolate/Core/src/Subscriptions.Postgres/PostgresChannelWriter.cs#L137

Added line #L137 was not covered by tests

foreach (var message in messages)
{
await _channel.Writer.WriteAsync(message, ct);
if (!_channel.Writer.TryWrite(message))
{
failedCount++;
}
}

Check warning on line 145 in src/HotChocolate/Core/src/Subscriptions.Postgres/PostgresChannelWriter.cs

View check run for this annotation

Codecov / codecov/patch

src/HotChocolate/Core/src/Subscriptions.Postgres/PostgresChannelWriter.cs#L141-L145

Added lines #L141 - L145 were not covered by tests

if (failedCount > 0)
{
_diagnosticEvents.ProviderInfo(
string.Format(ChannelWriter_FailedToRequeueMessage, failedCount));

Check warning on line 150 in src/HotChocolate/Core/src/Subscriptions.Postgres/PostgresChannelWriter.cs

View check run for this annotation

Codecov / codecov/patch

src/HotChocolate/Core/src/Subscriptions.Postgres/PostgresChannelWriter.cs#L147-L150

Added lines #L147 - L150 were not covered by tests
}
}
}
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,7 @@
<data name="PostgresMessageEnvelope_PayloadTooLarge" xml:space="preserve">
<value>Payload is too long to we written to Postgres. Serialized message is {0} bytes but limit is {1} bytes</value>
</data>
<data name="ChannelWriter_FailedToRequeueMessage" xml:space="preserve">
<value>The postgres writer was unable to requeue messages. {0} messages have been lost</value>
</data>
</root>
Loading