Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright (C) 2009-2025 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.persistence.query

import java.util.Optional
import scala.jdk.OptionConverters.RichOption
import scala.jdk.OptionConverters.RichOptional

/**
* (Optional) mechanism for query implementations to pick up a correlation id from the caller, to use in logging and
* error messages. Used by akka-projections to make correlating projection logs with debug and trace logging from the
* underlying akka persistence query implementations possible.
*/
object QueryCorrelationId {

private val threadLocal = new ThreadLocal[String]

/**
* Scala API: Expected to be used "around" calls to plugin query method, will clear the correlation id from thread local
* to make sure there is no leak between logic executed on shared threads.
*/
def withCorrelationId[T](correlationId: String)(f: () => T): T = {
threadLocal.set(correlationId)
try {
f()
} finally {
threadLocal.remove()
}
}

/**
* Scala API: Expected to be used "around" calls to plugin query method to pass along a previously extracted optional correlation id,
* will clear the correlation id from thread local to make sure there is no leak between logic executed on shared threads.
*/
def withCorrelationId[T](correlationId: Option[String])(f: () => T): T = {
correlationId match {
case None => f()
case Some(actualId) => withCorrelationId(actualId)(f)
}
}

/**
* Scala API: Expected to be called by query plugins directly after receiving a query call, before starting any asynchronous tasks.
* Calling code is responsible to clear it out after method returns. The value is stored in a thread local so is not available
* across threads or streams. Further passing around of the uuid inside the query plugin implementation is up to the implementer.
*/
def get(): Option[String] =
Option(threadLocal.get)

/**
* Java API: Expected to be used "around" calls to plugin query method to pass along a previously extracted optional correlation id,
* will clear the correlation id from thread local to make sure there is no leak between logic executed on shared threads.
*/
def callWithCorrelationId[T](correlationId: Optional[String], function: java.util.function.Supplier[T]): T =
withCorrelationId(correlationId.toScala)(function.get _)

/**
* Java API: Expected to be used "around" calls to plugin query method, will clear the correlation id from thread local
* to make sure there is no leak between logic executed on shared threads.
*/
def callWithCorrelationId[T](correlationId: String, function: java.util.function.Supplier[T]): T =
withCorrelationId(correlationId)(function.get _)

/**
* Java API: Expected to be called by query plugins directly after receiving a query call, before starting any asynchronous tasks.
* Calling code is responsible to clear it out after method returns. The value is stored in a thread local so is not available
* across threads or streams. Further passing around of the uuid inside the query plugin implementation is up to the implementer.
*/
def getCorrelationId(): Optional[String] =
get().toJava

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright (C) 2009-2025 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.persistence.query

import akka.testkit.TestException
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

import java.util.UUID

class QueryCorrelationIdSpec extends AnyWordSpecLike with Matchers {

def pretendQueryMethod(): Option[String] =
QueryCorrelationId.get()

"The query correlation id utility" should {

"pass and clear correlation id" in {
val uuid = UUID.randomUUID().toString
val observed =
QueryCorrelationId.withCorrelationId(uuid) { () =>
pretendQueryMethod()
}
observed shouldEqual Some(uuid)

// cleared after returning
QueryCorrelationId.get() shouldBe None
}

"pass along and clear correlation id if present" in {
val uuid = UUID.randomUUID().toString
val observed =
QueryCorrelationId.withCorrelationId(Some(uuid)) { () =>
pretendQueryMethod()
}
observed shouldEqual Some(uuid)

// cleared after returning
QueryCorrelationId.get() shouldBe None
}

"just invoke the block if correlation id not present" in {
val observed =
QueryCorrelationId.withCorrelationId(None) { () =>
pretendQueryMethod()
}
observed shouldEqual None
}

"clear correlation id when call fails" in {
val uuid = UUID.randomUUID().toString
intercept[TestException] {
QueryCorrelationId.withCorrelationId(uuid) { () =>
throw TestException("boom")
}
}

// cleared after throwing
QueryCorrelationId.get() shouldBe None
}

}

}