Skip to content

Commit

Permalink
instrument HTTP/1 and HTTP/2 requests on the same server
Browse files Browse the repository at this point in the history
  • Loading branch information
ivantopo committed Apr 4, 2022
1 parent 63cf15c commit afd6c82
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package kamon.instrumentation.akka.http;

import akka.NotUsed;
import akka.http.scaladsl.model.HttpRequest;
import akka.http.scaladsl.model.HttpResponse;
import akka.stream.scaladsl.Flow;
import kanela.agent.libs.net.bytebuddy.asm.Advice;

public class FlowOpsMapAsyncAdvice {

public static class EndpointInfo {
public final String listenInterface;
public final int listenPort;

public EndpointInfo(String listenInterface, int listenPort) {
this.listenInterface = listenInterface;
this.listenPort = listenPort;
}
}

public static ThreadLocal<EndpointInfo> currentEndpoint = new ThreadLocal<>();

@Advice.OnMethodExit
public static void onExit(@Advice.Return(readOnly = false) akka.stream.scaladsl.FlowOps returnedFlow) {
EndpointInfo bindAndHandlerEndpoint = currentEndpoint.get();

if(bindAndHandlerEndpoint != null) {
returnedFlow = ServerFlowWrapper.apply(
(Flow<HttpRequest, HttpResponse, NotUsed>) returnedFlow,
bindAndHandlerEndpoint.listenInterface,
bindAndHandlerEndpoint.listenPort
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ public static void onEnter(@Advice.Argument(value = 0, readOnly = false) Functio
@Advice.Argument(1) String iface,
@Advice.Argument(2) Integer port) {

FlowOpsMapAsyncAdvice.currentEndpoint.set(new FlowOpsMapAsyncAdvice.EndpointInfo(iface, port));
handler = new Http2BlueprintInterceptor.HandlerWithEndpoint(iface, port, handler);
}

@Advice.OnMethodExit
public static void onExit() {
FlowOpsMapAsyncAdvice.currentEndpoint.remove();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,9 @@ kanela.modules {

within = [
"akka.http.*",
"akka.grpc.internal.*"
"akka.grpc.internal.*",
"akka.stream.scaladsl.Flow",
"akka.stream.scaladsl.FlowOps"
]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ class AkkaHttpServerInstrumentation extends InstrumentationBuilder {
.intercept(method("redirect"), classOf[ResolveOperationNameOnRouteInterceptor])
.intercept(method("failWith"), classOf[ResolveOperationNameOnRouteInterceptor])

/**
* Support for HTTP/1 and HTTP/2 at the same time.
*/

onType("akka.stream.scaladsl.Flow")
.advise(method("mapAsync"), classOf[FlowOpsMapAsyncAdvice])

}

trait HasMatchingContext {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import akka.NotUsed
import akka.http.scaladsl.server.RouteResult.Rejected
import akka.stream.scaladsl.Flow
import kamon.context.Context
import kanela.agent.libs.net.bytebuddy.asm.Advice
import kanela.agent.libs.net.bytebuddy.matcher.ElementMatchers.isPublic

import scala.collection.immutable
Expand Down Expand Up @@ -103,6 +102,14 @@ class AkkaHttpServerInstrumentation extends InstrumentationBuilder {
onType("akka.http.scaladsl.Http2Ext")
.advise(method("bindAndHandleAsync") and isPublic(), classOf[Http2ExtBindAndHandleAdvice])


/**
* Support for HTTP/1 and HTTP/2 at the same time.
*
*/

onType("akka.stream.scaladsl.FlowOps")
.advise(method("mapAsync"), classOf[FlowOpsMapAsyncAdvice])
}

trait HasMatchingContext {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import akka.NotUsed
import akka.http.scaladsl.server.RouteResult.Rejected
import akka.stream.scaladsl.Flow
import kamon.context.Context
import kanela.agent.libs.net.bytebuddy.asm.Advice
import kanela.agent.libs.net.bytebuddy.matcher.ElementMatchers.isPublic

import scala.collection.immutable
Expand Down Expand Up @@ -86,6 +85,13 @@ class AkkaHttpServerInstrumentation extends InstrumentationBuilder {
onType("akka.http.scaladsl.Http2Ext")
.advise(method("bindAndHandleAsync") and isPublic(), classOf[Http2ExtBindAndHandleAdvice])

/**
* Support for HTTP/1 and HTTP/2 at the same time.
*
*/

onType("akka.stream.scaladsl.FlowOps")
.advise(method("mapAsync"), classOf[FlowOpsMapAsyncAdvice])
}

trait HasMatchingContext {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala

val (sslSocketFactory, trustManager) = clientSSL()
val okHttp = new OkHttpClient.Builder()
.sslSocketFactory(sslSocketFactory, trustManager)
.hostnameVerifier(new HostnameVerifier { override def verify(s: String, sslSession: SSLSession): Boolean = true })
.build()

val okHttp1ONly = new OkHttpClient.Builder()
.sslSocketFactory(sslSocketFactory, trustManager)
.protocols(List(Protocol.HTTP_1_1).asJava)
.hostnameVerifier(new HostnameVerifier { override def verify(s: String, sslSession: SSLSession): Boolean = true })
Expand All @@ -53,22 +58,24 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala
val httpWebServer = startServer(interface, 8081, https = false)
val httpsWebServer = startServer(interface, 8082, https = true)

testSuite("HTTP", httpWebServer)
testSuite("HTTPS", httpsWebServer)
testSuite("HTTP", httpWebServer, okHttp)
testSuite("HTTPS", httpsWebServer, okHttp)
testSuite("HTTPS with HTTP/1 only clients", httpsWebServer, okHttp1ONly)

def testSuite(httpVersion: String, server: WebServer) = {
def testSuite(httpVersion: String, server: WebServer, client: OkHttpClient) = {
val interface = server.interface
val port = server.port
val protocol = server.protocol

s"the Akka HTTP server instrumentation with ${httpVersion}" should {
"create a server Span when receiving requests" in {
val target = s"$protocol://$interface:$port/$dummyPathOk"
okHttp.newCall(new Request.Builder().url(target).build()).execute()
client.newCall(new Request.Builder().url(target).build()).execute()


eventually(timeout(10 seconds)) {
val span = testSpanReporter().nextSpan().value
span.tags.get(plain("http.url")) shouldBe target
span.tags.get(plain("http.url")) should endWith(s"$interface:$port/$dummyPathOk")
span.metricTags.get(plain("component")) shouldBe "akka.http.server"
span.metricTags.get(plain("http.method")) shouldBe "GET"
span.metricTags.get(plainLong("http.status_code")) shouldBe 200L
Expand All @@ -78,7 +85,7 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala
"return the correct operation name with overloaded route" in {
val target = s"$protocol://$interface:$port/some_endpoint"

okHttp.newCall(new Request.Builder()
client.newCall(new Request.Builder()
.get()
.url(target).build())
.execute()
Expand All @@ -91,7 +98,7 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala
val path = s"extraction/nested/42/fixed/anchor/32/${UUID.randomUUID().toString}/fixed/44/CafE"
val expected = "/extraction/nested/{}/fixed/anchor/{}/{}/fixed/{}/{}"
val target = s"$protocol://$interface:$port/$path"
okHttp.newCall(new Request.Builder().url(target).build()).execute()
client.newCall(new Request.Builder().url(target).build()).execute()

eventually(timeout(10 seconds)) {
val span = testSpanReporter().nextSpan().value
Expand All @@ -103,7 +110,7 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala
val path = "extraction/segment/special**"
val expected = "/extraction/segment/{}"
val target = s"$protocol://$interface:$port/$path"
val response = okHttp.newCall(new Request.Builder().url(target).build()).execute()
val response = client.newCall(new Request.Builder().url(target).build()).execute()

response.code() shouldBe 200
response.body().string() shouldBe "special**"
Expand All @@ -118,7 +125,7 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala
val path = "extraction/on-complete/42/more-path"
val expected = "/extraction/on-complete/{}/more-path"
val target = s"$protocol://$interface:$port/$path"
okHttp.newCall(new Request.Builder().url(target).build()).execute()
client.newCall(new Request.Builder().url(target).build()).execute()

eventually(timeout(10 seconds)) {
val span = testSpanReporter().nextSpan().value
Expand All @@ -130,7 +137,7 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala
val path = "extraction/on-success/42/after"
val expected = "/extraction/on-success/{}/after"
val target = s"$protocol://$interface:$port/$path"
okHttp.newCall(new Request.Builder().url(target).build()).execute()
client.newCall(new Request.Builder().url(target).build()).execute()

eventually(timeout(10 seconds)) {
val span = testSpanReporter().nextSpan().value
Expand All @@ -142,7 +149,7 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala
val path = "extraction/complete-or-recover-with/42/after"
val expected = "/extraction/complete-or-recover-with/{}/after"
val target = s"$protocol://$interface:$port/$path"
okHttp.newCall(new Request.Builder().url(target).build()).execute()
client.newCall(new Request.Builder().url(target).build()).execute()

eventually(timeout(10 seconds)) {
val span = testSpanReporter().nextSpan().value
Expand All @@ -154,7 +161,7 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala
val path = "extraction/complete-or-recover-with-success/42/after"
val expected = "/extraction/complete-or-recover-with-success/{}"
val target = s"$protocol://$interface:$port/$path"
okHttp.newCall(new Request.Builder().url(target).build()).execute()
client.newCall(new Request.Builder().url(target).build()).execute()

eventually(timeout(10 seconds)) {
val span = testSpanReporter().nextSpan().value
Expand All @@ -166,7 +173,7 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala
val path = s"v3/user/3/post/3"
val expected = "/v3/user/{}/post/{}"
val target = s"$protocol://$interface:$port/$path"
okHttp.newCall(new Request.Builder().url(target).build()).execute()
client.newCall(new Request.Builder().url(target).build()).execute()

eventually(timeout(10 seconds)) {
val span = testSpanReporter().nextSpan().value
Expand All @@ -177,12 +184,12 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala

"change the Span operation name when using the operationName directive" in {
val target = s"$protocol://$interface:$port/$traceOk"
okHttp.newCall(new Request.Builder().url(target).build()).execute()
client.newCall(new Request.Builder().url(target).build()).execute()

eventually(timeout(10 seconds)) {
val span = testSpanReporter().nextSpan().value
span.operationName shouldBe "user-supplied-operation"
span.tags.get(plain("http.url")) shouldBe target
span.tags.get(plain("http.url")) should endWith(s"$interface:$port/$traceOk")
span.metricTags.get(plain("component")) shouldBe "akka.http.server"
span.metricTags.get(plain("http.method")) shouldBe "GET"
span.metricTags.get(plainLong("http.status_code")) shouldBe 200L
Expand All @@ -191,12 +198,12 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala

"mark spans as failed when request fails" in {
val target = s"$protocol://$interface:$port/$dummyPathError"
okHttp.newCall(new Request.Builder().url(target).build()).execute()
client.newCall(new Request.Builder().url(target).build()).execute()

eventually(timeout(10 seconds)) {
val span = testSpanReporter().nextSpan().value
span.operationName shouldBe s"/$dummyPathError"
span.tags.get(plain("http.url")) shouldBe target
span.tags.get(plain("http.url")) should endWith(s"$interface:$port/$dummyPathError")
span.metricTags.get(plain("component")) shouldBe "akka.http.server"
span.metricTags.get(plain("http.method")) shouldBe "GET"
span.metricTags.get(plainBoolean("error")) shouldBe true
Expand All @@ -206,12 +213,12 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala

"change the operation name to 'unhandled' when the response status code is 404" in {
val target = s"$protocol://$interface:$port/unknown-path"
okHttp.newCall(new Request.Builder().url(target).build()).execute()
client.newCall(new Request.Builder().url(target).build()).execute()

eventually(timeout(10 seconds)) {
val span = testSpanReporter().nextSpan().value
span.operationName shouldBe "unhandled"
span.tags.get(plain("http.url")) shouldBe target
span.tags.get(plain("http.url")) should endWith(s"$interface:$port/unknown-path")
span.metricTags.get(plain("component")) shouldBe "akka.http.server"
span.metricTags.get(plain("http.method")) shouldBe "GET"
span.metricTags.get(plainBoolean("error")) shouldBe false
Expand All @@ -221,7 +228,7 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala

"correctly time entity transfer timings" in {
val target = s"$protocol://$interface:$port/$stream"
okHttp.newCall(new Request.Builder().url(target).build()).execute()
client.newCall(new Request.Builder().url(target).build()).execute()

val span = eventually(timeout(10 seconds)) {
val span = testSpanReporter().nextSpan().value
Expand All @@ -233,14 +240,14 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala
case List(_ @ Mark(_, "http.response.ready")) =>
}

span.tags.get(plain("http.url")) shouldBe target
span.tags.get(plain("http.url")) should endWith(s"$interface:$port/$stream")
span.metricTags.get(plain("component")) shouldBe "akka.http.server"
span.metricTags.get(plain("http.method")) shouldBe "GET"
}

"include the trace-id and keep all user-provided headers in the responses" in {
val target = s"$protocol://$interface:$port/extra-header"
val response = okHttp.newCall(new Request.Builder().url(target).build()).execute()
val response = client.newCall(new Request.Builder().url(target).build()).execute()

response.headers().names() should contain allOf (
"trace-id",
Expand All @@ -250,7 +257,7 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala

"keep operation names provided by the HTTP Server instrumentation" in {
val target = s"$protocol://$interface:$port/name-will-be-changed"
okHttp.newCall(new Request.Builder().url(target).build()).execute()
client.newCall(new Request.Builder().url(target).build()).execute()

eventually(timeout(10 seconds)) {
val span = testSpanReporter().nextSpan().value
Expand Down

0 comments on commit afd6c82

Please sign in to comment.