@@ -13,34 +13,37 @@ export class EventStream<T extends ServerEvent<unknown>>
1313 extends ReadableStream < T >
1414{
1515 constructor (
16- stream : ReadableStream < Uint8Array > ,
16+ responseBody : ReadableStream < Uint8Array > ,
1717 parse : ( x : ServerEvent < string > ) => IteratorResult < T , undefined > ,
1818 ) {
19- const reader = stream . getReader ( ) ;
19+ const upstream = responseBody . getReader ( ) ;
2020 let buffer : Uint8Array = new Uint8Array ( ) ;
2121 super ( {
22- async pull ( controller ) {
22+ async pull ( downstream ) {
2323 try {
2424 while ( true ) {
25- const r = await reader . read ( ) ;
26- if ( r . done ) return controller . close ( ) ;
27- buffer = concatBuffer ( buffer , r . value ) ;
28- for ( const { chunk, remainder } of chunks ( buffer ) ) {
29- buffer = remainder ;
30- const item = parseChunk ( chunk , parse ) ;
31- if ( item ?. value ) controller . enqueue ( item . value ) ;
32- if ( item ?. done ) {
33- await reader . cancel ( "done" ) ;
34- return controller . close ( ) ;
35- }
25+ const match = findBoundary ( buffer ) ;
26+ if ( ! match ) {
27+ const chunk = await upstream . read ( ) ;
28+ if ( chunk . done ) return downstream . close ( ) ;
29+ buffer = concatBuffer ( buffer , chunk . value ) ;
30+ continue ;
31+ }
32+ const message = buffer . slice ( 0 , match . index ) ;
33+ buffer = buffer . slice ( match . index + match . length ) ;
34+ const item = parseMessage ( message , parse ) ;
35+ if ( item ?. value ) return downstream . enqueue ( item . value ) ;
36+ if ( item ?. done ) {
37+ await upstream . cancel ( "done" ) ;
38+ return downstream . close ( ) ;
3639 }
3740 }
3841 } catch ( e ) {
39- await reader . cancel ( e ) ;
40- controller . error ( e ) ;
42+ downstream . error ( e ) ;
43+ await upstream . cancel ( e ) ;
4144 }
4245 } ,
43- cancel : reason => reader . cancel ( reason ) ,
46+ cancel : reason => upstream . cancel ( reason ) ,
4447 } ) ;
4548 }
4649
@@ -105,22 +108,7 @@ function findBoundary(
105108 return null ;
106109}
107110
108- function * chunks (
109- remainder : Uint8Array ,
110- ) : Generator < { chunk : Uint8Array ; remainder : Uint8Array } > {
111- while ( true ) {
112- const match = findBoundary ( remainder ) ;
113- if ( ! match ) {
114- yield { chunk : new Uint8Array ( ) , remainder } ;
115- return ;
116- }
117- const chunk = remainder . slice ( 0 , match . index ) ;
118- remainder = remainder . slice ( match . index + match . length ) ;
119- yield { chunk, remainder } ;
120- }
121- }
122-
123- function parseChunk < T extends ServerEvent < unknown > > (
111+ function parseMessage < T extends ServerEvent < unknown > > (
124112 chunk : Uint8Array ,
125113 parse : ( x : ServerEvent < string > ) => IteratorResult < T , undefined > ,
126114) {
0 commit comments