diff --git a/source/guillotine/providers/sequential.d b/source/guillotine/providers/sequential.d index 2494aab..d5d7e50 100644 --- a/source/guillotine/providers/sequential.d +++ b/source/guillotine/providers/sequential.d @@ -3,11 +3,14 @@ */ module guillotine.providers.sequential; -import libsnooze; import guillotine.provider; import std.container.slist; import std.range : walkLength; import core.sync.mutex : Mutex; +import core.sync.condition : Condition; +import core.sync.exception : SyncError; +import core.time : dur; + import core.thread : Thread; import guillotine.exceptions; @@ -22,7 +25,8 @@ version(unittest) */ public final class Sequential : Provider { - private Event event; + private Mutex mutex; + private Condition event; private SList!(Task) taskQueue; private Mutex taskQueueLock; private Thread runner; @@ -33,10 +37,10 @@ public final class Sequential : Provider */ this() { - this.event = new Event(); + this.mutex = new Mutex(); + this.event = new Condition(this.mutex); this.taskQueueLock = new Mutex(); this.runner = new Thread(&worker); - this.event.ensure(runner); } /** @@ -62,10 +66,8 @@ public final class Sequential : Provider // Unlock the queue taskQueueLock.unlock(); - // Wake up the runner (just using all to avoid a catch for exception - // ... which would occur if wait() hasn't been called atleast once - // ... in `runner` - event.notifyAll(); + // Wake up the runner + event.notify(); version(unittest) { @@ -92,8 +94,19 @@ public final class Sequential : Provider { try { + // Lock mutex + this.mutex.lock(); + // Sleep till awoken for an enqueue - event.wait(); + writeln("Worker wait..."); + bool b = event.wait(dur!("msecs")(10)); + writeln("Worker wait... [done] ", b); + + // TODO: Add syncerror checking? + + // Unlock mutex + this.mutex.unlock(); + // Check if we are running, if not, exit if(!running) @@ -127,15 +140,12 @@ public final class Sequential : Provider } } - catch(InterruptedException e) + catch(SyncError e) { + // TODO: What to do? // Handle by doing nothing, retry wait() continue; } - catch(SnoozeError e) - { - // TODO: Stop and handle this - } } } @@ -158,19 +168,10 @@ public final class Sequential : Provider // Set running flag to false this.running = false; - try - { - // Notify the sleeping worker to wake up - this.event.notify(runner); - } - catch(SnoozeError e) - { - throw new GuillotineException("Error notifying() sleeping worker in stop()"); - } + // Notify the sleeping worker to wake up + this.event.notify(); // Wait for the runner thread to fully exit this.runner.join(); - - // TODO: Destroy the libsnooze event here } } \ No newline at end of file