Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (C) 2009-2025 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.persistence.query

/**
* (Optional) mechanism for query implementations to pick up a correlation id from the caller, to use in logging and
* error messages. Used by akka-projections to make correlating projection logs with debug and trace logging from the
* underlying akka persistence query implementations possible.
*/
object QueryCorrelationId {

private val threadLocal = new ThreadLocal[String]

/**
* Expected to be used "around" calls to plugin query method, will clear the correlation id from thread local
* to make sure there is no leak between logic executed on shared threads.
*/
def withCorrelationId[T](correlationId: String)(block: () => T): T = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we'll probably only use this ourselves, with Scala, but shall we think about if it works for Java if we are making it public?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. We could mark it as internal, but it will need to be binary compatible either way so I thought it can just as well be public API

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added Java API

threadLocal.set(correlationId)
try {
block()
} finally {
threadLocal.remove()
}
}

/**
* Expected to be used "around" calls to plugin query method to pass along a prevously extracted optional correlation id,
* will clear the correlation id from thread local to make sure there is no leak between logic executed on shared threads.
*/
def withCorrelationId[T](correlationId: Option[String])(block: () => T): T = {
correlationId match {
case None => block()
case Some(actualId) => withCorrelationId(actualId)(block)
}
}

/**
* @return Expected to be called directly after receiving a query call, before starting any asynchronous tasks,
* returns and clears out the correlation id to make sure there is no leak between tasks. Further passing
Copy link
Contributor

@patriknw patriknw Dec 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this method doesn't "clears out"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ups, leftover from previous iteration

* around of the uuid inside the query plugin implementation is up to the implementer.
*/
def get(): Option[String] =
Option(threadLocal.get)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright (C) 2009-2025 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.persistence.query

import akka.testkit.TestException
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

import java.util.UUID

class QueryCorrelationIdSpec extends AnyWordSpecLike with Matchers {

def pretendQueryMethod(): Option[String] =
QueryCorrelationId.get()

"The query correlation id utility" should {

"pass and clear correlation id" in {
val uuid = UUID.randomUUID().toString
val observed =
QueryCorrelationId.withCorrelationId(uuid) { () =>
pretendQueryMethod()
}
observed shouldEqual Some(uuid)

// cleared after returning
QueryCorrelationId.get() shouldBe None
}

"pass along and clear correlation id if present" in {
val uuid = UUID.randomUUID().toString
val observed =
QueryCorrelationId.withCorrelationId(Some(uuid)) { () =>
pretendQueryMethod()
}
observed shouldEqual Some(uuid)

// cleared after returning
QueryCorrelationId.get() shouldBe None
}

"just invoke the block if correlation id not present" in {
val observed =
QueryCorrelationId.withCorrelationId(None) { () =>
pretendQueryMethod()
}
observed shouldEqual None
}

"clear correlation id when call fails" in {
val uuid = UUID.randomUUID().toString
intercept[TestException] {
QueryCorrelationId.withCorrelationId(uuid) { () =>
throw TestException("boom")
}
}

// cleared after throwing
QueryCorrelationId.get() shouldBe None
}

}

}