@@ -65,13 +65,15 @@ public void Open()
6565
6666 public async IAsyncEnumerable < NetMQMessage > SendMessageAsync (
6767 NetMQMessage message ,
68+ TimeSpan ? timeout ,
6869 int expectedResponses ,
6970 [ EnumeratorCancellation ] CancellationToken cancellationToken )
7071 {
7172 var channel = Channel . CreateUnbounded < NetMQMessage > ( ) ;
7273 await _requests . Writer . WriteAsync (
7374 new MessageRequest (
7475 message ,
76+ timeout ,
7577 expectedResponses ,
7678 channel ,
7779 cancellationToken ) ,
@@ -88,14 +90,19 @@ private async Task ProcessRuntime(CancellationToken ct)
8890 {
8991 using var dealer = new DealerSocket ( ) ;
9092 dealer . Options . DisableTimeWait = true ;
91- dealer . Connect ( await _peer . ResolveNetMQAddressAsync ( ) ) ;
93+ var address = await _peer . ResolveNetMQAddressAsync ( ) ;
94+ _logger . Debug ( "[NetMQChannel] Connecting {Address}" , address ) ;
95+ dealer . Connect ( address ) ;
9296 while ( ! ct . IsCancellationRequested )
9397 {
9498 MessageRequest req = await _requests . Reader . ReadAsync ( ct ) ;
9599 _lastUpdated = DateTimeOffset . UtcNow ;
96100 CancellationTokenSource linked =
97101 CancellationTokenSource . CreateLinkedTokenSource ( ct , req . CancellationToken ) ;
98- _logger . Debug ( "[NetMQChannel] Trying to send message {Message}" , req . Message ) ;
102+ _logger . Debug (
103+ "[NetMQChannel] Trying to send message {Message} (count: {ExpectedResponses})" ,
104+ req . Message ,
105+ req . ExpectedResponses ) ;
99106 if ( ! dealer . TrySendMultipartMessage ( req . Message ) )
100107 {
101108 _logger . Debug (
@@ -109,9 +116,18 @@ private async Task ProcessRuntime(CancellationToken ct)
109116
110117 foreach ( var i in Enumerable . Range ( 0 , req . ExpectedResponses ) )
111118 {
112- NetMQMessage raw = await dealer . ReceiveMultipartMessageAsync (
113- cancellationToken : linked . Token
114- ) ;
119+ _logger . Debug (
120+ "[NetMQChannel] Waiting for replies... (#{Index})" , i ) ;
121+ var raw = new NetMQMessage ( ) ;
122+ if ( ! dealer . TryReceiveMultipartMessage (
123+ req . Timeout ?? TimeSpan . FromSeconds ( 1 ) ,
124+ ref raw ) )
125+ {
126+ break ;
127+ }
128+
129+ _logger . Debug (
130+ "[NetMQChannel] Successfully received replies #{Index}" , i ) ;
115131 _lastUpdated = DateTimeOffset . UtcNow ;
116132
117133 await req . Channel . Writer . WriteAsync ( raw , linked . Token ) ;
@@ -125,18 +141,22 @@ private readonly struct MessageRequest
125141 {
126142 public MessageRequest (
127143 NetMQMessage message ,
144+ TimeSpan ? timeout ,
128145 in int expectedResponses ,
129146 Channel < NetMQMessage > channel ,
130147 CancellationToken cancellationToken )
131148 {
132149 Message = message ;
150+ Timeout = timeout ;
133151 ExpectedResponses = expectedResponses ;
134152 Channel = channel ;
135153 CancellationToken = cancellationToken ;
136154 }
137155
138156 public NetMQMessage Message { get ; }
139157
158+ public TimeSpan ? Timeout { get ; }
159+
140160 public int ExpectedResponses { get ; }
141161
142162 public Channel < NetMQMessage > Channel { get ; }
0 commit comments