Skip to content

Commit

Permalink
Sequential
Browse files Browse the repository at this point in the history
- Stripped out `libsnooze` and now using condtiion variables with periodic wakeups
  • Loading branch information
deavmi committed Oct 1, 2023
1 parent 202ea89 commit bffa29c
Showing 1 changed file with 26 additions and 25 deletions.
51 changes: 26 additions & 25 deletions source/guillotine/providers/sequential.d
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
*/
module guillotine.providers.sequential;

import libsnooze;
import guillotine.provider;
import std.container.slist;
import std.range : walkLength;
import core.sync.mutex : Mutex;
import core.sync.condition : Condition;
import core.sync.exception : SyncError;
import core.time : dur;

import core.thread : Thread;
import guillotine.exceptions;

Expand All @@ -22,7 +25,8 @@ version(unittest)
*/
public final class Sequential : Provider
{
private Event event;
private Mutex mutex;
private Condition event;
private SList!(Task) taskQueue;
private Mutex taskQueueLock;
private Thread runner;
Expand All @@ -33,10 +37,10 @@ public final class Sequential : Provider
*/
this()
{
this.event = new Event();
this.mutex = new Mutex();
this.event = new Condition(this.mutex);
this.taskQueueLock = new Mutex();
this.runner = new Thread(&worker);
this.event.ensure(runner);
}

/**
Expand All @@ -62,10 +66,8 @@ public final class Sequential : Provider
// Unlock the queue
taskQueueLock.unlock();

// Wake up the runner (just using all to avoid a catch for exception
// ... which would occur if wait() hasn't been called atleast once
// ... in `runner`
event.notifyAll();
// Wake up the runner
event.notify();

version(unittest)
{
Expand All @@ -92,8 +94,19 @@ public final class Sequential : Provider
{
try
{
// Lock mutex
this.mutex.lock();

// Sleep till awoken for an enqueue
event.wait();
writeln("Worker wait...");
bool b = event.wait(dur!("msecs")(10));
writeln("Worker wait... [done] ", b);

// TODO: Add syncerror checking?

// Unlock mutex
this.mutex.unlock();


// Check if we are running, if not, exit
if(!running)
Expand Down Expand Up @@ -127,15 +140,12 @@ public final class Sequential : Provider
}

}
catch(InterruptedException e)
catch(SyncError e)
{
// TODO: What to do?
// Handle by doing nothing, retry wait()
continue;
}
catch(SnoozeError e)
{
// TODO: Stop and handle this
}
}
}

Expand All @@ -158,19 +168,10 @@ public final class Sequential : Provider
// Set running flag to false
this.running = false;

try
{
// Notify the sleeping worker to wake up
this.event.notify(runner);
}
catch(SnoozeError e)
{
throw new GuillotineException("Error notifying() sleeping worker in stop()");
}
// Notify the sleeping worker to wake up
this.event.notify();

// Wait for the runner thread to fully exit
this.runner.join();

// TODO: Destroy the libsnooze event here
}
}

0 comments on commit bffa29c

Please sign in to comment.