Skip to content

Commit

Permalink
Merge pull request #10 from skomis-mm/dev
Browse files Browse the repository at this point in the history
Making use of System.Threading.Timer on netstandard1.2+
  • Loading branch information
nblumhardt authored Sep 29, 2016
2 parents b4f47dd + e9fdc2d commit d914f53
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,9 @@ public abstract class PeriodicBatchingSink : ILogEventSink, IDisposable
readonly Queue<LogEvent> _waitingBatch = new Queue<LogEvent>();

readonly object _stateLock = new object();
#if WAITABLE_TIMER
readonly Timer _timer;
#else

readonly PortableTimer _timer;
#endif

bool _unloading;
bool _started;

Expand All @@ -60,12 +58,8 @@ protected PeriodicBatchingSink(int batchSizeLimit, TimeSpan period)
_batchSizeLimit = batchSizeLimit;
_queue = new BoundedConcurrentQueue<LogEvent>();
_status = new BatchedConnectionStatus(period);

#if WAITABLE_TIMER
_timer = new Timer(s => OnTick(), null, -1, -1);
#else

_timer = new PortableTimer(cancel => OnTick());
#endif
}

/// <summary>
Expand All @@ -89,14 +83,8 @@ void CloseAndFlush()

_unloading = true;
}

#if WAITABLE_TIMER
var wh = new ManualResetEvent(false);
if (_timer.Dispose(wh))
wh.WaitOne();
#else

_timer.Dispose();
#endif

OnTick();
}
Expand Down Expand Up @@ -210,12 +198,7 @@ void OnTick()

void SetTimer(TimeSpan interval)
{
#if WAITABLE_TIMER
_timer.Change(interval, Timeout.InfiniteTimeSpan);
#else
_timer.Start(interval);
#endif

}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#if !WAITABLE_TIMER

using Serilog.Debugging;
using System;
using System.Threading;
Expand All @@ -23,24 +21,27 @@ namespace Serilog.Sinks.PeriodicBatching
{
class PortableTimer : IDisposable
{
enum PortableTimerState
{
NotWaiting,
Waiting,
Active,
Disposed
}

readonly object _stateLock = new object();
PortableTimerState _state = PortableTimerState.NotWaiting;

readonly Action<CancellationToken> _onTick;
readonly CancellationTokenSource _cancel = new CancellationTokenSource();

#if THREADING_TIMER
readonly Timer _timer;
#endif

bool _running;
bool _disposed;

public PortableTimer(Action<CancellationToken> onTick)
{
if (onTick == null) throw new ArgumentNullException(nameof(onTick));

_onTick = onTick;

#if THREADING_TIMER
_timer = new Timer(_ => OnTick(), null, Timeout.Infinite, Timeout.Infinite);
#endif
}

public void Start(TimeSpan interval)
Expand All @@ -49,69 +50,90 @@ public void Start(TimeSpan interval)

lock (_stateLock)
{
if (_state == PortableTimerState.Disposed)
throw new ObjectDisposedException("PortableTimer");

// There's a little bit of raciness here, but it's needed to support the
// current API, which allows the tick handler to reenter and set the next interval.
if (_disposed)
throw new ObjectDisposedException(nameof(PortableTimer));

#if THREADING_TIMER
_timer.Change(interval, Timeout.InfiniteTimeSpan);
#else
Task.Delay(interval, _cancel.Token)
.ContinueWith(
_ => OnTick(),
CancellationToken.None,
TaskContinuationOptions.DenyChildAttach,
TaskScheduler.Default)
.AsObserved();
#endif
}
}

if (_state == PortableTimerState.Waiting)
throw new InvalidOperationException("The timer is already set.");
private void OnTick()
{
try
{
lock (_stateLock)
{
if (_disposed)
{
return;
}

if (_cancel.IsCancellationRequested) return;
// There's a little bit of raciness here, but it's needed to support the
// current API, which allows the tick handler to reenter and set the next interval.

_state = PortableTimerState.Waiting;
}

Task.Delay(interval, _cancel.Token)
.ContinueWith(
_ =>
if (_running)
{
try
{
_state = PortableTimerState.Active;
Monitor.Wait(_stateLock);

if (!_cancel.Token.IsCancellationRequested)
{
_onTick(_cancel.Token);
}
}
catch (TaskCanceledException tcx)
{
SelfLog.WriteLine("The timer was canceled during invocation: {0}", tcx);
}
finally
if (_disposed)
{
lock (_stateLock)
_state = PortableTimerState.NotWaiting;
return;
}
},
CancellationToken.None,
TaskContinuationOptions.DenyChildAttach,
TaskScheduler.Default)
.AsObserved();
}

_running = true;
}

if (!_cancel.Token.IsCancellationRequested)
{
_onTick(_cancel.Token);
}
}
catch (OperationCanceledException tcx)
{
SelfLog.WriteLine("The timer was canceled during invocation: {0}", tcx);
}
finally
{
lock (_stateLock)
{
_running = false;
Monitor.PulseAll(_stateLock);
}
}
}

public void Dispose()
{
_cancel.Cancel();

while (true)
lock (_stateLock)
{
lock (_stateLock)
if (_disposed)
{
if (_state == PortableTimerState.Disposed ||
_state == PortableTimerState.NotWaiting)
{
_state = PortableTimerState.Disposed;
return;
}
return;
}

while (_running)
{
Monitor.Wait(_stateLock);
}

// On the very old platforms, we've got no option but to spin here.
#if THREAD
Thread.Sleep(10);
#if THREADING_TIMER
_timer.Dispose();
#endif

_disposed = true;
}
}
}
Expand All @@ -123,5 +145,4 @@ public static async void AsObserved(this Task task)
await task.ConfigureAwait(false);
}
}
}
#endif
}
8 changes: 4 additions & 4 deletions src/Serilog.Sinks.PeriodicBatching/project.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,21 @@
"frameworks": {
"net4.5": {
"buildOptions": {
"define": [ "WAITABLE_TIMER", "THREAD" ]
"define": [ "THREADING_TIMER" ]
}
},
"netstandard1.1": {
"dependencies": {
"System.Collections.Concurrent": "4.0.12"
}
},
"netstandard1.3": {
"netstandard1.2": {
"buildOptions": {
"define": [ "THREAD" ]
"define": [ "THREADING_TIMER" ]
},
"dependencies": {
"System.Collections.Concurrent": "4.0.12",
"System.Threading.Thread": "4.0.0"
"System.Threading.Timer": "4.0.1"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
#if !WAITABLE_TIMER

using Serilog.Debugging;
using Serilog.Debugging;
using Serilog.Sinks.PeriodicBatching;
using System;
using System.Threading;
using System.Threading.Tasks;
using Xunit;

namespace Serilog.Tests.Sinks.PeriodicBatching
Expand Down Expand Up @@ -33,7 +32,7 @@ public void WhenWaitingShouldCancel_OnDispose()

using (var timer = new PortableTimer(delegate { Thread.Sleep(50); wasCalled = true; }))
{
timer.Start(TimeSpan.FromMilliseconds(10));
timer.Start(TimeSpan.FromMilliseconds(20));
}

Thread.Sleep(100);
Expand All @@ -42,6 +41,28 @@ public void WhenWaitingShouldCancel_OnDispose()
Assert.False(writtenToSelflog, "message was written to SelfLog");
}

[Fact]
public void WhenActiveShouldCancel_OnDispose()
{
bool wasCalled = false;
bool writtenToSelflog = false;

SelfLog.Enable(_ => writtenToSelflog = true);

using (var timer = new PortableTimer(
token => {
wasCalled = true;
Task.Delay(TimeSpan.FromMilliseconds(50), token).GetAwaiter().GetResult();
}))
{
timer.Start(TimeSpan.FromMilliseconds(20));
Thread.Sleep(40);
}

Assert.True(wasCalled, "tick handler wasn't called");
Assert.True(writtenToSelflog, "message wasn't written to SelfLog");
}

[Fact]
public void WhenDisposedWillThrow_OnStart()
{
Expand All @@ -53,7 +74,51 @@ public void WhenDisposedWillThrow_OnStart()
Assert.False(wasCalled);
Assert.Throws<ObjectDisposedException>(() => timer.Start(TimeSpan.Zero));
}
}
}

#endif
[Fact]
public void WhenOverlapsShouldProcessOneAtTime_OnTick()
{
bool userHandlerOverlapped = false;

PortableTimer timer = null;
timer = new PortableTimer(
_ =>
{
if (Monitor.TryEnter(timer))
{
try
{
timer.Start(TimeSpan.FromMilliseconds(0));
Thread.Sleep(1);
}
finally
{
Monitor.Exit(timer);
}
}
else
{
userHandlerOverlapped = true;
}
});

timer.Start(TimeSpan.FromMilliseconds(1));
Thread.Sleep(50);
timer.Dispose();

Assert.False(userHandlerOverlapped);
}

[Fact]
public void CanBeDisposedFromMultipleThreads()
{
PortableTimer timer = null;
timer = new PortableTimer(_ => timer.Start(TimeSpan.FromMilliseconds(1)));

timer.Start(TimeSpan.Zero);
Thread.Sleep(50);

Parallel.For(0, Environment.ProcessorCount * 2, _ => timer.Dispose());
}
}
}
3 changes: 0 additions & 3 deletions test/Serilog.Sinks.PeriodicBatching.Tests/project.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@
},

"net4.5.2": {
"buildOptions": {
"define": [ "WAITABLE_TIMER" ]
}
}
}
}

0 comments on commit d914f53

Please sign in to comment.