@@ -24,14 +24,12 @@ import {
2424 FrameHandler ,
2525 Multiplexer ,
2626 Outbound ,
27- serializeFrame ,
28- } from "rsocket-core" ;
29- import { Duplex } from "stream" ;
30-
31- export class WebsocketDuplexConnection
32- extends Deferred
33- implements DuplexConnection , Outbound
34- {
27+ serializeFrame
28+ } from 'rsocket-core' ;
29+ import { Duplex } from 'stream' ;
30+ import WebSocket from 'ws' ;
31+
32+ export class WebsocketDuplexConnection extends Deferred implements DuplexConnection , Outbound {
3533 readonly multiplexerDemultiplexer : Multiplexer & Demultiplexer & FrameHandler ;
3634
3735 constructor (
@@ -40,18 +38,16 @@ export class WebsocketDuplexConnection
4038 multiplexerDemultiplexerFactory : (
4139 frame : Frame ,
4240 outbound : Outbound & Closeable
43- ) => Multiplexer & Demultiplexer & FrameHandler
41+ ) => Multiplexer & Demultiplexer & FrameHandler ,
42+ private rawSocket : WebSocket . WebSocket
4443 ) {
4544 super ( ) ;
4645
47- websocketDuplex . on ( " close" , this . handleClosed ) ;
48- websocketDuplex . on ( " error" , this . handleError ) ;
49- websocketDuplex . on ( " data" , this . handleMessage ) ;
46+ websocketDuplex . on ( ' close' , this . handleClosed ) ;
47+ websocketDuplex . on ( ' error' , this . handleError ) ;
48+ websocketDuplex . on ( ' data' , this . handleMessage ) ;
5049
51- this . multiplexerDemultiplexer = multiplexerDemultiplexerFactory (
52- frame ,
53- this
54- ) ;
50+ this . multiplexerDemultiplexer = multiplexerDemultiplexerFactory ( frame , this ) ;
5551 }
5652
5753 get availability ( ) : number {
@@ -77,32 +73,40 @@ export class WebsocketDuplexConnection
7773 return ;
7874 }
7975
80- // if (__DEV__) {
81- // if (this._options.debug) {
82- // console.log(printFrame(frame));
83- // }
84- // }
85- const buffer =
86- /* this._options.lengthPrefixedFrames
87- ? serializeFrameWithLength(frame, this._encoders)
88- :*/ serializeFrame ( frame ) ;
89- // if (!this._socket) {
90- // throw new Error(
91- // "RSocketWebSocketClient: Cannot send frame, not connected."
92- // );
93- // }
94- this . websocketDuplex . write ( buffer ) ;
76+ try {
77+ // if (__DEV__) {
78+ // if (this._options.debug) {
79+ // console.log(printFrame(frame));
80+ // }
81+ // }
82+ const buffer =
83+ /* this._options.lengthPrefixedFrames
84+ ? serializeFrameWithLength(frame, this._encoders)
85+ :*/ serializeFrame ( frame ) ;
86+ // if (!this._socket) {
87+ // throw new Error(
88+ // "RSocketWebSocketClient: Cannot send frame, not connected."
89+ // );
90+ // }
91+
92+ // Work around for this issue
93+ // https://github.com/websockets/ws/issues/1515
94+ if ( this . rawSocket . readyState == this . rawSocket . CLOSING || this . rawSocket . readyState == this . rawSocket . CLOSED ) {
95+ this . close ( new Error ( 'WebSocket is closing' ) ) ;
96+ return ;
97+ }
98+
99+ this . websocketDuplex . write ( buffer ) ;
100+ } catch ( ex ) {
101+ this . close ( new Error ( ex . reason || `Could not write to WebSocket duplex connection: ${ ex } ` ) ) ;
102+ }
95103 }
96104
97- private handleClosed = ( e : CloseEvent ) : void => {
98- this . close (
99- new Error (
100- e . reason || "WebsocketDuplexConnection: Socket closed unexpectedly."
101- )
102- ) ;
105+ private handleClosed = ( e : WebSocket . CloseEvent ) : void => {
106+ this . close ( new Error ( e . reason || 'WebsocketDuplexConnection: Socket closed unexpectedly.' ) ) ;
103107 } ;
104108
105- private handleError = ( e : ErrorEvent ) : void => {
109+ private handleError = ( e : WebSocket . ErrorEvent ) : void => {
106110 this . close ( e . error ) ;
107111 } ;
108112
@@ -125,23 +129,27 @@ export class WebsocketDuplexConnection
125129
126130 static create (
127131 socket : Duplex ,
128- connectionAcceptor : (
129- frame : Frame ,
130- connection : DuplexConnection
131- ) => Promise < void > ,
132+ connectionAcceptor : ( frame : Frame , connection : DuplexConnection ) => Promise < void > ,
132133 multiplexerDemultiplexerFactory : (
133134 frame : Frame ,
134135 outbound : Outbound & Closeable
135- ) => Multiplexer & Demultiplexer & FrameHandler
136+ ) => Multiplexer & Demultiplexer & FrameHandler ,
137+ rawSocket : WebSocket . WebSocket
136138 ) : void {
137139 // TODO: timeout on no data?
138- socket . once ( "data" , async ( buffer ) => {
139- const frame = deserializeFrame ( buffer ) ;
140- const connection = new WebsocketDuplexConnection (
141- socket ,
142- frame ,
143- multiplexerDemultiplexerFactory
144- ) ;
140+ socket . once ( 'data' , async ( buffer ) => {
141+ let frame : Frame | undefined = undefined ;
142+ try {
143+ frame = deserializeFrame ( buffer ) ;
144+ if ( ! frame ) {
145+ throw new Error ( `Unable to deserialize frame` ) ;
146+ }
147+ } catch ( ex ) {
148+ // The initial frame should always be parsable
149+ return socket . end ( ) ;
150+ }
151+
152+ const connection = new WebsocketDuplexConnection ( socket , frame , multiplexerDemultiplexerFactory , rawSocket ) ;
145153 if ( connection . done ) {
146154 return ;
147155 }
0 commit comments