From 3b76648c5f89a338a651c9d6a4ce5258aa55d2da Mon Sep 17 00:00:00 2001 From: Valentin Stavetski Date: Fri, 8 Jun 2018 19:22:17 +0300 Subject: [PATCH] #201 add ehcache --- build.gradle | 6 +++- .../configuration/TickersConfiguration.kt | 35 +++++++++++++++++++ .../markets/ticker/service/TickerService.kt | 32 +++++++++++++++-- 3 files changed, 70 insertions(+), 3 deletions(-) diff --git a/build.gradle b/build.gradle index 7c263ef5..d937cc55 100644 --- a/build.gradle +++ b/build.gradle @@ -10,6 +10,7 @@ buildscript { rxkotlinVersion = "2.2.0" reactorVersion = "3.1.3.RELEASE" cassandraVersion = "3.5.0" + ehcacheVersion = "3.4.0" xchangeStreamCoreVersion = "4.3.2" xchangeStreamVersion = "4.3.3-SNAPSHOT" @@ -160,6 +161,8 @@ subprojects { dependency("io.micrometer:micrometer-core:$micrometerVersion") dependency("io.micrometer:micrometer-registry-prometheus:$micrometerVersion") + + dependency("org.ehcache:ehcache:$ehcacheVersion") } } @@ -284,7 +287,8 @@ project(":tickers-batch") { compile project(":common-kafka") compile project(":cassandra-service") - compile("org.springframework.boot:spring-boot-starter") + compile 'org.springframework.boot:spring-boot-starter' + compile 'org.ehcache:ehcache' testCompile("org.springframework.boot:spring-boot-starter-test") } diff --git a/tickers-batch/src/main/kotlin/fund/cyber/markets/ticker/configuration/TickersConfiguration.kt b/tickers-batch/src/main/kotlin/fund/cyber/markets/ticker/configuration/TickersConfiguration.kt index f00a38e8..6d6049e6 100644 --- a/tickers-batch/src/main/kotlin/fund/cyber/markets/ticker/configuration/TickersConfiguration.kt +++ b/tickers-batch/src/main/kotlin/fund/cyber/markets/ticker/configuration/TickersConfiguration.kt @@ -1,15 +1,27 @@ package fund.cyber.markets.ticker.configuration +import fund.cyber.markets.cassandra.model.CqlTokenTicker import fund.cyber.markets.common.LAG_FROM_REAL_TIME_MIN import fund.cyber.markets.common.LAG_FROM_REAL_TIME_MIN_DEFAULT import fund.cyber.markets.common.MINUTES_TO_MILLIS import fund.cyber.markets.common.WINDOW_INTERVALS_MIN import fund.cyber.markets.common.WINDOW_INTERVALS_MIN_DEFAULT import fund.cyber.markets.common.convert +import org.ehcache.Cache +import org.ehcache.CacheManager +import org.ehcache.config.builders.CacheConfigurationBuilder +import org.ehcache.config.builders.CacheManagerBuilder +import org.ehcache.config.builders.ResourcePoolsBuilder +import org.ehcache.config.units.MemoryUnit +import org.ehcache.core.spi.service.StatisticsService +import org.ehcache.impl.internal.statistics.DefaultStatisticsService import org.springframework.beans.factory.annotation.Value import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration +const val TICKERS_CACHE_NAME = "tickers" +const val TICKERS_CACHE_SIZE_GB = 5L + @Configuration class TickersConfiguration( @Value("\${$WINDOW_INTERVALS_MIN:$WINDOW_INTERVALS_MIN_DEFAULT}") @@ -28,4 +40,27 @@ class TickersConfiguration( @Bean fun lagFromRealTime(): Long = lag convert MINUTES_TO_MILLIS + @Bean + fun tickerCache(cacheManager: CacheManager): Cache> { + return cacheManager.getCache(TICKERS_CACHE_NAME, String::class.java, MutableList::class.java as Class>) + } + + @Bean + fun cacheStatisticsService() = DefaultStatisticsService() + + @Bean + fun cacheManager(cacheStatisticsService: StatisticsService): CacheManager { + + return CacheManagerBuilder.newCacheManagerBuilder() + .withCache(TICKERS_CACHE_NAME, + CacheConfigurationBuilder.newCacheConfigurationBuilder( + String::class.java, + MutableList::class.java as Class>, + ResourcePoolsBuilder.newResourcePoolsBuilder().heap(TICKERS_CACHE_SIZE_GB, MemoryUnit.GB) + ) + ) + .using(cacheStatisticsService) + .build(true) + } + } \ No newline at end of file diff --git a/tickers-batch/src/main/kotlin/fund/cyber/markets/ticker/service/TickerService.kt b/tickers-batch/src/main/kotlin/fund/cyber/markets/ticker/service/TickerService.kt index cfa136bb..9046203e 100644 --- a/tickers-batch/src/main/kotlin/fund/cyber/markets/ticker/service/TickerService.kt +++ b/tickers-batch/src/main/kotlin/fund/cyber/markets/ticker/service/TickerService.kt @@ -6,6 +6,7 @@ import fund.cyber.markets.common.Intervals import fund.cyber.markets.common.MILLIS_TO_DAYS import fund.cyber.markets.common.convert import fund.cyber.markets.common.model.TokenTicker +import org.ehcache.Cache import org.slf4j.LoggerFactory import org.springframework.stereotype.Service import reactor.core.publisher.Flux @@ -13,7 +14,8 @@ import java.util.* @Service class TickerService( - private val tickerRepository: TickerRepository + private val tickerRepository: TickerRepository, + private val cache: Cache> ) { private val log = LoggerFactory.getLogger(TickerService::class.java)!! @@ -34,8 +36,30 @@ class TickerService( var timestampToVar = timestampFrom + Intervals.HOUR while (timestampTo - timestampToVar >= Intervals.HOUR) { + tickers = tickers.mergeWith( - tickerRepository.find(symbol, epochDay, Date(timestampFromVar), Date(timestampToVar), interval) + Flux.defer { + val cachedValue = cache.get(cacheKey(symbol, timestampFromVar, timestampToVar, interval)) ?: mutableListOf() + when { + cachedValue.isNotEmpty() -> Flux.fromIterable(cachedValue) + else -> Flux.empty() + } + }.switchIfEmpty( + tickerRepository + .find(symbol, epochDay, Date(timestampFromVar), Date(timestampToVar), interval) + .map { ticker -> + val key = cacheKey(symbol, timestampFrom, timestampTo, interval) + val list = cache.get(key) + + if (list == null) { + cache.put(key, mutableListOf(ticker)) + } else { + list.add(ticker) + } + + ticker + } + ) ) timestampFromVar += Intervals.HOUR timestampToVar += Intervals.HOUR @@ -50,4 +74,8 @@ class TickerService( return tickers } + private fun cacheKey(symbol: String, timestampFrom: Long, timestampTo: Long, interval: Long): String { + return "${symbol}_${timestampFrom}_${timestampTo}_$interval" + } + } \ No newline at end of file