@@ -42,49 +42,50 @@ export class ConnectionHandler {
42
42
private readonly websocket : WebSocket
43
43
private readonly registry : Registry
44
44
private readonly prometheus : Prometheus
45
+ private readonly logger : Logger
45
46
private readonly routes : Record < Topics , Methods >
46
- private readonly pingInterval = 10000
47
+ private readonly pingIntervalMs = 10000
47
48
48
49
private pingTimeout ?: NodeJS . Timeout
49
50
private subscriptionIds = new Map < string , void > ( )
50
51
51
- private logger = new Logger ( { namespace : [ 'unchained' , 'coinstacks' , 'common' , 'api' ] , level : process . env . LOG_LEVEL } )
52
-
53
- private constructor ( websocket : WebSocket , registry : Registry , prometheus : Prometheus ) {
52
+ private constructor ( websocket : WebSocket , registry : Registry , prometheus : Prometheus , logger : Logger ) {
54
53
this . clientId = v4 ( )
55
54
this . registry = registry
56
55
this . prometheus = prometheus
56
+ this . logger = logger . child ( { namespace : [ 'websocket' ] } )
57
57
this . routes = {
58
58
txs : {
59
59
subscribe : ( subscriptionId : string , data ?: TxsTopicData ) => this . handleSubscribeTxs ( subscriptionId , data ) ,
60
60
unsubscribe : ( subscriptionId : string , data ?: TxsTopicData ) => this . handleUnsubscribeTxs ( subscriptionId , data ) ,
61
61
} ,
62
62
}
63
63
64
- const interval = setInterval ( ( ) => {
65
- this . websocket . ping ( )
66
- } , this . pingInterval )
64
+ this . pingTimeout = undefined
65
+ this . prometheus . metrics . websocketCount . inc ( )
66
+ this . websocket = websocket
67
+ this . websocket . ping ( )
67
68
68
- this . heartbeat ( )
69
+ const pingInterval = setInterval ( ( ) => {
70
+ this . websocket . ping ( )
71
+ } , this . pingIntervalMs )
69
72
70
- this . websocket = websocket
71
73
this . websocket . onerror = ( error ) => {
72
74
this . logger . error ( { clientId : this . clientId , error, fn : 'ws.onerror' } , 'websocket error' )
73
- this . close ( interval )
75
+ this . close ( pingInterval )
74
76
}
75
77
this . websocket . onclose = ( { code, reason } ) => {
76
78
this . prometheus . metrics . websocketCount . dec ( )
77
79
this . logger . debug ( { clientId : this . clientId , code, reason, fn : 'ws.close' } , 'websocket closed' )
78
- this . close ( interval )
80
+ this . close ( pingInterval )
79
81
}
80
82
this . websocket . on ( 'pong' , ( ) => this . heartbeat ( ) )
81
83
this . websocket . on ( 'ping' , ( ) => this . websocket . pong ( ) )
82
84
this . websocket . onmessage = ( event ) => this . onMessage ( event )
83
85
}
84
86
85
- static start ( websocket : WebSocket , registry : Registry , prometheus : Prometheus ) : void {
86
- prometheus . metrics . websocketCount . inc ( )
87
- new ConnectionHandler ( websocket , registry , prometheus )
87
+ static start ( websocket : WebSocket , registry : Registry , prometheus : Prometheus , logger : Logger ) : void {
88
+ new ConnectionHandler ( websocket , registry , prometheus , logger )
88
89
}
89
90
90
91
private heartbeat ( ) : void {
@@ -93,9 +94,9 @@ export class ConnectionHandler {
93
94
}
94
95
95
96
this . pingTimeout = setTimeout ( ( ) => {
96
- this . logger . debug ( { fn : 'pingTimeout' } , 'heartbeat failed' )
97
+ this . logger . debug ( { clientId : this . clientId , fn : 'pingTimeout' } , 'heartbeat failed' )
97
98
this . websocket . terminate ( )
98
- } , this . pingInterval + 1000 )
99
+ } , this . pingIntervalMs + 1000 )
99
100
}
100
101
101
102
private sendError ( message : string , subscriptionId : string ) : void {
@@ -130,7 +131,7 @@ export class ConnectionHandler {
130
131
}
131
132
}
132
133
} catch ( err ) {
133
- this . logger . error ( err , { fn : 'onMessage' , event } , 'Error processing message' )
134
+ this . logger . error ( err , { clientId : this . clientId , fn : 'onMessage' , event } , 'Error processing message' )
134
135
}
135
136
}
136
137
0 commit comments