Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Race conditions with DataContext #35

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ void IUpdatable.DeleteResource(object targetResource)
// The only thing we can do is to try to remove the entity from all of them.
foreach (var pair in this.TableWrappers.Where(pair => pair.Key.Item1 == entityType))
{
pair.Value.RemoveEntity(targetResource);
pair.Value.Value.RemoveEntity(targetResource);
}
}

Expand All @@ -165,7 +165,7 @@ void IUpdatable.ClearChanges()
{
foreach (var pair in this.TableWrappers)
{
pair.Value.ClearModifications();
pair.Value.Value.ClearModifications();
}
}

Expand Down
106 changes: 103 additions & 3 deletions Sources/Linq2DynamoDb.DataContext.Tests/DataContextTests.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

using Amazon.DynamoDBv2;
using Amazon.DynamoDBv2.Model;
using Linq2DynamoDb.DataContext.Tests.Entities;

using NSubstitute;
using NUnit.Framework;

namespace Linq2DynamoDb.DataContext.Tests
Expand Down Expand Up @@ -51,5 +53,103 @@ public void DataContext()
var result = table.FirstOrDefault();
Assert.IsNotNull(result);
}

/// <remarks>Set to run 10 (magic number) times, as race conditions are hard to get to consistently fail.</remarks>
[Test, Repeat(10)]
public async Task GetTableDefinitionRaceConditionRunningSomewhatInParallelSynchronised()
{
//setup fake dynamodb interactions
var dynamoDbClient = Substitute.For<IAmazonDynamoDB>();
dynamoDbClient.DescribeTable(Arg.Any<DescribeTableRequest>()).Returns(new DescribeTableResponse
{
Table = new TableDescription { TableStatus = TableStatus.ACTIVE, KeySchema = new List<KeySchemaElement> { new KeySchemaElement("key", KeyType.HASH) }, AttributeDefinitions = new List<AttributeDefinition> { new AttributeDefinition("key", ScalarAttributeType.S) } }
});

//subject
var context = new DataContext(dynamoDbClient);
context.CreateTableIfNotExists(new CreateTableArgs<Book>(c => c.Name));

const int numberOfRacers = 10;

//exercise
var checkeredFlag = new SemaphoreSlim(0);
var racers = Enumerable.Range(0, numberOfRacers).Select(i => Task.Run(async () =>
{
await checkeredFlag.WaitAsync();
return context.GetTable<Book>();
}));
checkeredFlag.Release(numberOfRacers);
var tables = await Task.WhenAll(racers);
var actual = tables.OfType<ITableCudOperations>().Select(t => t.TableWrapper).ToList();

//assert
var expected = Enumerable.Repeat(actual.First(), numberOfRacers).ToList();
CollectionAssert.AreEqual(expected, actual);
}

/// <remarks>Set to run 10 (magic number) times, as race conditions are hard to get to consistently fail.</remarks>
[Test, Repeat(10)]
public async Task GetTableDefinitionRaceConditionRunningAsCloseToParallelSynchronised()
{
//setup fake dynamodb interactions
var dynamoDbClient = Substitute.For<IAmazonDynamoDB>();
dynamoDbClient.DescribeTable(Arg.Any<DescribeTableRequest>()).Returns(new DescribeTableResponse
{
Table = new TableDescription { TableStatus = TableStatus.ACTIVE, KeySchema = new List<KeySchemaElement> { new KeySchemaElement("key", KeyType.HASH) }, AttributeDefinitions = new List<AttributeDefinition> { new AttributeDefinition("key", ScalarAttributeType.S) } }
});

//subject
var context = new DataContext(dynamoDbClient);
context.CreateTableIfNotExists(new CreateTableArgs<Book>(c => c.Name));

const int numberOfRacers = 10;

//exercise
var flagman = new AsyncBarrier(numberOfRacers);
var racers = Enumerable.Range(0, numberOfRacers).Select(i => Task.Run(async () =>
{
await flagman.SignalAndWait();
return context.GetTable<Book>();
}));

var tables = await Task.WhenAll(racers);
var actual = tables.OfType<ITableCudOperations>().Select(t => t.TableWrapper).ToList();

//assert
var expected = Enumerable.Repeat(actual.First(), numberOfRacers).ToList();
CollectionAssert.AreEqual(expected, actual);
}
}

/// <summary>
/// Thanks to Stephen Toub :)
/// http://blogs.msdn.com/b/pfxteam/archive/2012/02/11/10266932.aspx
/// </summary>
public class AsyncBarrier
{
private readonly int _participantCount;
private int _remainingParticipants;
private ConcurrentStack<TaskCompletionSource<bool>> m_waiters;

public AsyncBarrier(int participantCount)
{
if (participantCount <= 0) throw new ArgumentOutOfRangeException(nameof(participantCount));
_remainingParticipants = _participantCount = participantCount;
m_waiters = new ConcurrentStack<TaskCompletionSource<bool>>();
}

public Task SignalAndWait()
{
var tcs = new TaskCompletionSource<bool>();
m_waiters.Push(tcs);
if (Interlocked.Decrement(ref _remainingParticipants) == 0)
{
_remainingParticipants = _participantCount;
var waiters = m_waiters;
m_waiters = new ConcurrentStack<TaskCompletionSource<bool>>();
Parallel.ForEach(waiters, w => w.SetResult(true));
}
return tcs.Task;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@
<SpecificVersion>False</SpecificVersion>
<HintPath>..\packages\log4net.2.0.2\lib\net40-full\log4net.dll</HintPath>
</Reference>
<Reference Include="NSubstitute, Version=1.10.0.0, Culture=neutral, PublicKeyToken=92dd2e9066daa5ca, processorArchitecture=MSIL">
<HintPath>..\packages\NSubstitute.1.10.0.0\lib\net45\NSubstitute.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="nunit.framework, Version=2.6.3.13283, Culture=neutral, PublicKeyToken=96d09a1eb7f44a77, processorArchitecture=MSIL">
<HintPath>..\packages\NUnit.2.6.3\lib\nunit.framework.dll</HintPath>
</Reference>
Expand Down
1 change: 1 addition & 0 deletions Sources/Linq2DynamoDb.DataContext.Tests/packages.config
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
<package id="AWSSDK.DynamoDBv2" version="3.1.1.5" targetFramework="net45" />
<package id="EnyimMemcached" version="2.12" targetFramework="net45" />
<package id="log4net" version="2.0.2" targetFramework="net45" />
<package id="NSubstitute" version="1.10.0.0" targetFramework="net45" />
<package id="NUnit" version="2.6.3" targetFramework="net45" />
<package id="StackExchange.Redis" version="1.0.488" targetFramework="net45" />
</packages>
38 changes: 26 additions & 12 deletions Sources/Linq2DynamoDb.DataContext/DataContext.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

using Amazon;
Expand Down Expand Up @@ -124,7 +125,7 @@ public DataTable<TEntity> GetTable<TEntity>(object hashKeyValue, Func<ITableCach
var tableWrapper = TableWrappers.GetOrAdd
(
new Tuple<Type, object>(entityType, hashKeyValue),
t =>
t => new Lazy<TableDefinitionWrapper>(() =>
{
// if cache is not provided, then passing a fake implementation
var cacheImplementation =
Expand All @@ -144,8 +145,8 @@ public DataTable<TEntity> GetTable<TEntity>(object hashKeyValue, Func<ITableCach
);
wrapper.OnLog += this.OnLog;
return wrapper;
}
);
}, LazyThreadSafetyMode.ExecutionAndPublication)
).Value;

if (tableWrapper.TableDefinition != tableDefinition)
{
Expand All @@ -171,7 +172,7 @@ public DataTable<TEntity> GetTable<TEntity>(object hashKeyValue, Func<ITableCach
/// </summary>
public void SubmitChanges()
{
Task.WaitAll(this.TableWrappers.Values.Select(t => t.SubmitChangesAsync()).ToArray());
Task.WaitAll(this.TableWrappers.Values.Select(t => t.Value.SubmitChangesAsync()).ToArray());
}

#endregion
Expand All @@ -193,15 +194,15 @@ public void SubmitChanges()
/// <summary>
/// TableDefinitionWrapper instances for each entity type and HashKey value (if specified)
/// </summary>
protected internal readonly ConcurrentDictionary<Tuple<Type, object>, TableDefinitionWrapper> TableWrappers = new ConcurrentDictionary<Tuple<Type, object>, TableDefinitionWrapper>();
protected internal readonly ConcurrentDictionary<Tuple<Type, object>, Lazy<TableDefinitionWrapper>> TableWrappers = new ConcurrentDictionary<Tuple<Type, object>, Lazy<TableDefinitionWrapper>>();

/// <summary>
/// A fake cache implementation, which does no caching
/// </summary>
private static readonly ITableCache FakeCacheImplementation = new FakeTableCache();


private class CachedTableDefinitions : ConcurrentDictionary<string, Table>
private class CachedTableDefinitions : ConcurrentDictionary<string, Lazy<Table>>
{
/// <summary>
/// Instead of storing a reference to DynamoDBClient we're storing it's HashCode
Expand All @@ -219,6 +220,11 @@ public bool IsAssignedToThisClientInstance(IAmazonDynamoDB client)
}
}

/// <summary>
/// Used to lock while updating the <see cref="_cachedTableDefinitions" /> field
/// </summary>
private static readonly object CacheTableDefinitionsLock = new object();

/// <summary>
/// Table objects are cached here per DynamoDBClient instance.
/// In order not to reload table metadata between DataContext instance creations.
Expand Down Expand Up @@ -269,20 +275,28 @@ private Table GetTableDefinition(Type entityType)
throw new InvalidOperationException("An instance of AmazonDynamoDbClient was not provided. Use either a ctor, that takes AmazonDynamoDbClient instance or GetTable() method, that takes Table object");
}

var cachedTableDefinitions = _cachedTableDefinitions;
if
(
(cachedTableDefinitions == null)
(_cachedTableDefinitions == null)
||
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is where it is replacing the shared static instance, used by all lightweight instances of all data context's.

(!cachedTableDefinitions.IsAssignedToThisClientInstance(this._client))
(!_cachedTableDefinitions.IsAssignedToThisClientInstance(this._client))
)
{
cachedTableDefinitions = new CachedTableDefinitions(this._client);
_cachedTableDefinitions = cachedTableDefinitions;
lock (CacheTableDefinitionsLock)
{
if ((_cachedTableDefinitions == null)
||
(!_cachedTableDefinitions.IsAssignedToThisClientInstance(this._client)))
{
_cachedTableDefinitions = new CachedTableDefinitions(this._client);
}
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is ensuring safely swapping the shared static instance, used by all lightweight instances.
Utilising double-checked locking
https://msdn.microsoft.com/en-us/library/ff650316.aspx

}

var cachedTableDefinitions = _cachedTableDefinitions;

string tableName = this.GetTableNameForType(entityType);
return cachedTableDefinitions.GetOrAdd(tableName, name => Table.LoadTable(this._client, name));
return cachedTableDefinitions.GetOrAdd(tableName, name => new Lazy<Table>(() => Table.LoadTable(this._client, name), LazyThreadSafetyMode.ExecutionAndPublication)).Value;
}

#endregion
Expand Down
2 changes: 1 addition & 1 deletion Sources/Linq2DynamoDb.DataContext/DataContextExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public static class DataContextExtensions
/// </summary>
public static async Task SubmitChangesAsync(this DataContext context)
{
await Task.WhenAll(context.TableWrappers.Values.Select(t => t.SubmitChangesAsync()));
await Task.WhenAll(context.TableWrappers.Values.Select(t => t.Value.SubmitChangesAsync()));
}

/// <summary>
Expand Down