From 5eb04099b8f1cf66639cfcb7162b81927be3ef0a Mon Sep 17 00:00:00 2001 From: howcheng Date: Wed, 10 Feb 2016 10:36:42 -0800 Subject: [PATCH 1/2] RedisTableCache --- .../Caching/EnyimIndexCreator.cs | 43 +- .../Caching/EnyimMemcachedClient.cs | 149 ++++ .../Caching/EnyimProjectionIndexCreator.cs | 43 +- .../Caching/EnyimTableCache.cs | 648 ++------------- .../Caching/ICacheClient.cs | 38 + .../Caching/IIndexCreator.cs | 1 + .../Caching/IndexCreator.cs | 74 ++ .../Caching/ProjectionIndexCreator.cs | 63 ++ .../Caching/RedisCacheClient.cs | 164 ++++ .../Caching/RedisTableCache.cs | 16 + .../Caching/TableCache.cs | 761 ++++++++++++++++++ .../Caching/TableLock.cs | 4 +- .../Linq2DynamoDb.DataContext.csproj | 16 +- .../Utils/Base64Serializer.cs | 41 + .../Linq2DynamoDb.DataContext/packages.config | 1 + 15 files changed, 1425 insertions(+), 637 deletions(-) create mode 100644 Sources/Linq2DynamoDb.DataContext/Caching/EnyimMemcachedClient.cs create mode 100644 Sources/Linq2DynamoDb.DataContext/Caching/ICacheClient.cs create mode 100644 Sources/Linq2DynamoDb.DataContext/Caching/IndexCreator.cs create mode 100644 Sources/Linq2DynamoDb.DataContext/Caching/ProjectionIndexCreator.cs create mode 100644 Sources/Linq2DynamoDb.DataContext/Caching/RedisCacheClient.cs create mode 100644 Sources/Linq2DynamoDb.DataContext/Caching/RedisTableCache.cs create mode 100644 Sources/Linq2DynamoDb.DataContext/Caching/TableCache.cs create mode 100644 Sources/Linq2DynamoDb.DataContext/Utils/Base64Serializer.cs diff --git a/Sources/Linq2DynamoDb.DataContext/Caching/EnyimIndexCreator.cs b/Sources/Linq2DynamoDb.DataContext/Caching/EnyimIndexCreator.cs index 70415ca..99f0123 100644 --- a/Sources/Linq2DynamoDb.DataContext/Caching/EnyimIndexCreator.cs +++ b/Sources/Linq2DynamoDb.DataContext/Caching/EnyimIndexCreator.cs @@ -8,44 +8,41 @@ public partial class EnyimTableCache /// /// Implements the process of creating and filling the index /// - private class EnyimIndexCreator : IIndexCreator + private class EnyimIndexCreator : IndexCreator { - private readonly EnyimTableCache _parent; + private EnyimTableCache _tableCache + { + get { return (EnyimTableCache) _parent; } + } - private TableIndex _index; - private readonly string _indexKey; - private readonly string _indexKeyInCache; private ulong _indexVersionInCache; internal EnyimIndexCreator(EnyimTableCache parent, string indexKeyInCache, SearchConditions searchConditions) + : base(parent, indexKeyInCache, searchConditions) { - this._parent = parent; - this._index = new TableIndex(searchConditions); - this._indexKey = searchConditions.Key; - this._indexKeyInCache = indexKeyInCache; } - public bool StartCreatingIndex() + public override bool StartCreatingIndex() { // Marking the index as being rebuilt. // Note: we're using Set mode. This means, that if an index exists in cache, it will be // overwritten. That's OK, because if an index exists in cache, then in most cases we // will not be in this place (the data will be simply read from cache). - if (!this._parent._cacheClient.Store(StoreMode.Set, this._indexKeyInCache, this._index, this._parent._ttl)) + if (!_tableCache._cacheClient.SetValue(this._indexKeyInCache, this._index)) { - this._parent._cacheClient.Remove(this._indexKeyInCache); + _tableCache._cacheClient.Remove(this._indexKeyInCache); return false; } // (re)registering it in the list of indexes (it should be visible for update operations) - if (!this._parent.PutIndexToList(this._indexKey)) + if (!_tableCache.PutIndexToList(this._indexKey)) { - this._parent._cacheClient.Remove(this._indexKeyInCache); + _tableCache._cacheClient.Remove(this._indexKeyInCache); return false; } // remembering the index's current version - var casResult = this._parent._cacheClient.GetWithCas(this._indexKeyInCache); + var casResult = _tableCache._memcachedClient.GetWithCas(this._indexKeyInCache); if ( (casResult.StatusCode != 0) @@ -53,7 +50,7 @@ public bool StartCreatingIndex() (casResult.Result == null) ) { - this._parent._cacheClient.Remove(this._indexKeyInCache); + _tableCache._cacheClient.Remove(this._indexKeyInCache); return false; } @@ -64,8 +61,8 @@ public bool StartCreatingIndex() public void AddEntityToIndex(EntityKey entityKey, Document doc) { - string key = this._parent.GetEntityKeyInCache(entityKey); - if (key.Length > MaxKeyLength) + string key = _tableCache.GetEntityKeyInCache(entityKey); + if (key == null) { this._index = null; return; @@ -76,26 +73,26 @@ public void AddEntityToIndex(EntityKey entityKey, Document doc) // Putting the entity to cache, but only if it doesn't exist there. // That's because when loading from DynamoDb whe should never overwrite local updates. - this._parent._cacheClient.Store(StoreMode.Add, key, new CacheDocumentWrapper(doc), this._parent._ttl); + _tableCache._cacheClient.AddValue(key, new CacheDocumentWrapper(doc)); } public void Dispose() { if (this._index == null) { - this._parent._cacheClient.Remove(this._indexKeyInCache); + _tableCache._cacheClient.Remove(this._indexKeyInCache); return; } this._index.IsBeingRebuilt = false; // saving the index to cache only if it's version didn't change since we started reading results from DynamoDb - var casResult = this._parent._cacheClient.Cas + var casResult = _tableCache._memcachedClient.Cas ( StoreMode.Replace, this._indexKeyInCache, - this._index, - this._parent._ttl, + this._index, + _tableCache._cacheClient.DefaultTimeToLive, this._indexVersionInCache ); diff --git a/Sources/Linq2DynamoDb.DataContext/Caching/EnyimMemcachedClient.cs b/Sources/Linq2DynamoDb.DataContext/Caching/EnyimMemcachedClient.cs new file mode 100644 index 0000000..6482f52 --- /dev/null +++ b/Sources/Linq2DynamoDb.DataContext/Caching/EnyimMemcachedClient.cs @@ -0,0 +1,149 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Enyim.Caching; +using Enyim.Caching.Memcached; + +namespace Linq2DynamoDb.DataContext.Caching +{ + public class EnyimMemcachedClient : ICacheClient + { + MemcachedClient _cacheClient; + public TimeSpan DefaultTimeToLive { get; set; } + + public EnyimMemcachedClient(MemcachedClient client, TimeSpan? defaultTtl = null) + { + _cacheClient = client; + DefaultTimeToLive = defaultTtl ?? TimeSpan.FromMinutes(15); + } + + //public string this[string key] + //{ + // get + // { + // throw new NotImplementedException(); + // } + // set + // { + // throw new NotImplementedException(); + // } + //} + + //public string this[string key, DateTime expiration] + //{ + // set { throw new NotImplementedException(); } + //} + + //public string this[string key, TimeSpan? timeToLive] + //{ + // set { throw new NotImplementedException(); } + //} + + public bool Remove(string key) + { + return _cacheClient.Remove(key); + } + + public bool TryRemove(string key) + { + var removeResult = this._cacheClient.ExecuteRemove(key); + return (removeResult.InnerResult == null) || + (removeResult.InnerResult.Exception == null); + } + + public bool TryGetValue(string key, out T value) + { + value = default(T); + var result = _cacheClient.ExecuteGet(key); + if (result.Success) + value = result.Value; + + return result.Success; + } + + //public bool TryGetValue(string key, out T value, out TimeSpan? timeToLive) + //{ + // throw new NotImplementedException(); + //} + + //public bool TryGetValue(string key, out T value, out DateTime? expiration) + //{ + // throw new NotImplementedException(); + //} + + //public bool TryGetTimeToLive(string key, out TimeSpan? timetoLive) + //{ + // throw new NotImplementedException(); + //} + + public bool AddValue(string key, T value) + { + return _cacheClient.Store(StoreMode.Add, key, value, DefaultTimeToLive); + } + + public bool AddValue(string key, T value, TimeSpan? timeToLive) + { + return _cacheClient.Store(StoreMode.Add, key, value, timeToLive ?? DefaultTimeToLive); + } + + public bool AddValue(string key, T value, DateTime? expiration) + { + return StoreWithExpiration(StoreMode.Add, key, value, expiration); + } + + public bool SetValue(string key, T value) + { + return _cacheClient.Store(StoreMode.Set, key, value, DefaultTimeToLive); + } + + public bool SetValue(string key, T value, TimeSpan? timeToLive) + { + return _cacheClient.Store(StoreMode.Set, key, value, timeToLive ?? DefaultTimeToLive); + } + + public bool SetValue(string key, T value, DateTime? expiration) + { + return StoreWithExpiration(StoreMode.Set, key, value, expiration); + } + + public bool ReplaceValue(string key, T value) + { + return _cacheClient.Store(StoreMode.Replace, key, value, DefaultTimeToLive); + } + + public bool ReplaceValue(string key, T value, TimeSpan? timeToLive) + { + return _cacheClient.Store(StoreMode.Replace, key, value, timeToLive ?? DefaultTimeToLive); + } + + public bool ReplaceValue(string key, T value, DateTime? expiration) + { + return StoreWithExpiration(StoreMode.Replace, key, value, expiration); + } + + private bool StoreWithExpiration(StoreMode mode, string key, T value, DateTime? expiration) + { + if (expiration.HasValue) + { + if (expiration.Value.Kind != DateTimeKind.Utc) + expiration = expiration.Value.ToUniversalTime(); + } + else + { + expiration = DateTime.UtcNow + DefaultTimeToLive; + } + return _cacheClient.Store(mode, key, value, expiration.Value); + } + + //public bool SetTimeToLive(string key, TimeSpan? timetoLive) + //{ + // object value; + // if (!TryGetValue(key, out value)) + // return false; + + // return ReplaceValue(key, value, ) + //} + } +} diff --git a/Sources/Linq2DynamoDb.DataContext/Caching/EnyimProjectionIndexCreator.cs b/Sources/Linq2DynamoDb.DataContext/Caching/EnyimProjectionIndexCreator.cs index 20e506a..2a379d1 100644 --- a/Sources/Linq2DynamoDb.DataContext/Caching/EnyimProjectionIndexCreator.cs +++ b/Sources/Linq2DynamoDb.DataContext/Caching/EnyimProjectionIndexCreator.cs @@ -9,44 +9,41 @@ public partial class EnyimTableCache /// /// Implements the process of creating and filling the projection (readonly) index /// - private class EnyimProjectionIndexCreator : IIndexCreator + private class EnyimProjectionIndexCreator : ProjectionIndexCreator { - private readonly EnyimTableCache _parent; + private EnyimTableCache _tableCache + { + get { return (EnyimTableCache) _parent; } + } - private readonly TableProjectionIndex _index; - private readonly string _indexKey; - private readonly string _indexKeyInCache; private ulong _indexVersionInCache; internal EnyimProjectionIndexCreator(EnyimTableCache parent, string indexKey, string indexKeyInCache, SearchConditions searchConditions) + : base(parent, indexKey, indexKeyInCache, searchConditions) { - this._parent = parent; - this._index = new TableProjectionIndex(searchConditions); - this._indexKey = indexKey; - this._indexKeyInCache = indexKeyInCache; } - public bool StartCreatingIndex() + public override bool StartCreatingIndex() { // Marking the index as being rebuilt. // Note: we're using Set mode. This means, that if an index exists in cache, it will be // overwritten. That's OK, because if an index exists in cache, then in most cases we // will not be in this place (the data will be simply read from cache). - if (!this._parent._cacheClient.Store(StoreMode.Set, this._indexKeyInCache, this._index, this._parent._ttl)) + if (!this._tableCache._cacheClient.SetValue(this._indexKeyInCache, this._index)) { - this._parent._cacheClient.Remove(this._indexKeyInCache); + this._tableCache._cacheClient.Remove(this._indexKeyInCache); return false; } // (re)registering it in the list of indexes (it should be visible for update operations) - if (!this._parent.PutIndexToList(this._indexKey)) + if (!this._tableCache.PutIndexToList(this._indexKey)) { - this._parent._cacheClient.Remove(this._indexKeyInCache); + this._tableCache._cacheClient.Remove(this._indexKeyInCache); return false; } // remembering the index's current version - var casResult = this._parent._cacheClient.GetWithCas(this._indexKeyInCache); + var casResult = this._tableCache._memcachedClient.GetWithCas(this._indexKeyInCache); if ( (casResult.StatusCode != 0) @@ -54,16 +51,16 @@ public bool StartCreatingIndex() (casResult.Result == null) ) { - this._parent._cacheClient.Remove(this._indexKeyInCache); + this._tableCache._cacheClient.Remove(this._indexKeyInCache); return false; } this._indexVersionInCache = casResult.Cas; - this._parent.Log("Index ({0}) was marked as being rebuilt", (object)this._indexKey); + this._tableCache.Log("Index ({0}) was marked as being rebuilt", (object)this._indexKey); return true; } - public void AddEntityToIndex(EntityKey entityKey, Document doc) + public override void AddEntityToIndex(EntityKey entityKey, Document doc) { // when creating a projection index, the entity key should not be passed Debug.Assert(entityKey == null); @@ -72,27 +69,27 @@ public void AddEntityToIndex(EntityKey entityKey, Document doc) this._index.AddEntity(doc); } - public void Dispose() + public override void Dispose() { this._index.IsBeingRebuilt = false; // saving the index to cache only if it's version didn't change since we started reading results from DynamoDb - var casResult = this._parent._cacheClient.Cas + var casResult = this._tableCache._memcachedClient.Cas ( StoreMode.Replace, this._indexKeyInCache, this._index, - this._parent._ttl, + this._tableCache._cacheClient.DefaultTimeToLive, this._indexVersionInCache ); if (casResult.Result) { - this._parent.Log("Index ({0}) with {1} entities saved to cache", (object)this._indexKey, this._index.Entities.Length); + this._tableCache.Log("Index ({0}) with {1} entities saved to cache", (object)this._indexKey, this._index.Entities.Length); } else { - this._parent.Log("Index ({0}) wasn't saved to cache due to version conflict", (object)this._indexKey); + this._tableCache.Log("Index ({0}) wasn't saved to cache due to version conflict", (object)this._indexKey); } } } diff --git a/Sources/Linq2DynamoDb.DataContext/Caching/EnyimTableCache.cs b/Sources/Linq2DynamoDb.DataContext/Caching/EnyimTableCache.cs index 970cb80..e9b976a 100644 --- a/Sources/Linq2DynamoDb.DataContext/Caching/EnyimTableCache.cs +++ b/Sources/Linq2DynamoDb.DataContext/Caching/EnyimTableCache.cs @@ -14,234 +14,34 @@ namespace Linq2DynamoDb.DataContext.Caching /// /// Implements caching in MemcacheD/ElastiCache via Enyim caching client /// - public partial class EnyimTableCache : ITableCache + public partial class EnyimTableCache : TableCache { + private MemcachedClient _memcachedClient + { + get { return (MemcachedClient) _cacheClient; } + } public EnyimTableCache(MemcachedClient cacheClient, TimeSpan cacheItemsTtl) + : base(new EnyimMemcachedClient(cacheClient, cacheItemsTtl)) { - this._cacheClient = cacheClient; - this._ttl = cacheItemsTtl; } + public EnyimTableCache(EnyimMemcachedClient cacheClient) + : base(cacheClient) + { + } #region ITableCache implementation - public void Initialize(string tableName, Type tableEntityType, Primitive hashKeyValue) + public override void Initialize(string tableName, Type tableEntityType, Primitive hashKeyValue) { - if (this._tableEntityType != null) - { - throw new InvalidOperationException("An attempt to re-use an instance of EnyimTableCache for another : pair was made. This is not allowed"); - } - - this._tableEntityType = tableEntityType; + base.Initialize(tableName, tableEntityType, hashKeyValue); - this._tableName = tableName; - this._hashKeyValue = hashKeyValue == null ? string.Empty : hashKeyValue.AsString(); - - if (this.GetIndexListKeyInCache(this._hashKeyValue).Length > MaxKeyLength) + if (GetIndexListKeyInCache(_hashKeyValue).Length > MaxKeyLength) { throw new ArgumentException("The hash key value is too long for MemcacheD. Cannot use cache with that value."); } } - public Document GetSingleEntity(EntityKey entityKey) - { - string entityKeyInCache = this.GetEntityKeyInCache(entityKey); - if (entityKeyInCache.Length > MaxKeyLength) - { - return null; - } - - var wrapper = this._cacheClient.Get(entityKeyInCache); - if (wrapper == null) - { - this.OnMiss.FireSafely(); - return null; - } - - this.OnHit.FireSafely(); - return wrapper.Document; - } - - public IEnumerable GetEntities(SearchConditions searchConditions, IEnumerable projectedFields, string orderByFieldName, bool orderByDesc) - { - // first trying to find a full index - string indexKey = searchConditions.Key; - var index = this.TryLoadHealthyIndex(indexKey); - - Document[] result = null; - - // if no full index exist - if (index == null) - { - if (projectedFields != null) - { - // then there still might be a projection index - indexKey = this.GetProjectionIndexKey(searchConditions, projectedFields); - result = this.TryLoadProjectionIndexEntities(indexKey); - } - } - else - { - result = this.TryLoadIndexEntities(index, indexKey); - } - - // if we failed to load both full and projection index - if (result == null) - { - this.OnMiss.FireSafely(); - return null; - } - - this.OnHit.FireSafely(); - this.Log("Index ({0}) with {1} items successfully loaded from cache", indexKey, result.Length); - - if (string.IsNullOrEmpty(orderByFieldName)) - { - return result; - } - - // creating a comparer to sort the results - var comparer = PrimitiveComparer.GetComparer(this._tableEntityType, orderByFieldName); - - return orderByDesc - ? - result.OrderByDescending(doc => doc[orderByFieldName].AsPrimitive(), comparer) - : - result.OrderBy(doc => doc[orderByFieldName].AsPrimitive(), comparer) - ; - } - - public int? GetCount(SearchConditions searchConditions) - { - var index = this.TryLoadHealthyIndex(searchConditions.Key); - - if (index == null) - { - this.OnMiss.FireSafely(); - return null; - } - - this.OnHit.FireSafely(); - this.Log("Contents of index ({0}) successfully loaded from cache and number of items returned is {1}", searchConditions.Key, index.Count); - - return index.Count; - } - - public void PutSingleLoadedEntity(EntityKey entityKey, Document doc) - { - string entityKeyInCache = this.GetEntityKeyInCache(entityKey); - if (entityKeyInCache.Length > MaxKeyLength) - { - return; - } - - // Putting the entity to cache, but only if it doesn't exist there. - // That's because when loading from DynamoDb whe should never overwrite local updates. - this._cacheClient.Store(StoreMode.Add, entityKeyInCache, new CacheDocumentWrapper(doc), this._ttl); - } - - public void RemoveEntities(IEnumerable entities) - { - var entityKeysInCache = entities - .Select(this.GetEntityKeyInCache) - // extracting too long keys - .Where(ek => ek.Length <= MaxKeyLength); - - Parallel.ForEach(entityKeysInCache, key => this._cacheClient.Remove(key)); - } - - /// - /// Applies modifications to cached entities and indexes - /// - public void UpdateCacheAndIndexes(IDictionary addedEntities, IDictionary modifiedEntities, ICollection removedEntities) - { - var allEntities = addedEntities - .Union(modifiedEntities) - .Union(removedEntities.ToDictionary(k => k, k => (Document)null)) - // converting keys - .Select(kv => new KeyValuePair(this.GetEntityKeyInCache(kv.Key), kv.Value)) - // extracting entities with too long keys - .Where(kv => kv.Key.Length <= MaxKeyLength) - .ToArray(); - - // modifying/removing all entities in parallel - var loopResult = Parallel.ForEach(allEntities, (entityPair, loopState) => - { - bool result = true; - if (entityPair.Value == null) - { - var removeResult = this._cacheClient.ExecuteRemove(entityPair.Key); - if - ( - (removeResult.InnerResult != null) - && - (removeResult.InnerResult.Exception != null) - ) - { - // this means, that item failed to be removed because of a communication error, not because the entity doesn't exist already - result = false; - } - } - else - { - result = this._cacheClient.Store(StoreMode.Set, entityPair.Key, new CacheDocumentWrapper(entityPair.Value), this._ttl); - } - - if (!result) - { - loopState.Stop(); - } - }); - - // All operations should succeed, otherwise removing all affected entities from cache. - // That's because in some cases partially succeded updates might become a problem. - if (!loopResult.IsCompleted) - { - this.Log("Failed to put updates for table {0} to cache", this._tableName); - - Parallel.ForEach(allEntities, entityPair => this._cacheClient.Remove(entityPair.Key)); - return; - } - - // now updating indexes - this.UpdateIndexes(this._hashKeyValue, addedEntities, modifiedEntities, removedEntities); - - // To support scenarios, when a context contains both a full table and a table filtered by a HashKey, - // by updating HashKey-filtered indexes we also should update indexes of the full table. - // And vice versa. - - if (string.IsNullOrEmpty(this._hashKeyValue)) - { - // Trying to update lists of indexes with predefined HashKey values - var affectedHashKeys = addedEntities - .Where(kv => kv.Key.RangeKey != null) - .Select(kv => kv.Key.HashKey.AsString()) - .Union - ( - modifiedEntities - .Where(kv => kv.Key.RangeKey != null) - .Select(kv => kv.Key.HashKey.AsString()) - ) - .Union - ( - removedEntities - .Where(kv => kv.RangeKey != null) - .Select(kv => kv.HashKey.AsString()) - ) - .Distinct(); - - foreach (var hashKeyValue in affectedHashKeys) - { - this.UpdateIndexes(hashKeyValue, addedEntities, modifiedEntities, removedEntities); - } - } - else - { - // updating the full table indexes as well - this.UpdateIndexes(string.Empty, addedEntities, modifiedEntities, removedEntities); - } - } - - public IIndexCreator StartCreatingIndex(SearchConditions searchConditions) + public override IIndexCreator StartCreatingIndex(SearchConditions searchConditions) { string indexKeyInCache = this.GetIndexKeyInCache(searchConditions.Key, this._hashKeyValue); if (indexKeyInCache.Length > MaxKeyLength) @@ -250,18 +50,10 @@ public IIndexCreator StartCreatingIndex(SearchConditions searchConditions) return null; } - var creator = new EnyimIndexCreator(this, indexKeyInCache, searchConditions); - - if (!creator.StartCreatingIndex()) - { - this.Log("Failed to start creating index ({0})", searchConditions.Key); - return null; - } - - return creator; + return base.StartCreatingIndex(indexKeyInCache, searchConditions); } - public IIndexCreator StartCreatingProjectionIndex(SearchConditions searchConditions, IList projectedFields) + public override IIndexCreator StartCreatingProjectionIndex(SearchConditions searchConditions, IList projectedFields) { string indexKey = this.GetProjectionIndexKey(searchConditions, projectedFields); string indexKeyInCache = this.GetIndexKeyInCache(indexKey, this._hashKeyValue); @@ -271,254 +63,61 @@ public IIndexCreator StartCreatingProjectionIndex(SearchConditions searchConditi return null; } - var creator = new EnyimProjectionIndexCreator(this, indexKey, indexKeyInCache, searchConditions); - - if (!creator.StartCreatingIndex()) - { - this.Log("Failed to start creating projection index ({0})", indexKey); - return null; - } - - return creator; - } - - /// - /// Acquires a table-wide named lock and returns a disposable object, that represents it - /// - public IDisposable AcquireTableLock(string lockKey, TimeSpan lockTimeout) - { - return new TableLock(this, lockKey, lockTimeout); - } - - public event Action OnHit; - public event Action OnMiss; - public event Action OnLog; - - #endregion - - #region Public Methods - - /// - /// Acquires a named lock around the table by storing a random value in cache - /// - internal void LockTable(string lockKey, TimeSpan lockTimeout) - { - if (this._lockIds.ContainsKey(lockKey)) - { - throw new NotSupportedException("Recursive locks are not supported. Or maybe you're trying to use EnyimTableCache object from multiple threads?"); - } - - string cacheLockKey = this.GetLockKeyInCache(this._hashKeyValue, lockKey); - int cacheLockId = Rnd.Next(); - - var timeStart = DateTime.Now; - while (true) - { - if (DateTime.Now - timeStart > lockTimeout) - { - break; - } - - // Trying to create a new value in cache - if (this._cacheClient.Store(StoreMode.Add, cacheLockKey, cacheLockId)) - { - this._lockIds[lockKey] = cacheLockId; - return; - } - - Thread.Sleep(10); - } - - // If we failed to acquire a lock within CacheLockTimeoutInSeconds - // (this means, that another process crached), then we should forcibly acquire it - - this.Log("Forcibly acquiring the table lock object {0} after {1} ms of waiting", lockKey, lockTimeout.TotalMilliseconds); - - this._cacheClient.Store(StoreMode.Set, cacheLockKey, cacheLockId); - this._lockIds[lockKey] = cacheLockId; - } - - /// - /// Releases a named lock around the table - /// - internal void UnlockTable(string lockKey) - { - int lockId; - if (!this._lockIds.TryRemove(lockKey, out lockId)) - { - throw new InvalidOperationException - ( - string.Format("The table lock {0} wasn't acquired, so it cannot be released. Check your code!", lockKey) - ); - } - - string cacheLockKey = this.GetLockKeyInCache(this._hashKeyValue, lockKey); - var cacheLockId = this._cacheClient.Get(cacheLockKey); - if (cacheLockId == null) - { - // The cache miss might happen here, if a cache server crashed. - // In this case we just silently return. - this.Log("The table lock object {0} is missing in cache, but we don't care about that too much (probably, the cache node was restarted)", lockKey); - return; - } - - if (((int)cacheLockId) != lockId) - { - // This means, that another process has forcibly replaced our lock. - throw new InvalidOperationException - ( - string.Format("The table lock {0} was forcibly acquired by another process", lockKey) - ); - } - - this._cacheClient.Remove(cacheLockKey); + return base.StartCreatingProjectionIndex(indexKey, indexKeyInCache, searchConditions); } - #endregion #region Private Properties - private readonly MemcachedClient _cacheClient; - - /// - /// The time-to-live for all entities and indexes stored in cache - /// - private readonly TimeSpan _ttl; - /// /// Entities and indexes with key longer than this limit are not saved to cache /// private const int MaxKeyLength = 250; - /// - /// Limit to the number of indexes - /// TODO: estimate this number more precisely - /// - private const int MaxNumberOfIndexes = 100; - /// /// The number of times to try an optimistic update operation /// private const int MaxUpdateAttempts = 10; - private Type _tableEntityType; - - private string _tableName; - private string _hashKeyValue; - - /// - /// Here all lock keys and their IDs are stored, for debugging purposes - /// - private readonly ConcurrentDictionary _lockIds = new ConcurrentDictionary(); - - private static readonly Random Rnd = new Random(DateTime.Now.Millisecond); - #endregion #region Private Methods - - private const string ProjectionIndexKeyPrefix = "[proj]"; - - /// - /// Composes a key for a projection index - /// - private string GetProjectionIndexKey(SearchConditions searchConditions, IEnumerable projectedFields) - { - return ProjectionIndexKeyPrefix + projectedFields.Aggregate((i, s) => i + "," + s) + "; " + searchConditions.Key; - } - - /// - /// Checks if the provided key is a projection index's key - /// - private bool IsProjectionIndex(string indexKey) - { - return indexKey.StartsWith(ProjectionIndexKeyPrefix); - } - - /// - /// Gets a cache key for an entity - /// - private string GetEntityKeyInCache(EntityKey entityKey) - { - // entities will always be identified in cache by their key prefixed by table name. - // entityKey might contain spaces, which are not allowed for MemcacheD keys. This is why ToBase64() is used. - return (this._tableName + ":" + entityKey).ToBase64(); - } - - /// - /// Gets a cache key for an index - /// - private string GetIndexKeyInCache(string indexKey, string hashKeyValue) - { - // indexKey might contain spaces, which are not allowed for MemcacheD keys - return (this._tableName + hashKeyValue + indexKey).ToBase64(); - } - - /// - /// Gets a cache key for the list of all indexes - /// - private string GetIndexListKeyInCache(string hashKeyValue) - { - return (this._tableName + hashKeyValue + ":indexes").ToBase64(); - } - - /// - /// Gets a cache key for a table-wide lock - /// - private string GetLockKeyInCache(string lockKey, string hashKeyValue) - { - return (this._tableName + hashKeyValue + lockKey).ToBase64(); - } - - /// - /// Adds matching entities and removes removed entities from indexes - /// - private void UpdateIndexes(string hashKeyValue, IDictionary addedEntities, IDictionary modifiedEntities, ICollection removedEntities) - { - string indexListKeyInCache = this.GetIndexListKeyInCache(hashKeyValue); - - var indexes = this._cacheClient.Get(indexListKeyInCache); - if (indexes == null) - { - this._cacheClient.Remove(indexListKeyInCache); - return; - } - - // storing indexes, that fail to be updated - var indexesToBeRemoved = new ConcurrentDictionary(); - - Parallel.ForEach(indexes, indexKey => - { - string indexKeyInCache = this.GetIndexKeyInCache(indexKey, hashKeyValue); - - bool indexUpdateSucceeded = - this.IsProjectionIndex(indexKey) - ? - this.UpdateProjectionIndex(indexKey, indexKeyInCache, addedEntities, modifiedEntities, removedEntities) - : - this.UpdateIndex(indexKey, indexKeyInCache, addedEntities, removedEntities); - - if (!indexUpdateSucceeded) - { - indexesToBeRemoved[indexKey] = indexKeyInCache; - } - }); - - // removing bad indexes - if (indexesToBeRemoved.Count > 0) - { - this.RemoveIndexesFromList(indexesToBeRemoved, indexListKeyInCache); - } - } - - /// + protected override string GetEntityKeyInCache(EntityKey entityKey) + { + string entityKeyInCache = base.GetEntityKeyInCache(entityKey); + if (entityKeyInCache.Length > MaxKeyLength) + { + return null; + } + return entityKeyInCache; + } + protected override string[] GetEntityKeysInCache(IEnumerable entities) + { + var entityKeysInCache = entities + .Select(this.GetEntityKeyInCache) + // extracting too long keys + .Where(ek => ek.Length <= MaxKeyLength) + .ToArray(); + return entityKeysInCache; + } + protected override KeyValuePair[] CombineEntityDictionaries(params IDictionary[] entities) + { + var allEntities = base.CombineEntityDictionaries(entities); + allEntities = allEntities + // extracting entities with too long keys + .Where(kv => kv.Key.Length <= MaxKeyLength) + .ToArray(); + return allEntities; + } + + /// /// Adds matching entities and removes removed entities from an index /// - private bool UpdateIndex(string indexKey, string indexKeyInCache, IDictionary addedEntities, ICollection removedEntities) + protected override bool UpdateIndex(string indexKey, string indexKeyInCache, IDictionary addedEntities, ICollection removedEntities) { for (int i = 0; i < MaxUpdateAttempts; i++) { - var indexGetResult = this._cacheClient.GetWithCas(indexKeyInCache); + var indexGetResult = _memcachedClient.GetWithCas(indexKeyInCache); var index = indexGetResult.Result; if (index == null) { @@ -561,7 +160,7 @@ var entityPair in addedEntities.Where return true; } - if (this._cacheClient.Cas(StoreMode.Set, indexKeyInCache, index, this._ttl, indexGetResult.Cas).Result) + if (_memcachedClient.Cas(StoreMode.Set, indexKeyInCache, index, _cacheClient.DefaultTimeToLive, indexGetResult.Cas).Result) { this.Log("Index ({0}) updated. {1} entities added, {2} entities removed", indexKey, addedEntities.Count, removedEntities.Count); return true; @@ -573,7 +172,7 @@ var entityPair in addedEntities.Where /// /// Checks if a projection index should be dropped because of some added/modified/removed entities /// - private bool UpdateProjectionIndex(string indexKey, string indexKeyInCache, IDictionary addedEntities, IDictionary modifiedEntities, ICollection removedEntities) + protected override bool UpdateProjectionIndex(string indexKey, string indexKeyInCache, IDictionary addedEntities, IDictionary modifiedEntities, ICollection removedEntities) { // if some entities were modified or removed if @@ -588,7 +187,7 @@ private bool UpdateProjectionIndex(string indexKey, string indexKeyInCache, IDic return false; } - var indexGetResult = this._cacheClient.GetWithCas(indexKeyInCache); + var indexGetResult = _memcachedClient.GetWithCas(indexKeyInCache); var index = indexGetResult.Result; if (index == null) { @@ -620,14 +219,14 @@ private bool UpdateProjectionIndex(string indexKey, string indexKeyInCache, IDic return true; } - private bool PutIndexToList(string indexKey) + protected override bool PutIndexToList(string indexKey) { string indexListKeyInCache = this.GetIndexListKeyInCache(this._hashKeyValue); // trying multiple times for (int i = 0; i < MaxUpdateAttempts; i++) { - var indexesGetResult = this._cacheClient.GetWithCas(indexListKeyInCache); + var indexesGetResult = _memcachedClient.GetWithCas(indexListKeyInCache); if (indexesGetResult.StatusCode != 0) { @@ -649,7 +248,7 @@ private bool PutIndexToList(string indexKey) this.Log("An old index ({0}) was removed from cache because the number of supported indexes ({1}) is exceeded", poppedIndexKey, MaxNumberOfIndexes); } - if (this._cacheClient.Cas(StoreMode.Set, indexListKeyInCache, indexes, this._ttl, indexesGetResult.Cas).Result) + if (_memcachedClient.Cas(StoreMode.Set, indexListKeyInCache, indexes, _cacheClient.DefaultTimeToLive, indexesGetResult.Cas).Result) { return true; } @@ -659,7 +258,7 @@ private bool PutIndexToList(string indexKey) return false; } - private void RemoveIndexFromList(string indexKey, string indexKeyInCache, string indexListKeyInCache) + protected override void RemoveIndexFromList(string indexKey, string indexKeyInCache, string indexListKeyInCache) { // always removing the index itself this._cacheClient.Remove(indexKeyInCache); @@ -667,7 +266,7 @@ private void RemoveIndexFromList(string indexKey, string indexKeyInCache, string // trying multiple times for (int i = 0; i < MaxUpdateAttempts; i++) { - var indexesGetResult = this._cacheClient.GetWithCas(indexListKeyInCache); + var indexesGetResult = _memcachedClient.GetWithCas(indexListKeyInCache); if (indexesGetResult.StatusCode != 0) { @@ -691,7 +290,7 @@ private void RemoveIndexFromList(string indexKey, string indexKeyInCache, string indexes.Remove(indexKey); - var casResult = this._cacheClient.Cas(StoreMode.Set, indexListKeyInCache, indexes, this._ttl, indexesGetResult.Cas); + var casResult = _memcachedClient.Cas(StoreMode.Set, indexListKeyInCache, indexes, _cacheClient.DefaultTimeToLive, indexesGetResult.Cas); if (casResult.Result) { return; @@ -702,7 +301,7 @@ private void RemoveIndexFromList(string indexKey, string indexKeyInCache, string this._cacheClient.Remove(indexListKeyInCache); } - private void RemoveIndexesFromList(IDictionary indexKeys, string indexListKeyInCache) + protected override void RemoveIndexesFromList(IDictionary indexKeys, string indexListKeyInCache) { // always removing indexes themselves Parallel.ForEach(indexKeys.Values, indexKeyInCache => this._cacheClient.Remove(indexKeyInCache)); @@ -711,7 +310,7 @@ private void RemoveIndexesFromList(IDictionary indexKeys, string //TODO: implement a helper for multiple attempts for (int i = 0; i < MaxUpdateAttempts; i++) { - var indexesGetResult = this._cacheClient.GetWithCas(indexListKeyInCache); + var indexesGetResult = _memcachedClient.GetWithCas(indexListKeyInCache); if (indexesGetResult.StatusCode != 0) { @@ -743,7 +342,7 @@ private void RemoveIndexesFromList(IDictionary indexKeys, string return; } - var casResult = this._cacheClient.Cas(StoreMode.Set, indexListKeyInCache, indexes, this._ttl, indexesGetResult.Cas); + var casResult = _memcachedClient.Cas(StoreMode.Set, indexListKeyInCache, indexes, _cacheClient.DefaultTimeToLive, indexesGetResult.Cas); if (casResult.Result) { return; @@ -753,133 +352,6 @@ private void RemoveIndexesFromList(IDictionary indexKeys, string this.Log("Failed to remove {0} indexes from list after {1} attempts. Removing the whole list.", indexKeys.Count, MaxUpdateAttempts); this._cacheClient.Remove(indexListKeyInCache); } - - /// - /// Loads the index by it's key, ensuring everything - /// - private HashSet TryLoadHealthyIndex(string indexKey) - { - string indexKeyInCache = this.GetIndexKeyInCache(indexKey, this._hashKeyValue); - if (indexKeyInCache.Length > MaxKeyLength) - { - return null; - } - - // first trying to get the index from cache - var index = this._cacheClient.Get(indexKeyInCache); - if (index == null) - { - return null; - } - - // Checking, that index is mentioned in the list of indexes. - // Only indexes from list are updated with local updates. - if (!this.DoesIndexExistInTheListOfIndexes(indexKey)) - { - this._cacheClient.Remove(indexKeyInCache); - return null; - } - - // If index is currently being filled with data from DynamoDb, then we can't use it yet - if (index.IsBeingRebuilt) - { - return null; - } - - return index.Index; - } - - /// - /// Loads a projection index by it's key - /// - private Document[] TryLoadProjectionIndexEntities(string indexKey) - { - string indexKeyInCache = this.GetIndexKeyInCache(indexKey, this._hashKeyValue); - if (indexKeyInCache.Length > MaxKeyLength) - { - return null; - } - - // first trying to get the index from cache - var index = this._cacheClient.Get(indexKeyInCache); - if (index == null) - { - return null; - } - - // Checking, that index is mentioned in the list of indexes. - // Only indexes from list are updated with local updates. - if (!this.DoesIndexExistInTheListOfIndexes(indexKey)) - { - this._cacheClient.Remove(indexKeyInCache); - return null; - } - - // If index is currently being filled with data from DynamoDb, then we can't use it yet - if (index.IsBeingRebuilt) - { - return null; - } - - return index.Entities; - } - - private Document[] TryLoadIndexEntities(HashSet entityKeys, string indexKey) - { - var result = new Document[entityKeys.Count]; - - // now we have to succeed loading all entities from cache - var loopResult = Parallel.ForEach(entityKeys, (entityKey, loopState, i) => - { - var wrapper = this._cacheClient.Get(this.GetEntityKeyInCache(entityKey)); - if (wrapper == null) - { - loopState.Stop(); - return; - } - - result[i] = wrapper.Document; - }); - - if (loopResult.IsCompleted) - { - return result; - } - - this.Log("Failed to get contents of index ({0}) from cache", indexKey); - - // the index is not usable any more, we'd better delete it - this.RemoveIndexFromList - ( - indexKey, - this.GetIndexKeyInCache(indexKey, this._hashKeyValue), - this.GetIndexListKeyInCache(this._hashKeyValue) - ); - return null; - } - - private bool DoesIndexExistInTheListOfIndexes(string indexKey) - { - var indexList = this._cacheClient.Get(this.GetIndexListKeyInCache(this._hashKeyValue)); - return ((indexList != null) && (indexList.Contains(indexKey))); - } - - internal void Log(string format, params object[] args) - { - var handler = this.OnLog; - if (handler == null) - { - return; - } - string tableName = this._tableName; - if (!string.IsNullOrEmpty(this._hashKeyValue)) - { - tableName += ":" + this._hashKeyValue; - } - - handler("EnyimTableCache(" + tableName + ") : " + string.Format(format, args)); - } - #endregion } } diff --git a/Sources/Linq2DynamoDb.DataContext/Caching/ICacheClient.cs b/Sources/Linq2DynamoDb.DataContext/Caching/ICacheClient.cs new file mode 100644 index 0000000..f4bb1e4 --- /dev/null +++ b/Sources/Linq2DynamoDb.DataContext/Caching/ICacheClient.cs @@ -0,0 +1,38 @@ +using System; + +namespace Linq2DynamoDb.DataContext.Caching +{ + public interface ICacheClient + { + TimeSpan DefaultTimeToLive { get; set; } + //TValue this[string key] { get; set; } + + //TValue this[string key, DateTime expiration] { set; } + //TValue this[string key, TimeSpan? timeToLive] { set; } + + bool Remove(string key); + bool TryRemove(string key); + + bool TryGetValue(string key, out TValue value); + //bool TryGetValue(string key, out TValue value, out TimeSpan? timeToLive); + //bool TryGetValue(string key, out TValue value, out DateTime? expiration); + //bool TryGetTimeToLive(string key, out TimeSpan? timetoLive); + + bool AddValue(string key, TValue value); + bool AddValue(string key, TValue value, TimeSpan? timeToLive); + bool AddValue(string key, TValue value, DateTime? expiration); + bool SetValue(string key, TValue value); + bool SetValue(string key, TValue value, TimeSpan? timeToLive); + bool SetValue(string key, TValue value, DateTime? expiration); + + bool ReplaceValue(string key, TValue value); + bool ReplaceValue(string key, TValue value, TimeSpan? timeToLive); + bool ReplaceValue(string key, TValue value, DateTime? expiration); + + //bool SetTimeToLive(string key, TimeSpan? timetoLive); + } + + //public interface ICacheClient : ICacheClient + //{ + //} +} diff --git a/Sources/Linq2DynamoDb.DataContext/Caching/IIndexCreator.cs b/Sources/Linq2DynamoDb.DataContext/Caching/IIndexCreator.cs index 7005a4c..69be228 100644 --- a/Sources/Linq2DynamoDb.DataContext/Caching/IIndexCreator.cs +++ b/Sources/Linq2DynamoDb.DataContext/Caching/IIndexCreator.cs @@ -10,5 +10,6 @@ namespace Linq2DynamoDb.DataContext.Caching public interface IIndexCreator : IDisposable { void AddEntityToIndex(EntityKey entityKey, Document doc); + bool StartCreatingIndex(); } } diff --git a/Sources/Linq2DynamoDb.DataContext/Caching/IndexCreator.cs b/Sources/Linq2DynamoDb.DataContext/Caching/IndexCreator.cs new file mode 100644 index 0000000..749cd46 --- /dev/null +++ b/Sources/Linq2DynamoDb.DataContext/Caching/IndexCreator.cs @@ -0,0 +1,74 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Amazon.DynamoDBv2.DocumentModel; + +namespace Linq2DynamoDb.DataContext.Caching +{ + public abstract partial class TableCache + { + protected class IndexCreator : IIndexCreator + { + protected readonly TableCache _parent; + protected TableIndex _index; + protected readonly string _indexKey; + protected readonly string _indexKeyInCache; + + internal IndexCreator(TableCache parent, string indexKeyInCache, SearchConditions searchConditions) + { + _parent = parent; + _index = new TableIndex(searchConditions); + _indexKey = searchConditions.Key; + _indexKeyInCache = indexKeyInCache; + } + + public virtual bool StartCreatingIndex() + { + // Marking the index as being rebuilt. + // Note: we're using Set mode. This means, that if an index exists in cache, it will be + // overwritten. That's OK, because if an index exists in cache, then in most cases we + // will not be in this place (the data will be simply read from cache). + _parent._cacheClient.SetValue(_indexKeyInCache, _index); + + // (re)registering it in the list of indexes (it should be visible for update operations) + if (!this._parent.PutIndexToList(_indexKey)) + { + _parent._cacheClient.Remove(_indexKeyInCache); + return false; + } + + _parent.Log("Index ({0}) was marked as being rebuilt", _indexKey); + return true; + } + + public virtual void AddEntityToIndex(EntityKey entityKey, Document doc) + { + string key = _parent.GetEntityKeyInCache(entityKey); + + // adding key to index (it's essential to do this _before_ checking the key length - the index should fail to be read next time) + _index.Index.Add(entityKey); + + // Putting the entity to cache, but only if it doesn't exist there. + // That's because when loading from DynamoDb whe should never overwrite local updates. + _parent._cacheClient.SetValue(key, new CacheDocumentWrapper(doc)); + } + + public virtual void Dispose() + { + if (this._index == null) + { + _parent._cacheClient.Remove(_indexKeyInCache); + return; + } + + this._index.IsBeingRebuilt = false; + + // saving the index to cache + _parent._cacheClient.ReplaceValue(_indexKeyInCache, _index); + _parent.Log("Index ({0}) with {1} entities saved to cache", _indexKey, _index.Index.Count); + } + } + } +} \ No newline at end of file diff --git a/Sources/Linq2DynamoDb.DataContext/Caching/ProjectionIndexCreator.cs b/Sources/Linq2DynamoDb.DataContext/Caching/ProjectionIndexCreator.cs new file mode 100644 index 0000000..1bc45d0 --- /dev/null +++ b/Sources/Linq2DynamoDb.DataContext/Caching/ProjectionIndexCreator.cs @@ -0,0 +1,63 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Amazon.DynamoDBv2.DocumentModel; + +namespace Linq2DynamoDb.DataContext.Caching +{ + public abstract partial class TableCache + { + protected class ProjectionIndexCreator : IIndexCreator + { + protected readonly TableCache _parent; + + protected readonly TableProjectionIndex _index; + protected readonly string _indexKey; + protected readonly string _indexKeyInCache; + + internal ProjectionIndexCreator(TableCache parent, string indexKey, string indexKeyInCache, SearchConditions searchConditions) + { + this._parent = parent; + this._index = new TableProjectionIndex(searchConditions); + this._indexKey = indexKey; + this._indexKeyInCache = indexKeyInCache; + } + + public virtual bool StartCreatingIndex() + { + // Marking the index as being rebuilt. + // Note: we're using Set mode. This means, that if an index exists in cache, it will be + // overwritten. That's OK, because if an index exists in cache, then in most cases we + // will not be in this place (the data will be simply read from cache). + _parent._cacheClient.SetValue(_indexKeyInCache, _index); + + // (re)registering it in the list of indexes (it should be visible for update operations) + if (!this._parent.PutIndexToList(this._indexKey)) + { + this._parent._cacheClient.Remove(this._indexKeyInCache); + return false; + } + + this._parent.Log("Index ({0}) was marked as being rebuilt", (object) this._indexKey); + return true; + } + + public virtual void AddEntityToIndex(EntityKey entityKey, Document doc) + { + // adding document to index + this._index.AddEntity(doc); + } + + public virtual void Dispose() + { + this._index.IsBeingRebuilt = false; + + // saving the index to cache only if its version didn't change since we started reading results from DynamoDb + _parent._cacheClient.ReplaceValue(_indexKeyInCache, _index); + this._parent.Log("Index ({0}) with {1} entities saved to cache", (object) this._indexKey, this._index.Entities.Length); + } + } + } +} diff --git a/Sources/Linq2DynamoDb.DataContext/Caching/RedisCacheClient.cs b/Sources/Linq2DynamoDb.DataContext/Caching/RedisCacheClient.cs new file mode 100644 index 0000000..430c3a0 --- /dev/null +++ b/Sources/Linq2DynamoDb.DataContext/Caching/RedisCacheClient.cs @@ -0,0 +1,164 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Linq2DynamoDb.DataContext.Utils; +using StackExchange.Redis; + +namespace Linq2DynamoDb.DataContext.Caching +{ + public class RedisCacheClient : ICacheClient + { + private static readonly object s_syncObject = new object(); + private static IConnectionMultiplexer s_singConnectionMultiplexer; + + private static readonly Lazy s_connectionMultiplexer = new Lazy(ConnectToCluster); + + private static string[] s_initialRegistrationCallChain; + + private static IConnectionMultiplexer Multiplexer + { + get { return s_connectionMultiplexer.Value; } + } + + private static IConnectionMultiplexer ConnectToCluster() + { + lock (s_syncObject) + { + if (s_singConnectionMultiplexer != null) + throw new ApplicationException( + string.Format( + "Already connected to cluster. Connection established, object created, been there, done that. Initial call made through call chain: {0}", + string.Join(", ", s_initialRegistrationCallChain))); + + s_initialRegistrationCallChain = (new StackTrace() + .GetFrames() ?? new StackFrame[] { }) + .Select(x => x.GetMethod()) + .Select(x => x.Name) + .Reverse() + .ToArray(); + + return s_singConnectionMultiplexer = ConnectionMultiplexer.Connect(RedisServer); + } + } + + public static string RedisServer { get; set; } + public TimeSpan DefaultTimeToLive { get; set; } + + public RedisCacheClient(string connectionString, TimeSpan? ttl) + { + RedisServer = connectionString; + DefaultTimeToLive = ttl ?? TimeSpan.FromMinutes(15); + } + + public bool Remove(string key) + { + IDatabase db = Multiplexer.GetDatabase(); + return db.KeyDelete(key); + } + public bool TryRemove(string key) + { + try + { + return Remove(key); + } + catch + { + return false; + } + } + + public bool TryGetValue(string key, out T value) + { + value = default(T); + + IDatabase db = Multiplexer.GetDatabase(); + var returnValue = db.StringGet(key); + + if (!returnValue.HasValue) + return false; + + Base64Serializer serializer = new Base64Serializer(); + value = serializer.Deserialize(returnValue); + return true; + } + + public bool AddValue(string key, T value) + { + string existing; + if (TryGetValue(key, out existing)) + throw new ArgumentException(string.Format("Key '{0}' already exists in database", key)); + + return SetValue(key, value); + } + + public bool AddValue(string key, T value, TimeSpan? timeToLive) + { + string existing; + if (TryGetValue(key, out existing)) + throw new ArgumentException(string.Format("Key '{0}' already exists in database", key)); + + return SetValue(key, value, timeToLive); + } + + public bool AddValue(string key, T value, DateTime? expiration) + { + string existing; + if (TryGetValue(key, out existing)) + throw new ArgumentException(string.Format("Key '{0}' already exists in database", key)); + + return SetValue(key, value, expiration); + } + + public bool SetValue(string key, T value) + { + IDatabase db = Multiplexer.GetDatabase(); + Base64Serializer serializer = new Base64Serializer(); + string serialized = serializer.Serialize(value); + return db.StringSet(key, serialized); + } + + public bool SetValue(string key, T value, TimeSpan? timeToLive) + { + IDatabase db = Multiplexer.GetDatabase(); + Base64Serializer serializer = new Base64Serializer(); + string serialized = serializer.Serialize(value); + return db.StringSet(key, serialized, timeToLive); + } + + public bool SetValue(string key, T value, DateTime? expiration) + { + var timeToLive = expiration == null + ? null + : expiration - DateTime.UtcNow; + + return SetValue(key, value, timeToLive); + } + public bool ReplaceValue(string key, T value) + { + string existing; + if (!TryGetValue(key, out existing)) + throw new KeyNotFoundException(string.Format("Key '{0}' not found in database", key)); + + return SetValue(key, value); + } + public bool ReplaceValue(string key, T value, TimeSpan? timeToLive) + { + string existing; + if (!TryGetValue(key, out existing)) + throw new KeyNotFoundException(string.Format("Key '{0}' not found in database", key)); + + return SetValue(key, value, timeToLive); + } + public bool ReplaceValue(string key, T value, DateTime? expiration) + { + string existing; + if (!TryGetValue(key, out existing)) + throw new KeyNotFoundException(string.Format("Key '{0}' not found in database", key)); + + return SetValue(key, value, expiration); + } + } +} diff --git a/Sources/Linq2DynamoDb.DataContext/Caching/RedisTableCache.cs b/Sources/Linq2DynamoDb.DataContext/Caching/RedisTableCache.cs new file mode 100644 index 0000000..8492445 --- /dev/null +++ b/Sources/Linq2DynamoDb.DataContext/Caching/RedisTableCache.cs @@ -0,0 +1,16 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Linq2DynamoDb.DataContext.Caching +{ + public class RedisTableCache : TableCache + { + public RedisTableCache(RedisCacheClient client) + : base(client) + { + } + } +} diff --git a/Sources/Linq2DynamoDb.DataContext/Caching/TableCache.cs b/Sources/Linq2DynamoDb.DataContext/Caching/TableCache.cs new file mode 100644 index 0000000..470cca5 --- /dev/null +++ b/Sources/Linq2DynamoDb.DataContext/Caching/TableCache.cs @@ -0,0 +1,761 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Amazon.DynamoDBv2.DocumentModel; +using Linq2DynamoDb.DataContext.Utils; + +namespace Linq2DynamoDb.DataContext.Caching +{ + public abstract partial class TableCache : ITableCache + { + protected Type _tableEntityType; + protected string _tableName; + protected string _hashKeyValue; + protected ICacheClient _cacheClient; + protected const string ProjectionIndexKeyPrefix = "[proj]"; + + /// + /// Limit to the number of indexes + /// TODO: estimate this number more precisely + /// + protected const int MaxNumberOfIndexes = 100; + + /// + /// Here all lock keys and their IDs are stored, for debugging purposes + /// + private readonly ConcurrentDictionary _lockIds = new ConcurrentDictionary(); + private static readonly Random Rnd = new Random(DateTime.Now.Millisecond); + + protected TableCache(ICacheClient client) + { + _cacheClient = client; + } + + #region ITableCache implementation + public event Action OnHit; + public event Action OnLog; + public event Action OnMiss; + + public virtual void Initialize(string tableName, Type tableEntityType, Primitive hashKeyValue) + { + if (this._tableEntityType != null) + { + throw new InvalidOperationException("An attempt to re-use an instance of TableCache for another
: pair was made. This is not allowed"); + } + + this._tableEntityType = tableEntityType; + + this._tableName = tableName; + this._hashKeyValue = hashKeyValue == null ? string.Empty : hashKeyValue.AsString(); + } + + public virtual Document GetSingleEntity(EntityKey entityKey) + { + string entityKeyInCache = this.GetEntityKeyInCache(entityKey); + if (entityKeyInCache == null) + return null; + + CacheDocumentWrapper wrapper; + if (!_cacheClient.TryGetValue(entityKeyInCache, out wrapper)) + { + this.OnMiss.FireSafely(); + return null; + } + + this.OnHit.FireSafely(); + return wrapper.Document; + } + + public virtual IEnumerable GetEntities(SearchConditions searchConditions, IEnumerable projectedFields, string orderByFieldName, bool orderByDesc) + { + // first trying to find a full index + string indexKey = searchConditions.Key; + HashSet index = this.TryLoadHealthyIndex(indexKey); + + Document[] result = null; + + // if no full index exist + if (index == null) + { + if (projectedFields != null) + { + // then there still might be a projection index + indexKey = this.GetProjectionIndexKey(searchConditions, projectedFields); + result = this.TryLoadProjectionIndexEntities(indexKey); + } + } + else + { + result = this.TryLoadIndexEntities(index, indexKey); + } + + // if we failed to load both full and projection index + if (result == null) + { + this.OnMiss.FireSafely(); + return null; + } + + this.OnHit.FireSafely(); + this.Log("Index ({0}) with {1} items successfully loaded from cache", indexKey, result.Length); + + if (string.IsNullOrEmpty(orderByFieldName)) + { + return result; + } + + // creating a comparer to sort the results + var comparer = PrimitiveComparer.GetComparer(this._tableEntityType, orderByFieldName); + + return orderByDesc + ? + result.OrderByDescending(doc => doc[orderByFieldName].AsPrimitive(), comparer) + : + result.OrderBy(doc => doc[orderByFieldName].AsPrimitive(), comparer) + ; + } + + public int? GetCount(SearchConditions searchConditions) + { + HashSet index = this.TryLoadHealthyIndex(searchConditions.Key); + + if (index == null) + { + this.OnMiss.FireSafely(); + return null; + } + + this.OnHit.FireSafely(); + this.Log("Contents of index ({0}) successfully loaded from cache and number of items returned is {1}", searchConditions.Key, index.Count); + + return index.Count; + } + + public void PutSingleLoadedEntity(EntityKey entityKey, Document doc) + { + string entityKeyInCache = this.GetEntityKeyInCache(entityKey); + if (entityKeyInCache == null) + return; + + // Putting the entity to cache, but only if it doesn't exist there. + // That's because when loading from DynamoDb whe should never overwrite local updates. + this._cacheClient.AddValue(entityKeyInCache, new CacheDocumentWrapper(doc)); + } + + public virtual void RemoveEntities(IEnumerable entities) + { + var entityKeysInCache = entities + .Select(this.GetEntityKeyInCache); + + Parallel.ForEach(entityKeysInCache, key => this._cacheClient.Remove(key)); + } + + /// + /// Applies modifications to cached entities and indexes + /// + public virtual void UpdateCacheAndIndexes(IDictionary addedEntities, IDictionary modifiedEntities, ICollection removedEntities) + { + var allEntities = CombineEntityDictionaries(addedEntities, modifiedEntities, removedEntities.ToDictionary(k => k, k => (Document) null)); + + // modifying/removing all entities in parallel + var loopResult = Parallel.ForEach(allEntities, (entityPair, loopState) => + { + bool result = true; + if (entityPair.Value == null) + { + result = this._cacheClient.TryRemove(entityPair.Key); + } + else + { + result = this._cacheClient.SetValue(entityPair.Key, new CacheDocumentWrapper(entityPair.Value)); + } + + if (!result) + { + loopState.Stop(); + } + }); + + // All operations should succeed, otherwise removing all affected entities from cache. + // That's because in some cases partially succeded updates might become a problem. + if (!loopResult.IsCompleted) + { + this.Log("Failed to put updates for table {0} to cache", this._tableName); + + Parallel.ForEach(allEntities, entityPair => this._cacheClient.Remove(entityPair.Key)); + return; + } + + // now updating indexes + this.UpdateIndexes(this._hashKeyValue, addedEntities, modifiedEntities, removedEntities); + + // To support scenarios, when a context contains both a full table and a table filtered by a HashKey, + // by updating HashKey-filtered indexes we also should update indexes of the full table. + // And vice versa. + + if (string.IsNullOrEmpty(this._hashKeyValue)) + { + // Trying to update lists of indexes with predefined HashKey values + var affectedHashKeys = addedEntities + .Where(kv => kv.Key.RangeKey != null) + .Select(kv => kv.Key.HashKey.AsString()) + .Union + ( + modifiedEntities + .Where(kv => kv.Key.RangeKey != null) + .Select(kv => kv.Key.HashKey.AsString()) + ) + .Union + ( + removedEntities + .Where(kv => kv.RangeKey != null) + .Select(kv => kv.HashKey.AsString()) + ) + .Distinct(); + + foreach (var hashKeyValue in affectedHashKeys) + { + this.UpdateIndexes(hashKeyValue, addedEntities, modifiedEntities, removedEntities); + } + } + else + { + // updating the full table indexes as well + this.UpdateIndexes(string.Empty, addedEntities, modifiedEntities, removedEntities); + } + } + + public virtual IIndexCreator StartCreatingIndex(SearchConditions searchConditions) + { + string indexKeyInCache = this.GetIndexKeyInCache(searchConditions.Key, this._hashKeyValue); + return StartCreatingIndex(indexKeyInCache, searchConditions); + } + + protected T StartCreatingIndex(string indexKeyInCache, SearchConditions searchConditions) + where T : IIndexCreator + { + T creator = (T) Activator.CreateInstance(typeof(T), this, indexKeyInCache, searchConditions); + + if (!creator.StartCreatingIndex()) + { + this.Log("Failed to start creating index ({0})", searchConditions.Key); + return default(T); + } + + return creator; + } + + public virtual IIndexCreator StartCreatingProjectionIndex(SearchConditions searchConditions, IList projectedFields) + { + string indexKey = this.GetProjectionIndexKey(searchConditions, projectedFields); + string indexKeyInCache = this.GetIndexKeyInCache(indexKey, this._hashKeyValue); + return StartCreatingProjectionIndex(indexKey, indexKeyInCache, searchConditions); + } + + protected T StartCreatingProjectionIndex(string indexKey, string indexKeyInCache, SearchConditions searchConditions) + where T : IIndexCreator + { + T creator = (T) Activator.CreateInstance(typeof(T), this, indexKey, indexKeyInCache, searchConditions); + + if (!creator.StartCreatingIndex()) + { + this.Log("Failed to start creating projection index ({0})", indexKey); + return default(T); + } + + return creator; + } + + /// + /// Acquires a table-wide named lock and returns a disposable object, that represents it + /// + public IDisposable AcquireTableLock(string lockKey, TimeSpan lockTimeout) + { + return new TableLock(this, lockKey, lockTimeout); + } + /// + /// Acquires a named lock around the table by storing a random value in cache + /// + internal void LockTable(string lockKey, TimeSpan lockTimeout) + { + if (this._lockIds.ContainsKey(lockKey)) + { + throw new NotSupportedException("Recursive locks are not supported. Or maybe you're trying to use EnyimTableCache object from multiple threads?"); + } + + string cacheLockKey = this.GetLockKeyInCache(this._hashKeyValue, lockKey); + int cacheLockId = Rnd.Next(); + + var timeStart = DateTime.Now; + while (true) + { + if (DateTime.Now - timeStart > lockTimeout) + { + break; + } + + // Trying to create a new value in cache + if (this._cacheClient.AddValue(cacheLockKey, cacheLockId)) + { + this._lockIds[lockKey] = cacheLockId; + return; + } + + Thread.Sleep(10); + } + + // If we failed to acquire a lock within CacheLockTimeoutInSeconds + // (this means, that another process crached), then we should forcibly acquire it + + this.Log("Forcibly acquiring the table lock object {0} after {1} ms of waiting", lockKey, lockTimeout.TotalMilliseconds); + + this._cacheClient.SetValue(cacheLockKey, cacheLockId); + this._lockIds[lockKey] = cacheLockId; + } + + /// + /// Releases a named lock around the table + /// + internal void UnlockTable(string lockKey) + { + int lockId; + if (!this._lockIds.TryRemove(lockKey, out lockId)) + { + throw new InvalidOperationException + ( + string.Format("The table lock {0} wasn't acquired, so it cannot be released. Check your code!", lockKey) + ); + } + + string cacheLockKey = this.GetLockKeyInCache(this._hashKeyValue, lockKey); + int cacheLockId; + if (!_cacheClient.TryGetValue(cacheLockKey, out cacheLockId)) + { + // The cache miss might happen here, if a cache server crashed. + // In this case we just silently return. + this.Log("The table lock object {0} is missing in cache, but we don't care about that too much (probably, the cache node was restarted)", lockKey); + return; + } + + if (cacheLockId != lockId) + { + // This means, that another process has forcibly replaced our lock. + throw new InvalidOperationException + ( + string.Format("The table lock {0} was forcibly acquired by another process", lockKey) + ); + } + + this._cacheClient.Remove(cacheLockKey); + } + #endregion + + #region Cache key methods + /// + /// Gets a cache key for an entity + /// + protected virtual string GetEntityKeyInCache(EntityKey entityKey) + { + return string.Format("{0}:{1}", _tableName, entityKey); + } + + protected virtual string[] GetEntityKeysInCache(IEnumerable entities) + { + string[] entityKeysInCache = entities + .Select(this.GetEntityKeyInCache).ToArray(); + return entityKeysInCache; + } + + protected virtual KeyValuePair[] CombineEntityDictionaries(params IDictionary[] entities) + { + IEnumerable> allEntities = new Dictionary(entities.Sum(x => x.Count)); + bool first = true; + foreach (var item in entities) + { + allEntities = allEntities.Union(item); + } + return allEntities.Select(kv => new KeyValuePair(this.GetEntityKeyInCache(kv.Key), kv.Value)).ToArray(); + } + + /// + /// Gets a cache key for an index. Returns null if the key is invalid for any reason. + /// + protected string GetIndexKeyInCache(string indexKey, string hashKeyValue) + { + return string.Format("{0}{1}{2}", _tableName, hashKeyValue, indexKey).ToBase64(); + } + + /// + /// Gets a cache key for the list of all indexes + /// + protected string GetIndexListKeyInCache(string hashKeyValue) + { + return string.Format("{0}{1}:indexes", _tableName, hashKeyValue).ToBase64(); + } + + /// + /// Gets a cache key for a table-wide lock + /// + protected string GetLockKeyInCache(string lockKey, string hashKeyValue) + { + return string.Format("{0}{1}{2}", _tableName, hashKeyValue, lockKey).ToBase64(); + } + + /// + /// Composes a key for a projection index + /// + protected string GetProjectionIndexKey(SearchConditions searchConditions, IEnumerable projectedFields) + { + return string.Format("{0}{1}; {2}", + ProjectionIndexKeyPrefix, + projectedFields.Aggregate((i, s) => string.Format("{0},{1}", i, s)), + searchConditions.Key); + } + #endregion + + #region Index-related methods + /// + /// Loads the index by its key, ensuring everything + /// + protected HashSet TryLoadHealthyIndex(string indexKey) + { + string indexKeyInCache = this.GetIndexKeyInCache(indexKey, this._hashKeyValue); + + // first trying to get the index from cache + TableIndex index; + if (!_cacheClient.TryGetValue(indexKeyInCache, out index)) + { + return null; + } + + // Checking, that index is mentioned in the list of indexes. + // Only indexes from list are updated with local updates. + if (!DoesIndexExistInTheListOfIndexes(indexKey)) + { + _cacheClient.Remove(indexKeyInCache); + return null; + } + + // If index is currently being filled with data from DynamoDb, then we can't use it yet + if (index.IsBeingRebuilt) + { + return null; + } + + return index.Index; + } + + /// + /// Loads a projection index by its key + /// + protected Document[] TryLoadProjectionIndexEntities(string indexKey) + { + string indexKeyInCache = this.GetIndexKeyInCache(indexKey, this._hashKeyValue); + if (indexKeyInCache == null) + { + return null; + } + + // first trying to get the index from cache + TableProjectionIndex index; + if (!_cacheClient.TryGetValue(indexKeyInCache, out index)) + { + return null; + } + + // Checking, that index is mentioned in the list of indexes. + // Only indexes from list are updated with local updates. + if (!this.DoesIndexExistInTheListOfIndexes(indexKey)) + { + this._cacheClient.Remove(indexKeyInCache); + return null; + } + + // If index is currently being filled with data from DynamoDb, then we can't use it yet + if (index.IsBeingRebuilt) + { + return null; + } + + return index.Entities; + } + + private Document[] TryLoadIndexEntities(HashSet entityKeys, string indexKey) + { + var result = new Document[entityKeys.Count]; + + // now we have to succeed loading all entities from cache + var loopResult = Parallel.ForEach(entityKeys, (entityKey, loopState, i) => + { + CacheDocumentWrapper wrapper; + string entityKeyInCache = GetEntityKeyInCache(entityKey); + if (!_cacheClient.TryGetValue(entityKeyInCache, out wrapper)) + { + loopState.Stop(); + return; + } + + result[i] = wrapper.Document; + }); + + if (loopResult.IsCompleted) + { + return result; + } + + this.Log("Failed to get contents of index ({0}) from cache", indexKey); + + // the index is not usable any more, we'd better delete it + this.RemoveIndexFromList + ( + indexKey, + this.GetIndexKeyInCache(indexKey, this._hashKeyValue), + this.GetIndexListKeyInCache(this._hashKeyValue) + ); + return null; + } + + protected bool DoesIndexExistInTheListOfIndexes(string indexKey) + { + TableIndexList indexList; + _cacheClient.TryGetValue(GetIndexListKeyInCache(_hashKeyValue), out indexList); + return ((indexList != null) && (indexList.Contains(indexKey))); + } + + /// + /// Adds matching entities and removes removed entities from indexes + /// + protected virtual void UpdateIndexes(string hashKeyValue, IDictionary addedEntities, IDictionary modifiedEntities, ICollection removedEntities) + { + string indexListKeyInCache = this.GetIndexListKeyInCache(hashKeyValue); + + TableIndexList indexes; + if (!_cacheClient.TryGetValue(indexListKeyInCache, out indexes)) + { + _cacheClient.Remove(indexListKeyInCache); + return; + } + + // storing indexes, that fail to be updated + var indexesToBeRemoved = new ConcurrentDictionary(); + + Parallel.ForEach(indexes, indexKey => + { + string indexKeyInCache = this.GetIndexKeyInCache(indexKey, hashKeyValue); + + bool indexUpdateSucceeded = + this.IsProjectionIndex(indexKey) + ? + this.UpdateProjectionIndex(indexKey, indexKeyInCache, addedEntities, modifiedEntities, removedEntities) + : + this.UpdateIndex(indexKey, indexKeyInCache, addedEntities, removedEntities); + + if (!indexUpdateSucceeded) + { + indexesToBeRemoved[indexKey] = indexKeyInCache; + } + }); + + // removing bad indexes + if (indexesToBeRemoved.Count > 0) + { + this.RemoveIndexesFromList(indexesToBeRemoved, indexListKeyInCache); + } + } + + /// + /// Adds matching entities and removes removed entities from an index + /// + protected virtual bool UpdateIndex(string indexKey, string indexKeyInCache, IDictionary addedEntities, ICollection removedEntities) + { + TableIndex index; + if (!_cacheClient.TryGetValue(indexKeyInCache, out index)) + { + return false; + } + + bool indexChanged = false; + + try + { + // adding added entities, if they match the index conditions + foreach + ( + var entityPair in addedEntities.Where + ( + entityPair => index.MatchesSearchConditions(entityPair.Value, this._tableEntityType) + ) + ) + { + index.Index.Add(entityPair.Key); + indexChanged = true; + } + } + catch (Exception ex) + { + // some stupid exceptions might occur within TableIndex.MatchesSearchConditions() + this.Log("Failed to update index ({0}) because of the following exception: {1}", indexKey, ex); + return false; + } + + // removing removed entities + foreach (var entityKey in removedEntities.Where(entityKey => index.Index.Contains(entityKey))) + { + index.Index.Remove(entityKey); + indexChanged = true; + } + + if (!indexChanged) + { + return true; + } + + _cacheClient.SetValue(indexKeyInCache, index); + this.Log("Index ({0}) updated. {1} entities added, {2} entities removed", indexKey, addedEntities.Count, removedEntities.Count); + return true; + } + + /// + /// Checks if a projection index should be dropped because of some added/modified/removed entities + /// + protected virtual bool UpdateProjectionIndex(string indexKey, string indexKeyInCache, IDictionary addedEntities, IDictionary modifiedEntities, ICollection removedEntities) + { + // if some entities were modified or removed + if + ( + (modifiedEntities.Count > 0) + || + (removedEntities.Count > 0) + ) + { + // then the only option for us is to drop the index - as we don't know, if these entities conform to index's conditions or not + this.Log("Projection index ({0}) removed because some entities were removed", indexKey); + return false; + } + + TableProjectionIndex index; + if (!_cacheClient.TryGetValue(indexKeyInCache, out index)) + { + return false; + } + + try + { + // if any added or modified entities conform to index's conditions + if + ( + addedEntities.Values.Any + ( + en => index.MatchesSearchConditions(en, this._tableEntityType) + ) + ) + { + this.Log("Projection index ({0}) removed because of some added entities", indexKey); + return false; + } + } + catch (Exception ex) + { + // some stupid exceptions might occur within TableIndex.MatchesSearchConditions() + this.Log("Failed to check projection index ({0}) because of the following exception: {1}", indexKey, ex); + return false; + } + + return true; + } + + /// + /// Checks if the provided key is a projection index's key + /// + protected bool IsProjectionIndex(string indexKey) + { + return indexKey.StartsWith(ProjectionIndexKeyPrefix); + } + + protected virtual bool PutIndexToList(string indexKey) + { + string indexListKeyInCache = this.GetIndexListKeyInCache(this._hashKeyValue); + + TableIndexList indexes; + if (!_cacheClient.TryGetValue(indexListKeyInCache, out indexes)) + indexes = new TableIndexList(MaxNumberOfIndexes); + + if (indexes.Contains(indexKey)) + { + return true; + } + + string poppedIndexKey = indexes.Push(indexKey); + if (poppedIndexKey != null) + { + // if an older index was removed from the list during push operation - then removing it from cache + this._cacheClient.Remove(this.GetIndexKeyInCache(poppedIndexKey, this._hashKeyValue)); + this.Log("An old index ({0}) was removed from cache because the number of supported indexes ({1}) is exceeded", poppedIndexKey, MaxNumberOfIndexes); + } + + try + { + _cacheClient.SetValue(indexListKeyInCache, indexes); + return true; + } + catch + { + return false; + } + } + + protected virtual void RemoveIndexFromList(string indexKey, string indexKeyInCache, string indexListKeyInCache) + { + // always removing the index itself + _cacheClient.Remove(indexKeyInCache); + + TableIndexList indexes; + if (!_cacheClient.TryGetValue(indexListKeyInCache, out indexes)) + return; + + indexes.Remove(indexKey); + + // store updated index list + _cacheClient.ReplaceValue(indexListKeyInCache, indexes); + } + + protected virtual void RemoveIndexesFromList(IDictionary indexKeys, string indexListKeyInCache) + { + // always removing indexes themselves + Parallel.ForEach(indexKeys.Values, indexKeyInCache => this._cacheClient.Remove(indexKeyInCache)); + + TableIndexList indexes; + if (!_cacheClient.TryGetValue(indexListKeyInCache, out indexes)) + return; + + foreach (KeyValuePair pair in indexKeys) + { + indexes.Remove(pair.Value); + } + + // store updated index list + _cacheClient.ReplaceValue(indexListKeyInCache, indexes); + } + #endregion + + internal void Log(string format, params object[] args) + { + var handler = this.OnLog; + if (handler == null) + { + return; + } + string tableName = this._tableName; + if (!string.IsNullOrEmpty(this._hashKeyValue)) + { + tableName = string.Format("{0}:{1}", tableName, this._hashKeyValue); + } + + handler(string.Format("TableCache({0}) : ", tableName, string.Format(format, args))); + } + } +} diff --git a/Sources/Linq2DynamoDb.DataContext/Caching/TableLock.cs b/Sources/Linq2DynamoDb.DataContext/Caching/TableLock.cs index e972b59..d931777 100644 --- a/Sources/Linq2DynamoDb.DataContext/Caching/TableLock.cs +++ b/Sources/Linq2DynamoDb.DataContext/Caching/TableLock.cs @@ -7,11 +7,11 @@ namespace Linq2DynamoDb.DataContext.Caching /// internal class TableLock : IDisposable { - private readonly EnyimTableCache _cache; + private readonly TableCache _cache; private readonly string _lockKey; private bool _disposed; - internal TableLock(EnyimTableCache repository, string lockKey, TimeSpan lockTimeout) + internal TableLock(TableCache repository, string lockKey, TimeSpan lockTimeout) { this._cache = repository; this._lockKey = lockKey; diff --git a/Sources/Linq2DynamoDb.DataContext/Linq2DynamoDb.DataContext.csproj b/Sources/Linq2DynamoDb.DataContext/Linq2DynamoDb.DataContext.csproj index 0606c48..b466139 100644 --- a/Sources/Linq2DynamoDb.DataContext/Linq2DynamoDb.DataContext.csproj +++ b/Sources/Linq2DynamoDb.DataContext/Linq2DynamoDb.DataContext.csproj @@ -48,6 +48,10 @@ False ..\packages\EnyimMemcached.2.12\lib\net35\Enyim.Caching.dll + + ..\packages\StackExchange.Redis.1.0.488\lib\net45\StackExchange.Redis.dll + True + @@ -65,7 +69,17 @@ + + + Code + + + + + + + @@ -89,7 +103,6 @@ - @@ -111,6 +124,7 @@ + diff --git a/Sources/Linq2DynamoDb.DataContext/Utils/Base64Serializer.cs b/Sources/Linq2DynamoDb.DataContext/Utils/Base64Serializer.cs new file mode 100644 index 0000000..ab9f051 --- /dev/null +++ b/Sources/Linq2DynamoDb.DataContext/Utils/Base64Serializer.cs @@ -0,0 +1,41 @@ +using System; +using System.IO; +using System.Runtime.Serialization.Formatters.Binary; + +namespace Linq2DynamoDb.DataContext.Utils +{ + public class Base64Serializer + { + public string Serialize(T obj) + { + byte[] bytes = SerializeToBytes(obj); + return Convert.ToBase64String(bytes); + } + + public T Deserialize(string text) + { + byte[] bytes = Convert.FromBase64String(text); + return DeserializeBytes(bytes); + } + + public static byte[] SerializeToBytes(T obj) + { + using (MemoryStream memoryStream = new MemoryStream()) + { + BinaryFormatter serializer = new BinaryFormatter(); + serializer.Serialize(memoryStream, obj); + return memoryStream.ToArray(); + } + } + + public static T DeserializeBytes(byte[] bytes) + { + using (MemoryStream memoryStream = new MemoryStream(bytes)) + { + BinaryFormatter serializer = new BinaryFormatter(); + memoryStream.Seek(0, SeekOrigin.Begin); + return (T) serializer.Deserialize(memoryStream); + } + } + } +} diff --git a/Sources/Linq2DynamoDb.DataContext/packages.config b/Sources/Linq2DynamoDb.DataContext/packages.config index 79f1b7e..92d3c90 100644 --- a/Sources/Linq2DynamoDb.DataContext/packages.config +++ b/Sources/Linq2DynamoDb.DataContext/packages.config @@ -3,4 +3,5 @@ + \ No newline at end of file From 014cf4e1574abdea78bc3138b12ceef2fea92767 Mon Sep 17 00:00:00 2001 From: howcheng Date: Wed, 10 Feb 2016 10:47:12 -0800 Subject: [PATCH 2/2] remove commented-out code --- .../Caching/EnyimMemcachedClient.cs | 46 ------------------- 1 file changed, 46 deletions(-) diff --git a/Sources/Linq2DynamoDb.DataContext/Caching/EnyimMemcachedClient.cs b/Sources/Linq2DynamoDb.DataContext/Caching/EnyimMemcachedClient.cs index 6482f52..65ebb16 100644 --- a/Sources/Linq2DynamoDb.DataContext/Caching/EnyimMemcachedClient.cs +++ b/Sources/Linq2DynamoDb.DataContext/Caching/EnyimMemcachedClient.cs @@ -19,28 +19,6 @@ public EnyimMemcachedClient(MemcachedClient client, TimeSpan? defaultTtl = null) DefaultTimeToLive = defaultTtl ?? TimeSpan.FromMinutes(15); } - //public string this[string key] - //{ - // get - // { - // throw new NotImplementedException(); - // } - // set - // { - // throw new NotImplementedException(); - // } - //} - - //public string this[string key, DateTime expiration] - //{ - // set { throw new NotImplementedException(); } - //} - - //public string this[string key, TimeSpan? timeToLive] - //{ - // set { throw new NotImplementedException(); } - //} - public bool Remove(string key) { return _cacheClient.Remove(key); @@ -63,21 +41,6 @@ public bool TryGetValue(string key, out T value) return result.Success; } - //public bool TryGetValue(string key, out T value, out TimeSpan? timeToLive) - //{ - // throw new NotImplementedException(); - //} - - //public bool TryGetValue(string key, out T value, out DateTime? expiration) - //{ - // throw new NotImplementedException(); - //} - - //public bool TryGetTimeToLive(string key, out TimeSpan? timetoLive) - //{ - // throw new NotImplementedException(); - //} - public bool AddValue(string key, T value) { return _cacheClient.Store(StoreMode.Add, key, value, DefaultTimeToLive); @@ -136,14 +99,5 @@ private bool StoreWithExpiration(StoreMode mode, string key, T value, DateTim } return _cacheClient.Store(mode, key, value, expiration.Value); } - - //public bool SetTimeToLive(string key, TimeSpan? timetoLive) - //{ - // object value; - // if (!TryGetValue(key, out value)) - // return false; - - // return ReplaceValue(key, value, ) - //} } }