11import  COMMANDS  from  '../commands' ; 
2- import  RedisSocket ,  {  RedisSocketOptions  }  from  './socket' ; 
2+ import  RedisSocket ,  {  RedisSocketOptions ,   RedisTcpSocketOptions  }  from  './socket' ; 
33import  {  BasicAuth ,  CredentialsError ,  CredentialsProvider ,  StreamingCredentialsProvider ,  UnableToObtainNewCredentialsError ,  Disposable  }  from  '../authx' ; 
44import  RedisCommandsQueue ,  {  CommandOptions  }  from  './commands-queue' ; 
55import  {  EventEmitter  }  from  'node:events' ; 
@@ -21,6 +21,7 @@ import { BasicCommandParser, CommandParser } from './parser';
2121import  SingleEntryCache  from  '../single-entry-cache' ; 
2222import  {  version  }  from  '../../package.json' 
2323import  EnterpriseMaintenanceManager ,  {  MaintenanceUpdate ,  MovingEndpointType  }  from  './enterprise-maintenance-manager' ; 
24+ import  {  OTelMetrics  }  from  '../opentelemetry/metrics' ; 
2425
2526export  interface  RedisClientOptions < 
2627  M  extends  RedisModules  =  RedisModules , 
@@ -1064,21 +1065,47 @@ export default class RedisClient<
10641065    args : ReadonlyArray < RedisArgument > , 
10651066    options ?: CommandOptions 
10661067  ) : Promise < T >  { 
1068+     const  recordOperation  =  OTelMetrics . createRecordOperationDuration ( args ,  { 
1069+       host : ( this . _self . #options. socket  as  RedisTcpSocketOptions ) ?. host  ||  "" , 
1070+       port :
1071+         ( 
1072+           this . _self . #options. socket  as  RedisTcpSocketOptions 
1073+         ) ?. port ?. toString ( )  ||  "" , 
1074+       db : this . _self . #selectedDB. toString ( ) , 
1075+     } ) ; 
1076+ 
10671077    if  ( ! this . _self . #socket. isOpen )  { 
1078+       recordOperation ( new  ClientClosedError ( ) ) ; 
10681079      return  Promise . reject ( new  ClientClosedError ( ) ) ; 
1069-     }  else  if  ( ! this . _self . #socket. isReady  &&  this . _self . #options. disableOfflineQueue )  { 
1080+     }  else  if  ( 
1081+       ! this . _self . #socket. isReady  && 
1082+       this . _self . #options. disableOfflineQueue 
1083+     )  { 
1084+       recordOperation ( new  ClientOfflineError ( ) ) ; 
10701085      return  Promise . reject ( new  ClientOfflineError ( ) ) ; 
10711086    } 
10721087
10731088    // Merge global options with provided options 
10741089    const  opts  =  { 
10751090      ...this . _self . _commandOptions , 
1076-       ...options 
1077-     } 
1091+       ...options , 
1092+     } ; 
10781093
10791094    const  promise  =  this . _self . #queue. addCommand < T > ( args ,  opts ) ; 
1095+     OTelMetrics . recordPendingRequests ( 1 ) ; 
1096+ 
1097+     const  trackedPromise  =  promise . then ( ( reply )  =>  { 
1098+       recordOperation ( ) ; 
1099+       return  reply ; 
1100+     } ) . catch ( ( err )  =>  { 
1101+       recordOperation ( err ) ; 
1102+       throw  err ; 
1103+     } ) . finally ( ( )  =>  { 
1104+       OTelMetrics . recordPendingRequests ( - 1 ) ; 
1105+     } ) ; 
1106+ 
10801107    this . _self . #scheduleWrite( ) ; 
1081-     return  promise ; 
1108+     return  trackedPromise ; 
10821109  } 
10831110
10841111  async  SELECT ( db : number ) : Promise < void >  { 
0 commit comments