Skip to content

Commit

Permalink
Merge branch 'feature_LWDEV-7546-Improve-caching-for-asset-pairs' int…
Browse files Browse the repository at this point in the history
…o dev
  • Loading branch information
njannink committed May 31, 2018
2 parents 29bcc8d + 9484c15 commit a50c978
Show file tree
Hide file tree
Showing 21 changed files with 348 additions and 289 deletions.
79 changes: 17 additions & 62 deletions client/Lykke.Service.Assets.Client/AssetsServiceWithCache.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Common.Log;
Expand All @@ -10,87 +8,44 @@
namespace Lykke.Service.Assets.Client
{
///<inheritdoc/>
public class AssetsServiceWithCache : IAssetsServiceWithCache
internal class AssetsServiceWithCache : IAssetsServiceWithCache
{
private readonly IAssetsService _assetsService;
private readonly IDictionaryCache<Asset> _assetsCache;
private readonly IDictionaryCache<AssetPair> _assetPairsCache;
private readonly ILog _log;

///<inheritdoc/>
public AssetsServiceWithCache(IAssetsService assetsService, IDictionaryCache<Asset> assetsCache, IDictionaryCache<AssetPair> assetPairsCache, ILog log)
public AssetsServiceWithCache(IDictionaryCache<Asset> assetsCache, IDictionaryCache<AssetPair> assetPairsCache)
{
_assetsService = assetsService;
_assetsCache = assetsCache;
_assetPairsCache = assetPairsCache;
_log = log;
}

///<inheritdoc/>
public async Task<IReadOnlyCollection<AssetPair>> GetAllAssetPairsAsync(CancellationToken cancellationToken = new CancellationToken())
{
await _assetPairsCache.EnsureCacheIsUpdatedAsync(() => GetUncachedAssetPairsAsync(cancellationToken));

return _assetPairsCache.GetAll();
}
public Task<IReadOnlyCollection<AssetPair>> GetAllAssetPairsAsync(CancellationToken cancellationToken = new CancellationToken())
=> _assetPairsCache.GetAll(cancellationToken);

async Task<IReadOnlyCollection<Asset>> IAssetsServiceWithCache.GetAllAssetsAsync(CancellationToken cancellationToken)
=> await GetAllAssetsAsync(false, cancellationToken);
Task<IReadOnlyCollection<Asset>> IAssetsServiceWithCache.GetAllAssetsAsync(CancellationToken cancellationToken = new CancellationToken())
=> GetAllAssetsAsync(false, cancellationToken);

///<inheritdoc/>
public async Task<IReadOnlyCollection<Asset>> GetAllAssetsAsync(bool includeNonTradable, CancellationToken cancellationToken = new CancellationToken())
{
await _assetsCache.EnsureCacheIsUpdatedAsync(() => GetUncachedAssetsAsync(cancellationToken));

return _assetsCache.GetAll();
}
public Task<IReadOnlyCollection<Asset>> GetAllAssetsAsync(bool includeNonTradable, CancellationToken cancellationToken = new CancellationToken())
=> _assetsCache.GetAll(cancellationToken);

///<inheritdoc/>
public async Task<Asset> TryGetAssetAsync(string assetId, CancellationToken cancellationToken = new CancellationToken())
{
await _assetsCache.EnsureCacheIsUpdatedAsync(() => GetUncachedAssetsAsync(cancellationToken));

return _assetsCache.TryGet(assetId);
}

///<inheritdoc/>
public async Task<AssetPair> TryGetAssetPairAsync(string assetPairId, CancellationToken cancellationToken = new CancellationToken())
{
await _assetPairsCache.EnsureCacheIsUpdatedAsync(() => GetUncachedAssetPairsAsync(cancellationToken));

return _assetPairsCache.TryGet(assetPairId);
}
public Task<Asset> TryGetAssetAsync(string assetId, CancellationToken cancellationToken = new CancellationToken())
=> _assetsCache.TryGet(assetId, cancellationToken);

///<inheritdoc/>
public async Task UpdateAssetPairsCacheAsync(CancellationToken cancellationToken = new CancellationToken())
{
_assetPairsCache.Update(await GetUncachedAssetPairsAsync(cancellationToken));
}
public Task<AssetPair> TryGetAssetPairAsync(string assetPairId, CancellationToken cancellationToken = new CancellationToken())
=> _assetPairsCache.TryGet(assetPairId, cancellationToken);

///<inheritdoc/>
public async Task UpdateAssetsCacheAsync(CancellationToken cancellationToken = new CancellationToken())
{
_assetsCache.Update(await GetUncachedAssetsAsync(cancellationToken));
}
public Task UpdateAssetPairsCacheAsync(CancellationToken cancellationToken = new CancellationToken())
=> _assetPairsCache.Reset(cancellationToken);

///<inheritdoc/>
public IDisposable StartAutoCacheUpdate()
{
return new CompositeDisposable
{
_assetPairsCache.StartAutoUpdate(nameof(AssetsServiceWithCache), _log, () => GetUncachedAssetPairsAsync(new CancellationToken())),
_assetsCache.StartAutoUpdate(nameof(AssetsServiceWithCache), _log, () => GetUncachedAssetsAsync(new CancellationToken()))
};
}

private async Task<IEnumerable<Asset>> GetUncachedAssetsAsync(CancellationToken cancellationToken)
{
return await _assetsService.AssetGetAllAsync(false, cancellationToken);
}

private async Task<IEnumerable<AssetPair>> GetUncachedAssetPairsAsync(CancellationToken cancellationToken)
{
return await _assetsService.AssetPairGetAllAsync(cancellationToken);
}
public Task UpdateAssetsCacheAsync(CancellationToken cancellationToken = new CancellationToken())
=> _assetsCache.Reset(cancellationToken);
}
}
9 changes: 0 additions & 9 deletions client/Lykke.Service.Assets.Client/Cache/DateTimeProvider.cs

This file was deleted.

99 changes: 34 additions & 65 deletions client/Lykke.Service.Assets.Client/Cache/DictionaryCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,96 +3,65 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Threading;
using System.Threading.Tasks;
using Lykke.Common.Cache;
using Lykke.Service.Assets.Client.Updaters;

namespace Lykke.Service.Assets.Client.Cache
{
/// <inheritdoc />
public class DictionaryCache<T> : IDictionaryCache<T>
/// <summary>
/// Base class for a dictionary cache.
/// </summary>
internal class DictionaryCache<T> : IDictionaryCache<T>
where T : ICacheItem
{
private readonly IDateTimeProvider _dateTimeProvider;
private readonly TimeSpan _cacheExpirationPeriod;
private const string AllItems = @"AllItems";
private readonly OnDemandDataCache<Dictionary<string, T>> _innerCache;
private readonly IUpdater<T> _updater;
private readonly TimeSpan _expirationTime;

private Dictionary<string, T> _items;
private DateTime _cacheExpirationMoment;
private bool _inAutoUpdate;

/// <inheritdoc />
public DictionaryCache(IDateTimeProvider dateTimeProvider, TimeSpan cacheExpirationPeriod)
/// <summary>
/// Create new dictionary cache.
/// </summary>
protected DictionaryCache(IUpdater<T> updater, TimeSpan expirationTime)
{
_items = new Dictionary<string, T>();
_cacheExpirationMoment = DateTime.MinValue;
_dateTimeProvider = dateTimeProvider;
_cacheExpirationPeriod = cacheExpirationPeriod;
_innerCache = new OnDemandDataCache<Dictionary<string, T>>();
_updater = updater;
_expirationTime = expirationTime;
}

/// <inheritdoc />
public IDisposable StartAutoUpdate(string componentName, ILog log, Func<Task<IEnumerable<T>>> getAllAsync)
public async Task Reset(CancellationToken token)
{
if (_inAutoUpdate)
{
throw new InvalidOperationException("Dictionary is already in auto update mode.");
}

_inAutoUpdate = true;
async Task UpdateCache(ITimerTrigger trigger, TimerTriggeredHandlerArgs args, CancellationToken token)
{
await Update(getAllAsync);
}

var timer = new TimerTrigger(componentName, _cacheExpirationPeriod, log, UpdateCache);
timer.Start();

return Disposable.Create(() =>
{
_inAutoUpdate = false;
timer.Dispose();
});
_innerCache.Remove(AllItems);
await GetItems(token);
}

/// <inheritdoc />
public async Task EnsureCacheIsUpdatedAsync(Func<Task<IEnumerable<T>>> getAllItemsAsync)
{
if (_inAutoUpdate)
{
return;
}

if (_cacheExpirationMoment < _dateTimeProvider.UtcNow)
{
await Update(getAllItemsAsync);
}
}

private async Task Update(Func<Task<IEnumerable<T>>> getAllItemsAsync)
public async Task<T> TryGet(string id, CancellationToken token)
{
var items = await getAllItemsAsync();
Update(items);
var items = await GetItems(token);
items.TryGetValue(id, out var item);
return item;
}

/// <inheritdoc />
public void Update(IEnumerable<T> items)
public async Task<IReadOnlyCollection<T>> GetAll(CancellationToken token)
{
_items = items.ToDictionary(p => p.Id, p => p);

_cacheExpirationMoment = _dateTimeProvider.UtcNow + _cacheExpirationPeriod;
var items = await GetItems(token);
return items.Values;
}

/// <inheritdoc />
public T TryGet(string id)
private async Task<Dictionary<string, T>> GetItems(CancellationToken token)
{
_items.TryGetValue(id, out var pair);

return pair;
}
async Task<Dictionary<string, T>> Refresh()
{
var items = await _updater.GetItemsAsync(token);
return items.ToDictionary(x => x.Id);
}

/// <inheritdoc />
public IReadOnlyCollection<T> GetAll()
{
return _items.Values;
return await _innerCache.GetOrAddAsync(AllItems, _ => Refresh(), _expirationTime);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using System;
using Lykke.Service.Assets.Client.Updaters;

namespace Lykke.Service.Assets.Client.Cache
{
/// <summary>
/// Expiring dictionary cache where the cache entry expires after given time.
/// </summary>
/// <typeparam name="T">the type of cached item</typeparam>
internal sealed class ExpiringDictionaryCache<T> : DictionaryCache<T>
where T : ICacheItem
{
/// <summary>
/// Create a new expiring dictionary cache.
/// </summary>
/// <param name="expirationTime">expiration time</param>
/// <param name="updater">item updater</param>
public ExpiringDictionaryCache(TimeSpan expirationTime, IUpdater<T> updater)
: base(updater, expirationTime)
{
}
}
}
8 changes: 7 additions & 1 deletion client/Lykke.Service.Assets.Client/Cache/ICacheItem.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
namespace Lykke.Service.Assets.Client.Cache
{
public interface ICacheItem
/// <summary>
/// Cache item {T} in a <see cref="IDictionaryCache{T}"/>.
/// </summary>
internal interface ICacheItem
{
/// <summary>
/// The id of the entry
/// </summary>
string Id { get; }
}
}

This file was deleted.

24 changes: 7 additions & 17 deletions client/Lykke.Service.Assets.Client/Cache/IDictionaryCache.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
using System;
using System.Collections.Generic;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Common.Log;

Expand All @@ -8,32 +8,22 @@ namespace Lykke.Service.Assets.Client.Cache
/// <summary>
/// Simple in-memory client side cache.
/// </summary>
public interface IDictionaryCache<T>
internal interface IDictionaryCache<T>
where T : ICacheItem
{
/// <summary>
/// Starts an automatic updater that keeps the cache updated on a background thread.
/// Resets the cache.
/// </summary>
IDisposable StartAutoUpdate(string componentName, ILog log, Func<Task<IEnumerable<T>>> getAllAsync);

/// <summary>
/// Update the cache when cache has expired.
/// </summary>
Task EnsureCacheIsUpdatedAsync(Func<Task<IEnumerable<T>>> getAllAsync);

/// <summary>
/// Update the cache with given data.
/// </summary>
void Update(IEnumerable<T> items);
Task Reset(CancellationToken token);

/// <summary>
/// Try to get cached item with given id.
/// </summary>
T TryGet(string id);
Task<T> TryGet(string id, CancellationToken token);

/// <summary>
/// Get all cached items.
/// </summary>
IReadOnlyCollection<T> GetAll();
Task<IReadOnlyCollection<T>> GetAll(CancellationToken token);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
using System;
using Common;
using Common.Log;
using Lykke.Service.Assets.Client.Updaters;

namespace Lykke.Service.Assets.Client.Cache
{
/// <summary>
/// A dictionary cache that refreshes/synchronizes in the background.
/// </summary>
/// <typeparam name="T">the type of cached item</typeparam>
internal sealed class RefreshingDictionaryCache<T> : DictionaryCache<T>, IDisposable
where T : ICacheItem
{
private readonly TimerTrigger _trigger;

/// <summary>
/// Creates a new refreshing dictionary cache.
/// </summary>
/// <param name="refreshTime">the refresh time</param>
/// <param name="updater">the item updater</param>
/// <param name="log">the lykke log</param>
public RefreshingDictionaryCache(TimeSpan refreshTime, IUpdater<T> updater, ILog log)
: base(updater, refreshTime.Add(refreshTime))
{
_trigger = new TimerTrigger(nameof(AssetsService), refreshTime, log,
async (x, y, token) => await Reset(token));
_trigger.Start();
}

/// <inheritdoc />
public void Dispose()
{
_trigger?.Dispose();
}
}
}
6 changes: 0 additions & 6 deletions client/Lykke.Service.Assets.Client/IAssetsServiceWithCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,5 @@ public interface IAssetsServiceWithCache
/// Forcibly updates client-side assets cache
/// </summary>
Task UpdateAssetsCacheAsync(CancellationToken cancellationToken = new CancellationToken());

/// <summary>
/// Starts an automatic update process that will keep the caches updated the background.
/// </summary>
/// <returns>the update process, when disposed the auto update will stop</returns>
IDisposable StartAutoCacheUpdate();
}
}
Loading

0 comments on commit a50c978

Please sign in to comment.