Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit c373ed0

Browse files
committedFeb 2, 2020
Removed all state machine creation for sync path in message reader.
1 parent 4cbc411 commit c373ed0

File tree

6 files changed

+235
-67
lines changed

6 files changed

+235
-67
lines changed
 

‎src/Bedrock.Framework/Protocols/WebSockets/WebSocketFrameReader.cs

+16-2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@ namespace Bedrock.Framework.Protocols.WebSockets
1313
/// </summary>
1414
public class WebSocketFrameReader : IMessageReader<WebSocketReadFrame>
1515
{
16+
/// <summary>
17+
/// An instance of the WebSocketFrameReader.
18+
/// </summary>
19+
private WebSocketPayloadReader _payloadReader;
20+
1621
/// <summary>
1722
/// Attempts to parse a message from a sequence.
1823
/// </summary>
@@ -30,7 +35,7 @@ public bool TryParseMessage(in ReadOnlySequence<byte> input, ref SequencePositio
3035
return false;
3136
}
3237

33-
if (input.IsSingleSegment)
38+
if (input.IsSingleSegment || input.FirstSpan.Length >= 14)
3439
{
3540
if (TryParseSpan(input.FirstSpan, input.Length, out var bytesRead, out message))
3641
{
@@ -123,8 +128,17 @@ private bool TryParseSpan(in ReadOnlySpan<byte> span, long inputLength, out int
123128
}
124129

125130
var header = new WebSocketHeader(fin, opcode, masked, payloadLength, maskingKey);
126-
message = new WebSocketReadFrame(header, new WebSocketPayloadReader(header));
127131

132+
if(_payloadReader == null)
133+
{
134+
_payloadReader = new WebSocketPayloadReader(header);
135+
}
136+
else
137+
{
138+
_payloadReader.Reset(header);
139+
}
140+
141+
message = new WebSocketReadFrame(header, _payloadReader);
128142
bytesRead = 2 + extendedPayloadLengthSize + maskSize;
129143
return true;
130144
}

‎src/Bedrock.Framework/Protocols/WebSockets/WebSocketMessageReader.cs

+129-39
Original file line numberDiff line numberDiff line change
@@ -101,62 +101,91 @@ public WebSocketMessageReader(PipeReader transport, IControlFrameHandler control
101101
/// </summary>
102102
/// <param name="cancellationToken">A cancellation token, if any.</param>
103103
/// <returns>A message read result.</returns>
104-
public async ValueTask<MessageReadResult> ReadAsync(CancellationToken cancellationToken = default)
104+
public ValueTask<MessageReadResult> ReadAsync(CancellationToken cancellationToken = default)
105105
{
106106
if (_awaitingHeader)
107107
{
108-
var frame = await GetNextMessageFrameAsync(cancellationToken).ConfigureAwait(false);
109-
ValidateHeader(frame.Header);
108+
var readTask = GetNextMessageFrameAsync(cancellationToken);
109+
if(readTask.IsCompletedSuccessfully)
110+
{
111+
var frame = readTask.Result;
112+
ValidateHeader(frame.Header);
110113

111-
_header = frame.Header;
112-
_payloadReader = frame.Payload;
114+
_header = frame.Header;
115+
_payloadReader = frame.Payload;
116+
}
117+
else
118+
{
119+
return DoReadHeaderRequiredAsync(readTask, cancellationToken);
120+
}
113121
}
114122

123+
return ReadPayloadAsync(cancellationToken);
124+
}
125+
126+
/// <summary>
127+
/// Completes an async read when reading a header is required.
128+
/// </summary>
129+
/// <param name="readTask">The active async read task from the ProtocolReader.</param>
130+
/// <param name="cancellationToken">A cancellation token.</param>
131+
/// <returns>A MessageReadResult.</returns>
132+
private async ValueTask<MessageReadResult> DoReadHeaderRequiredAsync(ValueTask<WebSocketReadFrame> readTask, CancellationToken cancellationToken)
133+
{
134+
var frame = await readTask.ConfigureAwait(false);
135+
136+
ValidateHeader(frame.Header);
137+
138+
_header = frame.Header;
139+
_payloadReader = frame.Payload;
140+
141+
return await ReadPayloadAsync(cancellationToken);
142+
}
143+
144+
/// <summary>
145+
/// Reads a portion of a message payload.
146+
/// </summary>
147+
/// <param name="cancellationToken">A cancellation token.</param>
148+
/// <returns>A MessageReadResult.</returns>
149+
private ValueTask<MessageReadResult> ReadPayloadAsync(CancellationToken cancellationToken)
150+
{
115151
//Don't keep reading data into the buffer if we've hit a threshold
116152
//TODO: Is this even the right value to use in this context?
117153
if (_buffer.UnconsumedWrittenCount < _options.PauseWriterThreshold)
118154
{
119155
var readTask = _protocolReader.ReadAsync(_payloadReader, cancellationToken);
120-
ProtocolReadResult<ReadOnlySequence<byte>> payloadSequence;
121-
122156
if (readTask.IsCompletedSuccessfully)
123157
{
124-
payloadSequence = readTask.Result;
158+
PopulateFromRead(readTask.Result);
125159
}
126160
else
127161
{
128-
payloadSequence = await readTask;
129-
}
130-
131-
if (payloadSequence.IsCanceled)
132-
{
133-
throw new OperationCanceledException("Read canceled while attempting to read WebSocket payload.");
134-
}
135-
136-
var sequence = payloadSequence.Message;
137-
138-
//If there is already data in the buffer, we'll need to add to it
139-
if (_buffer.UnconsumedWrittenCount > 0)
140-
{
141-
if (sequence.IsSingleSegment)
142-
{
143-
_buffer.Write(sequence.FirstSpan);
144-
}
145-
else
146-
{
147-
foreach (var segment in sequence)
148-
{
149-
_buffer.Write(segment.Span);
150-
}
151-
}
152-
}
162+
return CreateMessageReadResultAsync(readTask, cancellationToken);
163+
}
164+
}
153165

154-
_currentSequence = payloadSequence.Message;
155-
_isCompleted = payloadSequence.IsCompleted;
156-
_isCanceled = payloadSequence.IsCanceled;
166+
var endOfMessage = _header.Fin && _payloadReader.BytesRemaining == 0;
157167

158-
_awaitingHeader = _payloadReader.BytesRemaining == 0;
168+
//Serve back buffered data, if it exists, else give the direct sequence without buffering
169+
if (_buffer.UnconsumedWrittenCount > 0)
170+
{
171+
return new ValueTask<MessageReadResult>(
172+
new MessageReadResult(new ReadOnlySequence<byte>(_buffer.WrittenMemory), endOfMessage, _isCanceled, _isCompleted));
159173
}
174+
else
175+
{
176+
return new ValueTask<MessageReadResult>(new MessageReadResult(_currentSequence, endOfMessage, _isCanceled, _isCompleted));
177+
}
178+
}
179+
180+
/// <summary>
181+
/// Creates a new MessageReadResult asynchronously.
182+
/// </summary>
183+
/// <param name="readTask">The active read task from the ProtocolReader.</param>
184+
/// <param name="cancellationToken">A cancellation token.</param>
185+
/// <returns>A new MessageReadResult.</returns>
186+
private async ValueTask<MessageReadResult> CreateMessageReadResultAsync(ValueTask<ProtocolReadResult<ReadOnlySequence<byte>>> readTask, CancellationToken cancellationToken)
187+
{
188+
PopulateFromRead(await readTask);
160189

161190
var endOfMessage = _header.Fin && _payloadReader.BytesRemaining == 0;
162191

@@ -171,6 +200,42 @@ public async ValueTask<MessageReadResult> ReadAsync(CancellationToken cancellati
171200
}
172201
}
173202

203+
/// <summary>
204+
/// Populates the message reader from a payload read result.
205+
/// </summary>
206+
/// <param name="readResult">The read result to populate the message reader from.</param>
207+
private void PopulateFromRead(ProtocolReadResult<ReadOnlySequence<byte>> readResult)
208+
{
209+
if (readResult.IsCanceled)
210+
{
211+
throw new OperationCanceledException("Read canceled while attempting to read WebSocket payload.");
212+
}
213+
214+
var sequence = readResult.Message;
215+
216+
//If there is already data in the buffer, we'll need to add to it
217+
if (_buffer.UnconsumedWrittenCount > 0)
218+
{
219+
if (sequence.IsSingleSegment)
220+
{
221+
_buffer.Write(sequence.FirstSpan);
222+
}
223+
else
224+
{
225+
foreach (var segment in sequence)
226+
{
227+
_buffer.Write(segment.Span);
228+
}
229+
}
230+
}
231+
232+
_currentSequence = readResult.Message;
233+
_isCompleted = readResult.IsCompleted;
234+
_isCanceled = readResult.IsCanceled;
235+
236+
_awaitingHeader = _payloadReader.BytesRemaining == 0;
237+
}
238+
174239
/// <summary>
175240
/// Advances the reader to the provided position.
176241
/// </summary>
@@ -223,15 +288,40 @@ public void AdvanceTo(SequencePosition consumed, SequencePosition examined)
223288
/// </summary>
224289
/// <param name="cancellationToken">A cancellation token, if any.</param>
225290
/// <returns>True if the message is text, false otherwise.</returns>
226-
public async ValueTask<bool> MoveNextMessageAsync(CancellationToken cancellationToken = default)
291+
public ValueTask<bool> MoveNextMessageAsync(CancellationToken cancellationToken = default)
227292
{
228293
if (_payloadReader is object && _payloadReader.BytesRemaining != 0)
229294
{
230295
throw new InvalidOperationException("MoveNextMessageAsync cannot be called while a message is still being read.");
231296
}
232297

233-
var frame = await GetNextMessageFrameAsync(cancellationToken);
298+
var readTask = GetNextMessageFrameAsync(cancellationToken);
299+
if (readTask.IsCompletedSuccessfully)
300+
{
301+
return new ValueTask<bool>(SetNextMessageAndGetIsText(readTask.Result));
302+
}
234303

304+
return DoSetNextMessageAsync(readTask);
305+
}
306+
307+
/// <summary>
308+
/// Sets the next message frame asynchronously.
309+
/// </summary>
310+
/// <param name="readTask">The active ProtocolReader read task.</param>
311+
/// <returns>True if the next message is a text message, false otherwise.</returns>
312+
private async ValueTask<bool> DoSetNextMessageAsync(ValueTask<WebSocketReadFrame> readTask)
313+
{
314+
return SetNextMessageAndGetIsText(await readTask);
315+
}
316+
317+
/// <summary>
318+
/// Sets the message reader up with the next message frame data and determines if the message
319+
/// is a text or binary message.
320+
/// </summary>
321+
/// <param name="frame">The read frame to set the message reader with.</param>
322+
/// <returns>True if the next message is text, false otherwise.</returns>
323+
private bool SetNextMessageAndGetIsText(WebSocketReadFrame frame)
324+
{
235325
if (frame.Header.Opcode != WebSocketOpcode.Binary && frame.Header.Opcode != WebSocketOpcode.Text)
236326
{
237327
ThrowBadProtocol($"Expected a start of message frame of Binary or Text but received {frame.Header.Opcode} instead.");

‎src/Bedrock.Framework/Protocols/WebSockets/WebSocketPayloadEncoder.cs

+10-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ namespace Bedrock.Framework.Protocols.WebSockets
1313
/// Masks or unmasks a WebSocket payload according to the provided masking key, tracking the
1414
/// masking key index accross mask or unmasking requests.
1515
/// </summary>
16-
internal struct WebSocketPayloadEncoder
16+
internal class WebSocketPayloadEncoder
1717
{
1818
/// <summary>
1919
/// The masking key to use to mask or unmask the payload.
@@ -30,6 +30,15 @@ internal struct WebSocketPayloadEncoder
3030
/// </summary>
3131
/// <param name="maskingKey">The masking key to use to mask or unmask payloads.</param>
3232
public WebSocketPayloadEncoder(int maskingKey)
33+
{
34+
Reset(maskingKey);
35+
}
36+
37+
/// <summary>
38+
/// Resets the payload encoder.
39+
/// </summary>
40+
/// <param name="maskingKey">The masking key to use to mask or unmask payloads.</param>
41+
public void Reset(int maskingKey)
3342
{
3443
_maskingKey = maskingKey;
3544
_currentMaskIndex = 0;

‎src/Bedrock.Framework/Protocols/WebSockets/WebSocketPayloadReader.cs

+12
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,18 @@ public WebSocketPayloadReader(WebSocketHeader header)
4343
_masked = header.Masked;
4444
}
4545

46+
/// <summary>
47+
/// Resets the payload reader.
48+
/// </summary>
49+
/// <param name="header">The WebSocketHeader associated with this payload.</param>
50+
public void Reset(WebSocketHeader header)
51+
{
52+
BytesRemaining = header.PayloadLength;
53+
_masked = header.Masked;
54+
55+
_payloadEncoder.Reset(header.MaskingKey);
56+
}
57+
4658
/// <summary>
4759
/// Attempts to read the WebSocket payload from a sequence.
4860
/// </summary>

‎tests/Bedrock.Framework.Benchmarks/Program.cs

+1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using BenchmarkDotNet.Running;
22
using System;
3+
using System.Threading.Tasks;
34

45
namespace Bedrock.Framework.Benchmarks
56
{

‎tests/Bedrock.Framework.Benchmarks/WebSocketProtocolBenchmarks.cs

+67-25
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using Bedrock.Framework.Protocols.WebSockets;
22
using BenchmarkDotNet.Attributes;
3+
using BenchmarkDotNet.Configs;
34
using Microsoft.AspNetCore.Connections;
45
using System;
56
using System.Buffers;
@@ -13,21 +14,27 @@
1314

1415
namespace Bedrock.Framework.Benchmarks
1516
{
17+
[GroupBenchmarksBy(BenchmarkLogicalGroupRule.ByCategory)]
18+
[CategoriesColumn]
1619
public class WebSocketProtocolBenchmarks
1720
{
18-
private WebSocket _webSocket;
21+
private WebSocket _webSocketServer;
1922

20-
private WebSocketProtocol _webSocketProtocol;
23+
private WebSocket _webSocketClient;
2124

22-
private DefaultConnectionContext _connectionContext;
25+
private WebSocketProtocol _webSocketProtocolServer;
2326

24-
private MemoryStream _stream;
27+
private WebSocketProtocol _webSocketProtocolClient;
2528

26-
private byte[] _message;
29+
private DefaultConnectionContext _serverConnectionContext;
2730

28-
private ArraySegment<byte> _arrayBuffer;
31+
private DefaultConnectionContext _clientConnectionContext;
32+
33+
private MemoryStream _serverStream;
2934

30-
private ReadOnlyMemory<byte> _romBuffer;
35+
private MemoryStream _clientStream;
36+
37+
private ArraySegment<byte> _arrayBuffer;
3138

3239
private class DummyPipeReader : PipeReader
3340
{
@@ -64,50 +71,85 @@ private class DummyDuplexPipe : IDuplexPipe
6471

6572
[GlobalSetup]
6673
public async ValueTask Setup()
74+
{
75+
var serverMessage = await GetMessageBytes(true, 4000);
76+
var clientMessage = await GetMessageBytes(false, 4000);
77+
78+
(_serverConnectionContext, _serverStream) = CreateContextAndStream(serverMessage);
79+
(_clientConnectionContext, _clientStream) = CreateContextAndStream(clientMessage);
80+
81+
_webSocketServer = WebSocket.CreateFromStream(_serverStream, true, null, TimeSpan.FromSeconds(30));
82+
_webSocketProtocolServer = new WebSocketProtocol(_serverConnectionContext, WebSocketProtocolType.Server);
83+
84+
_webSocketClient = WebSocket.CreateFromStream(_clientStream, false, null, TimeSpan.FromSeconds(30));
85+
_webSocketProtocolClient = new WebSocketProtocol(_clientConnectionContext, WebSocketProtocolType.Server);
86+
87+
_arrayBuffer = new ArraySegment<byte>(new byte[10000]);
88+
}
89+
90+
private async ValueTask<byte[]> GetMessageBytes(bool isMasked, long size)
6791
{
6892
var writer = new WebSocketFrameWriter();
6993
var pipe = new Pipe();
7094

71-
_message = new byte[4000];
72-
73-
var header = WebSocketHeader.CreateMasked(true, WebSocketOpcode.Binary, 4000);
74-
writer.WriteMessage(new WebSocketWriteFrame(header, new ReadOnlySequence<byte>(_message)), pipe.Writer);
95+
var header = new WebSocketHeader(true, WebSocketOpcode.Binary, isMasked, (ulong)size, isMasked ? WebSocketHeader.GenerateMaskingKey() : default);
96+
writer.WriteMessage(new WebSocketWriteFrame(header, new ReadOnlySequence<byte>(new byte[4000])), pipe.Writer);
7597

7698
await pipe.Writer.FlushAsync();
7799

78100
var result = await pipe.Reader.ReadAsync();
79-
_message = result.Buffer.ToArray();
101+
return result.Buffer.ToArray();
102+
}
80103

81-
var dummyReader = new DummyPipeReader { Result = new ReadResult(new ReadOnlySequence<byte>(_message), false, false) };
82-
var dummyDuplexPipe = new DummyDuplexPipe { DummyReader = dummyReader };
104+
private (DefaultConnectionContext context, MemoryStream stream) CreateContextAndStream(byte[] message)
105+
{
106+
var reader = new DummyPipeReader { Result = new ReadResult(new ReadOnlySequence<byte>(message), false, false) };
107+
var duplexPipe = new DummyDuplexPipe { DummyReader = reader };
83108

84-
_connectionContext = new DefaultConnectionContext { Transport = dummyDuplexPipe };
85-
_stream = new MemoryStream(_message);
109+
var stream = new MemoryStream(message);
110+
var context = new DefaultConnectionContext { Transport = duplexPipe };
86111

87-
_webSocket = WebSocket.CreateFromStream(_stream, true, null, TimeSpan.FromSeconds(30));
88-
_webSocketProtocol = new WebSocketProtocol(_connectionContext, WebSocketProtocolType.Server);
112+
return (context, stream);
113+
}
89114

90-
_arrayBuffer = new ArraySegment<byte>(new byte[10000]);
91-
_romBuffer = new ReadOnlyMemory<byte>(_message);
115+
[BenchmarkCategory("Masked"), Benchmark(Baseline = true)]
116+
public async ValueTask WebSocketRead()
117+
{
118+
_clientStream.Seek(0, SeekOrigin.Begin);
119+
120+
var endOfMessage = false;
121+
while (!endOfMessage)
122+
{
123+
var result = await _webSocketClient.ReceiveAsync(_arrayBuffer, CancellationToken.None);
124+
endOfMessage = result.EndOfMessage;
125+
}
92126
}
93127

94-
[Benchmark(Baseline = true)]
128+
[BenchmarkCategory("Unmasked"), Benchmark(Baseline = true)]
95129
public async ValueTask WebSocketReadMasked()
96130
{
97-
_stream.Seek(0, SeekOrigin.Begin);
131+
_serverStream.Seek(0, SeekOrigin.Begin);
98132

99133
var endOfMessage = false;
100134
while (!endOfMessage)
101135
{
102-
var result = await _webSocket.ReceiveAsync(_arrayBuffer, CancellationToken.None);
136+
var result = await _webSocketServer.ReceiveAsync(_arrayBuffer, CancellationToken.None);
103137
endOfMessage = result.EndOfMessage;
104138
}
105139
}
106140

107-
[Benchmark]
141+
[BenchmarkCategory("Masked"), Benchmark]
142+
public async ValueTask WebSocketProtocolRead()
143+
{
144+
var message = await _webSocketProtocolClient.ReadAsync();
145+
var data = await message.Reader.ReadAsync();
146+
message.Reader.AdvanceTo(data.Data.End);
147+
}
148+
149+
[BenchmarkCategory("Unmasked"), Benchmark]
108150
public async ValueTask WebSocketProtocolReadMasked()
109151
{
110-
var message = await _webSocketProtocol.ReadAsync();
152+
var message = await _webSocketProtocolServer.ReadAsync();
111153
var data = await message.Reader.ReadAsync();
112154
message.Reader.AdvanceTo(data.Data.End);
113155
}

0 commit comments

Comments
 (0)
Please sign in to comment.