TTL cache retries more frequently on failures #806

Open — wants to merge 1 commit into base: main
22 changes: 18 additions & 4 deletions online/src/main/scala/ai/chronon/online/TTLCache.scala
@@ -17,13 +17,19 @@
package ai.chronon.online

import org.slf4j.LoggerFactory

- import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap, ThreadPoolExecutor, TimeUnit}
+ import java.util.concurrent.{AbstractExecutorService, ArrayBlockingQueue, ConcurrentHashMap, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.function
+ import scala.util.Failure

object TTLCache {
-   private[TTLCache] val executor = FlexibleExecutionContext.buildExecutor
+   private[TTLCache] var executor: AbstractExecutorService = FlexibleExecutionContext.buildExecutor
+
+   // Used for testing.
+   def setExecutor(executor: AbstractExecutorService): Unit = {
+     this.executor = executor
+   }
}
// can continuously grow, only used for schemas
// has two methods apply & refresh. Apply uses a longer ttl before updating than refresh
@@ -33,7 +39,9 @@ class TTLCache[I, O](f: I => O,
contextBuilder: I => Metrics.Context,
ttlMillis: Long = 2 * 60 * 60 * 1000, // 2 hours
nowFunc: () => Long = { () => System.currentTimeMillis() },
-                     refreshIntervalMillis: Long = 8 * 1000 // 8 seconds
+                     refreshIntervalMillis: Long = 8 * 1000, // 8 seconds
+                     // same as ttlMillis, so behavior is unchanged barring an override
+                     failureTTLMillis: Long = 2 * 60 * 60 * 1000 // 2 hours
@nikhilsimha (Contributor) commented on Aug 7, 2024:
I think we should lower this to 30 seconds. I am no longer at Airbnb, but I think it would benefit Airbnb too; we have had incidents in the past, similar to yours, where this could have helped.

cc: @pengyu-hou, who is familiar with the Airbnb incident.

A Collaborator replied:

Hmm, I feel 30 seconds may be too frequent as a default. Some of these TTLCaches are used to cache metadata, which doesn't get updated frequently anyway.

But I can see why a more frequent failure refresh is desired for things like the BatchIr cache.

Another Collaborator replied:

Thanks @nikhilsimha. Our incident was caused by stale but valid metadata; to mitigate it, we would have had to flush the TTL cache. This should be addressed by @yuli-han's recent work, after which we will only fetch active configs.

I am curious: what is failureTTLMillis on the Stripe side? @yizzlez

For failure cases, I agree that we should use a lower TTL.

) {
case class Entry(value: O, updatedAtMillis: Long, var markedForUpdate: AtomicBoolean = new AtomicBoolean(false))
@transient implicit lazy val logger = LoggerFactory.getLogger(getClass)
Expand All @@ -60,8 +68,14 @@ class TTLCache[I, O](f: I => O,
contextBuilder(i).increment("cache.insert")
entry.value
} else {
+       val minFailureUpdateTTL = Math.min(intervalMillis, failureTTLMillis)
+       val shouldUpdate = entry.value match {
+         // Encountered a failure, update according to failure TTL.
+         case Failure(_) => nowFunc() - entry.updatedAtMillis > minFailureUpdateTTL
+         case _          => nowFunc() - entry.updatedAtMillis > intervalMillis
+       }
Comment on lines +71 to +76
A Contributor suggested:
Suggested change:
- val minFailureUpdateTTL = Math.min(intervalMillis, failureTTLMillis)
- val shouldUpdate = entry.value match {
-   // Encountered a failure, update according to failure TTL.
-   case Failure(_) => nowFunc() - entry.updatedAtMillis > minFailureUpdateTTL
-   case _ => nowFunc() - entry.updatedAtMillis > intervalMillis
- }
+ val effectiveExpiry = entry.map(_ => intervalMillis).getOrElse(Math.min(intervalMillis, failureTTLMillis))

A Contributor commented: minor simplification.
if (
-         (nowFunc() - entry.updatedAtMillis > intervalMillis) &&
+         shouldUpdate &&
A Contributor suggested:

Suggested change:
- shouldUpdate &&
+ (nowFunc() - entry.updatedAtMillis > effectiveExpiry) &&

// CAS so that update is enqueued only once per expired entry
entry.markedForUpdate.compareAndSet(false, true)
) {
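The expiry rule in the hunk above boils down to a small pure function: a cached Failure becomes eligible for refresh after min(intervalMillis, failureTTLMillis), while a success waits out the full interval. A minimal standalone sketch of that decision (hypothetical object and parameter names, not the PR's exact code):

```scala
import scala.util.{Failure, Success, Try}

// Sketch of the failure-aware expiry decision: failed entries use the
// tighter of the two TTLs, successful entries use the normal interval.
object FailureTtlSketch {
  def shouldUpdate[O](value: Try[O],
                      ageMillis: Long,
                      intervalMillis: Long,
                      failureTTLMillis: Long): Boolean =
    value match {
      case Failure(_) => ageMillis > math.min(intervalMillis, failureTTLMillis)
      case Success(_) => ageMillis > intervalMillis
    }
}
```

Taking the min keeps behavior unchanged when failureTTLMillis is left at its default (equal to the normal TTL), while an override can only make failure retries more frequent, never less.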
138 changes: 138 additions & 0 deletions online/src/test/scala/ai/chronon/online/test/TTLCacheTest.scala
@@ -0,0 +1,138 @@
package ai.chronon.online.test
A Contributor commented: Thanks a lot for adding this!

A Collaborator replied: +1

import ai.chronon.online.{Metrics, TTLCache}

import scala.collection.mutable
import org.junit.{Before, Test}

import java.util.concurrent.{AbstractExecutorService, ExecutorService, SynchronousQueue, ThreadPoolExecutor, TimeUnit}
import scala.util.{Failure, Success, Try}

class TTLCacheTest {

class MockTime(var currentTime: Long) {

def getTime: Long = currentTime

def setTime(time: Long): Unit = {
currentTime = time
}
}

// Creates an executor that blocks and runs within the current thread.
def currentThreadExecutorService(): AbstractExecutorService = {
val callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy()

new ThreadPoolExecutor(0, 1, 0L, TimeUnit.SECONDS, new SynchronousQueue[Runnable](), callerRunsPolicy) {
override def execute(command: Runnable): Unit = {
callerRunsPolicy.rejectedExecution(command, this)
}
}
}

var mockTime: MockTime = _
  val nowFunc: () => Long = () => mockTime.getTime
val currentThreadExecutor: AbstractExecutorService = currentThreadExecutorService()
var ttlCache: TTLCache[String, Try[String]] = _
var fetchData: mutable.Map[String, Try[String]] = _
var fetchTimes: mutable.Map[String, Long] = _

@Before
def setupTTLCache(): Unit = {
mockTime = new MockTime(0L)
fetchData = mutable.Map[String, Try[String]]()
fetchTimes = mutable.Map[String, Long]()
TTLCache.setExecutor(currentThreadExecutor)

ttlCache = new TTLCache[String, Try[String]](
{
key => {
fetchTimes(key) = nowFunc()
fetchData(key)
}
},
{_ => Metrics.Context(environment = "test")},
100, // Normal timeout interval.
nowFunc,
20, // Refresh timeout interval.
10, // Failure timeout interval.
)
}

@Test
def testValidEntry(): Unit = {
// Test apply.
fetchData("valid_1") = Success("success_1") // valid_1 enters cache at t = 0
assert(ttlCache.apply("valid_1").get == "success_1") // key is correct.
assert(fetchTimes("valid_1") == 0L) // We actively fetch.
mockTime.setTime(1L) // Set to 1 ms.
assert(ttlCache.apply("valid_1").get == "success_1") // key is correct.
assert(fetchTimes("valid_1") == 0L) // We don't fetch.
mockTime.setTime(200L) // Expire the key.
assert(ttlCache.apply("valid_1").get == "success_1") // key is correct.
assert(fetchTimes("valid_1") == 200L) // We actively fetch.

// Test refresh.
mockTime.setTime(230L)
fetchData("valid_1") = Success("success_2")
assert(ttlCache.refresh("valid_1").get == "success_1") // Get old value.
assert(fetchTimes("valid_1") == 230L) // We actively fetch.
assert(ttlCache.apply("valid_1").get == "success_2") // Value is replaced.

// Test force.
mockTime.setTime(231L)
fetchData("valid_1") = Success("success_3")
assert(ttlCache.force("valid_1").get == "success_2") // Get old value.
assert(fetchTimes("valid_1") == 231L) // We actively fetch.
assert(ttlCache.apply("valid_1").get == "success_3") // Value is replaced.
}

@Test
def testFailureEntry(): Unit = {
// invalid_1 enters cache at t = 0
fetchData("invalid_1") = Failure(new Exception("test_exception_1"))
assert(ttlCache.apply("invalid_1").isFailure) // key is correct.
assert(fetchTimes("invalid_1") == 0L) // We actively fetch.
mockTime.setTime(20L) // Expire the key.
assert(ttlCache.apply("invalid_1").isFailure)
assert(fetchTimes("invalid_1") == 20L) // We actively fetch.

// Test refresh.
mockTime.setTime(31L) // This is under the refresh interval but we should still fetch.
assert(ttlCache.refresh("invalid_1").isFailure)
assert(fetchTimes("invalid_1") == 31L)

// Test force.
mockTime.setTime(32L) // Under force, we should always refetch.
assert(ttlCache.force("invalid_1").isFailure)
assert(fetchTimes("invalid_1") == 32L)
}

@Test
def testFailureRefreshes(): Unit = {
ttlCache = new TTLCache[String, Try[String]](
{
key => {
fetchTimes(key) = nowFunc()
fetchData(key)
}
},
{_ => Metrics.Context(environment = "test")},
100, // Normal timeout interval.
nowFunc,
20, // Refresh timeout interval (less than failure timeout in this case).
50, // Failure timeout interval.
)

fetchData("invalid_1") = Failure(new Exception("test_exception_1"))
assert(ttlCache.apply("invalid_1").isFailure) // key is correct.
assert(fetchTimes("invalid_1") == 0L) // We actively fetch.

mockTime.setTime(21L) // Hits refresh but not failure timeout interval.
fetchData("invalid_1") = Success("success_1")
assert(ttlCache.refresh("invalid_1").isFailure) // Return the old value.
assert(ttlCache.apply("invalid_1").get == "success_1")
assert(fetchTimes("invalid_1") == 21L)
}

}
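The currentThreadExecutorService helper in the test above is what makes these assertions deterministic: by routing every execute call straight to CallerRunsPolicy, the refresh task runs synchronously on the submitting thread, so fetchTimes is updated before the assertion runs. A minimal sketch of the trick (hypothetical object name):

```scala
import java.util.concurrent.{SynchronousQueue, ThreadPoolExecutor, TimeUnit}

object CallerRunsSketch {
  // Every execute() call is forwarded to CallerRunsPolicy.rejectedExecution,
  // which invokes command.run() directly on the calling thread as long as
  // the executor has not been shut down.
  def currentThreadExecutor(): ThreadPoolExecutor = {
    val policy = new ThreadPoolExecutor.CallerRunsPolicy()
    new ThreadPoolExecutor(0, 1, 0L, TimeUnit.SECONDS, new SynchronousQueue[Runnable](), policy) {
      override def execute(command: Runnable): Unit =
        policy.rejectedExecution(command, this)
    }
  }
}
```

This is a common way to test code written against an ExecutorService without sleeps or latches, at the cost of losing any concurrency in the test itself.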