66using System . Collections . Generic ;
77using System . IO ;
88using System . Threading ;
9+ using System . Threading . Tasks ;
910using System . Reflection ;
1011
1112namespace DBus
@@ -38,22 +39,26 @@ public class Connection
3839 Dictionary < uint , PendingCall > pendingCalls = new Dictionary < uint , PendingCall > ( ) ;
3940 Queue < Message > inbound = new Queue < Message > ( ) ;
4041 Dictionary < ObjectPath , BusObject > registeredObjects = new Dictionary < ObjectPath , BusObject > ( ) ;
42+ private readonly ReadMessageTask readMessageTask ;
4143
4244 public delegate void MonitorEventHandler ( Message msg ) ;
4345 public MonitorEventHandler Monitors ; // subscribe yourself to this list of observers if you want to get notified about each incoming message
4446
47+ private ManualResetEventSlim iterateEvent = MakeNewEventToNextIterate ( ) ;
48+ private readonly object iterateLocker = new object ( ) ;
49+
4550 protected Connection ( )
4651 {
47-
52+ readMessageTask = new ReadMessageTask ( this ) ;
4853 }
4954
50- internal Connection ( Transport transport )
55+ internal Connection ( Transport transport ) : this ( )
5156 {
5257 this . transport = transport ;
5358 transport . Connection = this ;
5459 }
5560
56- internal Connection ( string address )
61+ internal Connection ( string address ) : this ( )
5762 {
5863 OpenPrivate ( address ) ;
5964 Authenticate ( ) ;
@@ -183,11 +188,12 @@ internal uint GenerateSerial ()
183188
184189 internal Message SendWithReplyAndBlock ( Message msg , bool keepFDs )
185190 {
186- PendingCall pending = SendWithReply ( msg , keepFDs ) ;
187- return pending . Reply ;
191+ using ( PendingCall pending = SendWithPendingReply ( msg , keepFDs ) ) {
192+ return pending . Reply ;
193+ }
188194 }
189195
190- internal PendingCall SendWithReply ( Message msg , bool keepFDs )
196+ internal PendingCall SendWithPendingReply ( Message msg , bool keepFDs )
191197 {
192198 msg . ReplyExpected = true ;
193199
@@ -215,27 +221,28 @@ internal virtual uint Send (Message msg)
215221 return msg . Header . Serial ;
216222 }
217223
218- //temporary hack
219- internal void DispatchSignals ( )
224+ public void Iterate ( )
220225 {
221- lock ( inbound ) {
222- while ( inbound . Count != 0 ) {
223- Message msg = inbound . Dequeue ( ) ;
224- try {
225- HandleSignal ( msg ) ;
226- } finally {
227- msg . Dispose ( ) ;
228- }
226+ Iterate ( new CancellationToken ( false ) ) ;
227+ }
228+
229+ public void Iterate ( CancellationToken stopWaitToken )
230+ {
231+ if ( TryGetStoredSignalMessage ( out Message inboundMsg ) ) {
232+ try {
233+ HandleSignal ( inboundMsg ) ;
234+ } finally {
235+ inboundMsg . Dispose ( ) ;
229236 }
237+ } else {
238+ var msg = readMessageTask . MakeSureTaskRunAndWait ( stopWaitToken ) ;
239+ HandleMessage ( msg ) ;
230240 }
231241 }
232242
233- public void Iterate ( )
243+ private static ManualResetEventSlim MakeNewEventToNextIterate ( )
234244 {
235- Message msg = transport . ReadMessage ( ) ;
236-
237- HandleMessage ( msg ) ;
238- DispatchSignals ( ) ;
245+ return new ManualResetEventSlim ( true ) ;
239246 }
240247
241248 internal virtual void HandleMessage ( Message msg )
@@ -251,21 +258,19 @@ internal virtual void HandleMessage (Message msg)
251258 try {
252259
253260 //TODO: Restrict messages to Local ObjectPath?
254-
255261 {
256- object field_value = msg . Header [ FieldCode . ReplySerial ] ;
262+ object field_value = msg . Header [ FieldCode . ReplySerial ] ;
257263 if ( field_value != null ) {
258264 uint reply_serial = ( uint ) field_value ;
259- PendingCall pending ;
260265
261266 lock ( pendingCalls ) {
267+ PendingCall pending ;
262268 if ( pendingCalls . TryGetValue ( reply_serial , out pending ) ) {
263- if ( pendingCalls . Remove ( reply_serial ) ) {
264- pending . Reply = msg ;
265- if ( pending . KeepFDs )
266- cleanupFDs = false ; // caller is responsible for closing FDs
267- }
268-
269+ if ( ! pendingCalls . Remove ( reply_serial ) )
270+ return ;
271+ pending . Reply = msg ;
272+ if ( pending . KeepFDs )
273+ cleanupFDs = false ; // caller is responsible for closing FDs
269274 return ;
270275 }
271276 }
@@ -285,8 +290,7 @@ internal virtual void HandleMessage (Message msg)
285290 break ;
286291 case MessageType . Signal :
287292 //HandleSignal (msg);
288- lock ( inbound )
289- inbound . Enqueue ( msg ) ;
293+ StoreInboundSignalMessage ( msg ) ; //temporary hack
290294 cleanupFDs = false ; // FDs are closed after signal is handled
291295 break ;
292296 case MessageType . Error :
@@ -391,17 +395,19 @@ internal void HandleMethodCall (MessageContainer method_call)
391395 //this is messy and inefficient
392396 List < string > linkNodes = new List < string > ( ) ;
393397 int depth = method_call . Path . Decomposed . Length ;
394- foreach ( ObjectPath pth in registeredObjects . Keys ) {
395- if ( pth . Value == ( method_call . Path . Value ) ) {
396- ExportObject exo = ( ExportObject ) registeredObjects [ pth ] ;
397- exo . WriteIntrospect ( intro ) ;
398- } else {
399- for ( ObjectPath cur = pth ; cur != null ; cur = cur . Parent ) {
400- if ( cur . Value == method_call . Path . Value ) {
401- string linkNode = pth . Decomposed [ depth ] ;
402- if ( ! linkNodes . Contains ( linkNode ) ) {
403- intro . WriteNode ( linkNode ) ;
404- linkNodes . Add ( linkNode ) ;
398+ lock ( registeredObjects ) {
399+ foreach ( ObjectPath pth in registeredObjects . Keys ) {
400+ if ( pth . Value == ( method_call . Path . Value ) ) {
401+ ExportObject exo = ( ExportObject ) registeredObjects [ pth ] ;
402+ exo . WriteIntrospect ( intro ) ;
403+ } else {
404+ for ( ObjectPath cur = pth ; cur != null ; cur = cur . Parent ) {
405+ if ( cur . Value == method_call . Path . Value ) {
406+ string linkNode = pth . Decomposed [ depth ] ;
407+ if ( ! linkNodes . Contains ( linkNode ) ) {
408+ intro . WriteNode ( linkNode ) ;
409+ linkNodes . Add ( linkNode ) ;
410+ }
405411 }
406412 }
407413 }
@@ -415,12 +421,14 @@ internal void HandleMethodCall (MessageContainer method_call)
415421 return ;
416422 }
417423
418- BusObject bo ;
419- if ( registeredObjects . TryGetValue ( method_call . Path , out bo ) ) {
420- ExportObject eo = ( ExportObject ) bo ;
421- eo . HandleMethodCall ( method_call ) ;
422- } else {
423- MaybeSendUnknownMethodError ( method_call ) ;
424+ lock ( registeredObjects ) {
425+ BusObject bo ;
426+ if ( registeredObjects . TryGetValue ( method_call . Path , out bo ) ) {
427+ ExportObject eo = ( ExportObject ) bo ;
428+ eo . HandleMethodCall ( method_call ) ;
429+ } else {
430+ MaybeSendUnknownMethodError ( method_call ) ;
431+ }
424432 }
425433 }
426434
@@ -459,17 +467,19 @@ public void Register (ObjectPath path, object obj)
459467 eo . Registered = true ;
460468
461469 //TODO: implement some kind of tree data structure or internal object hierarchy. right now we are ignoring the name and putting all object paths in one namespace, which is bad
462- registeredObjects [ path ] = eo ;
470+ lock ( registeredObjects )
471+ registeredObjects [ path ] = eo ;
463472 }
464473
465474 public object Unregister ( ObjectPath path )
466475 {
467476 BusObject bo ;
468477
469- if ( ! registeredObjects . TryGetValue ( path , out bo ) )
470- throw new Exception ( "Cannot unregister " + path + " as it isn't registered" ) ;
471-
472- registeredObjects . Remove ( path ) ;
478+ lock ( registeredObjects ) {
479+ if ( ! registeredObjects . TryGetValue ( path , out bo ) )
480+ throw new Exception ( "Cannot unregister " + path + " as it isn't registered" ) ;
481+ registeredObjects . Remove ( path ) ;
482+ }
473483
474484 ExportObject eo = ( ExportObject ) bo ;
475485 eo . Registered = false ;
@@ -486,6 +496,25 @@ internal protected virtual void RemoveMatch (string rule)
486496 {
487497 }
488498
499+ private void StoreInboundSignalMessage ( Message msg )
500+ {
501+ lock ( inbound ) {
502+ inbound . Enqueue ( msg ) ;
503+ }
504+ }
505+
506+ private bool TryGetStoredSignalMessage ( out Message msg )
507+ {
508+ msg = null ;
509+ lock ( inbound ) {
510+ if ( inbound . Count != 0 ) {
511+ msg = inbound . Dequeue ( ) ;
512+ return true ;
513+ }
514+ }
515+ return false ;
516+ }
517+
489518 static UUID ReadMachineId ( string fname )
490519 {
491520 byte [ ] data = File . ReadAllBytes ( fname ) ;
@@ -494,5 +523,31 @@ static UUID ReadMachineId (string fname)
494523
495524 return UUID . Parse ( System . Text . Encoding . ASCII . GetString ( data , 0 , 32 ) ) ;
496525 }
526+
527+ private class ReadMessageTask
528+ {
529+ private readonly Connection ownerConnection ;
530+ private Task < Message > task = null ;
531+ private object taskLock = new object ( ) ;
532+
533+ public ReadMessageTask ( Connection connection )
534+ {
535+ ownerConnection = connection ;
536+ }
537+
538+ public Message MakeSureTaskRunAndWait ( CancellationToken stopWaitToken )
539+ {
540+ Task < Message > catchedTask = null ;
541+
542+ lock ( taskLock ) {
543+ if ( task == null || task . IsCompleted ) {
544+ task = Task < Message > . Run ( ( ) => ownerConnection . transport . ReadMessage ( ) ) ;
545+ }
546+ catchedTask = task ;
547+ }
548+ catchedTask . Wait ( stopWaitToken ) ;
549+ return catchedTask . Result ;
550+ }
551+ }
497552 }
498553}
0 commit comments