Skip to content

Commit 7262e5d

Browse files
committed
Improve StreamableHttpPostTransport thread safety
1 parent 4dd7383 commit 7262e5d

File tree

1 file changed

+13
-6
lines changed

1 file changed

+13
-6
lines changed

src/ModelContextProtocol.Core/Server/StreamableHttpPostTransport.cs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ namespace ModelContextProtocol.Server;
1414
internal sealed class StreamableHttpPostTransport(StreamableHttpServerTransport parentTransport, Stream responseStream) : ITransport
1515
{
1616
private readonly SseWriter _sseWriter = new();
17-
private readonly SemaphoreSlim _eventStreamLock = new(1, 1);
17+
private readonly SemaphoreSlim _sendLock = new(1, 1);
1818
private ISseEventStreamWriter? _eventStreamWriter;
1919
private RequestId _pendingRequest;
2020

@@ -66,10 +66,13 @@ public async ValueTask<bool> HandlePostAsync(JsonRpcMessage message, Cancellatio
6666
return false;
6767
}
6868

69-
var eventStreamWriter = await GetOrCreateEventStreamAsync(cancellationToken).ConfigureAwait(false);
70-
if (eventStreamWriter is not null)
69+
using (await _sendLock.LockAsync(cancellationToken).ConfigureAwait(false))
7170
{
72-
await _sseWriter.SendPrimingEventAsync(parentTransport.RetryInterval, eventStreamWriter, cancellationToken).ConfigureAwait(false);
71+
var eventStreamWriter = await GetOrCreateEventStreamAsync(cancellationToken).ConfigureAwait(false);
72+
if (eventStreamWriter is not null)
73+
{
74+
await _sseWriter.SendPrimingEventAsync(parentTransport.RetryInterval, eventStreamWriter, cancellationToken).ConfigureAwait(false);
75+
}
7376
}
7477

7578
await writeTask.ConfigureAwait(false);
@@ -85,6 +88,8 @@ public async Task SendMessageAsync(JsonRpcMessage message, CancellationToken can
8588
throw new InvalidOperationException("Server to client requests are not supported in stateless mode.");
8689
}
8790

91+
using var _ = await _sendLock.LockAsync(cancellationToken).ConfigureAwait(false);
92+
8893
var eventStreamWriter = await GetOrCreateEventStreamAsync(cancellationToken).ConfigureAwait(false);
8994

9095
var isAccepted = await _sseWriter.SendMessageAsync(message, eventStreamWriter, cancellationToken).ConfigureAwait(false);
@@ -118,6 +123,8 @@ public async ValueTask EnablePollingAsync(TimeSpan retryInterval, CancellationTo
118123
return;
119124
}
120125

126+
using var _ = await _sendLock.LockAsync(cancellationToken).ConfigureAwait(false);
127+
121128
var eventStreamWriter = await GetOrCreateEventStreamAsync(cancellationToken).ConfigureAwait(false);
122129
if (eventStreamWriter is null)
123130
{
@@ -137,6 +144,8 @@ public async ValueTask EnablePollingAsync(TimeSpan retryInterval, CancellationTo
137144

138145
public async ValueTask DisposeAsync()
139146
{
147+
using var _ = await _sendLock.LockAsync().ConfigureAwait(false);
148+
140149
await _sseWriter.DisposeAsync().ConfigureAwait(false);
141150

142151
// Don't dispose the event stream writer here, as we may continue to write to the event store
@@ -145,8 +154,6 @@ public async ValueTask DisposeAsync()
145154

146155
private async ValueTask<ISseEventStreamWriter?> GetOrCreateEventStreamAsync(CancellationToken cancellationToken)
147156
{
148-
using var _ = await _eventStreamLock.LockAsync(cancellationToken).ConfigureAwait(false);
149-
150157
if (_eventStreamWriter is not null)
151158
{
152159
return _eventStreamWriter;

0 commit comments

Comments
 (0)