@@ -5,6 +5,8 @@ import { findDoubleNewlineIndex, LineDecoder } from '../internal/decoders/line';
55import { ReadableStreamToAsyncIterable } from '../internal/shims' ;
66import { isAbortError } from '../internal/errors' ;
77import { encodeUTF8 } from '../internal/utils/bytes' ;
8+ import { loggerFor } from '../internal/utils/log' ;
9+ import type { Kernel } from '../client' ;
810
911type Bytes = string | ArrayBuffer | Uint8Array | null | undefined ;
1012
@@ -16,16 +18,24 @@ export type ServerSentEvent = {
1618
1719export class Stream < Item > implements AsyncIterable < Item > {
1820 controller : AbortController ;
21+ #client: Kernel | undefined ;
1922
2023 constructor (
2124 private iterator : ( ) => AsyncIterator < Item > ,
2225 controller : AbortController ,
26+ client ?: Kernel ,
2327 ) {
2428 this . controller = controller ;
29+ this . #client = client ;
2530 }
2631
27- static fromSSEResponse < Item > ( response : Response , controller : AbortController ) : Stream < Item > {
32+ static fromSSEResponse < Item > (
33+ response : Response ,
34+ controller : AbortController ,
35+ client ?: Kernel ,
36+ ) : Stream < Item > {
2837 let consumed = false ;
38+ const logger = client ? loggerFor ( client ) : console ;
2939
3040 async function * iterator ( ) : AsyncIterator < Item , any , undefined > {
3141 if ( consumed ) {
@@ -38,8 +48,8 @@ export class Stream<Item> implements AsyncIterable<Item> {
3848 try {
3949 yield JSON . parse ( sse . data ) ;
4050 } catch ( e ) {
41- console . error ( `Could not parse message into JSON:` , sse . data ) ;
42- console . error ( `From chunk:` , sse . raw ) ;
51+ logger . error ( `Could not parse message into JSON:` , sse . data ) ;
52+ logger . error ( `From chunk:` , sse . raw ) ;
4353 throw e ;
4454 }
4555 }
@@ -54,14 +64,18 @@ export class Stream<Item> implements AsyncIterable<Item> {
5464 }
5565 }
5666
57- return new Stream ( iterator , controller ) ;
67+ return new Stream ( iterator , controller , client ) ;
5868 }
5969
6070 /**
6171 * Generates a Stream from a newline-separated ReadableStream
6272 * where each item is a JSON value.
6373 */
64- static fromReadableStream < Item > ( readableStream : ReadableStream , controller : AbortController ) : Stream < Item > {
74+ static fromReadableStream < Item > (
75+ readableStream : ReadableStream ,
76+ controller : AbortController ,
77+ client ?: Kernel ,
78+ ) : Stream < Item > {
6579 let consumed = false ;
6680
6781 async function * iterLines ( ) : AsyncGenerator < string , void , unknown > {
@@ -101,7 +115,7 @@ export class Stream<Item> implements AsyncIterable<Item> {
101115 }
102116 }
103117
104- return new Stream ( iterator , controller ) ;
118+ return new Stream ( iterator , controller , client ) ;
105119 }
106120
107121 [ Symbol . asyncIterator ] ( ) : AsyncIterator < Item > {
@@ -131,8 +145,8 @@ export class Stream<Item> implements AsyncIterable<Item> {
131145 } ;
132146
133147 return [
134- new Stream ( ( ) => teeIterator ( left ) , this . controller ) ,
135- new Stream ( ( ) => teeIterator ( right ) , this . controller ) ,
148+ new Stream ( ( ) => teeIterator ( left ) , this . controller , this . #client ) ,
149+ new Stream ( ( ) => teeIterator ( right ) , this . controller , this . #client ) ,
136150 ] ;
137151 }
138152
0 commit comments