diff --git a/src/SuperSocket.Connection/PipeConnectionBase.cs b/src/SuperSocket.Connection/PipeConnectionBase.cs index faf8e00df..c897fb8ae 100644 --- a/src/SuperSocket.Connection/PipeConnectionBase.cs +++ b/src/SuperSocket.Connection/PipeConnectionBase.cs @@ -257,11 +257,11 @@ protected async Task ReadPipeAsync(PipeReader reader, IObjectPipe< { if (buffer.Length > 0) { - var needReadMore = ReaderBuffer(ref buffer, pipelineFilter, packagePipe, out consumed, out examined, out var nextPipelineFilter); + var needReadMore = ReaderBuffer(ref buffer, pipelineFilter, packagePipe, out consumed, out examined, out var currentPipelineFilter); - if (nextPipelineFilter != null) + if (currentPipelineFilter != null) { - _pipelineFilter = pipelineFilter = nextPipelineFilter; + pipelineFilter = currentPipelineFilter; } if (!needReadMore) @@ -299,7 +299,7 @@ protected void WriteEOFPackage() _packagePipe.WirteEOF(); } - private bool ReaderBuffer(ref ReadOnlySequence buffer, IPipelineFilter pipelineFilter, IObjectPipe packagePipe, out SequencePosition consumed, out SequencePosition examined, out IPipelineFilter nextPipelineFilter) + private bool ReaderBuffer(ref ReadOnlySequence buffer, IPipelineFilter pipelineFilter, IObjectPipe packagePipe, out SequencePosition consumed, out SequencePosition examined, out IPipelineFilter currentPipelineFilter) { consumed = buffer.Start; examined = buffer.End; @@ -312,20 +312,22 @@ private bool ReaderBuffer(ref ReadOnlySequence buffer, IPipe while (true) { - var currentPipelineFilter = pipelineFilter; + var prevPipelineFilter = pipelineFilter; var filterSwitched = false; - var packageInfo = currentPipelineFilter.Filter(ref seqReader); + var packageInfo = pipelineFilter.Filter(ref seqReader); - nextPipelineFilter = currentPipelineFilter.NextFilter; + var nextFilter = pipelineFilter.NextFilter; - if (nextPipelineFilter != null) + if (nextFilter != null) { - nextPipelineFilter.Context = currentPipelineFilter.Context; // pass through the context - pipelineFilter = nextPipelineFilter; + nextFilter.Context = pipelineFilter.Context; // pass through the context + _pipelineFilter = pipelineFilter = nextFilter; filterSwitched = true; } + currentPipelineFilter = pipelineFilter; + var bytesConsumed = seqReader.Consumed; bytesConsumedTotal += bytesConsumed; @@ -353,14 +355,14 @@ private bool ReaderBuffer(ref ReadOnlySequence buffer, IPipe consumed = buffer.GetPosition(bytesConsumedTotal); return true; } - + // we should reset the previous pipeline filter after switch - currentPipelineFilter.Reset(); + prevPipelineFilter.Reset(); } else { // reset the pipeline filter after we parse one full package - currentPipelineFilter.Reset(); + prevPipelineFilter.Reset(); packagePipe.Write(packageInfo); }